· 5 months ago · May 01, 2025, 08:15 AM
1#!/usr/bin/env python3
2import argparse
3import math
4
5import paho.mqtt.client
6
7import numbers
8import time
9import random
10import sys
11
12from influxdb_client import InfluxDBClient
13from influxdb_client.client.write_api import SYNCHRONOUS
14from threading import Thread
15from collections import deque
16
17import logging
18logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.WARNING)
19
20class DBWriterThread(Thread):
21 def __init__(self, influx_client, *args, **kwargs):
22 self.influx_client = influx_client
23 self.data_queue = deque()
24
25 super(DBWriterThread, self).__init__(*args, **kwargs)
26
27 def schedule_item(self, device):
28 item = (device.device_id, device.device_name,device.control_id, device.value)
29 logging.debug(f"schedule_item: {item}")
30 self.data_queue.append(device)
31
32 def get_items(self, mininterval, maxitems):
33 """ This will collect items from queue until either 'mininterval'
34 is over or 'maxitems' items are collected """
35 started = time.time()
36 items = []
37
38 while (time.time() - started < mininterval) and (len(items) < maxitems):
39 try:
40 item = self.data_queue.popleft()
41 except IndexError:
42 time.sleep(mininterval * 0.1)
43 else:
44 items.append(item)
45
46 logging.debug(f"get_items: {len(items)}")
47 return items
48
49 def run(self):
50 while True:
51 items = self.get_items(mininterval=0.05, maxitems=50)
52 db_req_body = []
53 for device in items:
54 ser_item = self.serialize_data_item(device)
55 logging.debug(f"device_id = {device.device_id}, device_name = {device.device_name}, control_id = {device.control_id}, value = {device.value}")
56 if ser_item:
57 db_req_body.append(ser_item)
58 #stat_clients.add(client)
59
60 logging.debug(f"\n\n db_req_body = {db_req_body} \n\n")
61 if db_req_body:
62 logging.info(f"Write {len(items)} items")
63 write_client = self.influx_client.write_api(write_options=SYNCHRONOUS).write(bucket=args.idb_database, record=db_req_body)
64
65 time.sleep(0.01)
66
67
68
69 def serialize_data_item(self, device):
70
71 if not device.value:
72 return
73
74 fields = {}
75 try:
76 value_f = float(device.value)
77 if not math.isnan(value_f):
78 fields["value_f"] = value_f
79 except ValueError:
80 pass
81 if "value_f" not in fields:
82 fields["value_s"] = device.value
83
84 item = {
85 'measurement': 'Serebrica',
86 'tags' : {
87 "device" : f"{device.device_id}",
88 "device_name" : f"{device.device_name}",
89 "control" : f"{device.control_id}"
90 },
91 "fields" : fields
92 }
93
94 return item
95
96class Device:
97 def __init__(self, _device_id=None, _device_name="", _control_id=None, _value=-999999):
98 self.device_id = _device_id
99 self.device_name = _device_name
100 self.control_id = _control_id
101 self.value = _value
102
103def add_or_update_device(devices_dict, new_device):
104 if new_device.device_id in devices_dict:
105 # Обновляем существующий экземпляр
106 existing_device = devices_dict[new_device.device_id]
107 if existing_device.device_name != None:
108 new_device.device_name = existing_device.device_name
109 if new_device.device_name != None and existing_device.device_name == None:
110 existing_device.device_name = new_device.device_name
111 if new_device.control_id != None:
112 if new_device.control_id == existing_device.control_id:
113 existing_device.value = new_device.value
114 else:
115 devices_dict[new_device.device_id] = new_device
116 #logging.info(f"U/{existing_device.device_id}/{existing_device.device_name}/{existing_device.control_id}/{existing_device.value}")
117 #with open(filename, mode='a', encoding='utf-8') as file:
118 # file.write(f"U/{existing_device.device_id}/{existing_device.device_name}/{existing_device.control_id}/{existing_device.value}\n")
119 else:
120 # Добавляем новый экземпляр
121 devices_dict[new_device.device_id] = new_device
122 #logging.info(f"A/{new_device.device_id}/{new_device.device_name}/{new_device.control_id}/{new_device.value}")
123 #with open(filename, mode='a', encoding='utf-8') as file:
124 # file.write(f"U/{new_device.device_id}/{new_device.device_name}/{new_device.control_id}/{new_device.value}\n")
125
126def convert_to_number(s):
127 try:
128 return int(s)
129 except ValueError:
130 try:
131 return float(s)
132 except ValueError:
133 return -999999
134
135db_writer = None
136devices = {}
137
138def on_mqtt_message(arg0, arg1, arg2=None):
139 if arg2 is None:
140 msg = arg1
141 else:
142 msg = arg2
143
144 # Эта штука не даёт получить мету из топиков. мета даёт имя устройства
145 #if msg.retain:
146 # return
147
148 parts = msg.topic.split('/')[1:]
149
150 #logging.warning(f"_____________________topic path = {parts}")
151
152 #if len(parts) < 4:
153 # return
154
155 #if (parts[0] == 'client'):
156 # client = parts[1]
157 # parts = parts[2:]
158
159 if len(parts) != 4:
160 return
161
162 ## Если подключать в шину не только wirenboard - стоит переделать на классический вариант
163 if parts[0] != 'devices':
164 return
165
166 # Отсеивает ненужные топики
167 if parts[1][:6] == "system_"[:6] or parts[1][:6] == "telegram2wb"[:6]:
168 return
169
170 device_id = parts[1]
171 device = Device(device_id, None, None, -999999)
172
173 if parts[2] == 'meta' and parts[3] == 'name':
174 device.device_name = msg.payload.decode('utf8')
175 add_or_update_device(devices, device)
176
177 if parts[2] == 'controls':
178 device.control_id = parts[3]
179 device.value = convert_to_number(msg.payload.decode('utf8'))
180 else:
181 return
182
183 if device.value != -999999:
184 if isinstance(device.value, numbers.Number):
185 add_or_update_device(devices, device)
186 else:
187 return
188
189 updated_device = devices[device.device_id]
190 if updated_device.device_name != None:
191 if updated_device.value != -999999:
192 db_writer.schedule_item(updated_device)
193
194
195if __name__ == '__main__':
196 parser = argparse.ArgumentParser(description='MQTT retained message deleter', add_help=False)
197 ###---Common---###
198
199 ###---MQTT---###
200 parser.add_argument('-hmq', '--host-mq', dest='mq_host', type=str,
201 help='MQTT host', default='localhost')
202
203 parser.add_argument('-umq', '--username-mq', dest='mq_username', type=str,
204 help='MQTT username', default='')
205
206 parser.add_argument('-Pmq', '--password-mq', dest='mq_password', type=str,
207 help='MQTT password', default='')
208
209 parser.add_argument('-pmq', '--port-mq', dest='mq_port', type=int,
210 help='MQTT port', default='1883')
211
212 parser.add_argument('-tmq', '--topic-mq', dest='mq_topic', type=str,
213 help='Topic mask to unpublish retained messages from. For example: "/devices/my-device/#"', default='/devices/#')
214
215 ###---InfluxDB---###
216 parser.add_argument('-hdb', '--host-idb', dest='idb_host', type=str,
217 help='InfluxDB host', default='localhost')
218
219 parser.add_argument('-pdb', '--port-idb', dest='idb_port', type=int,
220 help='InfluxDB port', default='8086')
221
222 parser.add_argument('-udb', '--username-idb', dest='idb_username', type=str,
223 help='InfluxDB username', default='')
224
225 parser.add_argument('-Pdb', '--password-idb', dest='idb_password', type=str,
226 help='InfluxDB password', default='')
227
228 parser.add_argument('-tdb', '--token-idb', dest='idb_token', type=str,
229 help='InfluxDB access token', default='')
230
231 parser.add_argument('-ddb', '--database-idb', dest='idb_database', type=str,
232 help='InfluxDB database', default='mqtt_data')
233
234 parser.add_argument('-odb', '--organization-idb', dest='idb_org', type=str,
235 help='InfluxDB organization', default='main-org')
236
237
238 mqtt_device_id = str(time.time()) + str(random.randint(0, 100000))
239
240 args = parser.parse_args()
241
242
243 ## Переназначение аргументов для тестирования
244 #args.mq_host = "***.***.***.***"
245 #args.mq_port = 1883
246 #args.mq_username = "***"
247 #args.mq_password = "***"
248 #args.mq_topic = "/devices/#"
249
250 #args.idb_host = "***.***.***.***"
251 #args.idb_port = 8086
252 #args.idb_username = "***"
253 #args.idb_password = "***"
254 #args.idb_token = "***" ##write access token
255 #args.idb_database = "wirenboard"
256 #args.idb_org = "main-org"
257
258
259 client = paho.mqtt.client.Client(client_id=None, clean_session=True, protocol=paho.mqtt.client.MQTTv31)
260
261 if args.mq_username:
262 client.username_pw_set(args.mq_username, args.mq_password)
263
264 client.connect(args.mq_host, args.mq_port)
265 client.on_message = on_mqtt_message
266
267 client.subscribe(args.mq_topic)
268
269
270 #influx_client = InfluxDBClient(args.idb_host, args.idb_port, args.idb_database)
271 influx_client = InfluxDBClient(
272 url=f"http://{args.idb_host}:{args.idb_port}",
273 token=f"{args.idb_token}",
274 verify_ssl=False,
275 org=args.idb_org
276 )
277
278 #args.idb_database
279 db_writer = DBWriterThread(influx_client, daemon=True)
280 db_writer.start()
281
282
283 while 1:
284 rc = client.loop()
285 if rc != 0:
286 break