From 9a01153f3c3fd4594ddca02c96737f5f2164c17d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=20=E6=B3=BD=E9=9A=86?= Date: Thu, 21 Nov 2024 18:05:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0MQTT=E4=B8=BB=E7=AB=99?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E8=84=9A=E6=9C=AC;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- source/dev_station.py | 179 +++++++++++++++++++++++++++++++ source/device/DeviceMQTT.py | 190 +++++++++++++++++++++++++++++++++ source/device/LaminaAdapter.py | 11 +- 3 files changed, 378 insertions(+), 2 deletions(-) create mode 100644 source/dev_station.py create mode 100644 source/device/DeviceMQTT.py diff --git a/source/dev_station.py b/source/dev_station.py new file mode 100644 index 0000000..dc4fb28 --- /dev/null +++ b/source/dev_station.py @@ -0,0 +1,179 @@ +""" 主站通信脚本 + mqtt协议 + +""" + +import time +import random +import logging +from paho.mqtt import client as mqtt_client + +from device.LaminaAdapter import ParamMap_LaminaAdapter +from device.DeviceMQTT import DeviceMQTT +from device.function import protocols + +MainStation = { + "broker": '123.249.75.235', + "port": 1883, + "account": ('TTE0101TC2311000003', 'qh10579lcb7au8o2') +} + +ParamMap_LaminaCombiner = { + # 1 - Hex + # 2 - Int16[ratio] + # 3 - lnt32[ratio] + # 4 - str + # 5 - addr + # 6 - float + 0x000: ["系统型号", 4, 16], + 0x010: ["程序版本", 4, 16], + + 0x800: ["设备地址", 5, 3], + 0x803: ["时间", 5, 3], + 0x806: ["文件记录日志级别", 1], + 0x807: ["串口输出日志级别", 1], + 0x808: ["日志输出控制字", 1], + 0x809: ["mqtt设备编码(deviceID)", 4, 25], + 0x822: ["mqtt直流表子设备编码(deviceID)", 4, 25], + 0x83B: ["mqtt适配器子设备编码(deviceID)", 4, 25], + 0x854: ["报文控制字", 1], + 0x855: ["汇流箱类型", 1], + 0x856: ["待升级适配器映射", 1], + 0x857: ["适配器1地址", 5, 3], + 0x85A: ["适配器1线路ID", 1], + 0x85B: ["适配器2地址", 5, 3], + 0x85E: ["适配器2线路ID", 1], + 0x85F: ["适配器3地址", 5, 3], + 0x862: ["适配器3线路ID", 1], + 0x863: ["适配器4地址", 5, 3], + 0x866: ["适配器4线路ID", 1], + 0x867: ["适配器5地址", 5, 3], + 0x86A: ["适配器5线路ID", 1], + 0x86B: ["适配器6地址", 5, 3], + 0x86E: ["适配器6线路ID", 1], + 0x86F: ["适配器7地址", 5, 3], + 0x872: ["适配器7线路ID", 1], + 0x873: ["适配器8地址", 5, 3], + 0x876: ["适配器8线路ID", 1], + 0x877: ["适配器9地址", 5, 3], + 0x87A: ["适配器9线路ID", 1], + 0x87B: ["适配器10地址", 5, 3], + 0x87E: ["适配器10线路ID", 1], + 0x87F: ["适配器11地址", 5, 3], + 0x882: ["适配器11线路ID", 1], + 0x883: ["适配器12地址", 5, 3], + 0x886: ["适配器12线路ID", 1], + 0x887: ["适配器13地址", 5, 3], + 0x88A: ["适配器13线路ID", 1], + 0x88B: ["适配器14地址", 5, 3], + 0x88E: ["适配器14线路ID", 1], + 0x88F: ["适配器15地址", 5, 3], + 0x892: ["适配器15线路ID", 1], + 0x893: ["适配器16地址", 5, 3], + 0x896: ["适配器16线路ID", 1], + 0x897: ["档案自适用使能", 1], + 0x898: ["档案自适用收集地址时间", 2, 1], + 0x899: ["适配器开机时间", 2, 1], + 0x89A: ["适配器关机时间", 2, 1], + 0x89B: ["有流阈值", 2, 1], + 0x89C: ["无流阈值", 2, 1], + 0x89D: ["路由组网时间", 2, 1], + 0x89E: ["并发抄读最大并发数", 2, 1], + 0x89F: ["并发抄读等待回复超时时间", 2, 1], + 0x8A0: ["并发抄读数据有效维持时间", 2, 1], + 0x8A1: ["输出母线过压阈值", 2, 1], + 0x8A2: ["默认电池基准电压", 2, 1], + 0x8A3: ["实际电池基准电压", 2, 1], + 0x8A4: ["功率限制功能关闭的适配器个数", 2, 1], + 0x8A5: ["功率限制功能使关闭的适配器映射", 2, 1], + 0x8A6: ["功率限制比率", 2, 1], + 0x8A7: ["下挂电表类型", 1], + 0x8A8: ["功率限制电池电压回执(差值)", 2, 1], + 0x8A9: ["功率恢复电池电压回执(差值)", 2, 1], + 0x8AA: ["是否ODM", 1], + 0x8AB: ["系统类型", 4, 16], + 0x8BB: ["控制器生产厂商", 4, 16], + 0x8CB: ["控制器型号", 4, 16], + 0x8DB: ["厂家缩写", 4, 4], + 0x8DF: ["汇流箱软件版本前缀", 4, 8], + 0x8E7: ["适配器软件版本前缀", 4, 8], + 0x8EF: ["适配器生产厂商", 4, 8], + 0x8F7: ["适配器型号", 4, 16], +} + + +class LaminaStation(DeviceMQTT): + def __init__(self, device_id, station=None, **kwargs): + """ 设备初始化 """ + def check_frame_modbus_MultiDevice(frame): + """ 多设备帧报文检测 """ + frame_block = self.block.copy() + if (0x4000 < frame_block['data_addr']) and (frame_block['data_addr'] < 0x6000): + """ 报文来自于适配器 """ + dev_id = (frame_block['data_addr'] - 0x4000) // 0x200 + frame_block['data_addr'] -= 0x4000 + 0x200 * dev_id + frame_block['data_define'] = ParamMap_LaminaAdapter + if ((frame_block['data_addr'] + frame_block['data_len']) >= 0x200): + raise ValueError("Data addresses across boundaries") + return protocols.check_frame_modbus(frame, frame_block) + + if station is None: + station = MainStation + super().__init__(**station, device_id=device_id, + callbacks=(lambda : protocols.make_frame_modbus(self.block), + check_frame_modbus_MultiDevice), + **kwargs) + self.block = { + 'addr_dev' : 0x00, + 'data_define': ParamMap_LaminaCombiner, + } + + def frame_read(self, daddr=0x60, dlen=0x30) -> bool: + self.block['type'] = 'read' + self.block['data_addr'] = daddr + self.block['data_len'] = dlen + return self._transfer_data() + + def frame_write_one(self, daddr=0x85, dval=-900) -> bool: + self.block['type'] = 'write_one' + self.block['data_addr'] = daddr + item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 + self.block['data_val'] = int(dval * item_coff) + return self._transfer_data() + + def frame_write_dual(self, daddr=0x91, dval=600) -> bool: + self.block['type'] = 'write_dual' + self.block['data_addr'] = daddr + item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 + self.block['data_val'] = int(dval * item_coff) + return self._transfer_data() + + def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool: + self.block['type'] = 'write_str' + self.block['data_addr'] = daddr + self.block['data_val'] = dval + return self._transfer_data() + + +if __name__ == '__main__': + mode_config = { + "dev1": {'device_id': 'TTE0101DX2406140046', # 张家港鹿苑北单管塔 + 'frame_print': True, + 'time_out': 4, 'retry': 1}, + "dev2": {'device_id': 'TTE0101DX2409230113', # 常来东-光伏 + 'frame_print': True, + 'time_out': 4, 'retry': 1}, + "dev3": {'device_id': 'TTE0101DX2406270041', # 大丰市镇区补点139 + 'frame_print': True, + 'time_out': 4, 'retry': 1}, + "dev4": {'device_id': 'TTE0101DX2407020114', # 大丰大龙南 + 'frame_print': True, + 'time_out': 4, 'retry': 1}, + } + dev_lamina = LaminaStation(**mode_config["dev4"]) + + dev_lamina.frame_read(0x0000, 0x20) + dev_lamina.frame_read(0x4100, 0x20) + + if not hasattr(__builtins__,"__IPYTHON__"): + pass diff --git a/source/device/DeviceMQTT.py b/source/device/DeviceMQTT.py new file mode 100644 index 0000000..2f9f074 --- /dev/null +++ b/source/device/DeviceMQTT.py @@ -0,0 +1,190 @@ +import time +import json +import random +import logging +from paho.mqtt import client as mqtt_client + +from .tools import ByteConv +from .function import protocols + +# 通信重连参数 +FIRST_RECONNECT_DELAY = 1 +RECONNECT_RATE = 2 +MAX_RECONNECT_COUNT = 12 +MAX_RECONNECT_DELAY = 60 + +class DeviceMQTT: + """ MQTT通信设备原型 + Note: 配置通道及订阅主题 + """ + def __init__(self, broker, port, account=None, device_id=None, callbacks=None, **kwargs): + """ 设备初始化 """ + self.topic = None + self.client_id = f'python-DeviceMQTT-{random.randint(0, 1000)}' + self.client = self.open_connection(broker, port, account, **kwargs) + self._subscribe(device_id) + + self.flag_print = kwargs['frame_print'] if 'frame_print' in kwargs.keys() else False + self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1 + self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01 + self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1 + + match callbacks: + case (maker, parser): + self._frame_maker = maker if maker is not None else lambda self: '' + self._frame_parser = parser if parser is not None else lambda self, frame: '' + case _: + self._frame_maker = lambda self: '' + self._frame_parser = lambda self, frame: '' + + self._message = {} + self.output = { + 'result': False, + 'code_func': 0x00, + } + self.log = { + 'send': 0, + 'read': 0, + 'keep-fail': 0, + 'record': { + 'config': None, + 'data': None, + }, + } + + def open_connection(self, broker, port, account=None, **kwargs) ->bool: + """ 创建链接 """ + def on_connect(client, userdata, flags, rc, properties): + """ 回调函数-创建链接 """ + if rc == 0: + print("Connected to MQTT Broker!") + else: + print("Failed to connect, return code %d\n", rc) + def on_disconnect(client, userdata, rc): + """ 回调函数-断开链接 """ + print("Disconnected with result code: %s", rc) + reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY + while reconnect_count < MAX_RECONNECT_COUNT: + print("Reconnecting in %d seconds...", reconnect_delay) + time.sleep(reconnect_delay) + + try: + client.reconnect() + print("Reconnected successfully!") + return + except Exception as err: + logging.error("%s. Reconnect failed. Retrying...", err) + + reconnect_delay *= RECONNECT_RATE + reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY) + reconnect_count += 1 + print("Reconnect failed after %s attempts. Exiting...", reconnect_count) + def on_message(client, userdata, msg): + self._message = msg + print(f"Received message from `{msg.topic}` topic") + + client = mqtt_client.Client(client_id=self.client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2) + if account is not None: + client.username_pw_set(account[0], account[1]) + client.on_connect = on_connect if 'func_on_connect' not in kwargs.keys() else kwargs['func_on_connect'] + client.on_disconnect = on_disconnect if 'func_on_disconnect' not in kwargs.keys() else kwargs['func_on_disconnect'] + client.on_message = on_message if 'func_on_message' not in kwargs.keys() else kwargs['func_on_message'] + client.connect(broker, port) + return client + + + def close_connection(self) ->bool: + """ 关闭连接 """ + self.client.disconnect() + + def _subscribe(self, device_id): + """ 订阅主题 """ + topic_send = f"ctiot/download/7/0101/{device_id}/function/invoke" + topic_read = f"ctiot/upload/7/0101/{device_id}/#" + self.device_id = device_id + self.topic = (topic_send, topic_read) + self.client.subscribe(topic_read) + + def __send(self, msg: bytearray, topic=None) -> bool: + """ 发布消息 """ + if topic is None: + topic = self.topic[0] + message = { + "deviceId": self.device_id, + "isSubDevice": 0, + "requestType": 0, + "messageId": "463g3ga53j4k25o9w", + "timestamp": int(time.time()), + "functionId": "168493059", + "inputs": [ + { + "modbus_msg": ByteConv.trans_list_to_str(msg).replace(" ", '') + } + ] + } + + result = self.client.publish(topic, str(message).replace('\'', '\"')) + return result[0] == 1 + + def __read(self, timeout=None) -> bytes: + self._message = None + timeout = timeout if timeout is not None else 1 + self.client.loop_start() + time.sleep(timeout) + self.client.loop_stop() + + if self._message: + """ 报文接收成功 """ + message_data = json.loads(self._message.payload) + frame_recv = " ".join((message_data['modbus_msg'][2*i:2*(i+1)] for i in range(len(message_data['modbus_msg'])//2))) + frame_recv = ByteConv.trans_str_to_list(frame_recv) + return bytearray(frame_recv) + else: + return b'' + + def __read_frame(self) ->bool: + """ 读取报文并解析帧 """ + try: + frame_recv = self.__read(timeout=self.time_out) + self.output = self._frame_parser(frame_recv) + if self.flag_print: + print("Read Frame: ", ByteConv.trans_list_to_str(frame_recv)) + except Exception as ex: + print("Error Info: ", ex) + if self.flag_print and frame_recv: + print("Fail Data: " , ByteConv.trans_list_to_str(frame_recv)) + self.output['result'] = False + return self.output['result'] + + def _transfer_data(self) -> bool: + """ 串口收发报文, 包含重试逻辑与数据打印 """ + # 生成发送帧 + frame: bytearray = self._frame_maker() + + # if not self.client.is_connected(): + # """ 无效通信接口, 打印报文后返回 """ + # print(ByteConv.trans_list_to_str(frame)) + # return False + + fail_count = 0 + while fail_count < self.retry: + frame_discard = self.__read(timeout=0) + self.__send(frame) + self.log['send'] += 1 + + if self.flag_print and frame_discard: + print("Discard Data: " , frame_discard) + if self.flag_print: + print("Send Frame: ", ByteConv.trans_list_to_str(frame)) + + if self.__read_frame(): + if (self.flag_print is not None) and 'Regs' in self.output.keys(): + protocols.print_display(self.output['Regs']) + self.log['read'] += 1 + break + + fail_count += 1 + self.log['keep-fail'] = fail_count if fail_count >= self.log['keep-fail'] else self.log['keep-fail'] + time.sleep(2 * self.time_out) + + return fail_count < self.retry diff --git a/source/device/LaminaAdapter.py b/source/device/LaminaAdapter.py index c96c349..4fad488 100644 --- a/source/device/LaminaAdapter.py +++ b/source/device/LaminaAdapter.py @@ -37,6 +37,13 @@ ParamMap_LaminaAdapter = { 0x21: ["输出电容电电压", 2, 10], 0x22: ["参考电压", 2, 10], + 0x50: ["启停控制命令" , 2], + 0x51: ["故障清除命令" , 2], + 0x52: ["参数还原命令" , 2], + 0x53: ["设备复位命令" , 2], + 0x54: ["主动故障命令" , 2], + 0x55: ["短时停机命令" , 2], + 0x60: ["光伏通道使能", 1], 0x61: ["最小启动输入电压", 2, 10], 0x62: ["最大启动输入电压", 2, 10], @@ -194,7 +201,7 @@ class LaminaAdapter(DeviceSerial): self.block['data']['type'] = 'write_one' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval - item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 + item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1 self.block['data_val'] = int(dval * item_coff) return self._transfer_data() @@ -202,7 +209,7 @@ class LaminaAdapter(DeviceSerial): self.block['data']['type'] = 'write_dual' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval - item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 + item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1 self.block['data_val'] = int(dval * item_coff) return self._transfer_data()