Files
DebugTool/source/dev_LaminaAdapter.py
何 泽隆 5cb2a423c6 - 重命名模式文件;
- 更新适配器参数项;
 - 修改优化器报文函数调用;
 - 添加自定义升级模式标志位;
 - 添加升级包生成流程函数;
2024-08-27 21:20:10 +08:00

452 lines
17 KiB
Python

import time
from pathlib import Path
from serial import Serial
from utl import trans_list_to_str
from func_frame import make_frame_dlt645, check_frame_dlt645
modbus_map = {
# 1 - Hex
# 2 - Int16
# 3 - lnt32
# 4 - str
# 5 - addr
# 6 - float
0x0E: ["故障字1", 1],
0x0F: ["故障字2", 1],
0x10: ["MPPT工作状态", 1],
0x11: ["系统工作状态", 1],
0x12: ["系统工作模式", 1],
0x13: ["输入电压", 2],
0x14: ["电感电流", 2],
0x15: ["12V电压", 2],
0x16: ["输出电压", 2],
0x17: ["输入电流", 2],
0x18: ["温度1", 2],
0x19: ["温度2", 2],
0x1A: ["输入功率", 3],
0x1C: ["设备温度", 2],
0x1D: ["开关机状态", 1],
0x1E: ["电池电压", 2],
0x1F: ["并机功率限值", 3],
0x60: ["光伏通道使能", 1],
0x61: ["最小启动输入电压", 2],
0x62: ["最大启动输入电压", 2],
0x63: ["最小停止输入电压", 2],
0x64: ["最大停止输入电压", 2],
0x65: ["最小MPPT电压", 2],
0x66: ["最大MPPT电压", 2],
0x67: ["最小启动输出电压", 2],
0x68: ["最大启动输出电压", 2],
0x69: ["最小停止输出电压", 2],
0x6A: ["最大停止输出电压", 2],
0x6B: ["输入过压保护值", 2],
0x6C: ["输出过压保护值", 2],
0x6D: ["输出欠压保护值", 2],
0x6E: ["电感过流保护值", 2],
0x6F: ["输入过流保护值", 2],
0x70: ["最小电感电流限值", 2],
0x71: ["最大电感电流限值", 2],
0x72: ["浮充电压阈值", 2],
0x73: ["三点法中间阈值", 2],
0x74: ["恒压充电电压", 2],
0x75: ["过温故障值", 2],
0x76: ["过温告警值", 2],
0x77: ["温度恢复值", 2],
0x78: ["最低满载电压", 2],
0x79: ["最高满载电压", 2],
0x7A: ["输入过载保护值", 3],
0x7C: ["最小功率限值", 3],
0x7E: ["最大功率限值", 3],
0x80: ["最大功率限值存储值", 3],
0x82: ["载波通信地址", 5, 3],
0x85: ["电压环out_max", 2],
0x86: ["电压环out_min", 2],
0x87: ["电流环out_max", 2],
0x88: ["电流环out_min", 2],
0x89: ["MPPT扰动系数k_d_vin", 2],
0x8A: ["dmin", 2],
0x8B: ["dmax", 2],
0x8C: ["扫描电压偏移scanvolt_offset", 2],
0x8D: ["电压环Kp", 3],
0x8F: ["电压环Ki", 3],
0x91: ["电流环Kp", 3],
0x93: ["电流环Ki", 3],
0x95: ["日志级别", 1],
0x96: ["日志输出方式", 1],
0x97: ["采样校准volt_in_a", 2],
0x98: ["采样校准volt_in_b", 2],
0x99: ["采样校准volt_out_a", 2],
0x9A: ["采样校准volt_out_b", 2],
0x9B: ["采样校准curr_in_a", 2],
0x9C: ["采样校准curr_in_b", 2],
0x9D: ["采样校准curr_induc_a", 2],
0x9E: ["采样校准curr_induc_b", 2],
0x9F: ["采样校准volt_12V_a", 2],
0xA0: ["采样校准volt_12V_b", 2],
0xA1: ["温度补偿temp1_b", 2],
0xA2: ["温度补偿temp2_b", 2],
0xA3: ["系统工作模式", 2],
0xA4: ["电感电流给定值curr_set", 2],
0xA5: ["抖动频率上限", 2],
0xA6: ["抖动频率下限", 2],
0xA7: ["电池电压判断限值", 2],
0xA8: ["MPPT追踪模式", 1],
0xA9: ["ADC参考电压", 1],
0xAA: ["保留", 1],
0xAB: ["保留", 1],
0xAC: ["保留", 1],
0xAD: ["保留", 1],
0xAE: ["保留", 1],
0xAF: ["保留", 1],
0x100: ["版本", 4, 16],
0x110: ["型号", 4, 16],
0x120: ["载波芯片地址", 4, 16],
0x130: ["厂商", 4, 8],
0x138: ["保留", 4, 8],
0x140: ["保留", 4, 16],
0x150: ["保留", 4, 16],
0x160: ["硬件", 4, 16],
0x170: ["SN", 4, 16],
0x180: ["MES", 4, 16],
0x190: ["Datetime", 4, 16],
}
class LaminaAdapter:
def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs):
# 初始化串口通信
if com_name is not None:
com_config = {}
com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200
com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N'
com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8
com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1
self.__com = Serial(com_name, **com_config)
else:
self.__com =None
self.flag_print = 'frame_print' in kwargs.keys()
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
self.retry_sub = kwargs['retry_sub'] if 'retry_sub' in kwargs.keys() else 1
self.block = {
'addr' : addr_645,
'type' : 'modbus',
'data' : {
'addr_dev' : addr_modbus,
'data_define': modbus_map,
},
}
def __transfer_data(self, frame):
""" 报文数据传输 """
if self.__com is None:
print(trans_list_to_str(frame))
return False
cnt = 0
while cnt < self.retry:
self.__com.read_all()
self.__com.write(bytearray(frame))
cnt_sub = 0
frame_recv = None
while not frame_recv:
time.sleep(self.time_out)
frame_recv = self.__com.read_all()
cnt_sub += 1
if cnt_sub >= self.retry_sub:
break
try:
output_text = check_frame_dlt645(frame_recv, block=self.block)
except Exception as e:
print(e)
cnt += 1
continue
print(output_text)
break
if self.flag_print:
print(trans_list_to_str(frame))
print(trans_list_to_str(frame_recv))
return cnt < self.retry
def frame_read(self, daddr=0x60, dlen=0x50):
self.block['data']['type'] = 'read'
self.block['data']['data_addr'] = daddr
self.block['data']['data_len'] = dlen
frame = make_frame_dlt645(self.block)
return self.__transfer_data(frame)
def frame_write_one(self, daddr=0x85, dval=-900):
self.block['data']['type'] = 'write_one'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_dlt645(self.block)
return self.__transfer_data(frame)
def frame_write_dual(self, daddr=0x91, dval=600):
self.block['data']['type'] = 'write_dual'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_dlt645(self.block)
return self.__transfer_data(frame)
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]):
self.block['data']['type'] = 'write_str'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_dlt645(self.block)
return self.__transfer_data(frame)
def frame_update(self, path_bin):
""" 程序升级
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
"""
self.block['data']['type'] = 'update'
self.block['data']['step'] = 'start'
self.block['data']['index'] = 0
self.block['data']['file'] = Path(path_bin).read_bytes()
self.block['data']['header_offset'] = 184
# 启动帧
frame_master = bytearray(make_frame_dlt645(self.block))
# 等待擦除完成返回
try_times = 30
self.__com.read_all()
while try_times:
time.sleep(self.time_out)
self.__com.write(frame_master)
frame_slave = self.__com.read_all()
if not frame_slave:
try_times -= 1
continue
_, _, self.block["data"]['file_block_size'] = check_frame_dlt645(frame_slave, self.block)
break
if self.block["data"]['file_block_size'] == 0:
raise Exception("Error slave response.")
# 避免接收到延迟返回报文
time.sleep(self.time_out)
# 文件传输
self.block["data"]['step'] = 'trans'
data_remain = len(self.block["data"]['file']) - self.block['data']['header_offset']
while data_remain > 0:
frame_master = bytearray(make_frame_dlt645(self.block))
cnt = 0
while cnt < self.retry:
self.__com.read_all()
self.__com.write(frame_master)
cnt_sub = 0
frame_slave = None
while not frame_slave:
time.sleep(self.time_out)
frame_slave = self.__com.read_all()
cnt_sub += 1
if cnt_sub >= self.retry_sub:
break
try:
ret = check_frame_dlt645(frame_slave, self.block)
except Exception as e:
print(e)
ret = False, 0, 0
if ret[0]:
break
else:
cnt += 1
if cnt >= self.retry:
raise Exception("Error, Retry failed.")
self.block["data"]['index'] += 1
data_remain -= self.block["data"]['file_block_size']
# 结束升级
self.block["data"]['step'] = 'end'
frame_master = bytearray(make_frame_dlt645(self.block))
cnt = 0
while cnt < self.retry:
self.__com.read_all()
self.__com.write(frame_master)
cnt_sub = 0
frame_slave = None
while not frame_slave:
time.sleep(self.time_out)
frame_slave = self.__com.read_all()
cnt_sub += 1
if cnt_sub >= self.retry_sub:
break
try:
ret = check_frame_dlt645(frame_slave, self.block)
except Exception as e:
print(e)
ret = False, 0, 0
if ret[0]:
break
else:
cnt += 1
if cnt >= self.retry:
raise Exception("Error, Retry failed.")
def test_communication(time_out=2):
""" 通信成功率测试 """
log_success = 0
log_failed = 0
log_failedseries = 0
cnt_failedseries = 0
time_start = time.time()
saveconfig_print = dev_lamina.flag_print
dev_lamina.flag_print = False
try:
while 1:
if dev_lamina.frame_read(0x0E, 0x13):
log_success += 1
cnt_failedseries = 0
else:
log_failed += 1
cnt_failedseries += 1
if log_failedseries <= cnt_failedseries:
log_failedseries = cnt_failedseries
print(f"Time Stamp: {time.ctime()}")
print(f"Success Frame: {log_success}")
print(f"Failed Frame: {log_failed}")
print(f"Max Series Failed Frame: {log_failedseries}")
time.sleep(time_out)
finally:
time_end = time.time()
print("Test Result: ")
print(f"Time Start: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_start))}, \tTime End: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_end))}")
print(f"Time Elapsed: {time_end - time_start}")
print(f"Success Rate: {log_success / (log_success + log_failed) * 100}%")
dev_lamina.flag_print = saveconfig_print
if __name__=='__main__':
mode_config = {
"Log": {'com_name': None,
# 'addr_645': [0x01, 0x00, 0x00, 0x00, 0x00, 0x40],
},
"Debug": {'com_name': 'COM8', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True,
'time_out': 0.1, 'retry': 1, 'retry_sub': 10},
"HPLC": {'com_name': 'COM4', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True,
'time_out': 0.5, 'retry': 3, 'retry_sub': 10},
}
dev_lamina = LaminaAdapter(**mode_config['Debug'])
dev_lamina.frame_read(0x0100, 0x20)
if 0:
dev_lamina.frame_write_str(0x82, [0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])
dev_lamina.frame_write_str(0x82, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
dev_lamina.frame_write_str(0x82, [0x0A, 0x00, 0x00, 0x00, 0x00, 0x00])
dev_lamina.frame_write_str(0x82, [0xFF, 0x00, 0x00, 0x00, 0x00, 0x00])
dev_lamina.frame_write_str(0x82, [0xA1, 0x00, 0x00, 0x00, 0x00, 0x00])
dev_lamina.frame_write_str(0x82, [0x1A, 0x00, 0x00, 0x00, 0x00, 0x00])
dev_lamina.frame_write_str(0x82, [0x99, 0x99, 0x99, 0x99, 0x99, 0x99])
dev_lamina.frame_write_str(0x82, [0x00, 0x00, 0x00, 0x00, 0x00, 0x01])
if 0:
dev_lamina.frame_write_one(0x52, 0x01)
time.sleep(0.5)
dev_lamina.frame_write_one(0x50, 0x00)
if 0:
while 1:
code_mes = input("扫描数据")
temp = code_mes[5:7] + code_mes[-10:]
code_addr = [int(temp[i:i+2], 16) for i in range(0, len(temp), 2)]
print(f"扫描结果: {code_mes}")
print(f"载波地址: {trans_list_to_str(code_addr)}")
dev_lamina.frame_read(0x100, 0x20)
time.sleep(0.5)
dev_lamina.frame_write_str(0x0180, list(code_mes))
time.sleep(0.5)
dev_lamina.frame_read(0x180, 0x10)
time.sleep(0.5)
dev_lamina.frame_write_str(0x82, code_addr)
time.sleep(0.5)
dev_lamina.frame_read(0x80, 0x08)
time.sleep(0.5)
dev_lamina.frame_write_one(0xA3, 0x01)
time.sleep(0.5)
dev_lamina.frame_write_one(0x51, 0x01)
time.sleep(5)
dev_lamina.frame_read(0x0E, 0x13)
time.sleep(5)
dev_lamina.frame_read(0x0E, 0x13)
time.sleep(0.5)
dev_lamina.frame_write_one(0x50, 0x00)
if 0:
dev_lamina.frame_write_one(0x0054, 0x01)
dev_lamina.frame_read(0x000E, 0x02)
dev_lamina.frame_write_one(0x0054, 0x03)
dev_lamina.frame_read(0x000E, 0x02)
dev_lamina.frame_write_one(0x0054, 0x07)
dev_lamina.frame_read(0x000E, 0x02)
dev_lamina.frame_write_one(0x0054, 0x06)
dev_lamina.frame_read(0x000E, 0x02)
dev_lamina.frame_write_one(0x0054, 0x04)
dev_lamina.frame_read(0x000E, 0x02)
dev_lamina.frame_write_one(0x0054, 0x00)
dev_lamina.frame_read(0x000E, 0x02)
if 0:
dev_lamina.frame_read(0x0E, 0x14)
if 0:
dev_lamina.frame_read(0x60, 0x60)
if 0:
dev_lamina.frame_read(0x70, 0x02)
dev_lamina.frame_write_one(0x70, 2100)
dev_lamina.frame_write_one(0x71, 2200)
dev_lamina.frame_read(0x70, 0x02)
dev_lamina.frame_write_one(0x70, 2300)
dev_lamina.frame_write_one(0x71, 2100)
dev_lamina.frame_read(0x70, 0x02)
if 0:
dev_lamina.frame_write_str(0x0170, list("SN202405201117-1"))
dev_lamina.frame_write_str(0x0180, list("MES202405201117-2"))
dev_lamina.frame_write_str(0x0190, list("D202405211500-3"))
time.sleep(2)
dev_lamina.frame_read(0x0170, 0x30)
if not hasattr(__builtins__,"__IPYTHON__"):
path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240604_1200_V1.01a.bin")
# path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240520_0000_T1.11.bin")
# 生产镜像版本
# path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240525_1800_V1.12.bin")
# 江苏发货产品灌装版本
# path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240603_2100_V1.18.bin")
dev_lamina.frame_update(path_bin)
time.sleep(6)
dev_lamina.frame_read(0x0100, 0x20)
dev_lamina.frame_write_one(0x52, 0x01)