
# coding: utf-8

# # Homework 2
#
# In this homework, we are going to play with Twitter data.
#
# The data is represented as rows of [JSON](https://en.wikipedia.org/wiki/JSON#Example) strings.
# It consists of [tweets](https://dev.twitter.com/overview/api/tweets), [messages](https://dev.twitter.com/streaming/overview/messages-types), and a small amount of broken data (lines that cannot be parsed as JSON).
#
# For this homework, we will only focus on tweets and ignore all other messages.
#
#
# ## Tweets
#
# A tweet consists of many data fields. [Here is an example](https://gist.github.com/arapat/03d02c9b327e6ff3f6c3c5c602eeaf8b). You can learn all about them in the Twitter API doc. We briefly introduce only the data fields that will be used in this homework (a small parsing sketch appears just before the first code cell below).
#
# * `created_at`: Posted time of this tweet (time zone is included)
# * `id_str`: Tweet ID - we recommend using `id_str` instead of `id` as the Tweet ID, because `id` is an integer and may cause overflow problems.
# * `text`: Tweet content
# * `user`: A JSON object for information about the author of the tweet
#     * `id_str`: User ID
#     * `name`: User name (may contain spaces)
#     * `screen_name`: User screen name (no spaces)
# * `retweeted_status`: A JSON object for information about the retweeted tweet (i.e. this tweet is not original but retweeted some other tweet)
#     * All data fields of a tweet except `retweeted_status`
# * `entities`: A JSON object for all entities in this tweet
#     * `hashtags`: An array for all the hashtags that are mentioned in this tweet
#     * `urls`: An array for all the URLs that are mentioned in this tweet
#
#
# ## Data source
#
# All tweets are collected using the [Twitter Streaming API](https://dev.twitter.com/streaming/overview).
#
#
# ## Users partition
#
# Besides the original tweets, we will provide you with a Pickle file, which contains a partition over 452,743 Twitter users. It contains a Python dictionary `{user_id: partition_id}`. The users are partitioned into 7 groups.
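
# A minimal illustrative sketch (not part of the original assignment) of pulling
# the fields listed above out of one raw tweet line. The JSON string here is a
# made-up, heavily trimmed example, not real Twitter data.
import json

_example_line = (
    '{"created_at": "Mon Apr 11 20:02:25 +0000 2016", "id_str": "1", '
    '"text": "hello #world", '
    '"user": {"id_str": "42", "name": "Some User", "screen_name": "some_user"}, '
    '"entities": {"hashtags": [{"text": "world"}], "urls": []}}'
)
_tweet = json.loads(_example_line)
print(_tweet["user"]["id_str"], _tweet["text"], _tweet["entities"]["hashtags"])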

# In[2]:


import os
os.environ['PYSPARK_PYTHON'] = "/usr/bin/python3"
os.environ['PYSPARK_DRIVER_PYTHON'] = "/usr/bin/python3"

# You can load pickle_content from a file on the local file system
# while testing on your laptop.
# To test on your laptop, set `ON_EMR = False`.
# To test on AWS for the final submission, set `ON_EMR = True`.

ON_EMR = True


# ## Grading
#
# We ask you to use the `OutputLogger` object `my_output` to store the
# results of your program.
# We have provided calls to the `my_output.append()` method for
# storing the results in all necessary places.
# Please make sure NOT to remove these lines.
#
# In the last cell of this file, we write the content of `my_output`
# to a pickle file, which the grader will read in and use for grading.

# In[3]:


import os
import pickle
import subprocess


class OutputLogger:
    def __init__(self):
        self.ans = {}

    def append(self, key, value):
        self.ans[key] = value

    def write_to_disk(self):
        if ON_EMR:
            filepath = os.path.expanduser("answer.pickle")
            print("FilePath = {}".format(filepath))
            with open(filepath, 'wb') as f:
                pickle.dump(self.ans, f)
            proc = subprocess.Popen(["/usr/local/hadoop/hadoop-2.7.4/bin/hadoop", "fs", "-copyFromLocal", filepath, "/user/spark/answer.pickle"])
            proc.wait()
            os.remove(filepath)
        else:
            filepath = os.path.expanduser("~/answer.pickle")
            with open(filepath, 'wb') as f:
                pickle.dump(self.ans, f)


my_output = OutputLogger()

# # Part 0: Load data to an RDD

# The tweets data is stored on AWS S3. We have in total a little over 1 TB of tweets. We provide 10 MB of tweets for your local development. For the testing and grading on the homework server, we will use different data.
#
# ## Testing on the homework server
# On EdX, we provide three different input sizes to test your program: 10 MB, 1 GB, and 10 GB. For any run, we will only be using one of these datasets.
#
# For submission and for local testing, make sure to read the path of the file you want to operate on from `./hw2-files.txt`. Otherwise your program will receive no points.
#
# ## Local test
#
# For local testing, please create your own `hw2-files.txt` file, which contains a single file path on the local disk, e.g.
# `file://<absolute_path_to_current_directory>/hw2-files-10mb.txt`. For final submission, we will create this file on our server for testing with the appropriate file path. If your implementation is correct, you should not worry about which file system (i.e. local file system or HDFS) Spark will read data from.
#
# Now let's see how many lines there are in the input files.
#
# 1. Make an RDD from the data in the file given by the file path present in `hw2-files.txt`. (hint: use [the `textFile()` method](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.textFile))
# 2. Mark the RDD to be cached (so that in the next operation the data will be loaded into memory)
# 3. Call the `count` method to print the number of lines in all these files
#
# **It should print**
# ```
# Number of elements: 2150
# ```

# In[4]:


"""
This is a useful cell for debugging.
Use timer_start() and timer_stop() at different parts of your code
for checking the amount of time a segment takes.
"""
from time import time


timer = []
prev_ts = None


def timer_start():
    global prev_ts
    prev_ts = time()


def timer_stop(title):
    timer.append((title, time() - prev_ts))


# In[5]:


timer_start()

from pyspark import SparkContext

sc = SparkContext()

timer_stop("set up sc")

# In[6]:


timer_start()


with open('./hw2-files.txt') as f:
    file_path = [w.strip() for w in f.readlines() if w.strip()]

# Your code here
# raise NotImplementedError()
twitterRDD = sc.textFile(file_path[0])
twitterRDD.cache()
count = twitterRDD.count()


my_output.append("num-tweets", count)
print('Number of elements:', count)
timer_stop("read data")


# # Part 1: Parse JSON strings to JSON objects

# Python has built-in support for JSON.

# In[7]:


import json

json_example = '''
{
    "id": 1,
    "name": "A green door",
    "price": 12.50,
    "tags": ["home", "green"]
}
'''

json_obj = json.loads(json_example)
json_obj


# ## Broken tweets and irrelevant messages
#
# The data for this assignment may contain broken tweets (invalid JSON strings), so make sure that your code is robust to such cases.
#
# You can filter out a broken tweet by checking whether:
# * the line is not in JSON format
#
# In addition, some lines in the input file might not be tweets, but messages that the Twitter server sent to the developer (such as [limit notices](https://dev.twitter.com/streaming/overview/messages-types#limit_notices)). Your program should also ignore these messages.
#
# These messages do not contain the `created_at` field and can be filtered out accordingly.
# * Check whether the parsed JSON object has a `created_at` field
#
# *Hint:* [Catch the ValueError](http://stackoverflow.com/questions/11294535/verify-if-a-string-is-json-in-python)
#
# **********************************************************************************
#
# **Tasks**
#
# (1) Parse raw JSON tweets to obtain valid JSON objects.
#
# (2) From all valid tweets, construct a pair RDD of `(user_id, text)`, where `user_id` is the `id_str` data field of the `user` dictionary (see the [Tweets](#Tweets) section above) and `text` is the `text` data field.

# In[8]:


import json

def safe_parse(raw_json):
    """
    Input is a string.
    Output is a JSON object if the tweet is valid and None if not valid.
    """
    # YOUR CODE HERE
    # raise NotImplementedError()
    try:
        json_object = json.loads(raw_json)
        # Lines without a `created_at` field are server messages, not tweets.
        if json_object.get('created_at') is None:
            return None
        else:
            return json_object
    except ValueError:
        # The line is not valid JSON (broken data).
        return None

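
# Quick local sanity checks for safe_parse (these asserts are not part of the
# original notebook; they just illustrate the three cases handled above).
assert safe_parse("this line is not JSON") is None
assert safe_parse('{"limit": {"track": 123}}') is None          # server message, no created_at
assert safe_parse('{"created_at": "x", "user": {"id_str": "1"}, "text": "hi"}') is not None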

# In[9]:


"""
# Remember to construct an RDD of (user_id, text) here.
"""

# YOUR CODE HERE
# raise NotImplementedError()
parsedRDD = (twitterRDD.map(safe_parse)
                       .filter(lambda json_obj: json_obj is not None)
                       .map(lambda json_obj: (json_obj['user']['id_str'], json_obj['text'])))


# ## Number of unique users
#
# Count the number of different users in all valid tweets (hint: use [the `distinct()` method](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.distinct)).
#
# *******************************************************************************
#
# **It should print**
# ```
# The number of unique users is: 1748
# ```

# In[10]:


timer_start()

# YOUR CODE HERE
# raise NotImplementedError()
# countByKey() returns a {user_id: count} dict, so its length is the number of unique users.
users_count = len(parsedRDD.countByKey())


my_output.append("num-unique-users", users_count)
print('The number of unique users is:', users_count)
timer_stop("Count unique users")


# # Part 2: Number of posts from each user partition

# Load the Pickle file `/twitter/users-partition.pickle`; you will get a dictionary which represents a partition over 452,743 Twitter users, `{user_id: partition_id}`. The users are partitioned into 7 groups, and the partition ID is an integer between 0 and 6. For example, if the dictionary is loaded into a variable named `partition`, the partition ID of the user `59458445` is `partition["59458445"]`.
#
# Note that the user partition we provide doesn't cover all users that appear in the input data.

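
# Illustration only (not part of the original notebook): how a partition lookup
# with a default group works. The real `partition` dict is loaded from the pickle
# file in the cell below; the tiny dict here is a made-up stand-in.
_fake_partition = {"59458445": 3}
print(_fake_partition.get("59458445", 7))   # 3 -> a user covered by the partition
print(_fake_partition.get("00000000", 7))   # 7 -> a user outside the partition falls into the extra group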
# ## Load the pickle file
#
# For local testing, you can load the pickle file from the local file system, namely
#
# ```
# proc = subprocess.Popen(["cat", "./users-partition.pickle"],
#                         stdout=subprocess.PIPE)
# pickle_content = proc.communicate()[0]
# ```
# However, for submission, please keep the following code block unchanged, since on the server the pickle file is located on HDFS.

# In[11]:


import subprocess
import pickle

if ON_EMR:
    proc = subprocess.Popen(["/usr/local/hadoop/hadoop-2.7.4/bin/hadoop", "fs", "-cat", "/user/spark/twitter/users-partition.pickle"],
                            stdout=subprocess.PIPE)
    pickle_content = proc.communicate()[0]
else:
    #!wget 'http://mas-dse-open.s3.amazonaws.com/Twitter/users-partition.pickle' -O './users-partition.pickle'
    proc = subprocess.Popen(["cat", "./users-partition.pickle"],
                            stdout=subprocess.PIPE)
    pickle_content = proc.communicate()[0]

partition = pickle.loads(pickle_content)
len(partition)


# ## Tweets per user partition
#
# 1. Count the number of posts from each user partition.
#
# 2. Count the number of posts from groups 0, 1, ..., 6, plus the number of posts from users who are not in any partition. Assign users who are not in any partition to group 7.
#
# 3. Put the results of this step into a pair RDD `(group_id, count)` that is sorted by key (one possible shape of this computation is sketched right after the expected output below).
#
#
#
# Print the post count using the `print_post_count` function we provided.
#
# **It should print**
#
# ```
# Group 0 posted 87 tweets
# Group 1 posted 242 tweets
# Group 2 posted 41 tweets
# Group 3 posted 349 tweets
# Group 4 posted 101 tweets
# Group 5 posted 358 tweets
# Group 6 posted 434 tweets
# Group 7 posted 521 tweets
# ```

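
# A minimal alternative sketch (not the solution used in the cell below): build the
# sorted pair RDD (group_id, count) directly with reduceByKey and sortByKey.
# It assumes `parsedRDD` and `partition` exist as defined above.
counts_rdd = (parsedRDD
              .map(lambda user_text: (partition.get(user_text[0], 7), 1))
              .reduceByKey(lambda a, b: a + b)
              .sortByKey())
# counts_rdd.collect() yields [(group_id, count), ...] sorted by group id; groups
# with no posts in the data would simply be absent from the result.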
# In[12]:


def print_post_count(counts):
    for group_id, count in counts:
        print('Group %d posted %d tweets' % (group_id, count))


# In[13]:


timer_start()

# YOUR CODE HERE
# raise NotImplementedError()
def getCounts(user_id):
    # Build an 8-slot list of (group_id, count) pairs with a single 1 in the
    # slot of this user's group (group 7 for users not covered by the partition).
    counts = [(i, 0) for i in range(8)]
    p = partition.get(user_id, 7)
    counts[p] = (p, 1)
    return counts

counts_per_partition = (parsedRDD.map(lambda user_text: getCounts(user_text[0]))
                                 .reduce(lambda x, y: [(xi[0], xi[1] + yi[1]) for xi, yi in zip(x, y)]))

# The following code adds your solution to `my_output`
assert(type(counts_per_partition) is list and
       len(counts_per_partition) == 8 and
       len(counts_per_partition[0]) == 2)
my_output.append("counts_per_part", counts_per_partition)
print_post_count(counts_per_partition)
timer_stop("Count tweets per user partition")


# # Part 3: Tokens that are relatively popular in each user partition

# In this step, we are going to find tokens that are relatively popular in each user partition.
#
# We define the number of mentions of a token $t$ in a specific user partition $k$ as the number of users from the user partition $k$ that ever mentioned the token $t$ in their tweets. Note that even if a user mentions a token $t$ multiple times or in multiple tweets, that user contributes at most 1 to the counter of the token $t$.
#
# Please make sure that the number of mentions of a token is equal to the number of users who mentioned this token, NOT the number of tweets that mentioned this token.
#
# Let $N_t^k$ be the number of mentions of the token $t$ in the user partition $k$. Let $N_t^{all} = \sum_{i=0}^7 N_t^{i}$ be the total number of mentions of the token $t$.
#
# We define the relative popularity of a token $t$ in a user partition $k$ as the log ratio between $N_t^k$ and $N_t^{all}$, i.e.
#
# \begin{equation}
# p_t^k = \log \frac{N_t^k}{N_t^{all}}.
# \end{equation}
#
#
# You can compute the relative popularity by calling the function `get_rel_popularity`.

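
# A worked example of the formula above (illustrative numbers only, not data from
# the assignment). Note that `get_rel_popularity` below uses log base 2, so a token
# mentioned by 25 users in partition k out of 100 mentions overall has relative
# popularity log2(25 / 100) = -2.0.
from math import log

print(log(25 / 100) / log(2))   # -2.0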
400#
401# You can expand the following cell if needed to see the minutae of the Tokenizer.
402
403# In[14]:
404
405
406#!/usr/bin/env python
407
408"""
409This code implements a basic, Twitter-aware tokenizer.
410
411A tokenizer is a function that splits a string of text into words. In
412Python terms, we map string and unicode objects into lists of unicode
413objects.
414
415There is not a single right way to do tokenizing. The best method
416depends on the application. This tokenizer is designed to be flexible
417and this easy to adapt to new domains and tasks. The basic logic is
418this:
419
4201. The tuple regex_strings defines a list of regular expression
421 strings.
422
4232. The regex_strings strings are put, in order, into a compiled
424 regular expression object called word_re.
425
4263. The tokenization is done by word_re.findall(s), where s is the
427 user-supplied string, inside the tokenize() method of the class
428 Tokenizer.
429
4304. When instantiating Tokenizer objects, there is a single option:
431 preserve_case. By default, it is set to True. If it is set to
432 False, then the tokenizer will downcase everything except for
433 emoticons.
434
435The __main__ method illustrates by tokenizing a few examples.
436
437I've also included a Tokenizer method tokenize_random_tweet(). If the
438twitter library is installed (http://code.google.com/p/python-twitter/)
439and Twitter is cooperating, then it should tokenize a random
440English-language tweet.
441
442
443Julaiti Alafate:
444 I modified the regex strings to extract URLs in tweets.
445"""
446
447__author__ = "Christopher Potts"
448__copyright__ = "Copyright 2011, Christopher Potts"
449__credits__ = []
450__license__ = "Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported License: http://creativecommons.org/licenses/by-nc-sa/3.0/"
451__version__ = "1.0"
452__maintainer__ = "Christopher Potts"
453__email__ = "See the author's website"
454
455######################################################################
456
457import re
458from html import entities
459
460######################################################################
461# The following strings are components in the regular expression
462# that is used for tokenizing. It's important that phone_number
463# appears first in the final regex (since it can contain whitespace).
464# It also could matter that tags comes after emoticons, due to the
465# possibility of having text like
466#
467# <:| and some text >:)
468#
469# Most imporatantly, the final element should always be last, since it
470# does a last ditch whitespace-based tokenization of whatever is left.
471
472# This particular element is used in a couple ways, so we define it
473# with a name:
474emoticon_string = r"""
475 (?:
476 [<>]?
477 [:;=8] # eyes
478 [\-o\*\']? # optional nose
479 [\)\]\(\[dDpP/\:\}\{@\|\\] # mouth
480 |
481 [\)\]\(\[dDpP/\:\}\{@\|\\] # mouth
482 [\-o\*\']? # optional nose
483 [:;=8] # eyes
484 [<>]?
485 )"""
486

# The components of the tokenizer:
regex_strings = (
    # Phone numbers:
    r"""
    (?:
      (?:            # (international)
        \+?[01]
        [\-\s.]*
      )?
      (?:            # (area code)
        [\(]?
        \d{3}
        [\-\s.\)]*
      )?
      \d{3}          # exchange
      [\-\s.]*
      \d{4}          # base
    )"""
    ,
    # URLs:
    r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"""
    ,
    # Emoticons:
    emoticon_string
    ,
    # HTML tags:
    r"""<[^>]+>"""
    ,
    # Twitter username:
    r"""(?:@[\w_]+)"""
    ,
    # Twitter hashtags:
    r"""(?:\#+[\w_]+[\w\'_\-]*[\w_]+)"""
    ,
    # Remaining word types:
    r"""
    (?:[a-z][a-z'\-_]+[a-z])       # Words with apostrophes or dashes.
    |
    (?:[+\-]?\d+[,/.:-]\d+[+\-]?)  # Numbers, including fractions, decimals.
    |
    (?:[\w_]+)                     # Words without apostrophes or dashes.
    |
    (?:\.(?:\s*\.){1,})            # Ellipsis dots.
    |
    (?:\S)                         # Everything else that isn't whitespace.
    """
    )

######################################################################
# This is the core tokenizing regex:

word_re = re.compile(r"""(%s)""" % "|".join(regex_strings), re.VERBOSE | re.I | re.UNICODE)

# The emoticon string gets its own regex so that we can preserve case for them as needed:
emoticon_re = re.compile(regex_strings[1], re.VERBOSE | re.I | re.UNICODE)

# These are for regularizing HTML entities to Unicode:
html_entity_digit_re = re.compile(r"&#\d+;")
html_entity_alpha_re = re.compile(r"&\w+;")
amp = "&amp;"

549
550class Tokenizer:
551 def __init__(self, preserve_case=False):
552 self.preserve_case = preserve_case
553
554 def tokenize(self, s):
555 """
556 Argument: s -- any string or unicode object
557 Value: a tokenize list of strings; conatenating this list returns the original string if preserve_case=False
558 """
559 # Try to ensure unicode:
560 try:
561 s = str(s)
562 except UnicodeDecodeError:
563 s = s.encode('string_escape')
564 s = str(s)
565 # Fix HTML character entitites:
566 s = self.__html2unicode(s)
567 # Tokenize:
568 words = word_re.findall(s)
569 # Possible alter the case, but avoid changing emoticons like :D into :d:
570 if not self.preserve_case:
571 words = map((lambda x : x if emoticon_re.search(x) else x.lower()), words)
572 return words
573
574 def tokenize_random_tweet(self):
575 """
576 If the twitter library is installed and a twitter connection
577 can be established, then tokenize a random tweet.
578 """
579 try:
580 import twitter
581 except ImportError:
582 print("Apologies. The random tweet functionality requires the Python twitter library: http://code.google.com/p/python-twitter/")
583 from random import shuffle
584 api = twitter.Api()
585 tweets = api.GetPublicTimeline()
586 if tweets:
587 for tweet in tweets:
588 if tweet.user.lang == 'en':
589 return self.tokenize(tweet.text)
590 else:
591 raise Exception("Apologies. I couldn't get Twitter to give me a public English-language tweet. Perhaps try again")
592
593 def __html2unicode(self, s):
594 """
595 Internal metod that seeks to replace all the HTML entities in
596 s with their corresponding unicode characters.
597 """
598 # First the digits:
599 ents = set(html_entity_digit_re.findall(s))
600 if len(ents) > 0:
601 for ent in ents:
602 entnum = ent[2:-1]
603 try:
604 entnum = int(entnum)
605 s = s.replace(ent, unichr(entnum))
606 except:
607 pass
608 # Now the alpha versions:
609 ents = set(html_entity_alpha_re.findall(s))
610 ents = filter((lambda x : x != amp), ents)
611 for ent in ents:
612 entname = ent[1:-1]
613 try:
614 s = s.replace(ent, unichr(entities.name2codepoint[entname]))
615 except:
616 pass
617 s = s.replace(amp, " and ")
618 return s
619
620

# In[15]:


from math import log

tok = Tokenizer(preserve_case=False)

def get_rel_popularity(c_k, c_all):
    return log(1.0 * c_k / c_all) / log(2)


def print_tokens(tokens, gid = None):
    group_name = "overall"
    if gid is not None:
        group_name = "group %d" % gid
    print('=' * 5 + ' ' + group_name + ' ' + '=' * 5)
    for t, n in tokens:
        print("%s\t%.4f" % (t, n))
    print()

# ## Tokenize tweets
#
# 1. Tokenize the tweets using the `tokenize` function, which is a method of the `Tokenizer` class that we have instantiated as `tok`. (A small illustration of the tokenizer follows this list.)
#
# 1. Count the number of mentions for each token regardless of the specific user group.
#
# 1. Call the `print_count` function to show how many different tokens we have.
#
# **It should print**
# ```
# Number of tokens: 7677
# ```

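
# A quick illustration (not part of the original notebook) of what `tok.tokenize`
# produces on a made-up tweet. With preserve_case=False it returns a map object
# in Python 3, so we wrap it in list() to look at the tokens.
print(list(tok.tokenize("RT @someone: Sanders and Trump debate!! http://example.com #Election2016 :)")))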
# In[16]:


# YOUR CODE HERE
# raise NotImplementedError()
num_of_tokens = parsedRDD.flatMap(lambda x: tok.tokenize(x[1])).distinct().count()


my_output.append("num-tokens", num_of_tokens)
print("Number of tokens:", num_of_tokens)


# ## Token popularity
#
# Tokens that are mentioned by too few users are usually not very interesting, so we only want to keep tokens that are mentioned by at least 100 users. Filter out tokens that don't meet this requirement.
#
# Call the `print_count` function to show how many different tokens we have after the filtering.
#
# Call the `print_tokens` function to show the top 20 most frequent tokens.
#
# **It should print**
# ```
# Number of tokens: 46
# ===== overall =====
# :	1046.0000
# rt	920.0000
# .	767.0000
# the	587.0000
# trump	560.0000
# …	520.0000
# to	501.0000
# ,	497.0000
# in	385.0000
# a	383.0000
# is	382.0000
# of	300.0000
# !	285.0000
# for	275.0000
# and	263.0000
# on	218.0000
# i	216.0000
# he	191.0000
# that	190.0000
# "	181.0000
# ```

# In[17]:


timer_start()

# YOUR CODE HERE
# raise NotImplementedError()
# For each user, build the *set* of tokens they ever mentioned, so that a user
# contributes at most 1 to the count of any token, then flatten to (user_id, token) pairs.
userid = (parsedRDD.mapValues(lambda text: set(tok.tokenize(text)))
                   .reduceByKey(lambda x, y: x | y)
                   .flatMap(lambda user_tok: ((user_tok[0], t) for t in user_tok[1]))
                   .cache())
# Count, for each token, how many users mentioned it, and keep tokens mentioned by at least 100 users.
user_list = (userid.map(lambda user_tok: (user_tok[1], 1))
                   .reduceByKey(lambda x, y: x + y)
                   .filter(lambda x: x[1] >= 100)
                   .cache())

num_freq_tokens = user_list.count()
# Take the 20 tokens with the highest user counts (a plain collect()[:20] would return them in arbitrary order).
top20 = user_list.takeOrdered(20, key=lambda x: -x[1])

my_output.append("num-freq-tokens", num_freq_tokens)
my_output.append("top-20-tokens", top20)
print("Number of tokens:", num_freq_tokens)
print_tokens(top20)
timer_stop("Count overall most popular tokens")

# ## Relative Popularity
#
# For all tokens that are mentioned by at least 100 users, compute their relative popularity in each user group. Then print the top 10 tokens with the highest relative popularity in each user group. In case two tokens have the same relative popularity, break the tie by printing the alphabetically smaller one.
#
# **Hint:** Let the relative popularity of a token $t$ be $p$. The required order of the items is obtained by sorting them using $(-p, t)$ as the key.
#
# **It should print**
# ```
# ===== group 0 =====
# with	-3.6088
# cruz	-3.6554
# his	-3.6582
# amp	-3.8651
# on	-3.9608
# to	-4.0145
# &	-4.0875
# https	-4.1699
# i	-4.1699
# what	-4.1699
# ===== group 1 =====
# sanders	-2.2854
# gop	-2.4060
# hillary	-2.4330
# ’	-2.4463
# bernie	-2.4835
# "	-2.6925
# are	-2.7249
# this	-2.7633
# for	-2.8179
# about	-2.8346
# ===== group 2 =====
# with	-4.3458
# donald	-4.5146
# ...	-4.7004
# gop	-4.7279
# i	-4.9475
# on	-4.9608
# he	-4.9925
# …	-5.1155
# https	-5.1699
# what	-5.1699
# ===== group 3 =====
# bernie	-1.5945
# sanders	-1.6609
# hillary	-2.2188
# and	-2.5154
# "	-2.5930
# in	-2.6114
# will	-2.6160
# https	-2.6674
# ...	-2.7004
# you	-2.7004
# ===== group 4 =====
# what	-3.4330
# have	-3.4725
# bernie	-3.5380
# this	-3.5518
# it	-3.6881
# ?	-3.6912
# for	-3.7110
# about	-3.7415
# hillary	-3.7549
# that	-3.7625
# ===== group 5 =====
# what	-1.8007
# not	-1.8745
# https	-2.0000
# his	-2.0144
# cruz	-2.0704
# it	-2.1031
# on	-2.1243
# &	-2.1399
# amp	-2.1489
# ;	-2.1592
# ===== group 6 =====
# will	-1.3847
# have	-1.4725
# !	-1.5850
# cruz	-1.6919
# trump	-1.7199
# https	-1.7549
# -	-1.7673
# ;	-1.7807
# be	-1.7952
# amp	-1.8144
# ===== group 7 =====
# donald	-1.0740
# trump	-1.6535
# bernie	-1.7790
# sanders	-1.7829
# ’	-1.8613
# of	-1.9069
# ?	-1.9186
# with	-1.9307
# the	-1.9588
# be	-1.9758
# ```

# In[29]:


timer_start()


# YOUR CODE HERE
# raise NotImplementedError()
def get_partition_id(user):
    # Users not covered by the provided partition are assigned to group 7.
    return partition[user] if user in partition else 7

tok_dict = dict(user_list.collect())      # token -> total number of users who mentioned it
tok_word = set(tok_dict.keys())           # the frequent tokens (mentioned by at least 100 users)

tok_count = (userid.filter(lambda user_tok: user_tok[1] in tok_word)
                   .map(lambda user_tok: ((get_partition_id(user_tok[0]), user_tok[1]), 1))
                   .reduceByKey(lambda x, y: x + y)
                   .map(lambda part: (part[0][0], (part[0][1], get_rel_popularity(part[1], tok_dict[part[0][1]]))))
                   .groupByKey()
                   .mapValues(lambda toks: sorted(toks, key=lambda tn: (-tn[1], tn[0]))[:10]))
popular_10_in_each_group = sorted(tok_count.collect())

# The following code adds your solution to `my_output`
my_output.append("popular_10_in_each_group", popular_10_in_each_group)
for k in range(8):
    print_tokens(popular_10_in_each_group[k][1], k)
timer_stop("Print popular tokens in each group")


# ## Important: Write your solutions to disk
#
# The following cell writes your solutions to disk, which will be read in by the grader for grading.

# In[18]:


my_output.write_to_disk()


# ## Optional Ungraded Exercise
# The users partition is generated by a machine learning algorithm that tries to group the users by their political preferences. Three of the user groups show support for Bernie Sanders, Ted Cruz, and Donald Trump.
#
# If your program looks okay on the local test data, you can try it on the larger input by submitting your program to the homework server. Observe the output of your program on the larger input files; can you guess the partition IDs of the three groups mentioned above based on your output?

# In[19]:


# Change the values of the following three items to your guesses
users_support = [
    (-1, "Bernie Sanders"),
    (-1, "Ted Cruz"),
    (-1, "Donald Trump")
]

for gid, candidate in users_support:
    print("Users from group %d are most likely to support %s." % (gid, candidate))

# Uncomment the lines in the cell below to print out the time taken between `timer_start()` and `timer_stop()` at various stages of your pipeline.

# In[ ]:


# total_time = 0.0
# for item in timer:
#     print("{}\t{}".format(item[0], item[1]))
#     total_time += item[1]
# print("Total time: {}".format(total_time))

# In[ ]:


