# Paste metadata: 7 years ago · Oct 14, 2018, 05:36 AM
1from __future__ import print_function
2from pyspark.sql.functions import col, udf,when, split, lit, regexp_extract, regexp_replace, coalesce, date_format
3from pyspark import SparkContext, SparkConf, SparkFiles
4from pyspark.sql import SparkSession
5from pyspark.sql import HiveContext, Row
6from pyspark.sql.functions import explode
7from pyspark.sql.types import *
8import subprocess
9import pandas as pd
10import datetime
11import sys
12import re
13import ast
14#A.Y Add --Start--
15import os
16#A.Y Add --End--
17
## INIT VARS

# One Spark application for the whole script, with Hive support enabled on
# the session.
sparkconf = SparkConf().setAppName('ingestion')
sc = SparkContext(conf=sparkconf)
spark = SparkSession.builder.config(conf=sparkconf).enableHiveSupport().getOrCreate()

# Hive dynamic-partition settings applied to the session.
spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")

# Command line: <datasource> <tables> [date]
#   datasource : routing-table datasource to ingest
#   tables     : "all" or a comma-separated list of table names
#   date       : optional filter applied to the S3 listing
if len(sys.argv) < 3:
    # sys.exit with a message writes it to stderr and exits with status 1.
    sys.exit("usage: {0} <datasource> <tables|all> [date]".format(sys.argv[0]))

datasource = sys.argv[1]
tables = sys.argv[2]
date = sys.argv[3] if len(sys.argv) > 3 else None
37
38
def eprint(*args, **kwargs):
    """Print to standard error.

    Takes the same positional and keyword arguments as ``print``. The
    ``file`` keyword is already supplied here, so passing it again raises
    ``TypeError``.
    """
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
41
def extract_date(x):
    """Parse the timestamp embedded in a file name or path.

    The string is split on ``_`` and ``.`` and the second-to-last token is
    taken as the stamp (for ``name_20181014.csv`` that is ``20181014``).

    :param x: file name or path containing the stamp
    :return: ``datetime.datetime`` parsed as ``%Y%m%d`` when the stamp has
        8 characters, otherwise as ``%Y%m%d%H%M``
    :raises IndexError: when the string has fewer than two tokens
    :raises ValueError: when the stamp does not match the expected format
    """
    # Raw string: '\.' in a normal literal is an invalid escape sequence.
    stamp = re.split(r'_|\.', x)[-2]
    fmt = '%Y%m%d' if len(stamp) == 8 else '%Y%m%d%H%M'
    return datetime.datetime.strptime(stamp, fmt)


def extract_date_str(x):
    """Return the third token (index 2) of *x* split on ``_`` and ``.``.

    :raises IndexError: when the string has fewer than three tokens
    """
    return re.split(r'_|\.', x)[2]
44
def run_cmd(args_list):
    """Run an external command and capture what it produced.

    The command line is logged to stderr before the process is started.

    :param args_list: list of strings: the program followed by its arguments
    :return: tuple ``(returncode, stdout, stderr)``; both streams are the
        raw values captured from the pipes
    """
    eprint('Running system command: {0}'.format(' '.join(args_list)))
    process = subprocess.Popen(
        args_list,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() drains both pipes and waits for the process to finish.
    captured_out, captured_err = process.communicate()
    return process.returncode, captured_out, captured_err
54
def check_cmd(output):
    """Turn a ``run_cmd`` result into decoded stdout, or ``None`` on failure.

    :param output: ``(returncode, stdout, stderr)`` as returned by
        ``run_cmd``, or a falsy value
    :return: stdout decoded as UTF-8 when the return code is 0; ``None``
        when *output* is falsy or the command failed (the captured stderr
        is logged in that case)
    """
    if not output:
        return None
    # Any non-zero code is a failure; a negative code means the process
    # was terminated by a signal.
    if output[0] != 0:
        eprint("An error occured while executing the command")
        eprint(output[2])
        return None
    return output[1].decode("utf-8")
63
def mk_dir_hdfs(folders):
    """Run ``hadoop fs -mkdir <folders>``.

    :param folders: path handed to ``-mkdir`` as a single argument
    :return: the ``(returncode, stdout, stderr)`` tuple from ``run_cmd``
    """
    return run_cmd(["hadoop", "fs", "-mkdir", folders])
68
def copy_to_hdfs(src, dst):
    """Run ``hdfs dfs -copyFromLocal <src> <dst>``.

    :param src: source path argument
    :param dst: destination path argument
    :return: the ``(returncode, stdout, stderr)`` tuple from ``run_cmd``
    """
    command = ["hdfs", "dfs", "-copyFromLocal"]
    command.extend([src, dst])
    return run_cmd(command)
72
def fix_permission_hdfs(folder, permission):
    """Run ``hdfs dfs -chmod -R <permission> <folder>``.

    :param folder: path whose permissions are changed recursively
    :param permission: mode string passed to ``-chmod`` (e.g. ``"777"``)
    :return: the ``(returncode, stdout, stderr)`` tuple from ``run_cmd``
    """
    command = ["hdfs", "dfs", "-chmod", "-R"]
    command.extend([permission, folder])
    return run_cmd(command)
76
def getPDataFrameFrmFile(path):
    """Read a CSV with a header row through Spark and return it as pandas.

    :param path: location of the CSV file, passed to ``spark.read.csv``
    :return: pandas DataFrame with an inferred schema, or ``None`` when the
        read fails with a ``Py4JJavaError`` (the error is logged to stderr)
    """
    # Imported locally: the file-level imports never bring this exception
    # class into scope, so the handler below would raise NameError.
    from py4j.protocol import Py4JJavaError

    try:
        return spark.read.csv(path, header='true', inferSchema='true').toPandas()
    except Py4JJavaError as e:
        # str(e): an exception cannot be concatenated to a str directly.
        eprint("Error trying to read conf File or Action File. Please check your args : " + str(e))
        return None
83
# date arg should be in format YYYY-MM-DD
def findFilesToProcess(location, date, processedFilesRDD):
    """List the objects under an S3 location that should be ingested.

    Runs ``aws s3 ls <location>`` and parses its output. Prefix rows
    (``PRE``), zero-size objects and malformed rows are skipped.

    :param location: bucket/prefix string without the ``s3://`` scheme; each
        returned path is ``"s3://" + location + <object name>``
    :param date: optional string; when set, only listing lines that contain
        it are kept
    :param processedFilesRDD: currently unused (filtering of already
        processed files is disabled)
    :return: list of ``s3://`` paths, or ``None`` when the listing failed,
        was empty, or no line matched *date*
    """
    args = ["aws", "s3", "ls", location]
    eprint(args)

    # A single code path for both cases: run_cmd waits for the process and
    # check_cmd reports a failing exit status, instead of piping into an
    # unwaited `grep` child.
    listing = check_cmd(run_cmd(args))
    if not listing:
        return None

    lines = listing.splitlines()
    if date:
        lines = [line for line in lines if date in line]
        if not lines:
            return None

    files = []
    for line in lines:
        # Object rows look like "<date> <time> <size> <name>"; maxsplit=3
        # keeps names that contain spaces intact.
        fields = line.split(None, 3)
        if not fields or fields[0] == 'PRE':
            continue
        if len(fields) < 4:
            continue
        size, name = fields[2], fields[3]
        if size == '0':
            continue
        files.append("s3://" + location + name)

    return files
113
def convertCol(col, dataType):
    """Cast a column to *dataType*, mapping empty strings to null.

    :param col: column expression to cast
    :param dataType: target data type instance
    :return: the cast column; for ``StringType`` targets, values equal to
        ``''`` are replaced by ``None``
    """
    casted = col.cast(dataType)
    if not isinstance(dataType, StringType):
        return casted
    return when(casted != '', casted).otherwise(None)
119
120
# srcfolder can use hdfs:///
def renameHDFSFile(srcfolder, filename):
    """Rename the files found under an HDFS folder to ``<filename>_<n>``.

    The folder is listed recursively with ``hdfs dfs -ls -R``. Directories,
    entries whose path contains ``SUCCESS`` or ``hive-staging_hive``, and
    files whose name already contains ``full_load`` are left alone. Every
    other file is moved, inside its own directory, to ``<filename>_<n>``,
    where ``n`` is a counter shared by all files of the call; it is
    incremented until ``hdfs dfs -mv`` stops reporting "File exists".

    :param srcfolder: folder to scan
    :param filename: base name for the renamed files
    :return: always ``None``
    """
    listing = check_cmd(run_cmd(["hdfs", "dfs", "-ls", "-R", srcfolder]))
    if not listing:
        return None

    index = 0
    for entry in listing.splitlines():
        if not entry:
            # Blank line: nothing to parse, and entry[0] would fail.
            continue

        srcfile = entry.split(" ")[-1]
        currentfilename = srcfile.split("/")[-1]
        # Explicit end index: a [:-0] slice would yield an empty string.
        srcpath = srcfile[:len(srcfile) - len(currentfilename)]

        eprint("IN RENAME FOR : " + entry)
        if "SUCCESS" in srcfile or "hive-staging_hive" in srcfile:
            continue
        if "full_load" in currentfilename:
            continue
        if entry[0] == "d":
            # Permission string starting with "d" marks a directory.
            continue

        while True:
            dstfile = filename + "_{}".format(str(index))
            index = index + 1
            returncode, _, s_err = run_cmd(["hdfs", "dfs", "-mv", srcfile, srcpath + dstfile])
            message = s_err.decode("utf-8")
            if "File exists" in message:
                # Name already taken: try the next index.
                continue
            if returncode != 0:
                # Any other failure is reported and the file is skipped.
                eprint("ERROR : could not rename " + srcfile + " : " + message)
            break
156
def createDatabase(name):
    """Create database *name* unless ``SHOW databases`` already lists it.

    :param name: database name, inserted verbatim into the SQL statement
    """
    existing = [row.databaseName for row in spark.sql("SHOW databases").collect()]
    if name in existing:
        return
    eprint("ACTION CREATING DB : "+ name)
    spark.sql("CREATE DATABASE " + name)
163
def dropDatabase(name):
    """Drop every table of database *name*, then the database itself.

    The database is made the current one first, so the session's current
    database changes as a side effect.

    :param name: database name, inserted verbatim into the SQL statements
    """
    catalog = spark.catalog
    catalog.setCurrentDatabase(name)
    eprint("INFO : DROPPING TABLES IN DB : "+ name)
    table_names = [entry.name for entry in catalog.listTables()]
    for table_name in table_names:
        spark.sql("DROP TABLE " + table_name)
    spark.sql("DROP DATABASE " + name)
    eprint("INFO FINISHED DROPPING DB : " + name)
171
def createTable(database, tablename, ddl):
    """Make sure a table exists, running its DDL when it does not.

    When *database* is non-empty and is not the current database, it is
    created if needed and made current. The table is then looked up in the
    current database's table list.

    :param database: database expected to hold the table
    :param tablename: table name compared against ``listTables()`` entries
    :param ddl: SQL statement executed when the table is not found
    :return: 1 when the table already exists or the DDL succeeded, 0 when
        the DDL raised
    """
    if database != spark.catalog.currentDatabase() and len(database) > 0:
        createDatabase(database)
        spark.catalog.setCurrentDatabase(database)

    for table in spark.catalog.listTables():
        if table.name == tablename:
            eprint("INFO Table : " + database + "." + tablename + " already exist")
            return 1
    eprint("ACTION CREATING TABLE : " + database + "." + tablename )

    try:
        res = spark.sql(ddl)
    except Exception as e:
        eprint("SQL ERROR: Failed to create table : " + database + "." + tablename)
        # Log the cause too; otherwise the reason for the failure is lost.
        eprint(str(e))
        return 0
    eprint(res)
    return 1
190
def processFile(database, filepath, destinationTable,tableSchema,mode, tableType, partition, partitionCols):
    """Load one input location into a destination table.

    Two layouts are handled, selected by *database*:

    * ``database == ""``: the input is read headerless with encoding
      ``x-MS932_0213`` and a NUL separator, each line is split on ``||``,
      and the pieces are cast to the schema's fields by position. When
      *partition* is ``"yes"`` a ``partition_date`` column is added.
    * otherwise: the input is read as CSV with a header using
      *tableSchema*, and an ``extractiondate`` column is set from the date
      in the file name.

    :param database: datasource name selecting the layout
    :param filepath: path handed to ``spark.read``
    :param destinationTable: table name passed to ``insertInto``
    :param tableSchema: schema object exposing ``fields`` and ``names``
    :param mode: write mode (e.g. ``"append"``)
    :param tableType: ``"delta"`` or ``"full"``; only used when
        *partition* is ``"yes"``
    :param partition: ``"yes"`` to compute ``partition_date``
    :param partitionCols: comma-separated column names; for ``"delta"`` the
        first non-null of the first two gives the partition month
    :return: always 0
    """
    # NOTE(review): comparing against "" makes this branch reachable only
    # with an empty datasource; the literal looks like it was blanked out.
    # Confirm which datasource uses the "||"-delimited layout.
    if database == "":
        # presumably NUL is used as separator so the reader keeps each line
        # whole in column _c0 -- TODO confirm
        raw = spark.read.csv(filepath, header='false', inferSchema='false', encoding = "x-MS932_0213", sep="\0")
        # Raw string: '\|' in a normal literal is an invalid escape.
        split_col = split(raw['_c0'], r'\|\|')
        df = raw
        for position, field in enumerate(tableSchema.fields):
            df = df.withColumn(field.name, convertCol(split_col.getItem(position), field.dataType))
        if partition == "yes" and tableType == "delta":
            cols = partitionCols.split(",")
            # cols[:2] keeps the first-two-columns rule but also works when
            # only one column is configured.
            candidates = [df[name] for name in cols[:2]]
            df = df.withColumn("partition_date", date_format(coalesce(*candidates), 'yyyy-MM'))
        if partition == "yes" and tableType == "full":
            fileDate = extract_date(filepath)
            df = df.withColumn("partition_date", lit(fileDate.strftime('%Y-%m')))
        df.select(tableSchema.names).repartition(1).write.mode(mode).insertInto(destinationTable)
    else:
        # The original `else database == "":` was a syntax error that kept
        # the whole file from being parsed.
        fileName = filepath.split('/')[-1]
        df = spark.read.schema(tableSchema).csv(filepath, header='true')
        fileDate = extract_date(fileName)
        df = df.withColumn("extractiondate", lit(fileDate))
        df.select(tableSchema.names).coalesce(1).write.mode(mode).insertInto(destinationTable)

    return 0
216
def ingestTable(database, table, conf, date, processedFilesRDD):
    """Ingest one routing-table entry and record the files it covered.

    Steps: list candidate files under ``conf["bucket"] + conf["rawkey"]``,
    make sure the destination table exists (every ``emr_`` in the database
    name, DDL and location is rewritten to ``emr_full_``), load the
    location with ``processFile``, rename the written files with
    ``renameHDFSFile``, then append one row per listed file to
    ``emr_full_<database>.t_processed_files``.

    :param database: datasource name
    :param table: table name from the routing table
    :param conf: routing-table row for the table
    :param date: optional listing filter forwarded to ``findFilesToProcess``
    :param processedFilesRDD: forwarded to ``findFilesToProcess``
    :return: 0 when no file was found, otherwise ``None``
    """
    if conf.empty or table == "":
        return None
    if not conf["createtableemr"]:
        eprint("WARNING : createtable emr is empty")
        return None

    location = conf["bucket"] + conf["rawkey"]
    filesToProcess = findFilesToProcess(location, date, processedFilesRDD)
    if not filesToProcess:
        eprint("WARNING : No file to Process")
        return 0

    emrdatabase = conf["databasecleansedemr"].replace("emr_", "emr_full_")
    destinationTable = emrdatabase + "." + conf["table"]
    eprint("CREATE TABLE " + destinationTable)
    fullLoadDdl = conf["createtableemr"].replace("emr_", "emr_full_")
    eprint("DDL : " + fullLoadDdl)
    if not createTable(emrdatabase, conf["table"], fullLoadDdl):
        return None
    schema = spark.table(destinationTable).schema

    start = datetime.datetime.now()
    # NOTE(review): the load reads "hdfs:///" + location as a whole rather
    # than the individual s3:// paths listed above -- confirm intended.
    processFile(
        database,
        "hdfs:///" + location,
        destinationTable,
        schema,
        "append",
        conf["extractionType"],
        conf["partition"],
        conf["partitionColumns"],
    )
    renameHDFSFile(conf["cleansedtablelocationemr"].replace("emr_", "emr_full_"), table + "_full_load")
    end = datetime.datetime.now()
    duration = (end - start).total_seconds()

    # One audit row per listed file; all rows share the same timings.
    processedFilesList = [
        (database, os.path.basename(path), os.path.dirname(path), destinationTable, start, end, duration)
        for path in filesToProcess
    ]

    auditTable = "emr_full_" + database + ".t_processed_files"
    auditRows = sc.parallelize(processedFilesList)
    auditDF = auditRows.toDF(schema=spark.table(auditTable).schema)
    auditDF.coalesce(1).write.mode("append").insertInto(auditTable)
263
264
if __name__=='__main__':
    # NOTE(review): pathRoutingTable is not defined anywhere in the visible
    # part of this file; unless it is provided elsewhere this line raises
    # NameError. Its value cannot be inferred from here.
    routingDf = getPDataFrameFrmFile(pathRoutingTable)
    if routingDf is None:
        # getPDataFrameFrmFile already logged the cause.
        sys.exit("ERROR : routing table could not be loaded")

    mk_dir_hdfs("/app")
    fix_permission_hdfs("/app", "777")

    emrdatabase = "emr_full_" + datasource
    createDatabase(emrdatabase)

    # The audit table's DDL comes from the routing table as well.
    trackingRows = routingDf[(routingDf["table"] == "t_processed_files") & (routingDf["datasource"] == datasource)]
    if trackingRows.empty:
        sys.exit("ERROR : no t_processed_files entry for datasource " + datasource)
    ddl = trackingRows.iloc[0]["createtableemr"]
    createTable(emrdatabase, "t_processed_files", ddl.replace("emr_", "emr_full_"))

    # Skipping of already processed files is disabled, so None is passed on.
    processedFilesRDD = None

    if tables == "all":
        tableNames = list(routingDf[routingDf["datasource"] == datasource]["table"].unique())
    else:
        tableNames = tables.split(",")

    for table in tableNames:
        if not table:
            continue
        # NOTE(review): the lookup filters on the table name only, not on
        # the datasource -- confirm table names are unique across sources.
        conf = routingDf[routingDf["table"] == table]
        if conf.empty:
            continue
        ingestTable(datasource, table, conf.iloc[0], date, processedFilesRDD)