添加MQTT主站连接脚本;
This commit is contained in:
179
source/dev_station.py
Normal file
179
source/dev_station.py
Normal file
@@ -0,0 +1,179 @@
|
|||||||
|
""" 主站通信脚本
|
||||||
|
mqtt协议
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
import logging
|
||||||
|
from paho.mqtt import client as mqtt_client
|
||||||
|
|
||||||
|
from device.LaminaAdapter import ParamMap_LaminaAdapter
|
||||||
|
from device.DeviceMQTT import DeviceMQTT
|
||||||
|
from device.function import protocols
|
||||||
|
|
||||||
|
MainStation = {
|
||||||
|
"broker": '123.249.75.235',
|
||||||
|
"port": 1883,
|
||||||
|
"account": ('TTE0101TC2311000003', 'qh10579lcb7au8o2')
|
||||||
|
}
|
||||||
|
|
||||||
|
ParamMap_LaminaCombiner = {
|
||||||
|
# 1 - Hex
|
||||||
|
# 2 - Int16[ratio]
|
||||||
|
# 3 - lnt32[ratio]
|
||||||
|
# 4 - str
|
||||||
|
# 5 - addr
|
||||||
|
# 6 - float
|
||||||
|
0x000: ["系统型号", 4, 16],
|
||||||
|
0x010: ["程序版本", 4, 16],
|
||||||
|
|
||||||
|
0x800: ["设备地址", 5, 3],
|
||||||
|
0x803: ["时间", 5, 3],
|
||||||
|
0x806: ["文件记录日志级别", 1],
|
||||||
|
0x807: ["串口输出日志级别", 1],
|
||||||
|
0x808: ["日志输出控制字", 1],
|
||||||
|
0x809: ["mqtt设备编码(deviceID)", 4, 25],
|
||||||
|
0x822: ["mqtt直流表子设备编码(deviceID)", 4, 25],
|
||||||
|
0x83B: ["mqtt适配器子设备编码(deviceID)", 4, 25],
|
||||||
|
0x854: ["报文控制字", 1],
|
||||||
|
0x855: ["汇流箱类型", 1],
|
||||||
|
0x856: ["待升级适配器映射", 1],
|
||||||
|
0x857: ["适配器1地址", 5, 3],
|
||||||
|
0x85A: ["适配器1线路ID", 1],
|
||||||
|
0x85B: ["适配器2地址", 5, 3],
|
||||||
|
0x85E: ["适配器2线路ID", 1],
|
||||||
|
0x85F: ["适配器3地址", 5, 3],
|
||||||
|
0x862: ["适配器3线路ID", 1],
|
||||||
|
0x863: ["适配器4地址", 5, 3],
|
||||||
|
0x866: ["适配器4线路ID", 1],
|
||||||
|
0x867: ["适配器5地址", 5, 3],
|
||||||
|
0x86A: ["适配器5线路ID", 1],
|
||||||
|
0x86B: ["适配器6地址", 5, 3],
|
||||||
|
0x86E: ["适配器6线路ID", 1],
|
||||||
|
0x86F: ["适配器7地址", 5, 3],
|
||||||
|
0x872: ["适配器7线路ID", 1],
|
||||||
|
0x873: ["适配器8地址", 5, 3],
|
||||||
|
0x876: ["适配器8线路ID", 1],
|
||||||
|
0x877: ["适配器9地址", 5, 3],
|
||||||
|
0x87A: ["适配器9线路ID", 1],
|
||||||
|
0x87B: ["适配器10地址", 5, 3],
|
||||||
|
0x87E: ["适配器10线路ID", 1],
|
||||||
|
0x87F: ["适配器11地址", 5, 3],
|
||||||
|
0x882: ["适配器11线路ID", 1],
|
||||||
|
0x883: ["适配器12地址", 5, 3],
|
||||||
|
0x886: ["适配器12线路ID", 1],
|
||||||
|
0x887: ["适配器13地址", 5, 3],
|
||||||
|
0x88A: ["适配器13线路ID", 1],
|
||||||
|
0x88B: ["适配器14地址", 5, 3],
|
||||||
|
0x88E: ["适配器14线路ID", 1],
|
||||||
|
0x88F: ["适配器15地址", 5, 3],
|
||||||
|
0x892: ["适配器15线路ID", 1],
|
||||||
|
0x893: ["适配器16地址", 5, 3],
|
||||||
|
0x896: ["适配器16线路ID", 1],
|
||||||
|
0x897: ["档案自适用使能", 1],
|
||||||
|
0x898: ["档案自适用收集地址时间", 2, 1],
|
||||||
|
0x899: ["适配器开机时间", 2, 1],
|
||||||
|
0x89A: ["适配器关机时间", 2, 1],
|
||||||
|
0x89B: ["有流阈值", 2, 1],
|
||||||
|
0x89C: ["无流阈值", 2, 1],
|
||||||
|
0x89D: ["路由组网时间", 2, 1],
|
||||||
|
0x89E: ["并发抄读最大并发数", 2, 1],
|
||||||
|
0x89F: ["并发抄读等待回复超时时间", 2, 1],
|
||||||
|
0x8A0: ["并发抄读数据有效维持时间", 2, 1],
|
||||||
|
0x8A1: ["输出母线过压阈值", 2, 1],
|
||||||
|
0x8A2: ["默认电池基准电压", 2, 1],
|
||||||
|
0x8A3: ["实际电池基准电压", 2, 1],
|
||||||
|
0x8A4: ["功率限制功能关闭的适配器个数", 2, 1],
|
||||||
|
0x8A5: ["功率限制功能使关闭的适配器映射", 2, 1],
|
||||||
|
0x8A6: ["功率限制比率", 2, 1],
|
||||||
|
0x8A7: ["下挂电表类型", 1],
|
||||||
|
0x8A8: ["功率限制电池电压回执(差值)", 2, 1],
|
||||||
|
0x8A9: ["功率恢复电池电压回执(差值)", 2, 1],
|
||||||
|
0x8AA: ["是否ODM", 1],
|
||||||
|
0x8AB: ["系统类型", 4, 16],
|
||||||
|
0x8BB: ["控制器生产厂商", 4, 16],
|
||||||
|
0x8CB: ["控制器型号", 4, 16],
|
||||||
|
0x8DB: ["厂家缩写", 4, 4],
|
||||||
|
0x8DF: ["汇流箱软件版本前缀", 4, 8],
|
||||||
|
0x8E7: ["适配器软件版本前缀", 4, 8],
|
||||||
|
0x8EF: ["适配器生产厂商", 4, 8],
|
||||||
|
0x8F7: ["适配器型号", 4, 16],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class LaminaStation(DeviceMQTT):
|
||||||
|
def __init__(self, device_id, station=None, **kwargs):
|
||||||
|
""" 设备初始化 """
|
||||||
|
def check_frame_modbus_MultiDevice(frame):
|
||||||
|
""" 多设备帧报文检测 """
|
||||||
|
frame_block = self.block.copy()
|
||||||
|
if (0x4000 < frame_block['data_addr']) and (frame_block['data_addr'] < 0x6000):
|
||||||
|
""" 报文来自于适配器 """
|
||||||
|
dev_id = (frame_block['data_addr'] - 0x4000) // 0x200
|
||||||
|
frame_block['data_addr'] -= 0x4000 + 0x200 * dev_id
|
||||||
|
frame_block['data_define'] = ParamMap_LaminaAdapter
|
||||||
|
if ((frame_block['data_addr'] + frame_block['data_len']) >= 0x200):
|
||||||
|
raise ValueError("Data addresses across boundaries")
|
||||||
|
return protocols.check_frame_modbus(frame, frame_block)
|
||||||
|
|
||||||
|
if station is None:
|
||||||
|
station = MainStation
|
||||||
|
super().__init__(**station, device_id=device_id,
|
||||||
|
callbacks=(lambda : protocols.make_frame_modbus(self.block),
|
||||||
|
check_frame_modbus_MultiDevice),
|
||||||
|
**kwargs)
|
||||||
|
self.block = {
|
||||||
|
'addr_dev' : 0x00,
|
||||||
|
'data_define': ParamMap_LaminaCombiner,
|
||||||
|
}
|
||||||
|
|
||||||
|
def frame_read(self, daddr=0x60, dlen=0x30) -> bool:
|
||||||
|
self.block['type'] = 'read'
|
||||||
|
self.block['data_addr'] = daddr
|
||||||
|
self.block['data_len'] = dlen
|
||||||
|
return self._transfer_data()
|
||||||
|
|
||||||
|
def frame_write_one(self, daddr=0x85, dval=-900) -> bool:
|
||||||
|
self.block['type'] = 'write_one'
|
||||||
|
self.block['data_addr'] = daddr
|
||||||
|
item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1
|
||||||
|
self.block['data_val'] = int(dval * item_coff)
|
||||||
|
return self._transfer_data()
|
||||||
|
|
||||||
|
def frame_write_dual(self, daddr=0x91, dval=600) -> bool:
|
||||||
|
self.block['type'] = 'write_dual'
|
||||||
|
self.block['data_addr'] = daddr
|
||||||
|
item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1
|
||||||
|
self.block['data_val'] = int(dval * item_coff)
|
||||||
|
return self._transfer_data()
|
||||||
|
|
||||||
|
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool:
|
||||||
|
self.block['type'] = 'write_str'
|
||||||
|
self.block['data_addr'] = daddr
|
||||||
|
self.block['data_val'] = dval
|
||||||
|
return self._transfer_data()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
mode_config = {
|
||||||
|
"dev1": {'device_id': 'TTE0101DX2406140046', # 张家港鹿苑北单管塔
|
||||||
|
'frame_print': True,
|
||||||
|
'time_out': 4, 'retry': 1},
|
||||||
|
"dev2": {'device_id': 'TTE0101DX2409230113', # 常来东-光伏
|
||||||
|
'frame_print': True,
|
||||||
|
'time_out': 4, 'retry': 1},
|
||||||
|
"dev3": {'device_id': 'TTE0101DX2406270041', # 大丰市镇区补点139
|
||||||
|
'frame_print': True,
|
||||||
|
'time_out': 4, 'retry': 1},
|
||||||
|
"dev4": {'device_id': 'TTE0101DX2407020114', # 大丰大龙南
|
||||||
|
'frame_print': True,
|
||||||
|
'time_out': 4, 'retry': 1},
|
||||||
|
}
|
||||||
|
dev_lamina = LaminaStation(**mode_config["dev4"])
|
||||||
|
|
||||||
|
dev_lamina.frame_read(0x0000, 0x20)
|
||||||
|
dev_lamina.frame_read(0x4100, 0x20)
|
||||||
|
|
||||||
|
if not hasattr(__builtins__,"__IPYTHON__"):
|
||||||
|
pass
|
||||||
190
source/device/DeviceMQTT.py
Normal file
190
source/device/DeviceMQTT.py
Normal file
@@ -0,0 +1,190 @@
|
|||||||
|
import time
|
||||||
|
import json
|
||||||
|
import random
|
||||||
|
import logging
|
||||||
|
from paho.mqtt import client as mqtt_client
|
||||||
|
|
||||||
|
from .tools import ByteConv
|
||||||
|
from .function import protocols
|
||||||
|
|
||||||
|
# 通信重连参数
|
||||||
|
FIRST_RECONNECT_DELAY = 1
|
||||||
|
RECONNECT_RATE = 2
|
||||||
|
MAX_RECONNECT_COUNT = 12
|
||||||
|
MAX_RECONNECT_DELAY = 60
|
||||||
|
|
||||||
|
class DeviceMQTT:
|
||||||
|
""" MQTT通信设备原型
|
||||||
|
Note: 配置通道及订阅主题
|
||||||
|
"""
|
||||||
|
def __init__(self, broker, port, account=None, device_id=None, callbacks=None, **kwargs):
|
||||||
|
""" 设备初始化 """
|
||||||
|
self.topic = None
|
||||||
|
self.client_id = f'python-DeviceMQTT-{random.randint(0, 1000)}'
|
||||||
|
self.client = self.open_connection(broker, port, account, **kwargs)
|
||||||
|
self._subscribe(device_id)
|
||||||
|
|
||||||
|
self.flag_print = kwargs['frame_print'] if 'frame_print' in kwargs.keys() else False
|
||||||
|
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
|
||||||
|
self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01
|
||||||
|
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
|
||||||
|
|
||||||
|
match callbacks:
|
||||||
|
case (maker, parser):
|
||||||
|
self._frame_maker = maker if maker is not None else lambda self: ''
|
||||||
|
self._frame_parser = parser if parser is not None else lambda self, frame: ''
|
||||||
|
case _:
|
||||||
|
self._frame_maker = lambda self: ''
|
||||||
|
self._frame_parser = lambda self, frame: ''
|
||||||
|
|
||||||
|
self._message = {}
|
||||||
|
self.output = {
|
||||||
|
'result': False,
|
||||||
|
'code_func': 0x00,
|
||||||
|
}
|
||||||
|
self.log = {
|
||||||
|
'send': 0,
|
||||||
|
'read': 0,
|
||||||
|
'keep-fail': 0,
|
||||||
|
'record': {
|
||||||
|
'config': None,
|
||||||
|
'data': None,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
def open_connection(self, broker, port, account=None, **kwargs) ->bool:
|
||||||
|
""" 创建链接 """
|
||||||
|
def on_connect(client, userdata, flags, rc, properties):
|
||||||
|
""" 回调函数-创建链接 """
|
||||||
|
if rc == 0:
|
||||||
|
print("Connected to MQTT Broker!")
|
||||||
|
else:
|
||||||
|
print("Failed to connect, return code %d\n", rc)
|
||||||
|
def on_disconnect(client, userdata, rc):
|
||||||
|
""" 回调函数-断开链接 """
|
||||||
|
print("Disconnected with result code: %s", rc)
|
||||||
|
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
|
||||||
|
while reconnect_count < MAX_RECONNECT_COUNT:
|
||||||
|
print("Reconnecting in %d seconds...", reconnect_delay)
|
||||||
|
time.sleep(reconnect_delay)
|
||||||
|
|
||||||
|
try:
|
||||||
|
client.reconnect()
|
||||||
|
print("Reconnected successfully!")
|
||||||
|
return
|
||||||
|
except Exception as err:
|
||||||
|
logging.error("%s. Reconnect failed. Retrying...", err)
|
||||||
|
|
||||||
|
reconnect_delay *= RECONNECT_RATE
|
||||||
|
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
|
||||||
|
reconnect_count += 1
|
||||||
|
print("Reconnect failed after %s attempts. Exiting...", reconnect_count)
|
||||||
|
def on_message(client, userdata, msg):
|
||||||
|
self._message = msg
|
||||||
|
print(f"Received message from `{msg.topic}` topic")
|
||||||
|
|
||||||
|
client = mqtt_client.Client(client_id=self.client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2)
|
||||||
|
if account is not None:
|
||||||
|
client.username_pw_set(account[0], account[1])
|
||||||
|
client.on_connect = on_connect if 'func_on_connect' not in kwargs.keys() else kwargs['func_on_connect']
|
||||||
|
client.on_disconnect = on_disconnect if 'func_on_disconnect' not in kwargs.keys() else kwargs['func_on_disconnect']
|
||||||
|
client.on_message = on_message if 'func_on_message' not in kwargs.keys() else kwargs['func_on_message']
|
||||||
|
client.connect(broker, port)
|
||||||
|
return client
|
||||||
|
|
||||||
|
|
||||||
|
def close_connection(self) ->bool:
|
||||||
|
""" 关闭连接 """
|
||||||
|
self.client.disconnect()
|
||||||
|
|
||||||
|
def _subscribe(self, device_id):
|
||||||
|
""" 订阅主题 """
|
||||||
|
topic_send = f"ctiot/download/7/0101/{device_id}/function/invoke"
|
||||||
|
topic_read = f"ctiot/upload/7/0101/{device_id}/#"
|
||||||
|
self.device_id = device_id
|
||||||
|
self.topic = (topic_send, topic_read)
|
||||||
|
self.client.subscribe(topic_read)
|
||||||
|
|
||||||
|
def __send(self, msg: bytearray, topic=None) -> bool:
|
||||||
|
""" 发布消息 """
|
||||||
|
if topic is None:
|
||||||
|
topic = self.topic[0]
|
||||||
|
message = {
|
||||||
|
"deviceId": self.device_id,
|
||||||
|
"isSubDevice": 0,
|
||||||
|
"requestType": 0,
|
||||||
|
"messageId": "463g3ga53j4k25o9w",
|
||||||
|
"timestamp": int(time.time()),
|
||||||
|
"functionId": "168493059",
|
||||||
|
"inputs": [
|
||||||
|
{
|
||||||
|
"modbus_msg": ByteConv.trans_list_to_str(msg).replace(" ", '')
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
result = self.client.publish(topic, str(message).replace('\'', '\"'))
|
||||||
|
return result[0] == 1
|
||||||
|
|
||||||
|
def __read(self, timeout=None) -> bytes:
|
||||||
|
self._message = None
|
||||||
|
timeout = timeout if timeout is not None else 1
|
||||||
|
self.client.loop_start()
|
||||||
|
time.sleep(timeout)
|
||||||
|
self.client.loop_stop()
|
||||||
|
|
||||||
|
if self._message:
|
||||||
|
""" 报文接收成功 """
|
||||||
|
message_data = json.loads(self._message.payload)
|
||||||
|
frame_recv = " ".join((message_data['modbus_msg'][2*i:2*(i+1)] for i in range(len(message_data['modbus_msg'])//2)))
|
||||||
|
frame_recv = ByteConv.trans_str_to_list(frame_recv)
|
||||||
|
return bytearray(frame_recv)
|
||||||
|
else:
|
||||||
|
return b''
|
||||||
|
|
||||||
|
def __read_frame(self) ->bool:
|
||||||
|
""" 读取报文并解析帧 """
|
||||||
|
try:
|
||||||
|
frame_recv = self.__read(timeout=self.time_out)
|
||||||
|
self.output = self._frame_parser(frame_recv)
|
||||||
|
if self.flag_print:
|
||||||
|
print("Read Frame: ", ByteConv.trans_list_to_str(frame_recv))
|
||||||
|
except Exception as ex:
|
||||||
|
print("Error Info: ", ex)
|
||||||
|
if self.flag_print and frame_recv:
|
||||||
|
print("Fail Data: " , ByteConv.trans_list_to_str(frame_recv))
|
||||||
|
self.output['result'] = False
|
||||||
|
return self.output['result']
|
||||||
|
|
||||||
|
def _transfer_data(self) -> bool:
|
||||||
|
""" 串口收发报文, 包含重试逻辑与数据打印 """
|
||||||
|
# 生成发送帧
|
||||||
|
frame: bytearray = self._frame_maker()
|
||||||
|
|
||||||
|
# if not self.client.is_connected():
|
||||||
|
# """ 无效通信接口, 打印报文后返回 """
|
||||||
|
# print(ByteConv.trans_list_to_str(frame))
|
||||||
|
# return False
|
||||||
|
|
||||||
|
fail_count = 0
|
||||||
|
while fail_count < self.retry:
|
||||||
|
frame_discard = self.__read(timeout=0)
|
||||||
|
self.__send(frame)
|
||||||
|
self.log['send'] += 1
|
||||||
|
|
||||||
|
if self.flag_print and frame_discard:
|
||||||
|
print("Discard Data: " , frame_discard)
|
||||||
|
if self.flag_print:
|
||||||
|
print("Send Frame: ", ByteConv.trans_list_to_str(frame))
|
||||||
|
|
||||||
|
if self.__read_frame():
|
||||||
|
if (self.flag_print is not None) and 'Regs' in self.output.keys():
|
||||||
|
protocols.print_display(self.output['Regs'])
|
||||||
|
self.log['read'] += 1
|
||||||
|
break
|
||||||
|
|
||||||
|
fail_count += 1
|
||||||
|
self.log['keep-fail'] = fail_count if fail_count >= self.log['keep-fail'] else self.log['keep-fail']
|
||||||
|
time.sleep(2 * self.time_out)
|
||||||
|
|
||||||
|
return fail_count < self.retry
|
||||||
@@ -37,6 +37,13 @@ ParamMap_LaminaAdapter = {
|
|||||||
0x21: ["输出电容电电压", 2, 10],
|
0x21: ["输出电容电电压", 2, 10],
|
||||||
0x22: ["参考电压", 2, 10],
|
0x22: ["参考电压", 2, 10],
|
||||||
|
|
||||||
|
0x50: ["启停控制命令" , 2],
|
||||||
|
0x51: ["故障清除命令" , 2],
|
||||||
|
0x52: ["参数还原命令" , 2],
|
||||||
|
0x53: ["设备复位命令" , 2],
|
||||||
|
0x54: ["主动故障命令" , 2],
|
||||||
|
0x55: ["短时停机命令" , 2],
|
||||||
|
|
||||||
0x60: ["光伏通道使能", 1],
|
0x60: ["光伏通道使能", 1],
|
||||||
0x61: ["最小启动输入电压", 2, 10],
|
0x61: ["最小启动输入电压", 2, 10],
|
||||||
0x62: ["最大启动输入电压", 2, 10],
|
0x62: ["最大启动输入电压", 2, 10],
|
||||||
@@ -194,7 +201,7 @@ class LaminaAdapter(DeviceSerial):
|
|||||||
self.block['data']['type'] = 'write_one'
|
self.block['data']['type'] = 'write_one'
|
||||||
self.block['data']['data_addr'] = daddr
|
self.block['data']['data_addr'] = daddr
|
||||||
self.block['data']['data_val'] = dval
|
self.block['data']['data_val'] = dval
|
||||||
item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1
|
item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
|
||||||
self.block['data_val'] = int(dval * item_coff)
|
self.block['data_val'] = int(dval * item_coff)
|
||||||
return self._transfer_data()
|
return self._transfer_data()
|
||||||
|
|
||||||
@@ -202,7 +209,7 @@ class LaminaAdapter(DeviceSerial):
|
|||||||
self.block['data']['type'] = 'write_dual'
|
self.block['data']['type'] = 'write_dual'
|
||||||
self.block['data']['data_addr'] = daddr
|
self.block['data']['data_addr'] = daddr
|
||||||
self.block['data']['data_val'] = dval
|
self.block['data']['data_val'] = dval
|
||||||
item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1
|
item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
|
||||||
self.block['data_val'] = int(dval * item_coff)
|
self.block['data_val'] = int(dval * item_coff)
|
||||||
return self._transfer_data()
|
return self._transfer_data()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user