diff --git a/source/dev_LaminaAdapter.py b/source/dev_LaminaAdapter.py index 0916239..09377ad 100644 --- a/source/dev_LaminaAdapter.py +++ b/source/dev_LaminaAdapter.py @@ -1,11 +1,11 @@ import time from pathlib import Path from device.LaminaAdapter import LaminaAdapter -from source.device.LaminaAdapter import GeneratePackage_SLCP101_p460 -from source.device.LaminaAdapter import GenerateImage_SLCP101_p460 -from source.device.LaminaAdapter import GeneratePackage_DLSY001_p460 -from source.device.LaminaAdapter import GenerateImage_DLSY001_p460 -from function.tools.ByteConv import trans_list_to_str +from device.LaminaAdapter import GeneratePackage_SLCP101_p460 +from device.LaminaAdapter import GenerateImage_SLCP101_p460 +from device.LaminaAdapter import GeneratePackage_DLSY001_p460 +from device.LaminaAdapter import GenerateImage_DLSY001_p460 +from device.tools.ByteConv import trans_list_to_str def test_communication(time_out=2): """ 通信成功率测试 """ @@ -37,7 +37,7 @@ def make_Pakeage(fp: Path): file_package = fp.parent / f'{hex_update.stem}.dat' file_update_bin = fp.parent / f'{hex_update.stem}.bin' - data_package, data_update_bin = GeneratePackage_SLCP101_p460(hex_update) + data_package, data_update_bin = GeneratePackage_DLSY001_p460(hex_update) file_package.write_bytes(data_package) file_update_bin.write_bytes(data_update_bin) @@ -239,45 +239,47 @@ if __name__=='__main__': 'time_out': 3, 'time_gap': 0.1, 'retry': 3, 'retry_sub': 10}, } - Process1() - exit(0) + # Process1() + # Process2() + # exit(0) - dev_lamina = LaminaAdapter(**mode_config['HPLC']) + dev_lamina = LaminaAdapter(**mode_config['Debug']) dev_lamina.frame_read(0x0100, 0x20) if not hasattr(__builtins__,"__IPYTHON__"): # 工程-即时转换 - file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex") + # file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex") + file_hex = Path(r"D:\WorkingProject\LightStackOptimizer\software\lamina_optimizer\lamina_optimizer\Debug\lamina_optimizer.hex") if not file_hex.exists(): raise Exception("工程编译目标文件不存在.") file_package = make_Pakeage(file_hex) - version = "DLSY001_240911_1600_V1.01" + version = "DLSY001_241030_1430_V1.02" addr = [0x24, 0x09, 0x12, 0x00, 0x00, 0x00] while True: """ 自动检测升级流程(需要调试) """ ret = False - while not ret or ('Regs' not in dev_lamina.output.keys()) or (version == dev_lamina.output['Regs'][0x0100][1]): - dev_lamina.frame_read(0x82, 3) + while not ret or ('Regs' not in dev_lamina.output.keys()) or (version == dev_lamina.output['Regs'][0x0100][1].strip('\000')): + # dev_lamina.frame_read(0x82, 3) ret = dev_lamina.frame_read(0x0100, 0x20) time.sleep(1) dev_lamina.frame_update(file_package) - time.sleep(6) + time.sleep(3) ret = dev_lamina.frame_read(0x0100, 0x20) if ret and (version == dev_lamina.output['Regs'][0x0100][1]): dev_lamina.frame_write_one(0x52, 0x01) - print(f"address: {' '.join(map(lambda x: ('000' + hex(x)[2:])[-2:], addr))}") - dev_lamina.frame_write_str(0x82, addr) - dev_lamina.frame_read(0x82, 3) - addr[5] += 1 - if addr[5] & 0x0F >= 10: - addr[5] += 0x10 - 10 + # print(f"address: {' '.join(map(lambda x: ('000' + hex(x)[2:])[-2:], addr))}") + # dev_lamina.frame_write_str(0x82, addr) + # dev_lamina.frame_read(0x82, 3) + # addr[5] += 1 + # if addr[5] & 0x0F >= 10: + # addr[5] += 0x10 - 10 diff --git a/source/dev_LaminaController.py b/source/dev_LaminaController.py index df7a964..da43ecc 100644 --- a/source/dev_LaminaController.py +++ b/source/dev_LaminaController.py @@ -5,8 +5,8 @@ from tenacity import retry, stop_after_attempt, wait_fixed from device.LaminaController import LaminaController from device.LaminaController import GeneratePackage_DLSP001_p280039 from device.LaminaController import GenerateImage_DLSP001_p280039 -from function.tools.ByteConv import trans_list_to_str -from tools.IntelHex import file_Bin_to_IntelHex +from device.tools.ByteConv import trans_list_to_str +from device.tools.IntelHex import file_Bin_to_IntelHex def test_communication(time_out=2): """ 通信成功率测试 """ diff --git a/source/device/DeviceSerial.py b/source/device/DeviceSerial.py index d786d0c..25c24fc 100644 --- a/source/device/DeviceSerial.py +++ b/source/device/DeviceSerial.py @@ -1,8 +1,8 @@ import time from serial import Serial -from function.tools.ByteConv import conv_int_to_array, trans_list_to_str -from function.frame import make_frame_modbus, check_frame_modbus, print_display +from . import tools +from . import function class DeviceSerial: """ 串口通信设备原型 @@ -52,20 +52,20 @@ class DeviceSerial: flag_frame = True frame_recv += bytes_read try: - self.output = check_frame_modbus(frame_recv, self.block['data']) + self.output = function.frame.check_frame_modbus(frame_recv, self.block['data']) if self.flag_print: - print("Read Frame: ", trans_list_to_str(frame_recv)) + print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv)) except Exception as ex: print("Error Info: ", ex) if self.flag_print and frame_recv: - print("Fail Data: " , trans_list_to_str(frame_recv)) + print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv)) self.output['result'] = False return self.output['result'] def __transfer_data(self, frame: bytearray) -> bool: """ 串口收发报文, 包含重试逻辑与数据打印 """ if self.__com is None: - print(trans_list_to_str(frame)) + print(tools.ByteConv.trans_list_to_str(frame)) return False fail_count = 0 @@ -77,12 +77,12 @@ class DeviceSerial: if self.flag_print and frame_discard: print("Discard Data: " , frame_discard) if self.flag_print: - print("Send Frame: ", trans_list_to_str(frame)) + print("Send Frame: ", tools.ByteConv.trans_list_to_str(frame)) time.sleep(10 * self.time_gap) if self.__read_frame(): if (self.flag_print is not None) and 'Regs' in self.output.keys(): - print_display(self.output['Regs']) + function.frame.print_display(self.output['Regs']) self.log['read'] += 1 break else: diff --git a/source/device/EnergyRouter.py b/source/device/EnergyRouter.py index d47dd4d..faf2cb0 100644 --- a/source/device/EnergyRouter.py +++ b/source/device/EnergyRouter.py @@ -3,9 +3,9 @@ import socket import random import hashlib from pathlib import Path -from function.tools.ByteConv import conv_int_to_array, trans_list_to_str -from function.frame import make_frame_modbus, check_frame_modbus, print_display -from function.file_upgrade import build_header, file_encryption + +from . import tools +from . import function ParamMap_EnergyRouter = { 0x00: ['编译日期', 4, 6], @@ -46,7 +46,7 @@ class EnergyRouter: def __transfer_data(self, frame: bytes) -> bool: """ 数据传输处理函数 """ if self.tcp_socket is None: - print(trans_list_to_str(frame)) + print(tools.ByteConv.trans_list_to_str(frame)) return False try: @@ -54,15 +54,15 @@ class EnergyRouter: time.sleep(self.time_out) frame_recv = self.tcp_socket.recv(128) - self.output = check_frame_modbus(frame_recv, self.block) + self.output = function.frame.check_frame_modbus(frame_recv, self.block) if self.flag_print: - print("Read Frame: ", trans_list_to_str(frame_recv)) + print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv)) if 'Regs' in self.output.keys(): - print_display(self.output['Regs']) + function.frame.print_display(self.output['Regs']) except Exception as ex: print("Error Info: ", ex) if self.flag_print and frame_recv: - print("Fail Data: " , trans_list_to_str(frame_recv)) + print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv)) self.output['result'] = False return self.output['result'] @@ -71,7 +71,7 @@ class EnergyRouter: self.block['type'] = 'read' self.block['data_addr'] = daddr self.block['data_len'] = dlen - frame = make_frame_modbus(self.block) + frame = function.frame.make_frame_modbus(self.block) return self.__transfer_data(frame) @@ -86,7 +86,7 @@ class EnergyRouter: self.block['file'] = Path(path_bin).read_bytes() self.block['header_offset'] = 128 # 启动帧 - frame_master = make_frame_modbus(self.block) + frame_master = function.frame.make_frame_modbus(self.block) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved @@ -116,7 +116,7 @@ class EnergyRouter: continue seq_window[i] = 1 self.block['index'] = seq_offset + i - seq_frame_master[i] = make_frame_modbus(self.block) + seq_frame_master[i] = function.frame.make_frame_modbus(self.block) self.tcp_socket.send(seq_frame_master[i]) # 接收帧回复 tmp = list(zip(range(len(seq_window)), seq_window)) @@ -126,7 +126,7 @@ class EnergyRouter: # 接收到空数据, 对端已关闭连接 if seq_frame_slave[i] == '': raise Exception("TCP closed.") - self.output = check_frame_modbus(seq_frame_slave[i], None) + self.output = function.frame.check_frame_modbus(seq_frame_slave[i], None) seq_current, seq_hope = self.output['upgrade']['index'], self.output['upgrade']['hope'] if seq_current < seq_offset: raise Exception("Error.") @@ -152,14 +152,14 @@ class EnergyRouter: # 结束升级 self.block['step'] = 'end' - frame_master = make_frame_modbus(self.block) + frame_master = function.frame.make_frame_modbus(self.block) while self.output['result'] is False: self.tcp_socket.send(frame_master) frame_slave = self.tcp_socket.recv(8) if frame_slave == '': raise Exception("TCP closed.") - self.output = check_frame_modbus(frame_slave[:18], self.block) + self.output = function.frame.check_frame_modbus(frame_slave[:18], self.block) def GeneratePackage_Demo_Xilinx(path_bin: Path): @@ -188,19 +188,19 @@ def GeneratePackage_Demo_Xilinx(path_bin: Path): md5_ctx = hashlib.md5() md5_ctx.update(data_bin) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = conv_int_to_array(len(data_bin)) + config['file_length'] = tools.ByteConv.conv_int_to_array(len(data_bin)) config['hex_name'] = list(path_bin.name.encode())[:80] - if (header:= build_header(config, 128)) is None: + if (header:= function.file_upgrade.build_header(config, 128)) is None: raise Exception("Header tag oversize. ") - if (header_512:= build_header(config, 512)) is None: + if (header_512:= function.file_upgrade.build_header(config, 512)) is None: raise Exception("Header tag oversize. ") - data_encrypt = file_encryption(data_bin) + data_encrypt = function.file_upgrade.file_encryption(data_bin) print("Upgrade file generated successfully.") print(f"\t header_length={len(header)}, bin_length={len(data_bin)}[{hex(len(data_bin))}]") - print(f"\t file md5: {trans_list_to_str(config['md5'])}") + print(f"\t file md5: {tools.ByteConv.trans_list_to_str(config['md5'])}") file1 = path_bin.parent / (path_bin.stem + '.dat') file1.write_bytes(header + data_bin) file2 = path_bin.parent / (path_bin.stem + '_h512.dat') diff --git a/source/device/LaminaAdapter.py b/source/device/LaminaAdapter.py index 9cb19e8..b942280 100644 --- a/source/device/LaminaAdapter.py +++ b/source/device/LaminaAdapter.py @@ -4,11 +4,9 @@ from math import ceil from tqdm import tqdm from pathlib import Path from serial import Serial -from function.file_upgrade import build_header_new, file_encryption -from function.tools.ByteConv import conv_int_to_array, trans_list_to_str -from function.tools.IntelHex import file_IntelHex_to_Bin -from function.frame import check_frame_dlt645, make_frame_dlt645, print_display +from . import tools +from . import function ParamMap_LaminaAdapter = { # 1 - Hex @@ -17,6 +15,7 @@ ParamMap_LaminaAdapter = { # 4 - str # 5 - addr # 6 - float + 0x00: ["硬件版本识别电压", 2, 1000], 0x0E: ["故障字1", 1], 0x0F: ["故障字2", 1], 0x10: ["MPPT工作状态", 1], @@ -111,13 +110,14 @@ ParamMap_LaminaAdapter = { 0x110: ["型号", 4, 16], 0x120: ["载波芯片地址", 4, 16], 0x130: ["厂商", 4, 8], - 0x138: ["保留", 4, 8], - 0x140: ["保留", 4, 16], + 0x138: ["ODM版本", 4, 16], + 0x148: ["保留", 4, 8], 0x150: ["保留", 4, 16], 0x160: ["硬件", 4, 16], 0x170: ["SN", 4, 16], 0x180: ["MES", 4, 16], 0x190: ["Datetime", 4, 16], + 0x1A0: ["版本前缀", 4, 5], } @@ -178,13 +178,13 @@ class LaminaAdapter: flag_frame = True frame_recv += bytes_read try: - self.output = check_frame_dlt645(frame_recv, self.block) + self.output = function.frame.check_frame_dlt645(frame_recv, self.block) if self.flag_print: - print("Read Frame: ", trans_list_to_str(frame_recv)) + print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv)) except Exception as ex: print("Error Info: ", ex) if self.flag_print and frame_recv: - print("Fail Data: " , trans_list_to_str(frame_recv)) + print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv)) self.output['result'] = False return self.output['result'] @@ -192,7 +192,7 @@ class LaminaAdapter: def __transfer_data(self, frame: bytearray) -> bool: """ 报文数据传输 """ if self.__com is None: - print(trans_list_to_str(frame)) + print(tools.ByteConv.trans_list_to_str(frame)) return False fail_count = 0 @@ -204,11 +204,11 @@ class LaminaAdapter: if self.flag_print and frame_discard: print("Discard Data: " , frame_discard) if self.flag_print: - print("Send Frame: ", trans_list_to_str(frame)) + print("Send Frame: ", tools.ByteConv.trans_list_to_str(frame)) if self.__read_frame(): if 'Regs' in self.output.keys(): - print_display(self.output['Regs']) + function.frame.print_display(self.output['Regs']) self.log['read'] += 1 break else: @@ -223,7 +223,7 @@ class LaminaAdapter: self.block['data']['type'] = 'read' self.block['data']['data_addr'] = daddr self.block['data']['data_len'] = dlen - frame = make_frame_dlt645(self.block) + frame = function.frame.make_frame_dlt645(self.block) return self.__transfer_data(frame) @@ -231,7 +231,7 @@ class LaminaAdapter: self.block['data']['type'] = 'write_one' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval - frame = make_frame_dlt645(self.block) + frame = function.frame.make_frame_dlt645(self.block) return self.__transfer_data(frame) @@ -239,7 +239,7 @@ class LaminaAdapter: self.block['data']['type'] = 'write_dual' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval - frame = make_frame_dlt645(self.block) + frame = function.frame.make_frame_dlt645(self.block) return self.__transfer_data(frame) @@ -247,7 +247,7 @@ class LaminaAdapter: self.block['data']['type'] = 'write_str' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval - frame = make_frame_dlt645(self.block) + frame = function.frame.make_frame_dlt645(self.block) return self.__transfer_data(frame) @@ -265,7 +265,7 @@ class LaminaAdapter: self.block['data']['file'] = Path(path_bin).read_bytes() self.block['data']['header_offset'] = 184 # 启动帧 - frame_master = bytearray(make_frame_dlt645(self.block)) + frame_master = bytearray(function.frame.make_frame_dlt645(self.block)) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved @@ -284,7 +284,7 @@ class LaminaAdapter: frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size']) for idx in tqdm(range(frame_total), desc="File Transmitting"): self.block["data"]['index'] = idx - frame_master = make_frame_dlt645(self.block) + frame_master = function.frame.make_frame_dlt645(self.block) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved print(f'Upgrade Fail: trans data in {idx}') @@ -294,7 +294,7 @@ class LaminaAdapter: self.time_out = 1 self.block["data"]['step'] = 'end' self.block["data"]['index'] += 1 - frame_master = make_frame_dlt645(self.block) + frame_master = function.frame.make_frame_dlt645(self.block) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved print(f'Upgrade Fail: end') @@ -313,16 +313,16 @@ def GeneratePackage_SLCP101_p460(path_hex: Path): 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) - encrypt_main = file_encryption(bin_main) + bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) + encrypt_main = function.file_upgrade.file_encryption(bin_main) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = conv_int_to_array(len(bin_main)) + config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=build_header_new(config)) is None: + if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Package generated successfully.") @@ -349,28 +349,28 @@ def GenerateImage_SLCP001_p4a0(path_boot: Path, path_main: Path, path_back: Path 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x010000) - bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=0x0CC000) - bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=0x040000) - encrypt_main = file_encryption(bin_main) - encrypt_back = file_encryption(bin_back) + bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x010000) + bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x0CC000) + bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x040000) + encrypt_main = function.file_upgrade.file_encryption(bin_main) + encrypt_back = function.file_upgrade.file_encryption(bin_back) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = conv_int_to_array(len(bin_main)) + config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=build_header_new(config)) is None: + if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") md5_ctx = hashlib.md5() md5_ctx.update(bin_back) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = conv_int_to_array(len(bin_back)) + config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (back_header:=build_header_new(config)) is None: + if (back_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Merge Image generated successfully.") @@ -404,28 +404,28 @@ def GenerateImage_SLCP101_p460(path_boot: Path, path_main: Path, path_back: Path 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) - bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) - bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) - encrypt_main = file_encryption(bin_main) - encrypt_back = file_encryption(bin_back) + bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) + bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) + bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) + encrypt_main = function.file_upgrade.file_encryption(bin_main) + encrypt_back = function.file_upgrade.file_encryption(bin_back) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = conv_int_to_array(len(bin_main)) + config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=build_header_new(config)) is None: + if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") md5_ctx = hashlib.md5() md5_ctx.update(bin_back) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = conv_int_to_array(len(bin_back)) + config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (back_header:=build_header_new(config)) is None: + if (back_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Merge Image generated successfully.") @@ -459,16 +459,16 @@ def GeneratePackage_DLSY001_p460(path_hex: Path): 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) - encrypt_main = file_encryption(bin_main) + bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) + encrypt_main = function.file_upgrade.file_encryption(bin_main) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = conv_int_to_array(len(bin_main)) + config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=build_header_new(config)) is None: + if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Package generated successfully.") @@ -495,28 +495,28 @@ def GenerateImage_DLSY001_p460(path_boot: Path, path_main: Path, path_back: Path 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } - bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) - bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) - bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) - encrypt_main = file_encryption(bin_main) - encrypt_back = file_encryption(bin_back) + bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) + bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) + bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) + encrypt_main = function.file_upgrade.file_encryption(bin_main) + encrypt_back = function.file_upgrade.file_encryption(bin_back) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = conv_int_to_array(len(bin_main)) + config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=build_header_new(config)) is None: + if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") md5_ctx = hashlib.md5() md5_ctx.update(bin_back) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = conv_int_to_array(len(bin_back)) + config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (back_header:=build_header_new(config)) is None: + if (back_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Merge Image generated successfully.") diff --git a/source/device/LaminaController.py b/source/device/LaminaController.py index c509cd1..d04df04 100644 --- a/source/device/LaminaController.py +++ b/source/device/LaminaController.py @@ -1,13 +1,12 @@ import time +import hashlib from math import ceil from tqdm import tqdm from pathlib import Path from serial import Serial -from function.file_upgrade import build_header_new, file_encryption, parser_version_info -from function.tools.ByteConv import conv_int_to_array, trans_list_to_str -from function.tools.IntelHex import file_IntelHex_to_Bin -from function.frame import check_frame_modbus, make_frame_modbus, print_display +from . import tools +from . import function ParamMap_LaminaController = { # 1 - Hex @@ -168,20 +167,20 @@ class LaminaController: flag_frame = True frame_recv += bytes_read try: - self.output = check_frame_modbus(frame_recv, self.block['data']) + self.output = function.frame.check_frame_modbus(frame_recv, self.block['data']) if self.flag_print: - print("Read Frame: ", trans_list_to_str(frame_recv)) + print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv)) except Exception as ex: print("Error Info: ", ex) if self.flag_print and frame_recv: - print("Fail Data: " , trans_list_to_str(frame_recv)) + print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv)) self.output['result'] = False return self.output['result'] def __transfer_data(self, frame: bytearray) -> bool: """ 串口收发报文, 包含重试逻辑与数据打印 """ if self.__com is None: - print(trans_list_to_str(frame)) + print(tools.ByteConv.trans_list_to_str(frame)) return False fail_count = 0 @@ -193,12 +192,12 @@ class LaminaController: if self.flag_print and frame_discard: print("Discard Data: " , frame_discard) if self.flag_print: - print("Send Frame: ", trans_list_to_str(frame)) + print("Send Frame: ", tools.ByteConv.trans_list_to_str(frame)) time.sleep(10 * self.time_gap) if self.__read_frame(): if (self.flag_print is not None) and 'Regs' in self.output.keys(): - print_display(self.output['Regs']) + function.frame.print_display(self.output['Regs']) self.log['read'] += 1 break else: @@ -213,7 +212,7 @@ class LaminaController: self.block['data']['type'] = 'read' self.block['data']['data_addr'] = daddr self.block['data']['data_len'] = dlen - frame = make_frame_modbus(self.block['data']) + frame = function.frame.make_frame_modbus(self.block['data']) return self.__transfer_data(frame) @@ -222,7 +221,7 @@ class LaminaController: self.block['data']['data_addr'] = daddr item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1 self.block['data']['data_val'] = int(dval * item_coff) - frame = make_frame_modbus(self.block['data']) + frame = function.frame.make_frame_modbus(self.block['data']) return self.__transfer_data(frame) @@ -231,7 +230,7 @@ class LaminaController: self.block['data']['data_addr'] = daddr item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1 self.block['data']['data_val'] = int(dval * item_coff) - frame = make_frame_modbus(self.block['data']) + frame = function.frame.make_frame_modbus(self.block['data']) return self.__transfer_data(frame) @@ -239,7 +238,7 @@ class LaminaController: self.block['data']['type'] = 'write_str' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval - frame = make_frame_modbus(self.block['data']) + frame = function.frame.make_frame_modbus(self.block['data']) return self.__transfer_data(frame) @@ -254,7 +253,7 @@ class LaminaController: # 读取config self.block['data']['type'] = 'record_cfg' self.block['data']['step'] = 'start' - frame_master = make_frame_modbus(self.block['data']) + frame_master = function.frame.make_frame_modbus(self.block['data']) ret, pbar = True, None frame_data_cfg = [] @@ -264,7 +263,7 @@ class LaminaController: if self.output['record']['seq'] == 0: pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading") self.block['data']['step'] = 'next' - frame_master = make_frame_modbus(self.block['data']) + frame_master = function.frame.make_frame_modbus(self.block['data']) elif (self.output['record']['seq']) >= self.output['record']['total']: ret = False pbar and pbar.update() @@ -273,7 +272,7 @@ class LaminaController: # 读取data self.block['data']['type'] = 'record_data' self.block['data']['step'] = 'start' - frame_master = make_frame_modbus(self.block['data']) + frame_master = function.frame.make_frame_modbus(self.block['data']) ret, pbar = True, None frame_data_record = [] @@ -283,7 +282,7 @@ class LaminaController: if self.output['record']['seq'] == 0: pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading") self.block['data']['step'] = 'next' - frame_master = make_frame_modbus(self.block['data']) + frame_master = function.frame.make_frame_modbus(self.block['data']) elif (self.output['record']['seq']) >= self.output['record']['total']: ret = False pbar and pbar.update() @@ -441,7 +440,7 @@ class LaminaController: # 启动帧 self.block['data']['step'] = 'start' self.block['data']['index'] = 0 - frame_master = make_frame_modbus(self.block['data']) + frame_master = function.frame.make_frame_modbus(self.block['data']) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved print('Upgrade Fail: start') @@ -459,7 +458,7 @@ class LaminaController: frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size']) for idx in tqdm(range(frame_total), desc="File Transmitting"): self.block["data"]['index'] = idx - frame_master = make_frame_modbus(self.block['data']) + frame_master = function.frame.make_frame_modbus(self.block['data']) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved print(f'Upgrade Fail: trans data in {idx}') @@ -469,7 +468,7 @@ class LaminaController: self.time_out = 1 self.block["data"]['step'] = 'end' self.block["data"]['index'] += 1 - frame_master = make_frame_modbus(self.block['data']) + frame_master = function.frame.make_frame_modbus(self.block['data']) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved print(f'Upgrade Fail: end') @@ -490,17 +489,17 @@ def GeneratePackage_DLSP001_p280039(path_hex: Path): 'upgrade_type': [0x00, 0x00], # 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份) } - bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=2 * 0x014000, conv_end=False) - encrypt_main = file_encryption(bin_main) + bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=2 * 0x014000, conv_end=False) + encrypt_main = function.file_upgrade.file_encryption(bin_main) - info_version = parser_version_info(bin_main, 0x88000) + info_version = function.file_upgrade.parser_version_info(bin_main, 0x88000) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) - config['file_length'] = conv_int_to_array(len(bin_main)) + config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(info_version['name'])[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=build_header_new(config)) is None: + if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Package generated successfully.") @@ -529,28 +528,28 @@ def GenerateImage_DLSP001_p280039(path_boot: Path, path_main: Path, path_back: P 'upgrade_type': [0x00, 0x00], # 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份) } - bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=2 * 0x004000, conv_end=False) - bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=2 * 0x014000, conv_end=False) - bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=2 * 0x014000, conv_end=False) + bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=2 * 0x004000, conv_end=False) + bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=2 * 0x014000, conv_end=False) + bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=2 * 0x014000, conv_end=False) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) config['upgrade_type'] = [0x00, 0x00] # 主程序-片外缓冲 - config['file_length'] = conv_int_to_array(len(bin_main)) + config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (main_header:=build_header_new(config)) is None: + if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") md5_ctx = hashlib.md5() md5_ctx.update(bin_back) config["md5"] = list(md5_ctx.digest()) config['upgrade_type'] = [0x02, 0x00] # 备份程序 - config['file_length'] = conv_int_to_array(len(bin_back)) + config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) - if (back_header:=build_header_new(config)) is None: + if (back_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") main_header_buffer = bytearray() @@ -559,8 +558,8 @@ def GenerateImage_DLSP001_p280039(path_boot: Path, path_main: Path, path_back: P back_header_buffer = bytearray() for byte_org in back_header: back_header_buffer.extend(bytearray([0x00, byte_org])) - main_encrypt = file_encryption(bin_main) - back_encrypt = file_encryption(bin_back) + main_encrypt = function.file_upgrade.file_encryption(bin_main) + back_encrypt = function.file_upgrade.file_encryption(bin_back) print("Merge Image generated successfully.") print(f"Main File:") diff --git a/source/device/__init__.py b/source/device/__init__.py index 29d9a1a..66bd041 100644 --- a/source/device/__init__.py +++ b/source/device/__init__.py @@ -1,2 +1,5 @@ +from . import tools +from . import function from . import EnergyRouter +from . import LaminaAdapter from . import LaminaController \ No newline at end of file diff --git a/source/device/function/__init__.py b/source/device/function/__init__.py new file mode 100644 index 0000000..755428c --- /dev/null +++ b/source/device/function/__init__.py @@ -0,0 +1,2 @@ +from . import frame +from . import file_upgrade \ No newline at end of file diff --git a/source/device/function/file_upgrade.py b/source/device/function/file_upgrade.py index c5943b5..007fa9b 100644 --- a/source/device/function/file_upgrade.py +++ b/source/device/function/file_upgrade.py @@ -1,10 +1,6 @@ -import hashlib -from pathlib import Path from datetime import datetime from crc import Calculator, Crc16 -from tools.IntelHex import file_IntelHex_to_Bin, file_Bin_to_IntelHex - Header_Tag = { 'file_type': [0x00, 2], # 0 - 文件类型; 2byte @@ -183,19 +179,6 @@ def parser_version_info(file_bin: bytearray, base_addr:int): return block_version -def test1(fp: Path): - """ bin文件生成测试 """ - file1 = Path(fp) - path1 = file1.parent / (file1.stem + ".bin") - data1 = file1.read_text() - data2 = file_IntelHex_to_Bin(data1, 0x3F0100) - path1.write_bytes(data2) - - # 测试Bin到IntelHex - bin = Path(fp).read_bytes() - result = file_Bin_to_IntelHex(bin, 0x80000, memory_width=2) - (fp.parent / (fp.stem + ".hex")).write_text(result) - def test2(): """ 校验, 加密测试 """ @@ -237,6 +220,6 @@ def task5(): if __name__ == "__main__": - # path_bin = Path("F:\\Work\\FPGA\\Test\\Vivado\\test_update\\test_update.vitis\\upgrade_system\\Debug\\sd_card\\BOOT.BIN") - # GeneratePackage_Demo_Xilinx(path_bin) + import hashlib + from pathlib import Path pass diff --git a/source/device/function/frame.py b/source/device/function/frame.py index 91ede38..d80dfb5 100644 --- a/source/device/function/frame.py +++ b/source/device/function/frame.py @@ -1,6 +1,7 @@ import struct from crc import Calculator, Crc16 -from tools.ByteConv import trans_list_to_str, trans_str_to_list, display_hex + +from .. import tools modbus_map = { # 1 - Hex @@ -332,14 +333,14 @@ def display_data(address: int, data: bytes, modbus_map: dict=modbus_map) -> dict data_label, data_len = "未知数据", 1 if idx not in modbus_map.keys(): item = convert_regs_to_int(data, 1, signed=False) - item = display_hex(item, 4) + item = tools.ByteConv.display_hex(item, 4) else: current_map = modbus_map[idx] data_label = current_map[0] if current_map[1] == 1: """ Hex字符表示 """ item = convert_regs_to_int(data, 1, signed=False) - item = display_hex(item, 4) + item = tools.ByteConv.display_hex(item, 4) elif current_map[1] == 2: """ 16位数值表示 """ item = convert_regs_to_int(data, 1) @@ -359,12 +360,12 @@ def display_data(address: int, data: bytes, modbus_map: dict=modbus_map) -> dict item = item.decode() except Exception as ex: item_len = sum([any(item[i:]) for i in range(len(item))]) - item = trans_list_to_str(item[:item_len]) + item = tools.ByteConv.trans_list_to_str(item[:item_len]) elif current_map[1] == 5: """ 载波地址表示 """ data_len = current_map[2] item = swapping_words(data, data_len) - item = trans_list_to_str(item) + item = tools.ByteConv.trans_list_to_str(item) elif current_map[1] == 6: """ 浮点数值表示 """ data_len = 2 @@ -388,7 +389,7 @@ def print_display(output_data: dict): for key, value in output_data.items(): label = value[0] data = "-".join(map(str, value[1])) if type(value) == list else value[1] - print(f"{display_hex(key, 4)}: {data:<{data_len_max}} {label:<{label_len_max}}") + print(f"{tools.ByteConv.display_hex(key, 4)}: {data:<{data_len_max}} {label:<{label_len_max}}") diff --git a/source/device/function/tools/ByteConv.py b/source/device/tools/ByteConv.py similarity index 100% rename from source/device/function/tools/ByteConv.py rename to source/device/tools/ByteConv.py diff --git a/source/device/function/tools/IntelHex.py b/source/device/tools/IntelHex.py similarity index 90% rename from source/device/function/tools/IntelHex.py rename to source/device/tools/IntelHex.py index 91f989a..f7ea328 100644 --- a/source/device/function/tools/IntelHex.py +++ b/source/device/tools/IntelHex.py @@ -1,3 +1,6 @@ +from pathlib import Path + + def calculate_checksum(data): """ 计算校验域 """ checksum = 0 @@ -12,7 +15,7 @@ def file_IntelHex_to_Bin(file_data, base_address=0, len_max=1, **kwargs): """ if base_address == 0: if file_data[8] == '2': - base_address = int(file_data[9: 13], 16) * 0x100 + base_address = int(file_data[9: 13], 16) * 0x10 offset_begin = 16 elif file_data[8] == '4': base_address = int(file_data[9: 13], 16) * 0x10000 @@ -139,4 +142,19 @@ def file_Bin_to_IntelHex(file_data: bytearray, base_address=0, **kwargs): hex_record = f':00000001FF\n' result += hex_record - return result \ No newline at end of file + return result + + +def test1(fp: Path): + """ bin文件生成测试 """ + file1 = Path(fp) + path1 = file1.parent / (file1.stem + ".bin") + data1 = file1.read_text() + data2 = file_IntelHex_to_Bin(data1, 0x3F0100) + path1.write_bytes(data2) + + # 测试Bin到IntelHex + bin = Path(fp).read_bytes() + result = file_Bin_to_IntelHex(bin, 0x80000, memory_width=2) + (fp.parent / (fp.stem + ".hex")).write_text(result) + diff --git a/source/device/tools/__init__.py b/source/device/tools/__init__.py new file mode 100644 index 0000000..e8c778a --- /dev/null +++ b/source/device/tools/__init__.py @@ -0,0 +1,2 @@ +from . import IntelHex +from . import ByteConv \ No newline at end of file