# Paste metadata: 6 years ago · Dec 07, 2019, 06:54 AM
"""A streaming Python pipeline that reads tweets from Pub/Sub and classifies
their sentiment."""
3
4from __future__ import absolute_import
5
6import argparse
7import datetime
8import json
9import logging
10import numpy as np
11import os
12
13import apache_beam as beam
14import apache_beam.transforms.window as window
15from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
16from apache_beam.options.pipeline_options import StandardOptions
17from apache_beam.options.pipeline_options import GoogleCloudOptions
18from apache_beam.options.pipeline_options import SetupOptions
19from apache_beam.options.pipeline_options import PipelineOptions
20
21from apache_beam.transforms.util import BatchElements
22
23from googleapiclient import discovery
24
25
# --- Google Cloud deployment settings ---------------------------------------
# Project id comes from the environment; it is None when PROJECT_ID is unset.
PROJECT_ID = os.environ.get('PROJECT_ID')
# Runner name that run() assigns to the pipeline options.
PIPELINE_MODE = 'DataflowRunner'
# Discovery document handed to googleapiclient when building the 'ml' client.
DISCOVERY_SERVICE = (
    'https://storage.googleapis.com/cloud-ml/discovery'
    '/ml_v1_discovery.json'
)
# Cloud Storage bucket plus the staging / temp paths used by the job.
BUCKET = 'my-project-dev'
STAGING_LOCATION = 'gs://my-project-dev/my-project-twitter/staging'
TEMP_LOCATION = 'gs://my-project-dev/my-project-twitter/tmp'
REGION = 'us-central1'

# --- BigQuery destinations --------------------------------------------------
DATASET = 'newsml_dataset_twitter'
TWITTER_TABLE = 'twitter_posts'
TWITTER_TABLE_SENTIMENT = 'twitter_mean_sentiment'

# --- Prediction model -------------------------------------------------------
# Resource name passed as ``name`` to the predict call.
MODEL_URL = 'projects/my-project-257304/models/twitter'


# Lazily created API client; init_service() fills it in on first use.
service = None
41
42
def init_service():
    """Build the ('ml', 'v1') API client once and cache it module-wide.

    The client is stored in the module-level ``service`` variable. When that
    variable is already populated the function returns without doing
    anything.

    :return: None
    :raises Exception: whatever ``discovery.build`` raised, after it has been
        logged.
    """
    global service
    if service is not None:
        return

    try:
        service = discovery.build('ml', 'v1',
                                  discoveryServiceUrl=DISCOVERY_SERVICE,
                                  cache_discovery=True)
    except Exception:
        # Log the traceback, then propagate. Swallowing the error would leave
        # ``service`` as None and only fail later, on first use of the
        # client, with an unrelated AttributeError.
        logging.exception("Failed to build the ML API client")
        raise
56
57
def aggregate_format(key_values):
    """Reduce one group of scored tweets to a single mean-sentiment row.

    Args:
        key_values: ``(key, values)`` pair. ``values`` is an iterable of
            dicts, each holding a numeric ``'sentiment'`` and a
            ``'posted_at'`` string formatted as ``'%Y-%m-%d %H:%M:%S'``.
            The key is not used.
    Returns:
        dict: ``{"posted_at": <mean timestamp string>, "sentiment": <float>}``
        with ``posted_at`` in the same ``'%Y-%m-%d %H:%M:%S'`` format.
    """
    _, values = key_values
    # Materialise once: the group is walked twice below.
    rows = list(values)

    timestamp_format = '%Y-%m-%d %H:%M:%S'
    # One naive epoch is used for both directions of the conversion. Mixing
    # a local-time epoch (fromtimestamp(0)) with utcfromtimestamp() shifts
    # the result by the machine's UTC offset.
    epoch = datetime.datetime(1970, 1, 1)

    mean_sentiment = float(np.mean([row['sentiment'] for row in rows]))
    mean_offset_seconds = float(np.mean([
        (datetime.datetime.strptime(row['posted_at'], timestamp_format)
         - epoch).total_seconds()
        for row in rows
    ]))
    mean_timestamp = epoch + datetime.timedelta(seconds=mean_offset_seconds)

    logging.info("mean sentiment")
    logging.info(mean_sentiment)

    logging.info("mean timestamp")
    logging.info(mean_timestamp)

    # Return in correct format, according to BQ schema
    return {"posted_at": mean_timestamp.strftime(timestamp_format),
            "sentiment": mean_sentiment}
79
80
def prediction(instances):
    """
    Calls the model at MODEL_URL through the ML API to get predictions.

    Args:
        instances: list of strings.
    Returns:
        list: the "score" entry of every returned prediction, in response
        order.
    Raises:
        RuntimeError: if the response carries an "error" entry instead of
            predictions.
    """

    # Init the Platform API
    init_service()

    request_data = {'instances': instances}
    logging.info("Making request to the ML API")

    # Call the model
    response = service.projects().predict(body=request_data,
                                          name=MODEL_URL).execute()

    # Surface the API's own message rather than a bare KeyError on
    # 'predictions' below.
    if 'error' in response:
        raise RuntimeError(response['error'])

    # Read out the scores
    return [item["score"] for item in response['predictions']]
104
105
def estimate(messages):
    """Decode a batch of JSON tweets and attach a sentiment score to each.

    Args:
        messages: a single JSON string or a list of JSON strings; each must
            decode to an object with a ``'text'`` entry.
    Returns:
        list: the decoded dicts, each extended with a ``'sentiment'`` key.
        An empty batch yields an empty list without calling the model.
    Raises:
        ValueError: if the number of scores returned by ``prediction`` does
            not match the number of messages.
    """
    # Be able to cope with a single string as well
    if not isinstance(messages, list):
        messages = [messages]

    # Nothing to score: skip the API round trip (and the instances[0] lookup
    # used for logging further down).
    if not messages:
        return []

    # Messages from PubSub are JSON strings
    instances = [json.loads(message) for message in messages]

    # Estimate the sentiment of the 'text' of each tweet
    scores = prediction([instance["text"] for instance in instances])

    # Guard against pairing tweets with the wrong scores.
    if len(scores) != len(instances):
        raise ValueError(
            "Expected %d scores but received %d"
            % (len(instances), len(scores)))

    # Join them together
    for instance, score in zip(instances, scores):
        instance['sentiment'] = score

    logging.info("First message in batch:")
    logging.info(instances[0])

    return instances
125
126
def run(argv=None):
    """Build and run the streaming tweet-sentiment pipeline.

    Reads tweets from Pub/Sub, scores them in batches through ``estimate``,
    appends every scored tweet to one BigQuery table and the mean sentiment
    of each 10-second window to a second table, then blocks until the job
    finishes.

    :param argv: command-line arguments. ``--input_subscription`` and
        ``--input_topic`` (mutually exclusive) are consumed here; everything
        else is passed on to Beam's ``PipelineOptions``. With neither flag
        given, the default subscription is read.
    :return: None
    """

    # Make explicit BQ schema for output tables:
    # Tweets tables.
    bigqueryschema_json = (
        '{"fields": ['
        '{"name":"id","type":"STRING"},'
        '{"name":"text","type":"STRING"},'
        '{"name":"user_id","type":"STRING"},'
        '{"name":"sentiment","type":"FLOAT"},'
        '{"name":"posted_at","type":"TIMESTAMP"}'
        ']}'
    )
    bigqueryschema = parse_table_schema_from_json(bigqueryschema_json)
    # Tweets sentiment tables.
    bigqueryschema_mean_json = (
        '{"fields": ['
        '{"name":"posted_at","type":"TIMESTAMP"},'
        '{"name":"sentiment","type":"FLOAT"}'
        ']}'
    )
    bigqueryschema_mean = parse_table_schema_from_json(
        bigqueryschema_mean_json)

    # Fallback sources. They are applied after parsing rather than as
    # argparse defaults: a truthy default on --input_subscription would make
    # a lone --input_topic flag have no effect.
    default_subscription = (
        "projects/my-project-257304/subscriptions/my-project-twitter")
    default_topic = "projects/my-project-257304/topics/my-project-twitter"

    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument(
        '--input_subscription',
        help=('Input PubSub subscription of the form '
              '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'),
        default=None
    )
    group.add_argument(
        '--input_topic',
        help=('Input PubSub topic of the form '
              '"projects/<PROJECT>/topics/<TOPIC>."'),
        default=None
    )

    known_args, pipeline_args = parser.parse_known_args(argv)

    subscription = known_args.input_subscription
    topic = known_args.input_topic
    if subscription is None and topic is None:
        # Neither flag supplied: read the default subscription.
        subscription = default_subscription

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module
    # level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    # Runner, project and locations are assigned from the module constants
    # after the command line has been parsed, so they take precedence over
    # any matching flags in pipeline_args.
    pipeline_options.view_as(StandardOptions).runner = PIPELINE_MODE
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.staging_location = STAGING_LOCATION
    google_cloud_options.temp_location = TEMP_LOCATION
    google_cloud_options.region = REGION

    p = beam.Pipeline(options=pipeline_options)

    # Read from PubSub into a PCollection.
    if subscription:
        lines = p | "read in tweets" >> beam.io.ReadFromPubSub(
            subscription=subscription,
            with_attributes=False,
            id_label="tweet_id"
        )
    else:
        # An explicitly empty --input_subscription falls back to the topic.
        lines = p | "read in tweets" >> beam.io.ReadFromPubSub(
            topic=topic or default_topic,
            with_attributes=False,
            id_label="tweet_id")

    # Window them, and batch them into batches of 50 (not too large)
    output_tweets = (lines
                     | 'Assign window key' >> beam.WindowInto(
                         window.FixedWindows(10))
                     | 'Batch into n batches' >> BatchElements(
                         min_batch_size=49, max_batch_size=50)
                     | 'Predict sentiment' >> beam.FlatMap(estimate)
                     )

    # Write to Bigquery
    output_tweets | 'store twitter posts' >> beam.io.WriteToBigQuery(
        table=TWITTER_TABLE,
        dataset=DATASET,
        schema=bigqueryschema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        project=PROJECT_ID
    )

    # Average out and log the mean value. Every element gets the same key so
    # that GroupByKey gathers all tweets of a window into one group.
    (output_tweets
     | 'pair with key' >> beam.Map(lambda x: (1, x))
     | 'group by key' >> beam.GroupByKey()
     | 'aggregate and format' >> beam.Map(aggregate_format)
     | 'store aggregated sentiment' >> beam.io.WriteToBigQuery(
         table=TWITTER_TABLE_SENTIMENT,
         dataset=DATASET,
         schema=bigqueryschema_mean,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         project=PROJECT_ID))

    result = p.run()
    result.wait_until_finish()
233
234
if __name__ == '__main__':
    # Let INFO-level records through the root logger, then start the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
237 run()