/*
 * This program text file is part of the CS-A1120 Programming 2 course
 * materials at Aalto University in Spring 2021. The programming exercises
 * at CS-A1120 are individual and confidential assignments---this means
 * that as a student taking the course you are allowed to individually
 * and confidentially work with the material, to discuss and review the
 * material with course staff, as well as to submit the material for grading
 * on course infrastructure. All other use, including in particular
 * distribution of the material or exercise solutions, is forbidden and
 * constitutes a violation of the code of conduct at this course.
 *
 */

package words

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object run {

  def main(args: Array[String]): Unit = {

    /*
     * Let us start by setting up a Spark context which runs locally
     * using two worker threads.
     *
     * Here we go:
     *
     */

    val sc = new SparkContext("local[2]", "words")

    /*
     * The following setting controls how "verbose" Spark is.
     * Comment this out to see all debug messages.
     * Warning: doing so may generate a massive amount of debug output,
     * and the normal program output can easily get overwhelmed!
     */
    sc.setLogLevel("WARN")

    /*
     * Next, let us set up our input.
     */

    val path = "r10/10.2_words/data/"


    /*
     * After the path is configured, we need to decide which input
     * file to look at. There are two choices -- you should test your
     * code with "War and Peace" (default below), and then use the code with
     * "Wealth of Nations" to compute the correct solutions
     * (which you will submit to A+ for grading).
     *
     */

    // Tolstoy -- War and Peace (test input)
    val filename = path ++ "pg2600.txt"

    // Smith -- Wealth of Nations (uncomment line below to use as input)
    // val filename = path ++ "pg3300.txt"

    /*
     * Now you may want to open up, in a web browser, the Scala
     * programming guide for Spark version 3.0.1:
     *
     * http://spark.apache.org/docs/3.0.1/programming-guide.html
     *
     */

    /*
     * Let us now set up an RDD from the lines of text in the file:
     *
     */

    val lines: RDD[String] = sc.textFile(filename)

    /* The following requirement sanity-checks the number of lines in the file
     * -- if this requirement fails you are in trouble.
     */

    require((filename.contains("pg2600.txt") && lines.count() == 65007) ||
            (filename.contains("pg3300.txt") && lines.count() == 35600))

    /*
     * Let us make one further sanity check. That is, we want to
     * count the number of lines in the file that contain the
     * substring "rent".
     *
     */

    val lines_with_rent: RDD[String] =
      lines.filter(line => line.contains("rent"))

    val rent_count = lines_with_rent.count()
    println("OUTPUT: \"rent\" occurs on %d lines in \"%s\""
              .format(rent_count, filename))
    require((filename.contains("pg2600.txt") && rent_count == 360) ||
            (filename.contains("pg3300.txt") && rent_count == 1443))

    /*
     * All right, if the execution continues this far without
     * failing a requirement, we should be pretty sure that we have
     * the correct file. Now we are ready for the work that you need
     * to put in.
     *
     */

    /*
     * Spark operates by __transforming__ RDDs. For example, above we
     * took the RDD 'lines' and transformed it into the RDD 'lines_with_rent'
     * using the __filter__ transformation.
     *
     * Important:
     * While the code that manipulates RDDs may __look like__ we are
     * manipulating just another Scala collection, this is in fact
     * __not__ the case. An RDD is an abstraction that enables us
     * to easily manipulate terabytes of data in a cluster computing
     * environment. In such an environment the dataset is __distributed__
     * across the cluster. In fact, it is most likely that the entire
     * dataset cannot even be stored in a single cluster node.
     *
     * Let us practice our skills with simple RDD transformations.
     *
     */
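
    /*
     * A small illustrative sketch (not part of the graded tasks, and safe
     * to remove): a tiny RDD built with sc.parallelize behaves just like
     * the RDDs above. Transformations such as map and filter only describe
     * new RDDs lazily; an action such as collect() or count() triggers the
     * actual (distributed) computation.
     */
    val demo: RDD[Int] = sc.parallelize(Seq(1, 2, 3, 4, 5))
    val demo_even_squares = demo.map(x => x * x)       // transformation (lazy)
                                .filter(_ % 2 == 0)    // transformation (lazy)
    require(demo_even_squares.collect().toSeq == Seq(4, 16)) // action (runs the job)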

    /*
     * Task 1:
     * This task asks you to transform the RDD
     * 'lines' into an RDD 'depunctuated_lines' so that __on each line__,
     * all occurrences of any of the punctuation characters
     * ',', '.', ':', ';', '\"', '(', ')', '{', '}' have been deleted.
     *
     * Hint: it may be a good idea to consult
     * http://www.scala-lang.org/api/2.12.12/scala/collection/immutable/StringOps.html
     *
     */

    val depunctuated_lines: RDD[String] = {
      lines.map(line =>
        line.replace(",", "").replace(".", "").replace(":", "")
            .replace(";", "").replace("\"", "").replace("(", "")
            .replace(")", "").replace("{", "").replace("}", ""))
    }

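    /*
     * An equivalent formulation (just a sketch, safe to remove): treat the
     * punctuation characters as a set and drop them with filterNot instead
     * of chaining many replace calls. The require below only checks that
     * both versions agree on the total number of remaining characters.
     */
    val punctuation = Set(',', '.', ':', ';', '"', '(', ')', '{', '}')
    val depunctuated_lines_alt: RDD[String] =
      lines.map(line => line.filterNot(punctuation))
    require(depunctuated_lines_alt.map(_.length.toLong).reduce(_ + _) ==
            depunctuated_lines.map(_.length.toLong).reduce(_ + _))
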
    /*
     * Let us now check and print out the data that you need to
     * record (__when the input file is "pg3300.txt"__) in
     * the file "wordsSolutions.scala" that you will submit for grading
     * together with this file.
     */

    val depunctuated_length = depunctuated_lines.map(_.length).reduce(_ + _)
    println("OUTPUT: total depunctuated length is %d".format(depunctuated_length))
    require(!filename.contains("pg2600.txt") || depunctuated_length == 3069444)


    /*
     * Task 2:
     * Next, let us transform the RDD of depunctuated lines into
     * an RDD of consecutive __tokens__. That is, we want to split each
     * line into zero or more __tokens__, where a __token__ is a
     * maximal nonempty sequence of non-space (non-' ') characters on a line.
     * Blank lines, or lines with only spaces (' ') in them, should produce
     * no tokens at all.
     *
     * Hint: Use either a map or a flatMap to transform the RDD
     * line by line. Again you may want to take a look at StringOps
     * for appropriate methods to operate on each line. Use filter
     * to get rid of blanks as necessary.
     *
     */

    // Transform 'depunctuated_lines' into tokens: split each line at spaces
    // and drop the empty strings that splitting can produce.
    val tokens: RDD[String] =
      depunctuated_lines.flatMap(line => line.split(' ').filter(_.nonEmpty))

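    /*
     * A small sketch of why flatMap (rather than map) is the right tool here
     * (illustrative only, safe to remove): map would give one Array[String]
     * per line, i.e. an RDD[Array[String]], whereas flatMap concatenates the
     * per-line arrays into a single flat RDD[String] of tokens.
     */
    val token_arrays: RDD[Array[String]] =
      depunctuated_lines.map(line => line.split(' ').filter(_.nonEmpty))
    require(token_arrays.map(_.length.toLong).reduce(_ + _) == tokens.count())
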

    /* ... and here comes the check and the printout. */

    val token_count = tokens.count()
    println("OUTPUT: %d tokens".format(token_count))
    require(!filename.contains("pg2600.txt") || token_count == 566315)


    /*
     * Task 3:
     * Transform the RDD of tokens into a new RDD where all upper-case
     * characters in each token get converted into lower case. Here you may
     * restrict the conversion to characters in the Roman alphabet
     * 'A', 'B', ..., 'Z'.
     *
     */

    // Map each token in 'tokens' to lower case.
    val tokens_lc: RDD[String] = tokens.map(_.toLowerCase)

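    /*
     * A sketch of the restricted variant mentioned above (illustrative only,
     * safe to remove): lower-case just the characters 'A'..'Z' and leave
     * everything else as is. Plain toLowerCase above also lower-cases
     * non-ASCII letters, which is acceptable for this exercise.
     */
    val tokens_lc_ascii: RDD[String] =
      tokens.map(_.map(c => if (c >= 'A' && c <= 'Z') (c + ('a' - 'A')).toChar else c))
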

    /* ... and here comes the check and the printout. */

    val tokens_a_count = tokens.flatMap(t => t.filter(_ == 'a')).count()
    println("OUTPUT: 'a' occurs %d times in tokens".format(tokens_a_count))
    require(!filename.contains("pg2600.txt") || tokens_a_count == 199232)

    /*
     * Task 4:
     * Transform the RDD of lower-case tokens into a new RDD where
     * all but those tokens that consist only of lower-case characters
     * 'a', 'b', ..., 'z' in the Roman alphabet have been filtered out.
     * Let us call the tokens that survive this filtering __words__.
     *
     */

    // Keep only the tokens in 'tokens_lc' whose every character is in 'a'..'z'.
    val words: RDD[String] =
      tokens_lc.filter(token => token.forall(c => c >= 'a' && c <= 'z'))


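    /*
     * An equivalent regular-expression formulation (sketch only, safe to
     * remove): matches("[a-z]+") accepts exactly the nonempty tokens made
     * up of the characters 'a'..'z'. Since all tokens are nonempty, the
     * counts of the two versions agree.
     */
    val words_alt: RDD[String] = tokens_lc.filter(_.matches("[a-z]+"))
    require(words_alt.count() == words.count())
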
    /* ... and here comes the check and the printout. */

    val words_count = words.count()
    println("OUTPUT: %d words".format(words_count))
    require(!filename.contains("pg2600.txt") || words_count == 547644)


    /*
     * Now let us move beyond maps, filters, and flatMaps
     * to do some basic statistics on words. To solve the next task you
     * can consult the Spark programming guide, examples, and API:
     *
     * http://spark.apache.org/docs/3.0.1/programming-guide.html
     * http://spark.apache.org/examples.html
     * http://spark.apache.org/docs/3.0.1/api/scala/index.html#org.apache.spark.package
     */

    /*
     * Task 5:
     * Count the number of occurrences of each word in 'words'.
     * That is, transform 'words' into an RDD 'word_counts' that
     * consists of, __in descending order__, pairs (c,w) such that
     * w occurs exactly c times in 'words'.
     * Then take the 100 most frequent words in this RDD and
     * answer the following two questions (the first is a practice
     * question with a given answer for "pg2600.txt", the second is
     * the one where you need to find the answer yourself and
     * submit it for grading).
     *
     * Practice question for "pg2600.txt" (answer given below):
     * What word occurs exactly 1772 times in 'words'?
     * (answer: "pierre")
     *
     * The question that you need to answer for "pg3300.txt":
     * What word occurs exactly 777 times in 'words'?
     * (give your answer in lower case)
     *
     */

    val word_counts: RDD[(Long, String)] =
      words.map(word => (word, 1L))                      // one (word, 1) pair per occurrence
           .reduceByKey(_ + _)                           // sum the counts of each word
           .map { case (word, count) => (count, word) }  // flip pairs to (count, word)
           .sortByKey(ascending = false)                 // most frequent words first


    /* ... and here comes a check. */

    val top_word = word_counts.take(1)(0)
    println("OUTPUT: top word is %s (%d times)".format(top_word._2, top_word._1))
    require(!filename.contains("pg2600.txt") || (top_word._2 == "the" && top_word._1 == 34558))

    /* ... print out the 100 most frequent words. */

    println("OUTPUT: The 100 most frequent words are, in rank order ...")
    word_counts.take(100)
               .zipWithIndex
               .foreach { case ((count, word), rank) =>
                 println("OUTPUT: %3d: \"%s\" with %d occurrences"
                           .format(rank + 1, word, count)) }

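    /*
     * A sketch for locating the answer to the Task 5 question (illustrative
     * only, safe to remove): instead of scanning the printed top-100 list by
     * eye, one can filter 'word_counts' by the requested count. The practice
     * case below uses the known answer for "pg2600.txt" ("pierre", 1772 times).
     */
    if (filename.contains("pg2600.txt")) {
      val practice_answer = word_counts.filter(_._1 == 1772).map(_._2).collect()
      require(practice_answer.contains("pierre"))
    }
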
    /* That's it! */

  }
}