diff --git a/source/dev_LaminaAdapter.py b/source/dev_LaminaAdapter.py index f6f87f6..24df4ce 100644 --- a/source/dev_LaminaAdapter.py +++ b/source/dev_LaminaAdapter.py @@ -1,6 +1,5 @@ import time from pathlib import Path -from typing import Callable, Any from device.LaminaAdapter import LaminaAdapter from device.LaminaAdapter import GeneratePackage_SLCP001_p4a0, GeneratePackage_SLCP101_p460, GeneratePackage_DLSY001_p460 from device.LaminaAdapter import GeneratePackage_SLCP102_p460 diff --git a/source/device/DeviceSerial.py b/source/device/DeviceSerial.py index 0f94ae4..88a6264 100644 --- a/source/device/DeviceSerial.py +++ b/source/device/DeviceSerial.py @@ -1,30 +1,33 @@ import time from serial import Serial +from serial.tools import list_ports -from . import tools -from . import function +from .tools import ByteConv +from .function import protocols class DeviceSerial: """ 串口通信设备原型 Note: 串口资源释放与重复开启 """ - def __init__(self, com_name="COM1", **kwargs): + def __init__(self, com_name, callbacks, **kwargs): """ 初始化设备 """ + self.__com = None if com_name is not None: - com_config = {} - com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200 - com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N' - com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8 - com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1 - self.__com = Serial(com_name, timeout=0, **com_config) - else: - self.__com = None + self.open_connection(com_name, **kwargs) self.flag_print = kwargs['frame_print'] if 'frame_print' in kwargs.keys() else False self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1 self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01 self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1 + match callbacks: + case (maker, parser): + self.frame_maker = maker if maker is not None else lambda self: '' + self.frame_parser = parser if parser is not None else lambda self, frame: '' + case _: + self.farme_maker = lambda self: '' + self.frame_parser = lambda self, frame: '' + self.output = { 'result': False, 'code_func': 0x00, @@ -45,28 +48,33 @@ class DeviceSerial: time_start, time_current, flag_frame = time.time(), time.time(), False while (time_current - time_start) < self.time_out: time.sleep(self.time_gap) - bytes_read = self.__com.read_all() time_current = time.time() + bytes_read = self.__com.read_all() if flag_frame and len(bytes_read) == 0: break elif len(bytes_read): flag_frame = True frame_recv += bytes_read try: - self.output = function.frame.check_frame_modbus(frame_recv, self.block['data']) + self.output = self.frame_parser(frame_recv) if self.flag_print: - print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv)) + print("Read Frame: ", ByteConv.trans_list_to_str(frame_recv)) except Exception as ex: print("Error Info: ", ex) if self.flag_print and frame_recv: - print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv)) + print("Fail Data: " , ByteConv.trans_list_to_str(frame_recv)) self.output['result'] = False return self.output['result'] - def __transfer_data(self, frame: bytearray) -> bool: + + def _transfer_data(self) -> bool: """ 串口收发报文, 包含重试逻辑与数据打印 """ + # 生成发送帧 + frame: bytearray = self.frame_maker() + if self.__com is None: - print(tools.ByteConv.trans_list_to_str(frame)) + """ 无效通信接口, 打印报文后返回 """ + print(ByteConv.trans_list_to_str(frame)) return False fail_count = 0 @@ -78,22 +86,82 @@ class DeviceSerial: if self.flag_print and frame_discard: print("Discard Data: " , frame_discard) if self.flag_print: - print("Send Frame: ", tools.ByteConv.trans_list_to_str(frame)) + print("Send Frame: ", ByteConv.trans_list_to_str(frame)) - time.sleep(10 * self.time_gap) + time.sleep(2 * self.time_gap) if self.__read_frame(): if (self.flag_print is not None) and 'Regs' in self.output.keys(): - function.frame.print_display(self.output['Regs']) + protocols.print_display(self.output['Regs']) self.log['read'] += 1 break - else: - fail_count += 1 - if fail_count >= self.log['keep-fail']: - self.log['keep-fail'] = fail_count - time.sleep(2*self.time_out) + + fail_count += 1 + self.log['keep-fail'] = fail_count if fail_count >= self.log['keep-fail'] else self.log['keep-fail'] + time.sleep(2 * self.time_out) return fail_count < self.retry + def close_connection(self) -> bool: + """ 关闭连接, 释放通信资源 + + """ + if self.__com is not None: + self.__com.close() + return self.__com is None or (not self.__com.is_open) + + def open_connection(self, port=None, **kwargs) -> bool: + """ 打开连接, 更新或重新配置通信资源 + + """ + com_config = { + 'baudrate': 115200, + 'parity': 'N', + 'bytesize': 8, + 'stopbits': 1, + } + + serial_close = lambda com: com.close() if com.isOpen() else None + serial_port_check = lambda port: port.upper() in (com.name for com in list_ports.comports()) + + match (self.__com, port, kwargs): + case (None, str() as port, None): + """ 使用默认参数打开串口 """ + self.__com = Serial(port, timeout=0, **com_config) + case (None, str() as port, dict() as kwargs): + """ 使用指定参数打开串口 """ + com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else com_config['baudrate'] + com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else com_config['parity'] + com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else com_config['bytesize'] + com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else com_config['stopbits'] + self.__com = Serial(port, timeout=0, **com_config) + case (Serial() as com, None, None): + """ 无参数重开串口 """ + serial_close(self.__com) + com.open() + case (Serial() as com, port, None): + """ 重新指定端口号并打开串口 """ + serial_close(self.__com) + if serial_port_check(port): + raise ValueError("无效串口端口: %s" % port) + com.port = port + com.open() + case (None, str() as port, dict() as kwargs): + """ 重新指定端口号与配置并打开串口 """ + serial_close(self.__com) + if serial_port_check(port): + raise ValueError("无效串口端口: %s" % port) + com.port = port + com.baudrate = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else com_config['baudrate'] + com.parity = kwargs['parity'] if 'parity' in kwargs.keys() else com_config['parity'] + com.bytesize = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else com_config['bytesize'] + com.stopbits = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else com_config['stopbits'] + com.open() + case _: + """ 匹配失败, 报错 """ + raise ValueError("Invalid config.") + + return self.__com.is_open + diff --git a/source/device/EnergyRouter.py b/source/device/EnergyRouter.py index faf2cb0..3da41a2 100644 --- a/source/device/EnergyRouter.py +++ b/source/device/EnergyRouter.py @@ -54,11 +54,11 @@ class EnergyRouter: time.sleep(self.time_out) frame_recv = self.tcp_socket.recv(128) - self.output = function.frame.check_frame_modbus(frame_recv, self.block) + self.output = function.protocols.check_frame_modbus(frame_recv, self.block) if self.flag_print: print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv)) if 'Regs' in self.output.keys(): - function.frame.print_display(self.output['Regs']) + function.protocols.print_display(self.output['Regs']) except Exception as ex: print("Error Info: ", ex) if self.flag_print and frame_recv: @@ -71,7 +71,7 @@ class EnergyRouter: self.block['type'] = 'read' self.block['data_addr'] = daddr self.block['data_len'] = dlen - frame = function.frame.make_frame_modbus(self.block) + frame = function.protocols.make_frame_modbus(self.block) return self.__transfer_data(frame) @@ -86,7 +86,7 @@ class EnergyRouter: self.block['file'] = Path(path_bin).read_bytes() self.block['header_offset'] = 128 # 启动帧 - frame_master = function.frame.make_frame_modbus(self.block) + frame_master = function.protocols.make_frame_modbus(self.block) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved @@ -116,7 +116,7 @@ class EnergyRouter: continue seq_window[i] = 1 self.block['index'] = seq_offset + i - seq_frame_master[i] = function.frame.make_frame_modbus(self.block) + seq_frame_master[i] = function.protocols.make_frame_modbus(self.block) self.tcp_socket.send(seq_frame_master[i]) # 接收帧回复 tmp = list(zip(range(len(seq_window)), seq_window)) @@ -126,7 +126,7 @@ class EnergyRouter: # 接收到空数据, 对端已关闭连接 if seq_frame_slave[i] == '': raise Exception("TCP closed.") - self.output = function.frame.check_frame_modbus(seq_frame_slave[i], None) + self.output = function.protocols.check_frame_modbus(seq_frame_slave[i], None) seq_current, seq_hope = self.output['upgrade']['index'], self.output['upgrade']['hope'] if seq_current < seq_offset: raise Exception("Error.") @@ -152,14 +152,14 @@ class EnergyRouter: # 结束升级 self.block['step'] = 'end' - frame_master = function.frame.make_frame_modbus(self.block) + frame_master = function.protocols.make_frame_modbus(self.block) while self.output['result'] is False: self.tcp_socket.send(frame_master) frame_slave = self.tcp_socket.recv(8) if frame_slave == '': raise Exception("TCP closed.") - self.output = function.frame.check_frame_modbus(frame_slave[:18], self.block) + self.output = function.protocols.check_frame_modbus(frame_slave[:18], self.block) def GeneratePackage_Demo_Xilinx(path_bin: Path): diff --git a/source/device/LaminaAdapter.py b/source/device/LaminaAdapter.py index 1de9088..c96c349 100644 --- a/source/device/LaminaAdapter.py +++ b/source/device/LaminaAdapter.py @@ -3,10 +3,11 @@ import hashlib from math import ceil from tqdm import tqdm from pathlib import Path -from serial import Serial from . import tools -from . import function +from .tools import ByteConv, IntelHex +from .function import protocols, file_upgrade +from .DeviceSerial import DeviceSerial ParamMap_LaminaAdapter = { # 1 - Hex @@ -120,27 +121,59 @@ ParamMap_LaminaAdapter = { 0x190: ["Datetime", 4, 16], } +MemoryMap_SLCP001 = { + 'image_size': 0x100000, # 镜像文件大小 + 'app_size': 0x040000, # 应用程序大小 + 'boot_size': 0x010000, # Boot程序大小 -class LaminaAdapter: + 'boot_addr': 0x000000, # Boot程序地址 + 'main_addr': 0x010000, # main程序地址 + 'back_addr': 0x0BC000, # back程序地址 + 'main_header': 0x0FC000, # main信息地址 + 'back_header': 0x0FE000, # back信息地址 +} +MemoryMap_460 = { + 'image_size': 0x058000, # 镜像文件大小 + 'app_size': 0x024000, # 应用程序大小 + 'boot_size': 0x010000, # Boot程序大小 + + 'boot_addr': 0x000000, # Boot程序地址 + 'main_addr': 0x00C000, # main程序地址 + 'back_addr': 0x030000, # back程序地址 + 'main_header': 0x054000, # main信息地址 + 'back_header': 0x056000, # back信息地址 +} + +class LaminaAdapter(DeviceSerial): """ 叠光适配器\优化器 + 使用继承方式实现功能 """ - def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs): - # 初始化串口通信 - if com_name is not None: - com_config = {} - com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200 - com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N' - com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8 - com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1 - self.__com = Serial(com_name, **com_config) - else: - self.__com =None - - self.flag_print = 'frame_print' in kwargs.keys() - self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1 - self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01 - self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1 - self.retry_sub = kwargs['retry_sub'] if 'retry_sub' in kwargs.keys() else 1 + def __init__(self, com_name, + addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], + addr_modbus=0x01, type_dev='SLCP101', **kwargs): + """ 调用超类实现函数初始化 """ + super().__init__(com_name, + callbacks=(lambda : protocols.make_frame_dlt645(self.block), + lambda frame: protocols.check_frame_dlt645(frame, self.block)), + **kwargs) + + self.device = type_dev + match type_dev: + case 'SLCP001': + self.make_package = lambda *args, **kwargs: GeneratePackage('SLCP001', *args, **kwargs) + self.make_image = lambda *args, **kwargs: GenerateImage('SLCP001', *args, **kwargs) + case 'SLCP101': + self.make_package = lambda *args, **kwargs: GeneratePackage('SLCP101', *args, **kwargs) + self.make_image = lambda *args, **kwargs: GenerateImage('SLCP101', *args, **kwargs) + case 'SLCP102': + self.make_package = lambda *args, **kwargs: GeneratePackage('SLCP102', *args, **kwargs) + self.make_image = lambda *args, **kwargs: GenerateImage('SLCP102', *args, **kwargs) + case 'DLSY001': + self.make_package = lambda *args, **kwargs: GeneratePackage('DLSY001', *args, **kwargs) + self.make_image = lambda *args, **kwargs: GenerateImage('DLSY001', *args, **kwargs) + case _: + self.make_package = None + self.make_image = None self.block = { 'addr' : addr_645, @@ -150,160 +183,215 @@ class LaminaAdapter: 'data_define': ParamMap_LaminaAdapter, }, } - self.output = { - 'result': False, - 'code_func': 0x00, - } - self.log = { - 'send': 0, - 'read': 0, - 'keep-fail': 0, - 'record': { - 'config': None, - 'data': None, - }, - } - def __read_frame(self) -> bool: - """ 使用帧字节超时策略读报文帧, 并进行解析数据, 打印异常 """ - frame_recv = b'' - time_start, time_current, flag_frame = time.time(), time.time(), False - while (time_current - time_start) < self.time_out: - time.sleep(self.time_gap) - bytes_read = self.__com.read_all() - time_current = time.time() - if flag_frame and len(bytes_read) == 0: - break - elif len(bytes_read): - flag_frame = True - frame_recv += bytes_read - try: - self.output = function.frame.check_frame_dlt645(frame_recv, self.block) - if self.flag_print: - print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv)) - except Exception as ex: - print("Error Info: ", ex) - if self.flag_print and frame_recv: - print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv)) - self.output['result'] = False - - return self.output['result'] - - def __transfer_data(self, frame: bytearray) -> bool: - """ 报文数据传输 """ - if self.__com is None: - print(tools.ByteConv.trans_list_to_str(frame)) - return False - - fail_count = 0 - while fail_count < self.retry: - frame_discard = self.__com.read_all() - self.__com.write(frame) - self.log['send'] += 1 - - if self.flag_print and frame_discard: - print("Discard Data: " , frame_discard) - if self.flag_print: - print("Send Frame: ", tools.ByteConv.trans_list_to_str(frame)) - - if self.__read_frame(): - if 'Regs' in self.output.keys(): - function.frame.print_display(self.output['Regs']) - self.log['read'] += 1 - break - else: - fail_count += 1 - if fail_count >= self.log['keep-fail']: - self.log['keep-fail'] = fail_count - time.sleep(2*self.time_out) - - return fail_count < self.retry - - def frame_read(self, daddr=0x60, dlen=0x50): + def frame_read(self, daddr=0x60, dlen=0x50) -> bool: self.block['data']['type'] = 'read' self.block['data']['data_addr'] = daddr self.block['data']['data_len'] = dlen - frame = function.frame.make_frame_dlt645(self.block) + return self._transfer_data() - return self.__transfer_data(frame) - - def frame_write_one(self, daddr=0x85, dval=-900): + def frame_write_one(self, daddr=0x85, dval=-900) -> bool: self.block['data']['type'] = 'write_one' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval - frame = function.frame.make_frame_dlt645(self.block) + item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 + self.block['data_val'] = int(dval * item_coff) + return self._transfer_data() - return self.__transfer_data(frame) - - def frame_write_dual(self, daddr=0x91, dval=600): + def frame_write_dual(self, daddr=0x91, dval=600) -> bool: self.block['data']['type'] = 'write_dual' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval - frame = function.frame.make_frame_dlt645(self.block) + item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 + self.block['data_val'] = int(dval * item_coff) + return self._transfer_data() - return self.__transfer_data(frame) - - def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]): + def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool: self.block['data']['type'] = 'write_str' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval - frame = function.frame.make_frame_dlt645(self.block) + return self._transfer_data() - return self.__transfer_data(frame) - - def frame_update(self, path_bin): + def frame_update(self, path_file: Path, makefile: bool = False) -> bool: """ 程序升级 注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程; """ param_saved = self.flag_print, self.retry, self.time_out self.flag_print = False + try: + status = 'init' # 初始化 + if not path_file.exists(): + raise Exception("工程编译目标文件不存在.") + if makefile and self.make_package is not None: + self.block['data']['file'] = self.make_package(path_file) + else: + self.block['data']['file'] = path_file.read_bytes() - self.block['data']['type'] = 'update' - self.block['data']['step'] = 'start' - self.block['data']['index'] = 0 - self.block['data']['file'] = Path(path_bin).read_bytes() - self.block['data']['header_offset'] = 184 - # 启动帧 - frame_master = bytearray(function.frame.make_frame_dlt645(self.block)) + self.block['data']['type'] = 'update' + self.block['data']['header_offset'] = 184 - if not self.__transfer_data(frame_master): + status = 'start' # 启动帧 + self.block['data']['step'] = 'start' + self.block['data']['index'] = 0 + assert self._transfer_data() + + self.block["data"]['file_block_size'] = self.output['upgrade']['length'] + # 避免接收到延迟返回报文 + time.sleep(self.time_out) + + status = 'trans' # 文件传输 + self.retry = 3 + self.time_out = 1.5 + self.block["data"]['step'] = 'trans' + self.block['data']['index'] = 0 + frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size']) + for idx in tqdm(range(frame_total), desc="File Transmitting"): + self.block["data"]['index'] = idx + assert self._transfer_data() + + status = 'end' # 结束升级 + self.time_out = 1 + self.block["data"]['step'] = 'end' + self.block["data"]['index'] += 1 + assert self._transfer_data() + except Exception as ex: + """ 通用异常处理 """ self.flag_print, self.retry, self.time_out = param_saved - print('Upgrade Fail: start') - return False - self.block["data"]['file_block_size'] = self.output['upgrade']['length'] - - # 避免接收到延迟返回报文 - time.sleep(self.time_out) - - # 文件传输 - self.retry = 3 - self.time_out = 1.5 - self.block["data"]['step'] = 'trans' - self.block['data']['index'] = 0 - frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size']) - for idx in tqdm(range(frame_total), desc="File Transmitting"): - self.block["data"]['index'] = idx - frame_master = function.frame.make_frame_dlt645(self.block) - if not self.__transfer_data(frame_master): - self.flag_print, self.retry, self.time_out = param_saved - print(f'Upgrade Fail: trans data in {idx}') - return False - - # 结束升级 - self.time_out = 1 - self.block["data"]['step'] = 'end' - self.block["data"]['index'] += 1 - frame_master = function.frame.make_frame_dlt645(self.block) - if not self.__transfer_data(frame_master): - self.flag_print, self.retry, self.time_out = param_saved - print(f'Upgrade Fail: end') + report = f'Upgrade Fail: {status}' + report += f', Frame in {self.block["data"]["index"]}' if status == 'trans' else '' + report += f'\n Error by {ex}' if not isinstance(ex, AssertionError) else '' + print(report) return False self.flag_print, self.retry, self.time_out = param_saved return True +def GeneratePackage(type_dev: str, path_hex: Path, **kwargs) -> bytearray: + """ 生成升级包 """ + config = { + 'prod_type': [0x45, 0x00], # 产品类型 + 'method_compress': False, # 文件压缩 + 'prog_id': list(type_dev.encode('ascii')), # 程序识别号 + 'prog_type': 'app', # 程序类型 + 'area_code': [0x00, 0x00], # 地区 + } + match type_dev: + case 'SLCP001': + MemoryMap = MemoryMap_SLCP001 + case 'SLCP101': + MemoryMap = MemoryMap_460 + case 'SLCP102': + MemoryMap = MemoryMap_460 + case 'DLSY001': + MemoryMap = MemoryMap_460 + case _: + raise Exception("Unknow device.") + + bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=MemoryMap['app_size']) + encrypt_main = file_upgrade.file_encryption(bin_main) + + md5_ctx = hashlib.md5() + md5_ctx.update(bin_main) + config["md5"] = list(md5_ctx.digest()) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_main)) + config['hex_name'] = list(path_hex.name.encode())[:64] + config['hex_name'] += [0] * (64 - len(config['hex_name'])) + if (main_header:=file_upgrade.build_header_new(config)) is None: + raise Exception("Header tag oversize. ") + + print("Package generated successfully.") + print(f"File name: {path_hex.name}") + print(f"File Info:") + print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") + + # 组装镜像 + Image = [0xFF] * (len(main_header) + len(encrypt_main)) + offset_image = 0 + Image[offset_image: offset_image + len(main_header)] = main_header + offset_image += len(main_header) + Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main + + # 额外处理 + if 'output_bin' in kwargs.keys() and kwargs['output_bin']: + (path_hex / (path_hex.stem + '.bin')).write_bytes(bin_main) + + return bytearray(Image) + +def GenerateImage(type_dev: str, path_boot: Path, path_main: Path, path_back: Path, **kwargs) ->bytearray: + """ 镜像生成 """ + config = { + 'prod_type': [0x45, 0x00], # 产品类型 + 'method_compress': False, # 文件压缩 + 'prog_id': list(type_dev.encode('ascii')), # 程序识别号 + 'prog_type': 'app', # 程序类型 + 'area_code': [0x00, 0x00], # 地区 + } + match type_dev: + case 'SLCP001': + MemoryMap = MemoryMap_SLCP001 + case 'SLCP101': + MemoryMap = MemoryMap_460 + case 'SLCP102': + MemoryMap = MemoryMap_460 + case 'DLSY001': + MemoryMap = MemoryMap_460 + case _: + raise Exception("Unknow device.") + + bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=MemoryMap['boot_size']) + bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=MemoryMap['app_size']) + bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=MemoryMap['app_size']) + + md5_ctx = hashlib.md5() + md5_ctx.update(bin_main) + config["md5"] = list(md5_ctx.digest()) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_main)) + config['hex_name'] = list(path_main.name.encode())[:64] + config['hex_name'] += [0] * (64 - len(config['hex_name'])) + if (main_header:=file_upgrade.build_header_new(config)) is None: + raise Exception("Header tag oversize. ") + + md5_ctx = hashlib.md5() + md5_ctx.update(bin_back) + config["md5"] = list(md5_ctx.digest()) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_back)) + config['hex_name'] = list(path_back.name.encode())[:64] + config['hex_name'] += [0] * (64 - len(config['hex_name'])) + if (back_header:=file_upgrade.build_header_new(config)) is None: + raise Exception("Header tag oversize. ") + + # 组装镜像 + Image = [0xFF] * MemoryMap['image_size'] + Image[MemoryMap['boot_addr']: MemoryMap['boot_addr'] + len(bin_boot)] = bin_boot + Image[MemoryMap['main_addr']: MemoryMap['main_addr'] + len(bin_main)] = bin_main + Image[MemoryMap['back_addr']: MemoryMap['back_addr'] + len(bin_back)] = bin_back + Image[MemoryMap['main_header']: MemoryMap['main_header'] + len(main_header)] = main_header + Image[MemoryMap['back_header']: MemoryMap['back_header'] + len(back_header)] = back_header + + # Log打印 + print("Merge Image generated successfully.") + print(f"Main File:") + print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") + print(f"Back File:") + print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]") + + # 额外文件 + if 'output_header' in kwargs.keys() and kwargs['output_header']: + (path_main.parent / (path_main.stem + '.header')).write_bytes(main_header) + (path_back.parent / (path_back.stem + '.header')).write_bytes(back_header) + if 'output_bin' in kwargs.keys() and kwargs['output_bin']: + (path_main.parent / (path_main.stem + '.bin')).write_bytes(bin_main) + (path_back.parent / (path_back.stem + '.bin')).write_bytes(bin_back) + if 'output_encrypt' in kwargs.keys() and kwargs['output_encrypt']: + encrypt_main = file_upgrade.file_encryption(bin_main) + encrypt_back = file_upgrade.file_encryption(bin_back) + (path_main.parent / (path_main.stem + '.encrypt')).write_bytes(encrypt_main) + (path_back.parent / (path_back.stem + '.encrypt')).write_bytes(encrypt_back) + + return bytearray(Image) + def GeneratePackage_SLCP001_p4a0(path_hex: Path): """ 叠光适配器-460平台版本 生成升级包 """ config = { @@ -313,16 +401,16 @@ def GeneratePackage_SLCP001_p4a0(path_hex: Path): 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x040000) - encrypt_main = function.file_upgrade.file_encryption(bin_main) + bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x040000) + encrypt_main = file_upgrade.file_encryption(bin_main) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=function.file_upgrade.build_header_new(config)) is None: + if (main_header:=file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Package generated successfully.") @@ -348,16 +436,16 @@ def GeneratePackage_SLCP101_p460(path_hex: Path): 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) - encrypt_main = function.file_upgrade.file_encryption(bin_main) + bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) + encrypt_main = file_upgrade.file_encryption(bin_main) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=function.file_upgrade.build_header_new(config)) is None: + if (main_header:=file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Package generated successfully.") @@ -384,16 +472,16 @@ def GeneratePackage_SLCP102_p460(path_hex: Path): 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) - encrypt_main = function.file_upgrade.file_encryption(bin_main) + bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) + encrypt_main = file_upgrade.file_encryption(bin_main) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=function.file_upgrade.build_header_new(config)) is None: + if (main_header:=file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Package generated successfully.") @@ -420,28 +508,28 @@ def GenerateImage_SLCP001_p4a0(path_boot: Path, path_main: Path, path_back: Path 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x010000) - bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x0CC000) - bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x040000) - encrypt_main = function.file_upgrade.file_encryption(bin_main) - encrypt_back = function.file_upgrade.file_encryption(bin_back) + bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x010000) + bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x0CC000) + bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x040000) + encrypt_main = file_upgrade.file_encryption(bin_main) + encrypt_back = file_upgrade.file_encryption(bin_back) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=function.file_upgrade.build_header_new(config)) is None: + if (main_header:=file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") md5_ctx = hashlib.md5() md5_ctx.update(bin_back) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_back)) config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (back_header:=function.file_upgrade.build_header_new(config)) is None: + if (back_header:=file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Merge Image generated successfully.") @@ -475,28 +563,28 @@ def GenerateImage_SLCP101_p460(path_boot: Path, path_main: Path, path_back: Path 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) - bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) - bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) - encrypt_main = function.file_upgrade.file_encryption(bin_main) - encrypt_back = function.file_upgrade.file_encryption(bin_back) + bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) + bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) + bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) + encrypt_main = file_upgrade.file_encryption(bin_main) + encrypt_back = file_upgrade.file_encryption(bin_back) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=function.file_upgrade.build_header_new(config)) is None: + if (main_header:=file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") md5_ctx = hashlib.md5() md5_ctx.update(bin_back) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_back)) config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (back_header:=function.file_upgrade.build_header_new(config)) is None: + if (back_header:=file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Merge Image generated successfully.") @@ -530,28 +618,28 @@ def GenerateImage_SLCP102_p460(path_boot: Path, path_main: Path, path_back: Path 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) - bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) - bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) - encrypt_main = function.file_upgrade.file_encryption(bin_main) - encrypt_back = function.file_upgrade.file_encryption(bin_back) + bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) + bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) + bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) + encrypt_main = file_upgrade.file_encryption(bin_main) + encrypt_back = file_upgrade.file_encryption(bin_back) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=function.file_upgrade.build_header_new(config)) is None: + if (main_header:=file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") md5_ctx = hashlib.md5() md5_ctx.update(bin_back) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_back)) config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (back_header:=function.file_upgrade.build_header_new(config)) is None: + if (back_header:=file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Merge Image generated successfully.") @@ -585,16 +673,16 @@ def GeneratePackage_DLSY001_p460(path_hex: Path): 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) - encrypt_main = function.file_upgrade.file_encryption(bin_main) + bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) + encrypt_main = file_upgrade.file_encryption(bin_main) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=function.file_upgrade.build_header_new(config)) is None: + if (main_header:=file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Package generated successfully.") @@ -621,28 +709,28 @@ def GenerateImage_DLSY001_p460(path_boot: Path, path_main: Path, path_back: Path 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) - bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) - bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) - encrypt_main = function.file_upgrade.file_encryption(bin_main) - encrypt_back = function.file_upgrade.file_encryption(bin_back) + bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) + bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) + bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) + encrypt_main = file_upgrade.file_encryption(bin_main) + encrypt_back = file_upgrade.file_encryption(bin_back) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=function.file_upgrade.build_header_new(config)) is None: + if (main_header:=file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") md5_ctx = hashlib.md5() md5_ctx.update(bin_back) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_back)) config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (back_header:=function.file_upgrade.build_header_new(config)) is None: + if (back_header:=file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Merge Image generated successfully.") diff --git a/source/device/LaminaController.py b/source/device/LaminaController.py index 7f9a193..e5f36f7 100644 --- a/source/device/LaminaController.py +++ b/source/device/LaminaController.py @@ -7,6 +7,9 @@ from serial import Serial from . import tools from . import function +from .tools import ByteConv, IntelHex +from .function import protocols, file_upgrade +from .DeviceSerial import DeviceSerial ParamMap_LaminaController = { # 1 - Hex @@ -112,6 +115,19 @@ ParamMap_LaminaController = { 0x190: ["出厂日期批次", 4, 16], } +MemoryMap_280039 = { + 'image_size': 2*0x030000, # 镜像文件大小 + 'app_size': 2*0x004000, # 应用程序大小 + 'boot_size': 2*0x014000, # Boot程序大小 + + 'boot_addr': 0x000000, # Boot程序地址 + 'main_header': 0x006000, # main信息地址 + 'back_header': 0x007000, # back信息地址 + 'main_info': 0x088000, # main版本地址 + 'back_info': 0x007000, # back版本地址 + 'main_addr': 0x008000, # main程序地址 + 'back_addr': 0x01C000, # back程序地址 +} class LaminaController: """ 叠光控制器 @@ -169,7 +185,7 @@ class LaminaController: flag_frame = True frame_recv += bytes_read try: - self.output = function.frame.check_frame_modbus(frame_recv, self.block['data']) + self.output = function.protocols.check_frame_modbus(frame_recv, self.block['data']) if self.flag_print: print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv)) except Exception as ex: @@ -199,7 +215,7 @@ class LaminaController: time.sleep(10 * self.time_gap) if self.__read_frame(): if (self.flag_print is not None) and 'Regs' in self.output.keys(): - function.frame.print_display(self.output['Regs']) + function.protocols.print_display(self.output['Regs']) self.log['read'] += 1 break else: @@ -214,7 +230,7 @@ class LaminaController: self.block['data']['type'] = 'read' self.block['data']['data_addr'] = daddr self.block['data']['data_len'] = dlen - frame = function.frame.make_frame_modbus(self.block['data']) + frame = function.protocols.make_frame_modbus(self.block['data']) return self.__transfer_data(frame) @@ -223,7 +239,7 @@ class LaminaController: self.block['data']['data_addr'] = daddr item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1 self.block['data']['data_val'] = int(dval * item_coff) - frame = function.frame.make_frame_modbus(self.block['data']) + frame = function.protocols.make_frame_modbus(self.block['data']) return self.__transfer_data(frame) @@ -232,7 +248,7 @@ class LaminaController: self.block['data']['data_addr'] = daddr item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1 self.block['data']['data_val'] = int(dval * item_coff) - frame = function.frame.make_frame_modbus(self.block['data']) + frame = function.protocols.make_frame_modbus(self.block['data']) return self.__transfer_data(frame) @@ -240,7 +256,7 @@ class LaminaController: self.block['data']['type'] = 'write_str' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval - frame = function.frame.make_frame_modbus(self.block['data']) + frame = function.protocols.make_frame_modbus(self.block['data']) return self.__transfer_data(frame) @@ -255,7 +271,7 @@ class LaminaController: # 读取config self.block['data']['type'] = 'record_cfg' self.block['data']['step'] = 'start' - frame_master = function.frame.make_frame_modbus(self.block['data']) + frame_master = function.protocols.make_frame_modbus(self.block['data']) ret, pbar = True, None frame_data_cfg = [] @@ -265,7 +281,7 @@ class LaminaController: if self.output['record']['seq'] == 0: pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading") self.block['data']['step'] = 'next' - frame_master = function.frame.make_frame_modbus(self.block['data']) + frame_master = function.protocols.make_frame_modbus(self.block['data']) elif (self.output['record']['seq']) >= self.output['record']['total']: ret = False pbar and pbar.update() @@ -274,7 +290,7 @@ class LaminaController: # 读取data self.block['data']['type'] = 'record_data' self.block['data']['step'] = 'start' - frame_master = function.frame.make_frame_modbus(self.block['data']) + frame_master = function.protocols.make_frame_modbus(self.block['data']) ret, pbar = True, None frame_data_record = [] @@ -284,7 +300,7 @@ class LaminaController: if self.output['record']['seq'] == 0: pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading") self.block['data']['step'] = 'next' - frame_master = function.frame.make_frame_modbus(self.block['data']) + frame_master = function.protocols.make_frame_modbus(self.block['data']) elif (self.output['record']['seq']) >= self.output['record']['total']: ret = False pbar and pbar.update() @@ -442,7 +458,7 @@ class LaminaController: # 启动帧 self.block['data']['step'] = 'start' self.block['data']['index'] = 0 - frame_master = function.frame.make_frame_modbus(self.block['data']) + frame_master = function.protocols.make_frame_modbus(self.block['data']) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved print('Upgrade Fail: start') @@ -460,7 +476,7 @@ class LaminaController: frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size']) for idx in tqdm(range(frame_total), desc="File Transmitting"): self.block["data"]['index'] = idx - frame_master = function.frame.make_frame_modbus(self.block['data']) + frame_master = function.protocols.make_frame_modbus(self.block['data']) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved print(f'Upgrade Fail: trans data in {idx}') @@ -470,7 +486,7 @@ class LaminaController: self.time_out = 1 self.block["data"]['step'] = 'end' self.block["data"]['index'] += 1 - frame_master = function.frame.make_frame_modbus(self.block['data']) + frame_master = function.protocols.make_frame_modbus(self.block['data']) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved print(f'Upgrade Fail: end') @@ -480,6 +496,413 @@ class LaminaController: return True +class LaminaController_new(DeviceSerial): + """ 叠光控制器 + """ + def __init__(self, com_name="COM16", addr_modbus=0x01, type_dev='DLSP001', **kwargs): + """ 调用超类实现函数初始化 """ + super().__init__(com_name, + callbacks=(lambda : protocols.make_frame_modbus(self.block), + lambda frame: protocols.check_frame_modbus(frame, self.block)), + **kwargs) + + self.device = type_dev + match type_dev: + case 'DLSP001': + self.make_package = lambda *args, **kwargs: GeneratePackage('DLSP001', *args, **kwargs) + self.make_image = lambda *args, **kwargs: GenerateImage('DLSP001', *args, **kwargs) + case _: + self.make_package = None + self.make_image = None + + self.block = { + 'addr_dev' : addr_modbus, + 'data_define': ParamMap_LaminaController, + } + + def frame_read(self, daddr=0x60, dlen=0x30) -> bool: + self.block['type'] = 'read' + self.block['data_addr'] = daddr + self.block['data_len'] = dlen + return self._transfer_data() + + def frame_write_one(self, daddr=0x85, dval=-900) -> bool: + self.block['type'] = 'write_one' + self.block['data_addr'] = daddr + item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 + self.block['data_val'] = int(dval * item_coff) + return self._transfer_data() + + def frame_write_dual(self, daddr=0x91, dval=600) -> bool: + self.block['type'] = 'write_dual' + self.block['data_addr'] = daddr + item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 + self.block['data_val'] = int(dval * item_coff) + return self._transfer_data() + + def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool: + self.block['type'] = 'write_str' + self.block['data_addr'] = daddr + self.block['data_val'] = dval + return self._transfer_data() + + def frame_record(self) -> bool: + """ 读取录波数据 + """ + param_saved = self.flag_print, self.retry, self.time_out + self.flag_print = False + self.retry = 3 + self.time_out = 1.5 + self.block['file_block_size'] = 240 + # 读取config + self.block['type'] = 'record_cfg' + self.block['step'] = 'start' + frame_master = function.protocols.make_frame_modbus(self.block) + + ret, pbar = True, None + frame_data_cfg = [] + while ret: + if ret := self.__transfer_data(frame_master): + frame_data_cfg.append(self.output['record']) + if self.output['record']['seq'] == 0: + pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading") + self.block['step'] = 'next' + frame_master = function.protocols.make_frame_modbus(self.block) + elif (self.output['record']['seq']) >= self.output['record']['total']: + ret = False + pbar and pbar.update() + pbar and pbar.close() + + # 读取data + self.block['type'] = 'record_data' + self.block['step'] = 'start' + frame_master = function.protocols.make_frame_modbus(self.block) + + ret, pbar = True, None + frame_data_record = [] + while ret: + if ret := self.__transfer_data(frame_master): + frame_data_record.append(self.output['record']) + if self.output['record']['seq'] == 0: + pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading") + self.block['step'] = 'next' + frame_master = function.protocols.make_frame_modbus(self.block) + elif (self.output['record']['seq']) >= self.output['record']['total']: + ret = False + pbar and pbar.update() + pbar and pbar.close() + + self.flag_print, self.retry, self.time_out = param_saved + if (len(frame_data_record) != 32) or (len(frame_data_cfg) != 3): + print("Operation_Record: 未取得录波数据.") + return False + + # 处理配置信息 + data_cfg_bare = b"".join([x['data'] for x in frame_data_cfg]) + config_record = {'LinePos': []} + pos = 0 + if data_cfg_bare[pos+1] != 0xF1 or data_cfg_bare[pos] != 0xF1: + Warning("config 配置文件格式异常") + pos += 2 + config_record['LinePos'].append(pos) + len_faultword = (data_cfg_bare[3] * 0x100 + data_cfg_bare[2]) + pos += 2 + config_record['FaultWord'] = [] + for i in range(0, len_faultword, 2): + faultword = data_cfg_bare[pos+i+1] * 0x100 + data_cfg_bare[pos+i] + config_record['FaultWord'].append(faultword) + pos += len_faultword + config_record['SysStatus'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + config_record['FaultRecordPosition'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + config_record['FaultNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + config_record['Standard'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + if data_cfg_bare[pos+1] != 0xF2 or data_cfg_bare[pos] != 0xF2: + Warning("config 配置文件格式异常") + pos += 2 + config_record['LinePos'].append(pos) + config_record['ChannelNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + config_record['ChNum_Alg'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + config_record['ChNum_Dgt'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + if data_cfg_bare[pos+1] != 0xF3 or data_cfg_bare[pos] != 0xF3: + Warning("config 配置文件格式异常") + pos += 2 + config_record['LinePos'].append(pos) + config_record['ChannelDescription'] = [] + config_record['ChannelCoefficient'] = [] + for i in range(config_record['ChannelNum']): + len_str = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + ch_desc = [] + for j in range(0, len_str, 2): + ch_desc.append(data_cfg_bare[pos + j]) + pos += len_str + ch_coe = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + config_record['ChannelDescription'].append(bytearray(ch_desc).decode()) + config_record['ChannelCoefficient'].append(ch_coe) + if data_cfg_bare[pos+1] != 0xF4 or data_cfg_bare[pos] != 0xF4: + Warning("config 配置文件格式异常") + pos += 2 + config_record['LinePos'].append(pos) + config_record['SysFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + config_record['FreqNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + config_record['SampleFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + config_record['SamplePoint'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + config_record['DataType'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + config_record['TimeFactor'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] + pos += 2 + if data_cfg_bare[pos+1] != 0xF5 or data_cfg_bare[pos] != 0xF5: + Warning("config 配置文件格式异常") + pos += 2 + config_record['LinePos'].append(pos) + time_stamp = { + 'year': data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos], + 'month': data_cfg_bare[pos+3] * 0x100 + data_cfg_bare[pos+2], + 'day': data_cfg_bare[pos+5] * 0x100 + data_cfg_bare[pos+4], + 'hour': data_cfg_bare[pos+7] * 0x100 + data_cfg_bare[pos+6], + 'minute': data_cfg_bare[pos+9] * 0x100 + data_cfg_bare[pos+8], + 'second': data_cfg_bare[pos+11] * 0x100 + data_cfg_bare[pos+10] + + (data_cfg_bare[pos+13] * 0x100 + data_cfg_bare[pos+12]) * 0.001, + } + config_record['StartTime'] = time_stamp + pos += 14 + time_stamp = { + 'year': data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos], + 'month': data_cfg_bare[pos+3] * 0x100 + data_cfg_bare[pos+2], + 'day': data_cfg_bare[pos+5] * 0x100 + data_cfg_bare[pos+4], + 'hour': data_cfg_bare[pos+7] * 0x100 + data_cfg_bare[pos+6], + 'minute': data_cfg_bare[pos+9] * 0x100 + data_cfg_bare[pos+8], + 'second': data_cfg_bare[pos+11] * 0x100 + data_cfg_bare[pos+10] + + (data_cfg_bare[pos+13] * 0x100 + data_cfg_bare[pos+12]) * 0.001, + } + config_record['TriggerTime'] = time_stamp + self.log['record']['config'] = config_record + self.log['record']['bare_config'] = data_cfg_bare + + # 处理录波数据 + data_record_bare = b"".join([x['data'] for x in frame_data_record]) + data_record = [] + pos = 0 + while pos < len(data_record_bare): + record_point = { + 'index': data_record_bare[pos+3] * 0x1000 + + data_record_bare[pos+2] * 0x100 + + data_record_bare[pos+1] * 0x10 + + data_record_bare[pos], + 'timestamp': data_record_bare[pos+7] * 0x1000 + + data_record_bare[pos+6] * 0x100 + + data_record_bare[pos+5] * 0x10 + + data_record_bare[pos + 4], + 'ChAlg': [], + 'ChDgt': [], + } + pos += 8 + for i in range(config_record['ChNum_Alg']): + point_data_alg = data_record_bare[pos+1] * 0x100 + data_record_bare[pos] + if data_record_bare[pos+1] & 0x80: + point_data_alg -= 0x10000 + record_point['ChAlg'].append(point_data_alg) + pos += 2 + for i in range(config_record['ChNum_Dgt']): + point_data_dgt = (data_record_bare[pos+(i // 8)] & (0x01 << (i % 8))) == (0x01 << (i % 8)) + record_point['ChDgt'].append(point_data_dgt) + pos += 2 + data_record.append(record_point) + + self.log['record']['data'] = data_record + self.log['record']['bare_data'] = data_record_bare + + return True + + def frame_update(self, path_bin: Path, makefile: bool = False) -> bool: + """ 程序升级 + 注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程; + """ + if makefile: + self.block['file'], file_bin = GeneratePackage_DLSP001_p280039(path_bin) + else: + self.block['file'] = path_bin.read_bytes() + self.block['header_offset'] = 184 + self.block['type'] = 'update' + param_saved = self.flag_print, self.retry, self.time_out + self.retry = 5 + self.time_out = 0.5 + self.flag_print = False + + # 启动帧 + self.block['step'] = 'start' + self.block['index'] = 0 + frame_master = function.protocols.make_frame_modbus(self.block) + if not self.__transfer_data(frame_master): + self.flag_print, self.retry, self.time_out = param_saved + print('Upgrade Fail: start') + return False + self.block["data"]['file_block_size'] = self.output['upgrade']['length'] + + # 避免接收到延迟返回报文 + time.sleep(self.time_out) + + # 文件传输 + self.retry = 3 + self.time_out = 1.5 + self.block["data"]['step'] = 'trans' + self.block['index'] = 0 + frame_total = ceil((len(self.block["data"]['file']) - self.block['header_offset']) / self.block["data"]['file_block_size']) + for idx in tqdm(range(frame_total), desc="File Transmitting"): + self.block["data"]['index'] = idx + frame_master = function.protocols.make_frame_modbus(self.block) + if not self.__transfer_data(frame_master): + self.flag_print, self.retry, self.time_out = param_saved + print(f'Upgrade Fail: trans data in {idx}') + return False + + # 结束升级 + self.time_out = 1 + self.block["data"]['step'] = 'end' + self.block["data"]['index'] += 1 + frame_master = function.protocols.make_frame_modbus(self.block) + if not self.__transfer_data(frame_master): + self.flag_print, self.retry, self.time_out = param_saved + print(f'Upgrade Fail: end') + return False + + self.flag_print, self.retry, self.time_out = param_saved + return True + + +def GeneratePackage(type_dev: str, path_hex: Path, **kwargs) -> bytearray: + """ 生成升级包 """ + config = { + 'prod_type': [0x46, 0x00], # 产品类型 + 'method_compress': False, # 文件压缩 + 'prog_id': list(type_dev.encode('ascii')), # 程序识别号 + 'prog_type': 'app', # 程序类型 + 'area_code': [0x00, 0x00], # 地区 + 'upgrade_type': [0x00, 0x00], # 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份) + } + match type_dev: + case 'DLSP001': + MemoryMap = MemoryMap_280039 + case _: + raise Exception("Unknow device.") + + bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=MemoryMap['app_size'], conv_end=False) + encrypt_main = file_upgrade.file_encryption(bin_main) + + info_version = file_upgrade.parser_version_info(bin_main, 0x88000) + md5_ctx = hashlib.md5() + md5_ctx.update(bin_main) + config["md5"] = list(md5_ctx.digest()) + config['file_length'] = ByteConv.conv_int_to_array(len(bin_main)) + config['hex_name'] = list(info_version['name'])[:64] + config['hex_name'] += [0] * (64 - len(config['hex_name'])) + if (main_header:=file_upgrade.build_header_new(config)) is None: + raise Exception("Header tag oversize. ") + + print("Package generated successfully.") + print(f"File name: {path_hex.name}") + print(f"File Info:") + print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") + + # 组装镜像 + Image = [0xFF] * (len(main_header) + len(encrypt_main)) + offset_image = 0 + Image[offset_image: offset_image + len(main_header)] = main_header + offset_image += len(main_header) + Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main + + return bytearray(Image) + + +def GenerateImage(type_dev: str, path_boot: Path, path_main: Path, path_back: Path, **kwargs) ->bytearray: + """ 镜像生成 """ + config = { + 'prod_type': [0x46, 0x00], # 产品类型 + 'method_compress': False, # 文件压缩 + 'prog_id': list(type_dev.encode('ascii')), # 程序识别号 + 'prog_type': 'app', # 程序类型 + 'area_code': [0x00, 0x00], # 地区 + 'upgrade_type': [0x00, 0x00], # 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份) + } + match type_dev: + case 'DLSP001': + MemoryMap = MemoryMap_280039 + case _: + raise Exception("Unknow device.") + + bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=MemoryMap['boot_size'], conv_end=False) + bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=MemoryMap['app_size'], conv_end=False) + bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=MemoryMap['app_size'], conv_end=False) + + md5_ctx = hashlib.md5() + md5_ctx.update(bin_main) + config["md5"] = list(md5_ctx.digest()) + config['upgrade_type'] = [0x00, 0x00] # 主程序-片外缓冲 + config['file_length'] = ByteConv.conv_int_to_array(len(bin_main)) + config['hex_name'] = list(path_main.name.encode())[:64] + config['hex_name'] += [0] * (64 - len(config['hex_name'])) + if (main_header:=file_upgrade.build_header_new(config)) is None: + raise Exception("Header tag oversize. ") + + md5_ctx = hashlib.md5() + md5_ctx.update(bin_back) + config["md5"] = list(md5_ctx.digest()) + config['upgrade_type'] = [0x02, 0x00] # 备份程序 + config['file_length'] = ByteConv.conv_int_to_array(len(bin_back)) + config['hex_name'] = list(path_back.name.encode())[:64] + config['hex_name'] += [0] * (64 - len(config['hex_name'])) + if (back_header:=file_upgrade.build_header_new(config)) is None: + raise Exception("Header tag oversize. ") + + # 组装镜像 + main_header_buffer = bytearray() + back_header_buffer = bytearray() + for byte_org in main_header: + main_header_buffer.extend(bytearray([0x00, byte_org])) + for byte_org in back_header: + back_header_buffer.extend(bytearray([0x00, byte_org])) + Image = [0xFF] * MemoryMap['image_size'] + Image[MemoryMap['boot_addr']: MemoryMap['boot_addr'] + len(bin_boot)] = bin_boot + Image[MemoryMap['main_addr']: MemoryMap['main_addr'] + len(main_header_buffer)] = main_header_buffer + Image[MemoryMap['back_addr']: MemoryMap['back_addr'] + len(back_header_buffer)] = back_header_buffer + Image[MemoryMap['main_header']: MemoryMap['main_header'] + len(bin_main)] = bin_main + Image[MemoryMap['back_header']: MemoryMap['back_header'] + len(bin_back)] = bin_back + + # Log打印 + print("Merge Image generated successfully.") + print(f"Main File:") + print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") + print(f"Back File:") + print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]") + + # 额外文件 + if 'output_header' in kwargs.keys() and kwargs['output_header']: + (path_main.parent / (path_main.stem + '.header')).write_bytes(main_header) + (path_back.parent / (path_back.stem + '.header')).write_bytes(back_header) + if 'output_bin' in kwargs.keys() and kwargs['output_bin']: + (path_main.parent / (path_main.stem + '.bin')).write_bytes(bin_main) + (path_back.parent / (path_back.stem + '.bin')).write_bytes(bin_back) + if 'output_encrypt' in kwargs.keys() and kwargs['output_encrypt']: + main_encrypt = file_upgrade.file_encryption(bin_main) + back_encrypt = file_upgrade.file_encryption(bin_back) + (path_main.parent / (path_main.stem + '.encrypt')).write_bytes(main_encrypt) + (path_back.parent / (path_back.stem + '.encrypt')).write_bytes(back_encrypt) + + return bytearray(Image) + + def GeneratePackage_DLSP001_p280039(path_hex: Path): """ 叠光控制器DSP-280039平台版本 生成升级包 """ config = { @@ -582,4 +1005,5 @@ def GenerateImage_DLSP001_p280039(path_boot: Path, path_main: Path, path_back: P offset_image = 2 * 0x01C000 Image[offset_image: offset_image + len(bin_back)] = bin_back - return bytearray(Image), main_header, back_header \ No newline at end of file + return bytearray(Image), main_header, back_header + diff --git a/source/device/function/__init__.py b/source/device/function/__init__.py index 755428c..4529c42 100644 --- a/source/device/function/__init__.py +++ b/source/device/function/__init__.py @@ -1,2 +1,2 @@ -from . import frame +from . import protocols from . import file_upgrade \ No newline at end of file diff --git a/source/device/function/frame.py b/source/device/function/protocols.py similarity index 100% rename from source/device/function/frame.py rename to source/device/function/protocols.py diff --git a/source/post_work.py b/source/post_work.py index e0a5c35..8251150 100644 --- a/source/post_work.py +++ b/source/post_work.py @@ -1,4 +1,5 @@ from pathlib import Path +from device.LaminaAdapter import LaminaAdapter from device.LaminaAdapter import GenerateImage_SLCP001_p4a0, GeneratePackage_SLCP001_p4a0 from device.LaminaAdapter import GenerateImage_SLCP101_p460, GeneratePackage_SLCP101_p460 from device.LaminaAdapter import GenerateImage_SLCP102_p460, GeneratePackage_SLCP102_p460 @@ -198,6 +199,51 @@ def Process2(): file_image3.write_bytes(data_image) +def Process(type_dev: str, path_boot: Path, path_project: Path): + """ 镜像生成流程 """ + root_boot = path_boot + root_main = path_project + result = root_main + + # 正常启动镜像 + hex_boot = root_boot / "bootloader.hex" + hex_main = root_main / "lamina_adapter.hex" + hex_back = root_main / "lamina_adapter_back.hex" + hex_update = hex_main + if (not hex_boot.exists()) or (not hex_main.exists()) or (not hex_back.exists()): + raise Exception("缺失必要程序文件") + + file_image = result / f'{hex_main.stem}_ROM.bin' + file_package = result / f'{hex_update.stem}.dat' + + dev_lamina = LaminaAdapter(None, type_dev=type_dev) + + data_package = dev_lamina.make_package(hex_update) + data_image = dev_lamina.make_image(hex_boot, hex_main, hex_back, output_header=True, output_bin=True) + + file_image.write_bytes(data_image) + file_package.write_bytes(data_package) + + # 异常镜像-主分区md5错误 + file_image1 = result / f'{file_image.stem}_b1.bin' + data_image_copy = data_image + data_image_copy[0x054018: 0x05401A] = [0x00, 0x01] + file_image1.write_bytes(data_image_copy) + + # 异常镜像-备份分区md5错误 + file_image2 = result / f'{file_image.stem}_b2.bin' + data_image_copy = data_image + data_image_copy[0x056018: 0x05601A] = [0x00, 0x01] + file_image2.write_bytes(data_image_copy) + + # 异常镜像-双分区md5错误 + file_image3 = result / f'{file_image.stem}_b3.bin' + data_image_copy = data_image + data_image_copy[0x054018: 0x05401A] = [0x00, 0x01] + data_image_copy[0x056018: 0x05601A] = [0x00, 0x01] + file_image3.write_bytes(data_image_copy) + + if __name__ == "__main__": path_boot1 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\4A0-PROJ_STACKLIGHT_PARALLEL_ADAPTOR") path_boot2 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\460-PROJ_STACKLIGHT_PARALLEL_ADAPTOR") @@ -207,3 +253,8 @@ if __name__ == "__main__": Process1(path_boot2, path_main) # 适配器SLCP101 # Process1_v2(path_boot2, path_main) # 适配器SLCP102 # Process2() + + path_boot1 = Path(r"resource/460-PROJ_STACKLIGHT_OPTIMIZER") + path_boot2 = Path(r"resource/460-PROJ_STACKLIGHT_PARALLEL_ADAPTOR") + path_project = Path(r"resource\SLCP101") + Process('SLCP101', path_boot2, path_project) \ No newline at end of file