# 1: Instructions

# Connect to the MySQL database on the itversity labs using sqoop and import all of the data from the orders table into HDFS

# Data Description

# A MySQL instance is running on the remote node ms.itversity.com. In that instance you will find a table that contains 68883 rows of orders data

# MySQL database information:

# Installation on the node ms.itversity.com
# Database name is retail_db
# Username: retail_user
# Password: itversity
# Table name orders

# Output Requirements

# Place the order files in the HDFS directory
# /user/`whoami`/problem1/solution/
# Replace `whoami` with your OS user name
# Use a text format with comma as the column delimiter
# Load every order record completely
# End of Problem

sqoop import \
--connect jdbc:mysql://ms.itversity.com/retail_db \
--username retail_user \
--password itversity \
--table orders \
--target-dir /user/codersham/problem1/solution \
--as-textfile \
--fields-terminated-by ','
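
# A quick sanity check (a sketch, assuming the codersham user as in the
# command above): the imported text files should add up to the 68883 rows
# mentioned in the data description
hdfs dfs -cat /user/codersham/problem1/solution/* | wc -l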

# 2: Instructions

# Get the customers who have not placed any orders, sorted by customer_lname and then customer_fname

# Data Description

# Data is available in local file system /data/retail_db

# retail_db information:

# Source directories: /data/retail_db/orders and /data/retail_db/customers
# Source delimiter: comma(",")
# Source Columns - orders - order_id, order_date, order_customer_id, order_status
# Source Columns - customers - customer_id, customer_fname, customer_lname and many more

# Output Requirements

# Target Columns: customer_lname, customer_fname
# Number of Files: 1
# Place the output file in the HDFS directory
# /user/`whoami`/problem2/solution/
# Replace `whoami` with your OS user name
# File format should be text
# delimiter is (",")
# Compression: Uncompressed
# End of Problem

sc.setLogLevel('ERROR')

from pyspark.sql import Row

orders_rdd = sc.textFile('/public/retail_db/orders'). \
map(lambda rec: Row(
    order_id = int(rec.split(',')[0]),
    order_customer_id = int(rec.split(',')[2])
    ))

orders_DF = sqlContext.createDataFrame(orders_rdd)

orders_DF.registerTempTable('orders')

customers_rdd = sc.textFile('/public/retail_db/customers'). \
map(lambda rec: Row(
    customer_id = int(rec.split(',')[0]),
    customer_fname = rec.split(',')[1],
    customer_lname = rec.split(',')[2]))

customers_DF = sqlContext.createDataFrame(customers_rdd)

customers_DF.registerTempTable('customers')

result = sqlContext.sql("select distinct customer_lname, customer_fname from (select c.customer_lname, c.customer_fname, o.order_id from customers c left outer join orders o on c.customer_id = o.order_customer_id)q where q.order_id is NULL")

sqlContext.setConf('spark.sql.shuffle.partitions','1')

result. \
selectExpr("concat(customer_lname,',',customer_fname)"). \
write. \
mode('overwrite'). \
text('/user/codersham/problem2/solution')
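
# An alternative sketch: instead of forcing a single shuffle partition
# globally, coalesce the result to one partition at write time; either way
# the output lands in a single file as required
result. \
selectExpr("concat(customer_lname,',',customer_fname)"). \
coalesce(1). \
write. \
mode('overwrite'). \
text('/user/codersham/problem2/solution')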

# 3: Instructions

# Get top 3 crime types based on number of incidents in RESIDENCE area using "Location Description"

# Data Description

# Data is available in HDFS under /public/crime/csv

# crime data information:

# Structure of data: (ID, Case Number, Date, Block, IUCR, Primary Type, Description, Location Description, Arrest, Domestic, Beat, District, Ward, Community Area, FBI Code, X Coordinate, Y Coordinate, Year, Updated on, Latitude, Longitude, Location)
# File format - text file
# Delimiter - "," (use regex while splitting: split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1), as some fields contain commas and are enclosed in double quotes)

# Output Requirements

# Output Fields: crime_type, incident_count
# Output File Format: JSON
# Delimiter: N/A
# Compression: No
# Place the output file in the HDFS directory
# /user/`whoami`/problem3/solution/
# Replace `whoami` with your OS user name
# End of Problem

sc.setLogLevel('ERROR')

import re

# Some fields contain commas inside double quotes, so split with the regex
# from the data description rather than a plain split(',')
def split_cols(rec):
    return re.split(',(?=(?:[^"]*"[^"]*")*[^"]*$)', rec)

crime_data_with_header = sc.textFile('/public/crime/csv')

crime_data_only_header = sc.parallelize([crime_data_with_header.first()])

from pyspark.sql import Row

crime_data_rdd = crime_data_with_header.subtract(crime_data_only_header). \
filter(lambda rec: split_cols(rec)[7] == 'RESIDENCE'). \
map(lambda rec: Row(
    id = int(split_cols(rec)[0]),
    crime_type = split_cols(rec)[5]
    ))

crime_data_DF = sqlContext.createDataFrame(crime_data_rdd)

crime_data_DF.registerTempTable('crime_data')

result = sqlContext.sql("select crime_type, incident_count from (select crime_type, incident_count, dense_rank() over(order by incident_count desc) rnk from(select crime_type, count(id) incident_count from crime_data group by crime_type)q)m where m.rnk < 4")

result.write.json('/user/codersham/problem3/solution/')
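
# Read-back sketch to eyeball the result in the same session:
sqlContext.read.json('/user/codersham/problem3/solution/').show()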

# 5: Instructions

# Get word count for the input data using space as delimiter (for each word, we need to get how many times it is repeated in the entire input data set)

# Data Description

# Data is available in HDFS /public/randomtextwriter

# word count data information:

# Number of executors should be 10
# Executor memory should be 3 GB
# Executor cores should be 20 in total (2 per executor)
# Number of output files should be 8
# Avro dependency details: groupId -> com.databricks, artifactId -> spark-avro_2.10, version -> 2.0.1

# Output Requirements

# Output File format: Avro
# Output fields: word, count
# Compression: Uncompressed
# Place the output files in the HDFS directory
# /user/`whoami`/problem5/solution/
# Replace `whoami` with your OS user name
# End of Problem

pyspark \
--master yarn \
--conf spark.ui.port=12340 \
--num-executors 10 \
--executor-memory 3g \
--executor-cores 2 \
--packages com.databricks:spark-avro_2.10:2.0.1

sc.setLogLevel('ERROR')

word_rdd = sc.textFile('/public/randomtextwriter'). \
flatMap(lambda rec: rec.split(' ')). \
map(lambda rec: (rec,1))

word_count = word_rdd.reduceByKey(lambda x,y: x+y)

from pyspark.sql import Row

word = word_count. \
map(lambda rec: Row(
    word = rec[0],
    count = int(rec[1])
    ))

word_DF = sqlContext.createDataFrame(word)

# spark.sql.shuffle.partitions has no effect here: word_DF comes from an
# already-reduced RDD, so this write triggers no shuffle. Coalesce instead
# to get the required 8 output files.
word_DF. \
coalesce(8). \
write. \
format('com.databricks.spark.avro'). \
save('/user/codersham/problem5/solution/')
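
# Read-back sketch (requires the same spark-avro package on the classpath):
sqlContext.read.format('com.databricks.spark.avro').load('/user/codersham/problem5/solution/').count()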

# 6: Instructions

# Get total number of orders for each customer where the customer_state = 'TX'

# Data Description

# retail_db data is available in HDFS at /public/retail_db

# retail_db data information:

# Source directories: /public/retail_db/orders and /public/retail_db/customers
# Source Columns - orders - order_id, order_date, order_customer_id, order_status
# Source Columns - customers - customer_id, customer_fname, customer_lname, customer_state (8th column) and many more
# delimiter: (",")

# Output Requirements

# Output Fields: customer_fname, customer_lname, order_count
# File Format: text
# Delimiter: Tab character (\t)
# Place the result file in the HDFS directory
# /user/`whoami`/problem6/solution/
# Replace `whoami` with your OS user name
# End of Problem

sc.setLogLevel('ERROR')

from pyspark.sql import Row

orders_rdd = sc.textFile('/public/retail_db/orders'). \
map(lambda rec: Row(
    order_id = int(rec.split(',')[0]),
    order_customer_id = int(rec.split(',')[2])
    ))

orders_DF = sqlContext.createDataFrame(orders_rdd)

orders_DF.registerTempTable('orders')

customers_rdd = sc.textFile('/public/retail_db/customers'). \
map(lambda rec: Row(
    customer_id = int(rec.split(',')[0]),
    customer_fname = rec.split(',')[1],
    customer_lname = rec.split(',')[2],
    customer_state = rec.split(',')[7]
    ))

customers_DF = sqlContext.createDataFrame(customers_rdd)

customers_DF.registerTempTable('customers')

result = sqlContext.sql("select c.customer_fname, c.customer_lname, count(o.order_id) order_count from customers c join orders o on c.customer_id = o.order_customer_id where c.customer_state = 'TX' group by c.customer_fname, c.customer_lname")

sqlContext.setConf('spark.sql.shuffle.partitions','1')

result.selectExpr("concat(customer_fname,'\t',customer_lname,'\t',order_count)"). \
write. \
mode('overwrite'). \
text('/user/codersham/problem6/solution/')

# 7: Instructions

# List the names of the Top 5 products by revenue ordered on '2013-07-26'. Revenue is considered only for COMPLETE and CLOSED orders.

# Data Description

# retail_db data is available in HDFS at /public/retail_db

# retail_db data information:

# Source directories:
# /public/retail_db/orders
# /public/retail_db/order_items
# /public/retail_db/products
# Source delimiter: comma(",")
# Source Columns - orders - order_id, order_date, order_customer_id, order_status
# Source Columns - order_items - order_item_id, order_item_order_id, order_item_product_id, order_item_quantity, order_item_subtotal, order_item_product_price
# Source Columns - products - product_id, product_category_id, product_name, product_description, product_price, product_image

# Output Requirements

# Target Columns: order_date, order_revenue, product_name, product_category_id
# Data has to be sorted in descending order by order_revenue
# File Format: text
# Delimiter: colon (:)
# Place the output file in the HDFS directory
# /user/`whoami`/problem7/solution/
# Replace `whoami` with your OS user name
# End of Problem

sc.setLogLevel('ERROR')

from pyspark.sql import Row

orders_rdd = sc.textFile('/public/retail_db/orders'). \
map(lambda rec: Row(
    order_id = int(rec.split(',')[0]),
    order_date = rec.split(',')[1],
    order_status = rec.split(',')[3]))

orders_DF = sqlContext.createDataFrame(orders_rdd)

orders_DF.registerTempTable('orders')

orderItems_rdd = sc.textFile('/public/retail_db/order_items'). \
map(lambda rec: Row(
    order_item_order_id = int(rec.split(',')[1]),
    order_item_product_id = int(rec.split(',')[2]),
    order_item_subtotal = float(rec.split(',')[4])
    ))

orderItems_DF = sqlContext.createDataFrame(orderItems_rdd)

orderItems_DF.registerTempTable('order_items')

products_rdd = sc.textFile('/public/retail_db/products'). \
map(lambda rec: Row(
    product_id = int(rec.split(',')[0]),
    product_category_id = int(rec.split(',')[1]),
    product_name = rec.split(',')[2]
    ))

products_DF = sqlContext.createDataFrame(products_rdd)

products_DF.registerTempTable('products')

# Revenue counts only COMPLETE and CLOSED orders, per the requirements
result = sqlContext.sql("select order_date, order_revenue, product_name, product_category_id from (select order_date, order_revenue, product_name, product_category_id, dense_rank() over(order by order_revenue desc) rnk from(select o.order_date, round(sum(oi.order_item_subtotal),2) order_revenue, p.product_name, p.product_category_id from products p join order_items oi on p.product_id = oi.order_item_product_id join orders o on oi.order_item_order_id = o.order_id where to_date(o.order_date) = '2013-07-26' and o.order_status in ('COMPLETE','CLOSED') group by p.product_name, p.product_category_id, o.order_date)q)m where m.rnk < 6")

sqlContext.setConf('spark.sql.shuffle.partitions','1')

result. \
selectExpr("concat(order_date,':',order_revenue,':',product_name,':',product_category_id)"). \
write. \
text('/user/codersham/problem7/solution/')

# 8: Instructions

# List the orders where the order_status = 'PENDING PAYMENT', ordered by order_id

# Data Description

# Data is available in HDFS

# retail_db data information:

# Source directories: /public/retail_db/orders
# Source delimiter: comma(",")
# Source Columns - orders - order_id, order_date, order_customer_id, order_status

# Output Requirements

# Target columns: order_id, order_date, order_customer_id, order_status
# File Format: orc
# Place the output files in the HDFS directory
# /user/`whoami`/problem8/solution/
# Replace `whoami` with your OS user name
# End of Problem

sc.setLogLevel('ERROR')

from pyspark.sql import Row

orders_rdd = sc.textFile('/public/retail_db/orders'). \
map(lambda rec: Row(
    order_id = int(rec.split(',')[0]),
    order_date = rec.split(',')[1],
    order_customer_id = int(rec.split(',')[2]),
    order_status = rec.split(',')[3]
    ))

orders_DF = sqlContext.createDataFrame(orders_rdd)

orders_DF.registerTempTable('orders')

# Select columns explicitly to keep the required order (Row sorts its
# fields alphabetically, so select * would not)
result = sqlContext.sql("select order_id, order_date, order_customer_id, order_status from orders where order_status = 'PENDING PAYMENT' order by order_id")

result. \
write. \
orc('/user/codersham/problem8/solution/')
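
# Read-back sketch (sqlContext on the labs is a HiveContext, which the ORC
# reader requires):
sqlContext.read.orc('/user/codersham/problem8/solution/').show()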

# 9: Instructions

# Remove header from h1b data

# Data Description

# h1b data with ascii null "\0" as delimiter is available in HDFS

# h1b data information:

# HDFS location: /public/h1b/h1b_data
# First record is the header for the data

# Output Requirements

# Remove the header from the data and save the rest of the data as-is
# Data should be compressed using snappy algorithm
# Place the H1B data in the HDFS directory
# /user/`whoami`/problem9/solution/
# Replace `whoami` with your OS user name
# End of Problem

sc.setLogLevel('ERROR')

h1b_data_with_header = sc.textFile('/public/h1b/h1b_data')

h1b_data_only_header = sc.parallelize([h1b_data_with_header.first()])

h1b_data = h1b_data_with_header.subtract(h1b_data_only_header)

h1b_data.saveAsTextFile('/user/codersham/problem9/solution/','org.apache.hadoop.io.compress.SnappyCodec')
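
# Quick check (a sketch): the part files should carry the .snappy extension
hdfs dfs -ls /user/codersham/problem9/solution/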

# 10: Instructions

# Get number of LCAs filed for each year

# Data Description

# h1b data with ascii null "\0" as delimiter is available in HDFS

# h1b data information:

# HDFS Location: /public/h1b/h1b_data
# Ignore the first record, which is the header of the data
# YEAR is 8th field in the data
# There are some LCAs for which YEAR is NA, ignore those records

# Output Requirements

# File Format: text
# Output Fields: YEAR, NUMBER_OF_LCAS
# Delimiter: Ascii null "\0"
# Place the output files in the HDFS directory
# /user/`whoami`/problem10/solution/
# Replace `whoami` with your OS user name
# End of Problem

sc.setLogLevel('ERROR')

# The header survives the YEAR != 'NA' filter (its 8th field is the literal
# string "YEAR"), so it is still available for first() and the subtract
h1b_data_with_header = sc.textFile('/public/h1b/h1b_data'). \
filter(lambda rec: rec.split('\0')[7] != 'NA')

h1b_data_only_header = sc.parallelize([h1b_data_with_header.first()])

h1b_data = h1b_data_with_header.subtract(h1b_data_only_header)

h1b_data_map = h1b_data.map(lambda rec: (rec.split('\0')[7],1))

h1b_data_count = h1b_data_map.reduceByKey(lambda x,y: x+y)

result = h1b_data_count.map(lambda rec: rec[0]+'\0'+str(rec[1]))

result. \
coalesce(1). \
saveAsTextFile('/user/codersham/problem10/solution/')


# 11: Instructions

# Get number of LCAs by status for the year 2016

# Data Description

# h1b data with ascii null "\0" as delimiter is available in HDFS

# h1b data information:

# HDFS Location: /public/h1b/h1b_data
# Ignore the first record, which is the header of the data
# YEAR is 8th field in the data
# STATUS is 2nd field in the data
# There are some LCAs for which YEAR is NA, ignore those records

# Output Requirements

# File Format: json
# Output Field Names: year, status, count
# Place the output files in the HDFS directory
# /user/`whoami`/problem11/solution/
# Replace `whoami` with your OS user name
# End of Problem

sc.setLogLevel('ERROR')

h1b_data_with_header = sc.textFile('/public/h1b/h1b_data'). \
filter(lambda rec: rec.split('\0')[7] != 'NA')

h1b_data_only_header = sc.parallelize([h1b_data_with_header.first()])

from pyspark.sql import Row

h1b_data = h1b_data_with_header.subtract(h1b_data_only_header). \
map(lambda rec: Row(
    year = rec.split('\0')[7],
    status = rec.split('\0')[1]
    ))

h1b_data_DF = sqlContext.createDataFrame(h1b_data)

h1b_data_DF.registerTempTable('h1b_data')

result = sqlContext.sql("select year, status, count(*) count from h1b_data where year = '2016' group by year, status")

result. \
write. \
json('/user/codersham/problem11/solution', mode = 'overwrite')

# 12: Instructions

# Get top 5 employers for year 2016 where the status is WITHDRAWN or CERTIFIED-WITHDRAWN or DENIED

# Data Description

# h1b data with ascii null "\0" as delimiter is available in HDFS

# h1b data information:

# HDFS Location: /public/h1b/h1b_data
# Ignore the first record, which is the header of the data
# YEAR is 8th field in the data
# STATUS is 2nd field in the data
# EMPLOYER is 3rd field in the data
# There are some LCAs for which YEAR is NA, ignore those records

# Output Requirements

# File Format: parquet
# Output Fields: employer_name, lca_count
# Data needs to be in descending order by count
# Place the output files in the HDFS directory
# /user/`whoami`/problem12/solution/
# Replace `whoami` with your OS user name
# End of Problem

sc.setLogLevel('ERROR')

h1b_data_with_header = sc.textFile('/public/h1b/h1b_data'). \
filter(lambda rec: rec.split('\0')[7] != 'NA')

h1b_data_only_header = sc.parallelize([h1b_data_with_header.first()])

from pyspark.sql import Row

h1b_data = h1b_data_with_header.subtract(h1b_data_only_header). \
map(lambda rec: Row(
    year = rec.split('\0')[7],
    employer_name = rec.split('\0')[2],
    status = rec.split('\0')[1]
    ))

h1b_data_DF = sqlContext.createDataFrame(h1b_data)

h1b_data_DF.registerTempTable('h1b_data')

# Trailing order by guarantees the required descending order in the output
result = sqlContext.sql("select employer_name,lca_count from(select employer_name,lca_count,dense_rank() over(order by lca_count desc) rnk from (select employer_name, count(*) lca_count from h1b_data where year = '2016' and status in ('WITHDRAWN','CERTIFIED-WITHDRAWN','DENIED') group by employer_name)q)m where m.rnk < 6 order by lca_count desc")

sqlContext.setConf('spark.sql.shuffle.partitions','1')

result.write.parquet('/user/codersham/problem12/solution/')
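
# Read-back sketch:
sqlContext.read.parquet('/user/codersham/problem12/solution/').show()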


# 13: Instructions

# Copy all h1b data from HDFS to Hive table excluding those where year is NA or prevailing_wage is NA

# Data Description

# h1b data with ascii null "\0" as delimiter is available in HDFS

# h1b data information:

# HDFS Location: /public/h1b/h1b_data_noheader
# Fields:
# ID, CASE_STATUS, EMPLOYER_NAME, SOC_NAME, JOB_TITLE, FULL_TIME_POSITION, PREVAILING_WAGE, YEAR, WORKSITE, LONGITUDE, LATITUDE
# Ignore data where PREVAILING_WAGE is NA or YEAR is NA
# PREVAILING_WAGE is 7th field
# YEAR is 8th field
# Number of records matching criteria: 3002373

# Output Requirements

# Save it in Hive Database
# Create Database: CREATE DATABASE IF NOT EXISTS `whoami`
# Switch Database: USE `whoami`
# Save data to hive table h1b_data
# Create table command:

# CREATE TABLE h1b_data (
#     ID INT,
#     CASE_STATUS STRING,
#     EMPLOYER_NAME STRING,
#     SOC_NAME STRING,
#     JOB_TITLE STRING,
#     FULL_TIME_POSITION STRING,
#     PREVAILING_WAGE DOUBLE,
#     YEAR INT,
#     WORKSITE STRING,
#     LONGITUDE STRING,
#     LATITUDE STRING
# )

# Replace `whoami` with your OS user name
# End of Problem

from pyspark.sql import Row

h1b_data_rdd = sc.textFile('/public/h1b/h1b_data_noheader'). \
filter(lambda rec: rec.split('\0')[7] != "NA"). \
filter(lambda rec: rec.split('\0')[6] != "NA"). \
map(lambda rec: Row(
    ID = int(rec.split('\0')[0]),
    CASE_STATUS = rec.split('\0')[1],
    EMPLOYER_NAME = rec.split('\0')[2],
    SOC_NAME = rec.split('\0')[3],
    JOB_TITLE = rec.split('\0')[4],
    FULL_TIME_POSITION = rec.split('\0')[5],
    PREVAILING_WAGE = float(rec.split('\0')[6]),
    YEAR = int(rec.split('\0')[7]),
    WORKSITE = rec.split('\0')[8],
    LONGITUDE = rec.split('\0')[9],
    LATITUDE = rec.split('\0')[10]
    ))

h1b_data_DF = sqlContext.createDataFrame(h1b_data_rdd)

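# The problem also asks for the database itself; a minimal sketch, assuming
# sqlContext is a HiveContext (as it is on the labs) and codersham as the
# OS user name:
sqlContext.sql("CREATE DATABASE IF NOT EXISTS codersham")
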
h1b_data_DF. \
write. \
saveAsTable('codersham.h1b_data')

# 14: Instructions

# Export h1b data from hdfs to MySQL Database

# Data Description

# h1b data with ascii character "\001" as delimiter is available in HDFS

# h1b data information:

# HDFS Location: /public/h1b/h1b_data_to_be_exported
# Fields:
# ID, CASE_STATUS, EMPLOYER_NAME, SOC_NAME, JOB_TITLE, FULL_TIME_POSITION, PREVAILING_WAGE, YEAR, WORKSITE, LONGITUDE, LATITUDE
# Number of records: 3002373

# Output Requirements

# Export data to MySQL Database
# MySQL database is running on ms.itversity.com
# User: h1b_user
# Password: itversity
# Database Name: h1b_export
# Table Name: h1b_data_`whoami`
# Nulls are represented as: NA
# After export nulls should not be stored as NA in database. They should be represented as database null
# Create table command:

# CREATE TABLE h1b_data_`whoami` (
#     ID INT,
#     CASE_STATUS VARCHAR(50),
#     EMPLOYER_NAME VARCHAR(100),
#     SOC_NAME VARCHAR(100),
#     JOB_TITLE VARCHAR(100),
#     FULL_TIME_POSITION VARCHAR(50),
#     PREVAILING_WAGE FLOAT,
#     YEAR INT,
#     WORKSITE VARCHAR(50),
#     LONGITUDE VARCHAR(50),
#     LATITUDE VARCHAR(50));

# Replace `whoami` with your OS user name
# Above create table command can be run using
# Login using mysql -u h1b_user -h ms.itversity.com -p
# When prompted enter password itversity
# Switch to database using use h1b_export
# Run above create table command by replacing `whoami` with your OS user name
# End of Problem

# --input-null-non-string handles NA in the numeric columns
# (ID, PREVAILING_WAGE, YEAR) so they also become database nulls
sqoop export \
--connect jdbc:mysql://ms.itversity.com/h1b_export \
--username h1b_user \
--password itversity \
--table h1b_data_codersham \
--export-dir /public/h1b/h1b_data_to_be_exported \
--input-null-string 'NA' \
--input-null-non-string 'NA' \
--input-fields-terminated-by "\001"
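
# Verification sketch (password prompted interactively): the row count in
# MySQL should match the 3002373 records mentioned above
mysql -u h1b_user -h ms.itversity.com -p h1b_export -e "select count(1) from h1b_data_codersham"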

# 15: Instructions

# Connect to the MySQL database on the itversity labs using sqoop and import data with case_status as CERTIFIED

# Data Description

# A MySQL instance is running on the remote node ms.itversity.com. In that instance you will find a table that contains 3002373 rows of h1b data

# MySQL database information:

# Installation on the node ms.itversity.com
# Database name is h1b_db
# Username: h1b_user
# Password: itversity
# Table name h1b_data

# Output Requirements

# Place the h1b related data in files in HDFS directory
# /user/`whoami`/problem15/solution/
# Replace `whoami` with your OS user name
# Use avro file format
# Load all records which have case_status as CERTIFIED
# There are 2615623 such records
# End of Problem

sqoop import \
--connect jdbc:mysql://ms.itversity.com/h1b_db \
--username h1b_user \
--password itversity \
--table h1b_data \
--where "case_status = 'CERTIFIED'" \
--target-dir /user/codersham/problem15/solution \
--as-avrodatafile
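
# Sanity-check sketch in pyspark (with the spark-avro package loaded):
# the count should come back as 2615623
sqlContext.read.format('com.databricks.spark.avro').load('/user/codersham/problem15/solution').count()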

# 16: Instructions

# Get NYSE data in ascending order by date and descending order by volume

# Data Description

# NYSE data with "," as delimiter is available in HDFS

# NYSE data information:

# HDFS location: /public/nyse
# There is no header in the data

# Output Requirements

# Save data back to HDFS
# Column order: stockticker, transactiondate, openprice, highprice, lowprice, closeprice, volume
# File Format: text
# Delimiter: :
# Place the sorted NYSE data in the HDFS directory
# /user/`whoami`/problem16/solution/
# Replace `whoami` with your OS user name
# End of Problem

sc.setLogLevel('ERROR')

from pyspark.sql import Row

nyse_data_rdd = sc.textFile('/public/nyse'). \
map(lambda rec: Row(
    stockticker = rec.split(',')[0],
    transactiondate = rec.split(',')[1],
    openprice = float(rec.split(',')[2]),
    highprice = float(rec.split(',')[3]),
    lowprice = float(rec.split(',')[4]),
    closeprice = float(rec.split(',')[5]),
    volume = long(rec.split(',')[6])  # long() is the Python 2 builtin; the labs run PySpark on Python 2
    ))

nyse_data_DF = sqlContext.createDataFrame(nyse_data_rdd)

from pyspark.sql.functions import col

result = nyse_data_DF. \
orderBy(col('transactiondate').asc(),col('volume').desc())

sqlContext.setConf('spark.sql.shuffle.partitions','1')

result. \
selectExpr("concat(stockticker,':',transactiondate,':',openprice,':',highprice,':',lowprice,':',closeprice,':',volume)"). \
write. \
text("/user/codersham/problem16/solution")

# 17: Instructions

# Get the stock tickers from NYSE data for which the full name is missing in NYSE symbols data

# Data Description

# NYSE data with "," as delimiter is available in HDFS

# NYSE data information:

# HDFS location: /public/nyse
# There is no header in the data

# NYSE Symbols data with "\t" as delimiter is available in HDFS

# NYSE Symbols data information:

# HDFS location: /public/nyse_symbols
# First line is header and it should be included

# Output Requirements

# Get unique stock tickers for which corresponding names are missing in NYSE symbols data
# Save data back to HDFS
# File Format: avro
# Avro dependency details:
# groupId -> com.databricks, artifactId -> spark-avro_2.10, version -> 2.0.1
# Place the data in the HDFS directory
# /user/`whoami`/problem17/solution/
# Replace `whoami` with your OS user name
# End of Problem

sc.setLogLevel('ERROR')

from pyspark.sql import Row

nyse_data_rdd = sc.textFile('/public/nyse'). \
map(lambda rec: Row(
    stockticker = rec.split(',')[0],
    transactiondate = rec.split(',')[1],
    openprice = float(rec.split(',')[2]),
    highprice = float(rec.split(',')[3]),
    lowprice = float(rec.split(',')[4]),
    closeprice = float(rec.split(',')[5]),
    volume = long(rec.split(',')[6])
    ))

nyse_data_DF = sqlContext.createDataFrame(nyse_data_rdd)

nyse_data_DF.registerTempTable('nyse_data')

nyse_symbols_with_header = sc.textFile("/public/nyse_symbols")

nyse_symbols_only_header = sc.parallelize([nyse_symbols_with_header.first()])

nyse_symbols_rdd = nyse_symbols_with_header.subtract(nyse_symbols_only_header). \
map(lambda rec: Row(
    symbol = rec.split('\t')[0],
    description = rec.split('\t')[1]
    ))

nyse_symbols_DF = sqlContext.createDataFrame(nyse_symbols_rdd)

nyse_symbols_DF.registerTempTable('nyse_symbols')

result = sqlContext.sql("select distinct stockticker from (select nyd.stockticker, nys.description from nyse_data nyd left outer join nyse_symbols nys on nyd.stockticker = nys.symbol)q where q.description is null")

sqlContext.setConf('spark.sql.shuffle.partitions','1')

result. \
write. \
format('com.databricks.spark.avro'). \
save('/user/codersham/problem17/solution/')


# 18: Instructions

# Get the name of stocks displayed along with other information

# Data Description

# NYSE data with "," as delimiter is available in HDFS

# NYSE data information:

# HDFS location: /public/nyse
# There is no header in the data

# NYSE Symbols data with tab character (\t) as delimiter is available in HDFS

# NYSE Symbols data information:

# HDFS location: /public/nyse_symbols
# First line is header and it should be included

# Output Requirements

# Get all NYSE details along with the stock name if it exists; if not, stockname should be empty
# Column Order: stockticker, stockname, transactiondate, openprice, highprice, lowprice, closeprice, volume
# Delimiter: ,
# File Format: text
# Place the data in the HDFS directory
# /user/`whoami`/problem18/solution/
# Replace `whoami` with your OS user name
# End of Problem

sc.setLogLevel('ERROR')

from pyspark.sql import Row

nyse_data_rdd = sc.textFile('/public/nyse'). \
map(lambda rec: Row(
    stockticker = rec.split(',')[0],
    transactiondate = rec.split(',')[1],
    openprice = float(rec.split(',')[2]),
    highprice = float(rec.split(',')[3]),
    lowprice = float(rec.split(',')[4]),
    closeprice = float(rec.split(',')[5]),
    volume = long(rec.split(',')[6])
    ))

nyse_data_DF = sqlContext.createDataFrame(nyse_data_rdd)

nyse_data_DF.registerTempTable('nyse_data')

nyse_symbols_with_header = sc.textFile("/public/nyse_symbols")

nyse_symbols_only_header = sc.parallelize([nyse_symbols_with_header.first()])

nyse_symbols_rdd = nyse_symbols_with_header.subtract(nyse_symbols_only_header). \
map(lambda rec: Row(
    symbol = rec.split('\t')[0],
    description = rec.split('\t')[1]
    ))

nyse_symbols_DF = sqlContext.createDataFrame(nyse_symbols_rdd)

nyse_symbols_DF.registerTempTable('nyse_symbols')

result = sqlContext.sql("select nyd.stockticker, nvl(nys.description,'') stockname, nyd.transactiondate, nyd.openprice, nyd.highprice, nyd.lowprice, nyd.closeprice, nyd.volume from nyse_data nyd left outer join nyse_symbols nys on nyd.stockticker = nys.symbol")

sqlContext.setConf('spark.sql.shuffle.partitions','1')

result. \
selectExpr("concat(stockticker,',',stockname,',',transactiondate,',',openprice,',',highprice,',',lowprice,',',closeprice,',',volume)"). \
write. \
text('/user/codersham/problem18/solution/')


# 19: Instructions

# Get number of companies who filed LCAs for each year

# Data Description

# h1b data with ascii null "\0" as delimiter is available in HDFS

# h1b data information:

# HDFS Location: /public/h1b/h1b_data_noheader
# Fields:
# ID, CASE_STATUS, EMPLOYER_NAME, SOC_NAME, JOB_TITLE, FULL_TIME_POSITION, PREVAILING_WAGE, YEAR, WORKSITE, LONGITUDE, LATITUDE
# Use EMPLOYER_NAME to identify distinct companies when counting them
# YEAR is 8th field
# There are some LCAs for which YEAR is NA, ignore those records

# Output Requirements

# File Format: text
# Delimiter: tab character "\t"
# Output Field Order: year, lca_count
# Place the output files in the HDFS directory
# /user/`whoami`/problem19/solution/
# Replace `whoami` with your OS user name
# End of Problem

sc.setLogLevel('ERROR')

from pyspark.sql import Row

h1b_data_rdd = sc.textFile('/public/h1b/h1b_data_noheader'). \
filter(lambda rec: rec.split('\0')[7] != 'NA'). \
map(lambda rec: Row(year = rec.split('\0')[7], employer_name = rec.split('\0')[2]))

h1b_data_DF = sqlContext.createDataFrame(h1b_data_rdd)

h1b_data_DF.registerTempTable('h1b_data')

result = sqlContext.sql("select year, count(distinct employer_name) lca_count from h1b_data group by year")

sqlContext.setConf('spark.sql.shuffle.partitions','1')

result. \
selectExpr("concat(year,'\t',lca_count)"). \
write. \
text('/user/codersham/problem19/solution/')

# 20: Instructions

# Connect to the MySQL database on the itversity labs using sqoop and import data with employer_name, case_status and count. Make sure data is sorted by employer_name in ascending order and by count in descending order

# Data Description

# A MySQL instance is running on the remote node ms.itversity.com. In that instance you will find a table that contains 3002373 rows of h1b data

# MySQL database information:

# Installation on the node ms.itversity.com
# Database name is h1b_db
# Username: h1b_user
# Password: itversity
# Table name h1b_data

# Output Requirements

# Place the h1b related data in files in HDFS directory
# /user/`whoami`/problem20/solution/
# Replace `whoami` with your OS user name
# Use text file format and tab (\t) as delimiter
# Hint: You can use Spark with JDBC or Sqoop import with query
# You might not get such hints in actual exam
# Output should contain employer name, case status and count
# End of Problem

sqoop import \
--connect jdbc:mysql://ms.itversity.com/h1b_db \
--username h1b_user \
--password itversity \
--query "select employer_name, case_status, count(id) count from h1b_data where \$CONDITIONS group by employer_name, case_status order by employer_name asc, count desc" \
--target-dir /user/codersham/problem20/solution \
--fields-terminated-by '\t' \
--num-mappers 1
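
# The hint also allows Spark with JDBC; a minimal sketch, assuming the
# MySQL JDBC driver jar is on the driver classpath (e.g. pyspark started
# with --driver-class-path pointing at mysql-connector-java):
h1b_jdbc_DF = sqlContext.read. \
format('jdbc'). \
option('url', 'jdbc:mysql://ms.itversity.com/h1b_db'). \
option('user', 'h1b_user'). \
option('password', 'itversity'). \
option('dbtable', 'h1b_data'). \
load()

h1b_jdbc_DF.registerTempTable('h1b_data')

result = sqlContext.sql("select employer_name, case_status, count(id) lca_count from h1b_data group by employer_name, case_status order by employer_name asc, lca_count desc")

result. \
selectExpr("concat(employer_name,'\t',case_status,'\t',lca_count)"). \
coalesce(1). \
write. \
text('/user/codersham/problem20/solution')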