Command to list services: service --status-all
Commands to start/restart services:
~> service hbase-master restart # On the Master host
~> service hbase-regionserver restart # On all RegionServer hosts

Remove MySQL root password (if required):
sudo /etc/init.d/mysqld stop
sudo mysqld_safe --skip-grant-tables &
mysql -uroot
sudo /etc/init.d/mysqld start

Queries:
1.
a.
[cloudera@quickstart bin]$ mysql -u root
mysql> show databases;
mysql> create database shopping;
mysql> use shopping;

Create tables:
mysql> create table customer(
userId int primary key,
firstName char(50),
lastName char(50),
email varchar(100),
mobileNo numeric(10,0),
addr varchar(255)
);

mysql> create table orders(
orderId int primary key,
userId int,
dateOfOrder date,
statusOfOrder char(50),
foreign key (userId) references customer(userId)
);

mysql> create table product(
productId int primary key,
productName varchar(100),
modelNo varchar(50),
serialNo varchar(50),
price int,
manufacturer varchar(50),
category varchar(50)
);

mysql> create table productOrder(
SrNo int auto_increment primary key not null,
orderId int,
productId int,
quantity int,
foreign key (productId) references product(productId),
foreign key (orderId) references orders(orderId)
);
Insert values:
insert into customer values (001, 'Nihar', 'Abhyankar', 'nihar@gmail.com', 9876543210, '445 Mount Eden Road, Mount Eden, Auckland');
insert into customer values (002, 'Anshul', 'Chaudhary', 'anshul@gmail.com', 9876543212, '21 Greens Road RD 2 Ruawai 0592');
insert into customer values (003, 'Prithvi', 'Amin', 'amin@gmail.com', 9876543213, 'Main Highway Otaki; 32 Wilson Street');
insert into customer values (004, 'Parth', 'Tripathy', 'parth@gmail.com', 9876543214, 'PO Box 39100, Howick');
insert into customer values (005, 'Neha', 'jain', 'neha@gmail.com', 9876543215, '305 - 14th Ave. S. Suite 3B');

insert into product values (111, 'HyperX', 'Fury', '12344343', 4000, 'HyperX', 'RAM');
insert into product values (112, 'Gigabyte GS-A20', 'hero', '12355347', 6000, 'Gigabyte', 'Motherboard');
insert into product values (113, 'Intel i5', '8400', '12366346', 24000, 'Intel', 'Processor');
insert into product values (114, 'Intel i7', '8700K', '12774345', 34000, 'Intel', 'Processor');
insert into product values (115, 'Zotac Graphic Card', 'RTX2080', '13254343', 55000, 'Zotac', 'GPU');
insert into product values (116, 'Zotac Graphic Card', 'RTX2060', '13004343', 35000, 'Zotac', 'GPU');
insert into product values (117, 'Corsair', 'Vengance', '78344343', 4000, 'Corsair', 'RAM');
insert into product values (118, 'Asus Tuf', 'Z370', '12378347', 12000, 'Asus', 'Motherboard');

insert into orders values (1111, 001, '2019-04-15', 'Delivered');
insert into orders values (1112, 002, '2018-05-25', 'Delivered');
insert into orders values (1113, 004, '2019-06-30', 'Delivered');
insert into orders values (1114, 005, '2019-08-01', 'Shipped');
insert into orders values (1115, 002, '2019-08-08', 'Placed');

insert into productOrder(orderId,productId,quantity) values(1111,111,1);
insert into productOrder(orderId,productId,quantity) values(1111,115,1);
insert into productOrder(orderId,productId,quantity) values(1111,117,1);
insert into productOrder(orderId,productId,quantity) values(1112,116,1);
insert into productOrder(orderId,productId,quantity) values(1113,118,1);
insert into productOrder(orderId,productId,quantity) values(1114,114,2);
insert into productOrder(orderId,productId,quantity) values(1114,114,1);
insert into productOrder(orderId,productId,quantity) values(1115,112,1);

Import a single table to HDFS:
[cloudera@quickstart ~]$ sqoop import --table customer --connect jdbc:mysql://localhost/shopping --username root --target-dir /user/pract

sqoop import --table productOrder --connect jdbc:mysql://localhost/shopping --username root --target-dir /user/pract -m 1

Import multiple tables to HDFS:
[training@localhost ~]$ sqoop import-all-tables --connect jdbc:mysql://localhost/shopping --username root --warehouse-dir /user/pract

View data:
[training@localhost ~]$ hdfs dfs -cat /user/pract/customer/part-m-*

Import multiple tables to Hive:
sqoop import-all-tables --connect jdbc:mysql://localhost/shopping --username root --hive-import

Import with a WHERE clause into Hive:
sqoop import \
  --connect "jdbc:mysql://localhost/training" \
  --username training -P \
  --table cityByCountry \
  --target-dir /user/where_clause \
  --where "state = 'Alaska'" \
  --hive-import -m 1

List tables:
[training@localhost ~]$ sqoop list-tables --connect jdbc:mysql://localhost/reyDB --username root
List databases:
[training@localhost ~]$ sqoop list-databases --connect "jdbc:mysql://localhost" --username root --password cloudera
Export data:
[training@localhost ~]$ sqoop export --connect jdbc:mysql://localhost/shopping --username root --export-dir=/user/hive/warehouse/temp --table temp

Display the product name and model ordered by the user with userId = 002:
select c.firstName, c.lastName, p.productName, p.modelNo
from customer c, product p, orders o, productOrder po
where c.userId=2 and c.userId=o.userId and o.orderId=po.orderId and po.productId=p.productId;

Display customer name and ID and the number of orders placed:
create table temp as
select c.userId, c.firstName, c.lastName, count(o.orderId)
from customer c, orders o
where c.userId=o.userId
group by c.userId, c.firstName, c.lastName;

hive> describe formatted temp;
This command provides the path:
Path: /user/hive/warehouse/temp

Create the MySQL table:
mysql> create table temp(id int primary key, first_name varchar(20), last_name varchar(20), number bigint);

Export to MySQL:
sqoop export --connect jdbc:mysql://localhost/shopping -m 1 --table temp --export-dir /user/hive/warehouse/temp --input-fields-terminated-by '|' --input-lines-terminated-by '#' --fields-terminated-by '|' --lines-terminated-by '#' --username root

INSERT OVERWRITE LOCAL DIRECTORY '/home/cloudera/temp' ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
select customer.userId, customer.firstName, count(orders.orderId) from customer, orders where customer.userId=orders.userId group by customer.userId, customer.firstName;

Exit from Hive, go to MySQL, and create table output1 based on the output of the above query:
create table output1(customer_id int primary key, customer_name varchar(50), count_orders int);

load data infile '/home/cloudera/temp/000000_0' into table output1 columns terminated by ',';
If the above query doesn't work, try:
load data local infile '/home/cloudera/temp/000000_0' into table output1 columns terminated by ',';

HBASE

hbase(main):006:0> list

create - Creates a table.
list - Lists all the tables in HBase.
disable - Disables a table.
is_disabled - Verifies whether a table is disabled.
enable - Enables a table.
is_enabled - Verifies whether a table is enabled.
describe - Provides the description of a table.
alter - Alters a table.
alter 't1', NAME => 'f1', VERSIONS => 5
alter 'table name', 'delete' => 'column family'
exists - Verifies whether a table exists.
drop - Drops a table from HBase.
drop_all - Drops the tables matching the 'regex' given in the command.
Stop HBase: ./bin/stop-hbase.sh

put - Puts a cell value at a specified column in a specified row in a particular table.
get - Fetches the contents of a row or a cell.
delete - Deletes a cell value in a table.
deleteall - Deletes all the cells in a given row.
scan - Scans and returns the table data.
count - Counts and returns the number of rows in a table.
truncate - Disables, drops, and recreates a specified table.
hbase(main):032:0> create 'company', 'dept', 'project', 'dependent'
hbase(main):034:0> put 'company', 'C1', 'dept:no', '1'
0 row(s) in 0.2770 seconds
hbase(main):035:0> put 'company', 'C1', 'dept:name', 'D1'
hbase(main):043:0> scan 'company'
hbase(main):043:0> scan 'company', {COLUMNS=>'dept:name'}

hbase(main):031:0> create 'department',{'NAME'=>'name'}
0 row(s) in 2.3140 seconds

=> Hbase::Table - department

hbase(main):033:0> put 'department','1','name:dname','CMPN'
0 row(s) in 0.0270 seconds
hbase(main):034:0> put 'department','2','name:dname','IT'
0 row(s) in 0.0110 seconds
hbase(main):035:0> create 'project',{'NAME'=>'details'}
0 row(s) in 1.2450 seconds

=> Hbase::Table - project
hbase(main):036:0> put 'project','1','details:pname','Inventory'
0 row(s) in 0.0130 seconds
hbase(main):037:0> put 'project','1','details:dno','2'
0 row(s) in 0.0060 seconds
hbase(main):038:0> put 'project','2','details:pname','Inventory'
0 row(s) in 0.0060 seconds
hbase(main):039:0> put 'project','2','details:dno','1'
0 row(s) in 0.0150 seconds
hbase(main):040:0> put 'project','3','details:pname','Industrial'
0 row(s) in 0.0050 seconds
hbase(main):041:0> put 'project','3','details:dno','2'
0 row(s) in 0.0060 seconds
hbase(main):002:0> create 'dependent',{'NAME'=>'details'}
0 row(s) in 2.5910 seconds
=> Hbase::Table - dependent
hbase(main):003:0> put 'dependent',1,'details:name','Varsha'
0 row(s) in 0.2840 seconds
hbase(main):004:0> put 'dependent',1,'details:relation','Mother'
0 row(s) in 0.0070 seconds
hbase(main):005:0> put 'dependent',2,'details:name','Devidas'
0 row(s) in 0.0130 seconds
hbase(main):006:0> put 'dependent',2,'details:relation','Father'
0 row(s) in 0.0110 seconds

hbase(main):008:0> scan 'department',{COLUMNS=>'name:dname'}
ROW        COLUMN+CELL
 1         column=name:dname, timestamp=1572250404319, value=CMPN
 2         column=name:dname, timestamp=1572250413830, value=IT
2 row(s) in 0.1990 seconds

hbase(main):009:0> count 'project'
3 row(s) in 0.0830 seconds

=> 3
hbase(main):010:0> scan 'dependent',{COLUMNS=>['details:name','details:relation']}
ROW        COLUMN+CELL
 1         column=details:name, timestamp=1572336508036, value=Varsha
 1         column=details:relation, timestamp=1572336528477, value=Mother
 2         column=details:name, timestamp=1572336546638, value=Devidas
 2         column=details:relation, timestamp=1572336560001, value=Father
2 row(s) in 0.0290 seconds

hbase(main):011:0> alter 'dependent',NAME=>'cost'
Updating all regions with the new schema...
0/1 regions updated.
1/1 regions updated.
Done.
0 row(s) in 3.2010 seconds

OR (run either the query above or the one below):

hbase(main):014:0> alter 'dependent',NAME=>'cost',VERSIONS=>5
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 1.9820 seconds

Count of words:
rdd1 = sc.textFile('file:/home/cloudera/Desktop/BDA/words.txt').flatMap(lambda line: line.split())
rdd1.collect();
rdd2 = rdd1.map(lambda word:(word,1)).reduceByKey(lambda v1,v2:(v1+v2));
rdd2.collect();

Count of words starting with 'H':
Input file - words.txt
Hi, my name is Rey. How are you all?
Hi again Colorado USA Indiana USA
Input file - words1.txt
Illinois USA
Goa USA
Colorado USA
Indiana USA
PROGRAM
from pyspark import SparkConf,SparkContext
sc = SparkContext(master = "local[*]")
#sc = SparkContext.getOrCreate()
#Read the text file and split it into words using flatMap
rdd1 = sc.textFile("file:/home/cloudera/Desktop/BDA/words.txt").flatMap(lambda line:line.split())
#Method 1: filter out words starting with the letter 'H' using startswith()
Hwords = rdd1.filter(lambda word:word.startswith('H')).collect()
#Method 2: initial mapping (word, 1), then combining the same word (i.e. updating its count)
allwords = rdd1.map(lambda word:(word,1)).reduceByKey(lambda v1,v2:(v1+v2)).collect()
#Example: allwords = [ (u'Hello', 2) , (u'World', 1) ]
#Count
c = 0
for i in range(len(allwords)):
    if allwords[i][0][0] == 'H':
        c = c + allwords[i][1]
#Output from method 1
print "Number of words starting with letter H are:",len(Hwords)
#Output from method 2
print "Number of words starting with letter H are:",c
OUTPUT:
Number of words starting with letter H are: 3
Number of words starting with letter H are: 3

Two-lettered words
PROGRAM
from pyspark import SparkConf,SparkContext
sc = SparkContext(master = "local[*]")
#sc = SparkContext.getOrCreate()

#Read the text file and split it into words using flatMap
rdd1 = sc.textFile("file:/home/cloudera/Desktop/BDA/words.txt").flatMap(lambda line:line.split())
#Filter out two-lettered words
rdd2 = rdd1.filter(lambda word:len(word)==2)
#Map each two-lettered word initially with a count of 1 => (key,value) = (word, its count)
rdd3 = rdd2.map(lambda word:(word,1))
#Update the count of a word (key) for multiple occurrences of the word
rdd4 = rdd3.reduceByKey(lambda v1,v2:(v1+v2))
#collect() returns the result; since this is a .py file we print it explicitly
print rdd4.collect()
OUTPUT:
[(u'is', 1), (u'Hi', 2), (u'My', 1)]

SORTING
PROGRAM
from pyspark import SparkConf,SparkContext
sc = SparkContext(master = "local[*]")
#sc = SparkContext.getOrCreate()
#Read the text file and split it into words using flatMap
rdd1 = sc.textFile("file:/home/cloudera/Desktop/BDA/words.txt").flatMap(lambda line:line.split())
#Initial mapping (word, 1), then combining the same word (i.e. updating its count)
rdd2 = rdd1.map(lambda word:(word,1)).reduceByKey(lambda v1,v2:(v1+v2))
#Sort alphabetically
print rdd2.sortBy(lambda a:a[0]).collect()
#Sort by number of occurrences
print rdd2.sortBy(lambda a:a[1]).collect()
OUTPUT:
Sort alphabetically
Note - capital letters sort before lowercase letters
[(u'Hi', 2), (u'How', 1), (u'Mohit.', 1), (u'My', 1), (u'again', 1), (u'all?', 1), (u'are', 1), (u'is', 1), (u'name', 1), (u'you', 1)]
Sort by occurrence of words
[(u'again', 1), (u'name', 1), (u'is', 1), (u'How', 1), (u'Mohit.', 1), (u'all?', 1), (u'you', 1), (u'My', 1), (u'are', 1), (u'Hi', 2)]
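
A descending sort only needs the ascending flag of sortBy flipped; a quick sketch reusing the rdd2 from the program above:
#Sort by count, highest first (ascending defaults to True)
print rdd2.sortBy(lambda a:a[1], ascending=False).collect()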

Union of two files
PROGRAM
from pyspark import SparkConf,SparkContext
sc = SparkContext(master = "local[*]")
#Read the text files and map each line as a (line, line) pair
rdd1 = sc.textFile("file:/home/cloudera/Desktop/BDA/words.txt").map(lambda line:(line,line))
rdd2 = sc.textFile("file:/home/cloudera/Desktop/BDA/words1.txt").map(lambda line:(line,line))
#The union keeps a single occurrence of repeated lines after reduceByKey
rdd3 = (rdd1 + rdd2).reduceByKey(lambda v1,v2:(v1)).collect()
for i in range(len(rdd3)): print rdd3[i][0]
OUTPUT
and
is
Indiana USA Colorado USA all?
you My Illinois USA Hi again name
Goa USA How
Hi Mohit. Hello are

Searching for a word
PROGRAM
from pyspark import SparkConf,SparkContext
sc = SparkContext(master = "local[*]")
#Read input
rdd1 = sc.textFile("file:/home/cloudera/Desktop/BDA/words.txt")
searchTerm = 'USA'
rdd2 = rdd1.filter(lambda line : (searchTerm in line)).collect()
print rdd2
OUTPUT
[u'Colorado USA', u'Indiana USA']
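
The number of matching lines can also be taken directly with the count() action instead of collecting; a one-line sketch on the same rdd1 and searchTerm:
print rdd1.filter(lambda line : (searchTerm in line)).count()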

MAX AND MIN
INPUT - numbers.txt
10
23
77
44
32
42
65
PROGRAM
from pyspark import SparkConf, SparkContext
sc = SparkContext(master = "local[*]")
#Read input
numbers = sc.textFile("file:/home/cloudera/Desktop/BDA/numbers.txt")
rdd1 = numbers.map(lambda line: ("numbers", float(line)))
maxnumber = rdd1.reduceByKey(lambda v1,v2 : (max([v1,v2]))).collect()
minnumber = rdd1.reduceByKey(lambda v1,v2 : (min([v1,v2]))).collect()
print 'Max is ', maxnumber[0][1], 'and Min is ', minnumber[0][1]
OUTPUT
Max is 77.0 and Min is 10.0
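
For a single column of numbers the built-in actions are shorter; a minimal sketch assuming the same numbers.txt:
#Alternative using the built-in max()/min() actions
nums = sc.textFile("file:/home/cloudera/Desktop/BDA/numbers.txt").map(lambda line: float(line))
print 'Max is ', nums.max(), 'and Min is ', nums.min()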

K-means clustering
pip install --user numpy==1.4.1   OR   sudo yum install numpy
sudo chown -R $USER (path)   (use this at the end if you get a permission-denied error)
PROGRAM
from pyspark.mllib.clustering import KMeans
from numpy import array
from math import sqrt
from pyspark import SparkConf,SparkContext
sc = SparkContext(master = "local[*]")
#Read input
data = sc.textFile("file:/home/cloudera/Desktop/BDA/iris.csv")
header = data.first()
header = sc.parallelize([header])
data = data.subtract(header)
parsedData = data.map(lambda line: array([float(x) for x in line.split(',')]))
#Build the model (cluster the data)
clusters = KMeans.train(parsedData, 3, maxIterations=10, runs=10, initializationMode="random")
#Evaluate clustering by computing the Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
center = clusters.centers
for i in center:
    print(i[0],i[1])
OUTPUT
For Admission_Predict.csv
Within Set Sum of Squared Error = 2194.83778503
(300.46987951807233, 100.57831325301206)
(328.36363636363637, 113.1883116883117)
(314.20858895705521, 105.42944785276075)
For Iris.csv
Within Set Sum of Squared Error = 97.1754768346
(6.8499999999999996, 3.0736842105263151)
(5.9016129032258071, 2.7483870967741941)
(5.0040816326530608, 3.4163265306122446)
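
To see which cluster an individual row falls into, the trained model's predict() can be applied per point, the same way the error() function above already does; a small sketch assuming the parsedData and clusters variables from the program above:
#Pair each point with the index of its nearest cluster centre
assignments = parsedData.map(lambda point: (clusters.predict(point), point.tolist()))
print assignments.take(5)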

Average of temperatures
data = sc.parallelize(Seq(("T", 2), ("T", 4), ("T", 2), ("T", 0), ("T", 10)))
avgValue = data.mapValues((_, 1)
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
.mapValues{ case (sum, count) => (1.0 * sum) / count }
.collectAsMap()
Output (Mohit, run this and check) - note this is Scala/Spark-shell syntax, so pyspark rejects it:
Error: .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
^
SyntaxError: invalid syntax
Average of ratings
>>data = sc.parallelize(Seq(("R", 2), ("R", 4), ("R", 2), ("R", 0), ("R", 10)))
>>avgValue = data.mapValues((_, 1)
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
.mapValues{ case (sum, count) => (1.0 * sum) / count }
.collectAsMap()
Output (Mohit, run this and check) - same error, since this is again Scala syntax:
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
^
SyntaxError: invalid syntax
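
Since the notes only show the Scala form failing under pyspark, here is a rough PySpark equivalent of the same average-by-key idea, a sketch using the same sample temperature pairs:
#PySpark version: sum and count per key, then divide
data = sc.parallelize([("T", 2), ("T", 4), ("T", 2), ("T", 0), ("T", 10)])
sumCount = data.mapValues(lambda v: (v, 1)).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
avgValue = sumCount.mapValues(lambda p: 1.0 * p[0] / p[1]).collectAsMap()
print avgValue   # {'T': 3.6}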

PageRank:
from pyspark import SparkConf,SparkContext
sc = SparkContext()

def computeContribs(neighbors, rank):
    for neighbor in neighbors: yield(neighbor, rank/len(neighbors))

# read in a file of page links (format: url1 url2)
linkfile = "file:/home/cloudera/Desktop/BDA/PageLinks.txt"
links = sc.textFile(linkfile).map(lambda line: line.split()).map(lambda pages: (pages[0],pages[1])).distinct().groupByKey().persist()
# set initial page ranks to 1.0
ranks = links.map(lambda (page,neighbors): (page,1.0))
# number of iterations
n = 10
# for n iterations, calculate new page ranks based on neighbor contributions
for x in range(n):
    contribs = links.join(ranks).flatMap(lambda (page,(neighbors,rank)): computeContribs(neighbors,rank))
    ranks = contribs.reduceByKey(lambda v1,v2: v1+v2).map(lambda (page,contrib): (page,contrib * 0.85 + 0.15))
    print "Iteration ",x
    for pair in ranks.take(10):
        print pair
input:
page1 page3
page2 page1
page4 page1
page3 page1
page4 page2
page3 page4
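
After the loop finishes, the final ranks can be pulled out in descending order for display; a small sketch using the ranks RDD left over from the loop above:
#Show the final ranks, highest first
for page, rank in ranks.sortBy(lambda pr: pr[1], ascending=False).collect():
    print page, rank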

MapReduce
Matrix Multiplication

File: MultMapper.java
package mult;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class MultMapper
        extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        int m = Integer.parseInt(conf.get("m"));
        int p = Integer.parseInt(conf.get("p"));
        String line = value.toString();
        // input record: (M, i, j, Mij)
        String[] indicesAndValue = line.split(",");
        Text outputKey = new Text();
        Text outputValue = new Text();
        if (indicesAndValue[0].equals("M")) {
            for (int k = 0; k < p; k++) {
                outputKey.set(indicesAndValue[1] + "," + k);
                // outputKey is (i,k)
                outputValue.set(indicesAndValue[0] + "," + indicesAndValue[2]
                        + "," + indicesAndValue[3]);
                // outputValue is (M,j,Mij)
                context.write(outputKey, outputValue);
            }
        } else {
            // input record: (N, j, k, Njk)
            for (int i = 0; i < m; i++) {
                outputKey.set(i + "," + indicesAndValue[2]);
                outputValue.set("N," + indicesAndValue[1] + ","
                        + indicesAndValue[3]);
                context.write(outputKey, outputValue);
            }
        }
    }
}

File: MultReducer.java
package mult;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.HashMap;

public class MultReducer
        extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String[] value;
        // key = (i,k)
        // values = [(M/N, j, V/W), ...]
        HashMap<Integer, Float> hashA = new HashMap<Integer, Float>();
        HashMap<Integer, Float> hashB = new HashMap<Integer, Float>();
        for (Text val : values) {
            value = val.toString().split(",");
            if (value[0].equals("M")) {
                hashA.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
            } else {
                hashB.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
            }
        }
        int n = Integer.parseInt(context.getConfiguration().get("n"));
        float result = 0.0f;
        float m_ij;
        float n_jk;
        for (int j = 0; j < n; j++) {
            m_ij = hashA.containsKey(j) ? hashA.get(j) : 0.0f;
            n_jk = hashB.containsKey(j) ? hashB.get(j) : 0.0f;
            result += m_ij * n_jk;
        }
        if (result != 0.0f) {
            context.write(null,
                    new Text(key.toString() + "," + Float.toString(result)));
        }
    }
}

File: MatrixMultiply.java
package mult;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MatrixMultiply {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MatrixMultiply <in_dir> <out_dir>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        // M is an m-by-n matrix; N is an n-by-p matrix.
        conf.set("m", "1000");
        conf.set("n", "100");
        conf.set("p", "1000");
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "MatrixMultiply");
        job.setJarByClass(MatrixMultiply.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MultMapper.class);
        job.setReducerClass(MultReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

Input files:
/home/cloudera/Desktop/BDA/MatrixMultiplication/M.txt
M,0,0,1
M,0,1,2
M,1,0,3
M,1,1,4
/home/cloudera/Desktop/BDA/MatrixMultiplication/N.txt
N,0,0,5
N,0,1,6
N,1,0,7
N,1,1,8
(Note: for this 2x2 sample, the m/n/p values set in MatrixMultiply.java could be "2"; the larger values still give the correct product, just with extra empty map outputs.)

[cloudera@quickstart ~]$ hdfs dfs -mkdir /test
[cloudera@quickstart ~]$ hdfs dfs -put /home/cloudera/Desktop/BDA/MatrixMultiplication/*.txt /test
[cloudera@quickstart ~]$ hadoop jar /home/cloudera/Desktop/BDA/MatrixMultiplication/Matrix.jar mult.MatrixMultiply /test/*.txt /moutput
[cloudera@quickstart ~]$ hdfs dfs -cat /moutput/part-r-*
Output:
0,0,19.0
0,1,22.0
1,0,43.0
1,1,50.0
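
As an aside, the same 2x2 product can be cross-checked interactively in pyspark, mirroring the (i,k)-keyed join that the mapper/reducer above perform; a minimal sketch (variable names made up, collect order may vary):
#Matrices as (row, col, value) / (row, col, value) triples, joined on the shared index j
M = sc.parallelize([(0,0,1.0),(0,1,2.0),(1,0,3.0),(1,1,4.0)])
N = sc.parallelize([(0,0,5.0),(0,1,6.0),(1,0,7.0),(1,1,8.0)])
Mj = M.map(lambda t: (t[1], (t[0], t[2])))   # key M entries by j
Nj = N.map(lambda t: (t[0], (t[1], t[2])))   # key N entries by j
prod = Mj.join(Nj).map(lambda kv: ((kv[1][0][0], kv[1][1][0]), kv[1][0][1] * kv[1][1][1])) \
         .reduceByKey(lambda a, b: a + b)
print prod.collect()   # expect ((0,0),19.0), ((0,1),22.0), ((1,0),43.0), ((1,1),50.0)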

WORD COUNT
File: WC_Mapper.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WC_Mapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key, Text value, OutputCollector<Text,IntWritable> output,
                    Reporter reporter) throws IOException{
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()){
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

File: WC_Reducer.java
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WC_Reducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output,
                       Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

File: WC_Runner.java
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WC_Runner {
    public static void main(String[] args) throws IOException{
        JobConf conf = new JobConf(WC_Runner.class);
        conf.setJobName("WordCount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(WC_Mapper.class);
        conf.setCombinerClass(WC_Reducer.class);
        conf.setReducerClass(WC_Reducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}

pyspark:

Input file words.txt
Hi
My
name
is
akshay
How
are
you
all
Hi again
Hello

PySpark frequency of each word in the document:
>>> rdd2=sc.textFile("file:/home/cloudera/Desktop/words.txt").flatMap(lambda line:line.split())
>>> rdd3=rdd2.map(lambda word:(word,1)).reduceByKey(lambda a,b:(a+b))
>>> print(rdd3.collect())
[(u'again', 1), (u'all', 1), (u'name', 1), (u'How', 1), (u'is', 1), (u'akshay', 1), (u'Hi', 2), (u'are', 1), (u'you', 1), (u'My', 1), (u'Hello', 1)]
>>> for i in rdd3.collect():
...     print(i)
...
(u'again', 1)
(u'all', 1)
(u'name', 1)
(u'How', 1)
(u'is', 1)
(u'akshay', 1)
(u'Hi', 2)
(u'are', 1)
(u'you', 1)
(u'My', 1)
(u'Hello', 1)
>>>

Sorting
>>> rdd4=rdd2.map(lambda word:(word,1)).reduceByKey(lambda a,b:(a+b))
>>> rdd5=rdd4.sortBy(lambda a:a[0]).collect()
>>> print(rdd5)
[(u'Hello', 1), (u'Hi', 2), (u'How', 1), (u'My', 1), (u'again', 1), (u'akshay', 1), (u'all', 1), (u'are', 1), (u'is', 1), (u'name', 1), (u'you', 1)]
>>> rdd6=rdd4.sortBy(lambda a:a[1]).collect()
>>> print(rdd6)
[(u'again', 1), (u'all', 1), (u'name', 1), (u'How', 1), (u'is', 1), (u'akshay', 1), (u'are', 1), (u'you', 1), (u'My', 1), (u'Hello', 1), (u'Hi', 2)]
>>>

Words starting with 'H'
>>> rdd1=sc.textFile("file:/home/cloudera/Desktop/words.txt").flatMap(lambda line:line.split())
>>> rdd2=rdd1.filter(lambda word:word.startswith('H')).collect()
>>> print(rdd2)
[u'Hi', u'How', u'Hi', u'Hello']
>>> print(len(rdd2))
4

4-letter words
>>> rdd1=sc.textFile("file:/home/cloudera/Desktop/words.txt").flatMap(lambda line:line.split())
>>> rdd2=rdd1.filter(lambda word:len(word)==4).collect()
>>> print(rdd2)
[u'name']
>>> print(len(rdd2))
1
>>>

Average temperature:
Input file tempreture.txt
23
34
43
23
34
32
26
27
28
42
29

>>> rdd1=sc.textFile("file:/home/cloudera/Desktop/tempreture.txt").flatMap(lambda line:line.split())
>>> sum=0
>>> count=0
>>> for i in rdd1.collect():
...     sum=sum+int(i)
...     count=count+1
...
>>> print(sum)
341
>>> print(count)
11
>>> print("average tempreture is",sum/count)
('average tempreture is', 31)
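
The same average can also be computed without collecting everything to the driver, using the sum() and count() actions; a small sketch over the same file:
>>> temps = sc.textFile("file:/home/cloudera/Desktop/tempreture.txt").map(lambda line: int(line))
>>> print("average temperature is", float(temps.sum()) / temps.count())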

PageRank:
Input file pagerank.txt
page1 page3
page2 page1
page4 page1
page3 page1
page4 page2
page3 page4

>>> def computeContribs(neighbors, rank):
...     for neighbor in neighbors: yield(neighbor, rank/len(neighbors))
...
>>> linkfile="file:/home/cloudera/Desktop/pagerank.txt"
>>> links = sc.textFile(linkfile).map(lambda line: line.split())\
...     .map(lambda pages: (pages[0],pages[1]))\
...     .distinct()\
...     .groupByKey()\
...     .persist()

19/10/31 00:19:00 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
>>>
>>> links = sc.textFile(linkfile).map(lambda line: line.split())
(Note: this second assignment overwrites the grouped links; without distinct()/groupByKey() each "neighbors" value is just a string, which is why the ranks below come out keyed by single characters. Skip this line to get page-level ranks.)
>>> ranks=links.map(lambda (page,neighbors): (page,1.0))
>>> n = 10
>>> for x in xrange(n):
...     contribs=links\
...         .join(ranks)\
...         .flatMap(lambda (page,(neighbors,rank)): \
...             computeContribs(neighbors,rank))
...     ranks=contribs\
...         .reduceByKey(lambda v1,v2: v1+v2)\
...         .map(lambda (page,contrib): \
...             (page,contrib * 0.85 + 0.15))
...     print "Iteration ",x
...     for pair in ranks.take(10): print pair
...

Iteration 0
(u'a', 1.8499999999999999)
(u'1', 1.0)
(u'3', 0.32000000000000001)
(u'e', 1.8499999999999999)
(u'g', 1.8499999999999999)
(u'p', 1.8499999999999999)
(u'2', 0.48999999999999999)
(u'4', 0.48999999999999999)
Iteration 1
Iteration 2
Iteration 3
Iteration 4
Iteration 5
Iteration 6
Iteration 7
Iteration 8
Iteration 9

Clustering:

data=sc.textFile("file:/home/training/Desktop/Admission_Predict.csv")
import numpy
from numpy import array
from pyspark.mllib.clustering import KMeans
header=data.first()
header = sc.parallelize([header])
data = data.subtract(header)
parseData = data.map(lambda line: array([float(x) for x in line.split(',')])).cache()
clusters = KMeans.train(parseData,3,maxIterations=15,runs=10,initializationMode='random')
from math import sqrt
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parseData.map(lambda point:error(point)).reduce(lambda x,y:x+y)
print('Within set sum of squared error = '+str(WSSSE))
clusters.centers

Matrix vector:
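The heading above has no code in the original notes; as a gap-filler, here is a minimal PySpark sketch of matrix-vector multiplication, assuming the matrix is held as (row, col, value) triples and the vector as (index, value) pairs (both made up for illustration):
# Hypothetical 2x2 matrix as (row, col, value) and vector as (index, value)
M = sc.parallelize([(0, 0, 1.0), (0, 1, 2.0), (1, 0, 3.0), (1, 1, 4.0)])
v = sc.parallelize([(0, 5.0), (1, 6.0)])
# key each matrix entry by its column, join with the vector, multiply, then sum per row
result = M.map(lambda t: (t[1], (t[0], t[2]))) \
          .join(v) \
          .map(lambda kv: (kv[1][0][0], kv[1][0][1] * kv[1][1])) \
          .reduceByKey(lambda a, b: a + b)
print result.collect()   # expect [(0, 17.0), (1, 39.0)] (order may vary)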

JOIN
>>> rdd1=sc.parallelize([('hadoop',4),('map',5)])
>>> rdd2=sc.parallelize([('hadoop',9),('map',10)])
>>> rdd1.join(rdd2)
PythonRDD[298] at RDD at PythonRDD.scala:43
>>> joined=rdd1.join(rdd2)
>>> joined.collect()
[('map', (5, 10)), ('hadoop', (4, 9))]
>>>
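
Other join flavours follow the same pattern; a quick sketch with leftOuterJoin, where keys missing on the right show up as None (rdd1 is the pair RDD from above, rdd3 is made up here, and output order may vary):
>>> rdd3=sc.parallelize([('hadoop',9)])
>>> rdd1.leftOuterJoin(rdd3).collect()
[('map', (5, None)), ('hadoop', (4, 9))]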