# Pasted on Oct 08, 2019, 08:40 PM
# define function of saving pyspark dataframe
def save_pyspark_dataframe(df_data, folder, format='csv'):
    """Write a PySpark DataFrame below the HDFS base URI and return the path.

    Args:
        df_data: DataFrame to persist; only its ``write`` attribute is used.
        folder: Folder name appended to ``hdfs_base_uri`` with a '/' separator.
        format: Output format, either 'csv' (written with a header row) or
            'parquet'. Defaults to 'csv'.

    Returns:
        The full target path, ``hdfs_base_uri + '/' + folder``.

    Raises:
        ValueError: If ``format`` is neither 'csv' nor 'parquet'.

    Note:
        ``hdfs_base_uri`` is expected to be defined at module level before
        this function is called. No save mode is passed to the writer, so
        Spark's default save mode applies.
    """
    # Validate first: an unknown format used to skip the write silently while
    # still returning a path, which looked like a successful save.
    if format not in ('csv', 'parquet'):
        raise ValueError(
            "unsupported format %r; expected 'csv' or 'parquet'" % (format,)
        )

    saved_folder = hdfs_base_uri + '/' + folder
    if format == 'csv':
        df_data.write.csv(saved_folder, header=True)
    else:
        df_data.write.parquet(saved_folder)
    return saved_folder
# NOTE: this section relies on names defined before it runs: `datetime`
# (the class from the datetime module), `spark_session`,
# `save_pyspark_dataframe`, and the six `*_parq_folder` / `*_csv_folder`
# variables.

# user table fields: 'id', 'email', 'first_name', 'last_name',
# 'date_of_birth', 'created_date', 'modified_date'
# generate 100 users (ids 1..100)
user_schema = [
    'id',
    'email',
    'first_name',
    'last_name',
    'date_of_birth',
    'created_date',
    'modified_date',
]
user_data = [
    (
        i,
        'test_' + str(i) + '@gmail.com',
        'fname_' + str(i % 10 + 1),
        'lname_' + str(i % 10 + 1),
        # date_of_birth: day stays within 1..25, valid for every month
        datetime(1960 + i % 20, i % 12 + 1, i % 25 + 1),
        # created_date / modified_date: day stays within 1..30, valid for
        # both August and September
        datetime(2019, 8, i % 30 + 1),
        datetime(2019, 9, i % 30 + 1),
    )
    for i in range(1, 101)
]

# transaction table fields: 'id', 'amount', 'created_date'
# generate 1000 transactions (ids 1..1000, amounts cycling through 1..10)
transaction_schema = ['id', 'amount', 'created_date']
transaction_data = [
    (i, i % 10 + 1, datetime(2019, 9, i % 30 + 1))
    for i in range(1, 1001)
]

# user transaction relationship table fields: 'user_id', 'transaction_id'
# transaction ids 1..1000 are spread round-robin over user ids 1..100,
# so every user is linked to 10 transactions
user_transaction_schema = ['user_id', 'transaction_id']
user_transaction_data = [(i % 100 + 1, i + 1) for i in range(1000)]

# Create pyspark Dataframe from fake data
df_user = spark_session.createDataFrame(user_data, user_schema)
df_transaction = spark_session.createDataFrame(
    transaction_data, transaction_schema
)
df_user_transaction = spark_session.createDataFrame(
    user_transaction_data, user_transaction_schema
)

# save user, transaction and user transaction dataframes as parquet files
save_pyspark_dataframe(df_user, user_parq_folder, 'parquet')
save_pyspark_dataframe(df_transaction, transaction_parq_folder, 'parquet')
save_pyspark_dataframe(
    df_user_transaction, user_transaction_parq_folder, 'parquet'
)

# save user, transaction and user transaction dataframes as csv files
save_pyspark_dataframe(df_user, user_csv_folder, 'csv')
save_pyspark_dataframe(df_transaction, transaction_csv_folder, 'csv')
save_pyspark_dataframe(df_user_transaction, user_transaction_csv_folder, 'csv')