# Paste header: 6 years ago · May 13, 2019, 04:30 PM
import json
import logging

import numpy as np
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.query import dict_factory
6
7
class Movie:
    """A movie identified by an integer id together with its genre flags.

    Attributes:
        movieID: identifier of the movie.
        genres: dict mapping a genre column name (e.g. ``'genreAction'``)
            to a flag value; the dict is stored as given, not copied.
    """

    def __init__(self, movieID, genres):
        self.movieID = movieID
        self.genres = genres

    def __repr__(self):
        # Debug-friendly representation; does not affect equality or hashing.
        return "%s(movieID=%r, genres=%r)" % (
            type(self).__name__, self.movieID, self.genres)
12
13
class CassandraTEST(object):
    """Cassandra-backed store of per-user movie ratings and genre profiles.

    The class keeps a small in-memory movie catalogue (``movies``), writes
    ratings to the ``user_rating`` table and derives a per-genre profile for a
    user (the user's mean rating per genre minus the mean over all users),
    which can be stored in the ``user_profile`` table.

    NOTE: ``cluster`` and ``session`` are class attributes, so the connection
    to ``127.0.0.1:9042`` is opened when this class body is executed (i.e. at
    import time) and is shared by every instance.
    """

    keyspace = "user_ratings"
    cluster = Cluster(['127.0.0.1'], port=9042)
    session = cluster.connect()

    # In-memory catalogue; genre keys match the column names used in the
    # INSERT statement of push_user_rating.
    movies = [
        Movie(0, {'genreAction': 1,
                  'genreAdventure': 1,
                  'genreHorror': 0,
                  'genreWar': 0}),
        Movie(1, {'genreAction': 1,
                  'genreAdventure': 0,
                  'genreHorror': 1,
                  'genreWar': 0}),
        Movie(2, {'genreAction': 1,
                  'genreAdventure': 0,
                  'genreHorror': 0,
                  'genreWar': 1}),
    ]
    # Lower-case on purpose: Cassandra folds unquoted column names to lower
    # case, so these are the keys found in the rows read back from the tables.
    genres = ['genreaction', 'genreadventure', 'genrehorror', 'genrewar']

    def __init__(self):
        """Create the keyspace and both tables if needed and select the keyspace.

        Also switches the shared session to ``dict_factory`` so that every
        query returns rows as dicts.
        """
        self.create_keyspace(self.session, self.keyspace)
        self.session.set_keyspace(self.keyspace)
        self.session.row_factory = dict_factory

        self.session.execute("""
            CREATE TABLE IF NOT EXISTS """ + self.keyspace + """.""" + 'user_rating' + """(
                userid int,
                movieID int,
                rating float,
                genreAction int,
                genreAdventure int,
                genreHorror int,
                genreWar int,
                PRIMARY KEY(userid, movieID)
            )
            """)

        self.session.execute("""
            CREATE TABLE IF NOT EXISTS """ + self.keyspace + """.""" + 'user_profile' + """(
                userid int,
                genreAction float,
                genreAdventure float,
                genreHorror float,
                genreWar float,
                PRIMARY KEY(userid)
            )
            """)

    # ------------------------------------------------------------------
    # Generic keyspace / table helpers.
    # Keyspace and table names are concatenated into the statement text
    # because CQL cannot bind identifiers as parameters; only pass trusted
    # names to these methods.
    # ------------------------------------------------------------------

    def create_keyspace(self, session, keyspace):
        """Create ``keyspace`` (SimpleStrategy, replication factor 1) if missing."""
        session.execute("""
            CREATE KEYSPACE IF NOT EXISTS """ + keyspace + """
            WITH replication = {'class':'SimpleStrategy', 'replication_factor':'1'}
            """)

    def create_table(self, session, keyspace, table):
        """Create a ``(user_id int, avg_movie_rating float)`` table if missing."""
        session.execute("""
            CREATE TABLE IF NOT EXISTS """ + keyspace + """.""" + table + """(
                user_id int,
                avg_movie_rating float,
                PRIMARY KEY(user_id)
            )
            """)

    def push_data_table(self, session, keyspace, table, userid, avgMovieRating):
        """Insert one ``(user_id, avg_movie_rating)`` row into ``keyspace.table``."""
        session.execute("""
            INSERT INTO """ + keyspace + """.""" + table + """ (user_id, avg_movie_rating)
            VALUES (%(user_id)s, %(avg_movie_rating)s)
            """,
            {
                'user_id': userid,
                'avg_movie_rating': avgMovieRating,
            })

    def get_data_table(self, session, keyspace, table):
        """Print every row of ``keyspace.table`` to stdout."""
        for row in session.execute("SELECT * FROM " + keyspace + "." + table + ";"):
            print(row)

    def clear_table(self, session, keyspace, table):
        """Remove all rows from ``keyspace.table`` (TRUNCATE)."""
        session.execute("TRUNCATE " + keyspace + "." + table + ";")

    def delete_table(self, session, keyspace, table):
        """Drop ``keyspace.table``."""
        session.execute("DROP TABLE " + keyspace + "." + table + ";")

    # ------------------------------------------------------------------
    # Writing ratings and profiles.
    # ------------------------------------------------------------------

    def push_user_rating(self, session, userid, movieID, rating):
        """Store ``rating`` given by ``userid`` to the movie ``movieID``.

        The genre flags written with the rating are taken from the in-memory
        ``movies`` catalogue. If ``movieID`` is not in the catalogue nothing
        is written: a warning is logged and the method returns ``None``.

        Raises:
            ValueError / TypeError: if an argument cannot be converted to the
                numeric type of its column.
        """
        userid = int(userid)
        movieID = int(movieID)
        rating = float(rating)

        movie = next((m for m in self.movies if m.movieID == movieID), None)
        if movie is None:
            # Best-effort behaviour kept from the original: unknown movies are
            # skipped rather than raising, but the skip is now visible.
            logging.getLogger(__name__).warning(
                "Ignoring rating for unknown movie id %s", movieID)
            return

        params = {'userid': userid, 'movieID': movieID, 'rating': rating}
        for column in ('genreAction', 'genreAdventure', 'genreHorror', 'genreWar'):
            params[column] = movie.genres[column]

        session.execute("""
            INSERT INTO """ + self.keyspace + """.""" + 'user_rating' + """ (userid, movieID, rating, genreAction, genreAdventure, genreHorror, genreWar)
            VALUES (%(userid)s, %(movieID)s, %(rating)s, %(genreAction)s, %(genreAdventure)s, %(genreHorror)s, %(genreWar)s)
            """,
            params)

    def push_user_profile(self, session, userid):
        """Compute the genre profile of ``userid`` and upsert it into ``user_profile``."""
        userid = int(userid)
        profile = self.get_user_profile(session, userid)
        session.execute("""
            INSERT INTO """ + self.keyspace + """.""" + 'user_profile' + """ (userid, genreaction, genreadventure, genrehorror, genrewar)
            VALUES (%(userid)s, %(genreaction)s, %(genreadventure)s, %(genrehorror)s, %(genrewar)s)
            """,
            {
                'userid': userid,
                'genreaction': profile['genreaction'],
                'genreadventure': profile['genreadventure'],
                'genrehorror': profile['genrehorror'],
                'genrewar': profile['genrewar'],
            })

    # ------------------------------------------------------------------
    # Reading ratings.
    # ------------------------------------------------------------------

    def _user_rating_rows(self, session, userid=None):
        """Return the rows of ``user_rating`` as a list.

        With ``userid`` given, only that user's rows are selected; the id is
        converted to ``int`` and bound as a query parameter instead of being
        concatenated into the statement text.
        """
        query = "SELECT * FROM " + self.keyspace + ".user_rating"
        if userid is None:
            return list(session.execute(query + ";"))
        return list(session.execute(query + " WHERE userid = %s;", (int(userid),)))

    def get_user_rating(self, session, userid):
        """Return all ratings of ``userid`` as a JSON array string."""
        return json.dumps(self._user_rating_rows(session, userid))

    def get_all_users_rating(self, session):
        """Return the ratings of every user as a JSON array string."""
        return json.dumps(self._user_rating_rows(session))

    # ------------------------------------------------------------------
    # DataFrame helpers and profile arithmetic.
    # ------------------------------------------------------------------

    def dict_to_df(self, lista):
        """Build a DataFrame from ``lista`` (a list of row dicts)."""
        return pd.DataFrame.from_dict(lista)

    def df_to_dict(self, df):
        """Return the rows of ``df`` as a list of ``{column: value}`` dicts."""
        return [row.to_dict() for _, row in df.iterrows()]

    def get_mean_ratings(self, df):
        """Return ``{genre: mean rating}`` for every name in ``self.genres``.

        For each genre the mean (ignoring NaN) of the ``rating`` column is
        taken over the rows whose genre column equals 1 and rounded to two
        decimals. A genre with no such rows maps to ``0``. An empty ``df``
        (e.g. a user without ratings, which yields a frame with no columns)
        maps every genre to ``0`` instead of raising ``KeyError``.
        """
        if df.empty:
            return {genre: 0 for genre in self.genres}

        means = {}
        for genre in self.genres:
            rated = df.loc[df[genre] == 1]
            if len(rated) > 0:
                # float(): hand back a plain Python float, not a numpy scalar.
                means[genre] = float(round(np.nanmean(rated['rating']), 2))
            else:
                means[genre] = 0
        return means

    def calculate_user_profile(self, dict1, dict2):
        """Return per-key ``dict1[k] - dict2[k]``, or ``0.0`` where ``dict1[k]`` is 0.

        ``dict1`` holds one user's genre means and ``dict2`` the means over all
        users; a zero in ``dict1`` (no ratings for that genre) stays ``0.0``
        rather than becoming a negative offset.
        """
        profile = {}
        for key, value in dict1.items():
            profile[key] = value - dict2[key] if value != 0 else 0.0
        return profile

    def get_users_avg(self, session):
        """Return the per-genre mean rating over all users."""
        rows = self._user_rating_rows(session)
        return self.get_mean_ratings(self.dict_to_df(rows))

    def get_user_avg(self, session, userid):
        """Return the per-genre mean rating of ``userid``."""
        rows = self._user_rating_rows(session, userid)
        return self.get_mean_ratings(self.dict_to_df(rows))

    def get_user_profile(self, session, userid):
        """Return the genre profile of ``userid`` (user means minus global means)."""
        return self.calculate_user_profile(
            self.get_user_avg(session, userid),
            self.get_users_avg(session))

    def random_data_push(self, session):
        """Insert a few fixed sample ratings through ``session``."""
        # Uses the session that was passed in; the original ignored it and
        # always wrote through self.session.
        for userid, movie_id, rating in ((0, 0, 2.0), (0, 1, 4.2),
                                         (1, 1, 3.4), (2, 2, 1.5)):
            self.push_user_rating(session, userid, movie_id, rating)
214
# if __name__ == '__main__':
#
#     cass = CassandraTEST()
#     #cass.delete_table(cass.session, cass.keyspace, 'user_profile')
#     #cass.random_data_push(cass.session)
#     print(cass.get_all_users_rating(cass.session))
#     print(cass.get_user_profile(cass.session, '0'))
#     cass.push_user_profile(cass.session, '0')
222# cass.push_user_profile(cass.session, '0')