# Paste metadata: 6 years ago · Nov 10, 2019, 07:36 AM
# Work from the project directory: the relative paths used further down
# ('../supportFiles/...', 'data1_<id>.json') are resolved against it.
# NOTE(review): `os` is not imported anywhere in the visible source — confirm
# the import exists above this point in the real file.
os.chdir('/home/busportal/Downloads/wifiProject/wifiProject')

# Ids of every device whose sightings are downloaded and stored on each run.
allDevices = ["104", "105", "902", "903", "102", "103", "901", "113"]

# One entry per monitored device pair, consumed by wifiProject.csvUpdate:
#   [0] first device id ("left")
#   [1] second device id ("right")
#   [2] route id used when the left sighting is earlier than the right one
#   [3] route id used otherwise
#   [4] look-back window passed to resultQuery1 as etfthTime
#       (subtracted from an epoch-seconds timestamp)
#   [5] look-back window passed to resultQuery2 as etfthTime
# NOTE(review): route id "00570U" appears in two entries and "00D70D" breaks
# the naming pattern of its siblings — possibly typos; verify against the
# route catalogue.
devicePaired = [
    ["104", "105", "000M1D", "000M1U", 761, 647],
    ["902", "903", "0019BU", "0019BD", 587, 833],
    ["102", "103", "0005AD", "0005AU", 801, 803],
    ["901", "104", "00M70D", "00M70U", 942, 858],
    ["902", "113", "00519D", "00519U", 820, 856],
    ["901", "105", "00570U", "00570D", 960, 1025],
    ["103", "104", "34M70D", "34M70U", 820, 824],
    ["102", "901", "00570U", "00D70D", 819, 724],
]


class wifiProject:
    """Download Wi-Fi sightings per device, store them, and derive travel times.

    One run (see ``mainExecutor``) does three things:

    1. ``read_wms`` shells out to ``aws dynamodb query`` on table ``WMS_COEUT``
       for every device and the last 60 seconds, writing ``temp_<id>.json``.
    2. ``dataProcess`` creates a ``Device<id>`` SQL table per device and inserts
       the most recent row per MAC address from those JSON files.
    3. ``csvUpdate`` joins two device tables on MAC, computes a travel time per
       match and appends it to a per-route CSV file and ``tt_<route>`` table.

    The class relies on module-level names that are not defined in the visible
    source: ``cur`` / ``conn`` (presumably a DB-API cursor and connection; the
    ``auto_increment`` DDL suggests MySQL — confirm), ``S`` (presumably quotes
    a value for SQL — confirm), ``Map`` (applied to every cell of the DynamoDB
    items), the debug switches ``LOCALS`` / ``dixt``, and the globals
    ``allDevices`` / ``devicePaired``.

    NOTE(security): all SQL here is assembled by string concatenation, and the
    MAC values originate from downloaded data. The driver's parameter style is
    not visible, so the statements are left string-built; switch to
    parameterised queries once the driver is confirmed.
    """

    # ------------------------------------------------------------------
    # Download
    # ------------------------------------------------------------------
    def createDbIfNotAvail(self, devicePair):
        """Create the ``Device<id>`` table for every id in *devicePair* if missing."""
        for device in devicePair:
            cur.execute(wifiProject.createTableQuery(device)[0])
        conn.commit()
        if LOCALS:
            dixt['CDN'] = locals()  # debug hook: snapshot of this frame's locals

    def poolRead_wms(self, devicePairEach):
        """Query DynamoDB for one device id and dump the result to a JSON file.

        Writes the expression-attribute values to ``data1_<id>.json`` (current
        directory) and redirects the CLI output to
        ``<self.filePath>/temp_<id>.json``. Reads ``self.time_now``,
        ``self.time_past`` and ``self.filePath``, which ``read_wms`` sets.
        """
        q_cond = wifiProject.qcond(devicePairEach, self.time_now, self.time_past)
        with open('data1_' + devicePairEach + '.json', 'w') as outfile:
            json.dump(q_cond, outfile)

        # The command needs a shell: it uses '~' expansion and '>' redirection.
        query_aws = (
            '~/Music/pypy3.5-v7.0.0-linux64/bin/aws dynamodb query --table-name WMS_COEUT '
            '--key-condition-expression "device_id = :did and mac_timestamp BETWEEN :st AND :et" '
            '--expression-attribute-values file://data1_' + devicePairEach + '.json > '
            + self.filePath + '/temp_' + devicePairEach + '.json'
        )
        print(query_aws)
        os.system(query_aws)

    def read_wms(self, allDevices, timeIndex):
        """Download the last 60 seconds of sightings for every device.

        Args:
            allDevices: iterable of device id strings.
            timeIndex: end of the window as an epoch timestamp in seconds.
        """
        # The instance attribute deliberately shadows the filePath() helper;
        # the helper stays reachable through the class.
        self.filePath = wifiProject.filePath()
        self.time_now = time_now = timeIndex
        self.time_past = time_past = timeIndex - 60
        print('FROM :: ' + str(time_past) + ' NOW :: ' + str(time_now))
        p = Pool(processes=3)
        try:
            p.map(self.poolRead_wms, allDevices)
        finally:
            # close() alone leaves the workers un-reaped; join() waits for them.
            p.close()
            p.join()
        if LOCALS:
            dixt['rms'] = locals()

    def mainExecutor(self, liveTime):
        """Run one full cycle ending at *liveTime* (epoch seconds)."""
        self.read_wms(allDevices, liveTime)
        self.dataProcess(allDevices)
        for pair in devicePaired:
            self.csvUpdate(pair)

    # ------------------------------------------------------------------
    # JSON -> Device<id> tables
    # ------------------------------------------------------------------
    def dataProcess(self, allDevices):
        """Ensure the device tables exist and load each device's JSON dump."""
        self.createDbIfNotAvail(allDevices)
        for device in allDevices:
            self.excelCreate([self.filePath + '/' + 'temp_' + device + '.json', device])

    def excelCreate(self, file1path):
        """Load one dump and store it.

        Args:
            file1path: two-item sequence ``[json_path, device_id]``.
        """
        df1 = self.dataFrameGen(file1path[0])
        Groups1 = df1.groupby('mac')
        if LOCALS:
            dixt['ec'] = locals()
        self.ConfigureAndExe(Groups1, file1path[1])

    def dataFrameGen(self, filePath):
        """Read a DynamoDB query dump into a DataFrame.

        An empty or non-JSON file (for example when the CLI call failed and the
        shell redirect left an empty file) and a payload without an ``Items``
        list both yield an empty frame. ``Map`` is applied to every cell.
        """
        with open(filePath, 'r') as file:
            try:
                data = json.loads(file.read())
            except ValueError:  # json.JSONDecodeError is a ValueError
                data = {'Items': []}
        items = data.get('Items', []) if isinstance(data, dict) else []
        df = pd.DataFrame(items, columns=wifiProject.dataFrameColumns())
        # NOTE(review): applymap is deprecated in pandas >= 2.1 in favour of
        # DataFrame.map; kept because the installed pandas version is unknown.
        df = df.applymap(Map)
        return df

    def filterGroupsSub(self, subDF):
        """Return a one-row frame holding the row with the largest ``logtime``."""
        return subDF.sort_values('logtime', ascending=False).iloc[0:1]

    def ConfigureAndExe(self, Groups1, d):
        """Keep the newest row per MAC group and insert the result for device *d*.

        Args:
            Groups1: a ``DataFrame.groupby('mac')`` object.
            d: device id whose ``Device<d>`` table receives the rows.
        """
        group1DF = pd.DataFrame(columns=wifiProject.dataFrameColumns())
        perMacFrames = [Groups1.get_group(key) for key in Groups1.groups.keys()]

        p = Pool(processes=3)
        try:
            self.df1List = p.map(self.filterGroupsSub, perMacFrames)
            print('first work completed')
        finally:
            p.close()
            p.join()
        print('first pool closed')

        # DataFrame.append was removed in pandas 2.0; inside the former bare
        # `except` that meant every row was silently discarded. pd.concat works
        # on old and new pandas alike.
        if self.df1List:
            group1DF = pd.concat(self.df1List)
        else:
            print('dataframe1 empty')

        self.updateDBLeft(group1DF, d)
        print('second pool closed')

    # ------------------------------------------------------------------
    # Travel times -> CSV + tt_<route> tables
    # ------------------------------------------------------------------
    def csvUpdate(self, devicePairedEach):
        """Compute travel times for one device pair and append them per route.

        Args:
            devicePairedEach: ``[left_id, right_id, route_a, route_b, window_1,
                window_2]``. ``route_a`` is assigned when the left sighting is
                earlier than the right one, ``route_b`` otherwise.

        Output columns are ``timestamp, datetime, route_id, mac, travel_time``.
        NOTE(review): ``timestamp`` is always the right-hand logtime while
        ``datetime`` is rendered from the later of the two — confirm this
        asymmetry is intended.
        """
        self.D = devicePairedEach
        leftDev, rightDev = self.D[0], self.D[1]
        routeA, routeB = self.D[2], self.D[3]

        cur.execute(wifiProject.resultQuery1(
            leftDev, rightDev, self.time_now, self.time_past, etfthTime=self.D[4]))
        data1 = cur.fetchall()
        cur.execute(wifiProject.resultQuery2(
            leftDev, rightDev, self.time_now, self.time_past, etfthTime=self.D[5]))
        data2 = cur.fetchall()

        records = []
        # list() lets tuple- and list-returning drivers be concatenated alike.
        for mac, left, right in list(data1) + list(data2):
            left, right = int(left), int(right)
            records.append({
                'timestamp': right,
                'datetime': datetime.fromtimestamp(max(left, right)).strftime(
                    "%a %b %d %H:%M:%S %Y"),
                'route_id': routeA if left < right else routeB,
                'mac': mac,
                'travel_time': abs(left - right),
            })
        df = pd.DataFrame(
            records,
            columns=['timestamp', 'datetime', 'route_id', 'mac', 'travel_time'])
        groups = df.groupby("route_id")

        path = wifiProject.csvFilePath()
        day = datetime.fromtimestamp(self.time_now).strftime("%d-%m-%y")
        for route in (routeA, routeB):
            wifiProject._appendRouteRows(groups, route, path + f"{route}-" + day + ".csv")

        conn.commit()

    @staticmethod
    def _appendRouteRows(groups, route, csvPath):
        """Append the rows of *route* to *csvPath* and to its ``tt_`` table."""
        if route not in groups.groups:
            # No match for this direction in the current window.
            print('None')
            return
        routeDF = groups.get_group(route)
        fileExists = os.path.isfile(csvPath)
        try:
            # Only a brand-new file gets the header row.
            routeDF.to_csv(csvPath, mode='a', index=False, header=not fileExists)
            if fileExists:
                print('exe')
            wifiProject.csvSqlUpdate(routeDF, route)
        except Exception as exc:
            # Best effort per route, as before, but the cause is now reported
            # instead of being hidden behind a bare `except`.
            print('None', repr(exc))

    # ------------------------------------------------------------------
    # Device<id> inserts
    # ------------------------------------------------------------------
    def updateDBLeft(self, df, d):
        """Insert every row of *df* into the ``Device<d>`` table and commit."""
        wifiProject._insertDeviceRows(df, d)

    def updateDBRight(self, df):
        """Insert every row of *df* into the table of ``self.D[1]`` and commit."""
        wifiProject._insertDeviceRows(df, self.D[1])

    @staticmethod
    def _insertDeviceRows(df, device):
        """Shared body of updateDBLeft / updateDBRight."""
        insertHead = wifiProject.updateQuery(device)
        for _, row in df.iterrows():
            readable = datetime.fromtimestamp(float(row.logtime)).strftime("%d-%m,%H:%M:%S")
            sqlQuery = (
                insertHead + S(row.device_id) + ',' + S(row.mac) + ','
                + str(row.logtime) + ',' + str(row.mac_timestamp)
                + " , " + S(readable) + ")"
            )
            cur.execute(sqlQuery)
        conn.commit()

    # ------------------------------------------------------------------
    # Helpers (no instance state)
    # ------------------------------------------------------------------
    # Intentionally a plain function, not a staticmethod, so that it can be
    # applied as `@printMethod` to definitions inside this class body.
    def printMethod(func):
        """Decorator: print the wrapped function's return value, then return it."""
        def wrapper(*args, **kwargs):
            y = func(*args, **kwargs)
            print(y)
            return y
        return wrapper

    @staticmethod
    def filePath():
        """Return the relative directory that receives the ``temp_<id>.json`` dumps."""
        return '/'.join(['..', 'supportFiles', 'cptrdData'])

    @staticmethod
    def createTableQuery(device):
        """Return ``[create_table_sql, column_names]`` for ``Device<device>``."""
        queryStart = ['create table if not exists ', 'Device', device, '(ID int auto_increment ,']
        headers = ['device_id ', ' mac ', ' logtime ', ' mac_timestamp ', 'rdblTime ']
        headersPair = ['varchar(200) ,', ' varchar(200) , ', 'DECIMAL(18,6) , ',
                       'DECIMAL(18,6) ,', 'varchar(200) ,']
        queryEnd = [' primary key(ID))']
        columnDefs = [name + sqlType for name, sqlType in zip(headers, headersPair)]
        return [''.join(queryStart) + ''.join(columnDefs) + ''.join(queryEnd), headers]

    @staticmethod
    def updateQuery(dev):
        """Return the ``insert into Device<dev>(...)Values(`` statement prefix."""
        columns = ",".join(wifiProject.createTableQuery("")[1])
        return 'insert into ' + 'Device' + dev + "(" + columns + ")" + """Values("""

    @staticmethod
    def _pairJoinQuery(dev1, dev2, leftWindow, rightWindow):
        """Build the MAC join of two device tables with a logtime window on each."""
        left = 'Device' + dev1
        right = 'Device' + dev2
        parts = [
            'select ', left, '.mac , ', left, '.logtime ,', right, '.logtime ',
            ' from ', left, ' inner join ', right, ' on ', '\n ',
            left, '.mac=', right, '.mac',
            ' and ', left, '.logtime between ', str(leftWindow[0]), ' and ', str(leftWindow[1]),
            ' and ', right, '.logtime between ', str(rightWindow[0]), ' and ', str(rightWindow[1]),
        ]
        return ''.join(parts)

    @staticmethod
    def resultQuery1(dev1, dev2, time_now, time_past, etfthTime):
        """Join query: dev1 seen in [time_past, time_now], dev2 in [time_now - etfthTime, time_now]."""
        return wifiProject._pairJoinQuery(
            dev1, dev2,
            (time_past, time_now),
            (time_now - etfthTime, time_now))

    @staticmethod
    def resultQuery2(dev1, dev2, time_now, time_past, etfthTime):
        """Join query: dev1 seen in [time_now - etfthTime, time_past], dev2 in [time_past, time_now]."""
        return wifiProject._pairJoinQuery(
            dev1, dev2,
            (time_now - etfthTime, time_past),
            (time_past, time_now))

    @staticmethod
    def listOfDevices():
        """Return the parsed JSON content of ``../supportFiles/pairDevices``."""
        with open('../supportFiles/pairDevices', 'r') as file:
            Devices = json.loads(file.read())
        return Devices

    # Plain function for the same reason as printMethod.
    def qcondPrint(func):
        """Decorator: print the start/end of the dict returned by ``qcond``."""
        def wrapper(*args, **kwargs):
            y = func(*args, **kwargs)
            end = datetime.fromtimestamp(float(y[":et"]["S"])).strftime("%d-%m,%H:%M:%S")
            start = datetime.fromtimestamp(float(y[":st"]["S"])).strftime("%d-%m,%H:%M:%S")
            print(f"""
 "end":{end},
 "start":{start}
 """)
            return y
        return wrapper

    @staticmethod
    def qcond(dev, time_now, time_past):
        """Return the expression-attribute values for the DynamoDB query."""
        return {
            ":did": {"S": dev},
            ":st": {"S": str(time_past)},
            ":et": {"S": str(time_now)},
        }

    @staticmethod
    def csvFilePath():
        """Return the directory (with trailing slash) that receives the route CSVs."""
        return '/home/busportal/Desktop/NewSection/WiFi_Realtime/Travel_times/'

    @staticmethod
    def dataFrameColumns():
        """Return the column names read from each DynamoDB item."""
        return ['device_id', 'logtime', 'mac', 'mac_timestamp']

    @staticmethod
    def createCsvTableIfNotExists(dev):
        """Create the ``tt_<dev>`` travel-time table if missing and commit."""
        query = (
            'create table if not exists tt_' + dev
            + '(id int auto_increment , timestamp decimal(18 ,6) , datetime varchar ( 500 ) , '
            ' route_id varchar (500) ,travel_time decimal ( 18 ,6) , primary key(id))'
        )
        cur.execute(query)
        conn.commit()

    @staticmethod
    def csvSqlUpdate(df, dev):
        """Insert the rows of *df* into ``tt_<dev>`` and commit.

        The table is ensured before inserting. The earlier
        insert / create-on-failure / re-insert sequence replayed the whole
        batch after any error, so rows that had already gone in could be
        written twice.
        """
        print(df.columns, df.index)
        insertHead = ('insert into tt_' + dev
                      + '( timestamp , datetime , route_id , travel_time)' + ' values(')
        try:
            wifiProject.createCsvTableIfNotExists(dev)
            for idx in df.index:
                cur.execute(
                    insertHead
                    + S(df.loc[idx, 'timestamp']) + ' , '
                    + S(df.loc[idx, 'datetime']) + " , "
                    + S(df.loc[idx, 'route_id']) + " , "
                    + S(df.loc[idx, 'travel_time']) + ')'
                )
            print('csvupdateloopafter')
        except Exception as exc:
            # The driver's exception classes are not visible here, hence the
            # broad catch; the error itself is now part of the message.
            print('issue in csvfileUpdate', repr(exc))
        conn.commit()


# Continuous mode (currently disabled): repeat the cycle once a minute,
# sleeping for whatever is left of the 60 seconds after each run.
# while True:
#     print('start')
#     x = time.time()
#     wifiProject().mainExecutor(datetime.timestamp(datetime.now()))
#     y = time.time()
#     print('sleep mode ')
#     time.sleep(60 - y + x)

# Single run: process the 60-second window that ends now (epoch seconds).
wifiProject().mainExecutor(datetime.timestamp(datetime.now()))