重构叠光控制器脚本文件;

This commit is contained in:
何 泽隆
2024-10-04 21:20:49 +08:00
parent b3835de75a
commit 8fa7c8f40b

View File

@@ -1,10 +1,13 @@
import time import time
from math import ceil
from tqdm import tqdm
from pathlib import Path from pathlib import Path
from serial import Serial from serial import Serial
from datetime import datetime from datetime import datetime
from tenacity import retry, stop_after_attempt, wait_fixed
from tools.ByteConv import trans_list_to_str from tools.ByteConv import trans_list_to_str
from tools.IntelHex import file_Bin_to_IntelHex from tools.IntelHex import file_Bin_to_IntelHex
from func_frame import make_frame_modbus, check_frame_modbus from func_frame_re import make_frame_modbus, check_frame_modbus, print_display
from func_upgrade import GenerateImage_DLSP001_p280039, GeneratePackage_DLSP001_p280039 from func_upgrade import GenerateImage_DLSP001_p280039, GeneratePackage_DLSP001_p280039
modbus_map = { modbus_map = {
@@ -14,6 +17,7 @@ modbus_map = {
# 4 - str # 4 - str
# 5 - addr # 5 - addr
# 6 - float # 6 - float
# 7 - numberList
0x0B: ["事件标志", 1], 0x0B: ["事件标志", 1],
0x0C: ["告警字1", 1], 0x0C: ["告警字1", 1],
0x0D: ["告警字2", 1], 0x0D: ["告警字2", 1],
@@ -23,24 +27,24 @@ modbus_map = {
0x11: ["Boost1工作状态" , 1], 0x11: ["Boost1工作状态" , 1],
0x12: ["Boost2工作状态" , 1], 0x12: ["Boost2工作状态" , 1],
0x13: ["开关机状态" , 1], 0x13: ["开关机状态" , 1],
0x14: ["光伏组串1输入电压" , 2], 0x14: ["光伏组串1输入电压" , 2, 10],
0x15: ["光伏组串2输入电压" , 2], 0x15: ["光伏组串2输入电压" , 2, 10],
0x16: ["Boost1电感电流" , 2], 0x16: ["Boost1电感电流" , 2, 100],
0x17: ["Boost2电感电流" , 2], 0x17: ["Boost2电感电流" , 2, 100],
0x18: ["Boost输出电压" , 2], 0x18: ["Boost输出电压" , 2, 10],
0x19: ["Boost输出总电流" , 2], 0x19: ["Boost输出总电流" , 2, 100],
0x1A: ["Boost1功率" , 3], 0x1A: ["Boost1功率" , 3, 1000],
0x1C: ["Boost2功率" , 3], 0x1C: ["Boost2功率" , 3, 1000],
0x1E: ["输入总功率" , 3], 0x1E: ["输入总功率" , 3, 1000],
0x20: ["LLC输出电压" , 2], 0x20: ["LLC输出电压" , 2, 10],
0x21: ["端口输出电压" , 2], 0x21: ["端口输出电压" , 2, 10],
0x22: ["LLC输出电流均值" , 2], 0x22: ["LLC输出电流均值" , 2, 100],
0x23: ["LLC输出电流峰值" , 2], 0x23: ["LLC输出电流峰值" , 2, 100],
0x24: ["绝缘检测电压" , 2], 0x24: ["绝缘检测电压" , 2, 10],
0x25: ["散热片温度" , 2], 0x25: ["散热片温度" , 2, 10],
0x26: ["腔体1温度" , 2], 0x26: ["腔体1温度" , 2, 10],
0x27: ["腔体2温度" , 2], 0x27: ["腔体2温度" , 2, 10],
0x28: ["设备温度" , 2], 0x28: ["设备温度" , 2, 10],
0x50: ["启停控制命令" , 2], 0x50: ["启停控制命令" , 2],
0x51: ["故障清除命令" , 2], 0x51: ["故障清除命令" , 2],
@@ -52,44 +56,44 @@ modbus_map = {
0x57: ["时间配置命令" , 7, 3], 0x57: ["时间配置命令" , 7, 3],
0x60: ["整机运行使能", 1], 0x60: ["整机运行使能", 1],
0x61: ["最小启动允许输入电压", 2], 0x61: ["最小启动允许输入电压", 2, 10],
0x62: ["最大启动允许输入电压", 2], 0x62: ["最大启动允许输入电压", 2, 10],
0x63: ["最小停机输入电压", 2], 0x63: ["最小停机输入电压", 2, 10],
0x64: ["最大停机输入电压", 2], 0x64: ["最大停机输入电压", 2, 10],
0x65: ["最小启动允许输出电压", 2], 0x65: ["最小启动允许输出电压", 2, 10],
0x66: ["最大启动允许输出电压", 2], 0x66: ["最大启动允许输出电压", 2, 10],
0x67: ["最小停止允许输出电压", 2], 0x67: ["最小停止允许输出电压", 2, 10],
0x68: ["最大停止允许输出电压", 2], 0x68: ["最大停止允许输出电压", 2, 10],
0x69: ["最小MPPT电流限值", 2], 0x69: ["最小MPPT电流限值", 2, 100],
0x6A: ["最大MPPT电流限值", 2], 0x6A: ["最大MPPT电流限值", 2, 100],
0x6B: ["保留数据项", 2], 0x6B: ["保留数据项", 2],
0x6C: ["最大功率限值", 3], 0x6C: ["最大功率限值", 3, 1000],
0x6E: ["最大功率限值存储值", 3], 0x6E: ["最大功率限值存储值", 3, 1000],
0x70: ["Boost输入过压保护值", 2], 0x70: ["Boost输入过压保护值", 2, 10],
0x71: ["Boost输出过压保护值", 2], 0x71: ["Boost输出过压保护值", 2, 10],
0x72: ["LLC输出过压保护值", 2], 0x72: ["LLC输出过压保护值", 2, 10],
0x73: ["LLC输出欠压保护值", 2], 0x73: ["LLC输出欠压保护值", 2, 10],
0x74: ["Boost电感过流保护值", 2], 0x74: ["Boost电感过流保护值", 2, 100],
0x75: ["LLC输出电流均值保护值", 2], 0x75: ["LLC输出电流均值保护值", 2, 100],
0x76: ["LLC输出电流峰值保护值", 2], 0x76: ["LLC输出电流峰值保护值", 2, 100],
0x77: ["保留数据项", 2], 0x77: ["保留数据项", 2],
0x78: ["过载保护值", 3], 0x78: ["过载保护值", 3, 1000],
0x7A: ["过温故障值", 2], 0x7A: ["过温故障值", 2, 10],
0x7B: ["过温告警值", 2], 0x7B: ["过温告警值", 2, 10],
0x7C: ["过温恢复值", 2], 0x7C: ["过温恢复值", 2, 10],
0x7D: ["输出继电器故障判断差值", 2], 0x7D: ["输出继电器故障判断差值", 2, 10],
0x7E: ["LLC输出电压给定值", 2], 0x7E: ["LLC输出电压给定值", 2, 10],
0x7F: ["Boost输出电压给定值", 2], 0x7F: ["Boost输出电压给定值", 2, 10],
0x80: ["三点法中间阈值", 2], 0x80: ["三点法中间阈值", 2, 10],
0x81: ["浮充电压", 2], 0x81: ["浮充电压", 2, 10],
0x82: ["恒压充电电压", 2], 0x82: ["恒压充电电压", 2, 10],
0x83: ["llc软起开始电压", 2], 0x83: ["llc软起开始电压", 2, 10],
0x84: ["boost开始运行电压", 2], 0x84: ["boost开始运行电压", 2, 10],
0x85: ["boost停止运行电压", 2], 0x85: ["boost停止运行电压", 2, 10],
0x86: ["绝缘检测正阻抗限值", 3], 0x86: ["绝缘检测正阻抗限值", 3],
0x88: ["绝缘检测负阻抗限值", 3], 0x88: ["绝缘检测负阻抗限值", 3],
0x8A: ["保留地址项", 2], 0x8A: ["抖动频率下限", 2, 10],
0x8B: ["保留地址项", 2], 0x8B: ["抖动频率上限", 2, 10],
0x8C: ["保留地址项", 2], 0x8C: ["保留地址项", 2],
0x8D: ["保留地址项", 2], 0x8D: ["保留地址项", 2],
0x8E: ["保留地址项", 2], 0x8E: ["保留地址项", 2],
@@ -118,14 +122,14 @@ class LaminaController:
com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N' com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N'
com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8 com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8
com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1 com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1
self.__com = Serial(com_name, **com_config) self.__com = Serial(com_name, timeout=0, **com_config)
else: else:
self.__com = None self.__com = None
self.flag_print = 'frame_print' in kwargs.keys() self.flag_print = 'frame_print' in kwargs.keys()
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1 self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1 self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
self.retry_sub = kwargs['retry_sub'] if 'retry_sub' in kwargs.keys() else 1
self.block = { self.block = {
'addr' : addr_645, 'addr' : addr_645,
@@ -135,44 +139,75 @@ class LaminaController:
'data_define': modbus_map, 'data_define': modbus_map,
}, },
} }
self.output = {
'result': False,
'code_func': 0x00,
}
self.log = {
'send': 0,
'read': 0,
'keep-fail': 0,
'record': {
'config': None,
'data': None,
},
}
def __transfer_data(self, frame): def __read_frame(self) -> bool:
""" 报文数据传输 """ """ 使用帧字节超时策略读报文帧, 并进行解析数据, 打印异常 """
frame_recv = b''
time_start, time_current, flag_frame = time.time(), time.time(), False
while (time_current - time_start) < self.time_out:
time.sleep(self.time_gap)
bytes_read = self.__com.read_all()
time_current = time.time()
if flag_frame and len(bytes_read) == 0:
break
elif len(bytes_read):
flag_frame = True
frame_recv += bytes_read
try:
self.output = check_frame_modbus(frame_recv, self.block['data'])
if self.flag_print:
print("Read Frame: ", trans_list_to_str(frame_recv))
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
def __transfer_data(self, frame: bytes) -> bool:
""" 串口收发报文, 包含重试逻辑与数据打印 """
if self.__com is None: if self.__com is None:
print(trans_list_to_str(frame)) print(trans_list_to_str(frame))
return False return False
cnt = 0 fail_count = 0
while cnt < self.retry: while fail_count < self.retry:
self.__com.read_all() frame_discard = self.__com.read_all()
self.__com.write(bytearray(frame)) self.__com.write(frame)
self.log['send'] += 1
cnt_sub = 0
frame_recv = None
while not frame_recv:
time.sleep(self.time_out)
frame_recv = self.__com.read_all()
cnt_sub += 1
if cnt_sub >= self.retry_sub:
break
try:
output_text = check_frame_modbus(frame_recv, self.block['data'])
except Exception as e:
print(e)
cnt += 1
continue
print(output_text)
break
if self.flag_print and frame_discard:
print("Discard Data: " , frame_discard)
if self.flag_print: if self.flag_print:
print(trans_list_to_str(frame)) print("Send Frame: ", trans_list_to_str(frame))
print(trans_list_to_str(frame_recv))
return cnt < self.retry if self.__read_frame():
if 'Regs' in self.output.keys():
print_display(self.output['Regs'])
self.log['read'] += 1
break
else:
fail_count += 1
if fail_count >= self.log['keep-fail']:
self.log['keep-fail'] = fail_count
time.sleep(2*self.time_out)
def frame_read(self, daddr=0x60, dlen=0x30): return fail_count < self.retry
def frame_read(self, daddr=0x60, dlen=0x30) -> bool:
self.block['data']['type'] = 'read' self.block['data']['type'] = 'read'
self.block['data']['data_addr'] = daddr self.block['data']['data_addr'] = daddr
self.block['data']['data_len'] = dlen self.block['data']['data_len'] = dlen
@@ -180,23 +215,25 @@ class LaminaController:
return self.__transfer_data(frame) return self.__transfer_data(frame)
def frame_write_one(self, daddr=0x85, dval=-900): def frame_write_one(self, daddr=0x85, dval=-900) -> bool:
self.block['data']['type'] = 'write_one' self.block['data']['type'] = 'write_one'
self.block['data']['data_addr'] = daddr self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
self.block['data']['data_val'] = int(dval * item_coff)
frame = make_frame_modbus(self.block['data']) frame = make_frame_modbus(self.block['data'])
return self.__transfer_data(frame) return self.__transfer_data(frame)
def frame_write_dual(self, daddr=0x91, dval=600): def frame_write_dual(self, daddr=0x91, dval=600) -> bool:
self.block['data']['type'] = 'write_dual' self.block['data']['type'] = 'write_dual'
self.block['data']['data_addr'] = daddr self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
self.block['data']['data_val'] = int(dval * item_coff)
frame = make_frame_modbus(self.block['data']) frame = make_frame_modbus(self.block['data'])
return self.__transfer_data(frame) return self.__transfer_data(frame)
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]): def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool:
self.block['data']['type'] = 'write_str' self.block['data']['type'] = 'write_str'
self.block['data']['data_addr'] = daddr self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval self.block['data']['data_val'] = dval
@@ -204,7 +241,7 @@ class LaminaController:
return self.__transfer_data(frame) return self.__transfer_data(frame)
def frame_record(self): def frame_record(self) -> bool:
""" 读取录波数据 """ 读取录波数据
""" """
self.block['data']['file_block_size'] = 240 self.block['data']['file_block_size'] = 240
@@ -213,78 +250,41 @@ class LaminaController:
self.block['data']['step'] = 'start' self.block['data']['step'] = 'start'
frame_master = make_frame_modbus(self.block['data']) frame_master = make_frame_modbus(self.block['data'])
ret, cnt, cnt_sub = True, 0, 0 ret, pbar = True, None
frame_data_cfg = [] frame_data_cfg = []
while ret: while ret:
self.__com.read_all() if ret := self.__transfer_data(frame_master):
self.__com.write(frame_master) frame_data_cfg.append(self.output['record'])
frame_slave = None if self.output['record']['seq'] == 0:
while not frame_slave: pbar = tqdm(total=self.output['record']['total'], desc="Record Config Reading")
time.sleep(0.1)
frame_slave = self.__com.read_all()
cnt_sub += 1
if cnt_sub >= self.retry_sub:
break
try:
ret, *block_cfg = check_frame_modbus(frame_slave, self.block['data'])
print(f"{trans_list_to_str(frame_slave[:9])}")
except Exception as ex:
print(ex)
cnt += 1
ret = False
if ret:
cnt = 0
frame_data_cfg.append(block_cfg[0])
if block_cfg[0]['seq'] == 0:
self.block['data']['step'] = 'next' self.block['data']['step'] = 'next'
frame_master = make_frame_modbus(self.block['data']) frame_master = make_frame_modbus(self.block['data'])
elif (block_cfg[0]['seq']) >= block_cfg[0]['total']: elif (self.output['record']['seq']) >= self.output['record']['total']:
ret = False ret = False
elif cnt < self.retry: pbar and pbar.update()
ret = True pbar and pbar.close()
# 读取data # 读取data
self.block['data']['type'] = 'record_data' self.block['data']['type'] = 'record_data'
self.block['data']['step'] = 'start' self.block['data']['step'] = 'start'
frame_master = make_frame_modbus(self.block['data']) frame_master = make_frame_modbus(self.block['data'])
ret, cnt, cnt_sub = True, 0, 0 ret, pbar = True, None
frame_data_record = [] frame_data_record = []
while ret: while ret:
self.__com.read_all() if ret := self.__transfer_data(frame_master):
self.__com.write(frame_master) frame_data_record.append(self.output['record'])
time.sleep(0.3) if self.output['record']['seq'] == 0:
frame_slave = None pbar = tqdm(total=self.output['record']['total'], desc="Record Data Reading")
while not frame_slave:
time.sleep(0.1)
frame_slave = self.__com.read_all()
cnt_sub += 1
if cnt_sub >= self.retry_sub:
break
try:
ret, *block_record = check_frame_modbus(frame_slave, self.block['data'])
print(f"{trans_list_to_str(frame_slave[:9])}")
except Exception as ex:
print(ex)
cnt += 1
ret = False
if ret:
cnt = 0
frame_data_record.append(block_record[0])
if block_record[0]['seq'] == 0:
self.block['data']['step'] = 'next' self.block['data']['step'] = 'next'
frame_master = make_frame_modbus(self.block['data']) frame_master = make_frame_modbus(self.block['data'])
elif (block_record[0]['seq']) >= block_record[0]['total']: elif (self.output['record']['seq']) >= self.output['record']['total']:
ret = False ret = False
elif cnt < self.retry: pbar and pbar.update()
ret = True pbar and pbar.close()
if len(frame_data_record) == 0: if len(frame_data_record) == 0:
raise Exception("未取得录波数据.") print("Operation_Record: 未取得录波数据.")
return False
# 处理配置信息 # 处理配置信息
data_cfg_bare = b"".join([x['data'] for x in frame_data_cfg]) data_cfg_bare = b"".join([x['data'] for x in frame_data_cfg])
@@ -377,6 +377,9 @@ class LaminaController:
(data_cfg_bare[pos+13] * 0x100 + data_cfg_bare[pos+12]) * 0.001, (data_cfg_bare[pos+13] * 0x100 + data_cfg_bare[pos+12]) * 0.001,
} }
config_record['TriggerTime'] = time_stamp config_record['TriggerTime'] = time_stamp
self.log['record']['config'] = config_record
self.log['record']['bare_config'] = data_cfg_bare
# 处理录波数据 # 处理录波数据
data_record_bare = b"".join([x['data'] for x in frame_data_record]) data_record_bare = b"".join([x['data'] for x in frame_data_record])
data_record = [] data_record = []
@@ -407,152 +410,98 @@ class LaminaController:
pos += 2 pos += 2
data_record.append(record_point) data_record.append(record_point)
return config_record, data_record self.log['record']['data'] = data_record
self.log['record']['bare_data'] = data_record_bare
return True
def frame_update(self, path_bin): def frame_update(self, path_bin: Path, makefile: bool = False) -> bool:
""" 程序升级 """ 程序升级
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程; 注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
""" """
if makefile:
self.block['data']['file'], file_bin = GeneratePackage_DLSP001_p280039(path_bin)
else:
self.block['data']['file'] = path_bin.read_bytes()
self.block['data']['header_offset'] = 184
self.block['data']['type'] = 'update' self.block['data']['type'] = 'update'
param_saved = self.flag_print, self.retry, self.time_out
self.retry = 5
self.time_out = 0.5
self.flag_print = False
# 启动帧
self.block['data']['step'] = 'start' self.block['data']['step'] = 'start'
self.block['data']['index'] = 0 self.block['data']['index'] = 0
self.block['data']['file'] = Path(path_bin).read_bytes() frame_master = make_frame_modbus(self.block['data'])
self.block['data']['header_offset'] = 184 if not self.__transfer_data(frame_master):
# 启动帧 self.flag_print, self.retry, self.time_out = param_saved
frame_master = bytearray(make_frame_modbus(self.block['data'])) print('Upgrade Fail: start')
return False
# 等待擦除完成返回 self.block["data"]['file_block_size'] = self.output['upgrade']['length']
try_times = 30
self.__com.read_all()
while try_times:
time.sleep(self.time_out)
self.__com.write(frame_master)
frame_slave = self.__com.read_all()
if not frame_slave:
try_times -= 1
continue
try:
ret, _, self.block["data"]['file_block_size'] = check_frame_modbus(frame_slave, self.block['data'])
break
except Exception as e:
print(e)
if self.block["data"]['file_block_size'] == 0:
raise Exception("Error slave response.")
# 避免接收到延迟返回报文 # 避免接收到延迟返回报文
time.sleep(self.time_out) time.sleep(self.time_out)
# 文件传输 # 文件传输
self.retry = 3
self.time_out = 1.5
self.block["data"]['step'] = 'trans' self.block["data"]['step'] = 'trans'
data_remain = len(self.block["data"]['file']) - self.block['data']['header_offset'] self.block['data']['index'] = 0
while data_remain > 0: frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size'])
frame_master = bytearray(make_frame_modbus(self.block['data'])) for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx
cnt = 0 frame_master = make_frame_modbus(self.block['data'])
while cnt < self.retry: if not self.__transfer_data(frame_master):
self.__com.read_all() self.flag_print, self.retry, self.time_out = param_saved
self.__com.write(frame_master) print(f'Upgrade Fail: trans data in {idx}')
return False
cnt_sub = 0
frame_slave = None
while not frame_slave:
time.sleep(self.time_out)
frame_slave = self.__com.read_all()
cnt_sub += 1
if cnt_sub >= self.retry_sub:
break
try:
ret = check_frame_modbus(frame_slave, self.block['data'])
except Exception as e:
print(e)
ret = False, 0, 0
if ret[0]:
break
else:
cnt += 1
if cnt >= self.retry:
raise Exception("Error, Retry failed.")
self.block["data"]['index'] += 1
data_remain -= self.block["data"]['file_block_size']
# 结束升级 # 结束升级
self.time_out = 1
self.block["data"]['step'] = 'end' self.block["data"]['step'] = 'end'
frame_master = bytearray(make_frame_modbus(self.block['data'])) self.block["data"]['index'] += 1
frame_master = make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: end')
return False
cnt = 0 self.flag_print, self.retry, self.time_out = param_saved
while cnt < self.retry: return True
self.__com.read_all()
self.__com.write(frame_master)
cnt_sub = 0
frame_slave = None
while not frame_slave:
time.sleep(self.time_out)
frame_slave = self.__com.read_all()
cnt_sub += 1
if cnt_sub >= self.retry_sub:
break
try:
ret = check_frame_modbus(frame_slave, self.block['data'])
except Exception as e:
print(e)
ret = False, 0, 0
if ret[0]:
break
else:
cnt += 1
if cnt >= self.retry:
raise Exception("Error, Retry failed.")
def test_communication(time_out=2): def test_communication(time_out=2):
""" 通信成功率测试 """ """ 通信成功率测试 """
log_success = 0
log_failed = 0
log_failedseries = 0
cnt_failedseries = 0
time_start = time.time() time_start = time.time()
saveconfig_print = dev_lamina.flag_print param_saved = dev_lamina.flag_print, dev_lamina.retry, dev_lamina.time_out
dev_lamina.flag_print = False dev_lamina.flag_print = False
dev_lamina.retry = 1
try: try:
while 1: while True:
if dev_lamina.frame_read(0x0C, 0x20): dev_lamina.frame_read(0x0C, 0x20)
log_success += 1
cnt_failedseries = 0
else:
log_failed += 1
cnt_failedseries += 1
if log_failedseries <= cnt_failedseries:
log_failedseries = cnt_failedseries
print(f"Time Stamp: {time.ctime()}") print(f"Time Stamp: {time.ctime()}")
print(f"Success Frame: {log_success}") print(f"Success Frame: {dev_lamina.log['read']}")
print(f"Failed Frame: {log_failed}") print(f"Failed Frame: {dev_lamina.log['send'] - dev_lamina.log['read']}")
print(f"Max Series Failed Frame: {log_failedseries}") print(f"Max Series Failed Frame: {dev_lamina.log['keep-fail']}")
time.sleep(time_out) time.sleep(time_out)
finally: finally:
time_end = time.time() time_end = time.time()
print("Test Result: ") print("Test Result: ")
print(f"Time Start: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_start))}, \tTime End: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_end))}") print(f"Time Start: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_start))}, \tTime End: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_end))}")
print(f"Time Elapsed: {time_end - time_start}") print(f"Time Elapsed: {time_end - time_start}")
print(f"Success Rate: {log_success / (log_success + log_failed) * 100}%") print(f"Success Rate: {dev_lamina.log['read'] / dev_lamina.log['send'] * 100}%")
dev_lamina.flag_print = saveconfig_print dev_lamina.flag_print, dev_lamina.retry, dev_lamina.time_out = param_saved
def test_record(path_CFG=None, path_data=None): def test_record(path_CFG=None, path_data=None):
""" 执行录波数据读取流程 """ """ 执行录波数据读取流程 """
if path_CFG is None: path_CFG = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039\record") / f"record_{'-'.join(time.ctime().replace(':', '').split(' '))}.cfg" if path_CFG is None: path_CFG = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039\record") / f"record_{'-'.join(time.ctime().replace(':', '').split(' '))}.cfg"
if path_data is None: path_data = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039\record") / f"record_{'-'.join(time.ctime().replace(':', '').split(' '))}.dat" if path_data is None: path_data = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039\record") / f"record_{'-'.join(time.ctime().replace(':', '').split(' '))}.dat"
config, data = dev_lamina.frame_record() if not dev_lamina.frame_record():
return
config = dev_lamina.log['record']['config']
data = dev_lamina.log['record']['data']
# Header File # Header File
@@ -593,6 +542,9 @@ def test_record(path_CFG=None, path_data=None):
text_record += line_record + "\r\n" text_record += line_record + "\r\n"
path_data.write_text(text_record) path_data.write_text(text_record)
print(f"Saved Path:")
print(f"\tconfig: {path_CFG}")
print(f"\tdata: {path_data}")
def make_Image(): def make_Image():
""" 叠光控制器DSP镜像与升级包生成流程 """ """ 叠光控制器DSP镜像与升级包生成流程 """
@@ -638,7 +590,6 @@ def make_Image():
data_hex = file_Bin_to_IntelHex(data_image, 0x80000, memory_width=2) data_hex = file_Bin_to_IntelHex(data_image, 0x80000, memory_width=2)
file_image3.write_text(data_hex) file_image3.write_text(data_hex)
def make_Pakeage(fp: Path): def make_Pakeage(fp: Path):
""" 生成升级包 """ """ 生成升级包 """
@@ -743,12 +694,14 @@ if __name__=='__main__':
"Debug": {'com_name': 'COM3', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1, "Debug": {'com_name': 'COM3', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06], # 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True, 'frame_print': True,
'time_out': 0.2, 'retry': 3, 'retry_sub': 10}, 'time_out': 0.5, 'retry': 3},
} }
dev_lamina = LaminaController(**mode_config['Debug']) dev_lamina = LaminaController(**mode_config['Debug'])
dev_lamina.frame_read(0x0100, 0x20) dev_lamina.frame_read(0x0100, 0x20)
# dev_lamina.frame_read(0x0A, 0x20)
# dev_lamina.frame_read(0x60, 0x60)
if not hasattr(__builtins__,"__IPYTHON__") and 0: # if not hasattr(__builtins__,"__IPYTHON__") and 0: #
@@ -770,15 +723,10 @@ if __name__=='__main__':
file_package = make_Pakeage(file_hex) file_package = make_Pakeage(file_hex)
dev_lamina.frame_write_one(0x60, 0) dev_lamina.frame_write_one(0x60, 0)
time.sleep(1) time.sleep(1)
if dev_lamina.frame_update(file_package):
dev_lamina.frame_update(file_package)
time.sleep(6) time.sleep(6)
dev_lamina.frame_read(0x0100, 0x20) dev_lamina.frame_read(0x0100, 0x20)
# dev_lamina.frame_write_one(0x52, 0x01) # dev_lamina.frame_write_one(0x52, 0x01)