import sys
import boto3
import json
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Overwrite only the partitions present in the incoming data, not the whole table path
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Secrets Manager: credentials for the MySQL source
secret_name = "mysql-cashier"
region_name = "eu-central-1"

# Create a Secrets Manager client
session = boto3.session.Session()
client = session.client(
    service_name='secretsmanager',
    region_name=region_name
)

get_secret_value_response = client.get_secret_value(
    SecretId=secret_name
)

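# A hedged hardening sketch (not part of the original job): get_secret_value
# raises botocore's ClientError on a missing secret or denied access, so the
# call above could be wrapped to fail with a clearer message. Commented out
# to keep the job's behavior unchanged.
# from botocore.exceptions import ClientError
# try:
#     get_secret_value_response = client.get_secret_value(SecretId=secret_name)
# except ClientError as e:
#     raise RuntimeError(f"Unable to read secret '{secret_name}': {e}")
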
# Pull user, password, etc. out of the secret JSON
secret_key = get_secret_value_response.get('SecretString')
secret_key = json.loads(secret_key)
db_username = secret_key.get('username')
db_password = secret_key.get('password')
jdbc_driver_name = secret_key.get('engine')  # engine name from the secret; the driver class is still set explicitly below
db_host = secret_key.get('host')

# Read the date-range config table from the Glue catalog
date_config_df = glueContext.create_dynamic_frame.from_catalog(database = "datalake_output", table_name = "date_config_tbl")
date_config_df = date_config_df.toDF()
date_config_df.createOrReplaceTempView("date_config_tbl")

# Take the date range that limits the extract
var_limited_date = spark.sql(
    '''select start_date s, end_date e from date_config_tbl where types = 'REPORTS' '''
).first()

# Push the date filter down to MySQL as a subquery aliased for the "dbtable" option
query = "(select * from fund.tbl_real_fund_txn trft where c_updated_time between '{}' and '{} 23:59:59') as dbtable".format(var_limited_date['s'], var_limited_date['e'])

df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://{}".format(db_host))
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", query)
    .option("user", db_username)
    .option("password", db_password)
    .load()
)

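# A hedged tuning sketch (not in the original job): the read above runs over a
# single JDBC connection. Spark's standard JDBC partitioning options can split
# it across executors; the column and bounds below are illustrative assumptions.
# df = (
#     spark.read.format("jdbc")
#     .option("url", "jdbc:mysql://{}".format(db_host))
#     .option("driver", "com.mysql.jdbc.Driver")
#     .option("dbtable", query)
#     .option("user", db_username)
#     .option("password", db_password)
#     .option("partitionColumn", "c_txn_id")   # assumed numeric key
#     .option("lowerBound", "0")               # assumed id range
#     .option("upperBound", "100000000")
#     .option("numPartitions", "8")
#     .load()
# )
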
# Mapping columns (source c_* names onto target names)
# DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "datelake_input", table_name = "input_fund_tbl_real_fund_txn", transformation_ctx = "DataSource0")
DataSource0 = DynamicFrame.fromDF(df, glueContext, "df")
Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [
    ("c_ecr_id", "long", "ecr_id", "long"),
    ("c_txn_id", "long", "txn_id", "long"),
    ("c_cancel_txn_id", "long", "cancel_txn_id", "long"),
    ("c_session_id", "long", "session_id", "long"),
    ("c_partner_id", "string", "partner_id", "string"),
    ("c_label_id", "string", "label_id", "string"),
    ("c_product_id", "string", "product_id", "string"),
    ("c_sub_product_id", "string", "sub_product_id", "string"),
    ("c_channel", "string", "channel", "string"),
    ("c_sub_channel", "string", "sub_channel", "string"),
    ("c_os_browser_type", "string", "os_browser_type", "string"),
    ("c_initiator_ref_id", "string", "initiator_ref_id", "string"),
    ("c_txn_type", "int", "txn_type", "int"),
    ("c_game_type", "int", "game_type", "int"),
    ("c_op_type", "string", "op_type", "string"),
    ("c_agent_name", "string", "agent_name", "string"),
    ("c_txn_status", "string", "txn_status", "string"),
    ("c_error_code", "string", "error_code", "string"),
    ("c_error_desc", "string", "error_desc", "string"),
    ("c_sub_funds_order", "string", "sub_funds_order", "string"),
    ("c_start_time", "timestamp", "start_time", "timestamp"),
    ("c_updated_by", "string", "updated_by", "string"),
    ("c_updated_time", "timestamp", "updated_time", "timestamp"),
    ("c_id", "long", "id", "long"),
    ("c_prev_balance", "long", "prev_balance", "long"),
    ("c_current_balance", "long", "current_balance", "long"),
    ("c_amount_in_ecr_ccy", "long", "amount_in_ecr_ccy", "long"),
    ("c_ecr_ccy", "string", "ecr_ccy", "string"),
    ("c_comments", "string", "comments", "string"),
    ("c_is_cancelled", "boolean", "is_cancelled", "boolean"),
    ("c_session_close_time", "timestamp", "session_close_time", "timestamp"),
    ("c_sub_wallet_txn_status", "string", "sub_wallet_txn_status", "string"),
    ("c_is_sub_wallet_txn", "boolean", "is_sub_wallet_txn", "boolean"),
    ("c_sub_wallet_error_code", "string", "sub_wallet_error_code", "string"),
    ("c_session_id_at_vendor", "string", "session_id_at_vendor", "string"),
    ("c_game_id", "string", "game_id", "string"),
    ("c_game_category", "int", "game_category", "int"),
    ("c_custom1", "string", "custom1", "string"),
    ("c_custom2", "string", "custom2", "string"),
    ("c_custom3", "string", "custom3", "string"),
    ("c_custom_json", "string", "custom_json", "string"),
    ("c_sub_vendor_id", "string", "sub_vendor_id", "string"),
    ("c_jackpot_contribution_in_txn_ccy", "double", "jackpot_contribution_in_txn_ccy", "double")
], transformation_ctx = "Transform0")

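# Note: transformation_ctx names the ApplyMapping step above for Glue job
# bookmarks; with a JDBC-sourced DataFrame like this one, bookmarks do not
# track the source, so the date_config_tbl range is what bounds each run.
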
# Add system/partition columns derived from updated_time
df = Transform0.toDF()
df = (
    df
    .withColumn('years', F.year('updated_time'))
    .withColumn('months', F.month('updated_time'))
    .withColumn('days', F.dayofmonth('updated_time'))
    .withColumn('dates', F.to_date('updated_time'))
    .withColumn('hours', F.hour('updated_time'))
    .withColumn('sys_updated_by', F.lit(args['JOB_NAME']))
    .withColumn('sys_updated_time', F.from_utc_timestamp(F.current_timestamp(), "Europe/Kiev"))
)
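
# Optional spot-check of the derived partition columns before the write
# (commented out; illustrative only):
# df.select("partner_id", "dates", "hours").show(5, truncate=False)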

# Previous approach (kept for reference): re-read date_config_tbl and filter
# the DataFrame after the full extract; the date filter is now pushed down
# into the JDBC query above.
# date_config_df = glueContext.create_dynamic_frame.from_catalog(database = "datalake_output", table_name = "date_config_tbl")
# date_config_df = date_config_df.toDF()
# date_config_df.createOrReplaceTempView("date_config_tbl")
#
# var_limited_date = \
# spark.sql('''select date_add(start_date, 4) s, end_date e from date_config_tbl where types ='REPORTS' ''').first()
#
# # Filter df
# df = df.where((F.col("dates").cast('string') >= F.lit(var_limited_date['s'])) \
#     & (F.col("dates").cast('string') <= F.lit(var_limited_date['e'])))

# Write df to S3 as partitioned parquet; with dynamic partition overwrite
# enabled above, only the partitions present in df are replaced
datasink2 = (
    df
    .repartition("dates", "hours")
    .write
    .mode("overwrite")
    .format("parquet")
    .partitionBy("partner_id", "years", "months", "days", "dates", "hours")
    .save('s3://datalake-analyst/fund/fund_txn_tbl')
)
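
# A hedged follow-up (assumption, not in the original job): writing straight
# to S3 does not register new partitions in the Glue/Athena catalog; if a
# table points at this path, new partitions must be added afterwards, e.g. by
# running "MSCK REPAIR TABLE <table>" in Athena or by a Glue crawler.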
job.commit()