· 7 years ago · Dec 08, 2018, 12:12 PM
1:'Problem 1
2#Connect to the MySQL database on the itversity labs using sqoop and import all of the data from the orders table into HDFS
3Output Requirements
4#Place the customer files in the HDFS directory
5#/user/yourusername/problem1/solution/
6#Replace yourusername with your OS user name
7#Use a text format with comma as the columnar delimiter
8#Load every order record completely'
9
# Problem 1: import the whole retail_db "orders" table into HDFS.
# No file-format or delimiter options are passed, so Sqoop's defaults apply.
sqoop import \
  --connect "jdbc:mysql://ms.itversity.com:3306/retail_db" \
  --table orders \
  --username retail_user \
  --password itversity \
  --target-dir "/user/vikasgonti/itversity/problem1/solution/"
16
17:'#Problem 2
18#Get the customers who have not placed any orders, sorted by customer_lname and then customer_fname
19Output Requirements
20#Target Columns: customer_lname, customer_fname
21#Number of Files: 1
22#Place the output file in the HDFS directory
23#/user/yourusername/problem2/solution/
24#Replace yourusername with your OS user name
25#File format should be text
26#delimiter is (",")
27#Compression: Uncompressed
28'
# Problem 2: customers without any order, sorted by last name then first name.
# The left outer join keeps customers with no matching order; the IS NULL test
# on order_customer_id then selects exactly those customers.
# One mapper is requested, so the import writes a single output file.
sqoop import \
  --connect "jdbc:mysql://ms.itversity.com:3306/retail_db" \
  --username retail_user \
  --password itversity \
  --query "select customer_lname, customer_fname from customers left outer join orders on customer_id = order_customer_id where \$CONDITIONS and order_customer_id is null order by customer_lname, customer_fname" \
  --num-mappers 1 \
  --target-dir "/user/vikasgonti/itversity/problem2/solution/"
36
37
38:'#Problem 3
39#Get top 3 crime types based on number of incidents in RESIDENCE area using "Location Description"
40Output Requirements
41#Output Fields: crime_type, incident_count
42#Output File Format: JSON
43#Delimiter: N/A
44#Compression: No
45#Place the output file in the HDFS directory
46#/user/yourusername/problem3/solution/
47#Replace yourusername with your OS user name'
48
// Problem 3: top 3 crime types by number of incidents where location is RESIDENCE,
// written out as JSON.
val crimeData = sc.textFile("/public/crime/csv")
val crimeDataHeader = crimeData.first
// Drop every line that is identical to the first line of the data set (the header).
val crimeDatawithoutHeader = crimeData.filter(_ != crimeDataHeader)

// Print the first ten remaining lines.
crimeDatawithoutHeader.take(10).foreach(line => println(line))

val crimeRDD = crimeDatawithoutHeader.map { line =>
  // Split on commas that are followed by an even number of double quotes, i.e. commas
  // outside quoted fields; the -1 limit keeps trailing empty fields.
  val fields = line.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
  // Columns 5 and 7 become "type" and "location" below.
  (fields(5), fields(7))
}

val crimeDF = crimeRDD.toDF("type", "location")
crimeDF.registerTempTable("crime")

val res = {
  val topCrimesQuery =
    "select * from (select type as crime_type, count(1) as incident_count from crime " +
      "where location = 'RESIDENCE' group by type order by incident_count desc) A limit 3"
  sqlContext.sql(topCrimesQuery)
}
res.toJSON.saveAsTextFile("/user/vikasgonti/itversity/problem3/solution/")
65
66:'#Problem 4
67#Convert NYSE data into parquet
68Output Requirements
69#Column Names: stockticker, transactiondate, openprice, highprice, lowprice, closeprice, volume
70#Convert file format to parquet
71#Place the output file in the HDFS directory
72#/user/yourusername/problem4/solution/
73#Replace yourusername with your OS user name'
74
// Problem 4: convert the comma-delimited NYSE text data to parquet with named columns.
val nyseRDD = sc.textFile("/user/vikasgonti/data/nyse")
// Trailing dots keep the chain valid when pasted line by line into the REPL.
val nyseDF = nyseRDD.
  map(line => line.split(",")).
  map(f => (f(0), f(1), f(2), f(3), f(4), f(5), f(6))).
  toDF("stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume")

// All seven columns are written as strings; no type conversion is applied.
nyseDF.write.parquet("/user/vikasgonti/itversity/problem4/solution/")
82
83:'#problem 5
84#Get word count for the input data using space as the delimiter
85Output Requirements
86#Output File format: Avro
87#Output fields: word, count
88#Compression: Uncompressed
89#Place the customer files in the HDFS directory
90#/user/yourusername/problem5/solution/
91#Replace yourusername with your OS user name'
92
# Start a spark-shell on YARN with the spark-avro package on the classpath
# (needed for the Avro output of problem 5).
spark-shell \
  --master yarn \
  --packages com.databricks:spark-avro_2.10:2.0.1 \
  --num-executors 10 \
  --executor-cores 2 \
  --executor-memory 3G \
  --conf "spark.ui.port=12456"
99
// Problem 5: word count over the input text (space-delimited), saved as uncompressed Avro.

// Brings the `.avro(...)` writer method into scope; without this import
// `wordsDF.write.avro` does not compile in a fresh shell.
import com.databricks.spark.avro._

// NOTE(review): spark-avro is believed to compress with snappy unless configured
// otherwise (confirm against the spark-avro README); the task requires uncompressed output.
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")

val wordsRDD = sc.textFile("/public/randomtextwriter")
// One record per space-separated token.
val wordsFlat = wordsRDD.flatMap(rec => rec.split(" "))
val wordsMap = wordsFlat.map(rec => (rec, 1))
// Sum the ones per word, using 8 partitions for the reduce.
val wordsCount = wordsMap.reduceByKey((t, v) => t + v, 8)
val wordsDF = wordsCount.toDF("word", "count")
wordsDF.write.avro("/user/vikasgonti/itversity/problem5/solution/")
106
107
108:'#problem 6
109#Get total number of orders for each customer where the customer_state = 'TX'
110Output Requirements
111#Output Fields: customer_fname, customer_lname, order_count
112#File Format: text
113#Delimiter: Tab character (\t)
114#Place the result file in the HDFS directory
115#/user/yourusername/problem6/solution/
116#Replace yourusername with your OS user name'
117
// Problem 6: number of orders per customer for customers whose state is TX,
// written as tab-delimited text: customer_fname, customer_lname, order_count.
val orders = sc.textFile("/public/retail_db/orders")
val customers = sc.textFile("/public/retail_db/customers")

// Only the order id (column 0) and the customer id (column 2) are needed from orders.
val ordersDF = orders.map(rec => {
  val t = rec.split(",")
  (t(0), t(2))
}).toDF("order_id", "order_customer_id")

// Customer id, first name, last name and state (column 7).
val customersDF = customers.map(rec => {
  val t = rec.split(",")
  (t(0), t(1), t(2), t(7))
}).toDF("customer_id", "customer_fname", "customer_lname", "customer_state")

ordersDF.registerTempTable("orders")
customersDF.registerTempTable("customers")

// Group by customer_id as well as the names: grouping by name alone would merge
// different customers who share the same first and last name into one row.
val res = sqlContext.sql("select customer_fname, customer_lname, count(order_id) order_count " +
  "from customers, orders where customer_id = order_customer_id and customer_state = 'TX' " +
  "group by customer_id, customer_fname, customer_lname")

res.map(rec => rec.mkString("\t")).saveAsTextFile("/user/vikasgonti/itversity/problem6/solution/")
137
138
139
140:'#problem 7
141#List the names of the Top 5 products by revenue ordered on '2013-07-26'. Revenue is considered only for COMPLETE and CLOSED orders.
142Output Requirements
143#Target Columns: order_date, order_revenue, product_name, product_category_id
144#Data has to be sorted in descending order by order_revenue
145#File Format: text
146#Delimiter: colon (:)
147#Place the output file in the HDFS directory
148#/user/yourusername/problem7/solution/
149#Replace yourusername with your OS user name
150'
// Problem 7: top 5 products by revenue for orders placed on 2013-07-26 whose status
// is COMPLETE or CLOSED, written as colon-delimited text.
val orders = sc.textFile("/public/retail_db/orders")
val orderItems = sc.textFile("/public/retail_db/order_items")
val products = sc.textFile("/public/retail_db/products")

val ordersDF = orders.map { line =>
  val cols = line.split(",")
  // Keep only the part of the date column before the first space.
  val dateOnly = cols(1).split(" ")(0)
  (cols(0), dateOnly, cols(3))
}.toDF("order_id", "order_date", "order_status")

val orderItemsDF = orderItems.map { line =>
  val cols = line.split(",")
  (cols(1), cols(2), cols(4))
}.toDF("order_item_order_id", "order_item_product_id", "order_item_subtotal")

val productsDF = products.map { line =>
  val cols = line.split(",")
  (cols(0), cols(1), cols(2))
}.toDF("product_id", "product_category_id", "product_name")

ordersDF.registerTempTable("orders")
orderItemsDF.registerTempTable("orderItems")
productsDF.registerTempTable("products")

// Revenue is summed per product with a window function; DISTINCT then collapses the
// repeated per-item rows to one row per product before ordering and limiting.
val res = sqlContext.sql(Seq(
  "select DISTINCT order_date, round(sum(order_item_subtotal) over (partition by product_id), 2) order_revenue, ",
  "product_name,product_category_id from orders, products, orderItems where order_id = order_item_order_id ",
  "and order_item_product_id = product_id and order_date ='2013-07-26' and order_status IN ('COMPLETE','CLOSED') ",
  "order by order_revenue desc limit 5"
).mkString)
res.map(row => row.mkString(":")).saveAsTextFile("/user/vikasgonti/itversity/problem7/solution/")
179
180:'#problem 8
181#List the order Items where the order_status = PENDING PAYMENT order by order_id
182Output Requirements
183#Target columns: order_id, order_date, order_customer_id, order_status
184#File Format: orc
185#Place the output files in the HDFS directory
186#/user/yourusername/problem8/solution/
187#Replace yourusername with your OS user name'
188
// Problem 8: orders with status PENDING_PAYMENT, sorted by order_id, saved as ORC.
val ordersRDD = sc.textFile("/public/retail_db/orders")
// Build the DataFrame from ordersRDD (the RDD loaded just above) rather than from an
// `orders` value left over from an earlier problem in the same shell session.
val ordersDF = ordersRDD.map(rec => {
  val t = rec.split(",")
  // order_id is converted to Int so the sort below is numeric rather than lexicographic.
  (t(0).toInt, t(1), t(2), t(3))
}).toDF("order_id", "order_date", "order_customer_id", "order_status")
val res = ordersDF.filter("order_status = 'PENDING_PAYMENT'").orderBy("order_id")
res.write.orc("/user/vikasgonti/itversity/problem8/solution/")
196
197:'#problem 9
198#Remove header from h1b data
199Output Requirements
200#Remove the header from the data and save rest of the data as is
201#Data should be compressed using snappy algorithm
202#Place the H1B data in the HDFS directory
203#/user/yourusername/problem9/solution/
204#Replace yourusername with your OS user name
205'
// Problem 9: strip the header from the H1B data and save the rest, snappy-compressed.
val h1bdata = sc.textFile("/public/h1b/h1b_data")
val h1bHeader = h1bdata.first
// Keep every line that differs from the first line of the data set (the header).
val h1bdatawithoutheader = h1bdata.filter(_ != h1bHeader)
h1bdatawithoutheader.saveAsTextFile(
  "/user/vikasgonti/itversity/problem9/solution/",
  classOf[org.apache.hadoop.io.compress.SnappyCodec]
)
210
211:'#problem 10
212#Get number of LCAs filed for each year
213Output Requirements
214#File Format: text
215#Output Fields: YEAR, NUMBER_OF_LCAS
216#Delimiter: Ascii null "\0"
217#Place the output files in the HDFS directory
218#/user/yourusername/problem10/solution/
219#Replace yourusername with your OS user name'
220# ID CASE_STATUS EMPLOYER_NAME SOC_NAME JOB_TITLE FULL_TIME_POSITION PREVAILING_WAGE YEAR WORKSITE lon lat
221
// Problem 10: number of LCAs filed per year, written as NUL-delimited text.
// Reads h1bdatawithoutheader, which must already be defined in the shell session.
val h1bDF = h1bdatawithoutheader.map { line =>
  // Fields are separated by the NUL character; index 7 is exposed as YEAR.
  line.split("\0")(7)
}.toDF("YEAR")
h1bDF.registerTempTable("h1bdata")

// Rows whose year is the literal 'NA' are excluded before counting.
val res = sqlContext.sql(
  "select YEAR, count(1) as NUMBER_OF_LCAS from h1bdata where year != 'NA' group by year"
)
res.map(row => row.mkString("\0")).saveAsTextFile("/user/vikasgonti/itversity/problem10/solution/")
229
230:'#problem 11
231#Get number of LCAs by status for the year 2016
232Output Requirements
233#File Format: json
234#Output Field Names: year, status, count
235#Place the output files in the HDFS directory
236#/user/yourusername/problem11/solution/
237#Replace yourusername with your OS user name'
238
// Problem 11: number of LCAs per status for the year 2016, written as JSON.
// Reads h1bdatawithoutheader, which must already be defined in the shell session.
val h1bDF = h1bdatawithoutheader.map { line =>
  // NUL-delimited record; index 1 becomes Status and index 7 becomes Year.
  val cols = line.split("\0")
  (cols(1), cols(7))
}.toDF("Status", "Year")
h1bDF.registerTempTable("h1bdata")

val res = sqlContext.sql(
  "select Year, Status, Count(1) as count from h1bdata where year != 'NA' and year = 2016 group by year, status"
)
res.toJSON.saveAsTextFile("/user/vikasgonti/itversity/problem11/solution/")
246
247:'#problem 12
248#Get top 5 employers for year 2016 where the status is WITHDRAWN or CERTIFIED-WITHDRAWN or DENIED
249Output Requirements
250#File Format: parquet
251#Output Fields: employer_name, lca_count
252#Data needs to be in descending order by count
253#Place the output files in the HDFS directory
254#/user/yourusername/problem12/solution/
255#Replace yourusername with your OS user name'
256
// Problem 12: top 5 employers in 2016 among WITHDRAWN, CERTIFIED-WITHDRAWN and DENIED
// applications, in descending order of count, saved as parquet.
// Reads h1bdatawithoutheader, which must already be defined in the shell session.
val h1bDF = h1bdatawithoutheader.map { line =>
  // NUL-delimited record; indexes 1, 2 and 7 become Status, Employer and Year.
  val cols = line.split("\0")
  (cols(1), cols(2), cols(7))
}.toDF("Status", "Employer", "Year")
h1bDF.registerTempTable("h1bdata")

val res = {
  val topEmployersQuery =
    "select Employer employer_name, count(1) lca_count from h1bdata where year != 'NA' and year = 2016 " +
      "and status IN ('WITHDRAWN','CERTIFIED-WITHDRAWN','DENIED') group by Employer order by lca_count desc limit 5"
  sqlContext.sql(topEmployersQuery)
}
res.write.parquet("/user/vikasgonti/itversity/problem12/solution/")
265
266:'#problem 13
267#Copy all h1b data from HDFS to Hive table excluding those where year is NA or prevailing_wage is NA
268Output Requirements
269#Save it in Hive Database
270#Create Database: CREATE DATABASE IF NOT EXISTS yourusername
271#Switch Database: USE yourusername
272#Save data to hive table h1b_data
273#Create table command:
274
275CREATE TABLE h1b_data (
276 ID INT,
277 CASE_STATUS STRING,
278 EMPLOYER_NAME STRING,
279 SOC_NAME STRING,
280 JOB_TITLE STRING,
281 FULL_TIME_POSITION STRING,
282 PREVAILING_WAGE DOUBLE,
283 YEAR INT,
284 WORKSITE STRING,
285 LONGITUDE STRING,
286 LATITUDE STRING
287)
288
289Replace yourusername with your OS user name'
290
// Problem 13: load the header-less H1B data into the Hive table h1b_data, skipping rows
// whose year or prevailing wage is the literal 'NA'.
val h1bdatawithoutheader = sc.textFile("/public/h1b/h1b_data_noheader")
// Trailing dots keep the chain valid when pasted line by line into the REPL.
val h1bDF = h1bdatawithoutheader.
  map(line => line.split("\0")).
  map(c => (c(0), c(1), c(2), c(3), c(4), c(5), c(6), c(7), c(8), c(9), c(10))).
  toDF("ID", "CASE_STATUS", "EMPLOYER_NAME", "SOC_NAME", "JOB_TITLE", "FULL_TIME_POSITION",
    "PREVAILING_WAGE", "YEAR", "WORKSITE", "LONGITUDE", "LATITUDE")

// Switch to the target Hive database and list its tables.
sqlContext.sql("use vghivedatabase")
sqlContext.sql("show tables").show()
h1bDF.registerTempTable("h1bdata")

// All eleven columns are strings here; the insert relies on the columns lining up
// positionally with the target table.
sqlContext.sql(
  "insert into h1b_data select * from h1bdata where year != 'NA' and PREVAILING_WAGE!='NA'"
)
302
303
304:'#problem 14
305#Export h1b data from hdfs to MySQL Database
306Output Requirements
307#Export data to MySQL Database
308#MySQL database is running on ms.itversity.com
309#User: h1b_user
310#Password: itversity
311Database Name: h1b_export
312Table Name: h1b_data_yourusername
313Nulls are represented as: NA
314After export nulls should not be stored as NA in database. It should be represented as database null
315Create table command:
316
317CREATE TABLE h1b_data_yourusername (
318 ID INT,
319 CASE_STATUS VARCHAR(50),
320 EMPLOYER_NAME VARCHAR(100),
321 SOC_NAME VARCHAR(100),
322 JOB_TITLE VARCHAR(100),
323 FULL_TIME_POSITION VARCHAR(50),
324 PREVAILING_WAGE FLOAT,
325 YEAR INT,
326 WORKSITE VARCHAR(50),
327 LONGITUDE VARCHAR(50),
328 LATITUDE VARCHAR(50));
329
330#Replace yourusername with your OS user name
331#Above create table command can be run using
332#Login using mysql -u h1b_user -h ms.itversity.com -p
333#When prompted enter password itversity
334#Switch to database using use h1b_export
335#Run above create table command by replacing yourusername with your OS user name'
336
# Problem 14: export the H1B data from HDFS into the MySQL table h1b_data_vg.
# The input uses 'NA' for missing values. --input-null-string covers string columns
# only, so --input-null-non-string is also required: otherwise 'NA' in the numeric
# columns (PREVAILING_WAGE, YEAR) is not turned into a database NULL.
sqoop export \
  --connect jdbc:mysql://ms.itversity.com:3306/h1b_export \
  --username h1b_user \
  --password itversity \
  --table h1b_data_vg \
  --export-dir /public/h1b/h1b_data_to_be_exported \
  --input-fields-terminated-by '\001' \
  --input-null-string 'NA' \
  --input-null-non-string 'NA'
345
346:'#problem 15
347#Connect to the MySQL database on the itversity labs using sqoop and import data with case_status as CERTIFIED
348Output Requirements
349#Place the h1b related data in files in HDFS directory
350#/user/yourusername/problem15/solution/
351#Replace yourusername with your OS user name
352#Use avro file format
353#Load only those records which have case_status as CERTIFIED completely
354#There are 2615623 such records'
355
# Problem 15: import only the h1b_data rows whose CASE_STATUS is CERTIFIED,
# stored as Avro data files.
sqoop import \
  --connect "jdbc:mysql://ms.itversity.com:3306/h1b_db" \
  --username h1b_user \
  --password itversity \
  --table h1b_data \
  --where "CASE_STATUS = 'CERTIFIED'" \
  --as-avrodatafile \
  --target-dir "/user/vikasgonti/itversity/problem15/solution/"
364
365:'#problem 16
366#Get NYSE data in ascending order by date and descending order by volume
367Output Requirements
368#Save data back to HDFS
369#Column order: stockticker, transactiondate, openprice, highprice, lowprice, closeprice, volume
370#File Format: text
371#Delimiter: :
372#Place the sorted NYSE data in the HDFS directory
373#/user/yourusername/problem16/solution/
374#Replace yourusername with your OS user name'
375
// Problem 16: NYSE data sorted by date ascending, then volume descending,
// written as colon-delimited text.
val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map { line =>
  val f = line.split(",")
  // Date and volume are parsed as Int and prices as Float, so sorting is numeric.
  (f(0), f(1).toInt, f(2).toFloat, f(3).toFloat, f(4).toFloat, f(5).toFloat, f(6).toInt)
}.toDF("stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume")

val res = nyseDF.sort(col("transactiondate"), col("volume").desc)
res.map(row => row.mkString(":")).saveAsTextFile("/user/vikasgonti/itversity/problem16/solution/")
384
385:'#problem 17
386#Get the stock tickers from NYSE data for which full name is missing in NYSE symbols data
387Output Requirements
388#Get unique stock ticker for which corresponding names are missing in NYSE symbols data
389#Save data back to HDFS
390#File Format: avro
391#Avro dependency details:
392#groupId -> com.databricks, artifactId -> spark-avro_2.10, version -> 2.0.1
393#Place the sorted NYSE data in the HDFS directory
394#/user/yourusername/problem17/solution/
395#Replace yourusername with your OS user name'
396
// Problem 17: distinct stock tickers present in the NYSE data but absent from the
// NYSE symbols data, saved as Avro.

// Provides the `.avro(...)` method on the DataFrame writer used at the end.
import com.databricks.spark.avro._

val nyseRDD = sc.textFile("/public/nyse")
// Only the ticker (first comma-separated field) is needed.
val nyseDF = nyseRDD.map(rec => rec.split(",")(0)).toDF("stockticker")
nyseDF.registerTempTable("nyse")

val nysesymRDD = sc.textFile("/public/nyse_symbols")
val first = nysesymRDD.first
// Skip lines equal to the first line (the header); the symbol is the first tab-separated field.
val symDataDF = nysesymRDD.filter(rec => rec != first).map(rec => rec.split("\t")(0)).toDF("stocksymbol")
symDataDF.registerTempTable("symData")

// The IS NULL test must be in WHERE, not in the ON clause: inside ON it only prevents
// any match, and the left outer join then returns every ticker instead of only those
// without a symbol entry.
val res = sqlContext.sql("select DISTINCT stockticker as Symbol from nyse n " +
  "left outer join symData s on n.stockticker = s.stocksymbol " +
  "where s.stocksymbol is null")
res.write.avro("/user/vikasgonti/itversity/problem17/solution/")
416
417
418:'#problem 18
419#Get the name of stocks displayed along with other information
420Output Requirements
421#Get all NYSE details along with stock name if exists, if not stockname should be empty
422#Column Order: stockticker, stockname, transactiondate, openprice, highprice, lowprice, closeprice, volume
423#Delimiter: ,
424#File Format: text
425#Place the data in the HDFS directory
426#/user/yourusername/problem18/solution/
427#Replace yourusername with your OS user name'
428
// Problem 18: every NYSE record together with its stock name (empty string when the
// ticker has no entry in the symbols data), written as comma-delimited text.
val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map { line =>
  val f = line.split(",")
  (f(0), f(1).toInt, f(2).toFloat, f(3).toFloat, f(4).toFloat, f(5).toFloat, f(6).toInt)
}.toDF("stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume")
nyseDF.registerTempTable("nyse")

val nysesymRDD = sc.textFile("/public/nyse_symbols")
val first = nysesymRDD.first
// Skip lines equal to the first line (the header); fields are tab-separated.
val symDataDF = nysesymRDD.filter(_ != first).map { line =>
  val parts = line.split("\t")
  (parts(0), parts(1))
}.toDF("stocksymbol", "stockname")
symDataDF.registerTempTable("symData")

// The left outer join keeps tickers without a symbol entry; nvl turns their null name into ''.
val res = sqlContext.sql(Seq(
  "select stockticker, nvl(stockname,'') stockname, transactiondate, openprice, ",
  "highprice, lowprice, closeprice, volume ",
  "from nyse n left outer join symData s on n.stockticker = s.stocksymbol"
).mkString)

res.map(row => row.mkString(",")).saveAsTextFile("/user/vikasgonti/itversity/problem18/solution/")
450
451:'#problem 19
452#Get number of companies who filed LCAs for each year
453Output Requirements
454#File Format: text
455#Delimiter: tab character "\t"
456#Output Field Order: year, lca_count
457#Place the output files in the HDFS directory
458#/user/yourusername/problem19/solution/
459#Replace yourusername with your OS user name'
460
// Problem 19: number of companies that filed LCAs in each year, written as
// tab-delimited text (year, lca_count).
val h1bdata = sc.textFile("/public/h1b/h1b_data_noheader")
val h1bDF = h1bdata.map(rec => {
  // NUL-delimited record; index 2 becomes EMPLOYER_NAME and index 7 becomes YEAR.
  val t = rec.split("\0")
  (t(2), t(7))
}).toDF("EMPLOYER_NAME", "YEAR")
h1bDF.registerTempTable("h1bdata")
// count(distinct ...) counts each company once per year; a plain count(EMPLOYER_NAME)
// would count filings (rows), not companies.
val res = sqlContext.sql("select YEAR, count(distinct EMPLOYER_NAME) as lca_count " +
  "from h1bdata where year != 'NA' group by year")
res.map(rec => rec.mkString("\t")).saveAsTextFile("/user/vikasgonti/itversity/problem19/solution/")
469
470:'#problem 20
471#using sqoop and import data with employer_name, case_status and count.
472#Make sure data is sorted by employer_name in ascending order and by count in descending order
473Output Requirements
474#Place the h1b related data in files in HDFS directory
475#/user/yourusername/problem20/solution/
476#Replace yourusername with your OS user name
477#Use text file format and tab (\t) as delimiter
478#Hint: You can use Spark with JDBC or Sqoop import with query
479#You might not get such hints in actual exam
480#Output should contain employer name, case status and count
481'
# Problem 20 (sanity check): count the (employer, status) groups the import should
# produce, so the imported record count can be compared against it.
sqoop eval \
  --connect "jdbc:mysql://ms.itversity.com:3306/h1b_db" \
  --username h1b_user \
  --password itversity \
  --query "select count(*) from (select EMPLOYER_NAME, CASE_STATUS, count(1) as count from h1b_data group by EMPLOYER_NAME, CASE_STATUS) A"
487
# Problem 20: import employer name, case status and count as tab-delimited text,
# sorted by employer name ascending and count descending.
# A single mapper is used on purpose: with several mappers split on case_status each
# part file is sorted only within its own split, so the output as a whole would not be
# in the required order. One mapper also removes the need for --split-by and for
# enabling the text splitter.
sqoop import \
  --connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
  --username h1b_user \
  --password itversity \
  --query "select EMPLOYER_NAME, CASE_STATUS, count(1) as count from h1b_data where \$CONDITIONS group by EMPLOYER_NAME, CASE_STATUS order by EMPLOYER_NAME, count desc" \
  --target-dir /user/vikasgonti/itversity/problem20/solution/ \
  --as-textfile \
  --fields-terminated-by '\t' \
  --num-mappers 1