重构设备类定义, 使用继承简化代码;

This commit is contained in:
2024-11-18 02:49:30 +08:00
parent 628ce8bf27
commit 10885c9101
8 changed files with 873 additions and 243 deletions

View File

@@ -1,6 +1,5 @@
import time import time
from pathlib import Path from pathlib import Path
from typing import Callable, Any
from device.LaminaAdapter import LaminaAdapter from device.LaminaAdapter import LaminaAdapter
from device.LaminaAdapter import GeneratePackage_SLCP001_p4a0, GeneratePackage_SLCP101_p460, GeneratePackage_DLSY001_p460 from device.LaminaAdapter import GeneratePackage_SLCP001_p4a0, GeneratePackage_SLCP101_p460, GeneratePackage_DLSY001_p460
from device.LaminaAdapter import GeneratePackage_SLCP102_p460 from device.LaminaAdapter import GeneratePackage_SLCP102_p460

View File

@@ -1,30 +1,33 @@
import time import time
from serial import Serial from serial import Serial
from serial.tools import list_ports
from . import tools from .tools import ByteConv
from . import function from .function import protocols
class DeviceSerial: class DeviceSerial:
""" 串口通信设备原型 """ 串口通信设备原型
Note: 串口资源释放与重复开启 Note: 串口资源释放与重复开启
""" """
def __init__(self, com_name="COM1", **kwargs): def __init__(self, com_name, callbacks, **kwargs):
""" 初始化设备 """ """ 初始化设备 """
self.__com = None
if com_name is not None: if com_name is not None:
com_config = {} self.open_connection(com_name, **kwargs)
com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200
com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N'
com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8
com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1
self.__com = Serial(com_name, timeout=0, **com_config)
else:
self.__com = None
self.flag_print = kwargs['frame_print'] if 'frame_print' in kwargs.keys() else False self.flag_print = kwargs['frame_print'] if 'frame_print' in kwargs.keys() else False
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1 self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01 self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1 self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
match callbacks:
case (maker, parser):
self.frame_maker = maker if maker is not None else lambda self: ''
self.frame_parser = parser if parser is not None else lambda self, frame: ''
case _:
self.farme_maker = lambda self: ''
self.frame_parser = lambda self, frame: ''
self.output = { self.output = {
'result': False, 'result': False,
'code_func': 0x00, 'code_func': 0x00,
@@ -45,28 +48,33 @@ class DeviceSerial:
time_start, time_current, flag_frame = time.time(), time.time(), False time_start, time_current, flag_frame = time.time(), time.time(), False
while (time_current - time_start) < self.time_out: while (time_current - time_start) < self.time_out:
time.sleep(self.time_gap) time.sleep(self.time_gap)
bytes_read = self.__com.read_all()
time_current = time.time() time_current = time.time()
bytes_read = self.__com.read_all()
if flag_frame and len(bytes_read) == 0: if flag_frame and len(bytes_read) == 0:
break break
elif len(bytes_read): elif len(bytes_read):
flag_frame = True flag_frame = True
frame_recv += bytes_read frame_recv += bytes_read
try: try:
self.output = function.frame.check_frame_modbus(frame_recv, self.block['data']) self.output = self.frame_parser(frame_recv)
if self.flag_print: if self.flag_print:
print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv)) print("Read Frame: ", ByteConv.trans_list_to_str(frame_recv))
except Exception as ex: except Exception as ex:
print("Error Info: ", ex) print("Error Info: ", ex)
if self.flag_print and frame_recv: if self.flag_print and frame_recv:
print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv)) print("Fail Data: " , ByteConv.trans_list_to_str(frame_recv))
self.output['result'] = False self.output['result'] = False
return self.output['result'] return self.output['result']
def __transfer_data(self, frame: bytearray) -> bool:
def _transfer_data(self) -> bool:
""" 串口收发报文, 包含重试逻辑与数据打印 """ """ 串口收发报文, 包含重试逻辑与数据打印 """
# 生成发送帧
frame: bytearray = self.frame_maker()
if self.__com is None: if self.__com is None:
print(tools.ByteConv.trans_list_to_str(frame)) """ 无效通信接口, 打印报文后返回 """
print(ByteConv.trans_list_to_str(frame))
return False return False
fail_count = 0 fail_count = 0
@@ -78,22 +86,82 @@ class DeviceSerial:
if self.flag_print and frame_discard: if self.flag_print and frame_discard:
print("Discard Data: " , frame_discard) print("Discard Data: " , frame_discard)
if self.flag_print: if self.flag_print:
print("Send Frame: ", tools.ByteConv.trans_list_to_str(frame)) print("Send Frame: ", ByteConv.trans_list_to_str(frame))
time.sleep(10 * self.time_gap) time.sleep(2 * self.time_gap)
if self.__read_frame(): if self.__read_frame():
if (self.flag_print is not None) and 'Regs' in self.output.keys(): if (self.flag_print is not None) and 'Regs' in self.output.keys():
function.frame.print_display(self.output['Regs']) protocols.print_display(self.output['Regs'])
self.log['read'] += 1 self.log['read'] += 1
break break
else:
fail_count += 1 fail_count += 1
if fail_count >= self.log['keep-fail']: self.log['keep-fail'] = fail_count if fail_count >= self.log['keep-fail'] else self.log['keep-fail']
self.log['keep-fail'] = fail_count time.sleep(2 * self.time_out)
time.sleep(2*self.time_out)
return fail_count < self.retry return fail_count < self.retry
def close_connection(self) -> bool:
""" 关闭连接, 释放通信资源
"""
if self.__com is not None:
self.__com.close()
return self.__com is None or (not self.__com.is_open)
def open_connection(self, port=None, **kwargs) -> bool:
""" 打开连接, 更新或重新配置通信资源
"""
com_config = {
'baudrate': 115200,
'parity': 'N',
'bytesize': 8,
'stopbits': 1,
}
serial_close = lambda com: com.close() if com.isOpen() else None
serial_port_check = lambda port: port.upper() in (com.name for com in list_ports.comports())
match (self.__com, port, kwargs):
case (None, str() as port, None):
""" 使用默认参数打开串口 """
self.__com = Serial(port, timeout=0, **com_config)
case (None, str() as port, dict() as kwargs):
""" 使用指定参数打开串口 """
com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else com_config['baudrate']
com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else com_config['parity']
com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else com_config['bytesize']
com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else com_config['stopbits']
self.__com = Serial(port, timeout=0, **com_config)
case (Serial() as com, None, None):
""" 无参数重开串口 """
serial_close(self.__com)
com.open()
case (Serial() as com, port, None):
""" 重新指定端口号并打开串口 """
serial_close(self.__com)
if serial_port_check(port):
raise ValueError("无效串口端口: %s" % port)
com.port = port
com.open()
case (None, str() as port, dict() as kwargs):
""" 重新指定端口号与配置并打开串口 """
serial_close(self.__com)
if serial_port_check(port):
raise ValueError("无效串口端口: %s" % port)
com.port = port
com.baudrate = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else com_config['baudrate']
com.parity = kwargs['parity'] if 'parity' in kwargs.keys() else com_config['parity']
com.bytesize = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else com_config['bytesize']
com.stopbits = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else com_config['stopbits']
com.open()
case _:
""" 匹配失败, 报错 """
raise ValueError("Invalid config.")
return self.__com.is_open

View File

@@ -54,11 +54,11 @@ class EnergyRouter:
time.sleep(self.time_out) time.sleep(self.time_out)
frame_recv = self.tcp_socket.recv(128) frame_recv = self.tcp_socket.recv(128)
self.output = function.frame.check_frame_modbus(frame_recv, self.block) self.output = function.protocols.check_frame_modbus(frame_recv, self.block)
if self.flag_print: if self.flag_print:
print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv)) print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv))
if 'Regs' in self.output.keys(): if 'Regs' in self.output.keys():
function.frame.print_display(self.output['Regs']) function.protocols.print_display(self.output['Regs'])
except Exception as ex: except Exception as ex:
print("Error Info: ", ex) print("Error Info: ", ex)
if self.flag_print and frame_recv: if self.flag_print and frame_recv:
@@ -71,7 +71,7 @@ class EnergyRouter:
self.block['type'] = 'read' self.block['type'] = 'read'
self.block['data_addr'] = daddr self.block['data_addr'] = daddr
self.block['data_len'] = dlen self.block['data_len'] = dlen
frame = function.frame.make_frame_modbus(self.block) frame = function.protocols.make_frame_modbus(self.block)
return self.__transfer_data(frame) return self.__transfer_data(frame)
@@ -86,7 +86,7 @@ class EnergyRouter:
self.block['file'] = Path(path_bin).read_bytes() self.block['file'] = Path(path_bin).read_bytes()
self.block['header_offset'] = 128 self.block['header_offset'] = 128
# 启动帧 # 启动帧
frame_master = function.frame.make_frame_modbus(self.block) frame_master = function.protocols.make_frame_modbus(self.block)
if not self.__transfer_data(frame_master): if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved self.flag_print, self.retry, self.time_out = param_saved
@@ -116,7 +116,7 @@ class EnergyRouter:
continue continue
seq_window[i] = 1 seq_window[i] = 1
self.block['index'] = seq_offset + i self.block['index'] = seq_offset + i
seq_frame_master[i] = function.frame.make_frame_modbus(self.block) seq_frame_master[i] = function.protocols.make_frame_modbus(self.block)
self.tcp_socket.send(seq_frame_master[i]) self.tcp_socket.send(seq_frame_master[i])
# 接收帧回复 # 接收帧回复
tmp = list(zip(range(len(seq_window)), seq_window)) tmp = list(zip(range(len(seq_window)), seq_window))
@@ -126,7 +126,7 @@ class EnergyRouter:
# 接收到空数据, 对端已关闭连接 # 接收到空数据, 对端已关闭连接
if seq_frame_slave[i] == '': if seq_frame_slave[i] == '':
raise Exception("TCP closed.") raise Exception("TCP closed.")
self.output = function.frame.check_frame_modbus(seq_frame_slave[i], None) self.output = function.protocols.check_frame_modbus(seq_frame_slave[i], None)
seq_current, seq_hope = self.output['upgrade']['index'], self.output['upgrade']['hope'] seq_current, seq_hope = self.output['upgrade']['index'], self.output['upgrade']['hope']
if seq_current < seq_offset: if seq_current < seq_offset:
raise Exception("Error.") raise Exception("Error.")
@@ -152,14 +152,14 @@ class EnergyRouter:
# 结束升级 # 结束升级
self.block['step'] = 'end' self.block['step'] = 'end'
frame_master = function.frame.make_frame_modbus(self.block) frame_master = function.protocols.make_frame_modbus(self.block)
while self.output['result'] is False: while self.output['result'] is False:
self.tcp_socket.send(frame_master) self.tcp_socket.send(frame_master)
frame_slave = self.tcp_socket.recv(8) frame_slave = self.tcp_socket.recv(8)
if frame_slave == '': if frame_slave == '':
raise Exception("TCP closed.") raise Exception("TCP closed.")
self.output = function.frame.check_frame_modbus(frame_slave[:18], self.block) self.output = function.protocols.check_frame_modbus(frame_slave[:18], self.block)
def GeneratePackage_Demo_Xilinx(path_bin: Path): def GeneratePackage_Demo_Xilinx(path_bin: Path):

View File

@@ -3,10 +3,11 @@ import hashlib
from math import ceil from math import ceil
from tqdm import tqdm from tqdm import tqdm
from pathlib import Path from pathlib import Path
from serial import Serial
from . import tools from . import tools
from . import function from .tools import ByteConv, IntelHex
from .function import protocols, file_upgrade
from .DeviceSerial import DeviceSerial
ParamMap_LaminaAdapter = { ParamMap_LaminaAdapter = {
# 1 - Hex # 1 - Hex
@@ -120,27 +121,59 @@ ParamMap_LaminaAdapter = {
0x190: ["Datetime", 4, 16], 0x190: ["Datetime", 4, 16],
} }
MemoryMap_SLCP001 = {
'image_size': 0x100000, # 镜像文件大小
'app_size': 0x040000, # 应用程序大小
'boot_size': 0x010000, # Boot程序大小
class LaminaAdapter: 'boot_addr': 0x000000, # Boot程序地址
'main_addr': 0x010000, # main程序地址
'back_addr': 0x0BC000, # back程序地址
'main_header': 0x0FC000, # main信息地址
'back_header': 0x0FE000, # back信息地址
}
MemoryMap_460 = {
'image_size': 0x058000, # 镜像文件大小
'app_size': 0x024000, # 应用程序大小
'boot_size': 0x010000, # Boot程序大小
'boot_addr': 0x000000, # Boot程序地址
'main_addr': 0x00C000, # main程序地址
'back_addr': 0x030000, # back程序地址
'main_header': 0x054000, # main信息地址
'back_header': 0x056000, # back信息地址
}
class LaminaAdapter(DeviceSerial):
""" 叠光适配器\优化器 """ 叠光适配器\优化器
使用继承方式实现功能
""" """
def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs): def __init__(self, com_name,
# 初始化串口通信 addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA],
if com_name is not None: addr_modbus=0x01, type_dev='SLCP101', **kwargs):
com_config = {} """ 调用超类实现函数初始化 """
com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200 super().__init__(com_name,
com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N' callbacks=(lambda : protocols.make_frame_dlt645(self.block),
com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8 lambda frame: protocols.check_frame_dlt645(frame, self.block)),
com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1 **kwargs)
self.__com = Serial(com_name, **com_config)
else: self.device = type_dev
self.__com =None match type_dev:
case 'SLCP001':
self.flag_print = 'frame_print' in kwargs.keys() self.make_package = lambda *args, **kwargs: GeneratePackage('SLCP001', *args, **kwargs)
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1 self.make_image = lambda *args, **kwargs: GenerateImage('SLCP001', *args, **kwargs)
self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01 case 'SLCP101':
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1 self.make_package = lambda *args, **kwargs: GeneratePackage('SLCP101', *args, **kwargs)
self.retry_sub = kwargs['retry_sub'] if 'retry_sub' in kwargs.keys() else 1 self.make_image = lambda *args, **kwargs: GenerateImage('SLCP101', *args, **kwargs)
case 'SLCP102':
self.make_package = lambda *args, **kwargs: GeneratePackage('SLCP102', *args, **kwargs)
self.make_image = lambda *args, **kwargs: GenerateImage('SLCP102', *args, **kwargs)
case 'DLSY001':
self.make_package = lambda *args, **kwargs: GeneratePackage('DLSY001', *args, **kwargs)
self.make_image = lambda *args, **kwargs: GenerateImage('DLSY001', *args, **kwargs)
case _:
self.make_package = None
self.make_image = None
self.block = { self.block = {
'addr' : addr_645, 'addr' : addr_645,
@@ -150,160 +183,215 @@ class LaminaAdapter:
'data_define': ParamMap_LaminaAdapter, 'data_define': ParamMap_LaminaAdapter,
}, },
} }
self.output = {
'result': False,
'code_func': 0x00,
}
self.log = {
'send': 0,
'read': 0,
'keep-fail': 0,
'record': {
'config': None,
'data': None,
},
}
def __read_frame(self) -> bool: def frame_read(self, daddr=0x60, dlen=0x50) -> bool:
""" 使用帧字节超时策略读报文帧, 并进行解析数据, 打印异常 """
frame_recv = b''
time_start, time_current, flag_frame = time.time(), time.time(), False
while (time_current - time_start) < self.time_out:
time.sleep(self.time_gap)
bytes_read = self.__com.read_all()
time_current = time.time()
if flag_frame and len(bytes_read) == 0:
break
elif len(bytes_read):
flag_frame = True
frame_recv += bytes_read
try:
self.output = function.frame.check_frame_dlt645(frame_recv, self.block)
if self.flag_print:
print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv))
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
def __transfer_data(self, frame: bytearray) -> bool:
""" 报文数据传输 """
if self.__com is None:
print(tools.ByteConv.trans_list_to_str(frame))
return False
fail_count = 0
while fail_count < self.retry:
frame_discard = self.__com.read_all()
self.__com.write(frame)
self.log['send'] += 1
if self.flag_print and frame_discard:
print("Discard Data: " , frame_discard)
if self.flag_print:
print("Send Frame: ", tools.ByteConv.trans_list_to_str(frame))
if self.__read_frame():
if 'Regs' in self.output.keys():
function.frame.print_display(self.output['Regs'])
self.log['read'] += 1
break
else:
fail_count += 1
if fail_count >= self.log['keep-fail']:
self.log['keep-fail'] = fail_count
time.sleep(2*self.time_out)
return fail_count < self.retry
def frame_read(self, daddr=0x60, dlen=0x50):
self.block['data']['type'] = 'read' self.block['data']['type'] = 'read'
self.block['data']['data_addr'] = daddr self.block['data']['data_addr'] = daddr
self.block['data']['data_len'] = dlen self.block['data']['data_len'] = dlen
frame = function.frame.make_frame_dlt645(self.block) return self._transfer_data()
return self.__transfer_data(frame) def frame_write_one(self, daddr=0x85, dval=-900) -> bool:
def frame_write_one(self, daddr=0x85, dval=-900):
self.block['data']['type'] = 'write_one' self.block['data']['type'] = 'write_one'
self.block['data']['data_addr'] = daddr self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval self.block['data']['data_val'] = dval
frame = function.frame.make_frame_dlt645(self.block) item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1
self.block['data_val'] = int(dval * item_coff)
return self._transfer_data()
return self.__transfer_data(frame) def frame_write_dual(self, daddr=0x91, dval=600) -> bool:
def frame_write_dual(self, daddr=0x91, dval=600):
self.block['data']['type'] = 'write_dual' self.block['data']['type'] = 'write_dual'
self.block['data']['data_addr'] = daddr self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval self.block['data']['data_val'] = dval
frame = function.frame.make_frame_dlt645(self.block) item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1
self.block['data_val'] = int(dval * item_coff)
return self._transfer_data()
return self.__transfer_data(frame) def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool:
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]):
self.block['data']['type'] = 'write_str' self.block['data']['type'] = 'write_str'
self.block['data']['data_addr'] = daddr self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval self.block['data']['data_val'] = dval
frame = function.frame.make_frame_dlt645(self.block) return self._transfer_data()
return self.__transfer_data(frame) def frame_update(self, path_file: Path, makefile: bool = False) -> bool:
def frame_update(self, path_bin):
""" 程序升级 """ 程序升级
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程; 注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
""" """
param_saved = self.flag_print, self.retry, self.time_out param_saved = self.flag_print, self.retry, self.time_out
self.flag_print = False self.flag_print = False
try:
status = 'init' # 初始化
if not path_file.exists():
raise Exception("工程编译目标文件不存在.")
if makefile and self.make_package is not None:
self.block['data']['file'] = self.make_package(path_file)
else:
self.block['data']['file'] = path_file.read_bytes()
self.block['data']['type'] = 'update' self.block['data']['type'] = 'update'
self.block['data']['step'] = 'start' self.block['data']['header_offset'] = 184
self.block['data']['index'] = 0
self.block['data']['file'] = Path(path_bin).read_bytes()
self.block['data']['header_offset'] = 184
# 启动帧
frame_master = bytearray(function.frame.make_frame_dlt645(self.block))
if not self.__transfer_data(frame_master): status = 'start' # 启动帧
self.block['data']['step'] = 'start'
self.block['data']['index'] = 0
assert self._transfer_data()
self.block["data"]['file_block_size'] = self.output['upgrade']['length']
# 避免接收到延迟返回报文
time.sleep(self.time_out)
status = 'trans' # 文件传输
self.retry = 3
self.time_out = 1.5
self.block["data"]['step'] = 'trans'
self.block['data']['index'] = 0
frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size'])
for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx
assert self._transfer_data()
status = 'end' # 结束升级
self.time_out = 1
self.block["data"]['step'] = 'end'
self.block["data"]['index'] += 1
assert self._transfer_data()
except Exception as ex:
""" 通用异常处理 """
self.flag_print, self.retry, self.time_out = param_saved self.flag_print, self.retry, self.time_out = param_saved
print('Upgrade Fail: start') report = f'Upgrade Fail: {status}'
return False report += f', Frame in {self.block["data"]["index"]}' if status == 'trans' else ''
self.block["data"]['file_block_size'] = self.output['upgrade']['length'] report += f'\n Error by {ex}' if not isinstance(ex, AssertionError) else ''
print(report)
# 避免接收到延迟返回报文
time.sleep(self.time_out)
# 文件传输
self.retry = 3
self.time_out = 1.5
self.block["data"]['step'] = 'trans'
self.block['data']['index'] = 0
frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size'])
for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx
frame_master = function.frame.make_frame_dlt645(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: trans data in {idx}')
return False
# 结束升级
self.time_out = 1
self.block["data"]['step'] = 'end'
self.block["data"]['index'] += 1
frame_master = function.frame.make_frame_dlt645(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: end')
return False return False
self.flag_print, self.retry, self.time_out = param_saved self.flag_print, self.retry, self.time_out = param_saved
return True return True
def GeneratePackage(type_dev: str, path_hex: Path, **kwargs) -> bytearray:
""" 生成升级包 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(type_dev.encode('ascii')), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
match type_dev:
case 'SLCP001':
MemoryMap = MemoryMap_SLCP001
case 'SLCP101':
MemoryMap = MemoryMap_460
case 'SLCP102':
MemoryMap = MemoryMap_460
case 'DLSY001':
MemoryMap = MemoryMap_460
case _:
raise Exception("Unknow device.")
bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=MemoryMap['app_size'])
encrypt_main = file_upgrade.file_encryption(bin_main)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
print(f"File name: {path_hex.name}")
print(f"File Info:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
# 组装镜像
Image = [0xFF] * (len(main_header) + len(encrypt_main))
offset_image = 0
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image += len(main_header)
Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main
# 额外处理
if 'output_bin' in kwargs.keys() and kwargs['output_bin']:
(path_hex / (path_hex.stem + '.bin')).write_bytes(bin_main)
return bytearray(Image)
def GenerateImage(type_dev: str, path_boot: Path, path_main: Path, path_back: Path, **kwargs) ->bytearray:
""" 镜像生成 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(type_dev.encode('ascii')), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
match type_dev:
case 'SLCP001':
MemoryMap = MemoryMap_SLCP001
case 'SLCP101':
MemoryMap = MemoryMap_460
case 'SLCP102':
MemoryMap = MemoryMap_460
case 'DLSY001':
MemoryMap = MemoryMap_460
case _:
raise Exception("Unknow device.")
bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=MemoryMap['boot_size'])
bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=MemoryMap['app_size'])
bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=MemoryMap['app_size'])
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
# 组装镜像
Image = [0xFF] * MemoryMap['image_size']
Image[MemoryMap['boot_addr']: MemoryMap['boot_addr'] + len(bin_boot)] = bin_boot
Image[MemoryMap['main_addr']: MemoryMap['main_addr'] + len(bin_main)] = bin_main
Image[MemoryMap['back_addr']: MemoryMap['back_addr'] + len(bin_back)] = bin_back
Image[MemoryMap['main_header']: MemoryMap['main_header'] + len(main_header)] = main_header
Image[MemoryMap['back_header']: MemoryMap['back_header'] + len(back_header)] = back_header
# Log打印
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 额外文件
if 'output_header' in kwargs.keys() and kwargs['output_header']:
(path_main.parent / (path_main.stem + '.header')).write_bytes(main_header)
(path_back.parent / (path_back.stem + '.header')).write_bytes(back_header)
if 'output_bin' in kwargs.keys() and kwargs['output_bin']:
(path_main.parent / (path_main.stem + '.bin')).write_bytes(bin_main)
(path_back.parent / (path_back.stem + '.bin')).write_bytes(bin_back)
if 'output_encrypt' in kwargs.keys() and kwargs['output_encrypt']:
encrypt_main = file_upgrade.file_encryption(bin_main)
encrypt_back = file_upgrade.file_encryption(bin_back)
(path_main.parent / (path_main.stem + '.encrypt')).write_bytes(encrypt_main)
(path_back.parent / (path_back.stem + '.encrypt')).write_bytes(encrypt_back)
return bytearray(Image)
def GeneratePackage_SLCP001_p4a0(path_hex: Path): def GeneratePackage_SLCP001_p4a0(path_hex: Path):
""" 叠光适配器-460平台版本 生成升级包 """ """ 叠光适配器-460平台版本 生成升级包 """
config = { config = {
@@ -313,16 +401,16 @@ def GeneratePackage_SLCP001_p4a0(path_hex: Path):
'prog_type': 'app', # 程序类型 'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区 'area_code': [0x00, 0x00], # 地区
} }
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x040000) bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x040000)
encrypt_main = function.file_upgrade.file_encryption(bin_main) encrypt_main = file_upgrade.file_encryption(bin_main)
md5_ctx = hashlib.md5() md5_ctx = hashlib.md5()
md5_ctx.update(bin_main) md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest()) config["md5"] = list(md5_ctx.digest())
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name'])) config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=function.file_upgrade.build_header_new(config)) is None: if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ") raise Exception("Header tag oversize. ")
print("Package generated successfully.") print("Package generated successfully.")
@@ -348,16 +436,16 @@ def GeneratePackage_SLCP101_p460(path_hex: Path):
'prog_type': 'app', # 程序类型 'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区 'area_code': [0x00, 0x00], # 地区
} }
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = function.file_upgrade.file_encryption(bin_main) encrypt_main = file_upgrade.file_encryption(bin_main)
md5_ctx = hashlib.md5() md5_ctx = hashlib.md5()
md5_ctx.update(bin_main) md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest()) config["md5"] = list(md5_ctx.digest())
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name'])) config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=function.file_upgrade.build_header_new(config)) is None: if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ") raise Exception("Header tag oversize. ")
print("Package generated successfully.") print("Package generated successfully.")
@@ -384,16 +472,16 @@ def GeneratePackage_SLCP102_p460(path_hex: Path):
'prog_type': 'app', # 程序类型 'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区 'area_code': [0x00, 0x00], # 地区
} }
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = function.file_upgrade.file_encryption(bin_main) encrypt_main = file_upgrade.file_encryption(bin_main)
md5_ctx = hashlib.md5() md5_ctx = hashlib.md5()
md5_ctx.update(bin_main) md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest()) config["md5"] = list(md5_ctx.digest())
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name'])) config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=function.file_upgrade.build_header_new(config)) is None: if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ") raise Exception("Header tag oversize. ")
print("Package generated successfully.") print("Package generated successfully.")
@@ -420,28 +508,28 @@ def GenerateImage_SLCP001_p4a0(path_boot: Path, path_main: Path, path_back: Path
'prog_type': 'app', # 程序类型 'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区 'area_code': [0x00, 0x00], # 地区
} }
bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x010000) bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x010000)
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x0CC000) bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x0CC000)
bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x040000) bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x040000)
encrypt_main = function.file_upgrade.file_encryption(bin_main) encrypt_main = file_upgrade.file_encryption(bin_main)
encrypt_back = function.file_upgrade.file_encryption(bin_back) encrypt_back = file_upgrade.file_encryption(bin_back)
md5_ctx = hashlib.md5() md5_ctx = hashlib.md5()
md5_ctx.update(bin_main) md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest()) config["md5"] = list(md5_ctx.digest())
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name'])) config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=function.file_upgrade.build_header_new(config)) is None: if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ") raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5() md5_ctx = hashlib.md5()
md5_ctx.update(bin_back) md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest()) config["md5"] = list(md5_ctx.digest())
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) config['file_length'] = ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name'])) config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=function.file_upgrade.build_header_new(config)) is None: if (back_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ") raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.") print("Merge Image generated successfully.")
@@ -475,28 +563,28 @@ def GenerateImage_SLCP101_p460(path_boot: Path, path_main: Path, path_back: Path
'prog_type': 'app', # 程序类型 'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区 'area_code': [0x00, 0x00], # 地区
} }
bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = function.file_upgrade.file_encryption(bin_main) encrypt_main = file_upgrade.file_encryption(bin_main)
encrypt_back = function.file_upgrade.file_encryption(bin_back) encrypt_back = file_upgrade.file_encryption(bin_back)
md5_ctx = hashlib.md5() md5_ctx = hashlib.md5()
md5_ctx.update(bin_main) md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest()) config["md5"] = list(md5_ctx.digest())
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name'])) config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=function.file_upgrade.build_header_new(config)) is None: if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ") raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5() md5_ctx = hashlib.md5()
md5_ctx.update(bin_back) md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest()) config["md5"] = list(md5_ctx.digest())
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) config['file_length'] = ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name'])) config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=function.file_upgrade.build_header_new(config)) is None: if (back_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ") raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.") print("Merge Image generated successfully.")
@@ -530,28 +618,28 @@ def GenerateImage_SLCP102_p460(path_boot: Path, path_main: Path, path_back: Path
'prog_type': 'app', # 程序类型 'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区 'area_code': [0x00, 0x00], # 地区
} }
bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = function.file_upgrade.file_encryption(bin_main) encrypt_main = file_upgrade.file_encryption(bin_main)
encrypt_back = function.file_upgrade.file_encryption(bin_back) encrypt_back = file_upgrade.file_encryption(bin_back)
md5_ctx = hashlib.md5() md5_ctx = hashlib.md5()
md5_ctx.update(bin_main) md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest()) config["md5"] = list(md5_ctx.digest())
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name'])) config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=function.file_upgrade.build_header_new(config)) is None: if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ") raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5() md5_ctx = hashlib.md5()
md5_ctx.update(bin_back) md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest()) config["md5"] = list(md5_ctx.digest())
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) config['file_length'] = ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name'])) config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=function.file_upgrade.build_header_new(config)) is None: if (back_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ") raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.") print("Merge Image generated successfully.")
@@ -585,16 +673,16 @@ def GeneratePackage_DLSY001_p460(path_hex: Path):
'prog_type': 'app', # 程序类型 'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区 'area_code': [0x00, 0x00], # 地区
} }
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = function.file_upgrade.file_encryption(bin_main) encrypt_main = file_upgrade.file_encryption(bin_main)
md5_ctx = hashlib.md5() md5_ctx = hashlib.md5()
md5_ctx.update(bin_main) md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest()) config["md5"] = list(md5_ctx.digest())
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name'])) config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=function.file_upgrade.build_header_new(config)) is None: if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ") raise Exception("Header tag oversize. ")
print("Package generated successfully.") print("Package generated successfully.")
@@ -621,28 +709,28 @@ def GenerateImage_DLSY001_p460(path_boot: Path, path_main: Path, path_back: Path
'prog_type': 'app', # 程序类型 'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区 'area_code': [0x00, 0x00], # 地区
} }
bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = function.file_upgrade.file_encryption(bin_main) encrypt_main = file_upgrade.file_encryption(bin_main)
encrypt_back = function.file_upgrade.file_encryption(bin_back) encrypt_back = file_upgrade.file_encryption(bin_back)
md5_ctx = hashlib.md5() md5_ctx = hashlib.md5()
md5_ctx.update(bin_main) md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest()) config["md5"] = list(md5_ctx.digest())
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name'])) config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=function.file_upgrade.build_header_new(config)) is None: if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ") raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5() md5_ctx = hashlib.md5()
md5_ctx.update(bin_back) md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest()) config["md5"] = list(md5_ctx.digest())
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) config['file_length'] = ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name'])) config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=function.file_upgrade.build_header_new(config)) is None: if (back_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ") raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.") print("Merge Image generated successfully.")

View File

@@ -7,6 +7,9 @@ from serial import Serial
from . import tools from . import tools
from . import function from . import function
from .tools import ByteConv, IntelHex
from .function import protocols, file_upgrade
from .DeviceSerial import DeviceSerial
ParamMap_LaminaController = { ParamMap_LaminaController = {
# 1 - Hex # 1 - Hex
@@ -112,6 +115,19 @@ ParamMap_LaminaController = {
0x190: ["出厂日期批次", 4, 16], 0x190: ["出厂日期批次", 4, 16],
} }
MemoryMap_280039 = {
'image_size': 2*0x030000, # 镜像文件大小
'app_size': 2*0x004000, # 应用程序大小
'boot_size': 2*0x014000, # Boot程序大小
'boot_addr': 0x000000, # Boot程序地址
'main_header': 0x006000, # main信息地址
'back_header': 0x007000, # back信息地址
'main_info': 0x088000, # main版本地址
'back_info': 0x007000, # back版本地址
'main_addr': 0x008000, # main程序地址
'back_addr': 0x01C000, # back程序地址
}
class LaminaController: class LaminaController:
""" 叠光控制器 """ 叠光控制器
@@ -169,7 +185,7 @@ class LaminaController:
flag_frame = True flag_frame = True
frame_recv += bytes_read frame_recv += bytes_read
try: try:
self.output = function.frame.check_frame_modbus(frame_recv, self.block['data']) self.output = function.protocols.check_frame_modbus(frame_recv, self.block['data'])
if self.flag_print: if self.flag_print:
print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv)) print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv))
except Exception as ex: except Exception as ex:
@@ -199,7 +215,7 @@ class LaminaController:
time.sleep(10 * self.time_gap) time.sleep(10 * self.time_gap)
if self.__read_frame(): if self.__read_frame():
if (self.flag_print is not None) and 'Regs' in self.output.keys(): if (self.flag_print is not None) and 'Regs' in self.output.keys():
function.frame.print_display(self.output['Regs']) function.protocols.print_display(self.output['Regs'])
self.log['read'] += 1 self.log['read'] += 1
break break
else: else:
@@ -214,7 +230,7 @@ class LaminaController:
self.block['data']['type'] = 'read' self.block['data']['type'] = 'read'
self.block['data']['data_addr'] = daddr self.block['data']['data_addr'] = daddr
self.block['data']['data_len'] = dlen self.block['data']['data_len'] = dlen
frame = function.frame.make_frame_modbus(self.block['data']) frame = function.protocols.make_frame_modbus(self.block['data'])
return self.__transfer_data(frame) return self.__transfer_data(frame)
@@ -223,7 +239,7 @@ class LaminaController:
self.block['data']['data_addr'] = daddr self.block['data']['data_addr'] = daddr
item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1 item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
self.block['data']['data_val'] = int(dval * item_coff) self.block['data']['data_val'] = int(dval * item_coff)
frame = function.frame.make_frame_modbus(self.block['data']) frame = function.protocols.make_frame_modbus(self.block['data'])
return self.__transfer_data(frame) return self.__transfer_data(frame)
@@ -232,7 +248,7 @@ class LaminaController:
self.block['data']['data_addr'] = daddr self.block['data']['data_addr'] = daddr
item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1 item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
self.block['data']['data_val'] = int(dval * item_coff) self.block['data']['data_val'] = int(dval * item_coff)
frame = function.frame.make_frame_modbus(self.block['data']) frame = function.protocols.make_frame_modbus(self.block['data'])
return self.__transfer_data(frame) return self.__transfer_data(frame)
@@ -240,7 +256,7 @@ class LaminaController:
self.block['data']['type'] = 'write_str' self.block['data']['type'] = 'write_str'
self.block['data']['data_addr'] = daddr self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval self.block['data']['data_val'] = dval
frame = function.frame.make_frame_modbus(self.block['data']) frame = function.protocols.make_frame_modbus(self.block['data'])
return self.__transfer_data(frame) return self.__transfer_data(frame)
@@ -255,7 +271,7 @@ class LaminaController:
# 读取config # 读取config
self.block['data']['type'] = 'record_cfg' self.block['data']['type'] = 'record_cfg'
self.block['data']['step'] = 'start' self.block['data']['step'] = 'start'
frame_master = function.frame.make_frame_modbus(self.block['data']) frame_master = function.protocols.make_frame_modbus(self.block['data'])
ret, pbar = True, None ret, pbar = True, None
frame_data_cfg = [] frame_data_cfg = []
@@ -265,7 +281,7 @@ class LaminaController:
if self.output['record']['seq'] == 0: if self.output['record']['seq'] == 0:
pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading") pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading")
self.block['data']['step'] = 'next' self.block['data']['step'] = 'next'
frame_master = function.frame.make_frame_modbus(self.block['data']) frame_master = function.protocols.make_frame_modbus(self.block['data'])
elif (self.output['record']['seq']) >= self.output['record']['total']: elif (self.output['record']['seq']) >= self.output['record']['total']:
ret = False ret = False
pbar and pbar.update() pbar and pbar.update()
@@ -274,7 +290,7 @@ class LaminaController:
# 读取data # 读取data
self.block['data']['type'] = 'record_data' self.block['data']['type'] = 'record_data'
self.block['data']['step'] = 'start' self.block['data']['step'] = 'start'
frame_master = function.frame.make_frame_modbus(self.block['data']) frame_master = function.protocols.make_frame_modbus(self.block['data'])
ret, pbar = True, None ret, pbar = True, None
frame_data_record = [] frame_data_record = []
@@ -284,7 +300,7 @@ class LaminaController:
if self.output['record']['seq'] == 0: if self.output['record']['seq'] == 0:
pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading") pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading")
self.block['data']['step'] = 'next' self.block['data']['step'] = 'next'
frame_master = function.frame.make_frame_modbus(self.block['data']) frame_master = function.protocols.make_frame_modbus(self.block['data'])
elif (self.output['record']['seq']) >= self.output['record']['total']: elif (self.output['record']['seq']) >= self.output['record']['total']:
ret = False ret = False
pbar and pbar.update() pbar and pbar.update()
@@ -442,7 +458,7 @@ class LaminaController:
# 启动帧 # 启动帧
self.block['data']['step'] = 'start' self.block['data']['step'] = 'start'
self.block['data']['index'] = 0 self.block['data']['index'] = 0
frame_master = function.frame.make_frame_modbus(self.block['data']) frame_master = function.protocols.make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master): if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved self.flag_print, self.retry, self.time_out = param_saved
print('Upgrade Fail: start') print('Upgrade Fail: start')
@@ -460,7 +476,7 @@ class LaminaController:
frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size']) frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size'])
for idx in tqdm(range(frame_total), desc="File Transmitting"): for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx self.block["data"]['index'] = idx
frame_master = function.frame.make_frame_modbus(self.block['data']) frame_master = function.protocols.make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master): if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: trans data in {idx}') print(f'Upgrade Fail: trans data in {idx}')
@@ -470,7 +486,7 @@ class LaminaController:
self.time_out = 1 self.time_out = 1
self.block["data"]['step'] = 'end' self.block["data"]['step'] = 'end'
self.block["data"]['index'] += 1 self.block["data"]['index'] += 1
frame_master = function.frame.make_frame_modbus(self.block['data']) frame_master = function.protocols.make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master): if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: end') print(f'Upgrade Fail: end')
@@ -480,6 +496,413 @@ class LaminaController:
return True return True
class LaminaController_new(DeviceSerial):
""" 叠光控制器
"""
def __init__(self, com_name="COM16", addr_modbus=0x01, type_dev='DLSP001', **kwargs):
""" 调用超类实现函数初始化 """
super().__init__(com_name,
callbacks=(lambda : protocols.make_frame_modbus(self.block),
lambda frame: protocols.check_frame_modbus(frame, self.block)),
**kwargs)
self.device = type_dev
match type_dev:
case 'DLSP001':
self.make_package = lambda *args, **kwargs: GeneratePackage('DLSP001', *args, **kwargs)
self.make_image = lambda *args, **kwargs: GenerateImage('DLSP001', *args, **kwargs)
case _:
self.make_package = None
self.make_image = None
self.block = {
'addr_dev' : addr_modbus,
'data_define': ParamMap_LaminaController,
}
def frame_read(self, daddr=0x60, dlen=0x30) -> bool:
self.block['type'] = 'read'
self.block['data_addr'] = daddr
self.block['data_len'] = dlen
return self._transfer_data()
def frame_write_one(self, daddr=0x85, dval=-900) -> bool:
self.block['type'] = 'write_one'
self.block['data_addr'] = daddr
item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1
self.block['data_val'] = int(dval * item_coff)
return self._transfer_data()
def frame_write_dual(self, daddr=0x91, dval=600) -> bool:
self.block['type'] = 'write_dual'
self.block['data_addr'] = daddr
item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1
self.block['data_val'] = int(dval * item_coff)
return self._transfer_data()
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool:
self.block['type'] = 'write_str'
self.block['data_addr'] = daddr
self.block['data_val'] = dval
return self._transfer_data()
def frame_record(self) -> bool:
""" 读取录波数据
"""
param_saved = self.flag_print, self.retry, self.time_out
self.flag_print = False
self.retry = 3
self.time_out = 1.5
self.block['file_block_size'] = 240
# 读取config
self.block['type'] = 'record_cfg'
self.block['step'] = 'start'
frame_master = function.protocols.make_frame_modbus(self.block)
ret, pbar = True, None
frame_data_cfg = []
while ret:
if ret := self.__transfer_data(frame_master):
frame_data_cfg.append(self.output['record'])
if self.output['record']['seq'] == 0:
pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading")
self.block['step'] = 'next'
frame_master = function.protocols.make_frame_modbus(self.block)
elif (self.output['record']['seq']) >= self.output['record']['total']:
ret = False
pbar and pbar.update()
pbar and pbar.close()
# 读取data
self.block['type'] = 'record_data'
self.block['step'] = 'start'
frame_master = function.protocols.make_frame_modbus(self.block)
ret, pbar = True, None
frame_data_record = []
while ret:
if ret := self.__transfer_data(frame_master):
frame_data_record.append(self.output['record'])
if self.output['record']['seq'] == 0:
pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading")
self.block['step'] = 'next'
frame_master = function.protocols.make_frame_modbus(self.block)
elif (self.output['record']['seq']) >= self.output['record']['total']:
ret = False
pbar and pbar.update()
pbar and pbar.close()
self.flag_print, self.retry, self.time_out = param_saved
if (len(frame_data_record) != 32) or (len(frame_data_cfg) != 3):
print("Operation_Record: 未取得录波数据.")
return False
# 处理配置信息
data_cfg_bare = b"".join([x['data'] for x in frame_data_cfg])
config_record = {'LinePos': []}
pos = 0
if data_cfg_bare[pos+1] != 0xF1 or data_cfg_bare[pos] != 0xF1:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
len_faultword = (data_cfg_bare[3] * 0x100 + data_cfg_bare[2])
pos += 2
config_record['FaultWord'] = []
for i in range(0, len_faultword, 2):
faultword = data_cfg_bare[pos+i+1] * 0x100 + data_cfg_bare[pos+i]
config_record['FaultWord'].append(faultword)
pos += len_faultword
config_record['SysStatus'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['FaultRecordPosition'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['FaultNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['Standard'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
if data_cfg_bare[pos+1] != 0xF2 or data_cfg_bare[pos] != 0xF2:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
config_record['ChannelNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['ChNum_Alg'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['ChNum_Dgt'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
if data_cfg_bare[pos+1] != 0xF3 or data_cfg_bare[pos] != 0xF3:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
config_record['ChannelDescription'] = []
config_record['ChannelCoefficient'] = []
for i in range(config_record['ChannelNum']):
len_str = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
ch_desc = []
for j in range(0, len_str, 2):
ch_desc.append(data_cfg_bare[pos + j])
pos += len_str
ch_coe = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['ChannelDescription'].append(bytearray(ch_desc).decode())
config_record['ChannelCoefficient'].append(ch_coe)
if data_cfg_bare[pos+1] != 0xF4 or data_cfg_bare[pos] != 0xF4:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
config_record['SysFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['FreqNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['SampleFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['SamplePoint'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['DataType'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
config_record['TimeFactor'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2
if data_cfg_bare[pos+1] != 0xF5 or data_cfg_bare[pos] != 0xF5:
Warning("config 配置文件格式异常")
pos += 2
config_record['LinePos'].append(pos)
time_stamp = {
'year': data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos],
'month': data_cfg_bare[pos+3] * 0x100 + data_cfg_bare[pos+2],
'day': data_cfg_bare[pos+5] * 0x100 + data_cfg_bare[pos+4],
'hour': data_cfg_bare[pos+7] * 0x100 + data_cfg_bare[pos+6],
'minute': data_cfg_bare[pos+9] * 0x100 + data_cfg_bare[pos+8],
'second': data_cfg_bare[pos+11] * 0x100 + data_cfg_bare[pos+10] +
(data_cfg_bare[pos+13] * 0x100 + data_cfg_bare[pos+12]) * 0.001,
}
config_record['StartTime'] = time_stamp
pos += 14
time_stamp = {
'year': data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos],
'month': data_cfg_bare[pos+3] * 0x100 + data_cfg_bare[pos+2],
'day': data_cfg_bare[pos+5] * 0x100 + data_cfg_bare[pos+4],
'hour': data_cfg_bare[pos+7] * 0x100 + data_cfg_bare[pos+6],
'minute': data_cfg_bare[pos+9] * 0x100 + data_cfg_bare[pos+8],
'second': data_cfg_bare[pos+11] * 0x100 + data_cfg_bare[pos+10] +
(data_cfg_bare[pos+13] * 0x100 + data_cfg_bare[pos+12]) * 0.001,
}
config_record['TriggerTime'] = time_stamp
self.log['record']['config'] = config_record
self.log['record']['bare_config'] = data_cfg_bare
# 处理录波数据
data_record_bare = b"".join([x['data'] for x in frame_data_record])
data_record = []
pos = 0
while pos < len(data_record_bare):
record_point = {
'index': data_record_bare[pos+3] * 0x1000 +
data_record_bare[pos+2] * 0x100 +
data_record_bare[pos+1] * 0x10 +
data_record_bare[pos],
'timestamp': data_record_bare[pos+7] * 0x1000 +
data_record_bare[pos+6] * 0x100 +
data_record_bare[pos+5] * 0x10 +
data_record_bare[pos + 4],
'ChAlg': [],
'ChDgt': [],
}
pos += 8
for i in range(config_record['ChNum_Alg']):
point_data_alg = data_record_bare[pos+1] * 0x100 + data_record_bare[pos]
if data_record_bare[pos+1] & 0x80:
point_data_alg -= 0x10000
record_point['ChAlg'].append(point_data_alg)
pos += 2
for i in range(config_record['ChNum_Dgt']):
point_data_dgt = (data_record_bare[pos+(i // 8)] & (0x01 << (i % 8))) == (0x01 << (i % 8))
record_point['ChDgt'].append(point_data_dgt)
pos += 2
data_record.append(record_point)
self.log['record']['data'] = data_record
self.log['record']['bare_data'] = data_record_bare
return True
def frame_update(self, path_bin: Path, makefile: bool = False) -> bool:
""" 程序升级
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
"""
if makefile:
self.block['file'], file_bin = GeneratePackage_DLSP001_p280039(path_bin)
else:
self.block['file'] = path_bin.read_bytes()
self.block['header_offset'] = 184
self.block['type'] = 'update'
param_saved = self.flag_print, self.retry, self.time_out
self.retry = 5
self.time_out = 0.5
self.flag_print = False
# 启动帧
self.block['step'] = 'start'
self.block['index'] = 0
frame_master = function.protocols.make_frame_modbus(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print('Upgrade Fail: start')
return False
self.block["data"]['file_block_size'] = self.output['upgrade']['length']
# 避免接收到延迟返回报文
time.sleep(self.time_out)
# 文件传输
self.retry = 3
self.time_out = 1.5
self.block["data"]['step'] = 'trans'
self.block['index'] = 0
frame_total = ceil((len(self.block["data"]['file']) - self.block['header_offset']) / self.block["data"]['file_block_size'])
for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx
frame_master = function.protocols.make_frame_modbus(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: trans data in {idx}')
return False
# 结束升级
self.time_out = 1
self.block["data"]['step'] = 'end'
self.block["data"]['index'] += 1
frame_master = function.protocols.make_frame_modbus(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: end')
return False
self.flag_print, self.retry, self.time_out = param_saved
return True
def GeneratePackage(type_dev: str, path_hex: Path, **kwargs) -> bytearray:
""" 生成升级包 """
config = {
'prod_type': [0x46, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(type_dev.encode('ascii')), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
'upgrade_type': [0x00, 0x00], # 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份)
}
match type_dev:
case 'DLSP001':
MemoryMap = MemoryMap_280039
case _:
raise Exception("Unknow device.")
bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=MemoryMap['app_size'], conv_end=False)
encrypt_main = file_upgrade.file_encryption(bin_main)
info_version = file_upgrade.parser_version_info(bin_main, 0x88000)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(info_version['name'])[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
print(f"File name: {path_hex.name}")
print(f"File Info:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
# 组装镜像
Image = [0xFF] * (len(main_header) + len(encrypt_main))
offset_image = 0
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image += len(main_header)
Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main
return bytearray(Image)
def GenerateImage(type_dev: str, path_boot: Path, path_main: Path, path_back: Path, **kwargs) ->bytearray:
""" 镜像生成 """
config = {
'prod_type': [0x46, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(type_dev.encode('ascii')), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
'upgrade_type': [0x00, 0x00], # 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份)
}
match type_dev:
case 'DLSP001':
MemoryMap = MemoryMap_280039
case _:
raise Exception("Unknow device.")
bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=MemoryMap['boot_size'], conv_end=False)
bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=MemoryMap['app_size'], conv_end=False)
bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=MemoryMap['app_size'], conv_end=False)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['upgrade_type'] = [0x00, 0x00] # 主程序-片外缓冲
config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['upgrade_type'] = [0x02, 0x00] # 备份程序
config['file_length'] = ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
# 组装镜像
main_header_buffer = bytearray()
back_header_buffer = bytearray()
for byte_org in main_header:
main_header_buffer.extend(bytearray([0x00, byte_org]))
for byte_org in back_header:
back_header_buffer.extend(bytearray([0x00, byte_org]))
Image = [0xFF] * MemoryMap['image_size']
Image[MemoryMap['boot_addr']: MemoryMap['boot_addr'] + len(bin_boot)] = bin_boot
Image[MemoryMap['main_addr']: MemoryMap['main_addr'] + len(main_header_buffer)] = main_header_buffer
Image[MemoryMap['back_addr']: MemoryMap['back_addr'] + len(back_header_buffer)] = back_header_buffer
Image[MemoryMap['main_header']: MemoryMap['main_header'] + len(bin_main)] = bin_main
Image[MemoryMap['back_header']: MemoryMap['back_header'] + len(bin_back)] = bin_back
# Log打印
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 额外文件
if 'output_header' in kwargs.keys() and kwargs['output_header']:
(path_main.parent / (path_main.stem + '.header')).write_bytes(main_header)
(path_back.parent / (path_back.stem + '.header')).write_bytes(back_header)
if 'output_bin' in kwargs.keys() and kwargs['output_bin']:
(path_main.parent / (path_main.stem + '.bin')).write_bytes(bin_main)
(path_back.parent / (path_back.stem + '.bin')).write_bytes(bin_back)
if 'output_encrypt' in kwargs.keys() and kwargs['output_encrypt']:
main_encrypt = file_upgrade.file_encryption(bin_main)
back_encrypt = file_upgrade.file_encryption(bin_back)
(path_main.parent / (path_main.stem + '.encrypt')).write_bytes(main_encrypt)
(path_back.parent / (path_back.stem + '.encrypt')).write_bytes(back_encrypt)
return bytearray(Image)
def GeneratePackage_DLSP001_p280039(path_hex: Path): def GeneratePackage_DLSP001_p280039(path_hex: Path):
""" 叠光控制器DSP-280039平台版本 生成升级包 """ """ 叠光控制器DSP-280039平台版本 生成升级包 """
config = { config = {
@@ -582,4 +1005,5 @@ def GenerateImage_DLSP001_p280039(path_boot: Path, path_main: Path, path_back: P
offset_image = 2 * 0x01C000 offset_image = 2 * 0x01C000
Image[offset_image: offset_image + len(bin_back)] = bin_back Image[offset_image: offset_image + len(bin_back)] = bin_back
return bytearray(Image), main_header, back_header return bytearray(Image), main_header, back_header

View File

@@ -1,2 +1,2 @@
from . import frame from . import protocols
from . import file_upgrade from . import file_upgrade

View File

@@ -1,4 +1,5 @@
from pathlib import Path from pathlib import Path
from device.LaminaAdapter import LaminaAdapter
from device.LaminaAdapter import GenerateImage_SLCP001_p4a0, GeneratePackage_SLCP001_p4a0 from device.LaminaAdapter import GenerateImage_SLCP001_p4a0, GeneratePackage_SLCP001_p4a0
from device.LaminaAdapter import GenerateImage_SLCP101_p460, GeneratePackage_SLCP101_p460 from device.LaminaAdapter import GenerateImage_SLCP101_p460, GeneratePackage_SLCP101_p460
from device.LaminaAdapter import GenerateImage_SLCP102_p460, GeneratePackage_SLCP102_p460 from device.LaminaAdapter import GenerateImage_SLCP102_p460, GeneratePackage_SLCP102_p460
@@ -198,6 +199,51 @@ def Process2():
file_image3.write_bytes(data_image) file_image3.write_bytes(data_image)
def Process(type_dev: str, path_boot: Path, path_project: Path):
""" 镜像生成流程 """
root_boot = path_boot
root_main = path_project
result = root_main
# 正常启动镜像
hex_boot = root_boot / "bootloader.hex"
hex_main = root_main / "lamina_adapter.hex"
hex_back = root_main / "lamina_adapter_back.hex"
hex_update = hex_main
if (not hex_boot.exists()) or (not hex_main.exists()) or (not hex_back.exists()):
raise Exception("缺失必要程序文件")
file_image = result / f'{hex_main.stem}_ROM.bin'
file_package = result / f'{hex_update.stem}.dat'
dev_lamina = LaminaAdapter(None, type_dev=type_dev)
data_package = dev_lamina.make_package(hex_update)
data_image = dev_lamina.make_image(hex_boot, hex_main, hex_back, output_header=True, output_bin=True)
file_image.write_bytes(data_image)
file_package.write_bytes(data_package)
# 异常镜像-主分区md5错误
file_image1 = result / f'{file_image.stem}_b1.bin'
data_image_copy = data_image
data_image_copy[0x054018: 0x05401A] = [0x00, 0x01]
file_image1.write_bytes(data_image_copy)
# 异常镜像-备份分区md5错误
file_image2 = result / f'{file_image.stem}_b2.bin'
data_image_copy = data_image
data_image_copy[0x056018: 0x05601A] = [0x00, 0x01]
file_image2.write_bytes(data_image_copy)
# 异常镜像-双分区md5错误
file_image3 = result / f'{file_image.stem}_b3.bin'
data_image_copy = data_image
data_image_copy[0x054018: 0x05401A] = [0x00, 0x01]
data_image_copy[0x056018: 0x05601A] = [0x00, 0x01]
file_image3.write_bytes(data_image_copy)
if __name__ == "__main__": if __name__ == "__main__":
path_boot1 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\4A0-PROJ_STACKLIGHT_PARALLEL_ADAPTOR") path_boot1 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\4A0-PROJ_STACKLIGHT_PARALLEL_ADAPTOR")
path_boot2 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\460-PROJ_STACKLIGHT_PARALLEL_ADAPTOR") path_boot2 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\460-PROJ_STACKLIGHT_PARALLEL_ADAPTOR")
@@ -207,3 +253,8 @@ if __name__ == "__main__":
Process1(path_boot2, path_main) # 适配器SLCP101 Process1(path_boot2, path_main) # 适配器SLCP101
# Process1_v2(path_boot2, path_main) # 适配器SLCP102 # Process1_v2(path_boot2, path_main) # 适配器SLCP102
# Process2() # Process2()
path_boot1 = Path(r"resource/460-PROJ_STACKLIGHT_OPTIMIZER")
path_boot2 = Path(r"resource/460-PROJ_STACKLIGHT_PARALLEL_ADAPTOR")
path_project = Path(r"resource\SLCP101")
Process('SLCP101', path_boot2, path_project)