替换旧版帧处理函数;

This commit is contained in:
2024-10-05 02:21:38 +08:00
parent 3706a51c6b
commit 7056b73237
6 changed files with 346 additions and 838 deletions

View File

@@ -1,8 +1,10 @@
import time import time
from math import ceil
from tqdm import tqdm
from pathlib import Path from pathlib import Path
from serial import Serial from serial import Serial
from tools.ByteConv import trans_list_to_str from tools.ByteConv import trans_list_to_str
from func_frame import make_frame_dlt645, check_frame_dlt645 from func_frame import make_frame_dlt645, check_frame_dlt645, print_display
from func_upgrade import GeneratePackage_SLCP101_p460 from func_upgrade import GeneratePackage_SLCP101_p460
modbus_map = { modbus_map = {
@@ -115,6 +117,8 @@ modbus_map = {
class LaminaAdapter: class LaminaAdapter:
""" 叠光适配器\优化器
"""
def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs): def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs):
# 初始化串口通信 # 初始化串口通信
if com_name is not None: if com_name is not None:
@@ -129,6 +133,7 @@ class LaminaAdapter:
self.flag_print = 'frame_print' in kwargs.keys() self.flag_print = 'frame_print' in kwargs.keys()
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1 self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1 self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
self.retry_sub = kwargs['retry_sub'] if 'retry_sub' in kwargs.keys() else 1 self.retry_sub = kwargs['retry_sub'] if 'retry_sub' in kwargs.keys() else 1
@@ -140,50 +145,74 @@ class LaminaAdapter:
'data_define': modbus_map, 'data_define': modbus_map,
}, },
} }
self.output = {} self.output = {
'result': False,
'code_func': 0x00,
}
self.log = {
'send': 0,
'read': 0,
'keep-fail': 0,
'record': {
'config': None,
'data': None,
},
}
def __transfer_data(self, frame): def __read_frame(self) -> bool:
""" 使用帧字节超时策略读报文帧, 并进行解析数据, 打印异常 """
frame_recv = b''
time_start, time_current, flag_frame = time.time(), time.time(), False
while (time_current - time_start) < self.time_out:
time.sleep(self.time_gap)
bytes_read = self.__com.read_all()
time_current = time.time()
if flag_frame and len(bytes_read) == 0:
break
elif len(bytes_read):
flag_frame = True
frame_recv += bytes_read
try:
self.output = check_frame_dlt645(frame_recv, self.block['data'])
if self.flag_print:
print("Read Frame: ", trans_list_to_str(frame_recv))
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
def __transfer_data(self, frame: bytearray) -> bool:
""" 报文数据传输 """ """ 报文数据传输 """
if self.__com is None: if self.__com is None:
print(trans_list_to_str(frame)) print(trans_list_to_str(frame))
return False return False
cnt = 0 fail_count = 0
while cnt < self.retry: while fail_count < self.retry:
self.__com.read_all() frame_discard = self.__com.read_all()
self.__com.write(bytearray(frame)) self.__com.write(frame)
self.log['send'] += 1
cnt_sub = 0 if self.flag_print and frame_discard:
frame_recv = None print("Discard Data: " , frame_discard)
while not frame_recv: if self.flag_print:
time.sleep(self.time_out) print("Send Frame: ", trans_list_to_str(frame))
frame_recv = self.__com.read_all()
cnt_sub += 1
if cnt_sub >= self.retry_sub:
break
try: if self.__read_frame():
output_text = check_frame_dlt645(frame_recv, block=self.block) if 'Regs' in self.output.keys():
except Exception as e: print_display(self.output['Regs'])
print(e) self.log['read'] += 1
cnt += 1 break
continue else:
print(output_text) fail_count += 1
for line in output_text.split('\n'): if fail_count >= self.log['keep-fail']:
line = line.strip() self.log['keep-fail'] = fail_count
line_info = line.split('\t') time.sleep(2*self.time_out)
if line_info[0] == '0x0100':
self.output['version'] = line_info[2].rstrip('\x00')
elif line_info[0] == '0x0082':
self.output['address'] = line_info[2].rstrip('\x00')
break
if self.flag_print: return fail_count < self.retry
print(trans_list_to_str(frame))
print(trans_list_to_str(frame_recv))
return cnt < self.retry
def frame_read(self, daddr=0x60, dlen=0x50): def frame_read(self, daddr=0x60, dlen=0x50):
self.block['data']['type'] = 'read' self.block['data']['type'] = 'read'
@@ -222,6 +251,8 @@ class LaminaAdapter:
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程; 注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
""" """
param_saved = self.flag_print, self.retry, self.time_out
self.block['data']['type'] = 'update' self.block['data']['type'] = 'update'
self.block['data']['step'] = 'start' self.block['data']['step'] = 'start'
self.block['data']['index'] = 0 self.block['data']['index'] = 0
@@ -230,125 +261,65 @@ class LaminaAdapter:
# 启动帧 # 启动帧
frame_master = bytearray(make_frame_dlt645(self.block)) frame_master = bytearray(make_frame_dlt645(self.block))
# 等待擦除完成返回 if not self.__transfer_data(frame_master):
try_times = 30 self.flag_print, self.retry, self.time_out = param_saved
self.__com.read_all() print('Upgrade Fail: start')
while try_times: return False
time.sleep(self.time_out) self.block["data"]['file_block_size'] = self.output['upgrade']['length']
self.__com.write(frame_master)
frame_slave = self.__com.read_all()
if not frame_slave:
try_times -= 1
continue
_, _, self.block["data"]['file_block_size'] = check_frame_dlt645(frame_slave, self.block)
break
if self.block["data"]['file_block_size'] == 0:
raise Exception("Error slave response.")
# 避免接收到延迟返回报文 # 避免接收到延迟返回报文
time.sleep(self.time_out) time.sleep(self.time_out)
# 文件传输 # 文件传输
self.retry = 3
self.time_out = 1.5
self.block["data"]['step'] = 'trans' self.block["data"]['step'] = 'trans'
data_remain = len(self.block["data"]['file']) - self.block['data']['header_offset'] self.block['data']['index'] = 0
while data_remain > 0: frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size'])
frame_master = bytearray(make_frame_dlt645(self.block)) for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx
cnt = 0 frame_master = make_frame_dlt645(self.block['data'])
while cnt < self.retry: if not self.__transfer_data(frame_master):
self.__com.read_all() self.flag_print, self.retry, self.time_out = param_saved
self.__com.write(frame_master) print(f'Upgrade Fail: trans data in {idx}')
return False
cnt_sub = 0
frame_slave = None
while not frame_slave:
time.sleep(self.time_out)
frame_slave = self.__com.read_all()
cnt_sub += 1
if cnt_sub >= self.retry_sub:
break
try:
ret = check_frame_dlt645(frame_slave, self.block)
except Exception as e:
print(e)
ret = False, 0, 0
if ret[0]:
break
else:
cnt += 1
if cnt >= self.retry:
raise Exception("Error, Retry failed.")
self.block["data"]['index'] += 1
data_remain -= self.block["data"]['file_block_size']
# 结束升级 # 结束升级
self.time_out = 1
self.block["data"]['step'] = 'end' self.block["data"]['step'] = 'end'
frame_master = bytearray(make_frame_dlt645(self.block)) self.block["data"]['index'] += 1
frame_master = make_frame_dlt645(self.block['data'])
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: end')
return False
cnt = 0 self.flag_print, self.retry, self.time_out = param_saved
while cnt < self.retry: return True
self.__com.read_all()
self.__com.write(frame_master)
cnt_sub = 0
frame_slave = None
while not frame_slave:
time.sleep(self.time_out)
frame_slave = self.__com.read_all()
cnt_sub += 1
if cnt_sub >= self.retry_sub:
break
try:
ret = check_frame_dlt645(frame_slave, self.block)
except Exception as e:
print(e)
ret = False, 0, 0
if ret[0]:
break
else:
cnt += 1
if cnt >= self.retry:
raise Exception("Error, Retry failed.")
def test_communication(time_out=2): def test_communication(time_out=2):
""" 通信成功率测试 """ """ 通信成功率测试 """
log_success = 0
log_failed = 0
log_failedseries = 0
cnt_failedseries = 0
time_start = time.time() time_start = time.time()
saveconfig_print = dev_lamina.flag_print param_saved = dev_lamina.flag_print, dev_lamina.retry, dev_lamina.time_out
dev_lamina.flag_print = False dev_lamina.flag_print = False
dev_lamina.retry = 1
try: try:
while 1: while True:
if dev_lamina.frame_read(0x0E, 0x13): dev_lamina.frame_read(0x0C, 0x20)
log_success += 1
cnt_failedseries = 0
else:
log_failed += 1
cnt_failedseries += 1
if log_failedseries <= cnt_failedseries:
log_failedseries = cnt_failedseries
print(f"Time Stamp: {time.ctime()}") print(f"Time Stamp: {time.ctime()}")
print(f"Success Frame: {log_success}") print(f"Success Frame: {dev_lamina.log['read']}")
print(f"Failed Frame: {log_failed}") print(f"Failed Frame: {dev_lamina.log['send'] - dev_lamina.log['read']}")
print(f"Max Series Failed Frame: {log_failedseries}") print(f"Max Series Failed Frame: {dev_lamina.log['keep-fail']}")
time.sleep(time_out) time.sleep(time_out)
finally: finally:
time_end = time.time() time_end = time.time()
print("Test Result: ") print("Test Result: ")
print(f"Time Start: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_start))}, \tTime End: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_end))}") print(f"Time Start: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_start))}, \tTime End: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_end))}")
print(f"Time Elapsed: {time_end - time_start}") print(f"Time Elapsed: {time_end - time_start}")
print(f"Success Rate: {log_success / (log_success + log_failed) * 100}%") print(f"Success Rate: {dev_lamina.log['read'] / dev_lamina.log['send'] * 100}%")
dev_lamina.flag_print = saveconfig_print dev_lamina.flag_print, dev_lamina.retry, dev_lamina.time_out = param_saved
def make_Pakeage(fp: Path): def make_Pakeage(fp: Path):
""" 生成升级包 """ """ 生成升级包 """
@@ -364,25 +335,8 @@ def make_Pakeage(fp: Path):
return file_package return file_package
if __name__=='__main__':
mode_config = {
"Log": {'com_name': None,
# 'addr_645': [0x01, 0x00, 0x00, 0x00, 0x00, 0x40],
},
"Debug": {'com_name': 'COM8', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True,
'time_out': 0.1, 'retry': 1, 'retry_sub': 10},
"HPLC": {'com_name': 'COM10', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True,
'time_out': 0.5, 'retry': 3, 'retry_sub': 10},
}
dev_lamina = LaminaAdapter(**mode_config['Debug'])
dev_lamina.frame_read(0x0100, 0x20)
def test():
if 0: if 0:
dev_lamina.frame_read(0xA9, 1) # 读ADC参考电压 dev_lamina.frame_read(0xA9, 1) # 读ADC参考电压
dev_lamina.frame_write_one(0xA9, 2000) # 写ADC参考电压:2.0V dev_lamina.frame_write_one(0xA9, 2000) # 写ADC参考电压:2.0V
@@ -398,7 +352,6 @@ if __name__=='__main__':
dev_lamina.frame_write_one(0x99, 1500) # 写校准参数Vout_a: 1.500 dev_lamina.frame_write_one(0x99, 1500) # 写校准参数Vout_a: 1.500
dev_lamina.frame_write_one(0x9A, 1000) # 写校准参数Vout_a: 1.00 dev_lamina.frame_write_one(0x9A, 1000) # 写校准参数Vout_a: 1.00
dev_lamina.frame_write_one(0x9A, 1500) # 写校准参数Vout_a: 1.50 dev_lamina.frame_write_one(0x9A, 1500) # 写校准参数Vout_a: 1.50
if 0: if 0:
dev_lamina.frame_write_str(0x82, [0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]) dev_lamina.frame_write_str(0x82, [0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])
dev_lamina.frame_write_str(0x82, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) dev_lamina.frame_write_str(0x82, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
@@ -438,7 +391,6 @@ if __name__=='__main__':
dev_lamina.frame_read(0x0E, 0x13) dev_lamina.frame_read(0x0E, 0x13)
time.sleep(0.5) time.sleep(0.5)
dev_lamina.frame_write_one(0x50, 0x00) dev_lamina.frame_write_one(0x50, 0x00)
if 0: if 0:
dev_lamina.frame_write_one(0x0054, 0x01) dev_lamina.frame_write_one(0x0054, 0x01)
dev_lamina.frame_read(0x000E, 0x02) dev_lamina.frame_read(0x000E, 0x02)
@@ -471,25 +423,41 @@ if __name__=='__main__':
time.sleep(2) time.sleep(2)
dev_lamina.frame_read(0x0170, 0x30) dev_lamina.frame_read(0x0170, 0x30)
if __name__=='__main__':
mode_config = {
"Log": {'com_name': None,
# 'addr_645': [0x01, 0x00, 0x00, 0x00, 0x00, 0x40],
},
"Debug": {'com_name': 'COM8', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True,
'time_out': 0.1, 'retry': 1, 'retry_sub': 10},
"HPLC": {'com_name': 'COM10', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True,
'time_out': 0.5, 'retry': 3, 'retry_sub': 10},
}
dev_lamina = LaminaAdapter(**mode_config['Debug'])
dev_lamina.frame_read(0x0100, 0x20)
if not hasattr(__builtins__,"__IPYTHON__"): if not hasattr(__builtins__,"__IPYTHON__"):
# file_package = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p460_o1\result\lamina_optimizer_t1.dat")
# path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240520_0000_T1.11.bin")
# 工程-即时转换 # 工程-即时转换
file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex") file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex")
if not file_hex.exists(): if not file_hex.exists():
raise Exception("工程编译目标文件不存在.") raise Exception("工程编译目标文件不存在.")
file_package = make_Pakeage(file_hex) file_package = make_Pakeage(file_hex)
# 江苏发货产品灌装版本
# path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240603_2100_V1.18.bin")
version = "DLSY001_240911_1600_V1.01" version = "DLSY001_240911_1600_V1.01"
addr = [0x24, 0x09, 0x12, 0x00, 0x00, 0x00] addr = [0x24, 0x09, 0x12, 0x00, 0x00, 0x00]
while True: while True:
""" 自动检测升级流程 """ """ 自动检测升级流程(需要调试) """
ret = False ret = False
while not ret or ('version' not in dev_lamina.output.keys()) or (version == dev_lamina.output['version']): while not ret or ('Reg' not in dev_lamina.output.keys()) or (version == dev_lamina.output['Reg'][0x0100][1]):
dev_lamina.frame_read(0x82, 3) dev_lamina.frame_read(0x82, 3)
ret = dev_lamina.frame_read(0x0100, 0x20) ret = dev_lamina.frame_read(0x0100, 0x20)
time.sleep(1) time.sleep(1)
@@ -500,7 +468,7 @@ if __name__=='__main__':
ret = dev_lamina.frame_read(0x0100, 0x20) ret = dev_lamina.frame_read(0x0100, 0x20)
if ret and (version == dev_lamina.output['version']): if ret and (version == dev_lamina.output['Reg'][0x0100][1]):
dev_lamina.frame_write_one(0x52, 0x01) dev_lamina.frame_write_one(0x52, 0x01)
print(f"address: {' '.join(map(lambda x: ('000' + hex(x)[2:])[-2:], addr))}") print(f"address: {' '.join(map(lambda x: ('000' + hex(x)[2:])[-2:], addr))}")

View File

@@ -7,7 +7,7 @@ from datetime import datetime
from tenacity import retry, stop_after_attempt, wait_fixed from tenacity import retry, stop_after_attempt, wait_fixed
from tools.ByteConv import trans_list_to_str from tools.ByteConv import trans_list_to_str
from tools.IntelHex import file_Bin_to_IntelHex from tools.IntelHex import file_Bin_to_IntelHex
from func_frame_re import make_frame_modbus, check_frame_modbus, print_display from func_frame import make_frame_modbus, check_frame_modbus, print_display
from func_upgrade import GenerateImage_DLSP001_p280039, GeneratePackage_DLSP001_p280039 from func_upgrade import GenerateImage_DLSP001_p280039, GeneratePackage_DLSP001_p280039
modbus_map = { modbus_map = {
@@ -114,6 +114,8 @@ modbus_map = {
class LaminaController: class LaminaController:
""" 叠光控制器
"""
def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs): def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs):
# 初始化串口通信 # 初始化串口通信
if com_name is not None: if com_name is not None:
@@ -177,7 +179,7 @@ class LaminaController:
self.output['result'] = False self.output['result'] = False
return self.output['result'] return self.output['result']
def __transfer_data(self, frame: bytes) -> bool: def __transfer_data(self, frame: bytearray) -> bool:
""" 串口收发报文, 包含重试逻辑与数据打印 """ """ 串口收发报文, 包含重试逻辑与数据打印 """
if self.__com is None: if self.__com is None:
print(trans_list_to_str(frame)) print(trans_list_to_str(frame))

View File

@@ -3,7 +3,7 @@ import socket
import random import random
from pathlib import Path from pathlib import Path
from tools.ByteConv import trans_list_to_str from tools.ByteConv import trans_list_to_str
from func_frame import make_frame_modbus, check_frame_modbus from source.func_frame import make_frame_modbus, check_frame_modbus, print_display
modbus_map = { modbus_map = {
0x00: ['编译日期', 4, 6], 0x00: ['编译日期', 4, 6],
@@ -11,18 +11,59 @@ modbus_map = {
} }
class EnergyRouter: class EnergyRouter:
def __init__(self, ip="192.168.100.10", port=7, adddr_modbus=0x01): """ 能量路由器远程升级测试(未完成)
"""
def __init__(self, ip="192.168.100.10", port=7, adddr_modbus=0x01, **kwargs):
self._ip = ip self._ip = ip
self._port = port self._port = port
self._addr_modbus =adddr_modbus self.flag_print = 'frame_print' in kwargs.keys()
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tcp_socket.connect((self._ip, self._port)) self.tcp_socket.connect((self._ip, self._port))
self.block = { self.block = {
'addr_dev' : self._addr_modbus, 'addr_dev' : adddr_modbus,
'data_define': modbus_map, 'data_define': modbus_map,
} }
self.output = {
'result': False,
'code_func': 0x00,
}
self.log = {
'send': 0,
'read': 0,
'keep-fail': 0,
'record': {
'config': None,
'data': None,
},
}
def __transfer_data(self, frame: bytes) -> bool:
""" 数据传输处理函数 """
if self.tcp_socket is None:
print(trans_list_to_str(frame))
return False
try:
self.tcp_socket.send(frame)
time.sleep(self.time_out)
frame_recv = self.tcp_socket.recv(128)
self.output = check_frame_modbus(frame_recv, self.block)
if self.flag_print:
print("Read Frame: ", trans_list_to_str(frame_recv))
if 'Regs' in self.output.keys():
print_display(self.output['Regs'])
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
def frame_read(self, daddr=0x00, dlen=0x10): def frame_read(self, daddr=0x00, dlen=0x10):
self.block['type'] = 'read' self.block['type'] = 'read'
@@ -30,40 +71,26 @@ class EnergyRouter:
self.block['data_len'] = dlen self.block['data_len'] = dlen
frame = make_frame_modbus(self.block) frame = make_frame_modbus(self.block)
if self.tcp_socket is None: return self.__transfer_data(frame)
print(trans_list_to_str(frame))
return
# self.tcp_socket.recv(8)
self.tcp_socket.send(bytearray(frame))
time.sleep(0.5)
frame_recv = self.tcp_socket.recv(128)
output_text = check_frame_modbus(frame_recv, self.block)
print(output_text)
def frame_update(self, path_bin): def frame_update(self, path_bin):
""" 程序升级 """ 程序升级
""" """
param_saved = self.flag_print, self.retry, self.time_out
self.block['type'] = 'update' self.block['type'] = 'update'
self.block['step'] = 'start' self.block['step'] = 'start'
self.block['file'] = Path(path_bin).read_bytes() self.block['file'] = Path(path_bin).read_bytes()
self.block['header_offset'] = 128 self.block['header_offset'] = 128
# 启动帧 # 启动帧
frame_master = bytearray(make_frame_modbus(self.block)) frame_master = make_frame_modbus(self.block)
# 等待擦除完成返回 if not self.__transfer_data(frame_master):
time.sleep(0.4) self.flag_print, self.retry, self.time_out = param_saved
self.tcp_socket.send(frame_master) print('Upgrade Fail: start')
frame_slave = self.tcp_socket.recv(32) return False
if frame_slave == '': self.block['file_block_size'] = self.output['upgrade']['length']
raise Exception("TCP closed.")
self.block['file_block_size'] = check_frame_modbus(frame_slave, self.block)
if self.block['file_block_size'] == 0:
raise Exception("Error slave response.")
# 避免接收到延迟返回报文 # 避免接收到延迟返回报文
time.sleep(0.4) time.sleep(0.4)
@@ -87,7 +114,7 @@ class EnergyRouter:
continue continue
seq_window[i] = 1 seq_window[i] = 1
self.block['index'] = seq_offset + i self.block['index'] = seq_offset + i
seq_frame_master[i] = bytearray(make_frame_modbus(self.block)) seq_frame_master[i] = make_frame_modbus(self.block)
self.tcp_socket.send(seq_frame_master[i]) self.tcp_socket.send(seq_frame_master[i])
# 接收帧回复 # 接收帧回复
tmp = list(zip(range(len(seq_window)), seq_window)) tmp = list(zip(range(len(seq_window)), seq_window))
@@ -97,10 +124,11 @@ class EnergyRouter:
# 接收到空数据, 对端已关闭连接 # 接收到空数据, 对端已关闭连接
if seq_frame_slave[i] == '': if seq_frame_slave[i] == '':
raise Exception("TCP closed.") raise Exception("TCP closed.")
result, seq_current, seq_hope = check_frame_modbus(seq_frame_slave[i], None) self.output = check_frame_modbus(seq_frame_slave[i], None)
seq_current, seq_hope = self.output['upgrade']['index'], self.output['upgrade']['hope']
if seq_current < seq_offset: if seq_current < seq_offset:
raise Exception("Error.") raise Exception("Error.")
elif result: elif self.output['result']:
seq_window[seq_current - seq_offset] = 2 seq_window[seq_current - seq_offset] = 2
data_remain -= self.block['file_block_size'] data_remain -= self.block['file_block_size']
if seq_hope is not None and seq_hope < seq_offset: if seq_hope is not None and seq_hope < seq_offset:
@@ -121,16 +149,15 @@ class EnergyRouter:
seq_window[i] = 0 seq_window[i] = 0
# 结束升级 # 结束升级
ret = 0
self.block['step'] = 'end' self.block['step'] = 'end'
frame_master = bytearray(make_frame_modbus(self.block)) frame_master = make_frame_modbus(self.block)
while ret == 0: while self.output['result'] is False:
self.tcp_socket.send(frame_master) self.tcp_socket.send(frame_master)
frame_slave = self.tcp_socket.recv(8) frame_slave = self.tcp_socket.recv(8)
if frame_slave == '': if frame_slave == '':
raise Exception("TCP closed.") raise Exception("TCP closed.")
ret = check_frame_modbus(frame_slave[:18], self.block) self.output = check_frame_modbus(frame_slave[:18], self.block)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,3 +1,4 @@
import re
import struct import struct
from crc import Calculator, Crc16 from crc import Calculator, Crc16
from tools.ByteConv import trans_list_to_str, trans_str_to_list, display_hex from tools.ByteConv import trans_list_to_str, trans_str_to_list, display_hex
@@ -31,7 +32,8 @@ frame_modbus = {
0x11: [11, 2, 7] 0x11: [11, 2, 7]
} }
def make_frame_modbus(block:dict):
def make_frame_modbus(block:dict) -> bytearray:
""" modbus 生成函数""" """ modbus 生成函数"""
frame = [] frame = []
calculator = Calculator(Crc16.MODBUS) calculator = Calculator(Crc16.MODBUS)
@@ -73,8 +75,6 @@ def make_frame_modbus(block:dict):
elif block['type'][:6] == 'record': elif block['type'][:6] == 'record':
""" 录波系列报文 """ """ 录波系列报文 """
frame.append(0x11) frame.append(0x11)
frame_types = block['type'].split('_')
if block['type'][7:10] == 'cfg': if block['type'][7:10] == 'cfg':
frame.append(0x01) frame.append(0x01)
elif block['type'][7:11] == 'data': elif block['type'][7:11] == 'data':
@@ -151,14 +151,14 @@ def make_frame_modbus(block:dict):
else: else:
raise Exception("Modbus Frame Type Error.") raise Exception("Modbus Frame Type Error.")
crc = calculator.checksum(bytearray(frame)) crc = calculator.checksum(bytearray(frame))
frame.append(crc % 256) frame.append(crc % 256)
frame.append(crc // 256) frame.append(crc // 256)
return frame return bytearray(frame)
def make_frame_dlt645(block:dict):
def make_frame_dlt645(block:dict) -> bytearray:
""" dlt645 生成函数""" """ dlt645 生成函数"""
frame = [] frame = []
@@ -179,79 +179,85 @@ def make_frame_dlt645(block:dict):
frame.append(sum(frame) % 256) frame.append(sum(frame) % 256)
frame.append(0x16) frame.append(0x16)
return frame return bytearray(frame)
def display_data(modbus_map: dict, address: int, data: list):
""" 格式化表示数据 """
def display_str(data, len_data):
item = bytearray(data[:2 * len_data])
for i in range(len_data):
t = item[2*i]
item[2*i] = item[2*i+1]
item[2*i+1] = t
return item
output = "Parse Result: \n" def check_frame_modbus(frame:bytes, block:dict) -> dict:
idx = address """ 校验modbus帧回复 """
len_max = 0 if len(frame:=find_frame_modbus(frame, block['addr_dev'])) == 0:
while data: raise Exception("No frame data")
data_len = 1
data_label = "未知数据" output = {
if idx not in modbus_map.keys(): 'result': False,
item = data[0] * 0x0100 + data[1] 'code_func': frame[1],
item = display_hex(item, 4) 'code_sub': frame[2],
}
if output['code_func'] == 0x07:
""" 升级回复帧 """
output['upgrade'] = {}
output['upgrade']['index'] = frame[4] * 256 + frame[5]
if output['code_sub'] == 0x01:
""" 升级开始, 处理文件头 """
output['upgrade']['length'] = frame[6] * 256 + frame[7]
elif output['code_sub'] == 0x02:
""" 升级传输, 解析帧序号及返回值, 不做序列号校验 """
pass
elif output['code_sub'] == 0x03:
""" 升级结束 """
pass
else: else:
current_map = modbus_map[idx] raise Exception(" Upgrade SubFunc code error.")
data_label = current_map[0] output['result'] = True if frame[3] == 0x00 else False
if current_map[1] == 1: output['code_error'] = frame[3]
""" Hex字符表示 """ elif output['code_func'] == 0x03 or output['code_func'] == 0x04:
item = data[0] * 0x0100 + data[1] """ 数据读取帧 """
item = display_hex(item, 4) if frame[2] == len(frame[3:-2]):
elif current_map[1] == 2: output['Regs'] = display_data(block['data_addr'], frame[3:-2], block['data_define'])
""" 16位数值表示 """ else:
item = data[0] * 0x0100 + data[1] raise Exception("Frame read data length error.")
if item > 0x8000: output['result'] = True
item -= 0x1_0000 elif output['code_func'] == 0x06:
elif current_map[1] == 3: """ 单个数据写入帧 """
""" 32位数值表示 """ output['result'] = True
item = data[2] * 0x0100 + data[3] elif output['code_func'] == 0x10:
item *= 0x10000 """ 多个数据写入帧 """
item += data[0] * 0x0100 + data[1] output['result'] = True
if data[2] > 0x80: elif output['code_func'] == 0x11:
item -= 0x1_0000_0000 """ 录波功能帧 """
data_len = 2 data_record = {}
elif current_map[1] == 4: if frame[2] == 0x01:
""" 字符串表示 """ data_record['type'] = 'config'
data_len = current_map[2] elif frame[2] == 0x02:
item = display_str(data, data_len) data_record["type"] = 'data'
item = item.replace(b'\xff', b' 0xFF') else:
item = item.decode() raise Exception("Unknow data type")
elif current_map[1] == 5: data_record['seq'] = frame[5] * 0x100 + frame[6]
""" 载波地址表示 """ data_record['total'] = frame[3] * 0x100 + frame[4]
data_len = current_map[2] data_record['data'] = frame[9:-2]
item = display_str(data, data_len) output['record'] = data_record
item = trans_list_to_str(item) output['result'] = True
elif current_map[1] == 6: elif output['code_func'] & 0x80:
""" 浮点数值表示 """ """ 错误返回帧 """
temp = [data[2], data[3], data[0], data[1]] output['code_error'] = frame[2] if output['code_func'] == 0x87 else frame[3]
item = struct.unpack('>f', bytes(temp))[0] else:
data_len = 2 raise Exception(f"Frame Date error. func={output['code_func']}, func_sub={output['code_sub']}, len={len(frame)}")
elif current_map[1] == 7:
""" 正序数值表示 """
data_len = current_map[2]
item = " ".join(map(lambda x: str(x), data[:2 * data_len]))
if len_max < len(data_label):
len_max = len(data_label)
len_remain = len_max - len(data_label)
output_line = "\t".join([display_hex(idx, 4), data_label + " " * len_remain, str(item)])
idx += data_len
del data[0:data_len * 2]
output += output_line + "\n"
return output return output
def find_frame_modbus(buffer, address, frame_defines: dict=frame_modbus):
def check_frame_dlt645(frame:bytes, block:dict) -> dict:
""" 校验dlt645帧回复 """
if len(frame:=find_frame_dlt645(frame, block['addr'])) == 0:
raise Exception("No frame data")
code_func = frame[8]
if code_func == 0x9F:
return check_frame_modbus(frame[10:-2], block['data'])
else:
raise Exception("DLT645 Frame type error.")
def find_frame_modbus(buffer:bytes, address:int, frame_defines: dict=frame_modbus) -> bytes:
""" 搜索合法modbus帧子串 """ """ 搜索合法modbus帧子串 """
len_buffer = len(buffer) len_buffer = len(buffer)
@@ -286,7 +292,8 @@ def find_frame_modbus(buffer, address, frame_defines: dict=frame_modbus):
return buffer[pos_frame: pos_frame+len_frame] return buffer[pos_frame: pos_frame+len_frame]
def find_frame_dlt645(buffer, address: list):
def find_frame_dlt645(buffer:bytes, address: list) -> bytes:
""" 搜索合法645帧子串 """ """ 搜索合法645帧子串 """
len_buffer = len(buffer) len_buffer = len(buffer)
@@ -304,186 +311,81 @@ def find_frame_dlt645(buffer, address: list):
return buffer[pos_frame: pos_frame+len_frame] return buffer[pos_frame: pos_frame+len_frame]
def check_frame_modbus(frame, block=None):
""" 校验modbus帧回复 """
if len(frame:=find_frame_modbus(frame, block['addr_dev'])) == 0: def display_data(address: int, data: bytes, modbus_map: dict=modbus_map) -> dict:
raise Exception("No frame data") """ 格式化表示数据, 得到显示数据字典 """
def swapping_words(data:bytes, length:int) -> bytearray:
code_func = frame[1] item = bytearray(data[:2 * length])
code_subfunc = frame[2] for i in range(length):
if code_func == 0x07: item[2*i], item[2*i+1] = item[2*i+1], item[2*i]
""" 升级回复帧 """ return item
if frame[3] == 0x00: def convert_regs_to_int(data:bytes, length:int, signed: bool=True) -> int:
check_result = True result = 0
for idx_data, byte_data in enumerate(swapping_words(data, length)):
result += byte_data * 2 ** (8 * idx_data)
result -= 2 ** (16 * length) if signed and result >= (2 ** (16 * length - 1)) else 0
return result
output_data = {}
idx = address
while data:
data_label, data_len = "未知数据", 1
if idx not in modbus_map.keys():
item = convert_regs_to_int(data, 1, signed=False)
item = display_hex(item, 4)
else: else:
check_result = False current_map = modbus_map[idx]
update_index = frame[4] * 256 + frame[5] data_label = current_map[0]
check_hope = 0 if current_map[1] == 1:
if code_subfunc == 0x01: """ Hex字符表示 """
""" 处理帧头 """ item = convert_regs_to_int(data, 1, signed=False)
check_hope = frame[6] * 256 + frame[7] item = display_hex(item, 4)
elif code_subfunc == 0x02: elif current_map[1] == 2:
""" 解析帧序号及返回值, 不做序列号校验 """ """ 16位数值表示 """
elif code_subfunc == 0x03: item = convert_regs_to_int(data, 1)
""" 升级结束 """ if len(current_map) > 2:
else: item /= current_map[2]
raise Exception("Func code or Return code error.") elif current_map[1] == 3:
return check_result, update_index, check_hope """ 32位数值表示 """
elif code_func == 0x03 or code_func == 0x04: data_len = 2
""" 数据读取帧 """ item = convert_regs_to_int(data, 2)
if frame[2] == len(frame[3:-2]): if len(current_map) > 2:
if type(block) is dict: item /= current_map[2]
data_addr = block['data_addr'] elif current_map[1] == 4:
return display_data(block['data_define'], data_addr, list(frame[3:-2])) """ 字符串表示 """
else: data_len = current_map[2]
return trans_list_to_str(frame[3:-2]) item = swapping_words(data, data_len)
else: item = item.replace(b'\xff', b' 0xFF').decode()
raise Exception("Frame read error.") elif current_map[1] == 5:
elif code_func == 0x06: """ 载波地址表示 """
""" 单个数据写入帧 """ data_len = current_map[2]
return trans_list_to_str(frame[2:-2]) item = swapping_words(data, data_len)
elif code_func == 0x10: item = trans_list_to_str(item)
""" 多个数据写入帧 """ elif current_map[1] == 6:
return trans_list_to_str(frame[2:-2]) """ 浮点数值表示 """
elif code_func == 0x11: data_len = 2
""" 录波功能帧 """ item = struct.unpack('>f', bytes([data[2], data[3], data[0], data[1]]))[0]
frame_data = { elif current_map[1] == 7:
'seq': frame[5] * 0x100 + frame[6], """ 正序数值表示 """
'total': frame[3] * 0x100 + frame[4], data_len = current_map[2]
'data': frame[9:-2] item = list(map(lambda x: x, data[:2 * data_len]))
}
if frame[2] == 0x01:
frame_data['type'] = 'config'
elif frame[2] == 0x02:
frame_data["type"] = 'data'
else:
raise Exception("Unknow data type")
return True, frame_data
elif code_func & 0x80:
""" 错误返回帧 """
if code_func & 0x7F == 0x07:
return False, frame[3], code_func, code_subfunc
return False, frame[2], code_func
else:
raise Exception(f"Frame Date error. func={code_func}, func_sub={code_subfunc}, len={len(frame)}")
def check_frame_dlt645(frame, block=None): output_data[idx] = data_label, item
""" 校验dlt645帧回复 """ idx += data_len
if len(frame:=find_frame_dlt645(frame, block['addr'])) == 0: data = data[2*data_len:]
raise Exception("No frame data") return output_data
code_func = frame[8]
if code_func == 0x9F:
block = block['data']
return check_frame_modbus(frame[10:-2], block)
else:
raise Exception("DLT645 Frame type error.")
if __name__ == "__main__": def print_display(output_data: dict):
frame_slave = '01 07 02 00 00 A7 00 F0 A1 A4 1F DC 0B 1D 27 08 0F CB A7 49 03 99 09 C6 87 10 FB 08 86 71 9F 2A 2F BB 8F 69 4D 5C 9F 90 51 8A 8B D3 E0 85 9F 2B C1 9A 7E A3 22 9B 29 EE 6B 70 28 D3 31 F6 EF 78 43 8A DF FC F3 8C 13 02 31 F4 65 B5 EE 46 80 F2 E9 D4 E9 C8 F2 84 13 3A DF 50 1D 45 53 52 1D 89 6F F8 CB 7F 56 28 DF A2 11 D4 47 93 04 04 FF AB 35 1F C3 BA D2 F0 65 F2 F6 A3 AC A5 A2 AF AF 3E 88 65 EC 7B 35 62 DA 4A CF A4 69 A5 9E C7 70 E6 DC DD BE 49 DD DA 09 CE 42 18 5C 57 86 12 E0 A0 74 5F 5C F7 35 B3 FE 7C 51 94 5C 57 28 1A 86 1E 9F DE F6 B2 4B A9 29 E6 30 EA F2 BB E6 72 81 E9 80 3A DE FC DC C2 F8 8E 30 F4 66 B3 25 12 30 FA 90 09 3C 8C 1D FD 49 8E 4A 58 1C 17 48 54 CF 6A DE B4 05 7F 3D DD 68 F2 7E C2 CE 01 11 D6 DE 5E 7C 27 10 FE 28 28 3E 06 4D C8 01 07 02 00 00 A7 F4 08' """ 格式化表示输出数据 """
buffer_test = trans_str_to_list("62 01 03 25 1C 00 01 03 3A 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 10 01 F3 00 06 00 12 01 DB 00 19 00 BA 00 00 24 62 00 00 25 1C 00 00 00 0F 00 01 00 07 00 2B 02 4D FD 6C FD 6C 01 20 01 20 73 08 4D FD 6C FD") print("Parse Result:")
buffer_test1 = trans_str_to_list("01 03 3A 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 11 01 F4 00 06 00 13 01 DB 00 1A 00 D5 00 00 25 AE 00 00 26 83 00 00 00 0C FF FF 00 56 00 72 02 1D FD 6C FD 6C 01 26 01 26 94 E3") label_len_max = max(map(lambda x: len(x[0]), output_data.values()))
buffer_test2 = trans_str_to_list("01 06 00 60 00 00 89 D4 94 E3") data_len_max = max(map(lambda x: len(str(x[1])), output_data.values()))
buffer_test3 = trans_str_to_list(frame_slave) for key, value in output_data.items():
modbus_map = { label = value[0]
# 1 - Hex data = "-".join(map(str, value[1])) if type(value) == list else value[1]
# 2 - Int16 print(f"{display_hex(key, 4)}: {data:<{data_len_max}} {label:<{label_len_max}}")
# 3 - lnt32
# 4 - str
# 5 - addr
# 6 - float
0x0C: ["告警字1", 1],
0x0D: ["告警字2", 1],
0x0E: ["故障字1", 1],
0x0F: ["故障字2", 1],
0x10: ["系统工作状态" , 1],
0x11: ["Boost1工作状态" , 1],
0x12: ["Boost2工作状态" , 1],
0x13: ["开关机状态" , 1],
0x14: ["光伏组串1输入电压" , 2],
0x15: ["光伏组串2输入电压" , 2],
0x16: ["Boost1电感电流" , 2],
0x17: ["Boost2电感电流" , 2],
0x18: ["Boost输出电压" , 2],
0x19: ["Boost输出总电流" , 2],
0x1A: ["Boost1功率" , 3],
0x1C: ["Boost2功率" , 3],
0x1E: ["输入总功率" , 3],
0x20: ["LLC输出电压" , 2],
0x21: ["端口输出电压" , 2],
0x22: ["LLC输出电流均值" , 2],
0x23: ["LLC输出电流峰值" , 2],
0x24: ["绝缘检测电压" , 2],
0x25: ["散热片温度" , 2],
0x26: ["腔体1温度" , 2],
0x27: ["腔体2温度" , 2],
0x28: ["设备温度" , 2],
0x50: ["启停控制命令" , 2],
0x51: ["故障清除命令" , 2],
0x52: ["参数还原命令" , 2],
0x53: ["设备复位命令" , 2],
0x60: ["整机运行使能", 1],
0x61: ["最小启动允许输入电压", 2],
0x62: ["最大启动允许输入电压", 2],
0x63: ["最小停机输入电压", 2],
0x64: ["最大停机输入电压", 2],
0x65: ["最小启动允许输出电压", 2],
0x66: ["最大启动允许输出电压", 2],
0x67: ["最小停止允许输出电压", 2],
0x68: ["最大停止允许输出电压", 2],
0x69: ["最小MPPT电流限值", 2],
0x6A: ["最大MPPT电流限值", 2],
0x6B: ["保留数据项", 2],
0x6C: ["最大功率限值", 3],
0x6E: ["最大功率限值存储值", 3],
0x70: ["Boost输入过压保护值", 2],
0x71: ["Boost输出过压保护值", 2],
0x72: ["LLC输出过压保护值", 2],
0x73: ["LLC输出欠压保护值", 2],
0x74: ["Boost电感过流保护值", 2],
0x75: ["LLC输出电流均值保护值", 2],
0x76: ["LLC输出电流峰值保护值", 2],
0x77: ["保留数据项", 2],
0x78: ["过载保护值", 3],
0x7A: ["过温故障值", 2],
0x7B: ["过温告警值", 2],
0x7C: ["过温恢复值", 2],
0x7D: ["输出继电器故障判断差值", 2],
0x7E: ["保留数据项", 1],
0x7F: ["保留数据项", 1],
0x80: ["三点法中间阈值", 2],
0x81: ["浮充电压", 2],
0x82: ["恒压充电电压", 2],
0x83: ["llc软起开始电压", 2],
0x84: ["boost开始运行电压", 2],
0x85: ["boost停止运行电压", 2],
0x86: ["绝缘检测正阻抗限值", 3],
0x88: ["绝缘检测负阻抗限值", 3],
0x8A: ["保留地址项", 2],
0x8B: ["保留地址项", 2],
0x8C: ["保留地址项", 2],
0x8D: ["保留地址项", 2],
0x8E: ["保留地址项", 2],
0x8F: ["保留地址项", 2],
0x100: ["程序版本字符串", 4, 16],
0x110: ["设备型号字符串", 4, 16],
0x120: ["保留地址项", 4, 16],
0x130: ["生产厂家字符串", 4, 8],
0x138: ["保留地址项", 4, 8],
0x140: ["保留地址项", 4, 16],
0x150: ["保留地址项", 4, 16],
0x160: ["硬件版本字符串", 4, 16],
0x170: ["设备序列号", 4, 16],
0x180: ["设备MES码", 4, 16],
0x190: ["出厂日期批次", 4, 16],
}
block = {
'addr_dev' : 0x01,
'data_define': modbus_map,
}
check_frame_modbus(bytearray(buffer_test3), block)

View File

@@ -1,391 +0,0 @@
import re
import struct
from crc import Calculator, Crc16
from tools.ByteConv import trans_list_to_str, trans_str_to_list, display_hex
modbus_map = {
# 1 - Hex
# 2 - Int16
# 3 - lnt32
# 4 - str
# 5 - addr
# 6 - float
0x01: ["Hex示例", 1],
0x02: ["Int示例", 2],
0x03: ["Int32示例", 3],
0x04: ["str示例", 4, 16],
0x10: ["addr示例", 5, 6],
0x20: ["Float示例", 6],
}
frame_modbus = {
# func: len_base, type, idx
# type = 0, fixed length
# type = 1, add uint8 length by idx
# type = 2, add uint16 length by idx
# type = 3, check sub func code by idx, choose fixed length
0x03: [5, 1, 2],
0x04: [5, 1, 2],
0x06: [8, 0],
0x07: [8, 3, 2, {0x01: 2, 0x02: 0, 0x03: 0}],
0x10: [8, 0],
0x11: [11, 2, 7]
}
def make_frame_modbus(block:dict) -> bytearray:
""" modbus 生成函数"""
frame = []
calculator = Calculator(Crc16.MODBUS)
frame.append(block['addr_dev'])
if block['type'] == 'update':
""" 升级系列报文 """
frame.append(0x07)
if len(block['file']) <= block["header_offset"]:
raise Exception("Modbus Update error, file too small.")
if block['step'] == 'start':
frame.append(0x01)
frame.append(0x00)
frame.append(0x00)
frame.append(0x00)
frame.append(block['header_offset'] // 256)
frame.append(block['header_offset'] % 256)
frame += list(block['file'][:block['header_offset']])
elif block['step'] == 'trans':
file_offset = block["header_offset"] + block['index'] * block['file_block_size']
file_block = block['file'][file_offset: file_offset + block['file_block_size']]
frame.append(0x02)
frame.append(0x00)
frame.append(block['index'] // 256)
frame.append(block['index'] % 256)
frame.append(len(file_block) // 256)
frame.append(len(file_block) % 256)
frame += list(file_block)
elif block['step'] == 'end':
frame.append(0x03)
frame.append(0x00)
frame.append(block['index'] // 256)
frame.append(block['index'] % 256)
frame.append(0x00)
frame.append(0x00)
else:
raise Exception("Modbus Update Frame Step Error.")
elif block['type'][:6] == 'record':
""" 录波系列报文 """
frame.append(0x11)
if block['type'][7:10] == 'cfg':
frame.append(0x01)
elif block['type'][7:11] == 'data':
frame.append(0x02)
else:
raise Exception("Modbus Record Frame Type Error.")
if block['step'] == 'start':
frame.append(0x00)
elif block['step'] == 'next':
frame.append(0x01)
elif block['step'] == 'repeat':
frame.append(0x02)
else:
raise Exception("Modbus Record Frame Step Error.")
frame.append(block['file_block_size'] // 256)
frame.append(block['file_block_size'] % 256)
else:
""" 数据读取系列报文 """
data_addr = block['data_addr']
if block['type'] == "read":
frame.append(0x03)
data_len = block['data_len']
frame.append(data_addr // 256 % 256)
frame.append(data_addr % 256)
frame.append(data_len // 256 % 256)
frame.append(data_len % 256)
elif block['type'] == "write_one":
frame.append(0x06)
data_val = block['data_val']
if data_val < 0:
data_val += 0x1_0000
frame.append(data_addr // 256 % 256)
frame.append(data_addr % 256)
frame.append(data_val // 256 % 256)
frame.append(data_val % 256)
elif block['type'] == "write_dual":
frame.append(0x10)
data_val = block['data_val']
if data_val < 0:
data_val += 0x1_0000_0000
frame.append(data_addr // 256 % 256)
frame.append(data_addr % 256)
frame.append(0x00)
frame.append(0x02)
frame.append(0x04)
frame.append(data_val // 256 % 256)
frame.append(data_val % 256)
data_val //= 0x1_0000
frame.append(data_val // 256 % 256)
frame.append(data_val % 256)
elif block['type'] == "write_str":
frame.append(0x10)
data_len = len(block['data_val'])
item_len = 2 * block['data_define'][data_addr][2]
data_val = block['data_val']
if data_len > item_len:
raise Exception("Modbus data len oversize.")
elif data_len < item_len:
data_val += '\000' * (item_len - data_len)
data_len = len(data_val)
frame.append(data_addr // 256 % 256)
frame.append(data_addr % 256)
frame.append(0x00)
frame.append(data_len // 2)
frame.append(data_len)
if type(data_val[0]) == str:
data_val = list(map(lambda x: x.encode()[0], data_val))
for i in range(data_len//2):
frame.append(data_val[2*i + 1])
frame.append(data_val[2*i])
else:
raise Exception("Modbus Frame Type Error.")
crc = calculator.checksum(bytearray(frame))
frame.append(crc % 256)
frame.append(crc // 256)
return bytearray(frame)
def make_frame_dlt645(block:dict) -> bytearray:
""" dlt645 生成函数"""
frame = []
if block['type'] == 'modbus':
""" Modbus 透传帧 """
ctrl_code = 0x1F # Modbus 透传帧功能码
data_frame = make_frame_modbus(block["data"])
len_data = len(data_frame)
else:
raise Exception("Unknown dlt645 frame type.")
frame.append(0x68)
frame += block['addr'][:6]
frame.append(0x68)
frame.append(ctrl_code)
frame.append(len_data)
frame += data_frame
frame.append(sum(frame) % 256)
frame.append(0x16)
return bytearray(frame)
def check_frame_modbus(frame:bytes, block:dict) -> dict:
""" 校验modbus帧回复 """
if len(frame:=find_frame_modbus(frame, block['addr_dev'])) == 0:
raise Exception("No frame data")
output = {
'result': False,
'code_func': frame[1],
'code_sub': frame[2],
}
if output['code_func'] == 0x07:
""" 升级回复帧 """
output['upgrade'] = {}
output['upgrade']['index'] = frame[4] * 256 + frame[5]
if output['code_sub'] == 0x01:
""" 升级开始, 处理文件头 """
output['upgrade']['length'] = frame[6] * 256 + frame[7]
elif output['code_sub'] == 0x02:
""" 升级传输, 解析帧序号及返回值, 不做序列号校验 """
pass
elif output['code_sub'] == 0x03:
""" 升级结束 """
pass
else:
raise Exception(" Upgrade SubFunc code error.")
output['result'] = True if frame[3] == 0x00 else False
output['code_error'] = frame[3]
elif output['code_func'] == 0x03 or output['code_func'] == 0x04:
""" 数据读取帧 """
if frame[2] == len(frame[3:-2]):
output['Regs'] = display_data(block['data_addr'], frame[3:-2], block['data_define'])
else:
raise Exception("Frame read data length error.")
output['result'] = True
elif output['code_func'] == 0x06:
""" 单个数据写入帧 """
output['result'] = True
elif output['code_func'] == 0x10:
""" 多个数据写入帧 """
output['result'] = True
elif output['code_func'] == 0x11:
""" 录波功能帧 """
data_record = {}
if frame[2] == 0x01:
data_record['type'] = 'config'
elif frame[2] == 0x02:
data_record["type"] = 'data'
else:
raise Exception("Unknow data type")
data_record['seq'] = frame[5] * 0x100 + frame[6]
data_record['total'] = frame[3] * 0x100 + frame[4]
data_record['data'] = frame[9:-2]
output['record'] = data_record
output['result'] = True
elif output['code_func'] & 0x80:
""" 错误返回帧 """
output['code_error'] = frame[2] if output['code_func'] == 0x87 else frame[3]
else:
raise Exception(f"Frame Date error. func={output['code_func']}, func_sub={output['code_sub']}, len={len(frame)}")
return output
def check_frame_dlt645(frame:bytes, block:dict) -> dict:
""" 校验dlt645帧回复 """
if len(frame:=find_frame_dlt645(frame, block['addr'])) == 0:
raise Exception("No frame data")
code_func = frame[8]
if code_func == 0x9F:
return check_frame_modbus(frame[10:-2], block['data'])
else:
raise Exception("DLT645 Frame type error.")
def find_frame_modbus(buffer:bytes, address:int, frame_defines: dict=frame_modbus) -> bytes:
""" 搜索合法modbus帧子串 """
len_buffer = len(buffer)
pos_frame, len_frame = 0, 0
calculator = Calculator(Crc16.MODBUS)
for i in range(len_buffer):
if buffer[i] != address:
continue
if (buffer[i+1] & 0x7F) not in frame_defines.keys():
continue
frame_define = frame_defines[buffer[i+1] & 0x7F]
j = frame_define[0]
if buffer[i+1] & 0x80:
j = 5
elif frame_define[1] == 0:
pass
elif frame_define[1] == 1:
j += buffer[i+frame_define[2]]
elif frame_define[1] == 2:
j += buffer[i+frame_define[2]] * 0x100 + buffer[i+frame_define[2]+1]
elif frame_define[1] == 3:
if buffer[i+frame_define[2]] not in frame_define[3].keys():
continue
j += frame_define[3][buffer[i+frame_define[2]]]
else:
raise Exception("Unknow Modbus Define type.")
if ((i+j) <= len_buffer) and calculator.checksum(buffer[i:i+j-2]) == (buffer[i+j-1] * 0x100 + buffer[i+j-2]):
pos_frame, len_frame = i, j
break
return buffer[pos_frame: pos_frame+len_frame]
def find_frame_dlt645(buffer:bytes, address: list) -> bytes:
""" 搜索合法645帧子串 """
len_buffer = len(buffer)
pos_frame, len_frame = 0, 0
for i in range(len_buffer):
if buffer[i] != 0x68 or buffer[i+7] != 0x68:
continue
if address[0] != 0xAA and buffer[i+1:i+7] == bytes(address):
continue
j = buffer[i+9] + 12
if sum(buffer[i:i+j-2]) % 0x100 == buffer[i+j-2]:
pos_frame, len_frame = i, j
break
return buffer[pos_frame: pos_frame+len_frame]
def display_data(address: int, data: bytes, modbus_map: dict=modbus_map) -> dict:
""" 格式化表示数据, 得到显示数据字典 """
def swapping_words(data:bytes, length:int) -> bytearray:
item = bytearray(data[:2 * length])
for i in range(length):
item[2*i], item[2*i+1] = item[2*i+1], item[2*i]
return item
def convert_regs_to_int(data:bytes, length:int, signed: bool=True) -> int:
result = 0
for idx_data, byte_data in enumerate(swapping_words(data, length)):
result += byte_data * 2 ** (8 * idx_data)
result -= 2 ** (16 * length) if signed and result >= (2 ** (16 * length - 1)) else 0
return result
output_data = {}
idx = address
while data:
data_label, data_len = "未知数据", 1
if idx not in modbus_map.keys():
item = convert_regs_to_int(data, 1, signed=False)
item = display_hex(item, 4)
else:
current_map = modbus_map[idx]
data_label = current_map[0]
if current_map[1] == 1:
""" Hex字符表示 """
item = convert_regs_to_int(data, 1, signed=False)
item = display_hex(item, 4)
elif current_map[1] == 2:
""" 16位数值表示 """
item = convert_regs_to_int(data, 1)
if len(current_map) > 2:
item /= current_map[2]
elif current_map[1] == 3:
""" 32位数值表示 """
data_len = 2
item = convert_regs_to_int(data, 2)
if len(current_map) > 2:
item /= current_map[2]
elif current_map[1] == 4:
""" 字符串表示 """
data_len = current_map[2]
item = swapping_words(data, data_len)
item = item.replace(b'\xff', b' 0xFF').decode()
elif current_map[1] == 5:
""" 载波地址表示 """
data_len = current_map[2]
item = swapping_words(data, data_len)
item = trans_list_to_str(item)
elif current_map[1] == 6:
""" 浮点数值表示 """
data_len = 2
item = struct.unpack('>f', bytes([data[2], data[3], data[0], data[1]]))[0]
elif current_map[1] == 7:
""" 正序数值表示 """
data_len = current_map[2]
item = list(map(lambda x: x, data[:2 * data_len]))
output_data[idx] = data_label, item
idx += data_len
data = data[2*data_len:]
return output_data
def print_display(output_data: dict):
""" 格式化表示输出数据 """
print("Parse Result:")
label_len_max = max(map(lambda x: len(x[0]), output_data.values()))
data_len_max = max(map(lambda x: len(str(x[1])), output_data.values()))
for key, value in output_data.items():
label = value[0]
data = "-".join(map(str, value[1])) if type(value) == list else value[1]
print(f"{display_hex(key, 4)}: {data:<{data_len_max}} {label:<{label_len_max}}")

View File

@@ -1,7 +1,7 @@
import time import time
from webui import webui from webui import webui
from pathlib import Path from pathlib import Path
from func_frame import check_frame_dlt645 from source.func_frame import check_frame_dlt645
from source.dev_LaminaAdapter import LaminaAdapter from source.dev_LaminaAdapter import LaminaAdapter
from source.device.EnergyRouter import EnergyRouter from source.device.EnergyRouter import EnergyRouter
@@ -15,9 +15,9 @@ def my_function(e : webui.event):
print("Data from JavaScript: " + e.window.get_str(e, 0)) # Message from JS print("Data from JavaScript: " + e.window.get_str(e, 0)) # Message from JS
frame = e.window.get_str(e, 0) frame = e.window.get_str(e, 0)
block_dlt645 = e.window.get_str(e, 1) block_dlt645 = e.window.get_str(e, 1)
output_text = check_frame_dlt645(frame, block=block_dlt645) block_ouput = check_frame_dlt645(frame, block=block_dlt645)
return output_text return block_ouput
events = [] events = []
def main_webui(): def main_webui():