diff --git a/.gitignore b/.gitignore index f3eb25a..dfe6956 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ # 缓存文件 **/__pycache__/*.pyc + +# 生成结果 +result/*.bin diff --git a/source/dev_LaminaAdapter.py b/source/dev_LaminaAdapter.py index 9073dbd..0e3a56b 100644 --- a/source/dev_LaminaAdapter.py +++ b/source/dev_LaminaAdapter.py @@ -105,7 +105,8 @@ modbus_map = { 0x130: ["SN", 4, 16], 0x140: ["MES", 4, 16], 0x150: ["Datetime", 4, 16], - 0x160: ["厂商", 4, 16], + 0x160: ["载波芯片地址", 4, 16], + 0x170: ["厂商", 4, 16], } @@ -113,12 +114,32 @@ class LaminaAdapter: def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs): # 初始化串口通信 if com_name is not None: - self.com = Serial(com_name, baudrate=115200, parity='N', timeout=2) + com_config = {} + if 'baudrate' in kwargs.keys(): + com_config['baudrate'] = kwargs['baudrate'] + if 'parity' in kwargs.keys(): + com_config['parity'] = kwargs['parity'] + if 'bytesize' in kwargs.keys(): + com_config['bytesize'] = kwargs['bytesize'] + if 'stopbits' in kwargs.keys(): + com_config['stopbits'] = kwargs['stopbits'] + self.com = Serial(com_name, **com_config) else: self.com =None if 'frame_print' in kwargs.keys(): - self.frame_print = False + self.flag_print = True + + if 'time_out' in kwargs.keys(): + self.time_out = kwargs['time_out'] + if 'retry' in kwargs.keys(): + self.retry = kwargs['retry'] + else: + self.retry = 1 + if 'retry_sub' in kwargs.keys(): + self.retry_sub = kwargs['retry_sub'] + else: + self.retry_sub = 1 # 设置645协议地址 self.addr_645 = addr_645 @@ -136,13 +157,30 @@ class LaminaAdapter: def _transfer_data(self, frame): """ 报文数据传输 """ - self.com.read_all() - self.com.write(bytearray(frame)) - time.sleep(0.5) - frame_recv = self.com.read_all() - output_text = check_frame_dlt645(frame_recv, block=self.block) - print(output_text) - if self.frame_print: + cnt = 0 + while cnt < self.retry: + self.com.read_all() + self.com.write(bytearray(frame)) + + cnt_sub = 0 + frame_recv = None + while not frame_recv: + time.sleep(self.time_out) + frame_recv = self.com.read_all() + cnt_sub += 1 + if cnt_sub >= self.retry_sub: + break + + try: + output_text = check_frame_dlt645(frame_recv, block=self.block) + cnt += 1 + except Exception as e: + print(e) + continue + print(output_text) + break + + if hasattr(self, 'frame_print'): print(trans_list_to_str(frame)) print(trans_list_to_str(frame_recv)) @@ -204,24 +242,24 @@ class LaminaAdapter: frame_master = bytearray(make_frame_dlt645(self.block)) # 等待擦除完成返回 - try_times = 3000 + try_times = 30 self.com.read_all() while try_times: - time.sleep(0.4) + time.sleep(self.time_out) self.com.write(frame_master) frame_slave = self.com.read_all() if not frame_slave: try_times -= 1 continue - self.block["data"]['file_block_size'] = check_frame_dlt645(frame_slave, self.block) + _, _, self.block["data"]['file_block_size'] = check_frame_dlt645(frame_slave, self.block) break if self.block["data"]['file_block_size'] == 0: raise Exception("Error slave response.") # 避免接收到延迟返回报文 - time.sleep(0.4) + time.sleep(self.time_out) # 文件传输 self.block["data"]['step'] = 'trans' @@ -229,13 +267,32 @@ class LaminaAdapter: while data_remain > 0: frame_master = bytearray(make_frame_dlt645(self.block)) - self.com.read_all() - self.com.write(frame_master) - time.sleep(0.2) - frame_slave = None - while not frame_slave: - frame_slave = self.com.read_all() - check_frame_dlt645(frame_slave, self.block) + cnt = 0 + while cnt < self.retry: + self.com.read_all() + self.com.write(frame_master) + + cnt_sub = 0 + frame_slave = None + while not frame_slave: + time.sleep(self.time_out) + frame_slave = self.com.read_all() + cnt_sub += 1 + if cnt_sub >= self.retry_sub: + break + + try: + ret = check_frame_dlt645(frame_slave, self.block) + except Exception as e: + print(e) + ret = False, 0, 0 + if ret[0]: + break + else: + cnt += 1 + + if cnt >= self.retry: + raise Exception("Error, Retry failed.") self.block["data"]['index'] += 1 data_remain -= self.block["data"]['file_block_size'] @@ -244,21 +301,52 @@ class LaminaAdapter: self.block["data"]['step'] = 'end' frame_master = bytearray(make_frame_dlt645(self.block)) - self.com.read_all() - self.com.write(frame_master) - time.sleep(0.1) - frame_slave = None - while not frame_slave: - frame_slave = self.com.read_all() - check_frame_dlt645(frame_slave[:20], self.block) + cnt = 0 + while cnt < self.retry: + self.com.read_all() + self.com.write(frame_master) + + cnt_sub = 0 + frame_slave = None + while not frame_slave: + time.sleep(self.time_out) + frame_slave = self.com.read_all() + cnt_sub += 1 + if cnt_sub >= self.retry_sub: + break + + try: + ret = check_frame_dlt645(frame_slave, self.block) + except Exception as e: + print(e) + ret = False, 0, 0 + if ret[0]: + break + else: + cnt += 1 + + if cnt >= self.retry: + raise Exception("Error, Retry failed.") if __name__=='__main__': - dev_lamina = LaminaAdapter(com_name="COM8") + mode_config = { + "Log": {'com_name': None}, + "Debug": {'com_name': 'COM8', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1, + 'time_out': 0.1}, + "HPLC": {'com_name': 'COM5', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1, + 'time_out': 1, 'retry': 10, 'retry_sub': 10}, + } - dev_lamina.frame_read(0x0E, 0x14) + + dev_lamina = LaminaAdapter(**mode_config['Debug']) dev_lamina.frame_read(0x0100, 0x20) + if 0: + dev_lamina.frame_read(0x0E, 0x14) + if 0: + dev_lamina.frame_read(0x60, 0x60) + if 0: dev_lamina.frame_write_str(0x0130, list("SN20240107546")) dev_lamina.frame_write_str(0x0140, list("MES20240107546")) @@ -267,9 +355,13 @@ if __name__=='__main__': dev_lamina.frame_read(0x0130, 0x30) if not hasattr(__builtins__,"__IPYTHON__"): - # path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240517_1100_T1.10.bin") + path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\lamina_adapter.bin") # path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240520_0000_T1.11.bin") - path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240517_1800_V1.11.bin") + # path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240525_1230_V1.15.bin") dev_lamina.frame_update(path_bin) + time.sleep(6) + + dev_lamina.frame_read(0x0100, 0x20) + diff --git a/source/func_frame.py b/source/func_frame.py index d2dc984..04f0ce2 100644 --- a/source/func_frame.py +++ b/source/func_frame.py @@ -222,40 +222,22 @@ def check_frame_modbus(frame, block=None): code_subfunc = frame[2] if code_func == 0x07: """ 升级回复帧 """ - if code_subfunc == 0x01 and frame[3] == 0x00: - return frame[6] * 256 + frame[7] + if frame[3] == 0x00: + check_result = True + else: + check_result = False + update_index = frame[4] * 256 + frame[5] + check_hope = 0 + if code_subfunc == 0x01: + """ 处理帧头 """ + check_hope = frame[6] * 256 + frame[7] elif code_subfunc == 0x02: """ 解析帧序号及返回值, 不做序列号校验 """ - update_result = frame[3] - update_index = frame[4] * 256 + frame[5] - # 检查严重错误标志 - if update_result == 0xFF: - raise Exception("Error, Update abort.") - if update_result >= 0xA0 or update_result == 0x0F: - check_result = False - else: - check_result = True - if update_result >= 0xA0: - update_result -= 0x90 - if update_result >= 0x20: - check_hope = update_index + (update_result & 0x0F) - elif update_result >= 0x10: - check_hope = update_index - (update_result & 0x0F) - else: - check_hope = None - return check_result, update_index, check_hope elif code_subfunc == 0x03: - if frame[3] == 0xFF: - raise Exception("Update verification failed.") - elif frame[3] == 0x01: - print("Update done.") - elif frame[3] == 0x00: - print("Update not done.") - else: - print("Update unknown return value.") - return frame[3] + """ 升级结束 """ else: raise Exception("Func code or Return code error.") + return check_result, update_index, check_hope elif code_func == 0x03 or code_func == 0x04: """ 数据读取帧 """ if frame[2] == len(frame[3:-2]): diff --git a/source/main.py b/source/main.py index 1e51b24..58203db 100644 --- a/source/main.py +++ b/source/main.py @@ -2,7 +2,7 @@ import time from webui import webui from pathlib import Path from func_frame import check_frame_dlt645 -from dev_Lamina import LaminaAdapter +from source.dev_LaminaAdapter import LaminaAdapter from dev_EnergyRouter import EnergyRouter diff --git a/source/utl_upgrade.py b/source/utl_upgrade.py index 53ba999..7a581b6 100644 --- a/source/utl_upgrade.py +++ b/source/utl_upgrade.py @@ -1,6 +1,8 @@ # 升级包生成脚本 import hashlib from pathlib import Path +from datetime import datetime +from crc import Calculator, Crc16 from utl import conv_int_to_array, trans_list_to_str @@ -20,40 +22,63 @@ Header_Tag = { 'hex_name': [0xFF, -1, 80], # 255 - Hex文件名; less than 80byte } -def file_IntelHex_to_Bin(file_data, base_address, len_max=0xC000, conv_end=False): +def file_IntelHex_to_Bin(file_data, base_address=-1, len_max=-1, **kwargs): """ 将Intel Hex格式文件转换为Bin格式; """ - result = bytearray(len_max).replace(b'\x00', b'\xff') + if base_address == -1: + if file_data[8] == '2': + base_address = int(file_data[9: 13], 16) * 16 + offset_begin = 16 + elif file_data[8] == '4': + base_address = int(file_data[9: 13], 16) * 2**16 + offset_begin = 16 + else: + base_address = 0 + offset_begin = 0 + base_address += int(file_data[offset_begin + 3: offset_begin + 7], 16) + base_address &= ~0x0FFF + if len_max == -1: + len_space = 0x10 + else: + len_space = len_max + result = bytearray(len_space).replace(b'\x00', b'\xff') lines = file_data.split('\n') offset = 0 max_address = 0 for line in lines: - if max_address >= len_max: - raise Exception("Bin file Oversize.") if line[0] == ':': checksum = sum([int(x, 16) * (15 * (~i & 0x01) + 1) for i, x in enumerate(line[1:])]) if (checksum & 0x00FF) != 0: raise Exception('Hex file checksum error!') if line[7:9] == '00': - len = int(line[1:3], 16) + len_line = int(line[1:3], 16) address = offset + int(line[3:7], 16) - base_address - address *= 2 data = bytearray.fromhex(line[9:-2]) - if conv_end: - for i in range(0, len, 2): - t = data[i] - data[i] = data[i+1] - data[i+1] = t - result[address:address+len] = data + if 'conv_end' in kwargs.keys(): + address *= 2 + if kwargs['conv_end']: + for i in range(0, len_line, 2): + t = data[i] + data[i] = data[i+1] + data[i+1] = t + + if max_address >= len_space: + if len_max == -1: + result += bytearray((len_line & ~0x0F) + 0x10).replace(b'\x00', b'\xff') + len_space += (len_line & ~0x0F) + 0x10 + else: + raise Exception("Bin file Oversize.") + result[address:address+len_line] = data + + if (address + len_line) > max_address: + max_address = address + len_line - if (address + len) > max_address: - max_address = address + len elif line[7:9] == '01': break elif line[7:9] == '02': - offset = int(line[9:-2], 16) * 2**16 + offset = int(line[9:-2], 16) * 16 elif line[7:9] == '03': pass elif line[7:9] == '04': # 拓展地址偏移 @@ -62,6 +87,7 @@ def file_IntelHex_to_Bin(file_data, base_address, len_max=0xC000, conv_end=False pass return result[:max_address] + def file_encryption(buffer): """ 文件加密算法 """ pwd_idx = 0 @@ -121,6 +147,74 @@ def build_header_lite(config: dict): return m_file_header +def build_header_new(config: dict): + """ + 基于配置参数, 生成新版文件信息头; + """ + # 定义文件头 + m_file_header = [0xFF] * 184 + + m_file_header[0:8] = list(b"TOPSCOMM") + m_file_header[8:10] = config['prod_type'] + m_file_header[10:22] = config['prog_id'] + [0] * (12 - len(config['prog_id'])) + if config['method_compress'] == True: + m_file_header[22] = 0x01 + else: + m_file_header[22] = 0x00 + + if 'crc32' in config.keys(): + m_file_header[23] = 0x00 + m_file_header[24: 40] = config['crc32'] + [0x00] * 12 + elif 'md5' in config.keys(): + m_file_header[23] = 0x01 + m_file_header[24: 40] = config['md5'] + else: + raise Exception("Error, Unknown method verify.") + + # 时间戳生成 + time_now = datetime.now() + time_stamp = list(map(lambda x: int(x), + time_now.strftime("%Y-%m-%d-%H-%M").split('-'))) + time_stamp.insert(1, time_stamp[0] // 0x100) + time_stamp[0] = time_stamp[0] % 0x100 + m_file_header[40: 46] = time_stamp + + m_file_header[46: 50] = config['file_length'] + # Cpu1 + m_file_header[50: 54] = [0x00] * 4 + m_file_header[54: 70] = [0x00] * 16 + # Cpu2 + m_file_header[70: 74] = [0x00] * 4 + m_file_header[74: 90] = [0x00] * 16 + + if config['prog_type'] == 'app': + m_file_header[90: 92] = [0x00, 0x00] + elif config['prog_type'] == 'boot': + m_file_header[90: 92] = [0x01, 0x00] + elif config['prog_type'] == 'diff': + m_file_header[90: 92] = [0x02, 0x00] + elif config['prog_type'] == 'font': + m_file_header[90: 92] = [0x03, 0x00] + elif config['prog_type'] == 'config': + m_file_header[90: 92] = [0x04, 0x00] + elif config['prog_type'] == 'data': + m_file_header[90: 92] = [0x05, 0x00] + elif config['prog_type'] == 'test': + m_file_header[90: 92] = [0x06, 0x00] + else: + raise Exception("Error, Unknown Program Type.") + + m_file_header[92: 94] = config['area_code'] + m_file_header[94: 158] = config['hex_name'] + m_file_header[158: 182] = [0x00] * 24 + + m_file_header = bytearray(m_file_header) + + calculator = Calculator(Crc16.MODBUS) + code_crc16 = calculator.checksum(m_file_header[:-2]) + m_file_header[182: 184] = [code_crc16 % 0x100, code_crc16 // 0x100] + + return m_file_header def make_datafile(file_path: str, config: dict, header_len=512): """ 升级文件生成函数; 完整文件头, TI-hex转换 """ @@ -174,6 +268,24 @@ def make_datafile2(file_path: str, config: dict, header_len=512): data_encrypt = file_encryption(data_bin) return data_header, data_bin, data_encrypt +def make_datafile3(file_path: str, config: dict, header_len=184): + """ 升级文件生成函数; 完整文件头 """ + file_path = Path(file_path) + file_data = file_path.read_text() + data_bin = file_IntelHex_to_Bin(file_data) + md5_ctx = hashlib.md5() + md5_ctx.update(data_bin) + config['file_length'] = conv_int_to_array(len(data_bin)) + config["md5"] = list(md5_ctx.digest()) + config['hex_name'] = list(file_path.name.encode())[:64] + config['hex_name'] += [0] * (64 - len(config['hex_name'])) + + data_header = build_header_new(config) + if data_header is None: + raise Exception("Header tag oversize. ") + data_encrypt = file_encryption(data_bin) + return data_header, data_bin, data_encrypt + def test1(fp): """ bin文件生成测试 """ @@ -186,6 +298,17 @@ def test1(fp): def test2(): """ md5校验, 加密测试 """ + bin_offcial = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\uart_tool\江苏发货版本\SLCP001_240517_1800_V1.11.bin") + data_offcial = bin_offcial.read_bytes() + byte_data = data_offcial[:182] + crc = data_offcial[182:184] + # data = "54 4F 50 53 43 4F 4D 4D 45 00 53 4C 43 50 30 30 31 00 00 00 00 00 01 00 B6 61 A8 73 BF 82 9E A7 4C 79 F6 BB 94 E2 A5 18 E8 07 05 18 0B 17 18 4C 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 6C 61 6D 69 6E 61 5F 61 64 61 70 74 65 72 5F 6D 61 69 6E 02 00 00 00 00 20 10 53 06 00 00 00 00 80 A5 8F 02 00 00 00 00 EC 1F 40 00 00 00 00 00 00 00 00 00 00 00 00 00 7A CA 61 0A FD 7F 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00" + # byte_data = list(map(lambda x: int(x, 16), data.split(' '))) + # crc = "99 CB" + calculator = Calculator(Crc16.MODBUS) + code_crc16 = calculator.checksum(bytearray(byte_data)) + print(f"Result = {hex(code_crc16)}, Offcial Result = {crc}") + data_bin = '123456 123456 '.encode() md5_ctx = hashlib.md5() md5_ctx.update(data_bin) @@ -229,11 +352,82 @@ def test3(path_bin): file2 = path_bin.parent / (path_bin.stem + '_h512.dat') file2.write_bytes(header_512 + data_b) +def test4(path_boot, path_main, path_back): + """ 叠光适配器新版升级方案测试 """ + config = { + 'prod_type': [0x45, 0x00], # 产品类型 + 'method_compress': False, # 文件压缩 + 'prog_id': list(b"SLCP001"), # 程序识别号 + 'prog_type': 'app', # 程序类型 + 'area_code': [0x00, 0x00], # 地区 + } + file_path = Path(path_boot) + file_data = file_path.read_text() + boot_bin = file_IntelHex_to_Bin(file_data, len_max=0x010000) + main_header, main_data_b, main_data_e = make_datafile3(path_main, config) + back_header, back_data_b, back_data_e = make_datafile3(path_back, config) + + print("Merge Image generated successfully.") + print(f"Main File:") + print(f"\t header_length={len(main_header)}, bin_length={len(main_data_b)}[{hex(len(main_data_b))}]") + print(f"Back File:") + print(f"\t header_length={len(back_header)}, bin_length={len(back_data_b)}[{hex(len(back_data_b))}]") + + # 组装镜像 + Image = [0xFF] * 0x100000 + offset_image = 0 + Image[offset_image: offset_image + len(boot_bin)] = boot_bin + offset_image = 0x010000 + Image[offset_image: offset_image + len(main_data_b)] = main_data_b + offset_image = 0x0BC000 + Image[offset_image: offset_image + len(back_data_b)] = back_data_b + offset_image = 0x0FC000 + Image[offset_image: offset_image + len(main_header)] = main_header + offset_image = 0x0FE000 + Image[offset_image: offset_image + len(back_header)] = back_header + + return bytearray(Image), main_header, main_data_b, main_data_e, back_header, back_data_b, back_data_e + if __name__ == "__main__": - path_bin = Path("F:\\Work\\FPGA\\Test\\Vivado\\test_update\\test_update.vitis\\upgrade_system\\Debug\\sd_card\\BOOT.BIN") + # path_bin = Path("F:\\Work\\FPGA\\Test\\Vivado\\test_update\\test_update.vitis\\upgrade_system\\Debug\\sd_card\\BOOT.BIN") + # test3(path_bin) - # test1() # test2() - test3(path_bin) - pass + + if 0: + bin_back = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\lamina_adapter_back.bin") + bin_main = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\lamina_adapter_main.bin") + main_header1 = bin_main.read_bytes()[:184] + main_data_e1 = bin_main.read_bytes()[184:] + back_header1 = bin_back.read_bytes()[:184] + back_data_e1 = bin_back.read_bytes()[184:] + + root = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\uart_tool") + result = Path('result') + # 正常启动镜像 + file_image = result / 'SLCP001_MEM.bin' + hex_boot = root / r"生产版本\bootloader.hex" + hex_main = root / r"生产版本\SLCP001_240525_1800_V1.12.hex" + hex_back = root / r"生产版本\SLCP001_240525_1800_B1.12.hex" + data_bins = test4(hex_boot, hex_main, hex_back) + file_image.write_bytes(data_bins[0].copy()) + + # 异常镜像-主分区md5错误 + file_image = result / 'SLCP001_MEM_b1.bin' + data_image = data_bins[0].copy() + data_image[0x0FC018: 0x0FC01A] = [0x00, 0x01] + file_image.write_bytes(data_image) + + # 异常镜像-备份分区md5错误 + file_image = result / 'SLCP001_MEM_b2.bin' + data_image = data_bins[0].copy() + data_image[0x0FE018: 0x0FE01A] = [0x00, 0x01] + file_image.write_bytes(data_image) + + # 异常镜像-双分区md5错误 + file_image = result / 'SLCP001_MEM_b3.bin' + data_image = data_bins[0].copy() + data_image[0x0FC018: 0x0FC01A] = [0x00, 0x01] + data_image[0x0FE018: 0x0FE01A] = [0x00, 0x01] + file_image.write_bytes(data_image)