重构代码.

This commit is contained in:
何 泽隆
2024-10-25 09:32:53 +08:00
parent b6aa0e8b75
commit eeb476a538
15 changed files with 1657 additions and 1521 deletions

View File

@@ -1,302 +1,11 @@
import time import time
from math import ceil
from tqdm import tqdm
from pathlib import Path from pathlib import Path
from serial import Serial from device.LaminaAdapter import LaminaAdapter
from tools.ByteConv import trans_list_to_str from source.device.LaminaAdapter import GeneratePackage_SLCP101_p460
from func_frame import make_frame_dlt645, check_frame_dlt645, print_display from source.device.LaminaAdapter import GenerateImage_SLCP101_p460
from func_upgrade import GeneratePackage_SLCP101_p460 from source.device.LaminaAdapter import GeneratePackage_DLSY001_p460
from source.device.LaminaAdapter import GenerateImage_DLSY001_p460
modbus_map = { from function.tools.ByteConv import trans_list_to_str
# 1 - Hex
# 2 - Int16
# 3 - lnt32
# 4 - str
# 5 - addr
# 6 - float
0x0E: ["故障字1", 1],
0x0F: ["故障字2", 1],
0x10: ["MPPT工作状态", 1],
0x11: ["系统工作状态", 1],
0x12: ["系统工作模式", 1],
0x13: ["输入电压", 2],
0x14: ["电感电流", 2],
0x15: ["12V电压", 2],
0x16: ["输出电压", 2],
0x17: ["输入电流", 2],
0x18: ["温度1", 2],
0x19: ["温度2", 2],
0x1A: ["输入功率", 3],
0x1C: ["设备温度", 2],
0x1D: ["开关机状态", 1],
0x1E: ["电池电压", 2],
0x1F: ["并机功率限值", 3],
0x60: ["光伏通道使能", 1],
0x61: ["最小启动输入电压", 2],
0x62: ["最大启动输入电压", 2],
0x63: ["最小停止输入电压", 2],
0x64: ["最大停止输入电压", 2],
0x65: ["最小MPPT电压", 2],
0x66: ["最大MPPT电压", 2],
0x67: ["最小启动输出电压", 2],
0x68: ["最大启动输出电压", 2],
0x69: ["最小停止输出电压", 2],
0x6A: ["最大停止输出电压", 2],
0x6B: ["输入过压保护值", 2],
0x6C: ["输出过压保护值", 2],
0x6D: ["输出欠压保护值", 2],
0x6E: ["电感过流保护值", 2],
0x6F: ["输入过流保护值", 2],
0x70: ["最小电感电流限值", 2],
0x71: ["最大电感电流限值", 2],
0x72: ["浮充电压阈值", 2],
0x73: ["三点法中间阈值", 2],
0x74: ["恒压充电电压", 2],
0x75: ["过温故障值", 2],
0x76: ["过温告警值", 2],
0x77: ["温度恢复值", 2],
0x78: ["最低满载电压", 2],
0x79: ["最高满载电压", 2],
0x7A: ["输入过载保护值", 3],
0x7C: ["最小功率限值", 3],
0x7E: ["最大功率限值", 3],
0x80: ["最大功率限值存储值", 3],
0x82: ["载波通信地址", 5, 3],
0x85: ["电压环out_max", 2],
0x86: ["电压环out_min", 2],
0x87: ["电流环out_max", 2],
0x88: ["电流环out_min", 2],
0x89: ["MPPT扰动系数k_d_vin", 2],
0x8A: ["dmin", 2],
0x8B: ["dmax", 2],
0x8C: ["扫描电压偏移scanvolt_offset", 2],
0x8D: ["电压环Kp", 3],
0x8F: ["电压环Ki", 3],
0x91: ["电流环Kp", 3],
0x93: ["电流环Ki", 3],
0x95: ["日志级别", 1],
0x96: ["日志输出方式", 1],
0x97: ["采样校准volt_in_a", 2],
0x98: ["采样校准volt_in_b", 2],
0x99: ["采样校准volt_out_a", 2],
0x9A: ["采样校准volt_out_b", 2],
0x9B: ["采样校准curr_in_a", 2],
0x9C: ["采样校准curr_in_b", 2],
0x9D: ["采样校准curr_induc_a", 2],
0x9E: ["采样校准curr_induc_b", 2],
0x9F: ["采样校准volt_12V_a", 2],
0xA0: ["采样校准volt_12V_b", 2],
0xA1: ["温度补偿temp1_b", 2],
0xA2: ["温度补偿temp2_b", 2],
0xA3: ["系统工作模式", 2],
0xA4: ["电感电流给定值curr_set", 2],
0xA5: ["抖动频率上限", 2],
0xA6: ["抖动频率下限", 2],
0xA7: ["电池电压判断限值", 2],
0xA8: ["MPPT追踪模式", 1],
0xA9: ["ADC参考电压", 2],
0xAA: ["保留", 1],
0xAB: ["保留", 1],
0xAC: ["保留", 1],
0xAD: ["保留", 1],
0xAE: ["保留", 1],
0xAF: ["保留", 1],
0x100: ["版本", 4, 16],
0x110: ["型号", 4, 16],
0x120: ["载波芯片地址", 4, 16],
0x130: ["厂商", 4, 8],
0x138: ["保留", 4, 8],
0x140: ["保留", 4, 16],
0x150: ["保留", 4, 16],
0x160: ["硬件", 4, 16],
0x170: ["SN", 4, 16],
0x180: ["MES", 4, 16],
0x190: ["Datetime", 4, 16],
}
class LaminaAdapter:
""" 叠光适配器\优化器
"""
def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs):
# 初始化串口通信
if com_name is not None:
com_config = {}
com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200
com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N'
com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8
com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1
self.__com = Serial(com_name, **com_config)
else:
self.__com =None
self.flag_print = 'frame_print' in kwargs.keys()
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
self.retry_sub = kwargs['retry_sub'] if 'retry_sub' in kwargs.keys() else 1
self.block = {
'addr' : addr_645,
'type' : 'modbus',
'data' : {
'addr_dev' : addr_modbus,
'data_define': modbus_map,
},
}
self.output = {
'result': False,
'code_func': 0x00,
}
self.log = {
'send': 0,
'read': 0,
'keep-fail': 0,
'record': {
'config': None,
'data': None,
},
}
def __read_frame(self) -> bool:
""" 使用帧字节超时策略读报文帧, 并进行解析数据, 打印异常 """
frame_recv = b''
time_start, time_current, flag_frame = time.time(), time.time(), False
while (time_current - time_start) < self.time_out:
time.sleep(self.time_gap)
bytes_read = self.__com.read_all()
time_current = time.time()
if flag_frame and len(bytes_read) == 0:
break
elif len(bytes_read):
flag_frame = True
frame_recv += bytes_read
try:
self.output = check_frame_dlt645(frame_recv, self.block)
if self.flag_print:
print("Read Frame: ", trans_list_to_str(frame_recv))
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
def __transfer_data(self, frame: bytearray) -> bool:
""" 报文数据传输 """
if self.__com is None:
print(trans_list_to_str(frame))
return False
fail_count = 0
while fail_count < self.retry:
frame_discard = self.__com.read_all()
self.__com.write(frame)
self.log['send'] += 1
if self.flag_print and frame_discard:
print("Discard Data: " , frame_discard)
if self.flag_print:
print("Send Frame: ", trans_list_to_str(frame))
if self.__read_frame():
if 'Regs' in self.output.keys():
print_display(self.output['Regs'])
self.log['read'] += 1
break
else:
fail_count += 1
if fail_count >= self.log['keep-fail']:
self.log['keep-fail'] = fail_count
time.sleep(2*self.time_out)
return fail_count < self.retry
def frame_read(self, daddr=0x60, dlen=0x50):
self.block['data']['type'] = 'read'
self.block['data']['data_addr'] = daddr
self.block['data']['data_len'] = dlen
frame = make_frame_dlt645(self.block)
return self.__transfer_data(frame)
def frame_write_one(self, daddr=0x85, dval=-900):
self.block['data']['type'] = 'write_one'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_dlt645(self.block)
return self.__transfer_data(frame)
def frame_write_dual(self, daddr=0x91, dval=600):
self.block['data']['type'] = 'write_dual'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_dlt645(self.block)
return self.__transfer_data(frame)
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]):
self.block['data']['type'] = 'write_str'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_dlt645(self.block)
return self.__transfer_data(frame)
def frame_update(self, path_bin):
""" 程序升级
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
"""
param_saved = self.flag_print, self.retry, self.time_out
self.block['data']['type'] = 'update'
self.block['data']['step'] = 'start'
self.block['data']['index'] = 0
self.block['data']['file'] = Path(path_bin).read_bytes()
self.block['data']['header_offset'] = 184
# 启动帧
frame_master = bytearray(make_frame_dlt645(self.block))
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print('Upgrade Fail: start')
return False
self.block["data"]['file_block_size'] = self.output['upgrade']['length']
# 避免接收到延迟返回报文
time.sleep(self.time_out)
# 文件传输
self.retry = 3
self.time_out = 1.5
self.block["data"]['step'] = 'trans'
self.block['data']['index'] = 0
frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size'])
for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx
frame_master = make_frame_dlt645(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: trans data in {idx}')
return False
# 结束升级
self.time_out = 1
self.block["data"]['step'] = 'end'
self.block["data"]['index'] += 1
frame_master = make_frame_dlt645(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: end')
return False
self.flag_print, self.retry, self.time_out = param_saved
return True
def test_communication(time_out=2): def test_communication(time_out=2):
""" 通信成功率测试 """ """ 通信成功率测试 """
@@ -336,6 +45,97 @@ def make_Pakeage(fp: Path):
return file_package return file_package
def Process1():
""" 适配器镜像生成流程 """
root = Path(r"test\p460")
result = Path(r"test\p460\result")
# 正常启动镜像
hex_boot = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\460-PROJ_STACKLIGHT_PARALLEL_ADAPTOR\bootloader.hex")
hex_main = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex")
hex_back = root / r"lamina_adapter_back.hex"
hex_update = hex_main
file_image = result / f'{hex_main.stem}_ROM.bin'
file_main_header = result / 'SLCP101_header_main.bin'
file_back_header = result / 'SLCP101_header_back.bin'
file_package = result / f'{hex_update.stem}.dat'
file_bin = result / f'{hex_update.stem}.bin'
data_bins = GenerateImage_SLCP101_p460(hex_boot, hex_main, hex_back)
data_package, data_bin = GeneratePackage_SLCP101_p460(hex_update)
file_image.write_bytes(data_bins[0].copy())
file_main_header.write_bytes(data_bins[1].copy())
file_back_header.write_bytes(data_bins[2].copy())
file_package.write_bytes(data_package)
file_bin.write_bytes(data_bin)
# 异常镜像-主分区md5错误
file_image1 = result / f'{file_image.stem}_b1.bin'
data_image = data_bins[0].copy()
data_image[0x054018: 0x05401A] = [0x00, 0x01]
file_image1.write_bytes(data_image)
# 异常镜像-备份分区md5错误
file_image2 = result / f'{file_image.stem}_b2.bin'
data_image = data_bins[0].copy()
data_image[0x056018: 0x05601A] = [0x00, 0x01]
file_image2.write_bytes(data_image)
# 异常镜像-双分区md5错误
file_image3 = result / f'{file_image.stem}_b3.bin'
data_image = data_bins[0].copy()
data_image[0x054018: 0x05401A] = [0x00, 0x01]
data_image[0x056018: 0x05601A] = [0x00, 0x01]
file_image3.write_bytes(data_image)
def Process2():
""" 优化镜像生成流程 """
root = Path(r"D:\WorkingProject\LightStackOptimizer\software\lamina_optimizer\lamina_optimizer\Debug")
# result = Path(r"test\p460_o1\result")
result = root
# 正常启动镜像
hex_boot = Path(r"test\p460_o1\bootloader.hex")
hex_main = root / r"lamina_optimizer.hex"
hex_back = root / r"lamina_optimizer.hex"
hex_update = root / r"lamina_optimizer.hex"
file_image = result / f'{hex_main.stem}_ROM.bin'
file_main_header = result / 'DLSY001_header_main.bin'
file_back_header = result / 'DLSY001_header_back.bin'
file_package = result / f'{hex_update.stem}.dat'
file_bin = result / f'{hex_update.stem}.bin'
data_bins = GenerateImage_DLSY001_p460(hex_boot, hex_main, hex_back)
data_package, data_bin = GeneratePackage_DLSY001_p460(hex_update)
file_image.write_bytes(data_bins[0].copy())
file_main_header.write_bytes(data_bins[1].copy())
file_back_header.write_bytes(data_bins[2].copy())
file_package.write_bytes(data_package)
file_bin.write_bytes(data_bin)
# 异常镜像-主分区md5错误
file_image1 = result / f'{file_image.stem}_b1.bin'
data_image = data_bins[0].copy()
data_image[0x054018: 0x05401A] = [0x00, 0x01]
file_image1.write_bytes(data_image)
# 异常镜像-备份分区md5错误
file_image2 = result / f'{file_image.stem}_b2.bin'
data_image = data_bins[0].copy()
data_image[0x056018: 0x05601A] = [0x00, 0x01]
file_image2.write_bytes(data_image)
# 异常镜像-双分区md5错误
file_image3 = result / f'{file_image.stem}_b3.bin'
data_image = data_bins[0].copy()
data_image[0x054018: 0x05401A] = [0x00, 0x01]
data_image[0x056018: 0x05601A] = [0x00, 0x01]
file_image3.write_bytes(data_image)
def test(): def test():
if 0: if 0:
dev_lamina.frame_read(0xA9, 1) # 读ADC参考电压 dev_lamina.frame_read(0xA9, 1) # 读ADC参考电压
@@ -429,17 +229,20 @@ if __name__=='__main__':
"Log": {'com_name': None, "Log": {'com_name': None,
# 'addr_645': [0x01, 0x00, 0x00, 0x00, 0x00, 0x40], # 'addr_645': [0x01, 0x00, 0x00, 0x00, 0x00, 0x40],
}, },
"Debug": {'com_name': 'COM8', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1, "Debug": {'com_name': 'COM3', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06], # 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True, 'frame_print': True,
'time_out': 0.1, 'retry': 1, 'retry_sub': 10}, 'time_out': 0.1, 'retry': 1, 'retry_sub': 10},
"HPLC": {'com_name': 'COM10', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1, "HPLC": {'com_name': 'COM10', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06], # 'addr_645': [0x33, 0x00, 0x10, 0x03, 0x09, 0x24],
'frame_print': True, 'frame_print': True,
'time_out': 0.5, 'retry': 3, 'retry_sub': 10}, 'time_out': 3, 'time_gap': 0.1, 'retry': 3, 'retry_sub': 10},
} }
dev_lamina = LaminaAdapter(**mode_config['Debug']) Process1()
exit(0)
dev_lamina = LaminaAdapter(**mode_config['HPLC'])
dev_lamina.frame_read(0x0100, 0x20) dev_lamina.frame_read(0x0100, 0x20)
@@ -457,7 +260,7 @@ if __name__=='__main__':
while True: while True:
""" 自动检测升级流程(需要调试) """ """ 自动检测升级流程(需要调试) """
ret = False ret = False
while not ret or ('Reg' not in dev_lamina.output.keys()) or (version == dev_lamina.output['Reg'][0x0100][1]): while not ret or ('Regs' not in dev_lamina.output.keys()) or (version == dev_lamina.output['Regs'][0x0100][1]):
dev_lamina.frame_read(0x82, 3) dev_lamina.frame_read(0x82, 3)
ret = dev_lamina.frame_read(0x0100, 0x20) ret = dev_lamina.frame_read(0x0100, 0x20)
time.sleep(1) time.sleep(1)
@@ -468,7 +271,7 @@ if __name__=='__main__':
ret = dev_lamina.frame_read(0x0100, 0x20) ret = dev_lamina.frame_read(0x0100, 0x20)
if ret and (version == dev_lamina.output['Reg'][0x0100][1]): if ret and (version == dev_lamina.output['Regs'][0x0100][1]):
dev_lamina.frame_write_one(0x52, 0x01) dev_lamina.frame_write_one(0x52, 0x01)
print(f"address: {' '.join(map(lambda x: ('000' + hex(x)[2:])[-2:], addr))}") print(f"address: {' '.join(map(lambda x: ('000' + hex(x)[2:])[-2:], addr))}")
@@ -478,4 +281,3 @@ if __name__=='__main__':
if addr[5] & 0x0F >= 10: if addr[5] & 0x0F >= 10:
addr[5] += 0x10 - 10 addr[5] += 0x10 - 10

View File

@@ -1,484 +1,12 @@
import time import time
from math import ceil
from tqdm import tqdm
from pathlib import Path from pathlib import Path
from serial import Serial
from datetime import datetime from datetime import datetime
from tenacity import retry, stop_after_attempt, wait_fixed from tenacity import retry, stop_after_attempt, wait_fixed
from tools.ByteConv import trans_list_to_str from device.LaminaController import LaminaController
from device.LaminaController import GeneratePackage_DLSP001_p280039
from device.LaminaController import GenerateImage_DLSP001_p280039
from function.tools.ByteConv import trans_list_to_str
from tools.IntelHex import file_Bin_to_IntelHex from tools.IntelHex import file_Bin_to_IntelHex
from func_frame import make_frame_modbus, check_frame_modbus, print_display
from func_upgrade import GenerateImage_DLSP001_p280039, GeneratePackage_DLSP001_p280039
ParamMap_LaminaController = {
# 1 - Hex
# 2 - Int16
# 3 - lnt32
# 4 - str
# 5 - addr
# 6 - float
# 7 - numberList
0x0B: ["事件标志", 1],
0x0C: ["告警字1", 1],
0x0D: ["告警字2", 1],
0x0E: ["故障字1", 1],
0x0F: ["故障字2", 1],
0x10: ["系统工作状态" , 1],
0x11: ["Boost1工作状态" , 1],
0x12: ["Boost2工作状态" , 1],
0x13: ["开关机状态" , 1],
0x14: ["光伏组串1输入电压" , 2, 10],
0x15: ["光伏组串2输入电压" , 2, 10],
0x16: ["Boost1电感电流" , 2, 100],
0x17: ["Boost2电感电流" , 2, 100],
0x18: ["Boost输出电压" , 2, 10],
0x19: ["Boost输出总电流" , 2, 100],
0x1A: ["Boost1功率" , 3, 1000],
0x1C: ["Boost2功率" , 3, 1000],
0x1E: ["输入总功率" , 3, 1000],
0x20: ["LLC输出电压" , 2, 10],
0x21: ["端口输出电压" , 2, 10],
0x22: ["LLC输出电流均值" , 2, 100],
0x23: ["LLC输出电流峰值" , 2, 100],
0x24: ["绝缘检测电压" , 2, 10],
0x25: ["散热片温度" , 2, 10],
0x26: ["腔体1温度" , 2, 10],
0x27: ["腔体2温度" , 2, 10],
0x28: ["设备温度" , 2, 10],
0x50: ["启停控制命令" , 2],
0x51: ["故障清除命令" , 2],
0x52: ["参数还原命令" , 2],
0x53: ["设备复位命令" , 2],
0x54: ["模式更改命令" , 2],
0x55: ["短时停机命令(未启用)" , 2],
0x56: ["手动录波命令" , 2],
0x57: ["时间配置命令" , 7, 3],
0x60: ["整机运行使能", 1],
0x61: ["最小启动允许输入电压", 2, 10],
0x62: ["最大启动允许输入电压", 2, 10],
0x63: ["最小停机输入电压", 2, 10],
0x64: ["最大停机输入电压", 2, 10],
0x65: ["最小启动允许输出电压", 2, 10],
0x66: ["最大启动允许输出电压", 2, 10],
0x67: ["最小停止允许输出电压", 2, 10],
0x68: ["最大停止允许输出电压", 2, 10],
0x69: ["最小MPPT电流限值", 2, 100],
0x6A: ["最大MPPT电流限值", 2, 100],
0x6B: ["保留数据项", 2],
0x6C: ["最大功率限值", 3, 1000],
0x6E: ["最大功率限值存储值", 3, 1000],
0x70: ["Boost输入过压保护值", 2, 10],
0x71: ["Boost输出过压保护值", 2, 10],
0x72: ["LLC输出过压保护值", 2, 10],
0x73: ["LLC输出欠压保护值", 2, 10],
0x74: ["Boost电感过流保护值", 2, 100],
0x75: ["LLC输出电流均值保护值", 2, 100],
0x76: ["LLC输出电流峰值保护值", 2, 100],
0x77: ["保留数据项", 2],
0x78: ["过载保护值", 3, 1000],
0x7A: ["过温故障值", 2, 10],
0x7B: ["过温告警值", 2, 10],
0x7C: ["过温恢复值", 2, 10],
0x7D: ["输出继电器故障判断差值", 2, 10],
0x7E: ["LLC输出电压给定值", 2, 10],
0x7F: ["Boost输出电压给定值", 2, 10],
0x80: ["三点法中间阈值", 2, 10],
0x81: ["浮充电压", 2, 10],
0x82: ["恒压充电电压", 2, 10],
0x83: ["llc软起开始电压", 2, 10],
0x84: ["boost开始运行电压", 2, 10],
0x85: ["boost停止运行电压", 2, 10],
0x86: ["绝缘检测正阻抗限值", 3, 1],
0x88: ["绝缘检测负阻抗限值", 3, 1],
0x8A: ["抖动频率下限", 2, 10],
0x8B: ["抖动频率上限", 2, 10],
0x8C: ["保留地址项", 2],
0x8D: ["保留地址项", 2],
0x8E: ["保留地址项", 2],
0x8F: ["保留地址项", 2],
0x100: ["程序版本字符串", 4, 16],
0x110: ["设备型号字符串", 4, 16],
0x120: ["保留地址项", 4, 16],
0x130: ["生产厂家字符串", 4, 8],
0x138: ["保留地址项", 4, 8],
0x140: ["保留地址项", 4, 16],
0x150: ["保留地址项", 4, 16],
0x160: ["硬件版本字符串", 4, 16],
0x170: ["设备序列号", 4, 16],
0x180: ["设备MES码", 4, 16],
0x190: ["出厂日期批次", 4, 16],
}
class LaminaController:
""" 叠光控制器
"""
def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs):
# 初始化串口通信
if com_name is not None:
com_config = {}
com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200
com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N'
com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8
com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1
self.__com = Serial(com_name, timeout=0, **com_config)
else:
self.__com = None
self.flag_print = kwargs['frame_print'] if 'frame_print' in kwargs.keys() else False
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
self.block = {
'addr' : addr_645,
'type' : 'modbus',
'data' : {
'addr_dev' : addr_modbus,
'data_define': ParamMap_LaminaController,
},
}
self.output = {
'result': False,
'code_func': 0x00,
}
self.log = {
'send': 0,
'read': 0,
'keep-fail': 0,
'record': {
'config': None,
'data': None,
},
}
def __read_frame(self) -> bool:
""" 使用帧字节超时策略读报文帧, 并进行解析数据, 打印异常 """
frame_recv = b''
time_start, time_current, flag_frame = time.time(), time.time(), False
while (time_current - time_start) < self.time_out:
time.sleep(self.time_gap)
bytes_read = self.__com.read_all()
time_current = time.time()
if flag_frame and len(bytes_read) == 0:
break
elif len(bytes_read):
flag_frame = True
frame_recv += bytes_read
try:
self.output = check_frame_modbus(frame_recv, self.block['data'])
if self.flag_print:
print("Read Frame: ", trans_list_to_str(frame_recv))
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
def __transfer_data(self, frame: bytearray) -> bool:
""" 串口收发报文, 包含重试逻辑与数据打印 """
if self.__com is None:
print(trans_list_to_str(frame))
return False
fail_count = 0
while fail_count < self.retry:
frame_discard = self.__com.read_all()
self.__com.write(frame)
self.log['send'] += 1
if self.flag_print and frame_discard:
print("Discard Data: " , frame_discard)
if self.flag_print:
print("Send Frame: ", trans_list_to_str(frame))
time.sleep(10 * self.time_gap)
if self.__read_frame():
if (self.flag_print is not None) and 'Regs' in self.output.keys():
print_display(self.output['Regs'])
self.log['read'] += 1
break
else:
fail_count += 1
if fail_count >= self.log['keep-fail']:
self.log['keep-fail'] = fail_count
time.sleep(2*self.time_out)
return fail_count < self.retry
def frame_read(self, daddr=0x60, dlen=0x30) -> bool:
self.block['data']['type'] = 'read'
self.block['data']['data_addr'] = daddr
self.block['data']['data_len'] = dlen
frame = make_frame_modbus(self.block['data'])
return self.__transfer_data(frame)
def frame_write_one(self, daddr=0x85, dval=-900) -> bool:
self.block['data']['type'] = 'write_one'
self.block['data']['data_addr'] = daddr
item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
self.block['data']['data_val'] = int(dval * item_coff)
frame = make_frame_modbus(self.block['data'])
return self.__transfer_data(frame)
def frame_write_dual(self, daddr=0x91, dval=600) -> bool:
self.block['data']['type'] = 'write_dual'
self.block['data']['data_addr'] = daddr
item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
self.block['data']['data_val'] = int(dval * item_coff)
frame = make_frame_modbus(self.block['data'])
return self.__transfer_data(frame)
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool:
self.block['data']['type'] = 'write_str'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_modbus(self.block['data'])
return self.__transfer_data(frame)
def frame_record(self) -> bool:
""" 读取录波数据
"""
param_saved = self.flag_print, self.retry, self.time_out
self.flag_print = False
self.retry = 3
self.time_out = 1.5
self.block['data']['file_block_size'] = 240
# 读取config
self.block['data']['type'] = 'record_cfg'
self.block['data']['step'] = 'start'
frame_master = make_frame_modbus(self.block['data'])
ret, pbar = True, None
frame_data_cfg = []
while ret:
if ret := self.__transfer_data(frame_master):
frame_data_cfg.append(self.output['record'])
if self.output['record']['seq'] == 0:
pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading")
self.block['data']['step'] = 'next'
frame_master = make_frame_modbus(self.block['data'])
elif (self.output['record']['seq']) >= self.output['record']['total']:
ret = False
pbar and pbar.update()
pbar and pbar.close()
# 读取data
self.block['data']['type'] = 'record_data'
self.block['data']['step'] = 'start'
frame_master = make_frame_modbus(self.block['data'])
ret, pbar = True, None
frame_data_record = []
while ret:
if ret := self.__transfer_data(frame_master):
frame_data_record.append(self.output['record'])
if self.output['record']['seq'] == 0:
pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading")
self.block['data']['step'] = 'next'
frame_master = make_frame_modbus(self.block['data'])
elif (self.output['record']['seq']) >= self.output['record']['total']:
ret = False
pbar and pbar.update()
pbar and pbar.close()
self.flag_print, self.retry, self.time_out = param_saved
if (len(frame_data_record) != 32) or (len(frame_data_cfg) != 3):
print("Operation_Record: 未取得录波数据.")
return False
# 处理配置信息
data_cfg_bare = b"".join([x['data'] for x in frame_data_cfg])
config_record = {'LinePos': []}
pos = 0
if data_cfg_bare[pos+1] != 0xF1 or data_cfg_bare[pos] != 0xF1:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
len_faultword = (data_cfg_bare[3] * 0x100 + data_cfg_bare[2])
pos += 2
config_record['FaultWord'] = []
for i in range(0, len_faultword, 2):
faultword = data_cfg_bare[pos+i+1] * 0x100 + data_cfg_bare[pos+i]
config_record['FaultWord'].append(faultword)
pos += len_faultword
config_record['SysStatus'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['FaultRecordPosition'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['FaultNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['Standard'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
if data_cfg_bare[pos+1] != 0xF2 or data_cfg_bare[pos] != 0xF2:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
config_record['ChannelNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['ChNum_Alg'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['ChNum_Dgt'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
if data_cfg_bare[pos+1] != 0xF3 or data_cfg_bare[pos] != 0xF3:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
config_record['ChannelDescription'] = []
config_record['ChannelCoefficient'] = []
for i in range(config_record['ChannelNum']):
len_str = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
ch_desc = []
for j in range(0, len_str, 2):
ch_desc.append(data_cfg_bare[pos + j])
pos += len_str
ch_coe = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['ChannelDescription'].append(bytearray(ch_desc).decode())
config_record['ChannelCoefficient'].append(ch_coe)
if data_cfg_bare[pos+1] != 0xF4 or data_cfg_bare[pos] != 0xF4:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
config_record['SysFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['FreqNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['SampleFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['SamplePoint'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['DataType'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['TimeFactor'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
if data_cfg_bare[pos+1] != 0xF5 or data_cfg_bare[pos] != 0xF5:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
time_stamp = {
'year': data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos],
'month': data_cfg_bare[pos+3] * 0x100 + data_cfg_bare[pos+2],
'day': data_cfg_bare[pos+5] * 0x100 + data_cfg_bare[pos+4],
'hour': data_cfg_bare[pos+7] * 0x100 + data_cfg_bare[pos+6],
'minute': data_cfg_bare[pos+9] * 0x100 + data_cfg_bare[pos+8],
'second': data_cfg_bare[pos+11] * 0x100 + data_cfg_bare[pos+10] +
(data_cfg_bare[pos+13] * 0x100 + data_cfg_bare[pos+12]) * 0.001,
}
config_record['StartTime'] = time_stamp
pos += 14
time_stamp = {
'year': data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos],
'month': data_cfg_bare[pos+3] * 0x100 + data_cfg_bare[pos+2],
'day': data_cfg_bare[pos+5] * 0x100 + data_cfg_bare[pos+4],
'hour': data_cfg_bare[pos+7] * 0x100 + data_cfg_bare[pos+6],
'minute': data_cfg_bare[pos+9] * 0x100 + data_cfg_bare[pos+8],
'second': data_cfg_bare[pos+11] * 0x100 + data_cfg_bare[pos+10] +
(data_cfg_bare[pos+13] * 0x100 + data_cfg_bare[pos+12]) * 0.001,
}
config_record['TriggerTime'] = time_stamp
self.log['record']['config'] = config_record
self.log['record']['bare_config'] = data_cfg_bare
# 处理录波数据
data_record_bare = b"".join([x['data'] for x in frame_data_record])
data_record = []
pos = 0
while pos < len(data_record_bare):
record_point = {
'index': data_record_bare[pos+3] * 0x1000 +
data_record_bare[pos+2] * 0x100 +
data_record_bare[pos+1] * 0x10 +
data_record_bare[pos],
'timestamp': data_record_bare[pos+7] * 0x1000 +
data_record_bare[pos+6] * 0x100 +
data_record_bare[pos+5] * 0x10 +
data_record_bare[pos + 4],
'ChAlg': [],
'ChDgt': [],
}
pos += 8
for i in range(config_record['ChNum_Alg']):
point_data_alg = data_record_bare[pos+1] * 0x100 + data_record_bare[pos]
if data_record_bare[pos+1] & 0x80:
point_data_alg -= 0x10000
record_point['ChAlg'].append(point_data_alg)
pos += 2
for i in range(config_record['ChNum_Dgt']):
point_data_dgt = (data_record_bare[pos+(i // 8)] & (0x01 << (i % 8))) == (0x01 << (i % 8))
record_point['ChDgt'].append(point_data_dgt)
pos += 2
data_record.append(record_point)
self.log['record']['data'] = data_record
self.log['record']['bare_data'] = data_record_bare
return True
def frame_update(self, path_bin: Path, makefile: bool = False) -> bool:
""" 程序升级
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
"""
if makefile:
self.block['data']['file'], file_bin = GeneratePackage_DLSP001_p280039(path_bin)
else:
self.block['data']['file'] = path_bin.read_bytes()
self.block['data']['header_offset'] = 184
self.block['data']['type'] = 'update'
param_saved = self.flag_print, self.retry, self.time_out
self.retry = 5
self.time_out = 0.5
self.flag_print = False
# 启动帧
self.block['data']['step'] = 'start'
self.block['data']['index'] = 0
frame_master = make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print('Upgrade Fail: start')
return False
self.block["data"]['file_block_size'] = self.output['upgrade']['length']
# 避免接收到延迟返回报文
time.sleep(self.time_out)
# 文件传输
self.retry = 3
self.time_out = 1.5
self.block["data"]['step'] = 'trans'
self.block['data']['index'] = 0
frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size'])
for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx
frame_master = make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: trans data in {idx}')
return False
# 结束升级
self.time_out = 1
self.block["data"]['step'] = 'end'
self.block["data"]['index'] += 1
frame_master = make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: end')
return False
self.flag_print, self.retry, self.time_out = param_saved
return True
def test_communication(time_out=2): def test_communication(time_out=2):
""" 通信成功率测试 """ """ 通信成功率测试 """
@@ -555,6 +83,7 @@ def test_record(path_CFG=None, path_data=None):
print(f"\tconfig: {path_CFG}") print(f"\tconfig: {path_CFG}")
print(f"\tdata: {path_data}") print(f"\tdata: {path_data}")
def make_Image(): def make_Image():
""" 叠光控制器DSP镜像与升级包生成流程 """ """ 叠光控制器DSP镜像与升级包生成流程 """
root = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039") root = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039")
@@ -599,6 +128,7 @@ def make_Image():
data_hex = file_Bin_to_IntelHex(data_image, 0x80000, memory_width=2) data_hex = file_Bin_to_IntelHex(data_image, 0x80000, memory_width=2)
file_image3.write_text(data_hex) file_image3.write_text(data_hex)
def make_Pakeage(fp: Path): def make_Pakeage(fp: Path):
""" 生成升级包 """ """ 生成升级包 """
@@ -700,7 +230,7 @@ if __name__=='__main__':
"Log": {'com_name': None, "Log": {'com_name': None,
# 'addr_645': [0x01, 0x00, 0x00, 0x00, 0x00, 0x40], # 'addr_645': [0x01, 0x00, 0x00, 0x00, 0x00, 0x40],
}, },
"Debug": {'com_name': 'COM5', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1, "Debug": {'com_name': 'COM8', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06], # 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True, 'frame_print': True,
'time_out': 0.5, 'retry': 3}, 'time_out': 0.5, 'retry': 3},

View File

@@ -0,0 +1,111 @@
import time
from serial import Serial
from function.tools.ByteConv import conv_int_to_array, trans_list_to_str
from function.frame import make_frame_modbus, check_frame_modbus, print_display
class DeviceSerial:
""" 串口通信设备原型
"""
def __init__(self, com_name="COM1", **kwargs):
""" 初始化设备 """
if com_name is not None:
com_config = {}
com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200
com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N'
com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8
com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1
self.__com = Serial(com_name, timeout=0, **com_config)
else:
self.__com = None
self.flag_print = kwargs['frame_print'] if 'frame_print' in kwargs.keys() else False
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
self.output = {
'result': False,
'code_func': 0x00,
}
self.log = {
'send': 0,
'read': 0,
'keep-fail': 0,
'record': {
'config': None,
'data': None,
},
}
def __read_frame(self) -> bool:
""" 使用帧字节超时策略读报文帧, 并进行解析数据, 打印异常 """
frame_recv = b''
time_start, time_current, flag_frame = time.time(), time.time(), False
while (time_current - time_start) < self.time_out:
time.sleep(self.time_gap)
bytes_read = self.__com.read_all()
time_current = time.time()
if flag_frame and len(bytes_read) == 0:
break
elif len(bytes_read):
flag_frame = True
frame_recv += bytes_read
try:
self.output = check_frame_modbus(frame_recv, self.block['data'])
if self.flag_print:
print("Read Frame: ", trans_list_to_str(frame_recv))
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
def __transfer_data(self, frame: bytearray) -> bool:
""" 串口收发报文, 包含重试逻辑与数据打印 """
if self.__com is None:
print(trans_list_to_str(frame))
return False
fail_count = 0
while fail_count < self.retry:
frame_discard = self.__com.read_all()
self.__com.write(frame)
self.log['send'] += 1
if self.flag_print and frame_discard:
print("Discard Data: " , frame_discard)
if self.flag_print:
print("Send Frame: ", trans_list_to_str(frame))
time.sleep(10 * self.time_gap)
if self.__read_frame():
if (self.flag_print is not None) and 'Regs' in self.output.keys():
print_display(self.output['Regs'])
self.log['read'] += 1
break
else:
fail_count += 1
if fail_count >= self.log['keep-fail']:
self.log['keep-fail'] = fail_count
time.sleep(2*self.time_out)
return fail_count < self.retry

View File

@@ -1,11 +1,13 @@
import time import time
import socket import socket
import random import random
import hashlib
from pathlib import Path from pathlib import Path
from tools.ByteConv import trans_list_to_str from function.tools.ByteConv import conv_int_to_array, trans_list_to_str
from source.func_frame import make_frame_modbus, check_frame_modbus, print_display from function.frame import make_frame_modbus, check_frame_modbus, print_display
from function.file_upgrade import build_header, file_encryption
modbus_map = { ParamMap_EnergyRouter = {
0x00: ['编译日期', 4, 6], 0x00: ['编译日期', 4, 6],
0x06: ['编译时间', 4, 5], 0x06: ['编译时间', 4, 5],
} }
@@ -25,7 +27,7 @@ class EnergyRouter:
self.block = { self.block = {
'addr_dev' : adddr_modbus, 'addr_dev' : adddr_modbus,
'data_define': modbus_map, 'data_define': ParamMap_EnergyRouter,
} }
self.output = { self.output = {
'result': False, 'result': False,
@@ -160,6 +162,51 @@ class EnergyRouter:
self.output = check_frame_modbus(frame_slave[:18], self.block) self.output = check_frame_modbus(frame_slave[:18], self.block)
def GeneratePackage_Demo_Xilinx(path_bin: Path):
""" 完整升级包生成测试 """
config = {
'file_type': [0x10, 0x01], # Xilinx-Demo 自机升级文件
'file_version': [0x00, 0x00], # 文件版本-00 用于兼容文件格式升级
# 'file_length': [], # 文件长度(自动生成)
# 'md5': [], # 文件MD5(自动生成)
'encrypt': [0x01], # 默认加密算法
'update_type': [0x01], # APP升级
'update_spec': [0x00, 0x00, 0x00, 0x00], # 升级特征字
'update_verison': [0x02, 0x00, 0x00, 0x01], # 升级版本号
'update_date': [0x22, 0x04, 0x24], # 升级版本日期
# 'area_code': [], # 省份特征
# 'uptate_str': [], # 升级段描述
# 'device_str': [], # 设备特征描述
# 'hex_name': [], # Hex文件名(自动读取)
# 文件Hex结构信息
# 'flash_addr': 0x3E8020, # 程序起始地址
# 'flash_size': 0x005FC0, # 程序空间大小
}
data_bin = path_bin.read_bytes()
md5_ctx = hashlib.md5()
md5_ctx.update(data_bin)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(data_bin))
config['hex_name'] = list(path_bin.name.encode())[:80]
if (header:= build_header(config, 128)) is None:
raise Exception("Header tag oversize. ")
if (header_512:= build_header(config, 512)) is None:
raise Exception("Header tag oversize. ")
data_encrypt = file_encryption(data_bin)
print("Upgrade file generated successfully.")
print(f"\t header_length={len(header)}, bin_length={len(data_bin)}[{hex(len(data_bin))}]")
print(f"\t file md5: {trans_list_to_str(config['md5'])}")
file1 = path_bin.parent / (path_bin.stem + '.dat')
file1.write_bytes(header + data_bin)
file2 = path_bin.parent / (path_bin.stem + '_h512.dat')
file2.write_bytes(header_512 + data_bin)
if __name__ == "__main__": if __name__ == "__main__":
""" 升级测试 """ """ 升级测试 """
path_file = Path("F:\\Work\\FPGA\\Test\\Vivado\\test_update\\test_update.vitis\\upgrade_system\\Debug\\sd_card\\BOOT.dat") path_file = Path("F:\\Work\\FPGA\\Test\\Vivado\\test_update\\test_update.vitis\\upgrade_system\\Debug\\sd_card\\BOOT.dat")

View File

@@ -0,0 +1,541 @@
import time
import hashlib
from math import ceil
from tqdm import tqdm
from pathlib import Path
from serial import Serial
from function.file_upgrade import build_header_new, file_encryption
from function.tools.ByteConv import conv_int_to_array, trans_list_to_str
from function.tools.IntelHex import file_IntelHex_to_Bin
from function.frame import check_frame_dlt645, make_frame_dlt645, print_display
ParamMap_LaminaAdapter = {
# 1 - Hex
# 2 - Int16
# 3 - lnt32
# 4 - str
# 5 - addr
# 6 - float
0x0E: ["故障字1", 1],
0x0F: ["故障字2", 1],
0x10: ["MPPT工作状态", 1],
0x11: ["系统工作状态", 1],
0x12: ["系统工作模式", 1],
0x13: ["输入电压", 2, 10],
0x14: ["电感电流", 2, 100],
0x15: ["12V电压", 2, 10],
0x16: ["输出电压", 2, 10],
0x17: ["输入电流", 2, 100],
0x18: ["温度1", 2, 10],
0x19: ["温度2", 2, 10],
0x1A: ["输入功率", 3, 1000],
0x1C: ["设备温度", 2, 10],
0x1D: ["开关机状态", 1],
0x1E: ["电池电压", 2, 10],
0x1F: ["并机功率限值", 3, 1000],
0x21: ["输出电容电电压", 2, 10],
0x22: ["参考电压", 2, 10],
0x60: ["光伏通道使能", 1],
0x61: ["最小启动输入电压", 2, 10],
0x62: ["最大启动输入电压", 2, 10],
0x63: ["最小停止输入电压", 2, 10],
0x64: ["最大停止输入电压", 2, 10],
0x65: ["最小MPPT电压", 2, 10],
0x66: ["最大MPPT电压", 2, 10],
0x67: ["最小启动输出电压", 2, 10],
0x68: ["最大启动输出电压", 2, 10],
0x69: ["最小停止输出电压", 2, 10],
0x6A: ["最大停止输出电压", 2, 10],
0x6B: ["输入过压保护值", 2, 10],
0x6C: ["输出过压保护值", 2, 10],
0x6D: ["输出欠压保护值", 2, 10],
0x6E: ["电感过流保护值", 2, 100],
0x6F: ["输入过流保护值", 2, 100],
0x70: ["最小电感电流限值", 2, 100],
0x71: ["最大电感电流限值", 2, 100],
0x72: ["浮充电压阈值", 2, 10],
0x73: ["三点法中间阈值", 2, 10],
0x74: ["恒压充电电压", 2, 10],
0x75: ["过温故障值", 2, 10],
0x76: ["过温告警值", 2, 10],
0x77: ["温度恢复值", 2, 10],
0x78: ["最低满载电压", 2, 10],
0x79: ["最高满载电压", 2, 10],
0x7A: ["输入过载保护值", 3, 1000],
0x7C: ["最小功率限值", 3, 1000],
0x7E: ["最大功率限值", 3, 1000],
0x80: ["最大功率限值存储值", 3, 1000],
0x82: ["载波通信地址", 5, 3],
0x85: ["电压环out_max", 2, 100],
0x86: ["电压环out_min", 2, 100],
0x87: ["电流环out_max", 2, 100],
0x88: ["电流环out_min", 2, 100],
0x89: ["MPPT扰动系数k_d_vin", 2, 100],
0x8A: ["dmin", 2, 1000],
0x8B: ["dmax", 2, 1000],
0x8C: ["扫描电压偏移scanvolt_offset", 2, 10],
0x8D: ["电压环Kp", 3, 100000],
0x8F: ["电压环Ki", 3, 100000],
0x91: ["电流环Kp", 3, 100000],
0x93: ["电流环Ki", 3, 100000],
0x95: ["日志级别", 1],
0x96: ["日志输出方式", 1],
0x97: ["采样校准volt_in_a", 2, 1000],
0x98: ["采样校准volt_in_b", 2, 100],
0x99: ["采样校准volt_out_a", 2, 1000],
0x9A: ["采样校准volt_out_b", 2, 100],
0x9B: ["采样校准curr_in_a", 2, 1000],
0x9C: ["采样校准curr_in_b", 2, 100],
0x9D: ["采样校准curr_induc_a", 2, 1000],
0x9E: ["采样校准curr_induc_b", 2, 100],
0x9F: ["采样校准volt_12V_a", 2, 1000],
0xA0: ["采样校准volt_12V_b", 2, 100],
0xA1: ["温度补偿temp1_b", 2, 10],
0xA2: ["温度补偿temp2_b", 2, 10],
0xA3: ["系统工作模式", 2],
0xA4: ["电感电流给定值curr_set", 2, 100],
0xA5: ["抖动频率上限", 2, 100],
0xA6: ["抖动频率下限", 2, 100],
0xA7: ["电池电压判断限值", 2, 10],
0xA8: ["MPPT追踪模式", 1],
0xA9: ["ADC参考电压", 2, 1000],
0xAA: ["保留", 1],
0xAB: ["保留", 1],
0xAC: ["保留", 1],
0xAD: ["保留", 1],
0xAE: ["保留", 1],
0xAF: ["保留", 1],
0x100: ["版本", 4, 16],
0x110: ["型号", 4, 16],
0x120: ["载波芯片地址", 4, 16],
0x130: ["厂商", 4, 8],
0x138: ["保留", 4, 8],
0x140: ["保留", 4, 16],
0x150: ["保留", 4, 16],
0x160: ["硬件", 4, 16],
0x170: ["SN", 4, 16],
0x180: ["MES", 4, 16],
0x190: ["Datetime", 4, 16],
}
class LaminaAdapter:
""" 叠光适配器\优化器
"""
def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs):
# 初始化串口通信
if com_name is not None:
com_config = {}
com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200
com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N'
com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8
com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1
self.__com = Serial(com_name, **com_config)
else:
self.__com =None
self.flag_print = 'frame_print' in kwargs.keys()
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
self.retry_sub = kwargs['retry_sub'] if 'retry_sub' in kwargs.keys() else 1
self.block = {
'addr' : addr_645,
'type' : 'modbus',
'data' : {
'addr_dev' : addr_modbus,
'data_define': ParamMap_LaminaAdapter,
},
}
self.output = {
'result': False,
'code_func': 0x00,
}
self.log = {
'send': 0,
'read': 0,
'keep-fail': 0,
'record': {
'config': None,
'data': None,
},
}
def __read_frame(self) -> bool:
""" 使用帧字节超时策略读报文帧, 并进行解析数据, 打印异常 """
frame_recv = b''
time_start, time_current, flag_frame = time.time(), time.time(), False
while (time_current - time_start) < self.time_out:
time.sleep(self.time_gap)
bytes_read = self.__com.read_all()
time_current = time.time()
if flag_frame and len(bytes_read) == 0:
break
elif len(bytes_read):
flag_frame = True
frame_recv += bytes_read
try:
self.output = check_frame_dlt645(frame_recv, self.block)
if self.flag_print:
print("Read Frame: ", trans_list_to_str(frame_recv))
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
def __transfer_data(self, frame: bytearray) -> bool:
""" 报文数据传输 """
if self.__com is None:
print(trans_list_to_str(frame))
return False
fail_count = 0
while fail_count < self.retry:
frame_discard = self.__com.read_all()
self.__com.write(frame)
self.log['send'] += 1
if self.flag_print and frame_discard:
print("Discard Data: " , frame_discard)
if self.flag_print:
print("Send Frame: ", trans_list_to_str(frame))
if self.__read_frame():
if 'Regs' in self.output.keys():
print_display(self.output['Regs'])
self.log['read'] += 1
break
else:
fail_count += 1
if fail_count >= self.log['keep-fail']:
self.log['keep-fail'] = fail_count
time.sleep(2*self.time_out)
return fail_count < self.retry
def frame_read(self, daddr=0x60, dlen=0x50):
self.block['data']['type'] = 'read'
self.block['data']['data_addr'] = daddr
self.block['data']['data_len'] = dlen
frame = make_frame_dlt645(self.block)
return self.__transfer_data(frame)
def frame_write_one(self, daddr=0x85, dval=-900):
self.block['data']['type'] = 'write_one'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_dlt645(self.block)
return self.__transfer_data(frame)
def frame_write_dual(self, daddr=0x91, dval=600):
self.block['data']['type'] = 'write_dual'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_dlt645(self.block)
return self.__transfer_data(frame)
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]):
self.block['data']['type'] = 'write_str'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_dlt645(self.block)
return self.__transfer_data(frame)
def frame_update(self, path_bin):
""" 程序升级
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
"""
param_saved = self.flag_print, self.retry, self.time_out
self.flag_print = False
self.block['data']['type'] = 'update'
self.block['data']['step'] = 'start'
self.block['data']['index'] = 0
self.block['data']['file'] = Path(path_bin).read_bytes()
self.block['data']['header_offset'] = 184
# 启动帧
frame_master = bytearray(make_frame_dlt645(self.block))
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print('Upgrade Fail: start')
return False
self.block["data"]['file_block_size'] = self.output['upgrade']['length']
# 避免接收到延迟返回报文
time.sleep(self.time_out)
# 文件传输
self.retry = 3
self.time_out = 1.5
self.block["data"]['step'] = 'trans'
self.block['data']['index'] = 0
frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size'])
for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx
frame_master = make_frame_dlt645(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: trans data in {idx}')
return False
# 结束升级
self.time_out = 1
self.block["data"]['step'] = 'end'
self.block["data"]['index'] += 1
frame_master = make_frame_dlt645(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: end')
return False
self.flag_print, self.retry, self.time_out = param_saved
return True
def GeneratePackage_SLCP101_p460(path_hex: Path):
""" 叠光适配器-460平台版本 生成升级包 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"SLCP101"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = file_encryption(bin_main)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
print(f"File name: {path_hex.name}")
print(f"File Info:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
# 组装镜像
Image = [0xFF] * (len(main_header) + len(encrypt_main))
offset_image = 0
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image += len(main_header)
Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main
return bytearray(Image), bin_main
def GenerateImage_SLCP001_p4a0(path_boot: Path, path_main: Path, path_back: Path):
""" 叠光适配器-4A0平台版本 镜像生成 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"SLCP001"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x010000)
bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=0x0CC000)
bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=0x040000)
encrypt_main = file_encryption(bin_main)
encrypt_back = file_encryption(bin_back)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 组装镜像
Image = [0xFF] * 0x100000
offset_image = 0
Image[offset_image: offset_image + len(bin_boot)] = bin_boot
offset_image = 0x010000
Image[offset_image: offset_image + len(bin_main)] = bin_main
offset_image = 0x0BC000
Image[offset_image: offset_image + len(bin_back)] = bin_back
offset_image = 0x0FC000
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image = 0x0FE000
Image[offset_image: offset_image + len(back_header)] = back_header
return bytearray(Image), main_header, back_header
def GenerateImage_SLCP101_p460(path_boot: Path, path_main: Path, path_back: Path):
""" 叠光适配器-460平台版本 镜像生成 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"SLCP101"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = file_encryption(bin_main)
encrypt_back = file_encryption(bin_back)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 组装镜像
Image = [0xFF] * 0x058000
offset_image = 0
Image[offset_image: offset_image + len(bin_boot)] = bin_boot
offset_image = 0x00C000
Image[offset_image: offset_image + len(bin_main)] = bin_main
offset_image = 0x030000
Image[offset_image: offset_image + len(bin_back)] = bin_back
offset_image = 0x054000
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image = 0x056000
Image[offset_image: offset_image + len(back_header)] = back_header
return bytearray(Image), main_header, back_header
def GeneratePackage_DLSY001_p460(path_hex: Path):
""" 叠光优化器-460平台版本 生成升级包 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"DLSY001"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = file_encryption(bin_main)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
print(f"File name: {path_hex.name}")
print(f"File Info:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
# 组装镜像
Image = [0xFF] * (len(main_header) + len(encrypt_main))
offset_image = 0
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image += len(main_header)
Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main
return bytearray(Image), bin_main
def GenerateImage_DLSY001_p460(path_boot: Path, path_main: Path, path_back: Path):
""" 叠光优化器-460平台版本 镜像生成 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"DLSY001"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = file_encryption(bin_main)
encrypt_back = file_encryption(bin_back)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 组装镜像
Image = [0xFF] * 0x058000
offset_image = 0
Image[offset_image: offset_image + len(bin_boot)] = bin_boot
offset_image = 0x00C000
Image[offset_image: offset_image + len(bin_main)] = bin_main
offset_image = 0x030000
Image[offset_image: offset_image + len(bin_back)] = bin_back
offset_image = 0x054000
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image = 0x056000
Image[offset_image: offset_image + len(back_header)] = back_header
return bytearray(Image), main_header, back_header

View File

@@ -0,0 +1,584 @@
import time
from math import ceil
from tqdm import tqdm
from pathlib import Path
from serial import Serial
from function.file_upgrade import build_header_new, file_encryption, parser_version_info
from function.tools.ByteConv import conv_int_to_array, trans_list_to_str
from function.tools.IntelHex import file_IntelHex_to_Bin
from function.frame import check_frame_modbus, make_frame_modbus, print_display
ParamMap_LaminaController = {
# 1 - Hex
# 2 - Int16
# 3 - lnt32
# 4 - str
# 5 - addr
# 6 - float
# 7 - numberList
0x0B: ["事件标志", 1],
0x0C: ["告警字1", 1],
0x0D: ["告警字2", 1],
0x0E: ["故障字1", 1],
0x0F: ["故障字2", 1],
0x10: ["系统工作状态" , 1],
0x11: ["Boost1工作状态" , 1],
0x12: ["Boost2工作状态" , 1],
0x13: ["开关机状态" , 1],
0x14: ["光伏组串1输入电压" , 2, 10],
0x15: ["光伏组串2输入电压" , 2, 10],
0x16: ["Boost1电感电流" , 2, 100],
0x17: ["Boost2电感电流" , 2, 100],
0x18: ["Boost输出电压" , 2, 10],
0x19: ["Boost输出总电流" , 2, 100],
0x1A: ["Boost1功率" , 3, 1000],
0x1C: ["Boost2功率" , 3, 1000],
0x1E: ["输入总功率" , 3, 1000],
0x20: ["LLC输出电压" , 2, 10],
0x21: ["端口输出电压" , 2, 10],
0x22: ["LLC输出电流均值" , 2, 100],
0x23: ["LLC输出电流峰值" , 2, 100],
0x24: ["绝缘检测电压" , 2, 10],
0x25: ["散热片温度" , 2, 10],
0x26: ["腔体1温度" , 2, 10],
0x27: ["腔体2温度" , 2, 10],
0x28: ["设备温度" , 2, 10],
0x50: ["启停控制命令" , 2],
0x51: ["故障清除命令" , 2],
0x52: ["参数还原命令" , 2],
0x53: ["设备复位命令" , 2],
0x54: ["模式更改命令" , 2],
0x55: ["短时停机命令(未启用)" , 2],
0x56: ["手动录波命令" , 2],
0x57: ["时间配置命令" , 7, 3],
0x60: ["整机运行使能", 1],
0x61: ["最小启动允许输入电压", 2, 10],
0x62: ["最大启动允许输入电压", 2, 10],
0x63: ["最小停机输入电压", 2, 10],
0x64: ["最大停机输入电压", 2, 10],
0x65: ["最小启动允许输出电压", 2, 10],
0x66: ["最大启动允许输出电压", 2, 10],
0x67: ["最小停止允许输出电压", 2, 10],
0x68: ["最大停止允许输出电压", 2, 10],
0x69: ["最小MPPT电流限值", 2, 100],
0x6A: ["最大MPPT电流限值", 2, 100],
0x6B: ["保留数据项", 2],
0x6C: ["最大功率限值", 3, 1000],
0x6E: ["最大功率限值存储值", 3, 1000],
0x70: ["Boost输入过压保护值", 2, 10],
0x71: ["Boost输出过压保护值", 2, 10],
0x72: ["LLC输出过压保护值", 2, 10],
0x73: ["LLC输出欠压保护值", 2, 10],
0x74: ["Boost电感过流保护值", 2, 100],
0x75: ["LLC输出电流均值保护值", 2, 100],
0x76: ["LLC输出电流峰值保护值", 2, 100],
0x77: ["保留数据项", 2],
0x78: ["过载保护值", 3, 1000],
0x7A: ["过温故障值", 2, 10],
0x7B: ["过温告警值", 2, 10],
0x7C: ["过温恢复值", 2, 10],
0x7D: ["输出继电器故障判断差值", 2, 10],
0x7E: ["LLC输出电压给定值", 2, 10],
0x7F: ["Boost输出电压给定值", 2, 10],
0x80: ["三点法中间阈值", 2, 10],
0x81: ["浮充电压", 2, 10],
0x82: ["恒压充电电压", 2, 10],
0x83: ["llc软起开始电压", 2, 10],
0x84: ["boost开始运行电压", 2, 10],
0x85: ["boost停止运行电压", 2, 10],
0x86: ["绝缘检测正阻抗限值", 3, 1],
0x88: ["绝缘检测负阻抗限值", 3, 1],
0x8A: ["抖动频率下限", 2, 10],
0x8B: ["抖动频率上限", 2, 10],
0x8C: ["保留地址项", 2],
0x8D: ["保留地址项", 2],
0x8E: ["保留地址项", 2],
0x8F: ["保留地址项", 2],
0x100: ["程序版本字符串", 4, 16],
0x110: ["设备型号字符串", 4, 16],
0x120: ["保留地址项", 4, 16],
0x130: ["生产厂家字符串", 4, 8],
0x138: ["保留地址项", 4, 8],
0x140: ["保留地址项", 4, 16],
0x150: ["保留地址项", 4, 16],
0x160: ["硬件版本字符串", 4, 16],
0x170: ["设备序列号", 4, 16],
0x180: ["设备MES码", 4, 16],
0x190: ["出厂日期批次", 4, 16],
}
class LaminaController:
""" 叠光控制器
"""
def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs):
# 初始化串口通信
if com_name is not None:
com_config = {}
com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200
com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N'
com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8
com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1
self.__com = Serial(com_name, timeout=0, **com_config)
else:
self.__com = None
self.flag_print = kwargs['frame_print'] if 'frame_print' in kwargs.keys() else False
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
self.block = {
'addr' : addr_645,
'type' : 'modbus',
'data' : {
'addr_dev' : addr_modbus,
'data_define': ParamMap_LaminaController,
},
}
self.output = {
'result': False,
'code_func': 0x00,
}
self.log = {
'send': 0,
'read': 0,
'keep-fail': 0,
'record': {
'config': None,
'data': None,
},
}
def __read_frame(self) -> bool:
""" 使用帧字节超时策略读报文帧, 并进行解析数据, 打印异常 """
frame_recv = b''
time_start, time_current, flag_frame = time.time(), time.time(), False
while (time_current - time_start) < self.time_out:
time.sleep(self.time_gap)
bytes_read = self.__com.read_all()
time_current = time.time()
if flag_frame and len(bytes_read) == 0:
break
elif len(bytes_read):
flag_frame = True
frame_recv += bytes_read
try:
self.output = check_frame_modbus(frame_recv, self.block['data'])
if self.flag_print:
print("Read Frame: ", trans_list_to_str(frame_recv))
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
def __transfer_data(self, frame: bytearray) -> bool:
""" 串口收发报文, 包含重试逻辑与数据打印 """
if self.__com is None:
print(trans_list_to_str(frame))
return False
fail_count = 0
while fail_count < self.retry:
frame_discard = self.__com.read_all()
self.__com.write(frame)
self.log['send'] += 1
if self.flag_print and frame_discard:
print("Discard Data: " , frame_discard)
if self.flag_print:
print("Send Frame: ", trans_list_to_str(frame))
time.sleep(10 * self.time_gap)
if self.__read_frame():
if (self.flag_print is not None) and 'Regs' in self.output.keys():
print_display(self.output['Regs'])
self.log['read'] += 1
break
else:
fail_count += 1
if fail_count >= self.log['keep-fail']:
self.log['keep-fail'] = fail_count
time.sleep(2*self.time_out)
return fail_count < self.retry
def frame_read(self, daddr=0x60, dlen=0x30) -> bool:
self.block['data']['type'] = 'read'
self.block['data']['data_addr'] = daddr
self.block['data']['data_len'] = dlen
frame = make_frame_modbus(self.block['data'])
return self.__transfer_data(frame)
def frame_write_one(self, daddr=0x85, dval=-900) -> bool:
self.block['data']['type'] = 'write_one'
self.block['data']['data_addr'] = daddr
item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
self.block['data']['data_val'] = int(dval * item_coff)
frame = make_frame_modbus(self.block['data'])
return self.__transfer_data(frame)
def frame_write_dual(self, daddr=0x91, dval=600) -> bool:
self.block['data']['type'] = 'write_dual'
self.block['data']['data_addr'] = daddr
item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
self.block['data']['data_val'] = int(dval * item_coff)
frame = make_frame_modbus(self.block['data'])
return self.__transfer_data(frame)
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool:
self.block['data']['type'] = 'write_str'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_modbus(self.block['data'])
return self.__transfer_data(frame)
def frame_record(self) -> bool:
""" 读取录波数据
"""
param_saved = self.flag_print, self.retry, self.time_out
self.flag_print = False
self.retry = 3
self.time_out = 1.5
self.block['data']['file_block_size'] = 240
# 读取config
self.block['data']['type'] = 'record_cfg'
self.block['data']['step'] = 'start'
frame_master = make_frame_modbus(self.block['data'])
ret, pbar = True, None
frame_data_cfg = []
while ret:
if ret := self.__transfer_data(frame_master):
frame_data_cfg.append(self.output['record'])
if self.output['record']['seq'] == 0:
pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading")
self.block['data']['step'] = 'next'
frame_master = make_frame_modbus(self.block['data'])
elif (self.output['record']['seq']) >= self.output['record']['total']:
ret = False
pbar and pbar.update()
pbar and pbar.close()
# 读取data
self.block['data']['type'] = 'record_data'
self.block['data']['step'] = 'start'
frame_master = make_frame_modbus(self.block['data'])
ret, pbar = True, None
frame_data_record = []
while ret:
if ret := self.__transfer_data(frame_master):
frame_data_record.append(self.output['record'])
if self.output['record']['seq'] == 0:
pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading")
self.block['data']['step'] = 'next'
frame_master = make_frame_modbus(self.block['data'])
elif (self.output['record']['seq']) >= self.output['record']['total']:
ret = False
pbar and pbar.update()
pbar and pbar.close()
self.flag_print, self.retry, self.time_out = param_saved
if (len(frame_data_record) != 32) or (len(frame_data_cfg) != 3):
print("Operation_Record: 未取得录波数据.")
return False
# 处理配置信息
data_cfg_bare = b"".join([x['data'] for x in frame_data_cfg])
config_record = {'LinePos': []}
pos = 0
if data_cfg_bare[pos+1] != 0xF1 or data_cfg_bare[pos] != 0xF1:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
len_faultword = (data_cfg_bare[3] * 0x100 + data_cfg_bare[2])
pos += 2
config_record['FaultWord'] = []
for i in range(0, len_faultword, 2):
faultword = data_cfg_bare[pos+i+1] * 0x100 + data_cfg_bare[pos+i]
config_record['FaultWord'].append(faultword)
pos += len_faultword
config_record['SysStatus'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['FaultRecordPosition'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['FaultNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['Standard'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
if data_cfg_bare[pos+1] != 0xF2 or data_cfg_bare[pos] != 0xF2:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
config_record['ChannelNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['ChNum_Alg'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['ChNum_Dgt'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
if data_cfg_bare[pos+1] != 0xF3 or data_cfg_bare[pos] != 0xF3:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
config_record['ChannelDescription'] = []
config_record['ChannelCoefficient'] = []
for i in range(config_record['ChannelNum']):
len_str = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
ch_desc = []
for j in range(0, len_str, 2):
ch_desc.append(data_cfg_bare[pos + j])
pos += len_str
ch_coe = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['ChannelDescription'].append(bytearray(ch_desc).decode())
config_record['ChannelCoefficient'].append(ch_coe)
if data_cfg_bare[pos+1] != 0xF4 or data_cfg_bare[pos] != 0xF4:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
config_record['SysFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['FreqNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['SampleFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['SamplePoint'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['DataType'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['TimeFactor'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
if data_cfg_bare[pos+1] != 0xF5 or data_cfg_bare[pos] != 0xF5:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
time_stamp = {
'year': data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos],
'month': data_cfg_bare[pos+3] * 0x100 + data_cfg_bare[pos+2],
'day': data_cfg_bare[pos+5] * 0x100 + data_cfg_bare[pos+4],
'hour': data_cfg_bare[pos+7] * 0x100 + data_cfg_bare[pos+6],
'minute': data_cfg_bare[pos+9] * 0x100 + data_cfg_bare[pos+8],
'second': data_cfg_bare[pos+11] * 0x100 + data_cfg_bare[pos+10] +
(data_cfg_bare[pos+13] * 0x100 + data_cfg_bare[pos+12]) * 0.001,
}
config_record['StartTime'] = time_stamp
pos += 14
time_stamp = {
'year': data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos],
'month': data_cfg_bare[pos+3] * 0x100 + data_cfg_bare[pos+2],
'day': data_cfg_bare[pos+5] * 0x100 + data_cfg_bare[pos+4],
'hour': data_cfg_bare[pos+7] * 0x100 + data_cfg_bare[pos+6],
'minute': data_cfg_bare[pos+9] * 0x100 + data_cfg_bare[pos+8],
'second': data_cfg_bare[pos+11] * 0x100 + data_cfg_bare[pos+10] +
(data_cfg_bare[pos+13] * 0x100 + data_cfg_bare[pos+12]) * 0.001,
}
config_record['TriggerTime'] = time_stamp
self.log['record']['config'] = config_record
self.log['record']['bare_config'] = data_cfg_bare
# 处理录波数据
data_record_bare = b"".join([x['data'] for x in frame_data_record])
data_record = []
pos = 0
while pos < len(data_record_bare):
record_point = {
'index': data_record_bare[pos+3] * 0x1000 +
data_record_bare[pos+2] * 0x100 +
data_record_bare[pos+1] * 0x10 +
data_record_bare[pos],
'timestamp': data_record_bare[pos+7] * 0x1000 +
data_record_bare[pos+6] * 0x100 +
data_record_bare[pos+5] * 0x10 +
data_record_bare[pos + 4],
'ChAlg': [],
'ChDgt': [],
}
pos += 8
for i in range(config_record['ChNum_Alg']):
point_data_alg = data_record_bare[pos+1] * 0x100 + data_record_bare[pos]
if data_record_bare[pos+1] & 0x80:
point_data_alg -= 0x10000
record_point['ChAlg'].append(point_data_alg)
pos += 2
for i in range(config_record['ChNum_Dgt']):
point_data_dgt = (data_record_bare[pos+(i // 8)] & (0x01 << (i % 8))) == (0x01 << (i % 8))
record_point['ChDgt'].append(point_data_dgt)
pos += 2
data_record.append(record_point)
self.log['record']['data'] = data_record
self.log['record']['bare_data'] = data_record_bare
return True
def frame_update(self, path_bin: Path, makefile: bool = False) -> bool:
""" 程序升级
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
"""
if makefile:
self.block['data']['file'], file_bin = GeneratePackage_DLSP001_p280039(path_bin)
else:
self.block['data']['file'] = path_bin.read_bytes()
self.block['data']['header_offset'] = 184
self.block['data']['type'] = 'update'
param_saved = self.flag_print, self.retry, self.time_out
self.retry = 5
self.time_out = 0.5
self.flag_print = False
# 启动帧
self.block['data']['step'] = 'start'
self.block['data']['index'] = 0
frame_master = make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print('Upgrade Fail: start')
return False
self.block["data"]['file_block_size'] = self.output['upgrade']['length']
# 避免接收到延迟返回报文
time.sleep(self.time_out)
# 文件传输
self.retry = 3
self.time_out = 1.5
self.block["data"]['step'] = 'trans'
self.block['data']['index'] = 0
frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size'])
for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx
frame_master = make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: trans data in {idx}')
return False
# 结束升级
self.time_out = 1
self.block["data"]['step'] = 'end'
self.block["data"]['index'] += 1
frame_master = make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: end')
return False
self.flag_print, self.retry, self.time_out = param_saved
return True
def GeneratePackage_DLSP001_p280039(path_hex: Path):
""" 叠光控制器DSP-280039平台版本 生成升级包 """
config = {
'prod_type': [0x46, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"DLSP001"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
'upgrade_type': [0x00, 0x00], # 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份)
}
bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=2 * 0x014000, conv_end=False)
encrypt_main = file_encryption(bin_main)
info_version = parser_version_info(bin_main, 0x88000)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(info_version['name'])[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
print(f"File name: {path_hex.name}")
print(f"File Info:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
# 组装镜像
Image = [0xFF] * (len(main_header) + len(encrypt_main))
offset_image = 0
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image += len(main_header)
Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main
return bytearray(Image), bin_main
def GenerateImage_DLSP001_p280039(path_boot: Path, path_main: Path, path_back: Path):
""" 叠光适配器-460平台版本 镜像生成 """
config = {
'prod_type': [0x46, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"DLSP001"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
'upgrade_type': [0x00, 0x00], # 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份)
}
bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=2 * 0x004000, conv_end=False)
bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=2 * 0x014000, conv_end=False)
bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=2 * 0x014000, conv_end=False)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['upgrade_type'] = [0x00, 0x00] # 主程序-片外缓冲
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['upgrade_type'] = [0x02, 0x00] # 备份程序
config['file_length'] = conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
main_header_buffer = bytearray()
for byte_org in main_header:
main_header_buffer.extend(bytearray([0x00, byte_org]))
back_header_buffer = bytearray()
for byte_org in back_header:
back_header_buffer.extend(bytearray([0x00, byte_org]))
main_encrypt = file_encryption(bin_main)
back_encrypt = file_encryption(bin_back)
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 组装镜像
Image = [0xFF] * 2 * 0x030000
offset_image = 0
Image[offset_image: offset_image + len(bin_boot)] = bin_boot
offset_image = 2 * 0x006000
Image[offset_image: offset_image + len(main_header_buffer)] = main_header_buffer
offset_image = 2 * 0x007000
Image[offset_image: offset_image + len(back_header_buffer)] = back_header_buffer
offset_image = 2 * 0x008000
Image[offset_image: offset_image + len(bin_main)] = bin_main
offset_image = 2 * 0x01C000
Image[offset_image: offset_image + len(bin_back)] = bin_back
return bytearray(Image), main_header, back_header

View File

@@ -1 +1,2 @@
from EnergyRouter import EnergyRouter from . import EnergyRouter
from . import LaminaController

View File

@@ -0,0 +1,242 @@
import hashlib
from pathlib import Path
from datetime import datetime
from crc import Calculator, Crc16
from tools.IntelHex import file_IntelHex_to_Bin, file_Bin_to_IntelHex
Header_Tag = {
'file_type': [0x00, 2], # 0 - 文件类型; 2byte
'file_version': [0x01, 2], # 1 - 文件版本; 2byte
'file_length': [0x02, 4], # 2 - 文件长度; 4byte
'md5': [0x03, 16], # 3 - 文件MD5; 16byte
'encrypt': [0x04, 1], # 4 - 加密算法; 1byte
'update_type': [0x05, 1], # 5 - 升级文件类别; 1byte
'update_spec': [0x06, 4], # 6 - 升级特征字; 4byte
'update_verison': [0x07, 4], # 7 - 升级版本号; 4byte
'update_date': [0x08, 3], # 8 - 升级版本日期; 3byte
'area_code': [0x09, 4], # 9 - 省份特征; 4byte
'uptate_str': [0x0A, -1, 64], # 10 - 升级段描述; less than 64byte
'device_str': [0x0D, -1, 64], # 13 - 设备特征描述; less than 64byte
'hex_name': [0xFF, -1, 80], # 255 - Hex文件名; less than 80byte
}
def file_encryption(buffer):
""" 文件加密算法 """
pwd_idx = 0
pwd = b'moc.mmocspot.www'
pwd = list(map(lambda x: (x - 0x30 + 0x100) % 0x100, pwd))
result = bytearray(len(buffer))
for i in range(len(buffer)):
k = i
k |= i >> 8
k |= i >> 16
k |= i >> 24
result[i] = buffer[i] ^ pwd[pwd_idx] ^ (k & 0xFF)
pwd_idx = (pwd_idx + 1) % len(pwd)
return result
def build_header(config: dict, len_max=512):
"""
基于配置参数, 生成文件信息头;
V1版本, 依据字典生成tag标签组;
"""
# 定义文件头
m_file_header = bytearray(len_max)
header_len = 11
tag_num = 0
for tag, value in config.items():
if tag in Header_Tag.keys():
if tag == 'hex_name' and len_max < 256:
""" 当文件头长度不足时, 跳过文件名标签 """
continue
elif Header_Tag[tag][1] == -1:
tag_len = min(len(value), Header_Tag[tag][2])
else:
tag_len = Header_Tag[tag][1]
tag_date = [Header_Tag[tag][0], tag_len] + value[:tag_len]
m_file_header[header_len: header_len + tag_len + 2] = bytearray(tag_date)
tag_num += 1
header_len += 2 + tag_len
m_file_header[0:8] = b"TOPSCOMM"
m_file_header[8] = ((header_len - 10) % 0x100)
m_file_header[9] = ((header_len - 10) // 0x100)
m_file_header[10] = tag_num
m_file_header[header_len] = sum(m_file_header[:header_len]) % 0x100
m_file_header[header_len+1] = sum(m_file_header[:header_len]) // 0x100
if header_len+2 > len_max:
return None
else:
return m_file_header
def build_header_lite(config: dict):
"""
基于配置参数, 生成文件信息头;
V2版本, 仅提供必要信息;
"""
# 定义文件头
m_file_header = bytearray(64)
m_file_header[0:4] = bytearray(config["update_verison"])
m_file_header[4:8] = bytearray(config["file_length"])
m_file_header[8:24] = bytearray(config["md5"])
return m_file_header
def build_header_new(config: dict):
"""
基于配置参数, 生成新版文件信息头;
V3版本, 依据新版格式填充数据;
"""
# 定义文件头
m_file_header = [0xFF] * 184
m_file_header[0:8] = list(b"TOPSCOMM")
m_file_header[8:10] = config['prod_type']
m_file_header[10:22] = config['prog_id'] + [0] * (12 - len(config['prog_id']))
if config['method_compress'] == True:
m_file_header[23] = 0x01
else:
m_file_header[23] = 0x00
if 'crc32' in config.keys():
m_file_header[22] = 0x00
m_file_header[24: 40] = config['crc32'] + [0x00] * 12
elif 'md5' in config.keys():
m_file_header[22] = 0x01
m_file_header[24: 40] = config['md5']
else:
raise Exception("Error, Unknown method verify.")
# 时间戳生成
time_now = datetime.now()
time_stamp = list(map(lambda x: int(x),
time_now.strftime("%Y-%m-%d-%H-%M").split('-')))
time_stamp.insert(1, time_stamp[0] // 0x100)
time_stamp[0] = time_stamp[0] % 0x100
m_file_header[40: 46] = time_stamp
m_file_header[46: 50] = config['file_length']
# Cpu1
m_file_header[50: 54] = [0x00] * 4
m_file_header[54: 70] = [0x00] * 16
# Cpu2
m_file_header[70: 74] = [0x00] * 4
m_file_header[74: 90] = [0x00] * 16
if config['prog_type'] == 'app':
m_file_header[90: 92] = [0x00, 0x00]
elif config['prog_type'] == 'boot':
m_file_header[90: 92] = [0x01, 0x00]
elif config['prog_type'] == 'diff':
m_file_header[90: 92] = [0x02, 0x00]
elif config['prog_type'] == 'font':
m_file_header[90: 92] = [0x03, 0x00]
elif config['prog_type'] == 'config':
m_file_header[90: 92] = [0x04, 0x00]
elif config['prog_type'] == 'data':
m_file_header[90: 92] = [0x05, 0x00]
elif config['prog_type'] == 'test':
m_file_header[90: 92] = [0x06, 0x00]
else:
raise Exception("Error, Unknown Program Type.")
m_file_header[92: 94] = config['area_code']
m_file_header[94: 158] = config['hex_name']
if 'upgrade_type' in config.keys():
m_file_header[158: 160] = config['upgrade_type']
m_file_header[160: 182] = [0x00] * 22
else:
m_file_header[158: 182] = [0x00] * 24
m_file_header = bytearray(m_file_header)
calculator = Calculator(Crc16.MODBUS)
code_crc16 = calculator.checksum(m_file_header[:-2])
m_file_header[182: 184] = [code_crc16 // 0x100, code_crc16 % 0x100]
return m_file_header
def parser_version_info(file_bin: bytearray, base_addr:int):
""" 解析Bin文件内的版本信息结构体 """
address_block = (file_bin[6] << 24) + (file_bin[7] << 16) + (file_bin[4] << 8) + file_bin[5]
address_block -= base_addr
address_block *= 2
offset = address_block
block_version = {}
block_version['name'] = file_bin[offset + 1 : offset + 64 : 2]
offset += 64
block_version['device'] = file_bin[offset + 1 : offset + 64 : 2]
offset += 64
block_version['factory'] = file_bin[offset + 1 : offset + 32 : 2]
offset += 32
block_version['hardware'] = file_bin[offset + 1 : offset + 64 : 2]
offset += 64
return block_version
def test1(fp: Path):
""" bin文件生成测试 """
file1 = Path(fp)
path1 = file1.parent / (file1.stem + ".bin")
data1 = file1.read_text()
data2 = file_IntelHex_to_Bin(data1, 0x3F0100)
path1.write_bytes(data2)
# 测试Bin到IntelHex
bin = Path(fp).read_bytes()
result = file_Bin_to_IntelHex(bin, 0x80000, memory_width=2)
(fp.parent / (fp.stem + ".hex")).write_text(result)
def test2():
""" 校验, 加密测试 """
# Header - Crc16
bin_offcial = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p460\result\lamina_adapter_t1.dat")
data_offcial = bin_offcial.read_bytes()
byte_data = data_offcial[:182]
crc = data_offcial[182:184]
# data = "54 4F 50 53 43 4F 4D 4D 45 00 53 4C 43 50 30 30 31 00 00 00 00 00 01 00 B6 61 A8 73 BF 82 9E A7 4C 79 F6 BB 94 E2 A5 18 E8 07 05 18 0B 17 18 4C 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 6C 61 6D 69 6E 61 5F 61 64 61 70 74 65 72 5F 6D 61 69 6E 02 00 00 00 00 20 10 53 06 00 00 00 00 80 A5 8F 02 00 00 00 00 EC 1F 40 00 00 00 00 00 00 00 00 00 00 00 00 00 7A CA 61 0A FD 7F 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00"
# byte_data = list(map(lambda x: int(x, 16), data.split(' ')))
# crc = "99 CB"
calculator = Calculator(Crc16.MODBUS)
code_crc16 = calculator.checksum(bytearray(byte_data))
print(f"Result = {hex(code_crc16)}, Offcial Result = {crc}")
# File - md5
data_bin = '123456 123456 '.encode()
md5_ctx = hashlib.md5()
md5_ctx.update(data_bin)
hash = md5_ctx.hexdigest()
# File - encrypt
buffer1 = data_bin[:]
buffer1_en = file_encryption(buffer1)
buffer2 = buffer1_en[:6] + buffer1[6:]
buffer2_de = file_encryption(buffer2)
pass
def task5():
""" 文件缓冲区对比测试 """
file_dat = Path(r"test\p280039\result\lamina_controller_dsp_t1.dat")
file_dat_buffer = Path(r"test\p280039\result\lamina_controller_dsp_buffer.bin")
file_dat_buffer.exists(), file_dat.exists()
data_dat = file_dat.read_bytes()
data_dat_buffer = file_dat_buffer.read_bytes()
for i in range(len(data_dat)):
if data_dat[i] != data_dat_buffer[2*i]:
print(f"Diff in {hex(i)}, Data: {data_dat[i]}!={data_dat_buffer[2*i]}")
if __name__ == "__main__":
# path_bin = Path("F:\\Work\\FPGA\\Test\\Vivado\\test_update\\test_update.vitis\\upgrade_system\\Debug\\sd_card\\BOOT.BIN")
# GeneratePackage_Demo_Xilinx(path_bin)
pass

View File

@@ -1,4 +1,3 @@
import re
import struct import struct
from crc import Calculator, Crc16 from crc import Calculator, Crc16
from tools.ByteConv import trans_list_to_str, trans_str_to_list, display_hex from tools.ByteConv import trans_list_to_str, trans_str_to_list, display_hex
@@ -10,6 +9,7 @@ modbus_map = {
# 4 - str # 4 - str
# 5 - addr # 5 - addr
# 6 - float # 6 - float
# 7 - numberList
0x01: ["Hex示例", 1], 0x01: ["Hex示例", 1],
0x02: ["Int示例", 2], 0x02: ["Int示例", 2],
0x03: ["Int32示例", 3], 0x03: ["Int32示例", 3],
@@ -301,7 +301,7 @@ def find_frame_dlt645(buffer:bytes, address: list) -> bytes:
for i in range(len_buffer): for i in range(len_buffer):
if buffer[i] != 0x68 or buffer[i+7] != 0x68: if buffer[i] != 0x68 or buffer[i+7] != 0x68:
continue continue
if address[0] != 0xAA and buffer[i+1:i+7] == bytes(address): if address[0] != 0xAA and buffer[i+1:i+7] != bytes(address):
continue continue
j = buffer[i+9] + 12 j = buffer[i+9] + 12
@@ -355,7 +355,11 @@ def display_data(address: int, data: bytes, modbus_map: dict=modbus_map) -> dict
""" 字符串表示 """ """ 字符串表示 """
data_len = current_map[2] data_len = current_map[2]
item = swapping_words(data, data_len) item = swapping_words(data, data_len)
item = item.replace(b'\xff', b' 0xFF').decode() try:
item = item.decode()
except Exception as ex:
item_len = sum([any(item[i:]) for i in range(len(item))])
item = trans_list_to_str(item[:item_len])
elif current_map[1] == 5: elif current_map[1] == 5:
""" 载波地址表示 """ """ 载波地址表示 """
data_len = current_map[2] data_len = current_map[2]

View File

@@ -1,724 +0,0 @@
# 升级包生成脚本
import hashlib
from pathlib import Path
from datetime import datetime
from crc import Calculator, Crc16
from tools.ByteConv import trans_list_to_str, conv_int_to_array
from tools.IntelHex import file_IntelHex_to_Bin, file_Bin_to_IntelHex
Header_Tag = {
'file_type': [0x00, 2], # 0 - 文件类型; 2byte
'file_version': [0x01, 2], # 1 - 文件版本; 2byte
'file_length': [0x02, 4], # 2 - 文件长度; 4byte
'md5': [0x03, 16], # 3 - 文件MD5; 16byte
'encrypt': [0x04, 1], # 4 - 加密算法; 1byte
'update_type': [0x05, 1], # 5 - 升级文件类别; 1byte
'update_spec': [0x06, 4], # 6 - 升级特征字; 4byte
'update_verison': [0x07, 4], # 7 - 升级版本号; 4byte
'update_date': [0x08, 3], # 8 - 升级版本日期; 3byte
'area_code': [0x09, 4], # 9 - 省份特征; 4byte
'uptate_str': [0x0A, -1, 64], # 10 - 升级段描述; less than 64byte
'device_str': [0x0D, -1, 64], # 13 - 设备特征描述; less than 64byte
'hex_name': [0xFF, -1, 80], # 255 - Hex文件名; less than 80byte
}
def file_encryption(buffer):
""" 文件加密算法 """
pwd_idx = 0
pwd = b'moc.mmocspot.www'
pwd = list(map(lambda x: (x - 0x30 + 0x100) % 0x100, pwd))
result = bytearray(len(buffer))
for i in range(len(buffer)):
k = i
k |= i >> 8
k |= i >> 16
k |= i >> 24
result[i] = buffer[i] ^ pwd[pwd_idx] ^ (k & 0xFF)
pwd_idx = (pwd_idx + 1) % len(pwd)
return result
def build_header(config: dict, len_max=512):
"""
基于配置参数, 生成文件信息头;
V1版本, 依据字典生成tag标签组;
"""
# 定义文件头
m_file_header = bytearray(len_max)
header_len = 11
tag_num = 0
for tag, value in config.items():
if tag in Header_Tag.keys():
if tag == 'hex_name' and len_max < 256:
""" 当文件头长度不足时, 跳过文件名标签 """
continue
elif Header_Tag[tag][1] == -1:
tag_len = min(len(value), Header_Tag[tag][2])
else:
tag_len = Header_Tag[tag][1]
tag_date = [Header_Tag[tag][0], tag_len] + value[:tag_len]
m_file_header[header_len: header_len + tag_len + 2] = bytearray(tag_date)
tag_num += 1
header_len += 2 + tag_len
m_file_header[0:8] = b"TOPSCOMM"
m_file_header[8] = ((header_len - 10) % 0x100)
m_file_header[9] = ((header_len - 10) // 0x100)
m_file_header[10] = tag_num
m_file_header[header_len] = sum(m_file_header[:header_len]) % 0x100
m_file_header[header_len+1] = sum(m_file_header[:header_len]) // 0x100
if header_len+2 > len_max:
return None
else:
return m_file_header
def build_header_lite(config: dict):
"""
基于配置参数, 生成文件信息头;
V2版本, 仅提供必要信息;
"""
# 定义文件头
m_file_header = bytearray(64)
m_file_header[0:4] = bytearray(config["update_verison"])
m_file_header[4:8] = bytearray(config["file_length"])
m_file_header[8:24] = bytearray(config["md5"])
return m_file_header
def build_header_new(config: dict):
"""
基于配置参数, 生成新版文件信息头;
V3版本, 依据新版格式填充数据;
"""
# 定义文件头
m_file_header = [0xFF] * 184
m_file_header[0:8] = list(b"TOPSCOMM")
m_file_header[8:10] = config['prod_type']
m_file_header[10:22] = config['prog_id'] + [0] * (12 - len(config['prog_id']))
if config['method_compress'] == True:
m_file_header[23] = 0x01
else:
m_file_header[23] = 0x00
if 'crc32' in config.keys():
m_file_header[22] = 0x00
m_file_header[24: 40] = config['crc32'] + [0x00] * 12
elif 'md5' in config.keys():
m_file_header[22] = 0x01
m_file_header[24: 40] = config['md5']
else:
raise Exception("Error, Unknown method verify.")
# 时间戳生成
time_now = datetime.now()
time_stamp = list(map(lambda x: int(x),
time_now.strftime("%Y-%m-%d-%H-%M").split('-')))
time_stamp.insert(1, time_stamp[0] // 0x100)
time_stamp[0] = time_stamp[0] % 0x100
m_file_header[40: 46] = time_stamp
m_file_header[46: 50] = config['file_length']
# Cpu1
m_file_header[50: 54] = [0x00] * 4
m_file_header[54: 70] = [0x00] * 16
# Cpu2
m_file_header[70: 74] = [0x00] * 4
m_file_header[74: 90] = [0x00] * 16
if config['prog_type'] == 'app':
m_file_header[90: 92] = [0x00, 0x00]
elif config['prog_type'] == 'boot':
m_file_header[90: 92] = [0x01, 0x00]
elif config['prog_type'] == 'diff':
m_file_header[90: 92] = [0x02, 0x00]
elif config['prog_type'] == 'font':
m_file_header[90: 92] = [0x03, 0x00]
elif config['prog_type'] == 'config':
m_file_header[90: 92] = [0x04, 0x00]
elif config['prog_type'] == 'data':
m_file_header[90: 92] = [0x05, 0x00]
elif config['prog_type'] == 'test':
m_file_header[90: 92] = [0x06, 0x00]
else:
raise Exception("Error, Unknown Program Type.")
m_file_header[92: 94] = config['area_code']
m_file_header[94: 158] = config['hex_name']
if 'upgrade_type' in config.keys():
m_file_header[158: 160] = config['upgrade_type']
m_file_header[160: 182] = [0x00] * 22
else:
m_file_header[158: 182] = [0x00] * 24
m_file_header = bytearray(m_file_header)
calculator = Calculator(Crc16.MODBUS)
code_crc16 = calculator.checksum(m_file_header[:-2])
m_file_header[182: 184] = [code_crc16 // 0x100, code_crc16 % 0x100]
return m_file_header
def parser_version_info(file_bin: bytearray, base_addr:int):
""" 解析Bin文件内的版本信息结构体 """
address_block = (file_bin[6] << 24) + (file_bin[7] << 16) + (file_bin[4] << 8) + file_bin[5]
address_block -= base_addr
address_block *= 2
offset = address_block
block_version = {}
block_version['name'] = file_bin[offset + 1 : offset + 64 : 2]
offset += 64
block_version['device'] = file_bin[offset + 1 : offset + 64 : 2]
offset += 64
block_version['factory'] = file_bin[offset + 1 : offset + 32 : 2]
offset += 32
block_version['hardware'] = file_bin[offset + 1 : offset + 64 : 2]
offset += 64
return block_version
def test1(fp: Path):
""" bin文件生成测试 """
file1 = Path(fp)
path1 = file1.parent / (file1.stem + ".bin")
data1 = file1.read_text()
data2 = file_IntelHex_to_Bin(data1, 0x3F0100)
path1.write_bytes(data2)
# 测试Bin到IntelHex
bin = Path(fp).read_bytes()
result = file_Bin_to_IntelHex(bin, 0x80000, memory_width=2)
(fp.parent / (fp.stem + ".hex")).write_text(result)
def test2():
""" 校验, 加密测试 """
# Header - Crc16
bin_offcial = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p460\result\lamina_adapter_t1.dat")
data_offcial = bin_offcial.read_bytes()
byte_data = data_offcial[:182]
crc = data_offcial[182:184]
# data = "54 4F 50 53 43 4F 4D 4D 45 00 53 4C 43 50 30 30 31 00 00 00 00 00 01 00 B6 61 A8 73 BF 82 9E A7 4C 79 F6 BB 94 E2 A5 18 E8 07 05 18 0B 17 18 4C 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 6C 61 6D 69 6E 61 5F 61 64 61 70 74 65 72 5F 6D 61 69 6E 02 00 00 00 00 20 10 53 06 00 00 00 00 80 A5 8F 02 00 00 00 00 EC 1F 40 00 00 00 00 00 00 00 00 00 00 00 00 00 7A CA 61 0A FD 7F 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00"
# byte_data = list(map(lambda x: int(x, 16), data.split(' ')))
# crc = "99 CB"
calculator = Calculator(Crc16.MODBUS)
code_crc16 = calculator.checksum(bytearray(byte_data))
print(f"Result = {hex(code_crc16)}, Offcial Result = {crc}")
# File - md5
data_bin = '123456 123456 '.encode()
md5_ctx = hashlib.md5()
md5_ctx.update(data_bin)
hash = md5_ctx.hexdigest()
# File - encrypt
buffer1 = data_bin[:]
buffer1_en = file_encryption(buffer1)
buffer2 = buffer1_en[:6] + buffer1[6:]
buffer2_de = file_encryption(buffer2)
pass
def task5():
""" 文件缓冲区对比测试 """
file_dat = Path(r"test\p280039\result\lamina_controller_dsp_t1.dat")
file_dat_buffer = Path(r"test\p280039\result\lamina_controller_dsp_buffer.bin")
file_dat_buffer.exists(), file_dat.exists()
data_dat = file_dat.read_bytes()
data_dat_buffer = file_dat_buffer.read_bytes()
for i in range(len(data_dat)):
if data_dat[i] != data_dat_buffer[2*i]:
print(f"Diff in {hex(i)}, Data: {data_dat[i]}!={data_dat_buffer[2*i]}")
def GeneratePackage_Demo_Xilinx(path_bin: Path):
""" 完整升级包生成测试 """
config = {
'file_type': [0x10, 0x01], # Xilinx-Demo 自机升级文件
'file_version': [0x00, 0x00], # 文件版本-00 用于兼容文件格式升级
# 'file_length': [], # 文件长度(自动生成)
# 'md5': [], # 文件MD5(自动生成)
'encrypt': [0x01], # 默认加密算法
'update_type': [0x01], # APP升级
'update_spec': [0x00, 0x00, 0x00, 0x00], # 升级特征字
'update_verison': [0x02, 0x00, 0x00, 0x01], # 升级版本号
'update_date': [0x22, 0x04, 0x24], # 升级版本日期
# 'area_code': [], # 省份特征
# 'uptate_str': [], # 升级段描述
# 'device_str': [], # 设备特征描述
# 'hex_name': [], # Hex文件名(自动读取)
# 文件Hex结构信息
# 'flash_addr': 0x3E8020, # 程序起始地址
# 'flash_size': 0x005FC0, # 程序空间大小
}
data_bin = path_bin.read_bytes()
md5_ctx = hashlib.md5()
md5_ctx.update(data_bin)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(data_bin))
config['hex_name'] = list(path_bin.name.encode())[:80]
if (header:= build_header(config, 128)) is None:
raise Exception("Header tag oversize. ")
if (header_512:= build_header(config, 512)) is None:
raise Exception("Header tag oversize. ")
data_encrypt = file_encryption(data_bin)
print("Upgrade file generated successfully.")
print(f"\t header_length={len(header)}, bin_length={len(data_bin)}[{hex(len(data_bin))}]")
print(f"\t file md5: {trans_list_to_str(config['md5'])}")
file1 = path_bin.parent / (path_bin.stem + '.dat')
file1.write_bytes(header + data_bin)
file2 = path_bin.parent / (path_bin.stem + '_h512.dat')
file2.write_bytes(header_512 + data_bin)
def GenerateImage_SLCP001_p4a0(path_boot: Path, path_main: Path, path_back: Path):
""" 叠光适配器-4A0平台版本 镜像生成 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"SLCP001"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x010000)
bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=0x0CC000)
bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=0x040000)
encrypt_main = file_encryption(bin_main)
encrypt_back = file_encryption(bin_back)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 组装镜像
Image = [0xFF] * 0x100000
offset_image = 0
Image[offset_image: offset_image + len(bin_boot)] = bin_boot
offset_image = 0x010000
Image[offset_image: offset_image + len(bin_main)] = bin_main
offset_image = 0x0BC000
Image[offset_image: offset_image + len(bin_back)] = bin_back
offset_image = 0x0FC000
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image = 0x0FE000
Image[offset_image: offset_image + len(back_header)] = back_header
return bytearray(Image), main_header, back_header
def GenerateImage_SLCP101_p460(path_boot: Path, path_main: Path, path_back: Path):
""" 叠光适配器-460平台版本 镜像生成 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"SLCP101"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = file_encryption(bin_main)
encrypt_back = file_encryption(bin_back)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 组装镜像
Image = [0xFF] * 0x058000
offset_image = 0
Image[offset_image: offset_image + len(bin_boot)] = bin_boot
offset_image = 0x00C000
Image[offset_image: offset_image + len(bin_main)] = bin_main
offset_image = 0x030000
Image[offset_image: offset_image + len(bin_back)] = bin_back
offset_image = 0x054000
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image = 0x056000
Image[offset_image: offset_image + len(back_header)] = back_header
return bytearray(Image), main_header, back_header
def GeneratePackage_SLCP101_p460(path_hex: Path):
""" 叠光适配器-460平台版本 生成升级包 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"SLCP101"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = file_encryption(bin_main)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
print(f"File name: {path_hex.name}")
print(f"File Info:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
# 组装镜像
Image = [0xFF] * (len(main_header) + len(encrypt_main))
offset_image = 0
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image += len(main_header)
Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main
return bytearray(Image), bin_main
def GenerateImage_DLSY001_p460(path_boot: Path, path_main: Path, path_back: Path):
""" 叠光优化器-460平台版本 镜像生成 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"DLSY001"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = file_encryption(bin_main)
encrypt_back = file_encryption(bin_back)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 组装镜像
Image = [0xFF] * 0x058000
offset_image = 0
Image[offset_image: offset_image + len(bin_boot)] = bin_boot
offset_image = 0x00C000
Image[offset_image: offset_image + len(bin_main)] = bin_main
offset_image = 0x030000
Image[offset_image: offset_image + len(bin_back)] = bin_back
offset_image = 0x054000
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image = 0x056000
Image[offset_image: offset_image + len(back_header)] = back_header
return bytearray(Image), main_header, back_header
def GeneratePackage_DLSY001_p460(path_hex: Path):
""" 叠光优化器-460平台版本 生成升级包 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"DLSY001"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = file_encryption(bin_main)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
print(f"File name: {path_hex.name}")
print(f"File Info:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
# 组装镜像
Image = [0xFF] * (len(main_header) + len(encrypt_main))
offset_image = 0
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image += len(main_header)
Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main
return bytearray(Image), bin_main
def GenerateImage_DLSP001_p280039(path_boot: Path, path_main: Path, path_back: Path):
""" 叠光适配器-460平台版本 镜像生成 """
config = {
'prod_type': [0x46, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"DLSP001"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
'upgrade_type': [0x00, 0x00], # 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份)
}
bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=2 * 0x004000, conv_end=False)
bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=2 * 0x014000, conv_end=False)
bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=2 * 0x014000, conv_end=False)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['upgrade_type'] = [0x00, 0x00] # 主程序-片外缓冲
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['upgrade_type'] = [0x02, 0x00] # 备份程序
config['file_length'] = conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
main_header_buffer = bytearray()
for byte_org in main_header:
main_header_buffer.extend(bytearray([0x00, byte_org]))
back_header_buffer = bytearray()
for byte_org in back_header:
back_header_buffer.extend(bytearray([0x00, byte_org]))
main_encrypt = file_encryption(bin_main)
back_encrypt = file_encryption(bin_back)
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 组装镜像
Image = [0xFF] * 2 * 0x030000
offset_image = 0
Image[offset_image: offset_image + len(bin_boot)] = bin_boot
offset_image = 2 * 0x006000
Image[offset_image: offset_image + len(main_header_buffer)] = main_header_buffer
offset_image = 2 * 0x007000
Image[offset_image: offset_image + len(back_header_buffer)] = back_header_buffer
offset_image = 2 * 0x008000
Image[offset_image: offset_image + len(bin_main)] = bin_main
offset_image = 2 * 0x01C000
Image[offset_image: offset_image + len(bin_back)] = bin_back
return bytearray(Image), main_header, back_header
def GeneratePackage_DLSP001_p280039(path_hex: Path):
""" 叠光控制器DSP-280039平台版本 生成升级包 """
config = {
'prod_type': [0x46, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"DLSP001"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
'upgrade_type': [0x00, 0x00], # 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份)
}
bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=2 * 0x014000, conv_end=False)
encrypt_main = file_encryption(bin_main)
info_version = parser_version_info(bin_main, 0x88000)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['hex_name'] = list(info_version['name'])[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
print(f"File name: {path_hex.name}")
print(f"File Info:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
# 组装镜像
Image = [0xFF] * (len(main_header) + len(encrypt_main))
offset_image = 0
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image += len(main_header)
Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main
return bytearray(Image), bin_main
def Process1():
""" 镜像生成流程 """
root = Path(r"test\p460")
result = Path(r"test\p460\result")
# 正常启动镜像
hex_boot = root / r"bootloader.hex"
hex_main = root / r"lamina_adapter.hex"
hex_back = root / r"lamina_adapter_back.hex"
hex_update = root / r"lamina_adapter_t1.hex"
file_image = result / f'{hex_main.stem[:-6]}_ROM.bin'
file_main_header = result / 'SLCP101_header_main.bin'
file_back_header = result / 'SLCP101_header_back.bin'
file_package = result / f'{hex_update.stem}.dat'
file_bin = result / f'{hex_update.stem}.bin'
data_bins = GenerateImage_SLCP101_p460(hex_boot, hex_main, hex_back)
data_package, data_bins = GeneratePackage_SLCP101_p460(hex_update)
file_image.write_bytes(data_bins[0].copy())
file_main_header.write_bytes(data_bins[1].copy())
file_back_header.write_bytes(data_bins[2].copy())
file_package.write_bytes(data_package)
file_bin.write_bytes(data_bins)
# 异常镜像-主分区md5错误
file_image1 = result / f'{file_image.stem}_b1.bin'
data_image = data_bins[0].copy()
data_image[0x054018: 0x05401A] = [0x00, 0x01]
file_image1.write_bytes(data_image)
# 异常镜像-备份分区md5错误
file_image2 = result / f'{file_image.stem}_b2.bin'
data_image = data_bins[0].copy()
data_image[0x056018: 0x05601A] = [0x00, 0x01]
file_image2.write_bytes(data_image)
# 异常镜像-双分区md5错误
file_image3 = result / f'{file_image.stem}_b3.bin'
data_image = data_bins[0].copy()
data_image[0x054018: 0x05401A] = [0x00, 0x01]
data_image[0x056018: 0x05601A] = [0x00, 0x01]
file_image3.write_bytes(data_image)
def Process2():
""" 镜像生成流程 """
root = Path(r"D:\WorkingProject\LightStackOptimizer\software\lamina_optimizer\lamina_optimizer\Debug")
# result = Path(r"test\p460_o1\result")
result = root
# 正常启动镜像
hex_boot = Path(r"test\p460_o1\bootloader.hex")
hex_main = root / r"lamina_optimizer.hex"
hex_back = root / r"lamina_optimizer.hex"
hex_update = root / r"lamina_optimizer.hex"
file_image = result / f'{hex_main.stem}_ROM.bin'
file_main_header = result / 'DLSY001_header_main.bin'
file_back_header = result / 'DLSY001_header_back.bin'
file_package = result / f'{hex_update.stem}.dat'
file_bin = result / f'{hex_update.stem}.bin'
data_bins = GenerateImage_DLSY001_p460(hex_boot, hex_main, hex_back)
data_package, data_bin = GeneratePackage_DLSY001_p460(hex_update)
file_image.write_bytes(data_bins[0].copy())
file_main_header.write_bytes(data_bins[1].copy())
file_back_header.write_bytes(data_bins[2].copy())
file_package.write_bytes(data_package)
file_bin.write_bytes(data_bin)
# 异常镜像-主分区md5错误
file_image1 = result / f'{file_image.stem}_b1.bin'
data_image = data_bins[0].copy()
data_image[0x054018: 0x05401A] = [0x00, 0x01]
file_image1.write_bytes(data_image)
# 异常镜像-备份分区md5错误
file_image2 = result / f'{file_image.stem}_b2.bin'
data_image = data_bins[0].copy()
data_image[0x056018: 0x05601A] = [0x00, 0x01]
file_image2.write_bytes(data_image)
# 异常镜像-双分区md5错误
file_image3 = result / f'{file_image.stem}_b3.bin'
data_image = data_bins[0].copy()
data_image[0x054018: 0x05401A] = [0x00, 0x01]
data_image[0x056018: 0x05601A] = [0x00, 0x01]
file_image3.write_bytes(data_image)
if __name__ == "__main__":
# path_bin = Path("F:\\Work\\FPGA\\Test\\Vivado\\test_update\\test_update.vitis\\upgrade_system\\Debug\\sd_card\\BOOT.BIN")
# GeneratePackage_Demo_Xilinx(path_bin)
Process2()
pass

View File

@@ -1,9 +1,9 @@
import time import time
from webui import webui from webui import webui
from pathlib import Path from pathlib import Path
from source.func_frame import check_frame_dlt645 from function.frame import check_frame_dlt645
from source.dev_LaminaAdapter import LaminaAdapter from device.EnergyRouter import EnergyRouter
from source.device.EnergyRouter import EnergyRouter from device.LaminaAdapter import LaminaAdapter
def my_function(e : webui.event): def my_function(e : webui.event):

View File

@@ -1,7 +1,7 @@
import time import time
from tools.ByteConv import display_hex from function.tools.ByteConv import display_hex
from dev_LaminaController import LaminaController from device.LaminaController import LaminaController
from dev_LaminaController import ParamMap_LaminaController from device.LaminaController import ParamMap_LaminaController
def test_communication(device: LaminaController, time_out=2): def test_communication(device: LaminaController, time_out=2):

View File

@@ -1,2 +0,0 @@
from . import ByteConv
from . import IntelHex