· 6 years ago · May 14, 2019, 11:46 AM
1import wtiproj06_ETL_and_data_processing as etl
2from cassandra.cluster import Cluster
3from cassandra.query import dict_factory
4
# Cassandra keyspace that holds both tables used by CassandraClient.
KEYSPACE = "ratings_keyspace"
# Per-user, per-movie ratings together with integer genre columns.
USER_TABLE = "user_rated_movies"
# Per-user genre ratings table (float genre columns, keyed by user_id).
AVG_GENRE_RATINGS_FOR_USER_TABLE = "avg_genre_ratings_for_user"
8
9
class CassandraClient:
    """Helper around a Cassandra session bound to ``KEYSPACE``.

    On construction it connects to a single node, creates ``KEYSPACE`` if it
    does not exist, selects it, and switches the session to ``dict_factory``
    so that every result row is a ``dict`` keyed by column name.
    """

    # Genre columns in the order the positional row lists use. They are
    # presumably the ETL genre names with '-' replaced by '_' (the same
    # transformation query_helper applies) -- verify against the ETL module.
    # CQL folds unquoted identifiers to lower case, so the mixed case here
    # only affects readability, not which column is written.
    _GENRE_COLUMNS = (
        'genre_Action', 'genre_Adventure', 'genre_Animation',
        'genre_Children', 'genre_Comedy', 'genre_Crime', 'genre_Documentary',
        'genre_Drama', 'genre_Fantasy', 'genre_Film_Noir', 'genre_Horror',
        'genre_IMAX', 'genre_Musical', 'genre_Mystery', 'genre_Romance',
        'genre_Sci_Fi', 'genre_Short', 'genre_Thriller', 'genre_War',
        'genre_Western',
    )

    def __init__(self, host='127.0.0.1', port=9042):
        """Connect to Cassandra and prepare the session.

        Args:
            host: contact point address (default: local node).
            port: native-protocol port (default: 9042).
        """
        # Keep the cluster handle so the connection can be shut down later.
        self.cluster = Cluster([host], port=port)
        self.session = self.cluster.connect()
        self.create_keyspace()
        self.session.set_keyspace(KEYSPACE)
        self.session.row_factory = dict_factory

    def close(self):
        """Shut down the cluster connection opened in ``__init__``."""
        self.cluster.shutdown()

    def create_keyspace(self):
        """Create ``KEYSPACE`` (SimpleStrategy, replication factor 1) if missing."""
        self.session.execute(
            "CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE +
            " WITH replication = "
            "{ 'class': 'SimpleStrategy', 'replication_factor': '1' }"
        )

    def create_table_query(self, keyspace, table, primaryKey, additionalColumns):
        """Build a ``CREATE TABLE IF NOT EXISTS`` statement.

        Args:
            keyspace: keyspace name; joined to *table* with '.'.
            table: table name.
            primaryKey: text placed inside ``PRIMARY KEY (...)``, e.g.
                ``'user_id, movie_id'``.
            additionalColumns: mapping of column name -> CQL type, emitted
                in iteration order.

        Returns:
            The CQL statement as a string.
        """
        definitions = [str(name) + ' ' + str(vtype)
                       for name, vtype in additionalColumns.items()]
        definitions.append('PRIMARY KEY (' + primaryKey + ')')
        return ('CREATE TABLE IF NOT EXISTS ' + keyspace + '.' + table +
                ' (' + ', '.join(definitions) + ')')

    @staticmethod
    def _genre_names():
        """Return a fresh copy of the genre names reported by the ETL module."""
        _, genre_names = etl.get_UserRatedMoviesWithMovieGenres(1)
        # Copy so callers may prepend columns without mutating ETL's list.
        return list(genre_names)

    @staticmethod
    def _insert_query(keyspace, table, columns):
        """Build an INSERT using one named ``%(column)s`` placeholder per column."""
        placeholders = ', '.join('%(' + column + ')s' for column in columns)
        return ('INSERT INTO ' + keyspace + '.' + table +
                ' (' + ', '.join(columns) + ') VALUES (' + placeholders + ')')

    def query_helper(self, tableName):
        """Return the column -> CQL type mapping for *tableName*.

        For ``USER_TABLE`` the columns are user_id, movie_id (int), rating
        (float) and one int column per genre; for any other table they are
        user_id (int) and one float column per genre. Genre names come from
        the ETL module with '-' replaced by '_'.
        """
        genre_columns = [genre.replace('-', '_') for genre in self._genre_names()]
        if USER_TABLE == tableName:
            names = ['user_id', 'movie_id', 'rating'] + genre_columns
            types = ['int', 'int', 'float'] + ['int'] * len(genre_columns)
        else:
            names = ['user_id'] + genre_columns
            types = ['int'] + ['float'] * len(genre_columns)
        return dict(zip(names, types))

    def create_avg_genre_ratings_for_user_table(self):
        """Create ``AVG_GENRE_RATINGS_FOR_USER_TABLE`` keyed by user_id."""
        query = self.create_table_query(
            KEYSPACE, AVG_GENRE_RATINGS_FOR_USER_TABLE, 'user_id',
            self.query_helper(AVG_GENRE_RATINGS_FOR_USER_TABLE))
        self.session.execute(query)

    def create_user_rated_movies_table(self):
        """Create ``USER_TABLE`` keyed by (user_id, movie_id)."""
        query = self.create_table_query(
            KEYSPACE, USER_TABLE, 'user_id, movie_id',
            self.query_helper(USER_TABLE))
        self.session.execute(query)

    def clear_table(self, tableName):
        """Remove every row from *tableName* (CQL ``TRUNCATE``)."""
        # Identifiers cannot be bound as parameters; tableName is trusted here.
        self.session.execute('TRUNCATE ' + KEYSPACE + '.' + tableName + ';')

    def get_row(self, userID, tableName):
        """Print, then return, the rows of *tableName* whose user_id is *userID*."""
        rows = list(self.session.execute(
            'SELECT * FROM ' + KEYSPACE + '.' + tableName + ' WHERE user_id = %s;',
            (int(userID),)))
        self.printout_table(rows)
        return rows

    def get_first_n_elements(self, n, tableName):
        """Return up to *n* rows of *tableName*, converted by ``convert_rows``."""
        rows = self.session.execute(
            'SELECT * FROM ' + KEYSPACE + '.' + tableName + ' LIMIT %s;',
            (int(n),))
        return self.convert_rows(rows, tableName)

    def get_all(self, tableName):
        """Return every row of *tableName*, converted by ``convert_rows``."""
        rows = self.session.execute('SELECT * FROM ' + KEYSPACE + '.' + tableName + ';')
        return self.convert_rows(rows, tableName)

    def convert_rows(self, rows, tableName):
        """Convert result rows into dicts keyed by the application's names.

        Keys are ``userID``, ``movieID``, ``rating`` (``USER_TABLE`` only)
        followed by the ETL genre names. Values are taken positionally from
        each row dict.

        NOTE(review): this relies on the column order Cassandra returns; for
        ``USER_TABLE`` it assumes ``rating`` comes back as the last column
        (presumably because non-key columns are ordered by name) -- confirm.
        """
        is_user_table = USER_TABLE == tableName
        if is_user_table:
            keys = ['userID', 'movieID', 'rating'] + self._genre_names()
        else:
            keys = ['userID'] + self._genre_names()

        output = []
        for row in rows:
            values = list(row.values())
            if is_user_table:
                # Move the trailing rating value into third position.
                values.insert(2, values.pop())
            output.append(dict(zip(keys, values)))
        return output

    def push_row_to_user_rated_movies(self, data):
        """Insert one row into ``USER_TABLE``.

        Args:
            data: sequence ordered as user id, movie id, rating, then one value
                per entry of ``_GENRE_COLUMNS``. Extra trailing items are
                ignored.

        Raises:
            ValueError: if *data* has fewer items than there are columns.
        """
        columns = ('user_id', 'movie_id', 'rating') + self._GENRE_COLUMNS
        values = list(data)
        if len(values) < len(columns):
            raise ValueError('expected at least %d values, got %d'
                             % (len(columns), len(values)))
        self.session.execute(self._insert_query(KEYSPACE, USER_TABLE, columns),
                             dict(zip(columns, values)))

    def push_row_to_avg_genre_ratings_for_user_table(self, data):
        """Insert one row into ``AVG_GENRE_RATINGS_FOR_USER_TABLE``.

        Args:
            data: mapping with a ``userID`` key plus one key per genre. Genre
                keys may use '-' or '_' and any letter case (e.g. the output
                of ``parse_avg_genre_ratings_for_user``); they are matched to
                ``_GENRE_COLUMNS`` after normalising both.

        Raises:
            KeyError: if ``userID`` or any genre is missing from *data*.
        """
        normalized = {str(key).replace('-', '_').lower(): value
                      for key, value in data.items()}
        params = {'user_id': normalized['userid']}
        for column in self._GENRE_COLUMNS:
            params[column] = normalized[column.lower()]
        columns = ('user_id',) + self._GENRE_COLUMNS
        self.session.execute(
            self._insert_query(KEYSPACE, AVG_GENRE_RATINGS_FOR_USER_TABLE, columns),
            params)

    def push_dict_to_table(self, userDict, avg_table=False):
        """Insert *userDict* into the averages table or, by default, ``USER_TABLE``.

        For ``USER_TABLE`` the dict's values are used positionally, so its
        insertion order must match ``push_row_to_user_rated_movies``.
        """
        if avg_table:
            self.push_row_to_avg_genre_ratings_for_user_table(userDict)
        else:
            self.push_row_to_user_rated_movies(list(userDict.values()))

    def load_test_json(self, userID):
        """Insert a test row into ``USER_TABLE``.

        TODO: TEST_DICT is an empty placeholder (to be copied from previously
        used code); while it is empty the insert raises. ``userID`` is unused.
        """
        TEST_DICT = {}
        self.push_dict_to_table(TEST_DICT)

    def load_test_json2(self, userID):
        """Insert a test row into the genre-averages table.

        TODO: TEST_DICT is an empty placeholder (to be copied); while it is
        empty the insert raises ``KeyError``. ``userID`` is unused.
        """
        TEST_DICT = {}
        self.push_dict_to_table(TEST_DICT, avg_table=True)

    def parse_avg_genre_ratings_for_user(self, avg_list, id):
        """Pair per-genre values with the ETL genre names for one user.

        Args:
            avg_list: values indexed in the same order as the ETL genre names.
            id: user id; converted with ``int``.

        Returns:
            ``{'userID': int(id), <genre name>: avg_list[i], ...}``.
        """
        outputDict = {'userID': int(id)}
        for index, genre in enumerate(self._genre_names()):
            outputDict[genre] = avg_list[index]
        return outputDict

    def printout_table(self, rows):
        """Print each row on its own line."""
        for user_row in rows:
            print(user_row)

    def delete_table(self, tableName=None):
        """Drop *tableName* from ``KEYSPACE`` (defaults to ``USER_TABLE``)."""
        if tableName is None:
            tableName = USER_TABLE
        self.session.execute('DROP TABLE ' + KEYSPACE + '.' + tableName + ';')
191
if __name__ == "__main__":
    # Manual smoke test against a local Cassandra node.
    # NOTE: load_test_json/load_test_json2 use empty placeholder fixtures and
    # will fail until those dicts are filled in.
    c = CassandraClient()
    c.create_user_rated_movies_table()
    c.create_avg_genre_ratings_for_user_table()
    c.load_test_json2(79)
    for i in range(10):
        c.load_test_json(i)
    c.printout_table(c.get_all(USER_TABLE))
    c.get_row(75, USER_TABLE)
    c.printout_table(c.get_first_n_elements(7, USER_TABLE))
    c.clear_table(USER_TABLE)
    c.printout_table(c.get_all(USER_TABLE))