import time
from math import ceil
from tqdm import tqdm
from pathlib import Path
from serial import Serial

from tools.ByteConv import trans_list_to_str
from func_frame import make_frame_dlt645, check_frame_dlt645, print_display
from func_upgrade import GeneratePackage_SLCP101_p460


# Register map: address -> [display name, value type code(, length)].
# Value type codes:
#   1 - Hex
#   2 - Int16
#   3 - Int32
#   4 - str
#   5 - addr
#   6 - float
# For str/addr entries the optional third element matches the number of
# register addresses the entry spans (e.g. 0x100 -> 0x110 for a length of 16).
# The display names are runtime strings and are intentionally left untouched.
modbus_map = {
    0x0E: ["故障字1", 1],
    0x0F: ["故障字2", 1],
    0x10: ["MPPT工作状态", 1],
    0x11: ["系统工作状态", 1],
    0x12: ["系统工作模式", 1],
    0x13: ["输入电压", 2],
    0x14: ["电感电流", 2],
    0x15: ["12V电压", 2],
    0x16: ["输出电压", 2],
    0x17: ["输入电流", 2],
    0x18: ["温度1", 2],
    0x19: ["温度2", 2],
    0x1A: ["输入功率", 3],
    0x1C: ["设备温度", 2],
    0x1D: ["开关机状态", 1],
    0x1E: ["电池电压", 2],
    0x1F: ["并机功率限值", 3],

    0x60: ["光伏通道使能", 1],
    0x61: ["最小启动输入电压", 2],
    0x62: ["最大启动输入电压", 2],
    0x63: ["最小停止输入电压", 2],
    0x64: ["最大停止输入电压", 2],
    0x65: ["最小MPPT电压", 2],
    0x66: ["最大MPPT电压", 2],
    0x67: ["最小启动输出电压", 2],
    0x68: ["最大启动输出电压", 2],
    0x69: ["最小停止输出电压", 2],
    0x6A: ["最大停止输出电压", 2],
    0x6B: ["输入过压保护值", 2],
    0x6C: ["输出过压保护值", 2],
    0x6D: ["输出欠压保护值", 2],
    0x6E: ["电感过流保护值", 2],
    0x6F: ["输入过流保护值", 2],
    0x70: ["最小电感电流限值", 2],
    0x71: ["最大电感电流限值", 2],
    0x72: ["浮充电压阈值", 2],
    0x73: ["三点法中间阈值", 2],
    0x74: ["恒压充电电压", 2],
    0x75: ["过温故障值", 2],
    0x76: ["过温告警值", 2],
    0x77: ["温度恢复值", 2],
    0x78: ["最低满载电压", 2],
    0x79: ["最高满载电压", 2],
    0x7A: ["输入过载保护值", 3],
    0x7C: ["最小功率限值", 3],
    0x7E: ["最大功率限值", 3],
    0x80: ["最大功率限值存储值", 3],
    0x82: ["载波通信地址", 5, 3],
    0x85: ["电压环out_max", 2],
    0x86: ["电压环out_min", 2],
    0x87: ["电流环out_max", 2],
    0x88: ["电流环out_min", 2],
    0x89: ["MPPT扰动系数k_d_vin", 2],
    0x8A: ["dmin", 2],
    0x8B: ["dmax", 2],
    0x8C: ["扫描电压偏移scanvolt_offset", 2],
    0x8D: ["电压环Kp", 3],
    0x8F: ["电压环Ki", 3],
    0x91: ["电流环Kp", 3],
    0x93: ["电流环Ki", 3],
    0x95: ["日志级别", 1],
    0x96: ["日志输出方式", 1],
    0x97: ["采样校准volt_in_a", 2],
    0x98: ["采样校准volt_in_b", 2],
    0x99: ["采样校准volt_out_a", 2],
    0x9A: ["采样校准volt_out_b", 2],
    0x9B: ["采样校准curr_in_a", 2],
    0x9C: ["采样校准curr_in_b", 2],
    0x9D: ["采样校准curr_induc_a", 2],
    0x9E: ["采样校准curr_induc_b", 2],
    0x9F: ["采样校准volt_12V_a", 2],
    0xA0: ["采样校准volt_12V_b", 2],
    0xA1: ["温度补偿temp1_b", 2],
    0xA2: ["温度补偿temp2_b", 2],
    0xA3: ["系统工作模式", 2],
    0xA4: ["电感电流给定值curr_set", 2],
    0xA5: ["抖动频率上限", 2],
    0xA6: ["抖动频率下限", 2],
    0xA7: ["电池电压判断限值", 2],
    0xA8: ["MPPT追踪模式", 1],
    0xA9: ["ADC参考电压", 2],
    0xAA: ["保留", 1],
    0xAB: ["保留", 1],
    0xAC: ["保留", 1],
    0xAD: ["保留", 1],
    0xAE: ["保留", 1],
    0xAF: ["保留", 1],

    0x100: ["版本", 4, 16],
    0x110: ["型号", 4, 16],
    0x120: ["载波芯片地址", 4, 16],
    0x130: ["厂商", 4, 8],
    0x138: ["保留", 4, 8],
    0x140: ["保留", 4, 16],
    0x150: ["保留", 4, 16],
    0x160: ["硬件", 4, 16],
    0x170: ["SN", 4, 16],
    0x180: ["MES", 4, 16],
    0x190: ["Datetime", 4, 16],
}


class LaminaAdapter:
    """Lamina adapter / optimizer accessed over a serial port.

    Requests are built with ``make_frame_dlt645`` from ``self.block`` and the
    replies are parsed with ``check_frame_dlt645``; the parsed reply of the
    latest exchange is kept in ``self.output`` and traffic counters in
    ``self.log``.
    """

    def __init__(self, com_name="COM16", addr_645=None, addr_modbus=0x01, **kwargs):
        """Open the serial port (if any) and initialise the request template.

        Args:
            com_name: Serial port name. ``None`` selects log-only mode: no
                port is opened and frames are printed instead of being sent.
            addr_645: Six address bytes stored in ``block['addr']``. Defaults
                to six ``0xAA`` bytes. The value is copied so instances never
                share (or mutate) the caller's list.
            addr_modbus: Device address stored in ``block['data']['addr_dev']``.
            **kwargs: Optional settings:
                ``baudrate`` (115200), ``parity`` ('N'), ``bytesize`` (8),
                ``stopbits`` (1) - forwarded to ``serial.Serial``;
                ``frame_print`` (False) - print sent/received frames;
                ``time_out`` (1) - reply timeout in seconds;
                ``time_gap`` (0.01) - polling interval in seconds;
                ``retry`` (1) - send attempts per request;
                ``retry_sub`` (1) - stored only; not used by the code in
                this file.
        """
        addr_645 = [0xAA] * 6 if addr_645 is None else list(addr_645)

        # Initialise serial communication.
        if com_name is not None:
            com_config = {
                'baudrate': kwargs.get('baudrate', 115200),
                'parity': kwargs.get('parity', 'N'),
                'bytesize': kwargs.get('bytesize', 8),
                'stopbits': kwargs.get('stopbits', 1),
            }
            self.__com = Serial(com_name, **com_config)
        else:
            self.__com = None

        # Honour the flag's value: previously the mere presence of the key
        # enabled printing, so ``frame_print=False`` still printed frames.
        self.flag_print = bool(kwargs.get('frame_print', False))
        self.time_out = kwargs.get('time_out', 1)
        self.time_gap = kwargs.get('time_gap', 0.01)
        self.retry = kwargs.get('retry', 1)
        self.retry_sub = kwargs.get('retry_sub', 1)

        # Request template handed to make_frame_dlt645 / check_frame_dlt645.
        self.block = {
            'addr': addr_645,
            'type': 'modbus',
            'data': {
                'addr_dev': addr_modbus,
                'data_define': modbus_map,
            },
        }
        # Parsed result of the most recent reply.
        self.output = {
            'result': False,
            'code_func': 0x00,
        }
        # Traffic statistics: frames sent, replies accepted and the largest
        # number of failed attempts seen within a single request.
        self.log = {
            'send': 0,
            'read': 0,
            'keep-fail': 0,
            'record': {
                'config': None,
                'data': None,
            },
        }

    def close(self) -> None:
        """Close the serial port if one was opened; safe to call repeatedly."""
        if self.__com is not None:
            self.__com.close()
            self.__com = None

    def __read_frame(self) -> bool:
        """Read one reply using an inter-byte timeout and parse it.

        The port is polled every ``time_gap`` seconds until ``time_out``
        elapses, or until a poll returns nothing after data has started to
        arrive. The collected bytes are parsed into ``self.output``; parser
        exceptions are printed and reported as a failed read.

        Returns:
            ``self.output['result']`` - whether a valid reply was parsed.
        """
        frame_recv = b''
        frame_started = False
        # Monotonic clock: the timeout must not be affected by system clock
        # adjustments.
        deadline = time.monotonic() + self.time_out
        while time.monotonic() < deadline:
            time.sleep(self.time_gap)
            bytes_read = self.__com.read_all()
            if bytes_read:
                frame_started = True
                frame_recv += bytes_read
            elif frame_started:
                # Silence after data: treat the frame as complete.
                break

        try:
            self.output = check_frame_dlt645(frame_recv, self.block['data'])
            if self.flag_print:
                print("Read Frame: ", trans_list_to_str(frame_recv))
        except Exception as ex:
            # Best effort by design: any parser failure counts as a bad frame.
            print("Error Info: ", ex)
            if self.flag_print and frame_recv:
                print("Fail Data: ", trans_list_to_str(frame_recv))
            self.output['result'] = False
        return self.output['result']

    def __transfer_data(self, frame: bytearray) -> bool:
        """Send ``frame`` and wait for a valid reply, retrying on failure.

        In log-only mode (no serial port) the frame is printed and ``False``
        is returned. Otherwise up to ``self.retry`` attempts are made; stale
        input is drained before each send, and each failed attempt is
        followed by a pause of ``2 * time_out`` seconds.

        Returns:
            ``True`` if a valid reply was received within the retry budget.
        """
        if self.__com is None:
            print(trans_list_to_str(frame))
            return False

        fail_count = 0
        while fail_count < self.retry:
            # Drop anything left over from a previous (late) reply.
            frame_discard = self.__com.read_all()
            self.__com.write(frame)
            self.log['send'] += 1

            if self.flag_print:
                if frame_discard:
                    print("Discard Data: ", frame_discard)
                print("Send Frame: ", trans_list_to_str(frame))

            if self.__read_frame():
                if 'Regs' in self.output:
                    print_display(self.output['Regs'])
                self.log['read'] += 1
                break

            fail_count += 1
            if fail_count >= self.log['keep-fail']:
                self.log['keep-fail'] = fail_count
            time.sleep(2 * self.time_out)

        return fail_count < self.retry

    def frame_read(self, daddr=0x60, dlen=0x50):
        """Send a 'read' request for ``dlen`` registers starting at ``daddr``.

        Returns:
            ``True`` if a valid reply was received.
        """
        data = self.block['data']
        data['type'] = 'read'
        data['data_addr'] = daddr
        data['data_len'] = dlen
        return self.__transfer_data(make_frame_dlt645(self.block))

    def frame_write_one(self, daddr=0x85, dval=-900):
        """Send a 'write_one' request writing ``dval`` at ``daddr``.

        Returns:
            ``True`` if a valid reply was received.
        """
        data = self.block['data']
        data['type'] = 'write_one'
        data['data_addr'] = daddr
        data['data_val'] = dval
        return self.__transfer_data(make_frame_dlt645(self.block))

    def frame_write_dual(self, daddr=0x91, dval=600):
        """Send a 'write_dual' request writing ``dval`` at ``daddr``.

        Returns:
            ``True`` if a valid reply was received.
        """
        data = self.block['data']
        data['type'] = 'write_dual'
        data['data_addr'] = daddr
        data['data_val'] = dval
        return self.__transfer_data(make_frame_dlt645(self.block))

    def frame_write_str(self, daddr=0x82, dval=None):
        """Send a 'write_str' request writing the sequence ``dval`` at ``daddr``.

        Args:
            daddr: Start register address.
            dval: Sequence of items to write. Defaults to
                ``[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]``. A copy is stored so
                the request template never aliases the caller's list.

        Returns:
            ``True`` if a valid reply was received.
        """
        if dval is None:
            dval = [0x06, 0x05, 0x04, 0x03, 0x02, 0x01]
        data = self.block['data']
        data['type'] = 'write_str'
        data['data_addr'] = daddr
        data['data_val'] = list(dval)
        return self.__transfer_data(make_frame_dlt645(self.block))

    def frame_update(self, path_bin):
        """Run the firmware upgrade sequence: start, transfer blocks, end.

        Note: when testing the upgrade on a bare board, low-voltage detection
        must be disabled, otherwise the upgrade flow cannot start.

        ``flag_print``, ``retry`` and ``time_out`` are restored to their
        original values on every exit path, including exceptions.

        Args:
            path_bin: Path of the upgrade package file to send.

        Returns:
            ``True`` when all three steps were acknowledged, else ``False``.
        """
        param_saved = self.flag_print, self.retry, self.time_out
        data = self.block['data']
        try:
            data['type'] = 'update'
            data['step'] = 'start'
            data['index'] = 0
            data['file'] = Path(path_bin).read_bytes()
            data['header_offset'] = 184

            # Start frame.
            frame_master = bytearray(make_frame_dlt645(self.block))
            if not self.__transfer_data(frame_master):
                print('Upgrade Fail: start')
                return False

            # The start reply carries the block size to use for the transfer.
            try:
                block_size = self.output['upgrade']['length']
            except (KeyError, TypeError):
                block_size = 0
            if not block_size or block_size <= 0:
                # Without a positive block size the frame count cannot be
                # computed (previously a KeyError / ZeroDivisionError).
                print('Upgrade Fail: invalid block size in start response')
                return False
            data['file_block_size'] = block_size

            # Avoid picking up a delayed reply to the start frame.
            time.sleep(self.time_out)

            # File transfer.
            self.retry = 3
            self.time_out = 1.5
            data['step'] = 'trans'
            data['index'] = 0
            payload_len = len(data['file']) - data['header_offset']
            frame_total = ceil(payload_len / block_size)
            for idx in tqdm(range(frame_total), desc="File Transmitting"):
                data['index'] = idx
                # NOTE(review): every other call site passes ``self.block``
                # to make_frame_dlt645; here (and for the end frame below)
                # only ``self.block['data']`` is passed. Kept as in the
                # original - confirm against make_frame_dlt645.
                frame_master = make_frame_dlt645(self.block['data'])
                if not self.__transfer_data(frame_master):
                    print(f'Upgrade Fail: trans data in {idx}')
                    return False

            # Finish the upgrade.
            self.time_out = 1
            data['step'] = 'end'
            data['index'] += 1
            frame_master = make_frame_dlt645(self.block['data'])
            if not self.__transfer_data(frame_master):
                print('Upgrade Fail: end')
                return False
            return True
        finally:
            self.flag_print, self.retry, self.time_out = param_saved


def test_communication(time_out=2):
    """Communication success-rate test.

    Polls the module-level ``dev_lamina`` adapter forever (stop it with
    Ctrl+C), printing running counters after every request, and prints a
    summary when the loop is interrupted. The adapter's print/retry/timeout
    settings are restored afterwards.

    Args:
        time_out: Pause between two requests, in seconds.
    """
    time_start = time.time()
    param_saved = dev_lamina.flag_print, dev_lamina.retry, dev_lamina.time_out
    dev_lamina.flag_print = False
    dev_lamina.retry = 1
    try:
        while True:
            dev_lamina.frame_read(0x0C, 0x20)
            print(f"Time Stamp: {time.ctime()}")
            print(f"Success Frame: {dev_lamina.log['read']}")
            print(f"Failed Frame: {dev_lamina.log['send'] - dev_lamina.log['read']}")
            print(f"Max Series Failed Frame: {dev_lamina.log['keep-fail']}")
            time.sleep(time_out)
    finally:
        # Restore first so a failure while printing cannot leave the adapter
        # in test configuration.
        dev_lamina.flag_print, dev_lamina.retry, dev_lamina.time_out = param_saved

        time_end = time.time()
        fmt = r'%Y-%m-%d %H:%M:%S'
        text_start = time.strftime(fmt, time.localtime(time_start))
        text_end = time.strftime(fmt, time.localtime(time_end))
        print("Test Result: ")
        print(f"Time Start: {text_start}, \tTime End: {text_end}")
        print(f"Time Elapsed: {time_end - time_start}")
        sent = dev_lamina.log['send']
        if sent:
            print(f"Success Rate: {dev_lamina.log['read'] / sent * 100}%")
        else:
            # Nothing was sent (e.g. log-only mode): avoid dividing by zero.
            print("Success Rate: N/A (no frame sent)")


def make_Pakeage(fp: Path):
    """Generate the upgrade package files next to the given hex file.

    Writes ``<stem>.dat`` (package) and ``<stem>.bin`` (update image) into
    the directory of ``fp`` using ``GeneratePackage_SLCP101_p460``.

    Args:
        fp: Path of the input hex file (a ``str`` is accepted as well).

    Returns:
        Path of the written ``.dat`` package file.
    """
    hex_update = Path(fp)
    file_package = hex_update.parent / f'{hex_update.stem}.dat'
    file_update_bin = hex_update.parent / f'{hex_update.stem}.bin'

    data_package, data_update_bin = GeneratePackage_SLCP101_p460(hex_update)
    file_package.write_bytes(data_package)
    file_update_bin.write_bytes(data_update_bin)
    return file_package


def test():
    """Collection of manual test snippets for the ``dev_lamina`` adapter.

    Every snippet is disabled with ``if 0:``; enable one by changing its
    guard to ``if 1:``.
    """
    if 0:
        dev_lamina.frame_read(0xA9, 1)           # read ADC reference voltage
        dev_lamina.frame_write_one(0xA9, 2000)   # write ADC reference voltage: 2.0V
        dev_lamina.frame_write_one(0xA9, 3300)   # write ADC reference voltage: 3.3V
        dev_lamina.frame_read(0x13, 1)           # read input voltage
        dev_lamina.frame_read(0x16, 1)           # read output voltage
        dev_lamina.frame_read(0x97, 4)           # read calibration: Vin_a, Vin_b, Vout_a, Vout_b
        dev_lamina.frame_write_one(0x97, 1000)   # write calibration Vin_a: 1.000
        dev_lamina.frame_write_one(0x97, 1500)   # write calibration Vin_a: 1.500
        dev_lamina.frame_write_one(0x98, 100)    # write calibration Vin_b: 1.00
        dev_lamina.frame_write_one(0x98, 150)    # write calibration Vin_b: 1.50
        dev_lamina.frame_write_one(0x99, 1000)   # write calibration Vout_a: 1.000
        dev_lamina.frame_write_one(0x99, 1500)   # write calibration Vout_a: 1.500
        dev_lamina.frame_write_one(0x9A, 1000)   # write calibration Vout_b (0x9A is volt_out_b in modbus_map)
        dev_lamina.frame_write_one(0x9A, 1500)   # write calibration Vout_b (0x9A is volt_out_b in modbus_map)
    if 0:
        dev_lamina.frame_write_str(0x82, [0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])
        dev_lamina.frame_write_str(0x82, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
        dev_lamina.frame_write_str(0x82, [0x0A, 0x00, 0x00, 0x00, 0x00, 0x00])
        dev_lamina.frame_write_str(0x82, [0xFF, 0x00, 0x00, 0x00, 0x00, 0x00])
        dev_lamina.frame_write_str(0x82, [0xA1, 0x00, 0x00, 0x00, 0x00, 0x00])
        dev_lamina.frame_write_str(0x82, [0x1A, 0x00, 0x00, 0x00, 0x00, 0x00])
        dev_lamina.frame_write_str(0x82, [0x99, 0x99, 0x99, 0x99, 0x99, 0x99])
        dev_lamina.frame_write_str(0x82, [0x00, 0x00, 0x00, 0x00, 0x00, 0x01])
    if 0:
        dev_lamina.frame_write_one(0x52, 0x01)
        time.sleep(0.5)
        dev_lamina.frame_write_one(0x50, 0x00)
    if 0:
        while 1:
            code_mes = input("扫描数据")
            temp = code_mes[5:7] + code_mes[-10:]
            code_addr = [int(temp[i:i+2], 16) for i in range(0, len(temp), 2)]
            print(f"扫描结果: {code_mes}")
            print(f"载波地址: {trans_list_to_str(code_addr)}")
            dev_lamina.frame_read(0x100, 0x20)
            time.sleep(0.5)
            dev_lamina.frame_write_str(0x0180, list(code_mes))
            time.sleep(0.5)
            dev_lamina.frame_read(0x180, 0x10)
            time.sleep(0.5)
            dev_lamina.frame_write_str(0x82, code_addr)
            time.sleep(0.5)
            dev_lamina.frame_read(0x80, 0x08)
            time.sleep(0.5)
            dev_lamina.frame_write_one(0xA3, 0x01)
            time.sleep(0.5)
            dev_lamina.frame_write_one(0x51, 0x01)
            time.sleep(5)
            dev_lamina.frame_read(0x0E, 0x13)
            time.sleep(5)
            dev_lamina.frame_read(0x0E, 0x13)
            time.sleep(0.5)
            dev_lamina.frame_write_one(0x50, 0x00)
    if 0:
        dev_lamina.frame_write_one(0x0054, 0x01)
        dev_lamina.frame_read(0x000E, 0x02)
        dev_lamina.frame_write_one(0x0054, 0x03)
        dev_lamina.frame_read(0x000E, 0x02)
        dev_lamina.frame_write_one(0x0054, 0x07)
        dev_lamina.frame_read(0x000E, 0x02)
        dev_lamina.frame_write_one(0x0054, 0x06)
        dev_lamina.frame_read(0x000E, 0x02)
        dev_lamina.frame_write_one(0x0054, 0x04)
        dev_lamina.frame_read(0x000E, 0x02)
        dev_lamina.frame_write_one(0x0054, 0x00)
        dev_lamina.frame_read(0x000E, 0x02)
    if 0:
        dev_lamina.frame_read(0x0E, 0x14)
    if 0:
        dev_lamina.frame_read(0x60, 0x60)
    if 0:
        dev_lamina.frame_read(0x70, 0x02)
        dev_lamina.frame_write_one(0x70, 2100)
        dev_lamina.frame_write_one(0x71, 2200)
        dev_lamina.frame_read(0x70, 0x02)
        dev_lamina.frame_write_one(0x70, 2300)
        dev_lamina.frame_write_one(0x71, 2100)
        dev_lamina.frame_read(0x70, 0x02)
    if 0:
        dev_lamina.frame_write_str(0x0170, list("SN202405201117-1"))
        dev_lamina.frame_write_str(0x0180, list("MES202405201117-2"))
        dev_lamina.frame_write_str(0x0190, list("D202405211500-3"))
        time.sleep(2)
        dev_lamina.frame_read(0x0170, 0x30)


if __name__ == '__main__':
    mode_config = {
        "Log": {
            'com_name': None,
            # 'addr_645': [0x01, 0x00, 0x00, 0x00, 0x00, 0x40],
        },
        "Debug": {
            'com_name': 'COM8', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1,
            # 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
            'frame_print': True,
            'time_out': 0.1, 'retry': 1, 'retry_sub': 10,
        },
        "HPLC": {
            'com_name': 'COM10', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1,
            # 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
            'frame_print': True,
            'time_out': 0.5, 'retry': 3, 'retry_sub': 10,
        },
    }

    dev_lamina = LaminaAdapter(**mode_config['Debug'])

    dev_lamina.frame_read(0x0100, 0x20)
    if not hasattr(__builtins__, "__IPYTHON__"):
        # Build the upgrade package from the project's build output.
        file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex")
        if not file_hex.exists():
            raise FileNotFoundError("工程编译目标文件不存在.")
        file_package = make_Pakeage(file_hex)

        version = "DLSY001_240911_1600_V1.01"
        addr = [0x24, 0x09, 0x12, 0x00, 0x00, 0x00]

        while True:
            # Automatic detect-and-upgrade flow (original author's note:
            # still needs debugging).
            # NOTE(review): this loop reads ``output['Reg']`` while
            # LaminaAdapter checks ``output['Regs']``; one of the two keys is
            # probably wrong - confirm against check_frame_dlt645.
            ret = False
            # Wait until a device answers and reports a version other than
            # the target one.
            while (not ret
                   or ('Reg' not in dev_lamina.output.keys())
                   or (version == dev_lamina.output['Reg'][0x0100][1])):
                dev_lamina.frame_read(0x82, 3)
                ret = dev_lamina.frame_read(0x0100, 0x20)
                time.sleep(1)

            dev_lamina.frame_update(file_package)
            time.sleep(6)
            ret = dev_lamina.frame_read(0x0100, 0x20)
            # NOTE(review): the nesting below was reconstructed from a
            # flattened source; it assumes the address is only written and
            # advanced after the new version has been confirmed.
            if ret and (version == dev_lamina.output['Reg'][0x0100][1]):
                dev_lamina.frame_write_one(0x52, 0x01)
                print(f"address: {' '.join(('000' + hex(x)[2:])[-2:] for x in addr)}")
                dev_lamina.frame_write_str(0x82, addr)
                dev_lamina.frame_read(0x82, 3)
                # Advance the last address byte, skipping low-nibble values
                # above 9 (e.g. 0x09 -> 0x10).
                # NOTE(review): there is no carry into the other bytes once
                # the high nibble passes 9 - confirm the intended range.
                addr[5] += 1
                if addr[5] & 0x0F >= 10:
                    addr[5] += 0x10 - 10