Files
DebugTool/source/device/DeviceMQTT.py
2024-11-23 17:39:54 +08:00

196 lines
7.4 KiB
Python

import time
import json
import random
import logging
from paho.mqtt import client as mqtt_client
from .tools import ByteConv
from .function import protocols
# 通信重连参数
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
class DeviceMQTT:
""" MQTT通信设备原型
Note: 配置通道及订阅主题
"""
def __init__(self, broker, port, account=None, device_id=None, callbacks=None, **kwargs):
""" 设备初始化 """
self.topic = None
self.client_id = f'python-DeviceMQTT-{random.randint(0, 10000):04d}'
self.client = self.open_connection(broker, port, account, **kwargs)
self._subscribe(device_id)
self.flag_print = kwargs['frame_print'] if 'frame_print' in kwargs.keys() else False
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
match callbacks:
case (maker, parser):
self._frame_maker = maker if maker is not None else lambda self: ''
self._frame_parser = parser if parser is not None else lambda self, frame: ''
case _:
self._frame_maker = lambda self: ''
self._frame_parser = lambda self, frame: ''
self._message = []
self.output = {
'result': False,
'code_func': 0x00,
}
self.log = {
'send': 0,
'read': 0,
'keep-fail': 0,
'record': {
'config': None,
'data': None,
},
}
def open_connection(self, broker, port, account=None, **kwargs) ->bool:
""" 创建链接 """
def on_connect(client, userdata, flags, rc, properties):
""" 回调函数-创建链接 """
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
def on_disconnect(client, userdata, rc):
""" 回调函数-断开链接 """
print("Disconnected with result code: %s", rc)
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
print("Reconnecting in %d seconds...", reconnect_delay)
time.sleep(reconnect_delay)
try:
client.reconnect()
print("Reconnected successfully!")
return
except Exception as err:
logging.error("%s. Reconnect failed. Retrying...", err)
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
print("Reconnect failed after %s attempts. Exiting...", reconnect_count)
def on_message(client, userdata, msg):
self._message.append(msg)
print(f"Received message from `{msg.topic}` topic")
client = mqtt_client.Client(client_id=self.client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2)
if account is not None:
client.username_pw_set(account[0], account[1])
client.on_connect = on_connect if 'func_on_connect' not in kwargs.keys() else kwargs['func_on_connect']
# client.on_disconnect = on_disconnect if 'func_on_disconnect' not in kwargs.keys() else kwargs['func_on_disconnect']
client.on_message = on_message if 'func_on_message' not in kwargs.keys() else kwargs['func_on_message']
client.connect(broker, port)
return client
def close_connection(self) ->bool:
""" 关闭连接 """
self.client.disconnect()
def _subscribe(self, device_id):
""" 订阅主题 """
if self.topic is not None:
self.client.unsubscribe(self.topic[1])
topic_send = f"ctiot/download/7/0101/{device_id}/function/invoke"
topic_read = f"ctiot/upload/7/0101/{device_id}/#"
self.client.subscribe(topic_read)
self.device_id = device_id
self.topic = (topic_send, topic_read)
def __send(self, msg: bytearray, topic=None) -> bool:
""" 发布消息 """
if topic is None:
topic = self.topic[0]
message = {
"deviceId": self.device_id,
"isSubDevice": 0,
"requestType": 0,
"messageId": f"debug-{self.client_id[18:]}",
"timestamp": int(time.time()),
"functionId": "168493059",
"inputs": [
{
"modbus_msg": ByteConv.trans_list_to_str(msg).replace(" ", '')
}
]
}
result = self.client.publish(topic, str(message).replace('\'', '\"'))
return result[0] == 1
def __read(self, timeout=None) -> bytes:
self._message.clear()
timeout = timeout if timeout is not None else 1
self.client.loop_start()
time.sleep(timeout)
self.client.loop_stop()
for message in self._message:
message_data = json.loads(message.payload)
if message_data['messageId'] == f"debug-{self.client_id[18:]}":
""" 报文接收成功 """
frame = " ".join((message_data['modbus_msg'][2*i:2*(i+1)] for i in range(len(message_data['modbus_msg'])//2)))
frame = ByteConv.trans_str_to_list(frame)
return bytearray(frame)
return b''
def __read_frame(self) ->bool:
""" 读取报文并解析帧 """
frame_recv = b''
try:
frame_recv = self.__read(timeout=self.time_out)
self.output = self._frame_parser(frame_recv)
if self.flag_print:
print("Read Frame: ", ByteConv.trans_list_to_str(frame_recv))
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , ByteConv.trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
def _transfer_data(self) -> bool:
""" 串口收发报文, 包含重试逻辑与数据打印 """
# 生成发送帧
frame: bytearray = self._frame_maker()
# if not self.client.is_connected():
# """ 无效通信接口, 打印报文后返回 """
# print(ByteConv.trans_list_to_str(frame))
# return False
fail_count = 0
while fail_count < self.retry:
frame_discard = self.__read(timeout=0)
self.__send(frame)
self.log['send'] += 1
if self.flag_print and frame_discard:
print("Discard Data: " , frame_discard)
if self.flag_print:
print("Send Frame: ", ByteConv.trans_list_to_str(frame))
if self.__read_frame():
if (self.flag_print is not None) and 'Regs' in self.output.keys():
protocols.print_display(self.output['Regs'])
self.log['read'] += 1
break
fail_count += 1
self.log['keep-fail'] = fail_count if fail_count >= self.log['keep-fail'] else self.log['keep-fail']
time.sleep(2 * self.time_out)
return fail_count < self.retry