# Paste metadata: 6 years ago · Sep 14, 2019, 10:58 AM
1import configparser
2
3
4# CONFIG
# Parse dwh.cfg, resolved against the current working directory.
# ConfigParser.read() skips files it cannot open, so a missing dwh.cfg
# leaves `config` without any sections instead of raising here.
config = configparser.ConfigParser()
config.read(filenames='dwh.cfg')
7
8# DROP TABLES
9
# One idempotent DROP statement per table. The statements are generated from
# the table names, listed in the same order as the assignment targets.
(
    staging_events_table_drop,
    staging_songs_table_drop,
    songplay_table_drop,
    user_table_drop,
    song_table_drop,
    artist_table_drop,
    time_table_drop,
) = (
    "DROP TABLE IF EXISTS " + table_name
    for table_name in (
        "staging_events",
        "staging_songs",
        "songplays",
        "users",
        "songs",
        "artists",
        "time",
    )
)
17
18# CREATE TABLES
19
# DDL for the staging table that receives the raw event log via COPY.
# Every column is nullable so that incomplete log records still load.
#
# start_time is BIGINT: the loads into the final tables divide this epoch
# value by 1000 before turning it into a timestamp (i.e. it is presumably in
# milliseconds - TODO confirm), and such values do not fit a 32-bit INTEGER.
#
# length is DOUBLE PRECISION: an unqualified DECIMAL is DECIMAL(18,0) in
# Redshift, which would drop the fractional part of the value.
staging_events_table_create = """
CREATE TABLE staging_events (
    name varchar(50),
    auth varchar(50),
    firstName varchar(50),
    gender varchar(1),
    itemInSession integer,
    lastName varchar(50),
    length double precision,
    level varchar(30),
    location varchar(50),
    method varchar(20),
    page varchar(20),
    registration decimal,
    session_id integer,
    song varchar(50),
    status integer,
    start_time bigint,
    user_agent varchar(200),
    user_id integer
)
"""
42
# DDL for the staging table that receives the raw song metadata via COPY.
# artist_id and song_id are the only mandatory columns.
#
# duration is DOUBLE PRECISION: an unqualified NUMERIC is NUMERIC(18,0) in
# Redshift, which would drop the fractional part of the value.
staging_songs_table_create = """
CREATE TABLE staging_songs (
    num_songs integer,
    artist_id varchar(50) NOT NULL,
    latitude varchar(50),
    longitude varchar(50),
    location varchar(50),
    name varchar(50),
    song_id varchar(50) NOT NULL,
    title varchar(100),
    duration double precision,
    year integer
)
"""
57
# DDL for the songplays table. songplay_id is an auto-generated
# IDENTITY column seeded at 0 with step 1; all other columns are nullable.
songplay_table_create = (
    "\n"
    "CREATE TABLE songplays (\n"
    " songplay_id INTEGER IDENTITY(0,1),\n"
    " start_time timestamp,\n"
    " user_id integer,\n"
    " level varchar(30),\n"
    " song_id varchar(50),\n"
    " artist_id varchar(50),\n"
    " session_id integer,\n"
    " location varchar(50),\n"
    " user_agent varchar(200)\n"
    ")\n"
)
71
# DDL for the users table; every column is mandatory.
user_table_create = (
    "\n"
    "CREATE TABLE users (\n"
    " user_id integer NOT NULL,\n"
    " first_name varchar(30) NOT NULL,\n"
    " last_name varchar(30) NOT NULL,\n"
    " gender varchar(1) NOT NULL,\n"
    " level varchar(20) NOT NULL\n"
    ")\n"
)
81
# DDL for the songs table. song_id, title and artist_id are mandatory.
#
# duration is DOUBLE PRECISION: an unqualified NUMERIC is NUMERIC(18,0) in
# Redshift, which would drop the fractional part of the value.
song_table_create = """
CREATE TABLE songs (
    song_id varchar(50) NOT NULL,
    title varchar(100) NOT NULL,
    artist_id varchar(50) NOT NULL,
    year integer,
    duration double precision
)
"""
91
# DDL for the artists table. artist_id and name are mandatory; the
# location, latitude and longitude columns are optional text.
artist_table_create = (
    "\n"
    "CREATE TABLE artists (\n"
    " artist_id varchar(50) NOT NULL,\n"
    " name varchar(50) NOT NULL,\n"
    " location varchar(50),\n"
    " latitude varchar(50),\n"
    " longitude varchar(50)\n"
    ")\n"
)
101
# DDL for the time table: a timestamp plus its integer calendar parts,
# all mandatory.
time_table_create = (
    "\n"
    "CREATE TABLE time (\n"
    " start_time timestamp NOT NULL,\n"
    " hour integer NOT NULL,\n"
    " day integer NOT NULL,\n"
    " week integer NOT NULL,\n"
    " month integer NOT NULL,\n"
    " year integer NOT NULL,\n"
    " weekday integer NOT NULL\n"
    ")\n"
)
113
114# STAGING TABLES
115
# COPY command that bulk-loads staging_events. The two placeholders are filled
# from the [S3] section of dwh.cfg: LOG_DATA is the source location and
# LOG_JSONPATH is passed to the JSON format option.
#
# Redshift's COPY needs an authorization keyword in front of the role, so the
# bare 'aws_iam_role= ...' string (which also had a stray space after '=') is
# written as an IAM_ROLE clause with the same role ARN.
#
# NOTE(review): the config values are interpolated verbatim, so dwh.cfg
# presumably stores them already wrapped in single quotes - confirm.
staging_events_copy = """
    copy staging_events from {}
    iam_role 'arn:aws:iam::008088972242:role/myRedshiftRole'
    JSON {};
""".format(config['S3']['LOG_DATA'], config['S3']['LOG_JSONPATH'])
121
122
# NOTE: an earlier draft of the two COPY commands was kept here inside a bare
# string literal. It read the role from config['IAM_ROLE']['ARN'] and added
# "region 'us-west-2' compupdate off". It has been removed as dead code.
# TODO(review): if dwh.cfg defines [IAM_ROLE] ARN, prefer it over the
# hard-coded role ARN below.

# COPY command that bulk-loads staging_songs. SONG_DATA from the [S3] section
# of dwh.cfg is the source location.
#
# Redshift's COPY needs an authorization keyword in front of the role, so the
# bare 'aws_iam_role= ...' string (which also had a stray space after '=') is
# written as an IAM_ROLE clause with the same role ARN.
#
# NOTE(review): the JSON option reuses LOG_JSONPATH, the same value used for
# the event-log load. That looks unintended for song data - confirm whether a
# song-specific JSONPaths file or JSON 'auto' is wanted here.
# NOTE(review): the config values are interpolated verbatim, so dwh.cfg
# presumably stores them already wrapped in single quotes - confirm.
staging_songs_copy = """
    copy staging_songs from {}
    iam_role 'arn:aws:iam::008088972242:role/myRedshiftRole'
    JSON {};
""".format(config['S3']['SONG_DATA'], config['S3']['LOG_JSONPATH'])
144
145
146
147
148# FINAL TABLES
149
# Populates songplays by matching staged events to staged songs on
# (name, song title).
#
# Fixes relative to the previous statement:
#   * the SELECT list referenced an alias "song" that was never declared;
#     the joined table is aliased "songs";
#   * staging_events.start_time is an integer epoch value while
#     songplays.start_time is a TIMESTAMP, so it is converted explicitly
#     (divided by 1000 first, as the time-table load does - the value is
#     presumably in milliseconds, TODO confirm).
songplay_table_insert = """
INSERT INTO songplays (
    start_time,
    user_id,
    level,
    song_id,
    artist_id,
    session_id,
    location,
    user_agent
)
SELECT
    TIMESTAMP 'epoch' + events.start_time / 1000 * INTERVAL '1 second',
    events.user_id,
    events.level,
    songs.song_id,
    songs.artist_id,
    events.session_id,
    songs.location,
    events.user_agent
FROM staging_events AS events
JOIN staging_songs AS songs
    ON events.name = songs.name
    AND events.song = songs.title
"""
175
# Populates users from the staged event log.
#
# staging_events holds one row per event, so DISTINCT collapses the repeated
# identical user rows. Rows without a user_id are skipped because
# users.user_id is NOT NULL and a single NULL would abort the whole INSERT.
# NOTE(review): the other users columns are NOT NULL as well; this assumes
# they are populated whenever user_id is - confirm against the data.
user_table_insert = """
INSERT INTO users (
    user_id,
    first_name,
    last_name,
    gender,
    level
)
SELECT DISTINCT
    user_id,
    firstName,
    lastName,
    gender,
    level
FROM staging_events
WHERE user_id IS NOT NULL
"""
191
# Copies the five song columns from staging_songs into songs unchanged.
song_table_insert = (
    "\n"
    "INSERT INTO songs (\n"
    " song_id,\n"
    " title,\n"
    " artist_id,\n"
    " year,\n"
    " duration\n"
    " )\n"
    "SELECT\n"
    " song_id,\n"
    " title,\n"
    " artist_id,\n"
    " year,\n"
    " duration\n"
    "FROM staging_songs\n"
)
208
# Populates artists from the staged song metadata.
#
# The source table name was misspelled ("stating_songs"); it is staging_songs.
# staging_songs holds one row per song, so DISTINCT collapses the repeated
# identical artist rows.
artist_table_insert = """
INSERT INTO artists (
    artist_id,
    name,
    location,
    latitude,
    longitude
)
SELECT DISTINCT
    artist_id,
    name,
    location,
    latitude,
    longitude
FROM staging_songs
"""
225
# Populates time with one row per distinct event timestamp.
#
# Fixes relative to the previous statement:
#   * it had no FROM clause and read a column "ts" that staging_events does
#     not define; the epoch value lives in staging_events.start_time;
#   * HOUR()/DAY()/WEEK()/MONTH()/YEAR()/WEEKDAY() are not Redshift
#     functions, so the parts are taken with EXTRACT instead;
#   * the epoch-to-timestamp conversion is done once in a derived table
#     rather than repeated for every output column.
# The epoch value is divided by 1000 as before (presumably milliseconds -
# TODO confirm). NULL timestamps are skipped because time.start_time is
# NOT NULL.
time_table_insert = """
INSERT INTO time (
    start_time,
    hour,
    day,
    week,
    month,
    year,
    weekday
)
SELECT DISTINCT
    t.start_time,
    EXTRACT(hour FROM t.start_time) AS hour,
    EXTRACT(day FROM t.start_time) AS day,
    EXTRACT(week FROM t.start_time) AS week,
    EXTRACT(month FROM t.start_time) AS month,
    EXTRACT(year FROM t.start_time) AS year,
    EXTRACT(dow FROM t.start_time) AS weekday
FROM (
    SELECT TIMESTAMP 'epoch' + e.start_time / 1000 * INTERVAL '1 second' AS start_time
    FROM staging_events AS e
    WHERE e.start_time IS NOT NULL
) AS t
"""
245
246# QUERY LISTS
247
# Ordered groups of the statements defined above.
create_table_queries = [
    staging_events_table_create,
    staging_songs_table_create,
    songplay_table_create,
    user_table_create,
    song_table_create,
    artist_table_create,
    time_table_create,
]
drop_table_queries = [
    staging_events_table_drop,
    staging_songs_table_drop,
    songplay_table_drop,
    user_table_drop,
    song_table_drop,
    artist_table_drop,
    time_table_drop,
]
copy_table_queries = [
    staging_events_copy,
    staging_songs_copy,
]
insert_table_queries = [
    songplay_table_insert,
    user_table_insert,
    song_table_insert,
    artist_table_insert,
    time_table_insert,
]