import time import hashlib from math import ceil from tqdm import tqdm from pathlib import Path from serial import Serial from . import tools from . import function ParamMap_LaminaAdapter = { # 1 - Hex # 2 - Int16 # 3 - lnt32 # 4 - str # 5 - addr # 6 - float 0x00: ["硬件版本识别电压", 2, 1000], 0x0E: ["故障字1", 1], 0x0F: ["故障字2", 1], 0x10: ["MPPT工作状态", 1], 0x11: ["系统工作状态", 1], 0x12: ["系统工作模式", 1], 0x13: ["输入电压", 2, 10], 0x14: ["电感电流", 2, 100], 0x15: ["12V电压", 2, 10], 0x16: ["输出电压", 2, 10], 0x17: ["输入电流", 2, 100], 0x18: ["温度1", 2, 10], 0x19: ["温度2", 2, 10], 0x1A: ["输入功率", 3, 1000], 0x1C: ["设备温度", 2, 10], 0x1D: ["开关机状态", 1], 0x1E: ["电池电压", 2, 10], 0x1F: ["并机功率限值", 3, 1000], 0x21: ["输出电容电电压", 2, 10], 0x22: ["参考电压", 2, 10], 0x60: ["光伏通道使能", 1], 0x61: ["最小启动输入电压", 2, 10], 0x62: ["最大启动输入电压", 2, 10], 0x63: ["最小停止输入电压", 2, 10], 0x64: ["最大停止输入电压", 2, 10], 0x65: ["最小MPPT电压", 2, 10], 0x66: ["最大MPPT电压", 2, 10], 0x67: ["最小启动输出电压", 2, 10], 0x68: ["最大启动输出电压", 2, 10], 0x69: ["最小停止输出电压", 2, 10], 0x6A: ["最大停止输出电压", 2, 10], 0x6B: ["输入过压保护值", 2, 10], 0x6C: ["输出过压保护值", 2, 10], 0x6D: ["输出欠压保护值", 2, 10], 0x6E: ["电感过流保护值", 2, 100], 0x6F: ["输入过流保护值", 2, 100], 0x70: ["最小电感电流限值", 2, 100], 0x71: ["最大电感电流限值", 2, 100], 0x72: ["浮充电压阈值", 2, 10], 0x73: ["三点法中间阈值", 2, 10], 0x74: ["恒压充电电压", 2, 10], 0x75: ["过温故障值", 2, 10], 0x76: ["过温告警值", 2, 10], 0x77: ["温度恢复值", 2, 10], 0x78: ["最低满载电压", 2, 10], 0x79: ["最高满载电压", 2, 10], 0x7A: ["输入过载保护值", 3, 1000], 0x7C: ["最小功率限值", 3, 1000], 0x7E: ["最大功率限值", 3, 1000], 0x80: ["最大功率限值存储值", 3, 1000], 0x82: ["载波通信地址", 5, 3], 0x85: ["电压环out_max", 2, 100], 0x86: ["电压环out_min", 2, 100], 0x87: ["电流环out_max", 2, 100], 0x88: ["电流环out_min", 2, 100], 0x89: ["MPPT扰动系数k_d_vin", 2, 100], 0x8A: ["dmin", 2, 1000], 0x8B: ["dmax", 2, 1000], 0x8C: ["扫描电压偏移scanvolt_offset", 2, 10], 0x8D: ["电压环Kp", 3, 100000], 0x8F: ["电压环Ki", 3, 100000], 0x91: ["电流环Kp", 3, 100000], 0x93: ["电流环Ki", 3, 100000], 0x95: ["日志级别", 1], 0x96: ["日志输出方式", 1], 0x97: ["采样校准volt_in_a", 2, 1000], 0x98: ["采样校准volt_in_b", 2, 100], 0x99: ["采样校准volt_out_a", 2, 1000], 0x9A: ["采样校准volt_out_b", 2, 100], 0x9B: ["采样校准curr_in_a", 2, 1000], 0x9C: ["采样校准curr_in_b", 2, 100], 0x9D: ["采样校准curr_induc_a", 2, 1000], 0x9E: ["采样校准curr_induc_b", 2, 100], 0x9F: ["采样校准volt_12V_a", 2, 1000], 0xA0: ["采样校准volt_12V_b", 2, 100], 0xA1: ["温度补偿temp1_b", 2, 10], 0xA2: ["温度补偿temp2_b", 2, 10], 0xA3: ["系统工作模式", 2], 0xA4: ["电感电流给定值curr_set", 2, 100], 0xA5: ["抖动频率上限", 2, 100], 0xA6: ["抖动频率下限", 2, 100], 0xA7: ["电池电压判断限值", 2, 10], 0xA8: ["MPPT追踪模式", 1], 0xA9: ["ADC参考电压", 2, 1000], 0xAA: ["保留", 1], 0xAB: ["保留", 1], 0xAC: ["保留", 1], 0xAD: ["保留", 1], 0xAE: ["保留", 1], 0xAF: ["保留", 1], 0xB0: ["版本", 4, 16], 0x100: ["版本(ODM)", 4, 16], 0x110: ["型号", 4, 16], 0x120: ["载波芯片地址", 4, 16], 0x130: ["厂商", 4, 8], 0x138: ["保留", 4, 8], 0x140: ["保留", 4, 16], 0x150: ["保留", 4, 16], 0x160: ["硬件", 4, 16], 0x170: ["SN", 4, 16], 0x180: ["MES", 4, 16], 0x190: ["Datetime", 4, 16], 0x1A0: ["版本前缀", 4, 5], } class LaminaAdapter: """ 叠光适配器\优化器 """ def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs): # 初始化串口通信 if com_name is not None: com_config = {} com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200 com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N' com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8 com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1 self.__com = Serial(com_name, **com_config) else: self.__com =None self.flag_print = 'frame_print' in kwargs.keys() self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1 self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01 self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1 self.retry_sub = kwargs['retry_sub'] if 'retry_sub' in kwargs.keys() else 1 self.block = { 'addr' : addr_645, 'type' : 'modbus', 'data' : { 'addr_dev' : addr_modbus, 'data_define': ParamMap_LaminaAdapter, }, } self.output = { 'result': False, 'code_func': 0x00, } self.log = { 'send': 0, 'read': 0, 'keep-fail': 0, 'record': { 'config': None, 'data': None, }, } def __read_frame(self) -> bool: """ 使用帧字节超时策略读报文帧, 并进行解析数据, 打印异常 """ frame_recv = b'' time_start, time_current, flag_frame = time.time(), time.time(), False while (time_current - time_start) < self.time_out: time.sleep(self.time_gap) bytes_read = self.__com.read_all() time_current = time.time() if flag_frame and len(bytes_read) == 0: break elif len(bytes_read): flag_frame = True frame_recv += bytes_read try: self.output = function.frame.check_frame_dlt645(frame_recv, self.block) if self.flag_print: print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv)) except Exception as ex: print("Error Info: ", ex) if self.flag_print and frame_recv: print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv)) self.output['result'] = False return self.output['result'] def __transfer_data(self, frame: bytearray) -> bool: """ 报文数据传输 """ if self.__com is None: print(tools.ByteConv.trans_list_to_str(frame)) return False fail_count = 0 while fail_count < self.retry: frame_discard = self.__com.read_all() self.__com.write(frame) self.log['send'] += 1 if self.flag_print and frame_discard: print("Discard Data: " , frame_discard) if self.flag_print: print("Send Frame: ", tools.ByteConv.trans_list_to_str(frame)) if self.__read_frame(): if 'Regs' in self.output.keys(): function.frame.print_display(self.output['Regs']) self.log['read'] += 1 break else: fail_count += 1 if fail_count >= self.log['keep-fail']: self.log['keep-fail'] = fail_count time.sleep(2*self.time_out) return fail_count < self.retry def frame_read(self, daddr=0x60, dlen=0x50): self.block['data']['type'] = 'read' self.block['data']['data_addr'] = daddr self.block['data']['data_len'] = dlen frame = function.frame.make_frame_dlt645(self.block) return self.__transfer_data(frame) def frame_write_one(self, daddr=0x85, dval=-900): self.block['data']['type'] = 'write_one' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval frame = function.frame.make_frame_dlt645(self.block) return self.__transfer_data(frame) def frame_write_dual(self, daddr=0x91, dval=600): self.block['data']['type'] = 'write_dual' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval frame = function.frame.make_frame_dlt645(self.block) return self.__transfer_data(frame) def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]): self.block['data']['type'] = 'write_str' self.block['data']['data_addr'] = daddr self.block['data']['data_val'] = dval frame = function.frame.make_frame_dlt645(self.block) return self.__transfer_data(frame) def frame_update(self, path_bin): """ 程序升级 注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程; """ param_saved = self.flag_print, self.retry, self.time_out self.flag_print = False self.block['data']['type'] = 'update' self.block['data']['step'] = 'start' self.block['data']['index'] = 0 self.block['data']['file'] = Path(path_bin).read_bytes() self.block['data']['header_offset'] = 184 # 启动帧 frame_master = bytearray(function.frame.make_frame_dlt645(self.block)) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved print('Upgrade Fail: start') return False self.block["data"]['file_block_size'] = self.output['upgrade']['length'] # 避免接收到延迟返回报文 time.sleep(self.time_out) # 文件传输 self.retry = 3 self.time_out = 1.5 self.block["data"]['step'] = 'trans' self.block['data']['index'] = 0 frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size']) for idx in tqdm(range(frame_total), desc="File Transmitting"): self.block["data"]['index'] = idx frame_master = function.frame.make_frame_dlt645(self.block) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved print(f'Upgrade Fail: trans data in {idx}') return False # 结束升级 self.time_out = 1 self.block["data"]['step'] = 'end' self.block["data"]['index'] += 1 frame_master = function.frame.make_frame_dlt645(self.block) if not self.__transfer_data(frame_master): self.flag_print, self.retry, self.time_out = param_saved print(f'Upgrade Fail: end') return False self.flag_print, self.retry, self.time_out = param_saved return True def GeneratePackage_SLCP001_p4a0(path_hex: Path): """ 叠光适配器-460平台版本 生成升级包 """ config = { 'prod_type': [0x45, 0x00], # 产品类型 'method_compress': False, # 文件压缩 'prog_id': list(b"SLCP001"), # 程序识别号 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x040000) encrypt_main = function.file_upgrade.file_encryption(bin_main) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Package generated successfully.") print(f"File name: {path_hex.name}") print(f"File Info:") print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") # 组装镜像 Image = [0xFF] * (len(main_header) + len(encrypt_main)) offset_image = 0 Image[offset_image: offset_image + len(main_header)] = main_header offset_image += len(main_header) Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main return bytearray(Image), bin_main def GeneratePackage_SLCP101_p460(path_hex: Path): """ 叠光适配器-460平台版本 生成升级包 """ config = { 'prod_type': [0x45, 0x00], # 产品类型 'method_compress': False, # 文件压缩 'prog_id': list(b"SLCP101"), # 程序识别号 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) encrypt_main = function.file_upgrade.file_encryption(bin_main) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Package generated successfully.") print(f"File name: {path_hex.name}") print(f"File Info:") print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") # 组装镜像 Image = [0xFF] * (len(main_header) + len(encrypt_main)) offset_image = 0 Image[offset_image: offset_image + len(main_header)] = main_header offset_image += len(main_header) Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main return bytearray(Image), bin_main def GenerateImage_SLCP001_p4a0(path_boot: Path, path_main: Path, path_back: Path): """ 叠光适配器-4A0平台版本 镜像生成 """ config = { 'prod_type': [0x45, 0x00], # 产品类型 'method_compress': False, # 文件压缩 'prog_id': list(b"SLCP001"), # 程序识别号 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x010000) bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x0CC000) bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x040000) encrypt_main = function.file_upgrade.file_encryption(bin_main) encrypt_back = function.file_upgrade.file_encryption(bin_back) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") md5_ctx = hashlib.md5() md5_ctx.update(bin_back) config["md5"] = list(md5_ctx.digest()) config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) if (back_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Merge Image generated successfully.") print(f"Main File:") print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") print(f"Back File:") print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]") # 组装镜像 Image = [0xFF] * 0x100000 offset_image = 0 Image[offset_image: offset_image + len(bin_boot)] = bin_boot offset_image = 0x010000 Image[offset_image: offset_image + len(bin_main)] = bin_main offset_image = 0x0BC000 Image[offset_image: offset_image + len(bin_back)] = bin_back offset_image = 0x0FC000 Image[offset_image: offset_image + len(main_header)] = main_header offset_image = 0x0FE000 Image[offset_image: offset_image + len(back_header)] = back_header return bytearray(Image), main_header, back_header def GenerateImage_SLCP101_p460(path_boot: Path, path_main: Path, path_back: Path): """ 叠光适配器-460平台版本 镜像生成 """ config = { 'prod_type': [0x45, 0x00], # 产品类型 'method_compress': False, # 文件压缩 'prog_id': list(b"SLCP101"), # 程序识别号 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) encrypt_main = function.file_upgrade.file_encryption(bin_main) encrypt_back = function.file_upgrade.file_encryption(bin_back) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") md5_ctx = hashlib.md5() md5_ctx.update(bin_back) config["md5"] = list(md5_ctx.digest()) config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) if (back_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Merge Image generated successfully.") print(f"Main File:") print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") print(f"Back File:") print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]") # 组装镜像 Image = [0xFF] * 0x058000 offset_image = 0 Image[offset_image: offset_image + len(bin_boot)] = bin_boot offset_image = 0x00C000 Image[offset_image: offset_image + len(bin_main)] = bin_main offset_image = 0x030000 Image[offset_image: offset_image + len(bin_back)] = bin_back offset_image = 0x054000 Image[offset_image: offset_image + len(main_header)] = main_header offset_image = 0x056000 Image[offset_image: offset_image + len(back_header)] = back_header return bytearray(Image), main_header, back_header def GeneratePackage_DLSY001_p460(path_hex: Path): """ 叠光优化器-460平台版本 生成升级包 """ config = { 'prod_type': [0x45, 0x00], # 产品类型 'method_compress': False, # 文件压缩 'prog_id': list(b"DLSY001"), # 程序识别号 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000) encrypt_main = function.file_upgrade.file_encryption(bin_main) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_hex.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Package generated successfully.") print(f"File name: {path_hex.name}") print(f"File Info:") print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") # 组装镜像 Image = [0xFF] * (len(main_header) + len(encrypt_main)) offset_image = 0 Image[offset_image: offset_image + len(main_header)] = main_header offset_image += len(main_header) Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main return bytearray(Image), bin_main def GenerateImage_DLSY001_p460(path_boot: Path, path_main: Path, path_back: Path): """ 叠光优化器-460平台版本 镜像生成 """ config = { 'prod_type': [0x45, 0x00], # 产品类型 'method_compress': False, # 文件压缩 'prog_id': list(b"DLSY001"), # 程序识别号 'prog_type': 'app', # 程序类型 'area_code': [0x00, 0x00], # 地区 } bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000) bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000) bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000) encrypt_main = function.file_upgrade.file_encryption(bin_main) encrypt_back = function.file_upgrade.file_encryption(bin_back) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main)) config['hex_name'] = list(path_main.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) if (main_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") md5_ctx = hashlib.md5() md5_ctx.update(bin_back) config["md5"] = list(md5_ctx.digest()) config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back)) config['hex_name'] = list(path_back.name.encode())[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) if (back_header:=function.file_upgrade.build_header_new(config)) is None: raise Exception("Header tag oversize. ") print("Merge Image generated successfully.") print(f"Main File:") print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") print(f"Back File:") print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]") # 组装镜像 Image = [0xFF] * 0x058000 offset_image = 0 Image[offset_image: offset_image + len(bin_boot)] = bin_boot offset_image = 0x00C000 Image[offset_image: offset_image + len(bin_main)] = bin_main offset_image = 0x030000 Image[offset_image: offset_image + len(bin_back)] = bin_back offset_image = 0x054000 Image[offset_image: offset_image + len(main_header)] = main_header offset_image = 0x056000 Image[offset_image: offset_image + len(back_header)] = back_header return bytearray(Image), main_header, back_header