1"""A streaming python pipeline to read in PUBSUB tweets and perform
2classification"""
3
4from __future__ import absolute_import
5
6import argparse
7import datetime
8import json
9import logging
10import numpy as np
11import os
12
13import apache_beam as beam
14import apache_beam.transforms.window as window
15
16from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
17from apache_beam.options.pipeline_options import StandardOptions
18from apache_beam.options.pipeline_options import GoogleCloudOptions
19from apache_beam.options.pipeline_options import SetupOptions
20from apache_beam.options.pipeline_options import PipelineOptions
21from apache_beam.transforms.util import BatchElements
22
23from googleapiclient import discovery
24
25# Project parameters.
26PROJECT_ID = os.getenv('PROJECT_ID')
27PIPELINE_MODE = 'DataflowRunner'
28SERVICE_URL = (
29 'https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json')
30BUCKET = 'project-id-dev'
31STAGING_LOCATION = 'gs://project-id-dev/project-id-twitter/staging'
32TEMP_LOCATION = 'gs://project-id-dev/project-id-twitter/tmp'
33REGION = 'us-central1'
34DATASET = 'project_dataset_twitter'
35TWITTER_TABLE = 'twitter_posts_test'
36TWITTER_TABLE_SENTIMENT = 'twitter_mean_sentiment_test'
37MODEL_URL = 'projects/project-id-257304/models/twitter'
38
39_RATE_LIMIT_RETRIES = 3
40_RETRY_DELAY = 1
41_RETRY_MULTIPLIER = 1
42_RETRY_MAX_DELAY = 4
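# The retry settings above are not referenced elsewhere in this file. A
# minimal backoff loop around the AI Platform call (hypothetical helper,
# standard library only) could look like this sketch:
#
#     import time
#
#     def _call_with_retries(request):
#         delay = _RETRY_DELAY
#         for attempt in range(_RATE_LIMIT_RETRIES):
#             try:
#                 return request.execute()
#             except Exception:  # e.g. rate-limit errors
#                 if attempt == _RATE_LIMIT_RETRIES - 1:
#                     raise
#                 time.sleep(delay)
#                 delay = min(delay * _RETRY_MULTIPLIER, _RETRY_MAX_DELAY)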

# Global client for the AI Platform API, initialized lazily on each worker.
api_client = None


def main(argv=None):
    """Build and run the streaming tweet-classification pipeline.

    :param argv: Command line arguments; defaults to sys.argv when None.
    :return: None
    """

    def initialize_api():
        """Create the AI Platform API client once per worker, if needed.

        :return: None; the global ``api_client`` is populated as a side effect.
        """
        global api_client
        try:
            if api_client is None:
                api_client = discovery.build('ml', 'v1',
                                             discoveryServiceUrl=SERVICE_URL,
                                             cache_discovery=True)
        except Exception as e:
            logging.exception(e)

    def aggregate_format(key_values):
        """Average the sentiment and timestamp of the tweets in one window.

        :param key_values: (key, iterable of tweet dicts) pair from GroupByKey.
        :return: dict matching the mean-sentiment BigQuery schema.
        """
        # Aggregate tweets per 10 second window.
        (key, values) = key_values

        mean_sentiment = np.mean([tweet['sentiment'] for tweet in values])
        mean_timestamp = datetime.datetime.utcfromtimestamp(np.mean([
            (datetime.datetime.strptime(tweet["posted_at"],
                                        '%Y-%m-%d %H:%M:%S') -
             datetime.datetime.fromtimestamp(0)).total_seconds()
            for tweet in values
        ]))

        logging.info("mean sentiment")
        logging.info(mean_sentiment)
        logging.info("mean timestamp")
        logging.info(mean_timestamp)
        # Return in the correct format, according to the BigQuery schema.
        return {"posted_at": mean_timestamp.strftime('%Y-%m-%d %H:%M:%S'),
                "sentiment": mean_sentiment}

    def predict(instances):
        """Call the AI Platform to get predictions.

        Args:
            instances: list of strings to classify.
        Returns:
            list of float: estimated sentiment scores, or None on failure.
        """
        # Init the Platform API.
        initialize_api()
        logging.info('Making request to the AI Platform API')
        # Call the model.
        try:
            response = api_client.projects().predict(
                body={'instances': instances},
                name=MODEL_URL).execute()
            # Read out the scores.
            values = [item['score'] for item in response['predictions']]
            return values
        except Exception as e:
            logging.exception(e)
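    # The request/response payloads are assumed to look roughly like
    #     body={'instances': ['some tweet text', ...]}
    #     response={'predictions': [{'score': 0.8}, ...]}
    # i.e. the deployed model is expected to return one 'score' per instance.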

    def prediction_handler(messages):
        """Decode a batch of Pub/Sub messages and attach sentiment scores.

        :param messages: a JSON string or a list of JSON strings.
        :return: list of tweet dicts with a 'sentiment' field added, or an
            empty list if no scores were returned.
        """
        # Handle single string.
        if not isinstance(messages, list):
            messages = [messages]
        # Messages from Pub/Sub are JSON strings.
        instances = list(map(lambda message: json.loads(message), messages))
        # Estimate the sentiment of the 'text' of each tweet.
        scores = predict([instance['text'] for instance in instances])
        # Join them together.
        if scores:
            for i, instance in enumerate(instances):
                instance['sentiment'] = scores[i]
            logging.info('First message in batch:')
            logging.info(instances[0])
            return instances
        else:
            logging.error('No scores')
            # Return an empty list so FlatMap has something to iterate over.
            return []
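    # An incoming message is assumed to carry the fields of the tweets table
    # below, for example (hypothetical values):
    #     {"id": "1", "text": "hello", "user_id": "42",
    #      "posted_at": "2020-01-11 19:00:00"}
    # and leaves this step with an extra "sentiment" float added.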

    # Make explicit BigQuery schemas for the output tables.
    # Tweets table.
    bigquery_schema_json = ('{"fields": ['
                            '{"name":"id","type":"STRING"},'
                            '{"name":"text","type":"STRING"},'
                            '{"name":"user_id","type":"STRING"},'
                            '{"name":"sentiment","type":"FLOAT"},'
                            '{"name":"posted_at","type":"TIMESTAMP"}'
                            ']}')
    bigquery_schema = parse_table_schema_from_json(bigquery_schema_json)
    # Mean-sentiment table.
    bigquery_schema_mean_json = ('{"fields": ['
                                 '{"name":"posted_at","type":"TIMESTAMP"},'
                                 '{"name":"sentiment","type":"FLOAT"}'
                                 ']}')
    bigquery_schema_mean = parse_table_schema_from_json(
        bigquery_schema_mean_json)

158 """Build and run the pipeline."""
159 parser = argparse.ArgumentParser()
160 group = parser.add_mutually_exclusive_group(required=False)
161 group.add_argument(
162 '--input_subscription',
163 help=('Input PubSub subscription of the form: '
164 '"projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>."'),
165 default="projects/project-id-257304/subscriptions/project-id-test"
166
167 )
168 group.add_argument(
169 '--input_topic',
170 help=('Input PubSub topic of the form: '
171 '"projects/<PROJECT_ID>/topics/<TOPIC>."'),
172 default="projects/project-id-257304/topics/project-id-test"
173 )
174
175 known_args, pipeline_args = parser.parse_known_args(argv)
176
    # We use the save_main_session option because one or more DoFns in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    # Run on Cloud Dataflow by default.
    pipeline_options.view_as(StandardOptions).runner = PIPELINE_MODE
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.staging_location = STAGING_LOCATION
    google_cloud_options.temp_location = TEMP_LOCATION
    google_cloud_options.region = REGION

    p = beam.Pipeline(options=pipeline_options)

    # Read from PubSub into a PCollection. The 'tweet_id' message attribute is
    # used as a record id so that redelivered messages are deduplicated.
    if known_args.input_subscription:
        lines = p | 'read in tweets' >> beam.io.ReadFromPubSub(
            subscription=known_args.input_subscription,
            with_attributes=False,
            id_label='tweet_id'
        )
    else:
        lines = p | 'read in tweets' >> beam.io.ReadFromPubSub(
            topic=known_args.input_topic,
            with_attributes=False,
            id_label='tweet_id')

    # Window the tweets into fixed 10 second windows and batch them into
    # groups of up to 100 elements per prediction request.
    output_tweets = (lines
                     | 'assign window key' >> beam.WindowInto(
                         window.FixedWindows(10))
                     | 'batch into n batches' >> BatchElements(
                         min_batch_size=99, max_batch_size=100)
                     | 'predict sentiment' >> beam.FlatMap(
                         lambda messages: prediction_handler(messages))
                     )

    # Write the scored tweets to BigQuery.
    output_tweets | 'store twitter posts' >> beam.io.WriteToBigQuery(
        table=TWITTER_TABLE,
        dataset=DATASET,
        schema=bigquery_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        project=PROJECT_ID
    )

    # Average the sentiment per window and write the mean value to BigQuery.
    (output_tweets
     | 'pair with key' >> beam.Map(lambda x: (1, x))
     | 'group by key' >> beam.GroupByKey()
     | 'aggregate and format' >> beam.Map(aggregate_format)
     | 'store aggregated sentiment' >> beam.io.WriteToBigQuery(
         table=TWITTER_TABLE_SENTIMENT,
         dataset=DATASET,
         schema=bigquery_schema_mean,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         project=PROJECT_ID))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
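# A typical invocation might look like this (script name and values are
# illustrative; adjust them to your environment):
#
#     export PROJECT_ID=project-id-257304
#     python twitter_pipeline.py \
#         --input_subscription projects/project-id-257304/subscriptions/project-id-test
#
# With PIPELINE_MODE set to 'DataflowRunner', the job is submitted to Cloud
# Dataflow in the configured region rather than run locally.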