· 6 years ago · Jul 05, 2019, 02:04 PM
1/*
2 * Copyright (c) 2016, Peter Thorson. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are met:
6 * * Redistributions of source code must retain the above copyright
7 * notice, this list of conditions and the following disclaimer.
8 * * Redistributions in binary form must reproduce the above copyright
9 * notice, this list of conditions and the following disclaimer in the
10 * documentation and/or other materials provided with the distribution.
11 * * Neither the name of the WebSocket++ Project nor the
12 * names of its contributors may be used to endorse or promote products
13 * derived from this software without specific prior written permission.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
19 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 *
26 */
27
#include "websocketpp/config/asio_no_tls_client.hpp"
#include "websocketpp/client.hpp"
#include "json/json.h"

#include <sqlite3.h>

#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <time.h>
#include <unistd.h>
#include <unistd.h>

#include <cmath>
#include <fstream>
#include <iostream>
#include <iostream>
#include <sstream>
#include <string>
44
45typedef websocketpp::client<websocketpp::config::asio_client> client;
46
47using websocketpp::lib::placeholders::_1;
48using websocketpp::lib::placeholders::_2;
49using websocketpp::lib::bind;
50clock_t prev_time, current_time;
51double delta_time;
52double prev,curr;
53
54struct timeval tv1;
55
/* Counter state kept for one interface/direction between two samples. */
struct intf_vals
{
    unsigned long long prev;   /* counter value taken from the previous sample */
    unsigned long long curr;   /* counter value taken from the latest sample */
    unsigned long long delta;  /* curr - prev; NOTE(review): not written by the code visible here */
    float thru;                /* throughput derived from the counter difference */
};
typedef struct intf_vals intf_val_t;
63
64//std::vector<intf_val_t> intf_arr;
65intf_val_t LANRxv4;
66intf_val_t LANRxv6;
67intf_val_t LANTxv4;
68intf_val_t LANTxv6;
69
70intf_val_t WANRxv4;
71intf_val_t WANRxv6;
72intf_val_t WANTxv4;
73intf_val_t WANTxv6;
74
75intf_val_t MgmtRx;
76intf_val_t MgmtTx;
77
78typedef websocketpp::config::asio_client::message_type::ptr message_ptr;
79std::string input_file ="";
80std::string header_str="headers:";
81std::string data_str="data:";
82
83
84/*Initialize SQLitecpp parameters*/
85void SQL_CMD(std::string comand);
86static int callback(void* data, int argc, char** argv, char** azColName);
87sqlite3 *db;
88char *zErrMsg = 0;
89
/* Samples stored since the last roll-up into table 'day'; on_message() resets it at 300. */
int day_counter(0);
/* Samples stored since the last roll-up into table 'max'; on_message() resets it at 3600. */
int max_counter(0);
93
/*----------------------------------------------------------------------
Function: convert(std::string val)
Purpose:  Turns the textual form of a byte counter into an unsigned 64-bit value.
Returns:  The parsed number, or 0 when `val` is the "not present" sentinel,
          is empty/blank, starts with '-', does not begin with a number, or
          does not fit in an unsigned long long.
          Leading whitespace is skipped and trailing non-digit text is ignored.
------------------------------------------------------------------------*/
unsigned long long convert(std::string val)
{
    if (val == "A Default Value if not exists")
    {
        return 0;
    }

    /* Stream extraction into an unsigned type accepts a leading '-' and wraps
       the value around; a negative byte counter is meaningless, so reject it. */
    const std::string::size_type first = val.find_first_not_of(" \t\r\n\f\v");
    if (first == std::string::npos || val[first] == '-')
    {
        return 0;
    }

    std::stringstream stream(val);
    unsigned long long result = 0;
    if (!(stream >> result))
    {
        /* Not a number (or out of range): report "no data" instead of garbage. */
        return 0;
    }
    return result;
}
105
/* False until on_message() has stored the first sample as the baseline. */
bool entered_flag(false);
107// bool connectDB() {
108// if (false == isOpenDB && sqlite3_open(DB, &dbfile) == SQLITE_OK) {
109// isOpenDB = true;
110// }
111// return isOpenDB;
112// }
/*----------------------------------------------------------------------
Function: callback(void* data, int argc, char** argv, char** azColName)
Purpose:  Row callback handed to sqlite3_exec() by SQL_CMD(std::string command).
          Prints every column of one result row as "name = value" on stdout,
          followed by a blank line.
Params:   data      - optional label (C string) printed to stderr before the row;
                      SQL_CMD passes NULL, in which case no label is printed
          argc      - number of columns in the row
          argv      - column values as text; a NULL entry is printed as "NULL"
          azColName - column names
Returns:  0, which tells sqlite3_exec() to keep going.
------------------------------------------------------------------------*/
static int callback(void* data, int argc, char** argv, char** azColName)
{
    /* Passing a NULL pointer to "%s" is undefined behaviour, so only print the
       label when the caller actually supplied one. */
    if (data != NULL)
    {
        fprintf(stderr, "%s: ", static_cast<const char*>(data));
    }

    for (int i = 0; i < argc; i++)
    {
        printf("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL");
    }

    printf("\n");
    return 0;
}
133/*----------------------------------------------------------------------
134Function: SQL_CMD(std::string command)
135Purpose: Executes common sqlite3 commands with the command as an input string 'command'
136------------------------------------------------------------------------*/
137void SQL_CMD(std::string command)
138{
139 sqlite3_exec(db, command.c_str(), callback, NULL, NULL);
140}
141
142// This message handler will be invoked once for each incoming message. It
143// prints the message and then sends a copy of the message back to the server.
144void on_message(client* c, websocketpp::connection_hdl hdl, message_ptr msg)
145{
146
147 std::string results = msg->get_payload();
148 std::string parsed_data;
149 size_t found_header = results.find("headers:");
150 size_t found_data = results.find("data:");
151
152
153 if (found_data!=std::string::npos)
154 {
155 parsed_data = results.substr(found_data+data_str.length(), results.length());
156 }
157 else
158 {
159 //something else
160 return;
161 }
162
163
164 Json::Value pdata;
165 Json::StyledWriter styledWriter;
166 Json::Reader reader;
167 bool parsingSuccessful = reader.parse(parsed_data, pdata);
168 std::string RxV4, RxV6, TxV4, TxV6, RMx, TMx, WRxV4, WRxV6, WTxV4, WTxV6;
169 if (parsingSuccessful)
170 {
171 /*Grabs the stats values from the parsed json data*/
172 RxV4 = pdata.get("system_stats_curr_intf_lan_v4_pkt_stats_rx_pkt_stats_tot_bytes", "A Default Value if not exists" ).asString();
173 RxV6 = pdata.get("system_stats_curr_intf_lan_v6_pkt_stats_rx_pkt_stats_tot_bytes", "A Default Value if not exists" ).asString();
174 TxV4 = pdata.get("system_stats_curr_intf_lan_v4_pkt_stats_tx_pkt_stats_tot_bytes", "A Default Value if not exists" ).asString();
175 TxV6 = pdata.get("system_stats_curr_intf_lan_v6_pkt_stats_tx_pkt_stats_tot_bytes", "A Default Value if not exists" ).asString();
176 RMx = pdata.get("system_stats_curr_intf_v6_mgmt_pkt_stats_rx_pkt_stats_tot_bytes", "A Default Value if not exists" ).asString();
177 TMx = pdata.get("system_stats_curr_intf_v6_mgmt_pkt_stats_tx_pkt_stats_tot_bytes", "A Default Value if not exists" ).asString();
178
179 WRxV4 = pdata.get("system_stats_curr_intf_wan_v4_pkt_stats_rx_pkt_stats_tot_bytes", "A Default Value if not exists" ).asString();
180 WRxV6 = pdata.get("system_stats_curr_intf_wan_v6_pkt_stats_rx_pkt_stats_tot_bytes", "A Default Value if not exists" ).asString();
181 WTxV4 = pdata.get("system_stats_curr_intf_wan_v4_pkt_stats_tx_pkt_stats_tot_bytes", "A Default Value if not exists" ).asString();
182 WTxV6 = pdata.get("system_stats_curr_intf_wan_v6_pkt_stats_tx_pkt_stats_tot_bytes", "A Default Value if not exists" ).asString();
183
184
185 if(RxV4 != "A Default Value if not exists")
186 {
187 int rc = sqlite3_open("/fl0/StatsCollector.db", &db);
188 if (rc == 0)
189 {
190 /**checks for open database. If rc == 0, the database is generated**/
191 SQL_CMD("/fl0/StatsCollector.db");
192 SQL_CMD("CREATE TABLE 'live' ( 'Time' TEXT NOT NULL, 'RxV4' REAL,'RxV6' REAL,'TxV4' REAL,'TxV6' REAL,'MRx' REAL,'MTx' REAL);");
193 SQL_CMD("CREATE TABLE 'day' ( 'Time' TEXT NOT NULL, 'RxV4' REAL,'RxV6' REAL,'TxV4' REAL,'TxV6' REAL,'MRx' REAL,'MTx' REAL);");
194 SQL_CMD("CREATE TABLE 'max' ( 'Time' TEXT NOT NULL, 'RxV4' REAL,'RxV6' REAL,'TxV4' REAL,'TxV6' REAL,'MRx' REAL,'MTx' REAL);");
195 }
196
197 SQL_CMD("ATTACH DATABASE '/fl0/StatsCollector' as 'statscollector';");
198
199 if(entered_flag == false)
200 {
201 //std::cout << "Got here";
202 /*First iteration is always empty, this sets up the baseline for forward progression*/
203 LANRxv4.curr=convert(RxV4);
204 LANRxv6.curr=convert(RxV6);
205 LANTxv4.curr=convert(TxV4);
206 LANTxv6.curr=convert(TxV6);
207 WANRxv4.curr=convert(WRxV4);
208 WANRxv6.curr=convert(WRxV6);
209 WANTxv4.curr=convert(WTxV4);
210 WANTxv6.curr=convert(WTxV6);
211 MgmtRx.curr=convert(RMx);
212 MgmtTx.curr=convert(TMx);
213
214
215 LANRxv4.prev = LANRxv4.curr;
216 LANRxv6.prev = LANRxv6.curr;
217 LANTxv4.prev = LANTxv4.curr;
218 LANTxv6.prev = LANTxv6.curr;
219
220 WANRxv4.prev = WANRxv4.curr;
221 WANRxv6.prev = WANRxv6.curr;
222 WANTxv4.prev = WANTxv4.curr;
223 WANTxv6.prev = WANTxv6.curr;
224 MgmtRx.prev = MgmtRx.curr;
225 MgmtTx.prev = MgmtTx.curr;
226
227 entered_flag = true;
228 }
229 else
230 {
231 gettimeofday(&tv1, NULL);
232 curr=(double) (tv1.tv_usec)/1000000 + (double) (tv1.tv_sec);
233 delta_time = (double)(curr - prev);
234 //printf ("Total time = %f seconds\n",delta_time);
235
236 LANRxv4.curr=convert(RxV4);
237 LANRxv4.thru = 8*(LANRxv4.curr - LANRxv4.prev)/ (1024.0*1024.0 * delta_time);
238
239
240 LANRxv6.curr=convert(RxV6);
241 LANRxv6.thru = 8*(LANRxv6.curr - LANRxv6.prev)/ (1024.0*1024.0 * delta_time);
242
243 LANTxv4.curr=convert(TxV4);
244 LANTxv4.thru = 8*(LANTxv4.curr - LANTxv4.prev)/ (1024.0*1024.0 * delta_time);
245
246 LANTxv6.curr=convert(TxV6);
247 LANTxv6.thru = 8*(LANTxv6.curr - LANTxv6.prev)/ (1024.0*1024.0 * delta_time);
248
249 WANRxv4.curr=convert(WRxV4);
250 WANRxv4.thru = 8*(WANRxv4.curr - WANRxv4.prev)/ (1024.0*1024.0 * delta_time);
251
252 WANRxv6.curr=convert(WRxV6);
253 WANRxv6.thru = 8*(WANRxv6.curr - WANRxv6.prev)/ (1024.0*1024.0 * delta_time);
254
255 WANTxv4.curr=convert(WTxV4);
256 WANTxv4.thru = 8*(WANTxv4.curr - WANTxv4.prev)/ (1024.0*1024.0 * delta_time);
257
258 WANTxv6.curr=convert(WTxV6);
259 WANTxv6.thru = 8*(WANTxv6.curr - WANTxv6.prev)/ (1024.0*1024.0 * delta_time);
260
261 MgmtRx.curr=convert(RMx);
262 MgmtRx.thru = 8*(MgmtRx.curr - MgmtRx.prev)/ (1024.0 * delta_time);
263
264 MgmtTx.curr=convert(TMx);
265 MgmtTx.thru = 8*(MgmtTx.curr - MgmtTx.prev)/ (1024.0 * delta_time);
266
267
268 char buffer[200];
269 std::string time_prefix = "strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime', '-4 hours')";
270 std::cout << "Time: " << time_prefix << std::endl;
271 std::cout << "Rxcurr= " << WANRxv4.curr << std::endl;
272 std::cout << "Rx_prev = " << WANRxv4.prev << std::endl;
273 std::cout << "Rx_thru = " << WANRxv4.thru << std::endl << std::endl;
274
275 sprintf(buffer,
276 "INSERT INTO live (Time,RxV4,RxV6,TxV4, TxV6, MRx, MTx) VALUES (%s,%-.2f,%-.2f,%-.2f,%-.2f,%-.2f,%-.2f);",
277 time_prefix.c_str(),
278 WANRxv4.thru,
279 WANRxv6.thru,
280 WANTxv4.thru,
281 WANTxv6.thru,
282 MgmtRx.thru,
283 MgmtTx.thru
284 );
285
286
287 /**Store Previous values for next iteration. Used for delta calculation **/
288 LANRxv4.prev = LANRxv4.curr;
289 LANRxv6.prev = LANRxv6.curr;
290 LANTxv4.prev = LANTxv4.curr;
291 LANTxv6.prev = LANTxv6.curr;
292
293 WANRxv4.prev = WANRxv4.curr;
294 WANRxv6.prev = WANRxv6.curr;
295 WANTxv4.prev = WANTxv4.curr;
296 WANTxv6.prev = WANTxv6.curr;
297 MgmtRx.prev = MgmtRx.curr;
298 MgmtTx.prev = MgmtTx.curr;
299
300 prev=curr;
301
302
303 SQL_CMD(buffer);
304 day_counter++;
305 max_counter++;
306 SQL_CMD("Delete from live where Time < strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime','-5 hour');");
307 if(day_counter == 300)
308 {
309 /**Averages last 5 minutes from live into table day**/
310
311
312 SQL_CMD("INSERT INTO day (Time, RxV4, RxV6, TxV4, TxV6, MRx, MTx) VALUES"
313 "(datetime('now', 'localtime'),"
314 "(select avg(RxV4) from live where Time > strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime', '-4 hours','-5 minutes')),"
315 "(select avg(RxV6) from live where Time > strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime', '-4 hours','-5 minutes')),"
316 "(select avg(TxV4) from live where Time > strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime', '-4 hours','-5 minutes')),"
317 "(select avg(TxV6) from live where Time > strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime', '-4 hours','-5 minutes')),"
318 "(select avg(MRx) from live where Time > strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime', '-4 hours','-5 minutes')),"
319 "(select avg(MTx) from live where Time > strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime', '-4 hours','-5 minutes')));");
320
321 SQL_CMD("Delete from day where Time < strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime', '-4 hours','-1 day');");
322
323 day_counter = 0;
324 }
325 if(max_counter == 3600)
326 {
327 /**Averages last hour from live into table max**/
328 SQL_CMD(""
329 "INSERT INTO max (Time, RxV4, RxV6, TxV4, TxV6, MRx, MTx) VALUES"
330 "(datetime('now', 'localtime'),"
331 "(select avg(RxV4) from live where Time > strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime','-5 hours')),"
332 "(select avg(RxV6) from live where Time > strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime','-5 hours')),"
333 "(select avg(TxV4) from live where Time > strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime','-5 hours')),"
334 "(select avg(TxV6) from live where Time > strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime','-5 hours')),"
335 "(select avg(MRx) from live where Time > strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime','-5 hours')),"
336 "(select avg(MTx) from live where Time > strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime','-5 hours')));");
337 SQL_CMD("Delete from day where Time < strftime('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime','-15 days');");
338
339
340 max_counter = 0;
341 }
342 rc = sqlite3_close_v2(db);
343 }
344 }
345 }
346
347
348}
349void on_open(client* c, websocketpp::connection_hdl hdl)
350{
351
352 websocketpp::lib::error_code ec;
353 std::ifstream file_input(input_file.c_str());
354 Json::Reader reader;
355 Json::Value root;
356 reader.parse(file_input, root);
357 std::string output;
358
359 /*The first keys denote the format of data collection that the client needs to request.
360 * The time row is the frequency of statistics that the server needs to send in milliseconds. */
361 output.append("rows="+root["rows"].asString()+":");
362 output.append("style="+root["style"].asString()+":");
363 output.append("time="+root["time"].asString()+":");
364
365 Json::Value stats = root["stats"];
366
367 for ( int index = 0; index < stats.size(); ++index )
368 {
369 /* The parent struct key denotes the region of shared memory the statistics needs to be collected from.
370 * Root_struct: this is the root structure of the data point in the stats memory. In this case, it is mgmt structure
371 * Name: name is the variable name of data point
372 * "stats": [
373 * {
374 * "root_struct": "mgmt",
375 * "name": "genStUpTimeMins"
376 * } ]
377 * will translate to
378 * stats,system_stats_curr_mgmt_genStUpTimeMins
379 * ":" is used as a delimiter for the server .... for now (#TODO:change to json format string)
380 */
381 output.append("stats,");
382 output.append("system_stats_curr_"+stats[index]["parent_struct"].asString()+"_"+stats[index]["root_struct"].asString()+"_");
383 output.append(stats[index]["name"].asString()+":");
384 }
385
386 c->send(hdl,output,websocketpp::frame::opcode::text,ec);
387 gettimeofday(&tv1, NULL);
388 prev=(double) (tv1.tv_usec)/1000000 + (double) (tv1.tv_sec);
389 //std::cout << "Message sent : " << output << std::endl;
390 if (ec)
391 {
392 std::cout << "Echo failed because: " << ec.message() << std::endl;
393 }
394}
395
396
397int main(int argc, char* argv[]) {
398 // Create a client endpoint
399 client c;
400
401 std::string uri = "ws://127.0.0.1:9002";
402
403 input_file = argv[1];
404
405
406 try {
407 // Set logging to be pretty verbose (everything except message payloads)
408 c.set_access_channels(websocketpp::log::alevel::all);
409 c.clear_access_channels(websocketpp::log::alevel::frame_payload);
410
411 // Initialize ASIO
412 c.init_asio();
413
414 // Register our message handler
415 c.set_message_handler(bind(&on_message,&c,::_1,::_2));
416 c.set_open_handler(bind(&on_open,&c,::_1));
417 websocketpp::lib::error_code ec;
418 client::connection_ptr con = c.get_connection(uri, ec);
419 if (ec) {
420 std::cout << "could not create connection because: " << ec.message() << std::endl;
421 return 0;
422 }
423
424 // Note that connect here only requests a connection. No network messages are
425 // exchanged until the event loop starts running in the next line.
426 c.connect(con);
427
428 // Start the ASIO io_service run loop
429 // this will cause a single connection to be made to the server. c.run()
430 // will exit when this connection is closed.
431 c.run();
432 } catch (websocketpp::exception const & e) {
433 std::cout << e.what() << std::endl;
434 }
435}