# Paste metadata · 4 years ago · Apr 08, 2021, 03:10 PM
import datetime

import psycopg2
from paho.mqtt import client as mqtt_client


class MqttScript(object):
    """Connect to an MQTT broker, subscribe to one topic filter and store readings.

    Every message delivered to ``on_message`` is printed and then handed to
    ``send_data_to_db``, which parses it and writes one row through ``DBScript``.
    """

    def __init__(self, broker, port, topic, client_id):
        """Remember the broker address, port, topic filter and MQTT client id."""
        self.broker = broker
        self.port = port
        self.topic = topic
        self.client_id = client_id

    @staticmethod
    def on_connect(client, userdata, flags, rc):
        """Connect callback: print whether the broker accepted the connection.

        ``rc == 0`` is treated as success; any other value is reported as a
        failure together with the code.
        """
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            # The code used to be passed as a second print() argument, which
            # left the "%d" placeholder unsubstituted in the output.
            print(f"Failed to connect, return code {rc}\n")

    def connect_mqtt(self):
        """Create the MQTT client, attach ``on_connect``, connect and return it."""
        # NOTE(review): the client id is passed positionally, which presumably
        # targets the paho-mqtt 1.x constructor signature -- confirm.
        client = mqtt_client.Client(self.client_id)
        # If the broker requires authentication, call
        # client.username_pw_set(username, password) before connecting.
        client.on_connect = self.on_connect
        client.connect(self.broker, self.port)
        print("--- return client")
        return client

    def on_message(self, client, userdata, msg):
        """Message callback: print the message and persist it. Returns True."""
        # errors="replace" keeps a non-UTF-8 payload from raising inside the
        # log line; send_data_to_db() decides whether the payload is usable.
        text = msg.payload.decode(errors="replace")
        print(f"Received `{text}` from `{msg.topic}` topic")
        self.send_data_to_db(msg)
        return True

    @staticmethod
    def send_data_to_db(msg):
        """Parse one MQTT message and insert it as a row via ``DBScript``.

        The topic must consist of exactly three "/"-separated segments; the
        second is used as the device id and the third as the sensor name. The
        payload is decoded as text and converted with ``float``. A message
        that does not fit this shape is reported and skipped.
        """
        # Parse before opening the database: previously a bad topic/payload
        # raised after the connection was opened, leaking it and propagating
        # the error into the caller (the MQTT message callback).
        try:
            _, device_id, sensor_name = msg.topic.split("/")
            value = float(msg.payload.decode())
        except ValueError as error:  # also covers UnicodeDecodeError
            print("Skipping malformed message on `{}`: {}".format(msg.topic, error))
            return

        # NOTE(review): connection settings are hard-coded here; they
        # presumably belong in configuration -- confirm with the deployment.
        db = DBScript(db_name='testdb', host='localhost', db_port='5432', user='shafik', password='shafik')
        try:
            db.create_db()
            db.insert_data(device_id, sensor_name, value, datetime.datetime.now())
        finally:
            # Release the connection even when table creation or the insert
            # failed. NOTE(review): this may be a second close after a
            # successful insert; psycopg2 is expected to treat that as a
            # no-op -- confirm.
            db.cursor_close()

    def subscribe(self, client: "mqtt_client.Client"):
        """Route incoming messages to ``on_message`` and subscribe to ``self.topic``."""
        # Install the handler first so it is already in place when the
        # subscription is requested.
        client.on_message = self.on_message
        client.subscribe(self.topic)


class DBScript(object):
    """Hold one psycopg2 connection and cursor for writing ``sensor_data`` rows."""

    def __init__(self, db_name, host, db_port, user, password):
        """Open the database connection described by the arguments."""
        self.conn = None
        self.cursor = None
        self._initial_setup(db_name, host, db_port, user, password)

    def _initial_setup(self, db_name, host, db_port, user, password):
        """Connect with psycopg2 and keep the connection and a cursor. Returns True."""
        self.conn = psycopg2.connect(database=db_name, user=user, password=password, host=host, port=db_port)
        self.cursor = self.conn.cursor()
        return True

    def create_db(self):
        """Create the ``sensor_data`` table if it does not exist. Returns True."""
        # NOTE(review): sensor_value is declared integer although the visible
        # caller passes a float, and device_id is declared integer although it
        # comes from a topic segment; this looks like it can lose precision or
        # reject non-numeric ids -- confirm the intended schema.
        sql = '''CREATE TABLE IF NOT EXISTS sensor_data(
            id serial primary key,
            device_id integer ,
            sensor_name varchar (45),
            sensor_value integer ,
            received_at timestamp
        )'''
        self.cursor.execute(sql)
        self.conn.commit()
        return True

    def insert_data(self, device_id, sensor_name, value, received_at):
        """Insert one row, commit, and close the connection.

        Best-effort: any error is printed rather than raised. The connection
        is closed whether or not the insert succeeded, so the instance cannot
        be reused afterwards.
        """
        try:
            try:
                self.cursor.execute(
                    "INSERT INTO SENSOR_DATA (device_id, sensor_name, sensor_value, received_at) VALUES (%s, %s, %s, %s)",
                    (device_id, sensor_name, value, received_at))
                self.conn.commit()
            finally:
                # Previously the connection was closed only after a successful
                # commit, so every failed insert leaked an open connection.
                self.cursor_close()
            print("insertion success")
        except Exception as error:
            # Deliberately broad: this method reports failures instead of
            # propagating them to the caller.
            print("Error insertion: {}".format(error))

    def cursor_close(self):
        """Close the database connection (despite the name, not just the cursor)."""
        self.conn.close()


# Script entry point: connect, subscribe, then block in the MQTT network loop.
if __name__ == '__main__':
    # FIXME: '172.105.37.284' is not a valid IPv4 address (the last octet is
    # greater than 255), so the connection attempt cannot reach a broker.
    # The intended address is not recoverable from this file; replace it.
    # NOTE(review): 8883 is conventionally the MQTT-over-TLS port, yet no TLS
    # options are set on the client -- confirm how the broker is configured.
    mqtt = MqttScript(broker='172.105.37.284', port=8883, topic='sensors/+/+', client_id="server-script_01")
    client = mqtt.connect_mqtt()
    mqtt.subscribe(client)
    client.loop_forever()