添加录波报文解析功能;

优化升级流程, 简化调用方式;
This commit is contained in:
何 泽隆
2024-09-03 17:48:11 +08:00
parent 518a325072
commit 4897d17d11
2 changed files with 224 additions and 23 deletions

View File

@@ -44,6 +44,9 @@ modbus_map = {
0x51: ["故障清除命令" , 2],
0x52: ["参数还原命令" , 2],
0x53: ["设备复位命令" , 2],
0x54: ["模式更改命令" , 2],
0x55: ["短时停机命令(未启用)" , 2],
0x56: ["手动录波命令" , 2],
0x60: ["整机运行使能", 1],
0x61: ["最小启动允许输入电压", 2],
@@ -72,8 +75,8 @@ modbus_map = {
0x7B: ["过温告警值", 2],
0x7C: ["过温恢复值", 2],
0x7D: ["输出继电器故障判断差值", 2],
0x7E: ["保留数据项", 1],
0x7F: ["保留数据项", 1],
0x7E: ["LLC输出电压给定值", 2],
0x7F: ["Boost输出电压给定值", 2],
0x80: ["三点法中间阈值", 2],
0x81: ["浮充电压", 2],
0x82: ["恒压充电电压", 2],
@@ -198,6 +201,153 @@ class LaminaController:
return self.__transfer_data(frame)
def frame_record(self):
""" 读取录波数据
"""
self.block['data']['file_block_size'] = 240
# 读取config
self.block['data']['type'] = 'record_cfg'
self.block['data']['step'] = 'start'
frame_master = make_frame_modbus(self.block['data'])
ret = True
frame_data_cfg = []
while ret:
self.__com.read_all()
self.__com.write(frame_master)
time.sleep(0.1)
frame_slave = self.__com.read_all()
ret, block_cfg = check_frame_modbus(frame_slave, self.block['data'])
if ret:
frame_data_cfg.append(block_cfg)
if block_cfg['seq'] == 0:
self.block['data']['step'] = 'next'
frame_master = make_frame_modbus(self.block['data'])
elif (block_cfg['seq'] + 1) >= block_cfg['total']:
ret = False
# 读取data
self.block['data']['type'] = 'record_data'
self.block['data']['step'] = 'start'
frame_master = make_frame_modbus(self.block['data'])
ret = True
frame_data_record = []
while ret:
self.__com.read_all()
self.__com.write(frame_master)
time.sleep(0.3)
frame_slave = self.__com.read_all()
ret, block_record = check_frame_modbus(frame_slave, self.block['data'])
if ret:
frame_data_record.append(block_record)
if block_record['seq'] == 0:
self.block['data']['step'] = 'next'
frame_master = make_frame_modbus(self.block['data'])
elif (block_record['seq'] + 1) >= block_record['total']:
ret = False
# 处理数据
data_cfg_bare = b"".join([x['data'] for x in frame_data_cfg])
config_record = {'LinePos': []}
pos = 0
if data_cfg_bare[pos+1] != 0xF1 or data_cfg_bare[pos] != 0xF1:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
len_faultword = 2 * (data_cfg_bare[3] * 0x100 + data_cfg_bare[2])
pos += 2
config_record['FaultWord'] = []
for i in range(0, len_faultword, 2):
faultword = data_cfg_bare[pos+i+1] * 0x100 + data_cfg_bare[pos+i]
config_record['FaultWord'].append(faultword)
pos += len_faultword
config_record['SysStatus'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['FaultRecordPosition'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['FaultNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['Standard'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
if data_cfg_bare[pos+1] != 0xF2 or data_cfg_bare[pos] != 0xF2:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
config_record['ChannelNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['ChNum_Alg'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['ChNum_Dgt'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
if data_cfg_bare[pos+1] != 0xF3 or data_cfg_bare[pos] != 0xF3:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
config_record['ChannelDescription'] = []
config_record['ChannelCoefficient'] = []
for i in range(config_record['ChannelNum']):
len_str = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
ch_desc = []
for j in range(0, len_str, 2):
ch_desc.append(data_cfg_bare[pos + j])
pos += len_str
ch_coe = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['ChannelDescription'].append(bytearray(ch_desc).decode())
config_record['ChannelCoefficient'].append(ch_coe / 10000)
if data_cfg_bare[pos+1] != 0xF4 or data_cfg_bare[pos] != 0xF4:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
config_record['SysFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['FreqNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['SampleFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['SamplePoint'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['DataType'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['TimeFactor'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
if data_cfg_bare[pos+1] != 0xF5 or data_cfg_bare[pos] != 0xF5:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
time_stamp = {
'year': data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos],
'month': data_cfg_bare[pos+3] * 0x100 + data_cfg_bare[pos+2],
'day': data_cfg_bare[pos+5] * 0x100 + data_cfg_bare[pos+4],
'hour': data_cfg_bare[pos+7] * 0x100 + data_cfg_bare[pos+6],
'minute': data_cfg_bare[pos+9] * 0x100 + data_cfg_bare[pos+8],
'second': data_cfg_bare[pos+11] * 0x100 + data_cfg_bare[pos+10],
'msec': data_cfg_bare[pos+13] * 0x100 + data_cfg_bare[pos+12],
}
config_record['StartTime'] = time_stamp
pos += 14
time_stamp = {
'year': data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos],
'month': data_cfg_bare[pos+3] * 0x100 + data_cfg_bare[pos+2],
'day': data_cfg_bare[pos+5] * 0x100 + data_cfg_bare[pos+4],
'hour': data_cfg_bare[pos+7] * 0x100 + data_cfg_bare[pos+6],
'minute': data_cfg_bare[pos+9] * 0x100 + data_cfg_bare[pos+8],
'second': data_cfg_bare[pos+11] * 0x100 + data_cfg_bare[pos+10],
'msec': data_cfg_bare[pos+13] * 0x100 + data_cfg_bare[pos+12],
}
config_record['TriggerTime'] = time_stamp
data_record = b"".join([x['data'] for x in frame_data_record])
return config_record, data_record
def frame_update(self, path_bin):
""" 程序升级
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
@@ -341,27 +491,15 @@ def make_Image():
hex_boot = root / r"DLSP001_240828_0900_BL1.01.hex"
hex_main = root / r"DLSP001_240828_0900_V1.01.hex"
hex_back = root / r"DLSP001_240828_0900_B1.01.hex"
hex_update = root / r"DLSP001_240828_0900_T1.01.hex"
file_image = result / f'{hex_main.stem}_ROM.hex'
file_main_header = result / 'header_main.bin'
file_back_header = result / 'header_back.bin'
file_package = result / f'{hex_update.stem}.dat'
file_update_bin = result / f'{hex_update.stem}.bin'
file_package_buffer = result / f'{hex_update.stem}.datbuffer'
data_bins = GenerateImage_DLSP001_p280039(hex_boot, hex_main, hex_back)
data_package, data_update_bin = GeneratePackage_DLSP001_p280039(hex_update)
data_buffer = bytearray(2 * len(data_package))
for i in range(len(data_package)):
data_buffer[2*i] = data_package[i]
file_package.write_bytes(data_package)
file_main_header.write_bytes(data_bins[1])
file_back_header.write_bytes(data_bins[2])
file_update_bin.write_bytes(data_update_bin)
file_package_buffer.write_bytes(data_buffer)
data_hex = file_Bin_to_IntelHex(data_bins[0], 0x80000, memory_width=2)
file_image.write_text(data_hex)
@@ -389,6 +527,27 @@ def make_Image():
file_image3.write_text(data_hex)
def make_Pakeage(fp: Path):
""" 生成升级包 """
hex_update = fp
file_package = fp.parent / f'{hex_update.stem}.dat'
file_update_bin = fp.parent / f'{hex_update.stem}.bin'
file_package_buffer = fp.parent / f'{hex_update.stem}.datbuffer'
data_package, data_update_bin = GeneratePackage_DLSP001_p280039(hex_update)
data_buffer = bytearray(2 * len(data_package))
for i in range(len(data_package)):
data_buffer[2*i] = data_package[i]
file_package.write_bytes(data_package)
file_update_bin.write_bytes(data_update_bin)
file_package_buffer.write_bytes(data_buffer)
return file_package
if __name__=='__main__':
mode_config = {
"Log": {'com_name': None,
@@ -405,6 +564,8 @@ if __name__=='__main__':
dev_lamina.frame_read(0x0100, 0x20)
dev_lamina.frame_record()
if 0:
dev_lamina.frame_write_str(0x82, [0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])
dev_lamina.frame_write_str(0x82, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
@@ -479,20 +640,18 @@ if __name__=='__main__':
if not hasattr(__builtins__,"__IPYTHON__"):
make_Image()
path_project = Path("D:\WorkingProject\LightStackOptimizer\software\lamina_controller_dsp\lamina_controller_dsp")
file_hex = path_project / "DEBUG\lamina_controller_dsp.hex"
if not file_hex.exists():
raise Exception("工程编译目标文件不存在.")
file_package = make_Pakeage(file_hex)
path_bin = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039\result\DLSP001_240828_0900_T1.01.dat")
# path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240520_0000_T1.11.bin")
# 生产镜像版本
# path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240525_1800_V1.12.bin")
# 江苏发货产品灌装版本
# path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240603_2100_V1.18.bin")
dev_lamina.frame_update(path_bin)
dev_lamina.frame_update(file_package)
time.sleep(6)
dev_lamina.frame_read(0x0100, 0x20)
dev_lamina.frame_write_one(0x52, 0x01)
# dev_lamina.frame_write_one(0x52, 0x01)

View File

@@ -70,6 +70,29 @@ def make_frame_modbus(block:dict):
frame.append(0x00)
else:
raise Exception("Modbus Update Frame Step Error.")
elif block['type'][:6] == 'record':
""" 录波系列报文 """
frame.append(0x11)
frame_types = block['type'].split('_')
if block['type'][7:10] == 'cfg':
frame.append(0x01)
elif block['type'][7:11] == 'data':
frame.append(0x02)
else:
raise Exception("Modbus Record Frame Type Error.")
if block['step'] == 'start':
frame.append(0x00)
elif block['step'] == 'next':
frame.append(0x01)
elif block['step'] == 'repeat':
frame.append(0x02)
else:
raise Exception("Modbus Record Frame Step Error.")
frame.append(block['file_block_size'] // 256)
frame.append(block['file_block_size'] % 256)
else:
""" 数据读取系列报文 """
data_addr = block['data_addr']
@@ -317,6 +340,25 @@ def check_frame_modbus(frame, block=None):
elif code_func == 0x10:
""" 多个数据写入帧 """
return trans_list_to_str(frame[2:-2])
elif code_func == 0x11:
""" 录波功能帧 """
frame_data = {
'seq': frame[5] * 0x100 + frame[6],
'total': frame[3] * 0x100 + frame[4],
'data': frame[9:-2]
}
if frame[2] == 0x01:
frame_data['type'] = 'config'
elif frame[2] == 0x02:
frame_data["type"] = 'data'
else:
raise Exception("Unknow data type")
return True, frame_data
elif code_func & 0x80:
""" 错误返回帧 """
if code_func & 0x7F == 0x07:
return False, frame[3], code_func, code_subfunc
return False, frame[2], code_func
else:
raise Exception(f"Frame Date error. func={code_func}, func_sub={code_subfunc}, len={len(frame)}")