From a417ceb1ee4a4bb0ef00f14e46e24c460843490c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=20=E6=B3=BD=E9=9A=86?= Date: Fri, 2 Aug 2024 17:16:39 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8F=A0=E5=85=89=E6=8E=A7?= =?UTF-8?q?=E5=88=B6=E5=99=A8=E9=80=9A=E4=BF=A1=E5=AF=B9=E8=B1=A1;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- source/dev_LaminaController.py | 435 +++++++++++++++++++++++++++++++++ 1 file changed, 435 insertions(+) create mode 100644 source/dev_LaminaController.py diff --git a/source/dev_LaminaController.py b/source/dev_LaminaController.py new file mode 100644 index 0000000..e9aa964 --- /dev/null +++ b/source/dev_LaminaController.py @@ -0,0 +1,435 @@ +import time +from pathlib import Path +from serial import Serial +from utl import trans_list_to_str +from func_frame import make_frame_dlt645, check_frame_dlt645 + +modbus_map = { + # 1 - Hex + # 2 - Int16 + # 3 - lnt32 + # 4 - str + # 5 - addr + # 6 - float + 0x0C: ["告警字1", 1], + 0x0D: ["告警字2", 1], + 0x0E: ["故障字1", 1], + 0x0F: ["故障字2", 1], + 0x10: ["系统工作状态" , 1], + 0x11: ["Boost1工作状态" , 1], + 0x12: ["Boost2工作状态" , 1], + 0x13: ["开关机状态" , 1], + 0x14: ["光伏组串1输入电压" , 2], + 0x15: ["光伏组串2输入电压" , 2], + 0x16: ["Boost1电感电流" , 2], + 0x17: ["Boost2电感电流" , 2], + 0x18: ["Boost输出电压" , 2], + 0x19: ["Boost输出总电流" , 2], + 0x1A: ["Boost1功率" , 3], + 0x1C: ["Boost2功率" , 3], + 0x1E: ["输入总功率" , 3], + 0x20: ["LLC输出电压" , 2], + 0x21: ["端口输出电压" , 2], + 0x22: ["LLC输出电流均值" , 2], + 0x23: ["LLC输出电流峰值" , 2], + 0x24: ["绝缘检测电压" , 2], + 0x25: ["散热片温度" , 2], + 0x26: ["腔体1温度" , 2], + 0x27: ["腔体2温度" , 2], + 0x28: ["设备温度" , 2], + + 0x50: ["启停控制命令" , 2], + 0x51: ["故障清除命令" , 2], + 0x52: ["参数还原命令" , 2], + 0x53: ["设备复位命令" , 2], + + 0x60: ["整机运行使能", 1], + 0x61: ["最小启动允许输入电压", 2], + 0x62: ["最大启动允许输入电压", 2], + 0x63: ["最小停机输入电压", 2], + 0x64: ["最大停机输入电压", 2], + 0x65: ["最小启动允许输出电压", 2], + 0x66: ["最大启动允许输出电压", 2], + 0x67: ["最小停止允许输出电压", 2], + 0x68: ["最大停止允许输出电压", 2], + 0x69: ["最小MPPT电流限值", 2], + 0x6A: ["最大MPPT电流限值", 2], + 0x6B: ["保留数据项", 2], + 0x6C: ["最大功率限值", 3], + 0x6E: ["最大功率限值存储值", 3], + 0x70: ["Boost输入过压保护值", 2], + 0x71: ["Boost输出过压保护值", 2], + 0x72: ["LLC输出过压保护值", 2], + 0x73: ["LLC输出欠压保护值", 2], + 0x74: ["Boost电感过流保护值", 2], + 0x75: ["LLC输出电流均值保护值", 2], + 0x76: ["LLC输出电流峰值保护值", 2], + 0x77: ["保留数据项", 2], + 0x78: ["过载保护值", 3], + 0x7A: ["过温故障值", 2], + 0x7B: ["过温告警值", 2], + 0x7C: ["过温恢复值", 2], + 0x7D: ["输出继电器故障判断差值", 2], + 0x7E: ["保留数据项", 1], + 0x7F: ["保留数据项", 1], + 0x80: ["三点法中间阈值", 2], + 0x81: ["浮充电压", 2], + 0x82: ["恒压充电电压", 2], + 0x83: ["llc软起开始电压", 2], + 0x84: ["boost开始运行电压", 2], + 0x85: ["boost停止运行电压", 2], + 0x86: ["绝缘检测正阻抗限值", 3], + 0x88: ["绝缘检测负阻抗限值", 3], + 0x8A: ["保留地址项", 2], + 0x8B: ["保留地址项", 2], + 0x8C: ["保留地址项", 2], + 0x8D: ["保留地址项", 2], + 0x8E: ["保留地址项", 2], + 0x8F: ["保留地址项", 2], + + 0x100: ["程序版本字符串", 4, 16], + 0x110: ["设备型号字符串", 4, 16], + 0x120: ["保留地址项", 4, 16], + 0x130: ["生产厂家字符串", 4, 8], + 0x138: ["保留地址项", 4, 8], + 0x140: ["保留地址项", 4, 16], + 0x150: ["保留地址项", 4, 16], + 0x160: ["硬件版本字符串", 4, 16], + 0x170: ["设备序列号", 4, 16], + 0x180: ["设备MES码", 4, 16], + 0x190: ["出厂日期批次", 4, 16], +} + + +class LaminaController: + def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs): + # 初始化串口通信 + if com_name is not None: + com_config = {} + com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200 + com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N' + com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8 + com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1 + self.__com = Serial(com_name, **com_config) + else: + self.__com =None + + self.flag_print = 'frame_print' in kwargs.keys() + self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1 + self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1 + self.retry_sub = kwargs['retry_sub'] if 'retry_sub' in kwargs.keys() else 1 + + self.block = { + 'addr' : addr_645, + 'type' : 'modbus', + 'data' : { + 'addr_dev' : addr_modbus, + 'data_define': modbus_map, + }, + } + + def __transfer_data(self, frame): + """ 报文数据传输 """ + + if self.__com is None: + print(trans_list_to_str(frame)) + return False + + cnt = 0 + while cnt < self.retry: + self.__com.read_all() + self.__com.write(bytearray(frame)) + + cnt_sub = 0 + frame_recv = None + while not frame_recv: + time.sleep(self.time_out) + frame_recv = self.__com.read_all() + cnt_sub += 1 + if cnt_sub >= self.retry_sub: + break + + try: + output_text = check_frame_dlt645(frame_recv, block=self.block) + except Exception as e: + print(e) + cnt += 1 + continue + print(output_text) + break + + if self.flag_print: + print(trans_list_to_str(frame)) + print(trans_list_to_str(frame_recv)) + + return cnt < self.retry + + def frame_read(self, daddr=0x60, dlen=0x30): + self.block['data']['type'] = 'read' + self.block['data']['data_addr'] = daddr + self.block['data']['data_len'] = dlen + frame = make_frame_dlt645(self.block) + + return self.__transfer_data(frame) + + def frame_write_one(self, daddr=0x85, dval=-900): + self.block['data']['type'] = 'write_one' + self.block['data']['data_addr'] = daddr + self.block['data']['data_val'] = dval + frame = make_frame_dlt645(self.block) + + return self.__transfer_data(frame) + + def frame_write_dual(self, daddr=0x91, dval=600): + self.block['data']['type'] = 'write_dual' + self.block['data']['data_addr'] = daddr + self.block['data']['data_val'] = dval + frame = make_frame_dlt645(self.block) + + return self.__transfer_data(frame) + + def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]): + self.block['data']['type'] = 'write_str' + self.block['data']['data_addr'] = daddr + self.block['data']['data_val'] = dval + frame = make_frame_dlt645(self.block) + + return self.__transfer_data(frame) + + def frame_update(self, path_bin): + """ 程序升级 + 注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程; + + """ + self.block['data']['type'] = 'update' + self.block['data']['step'] = 'start' + self.block['data']['index'] = 0 + self.block['data']['file'] = Path(path_bin).read_bytes() + self.block['data']['header_offset'] = 184 + # 启动帧 + frame_master = bytearray(make_frame_dlt645(self.block)) + + # 等待擦除完成返回 + try_times = 30 + self.__com.read_all() + while try_times: + time.sleep(self.time_out) + self.__com.write(frame_master) + frame_slave = self.__com.read_all() + if not frame_slave: + try_times -= 1 + continue + + _, _, self.block["data"]['file_block_size'] = check_frame_dlt645(frame_slave, self.block) + break + + if self.block["data"]['file_block_size'] == 0: + raise Exception("Error slave response.") + + # 避免接收到延迟返回报文 + time.sleep(self.time_out) + + # 文件传输 + self.block["data"]['step'] = 'trans' + data_remain = len(self.block["data"]['file']) - self.block['data']['header_offset'] + while data_remain > 0: + frame_master = bytearray(make_frame_dlt645(self.block)) + + cnt = 0 + while cnt < self.retry: + self.__com.read_all() + self.__com.write(frame_master) + + cnt_sub = 0 + frame_slave = None + while not frame_slave: + time.sleep(self.time_out) + frame_slave = self.__com.read_all() + cnt_sub += 1 + if cnt_sub >= self.retry_sub: + break + + try: + ret = check_frame_dlt645(frame_slave, self.block) + except Exception as e: + print(e) + ret = False, 0, 0 + if ret[0]: + break + else: + cnt += 1 + + if cnt >= self.retry: + raise Exception("Error, Retry failed.") + + self.block["data"]['index'] += 1 + data_remain -= self.block["data"]['file_block_size'] + + # 结束升级 + self.block["data"]['step'] = 'end' + frame_master = bytearray(make_frame_dlt645(self.block)) + + cnt = 0 + while cnt < self.retry: + self.__com.read_all() + self.__com.write(frame_master) + + cnt_sub = 0 + frame_slave = None + while not frame_slave: + time.sleep(self.time_out) + frame_slave = self.__com.read_all() + cnt_sub += 1 + if cnt_sub >= self.retry_sub: + break + + try: + ret = check_frame_dlt645(frame_slave, self.block) + except Exception as e: + print(e) + ret = False, 0, 0 + if ret[0]: + break + else: + cnt += 1 + + if cnt >= self.retry: + raise Exception("Error, Retry failed.") + + +def test_communication(time_out=2): + """ 通信成功率测试 """ + log_success = 0 + log_failed = 0 + log_failedseries = 0 + cnt_failedseries = 0 + time_start = time.time() + saveconfig_print = dev_lamina.flag_print + dev_lamina.flag_print = False + try: + while 1: + if dev_lamina.frame_read(0x0E, 0x13): + log_success += 1 + cnt_failedseries = 0 + else: + log_failed += 1 + cnt_failedseries += 1 + if log_failedseries <= cnt_failedseries: + log_failedseries = cnt_failedseries + print(f"Time Stamp: {time.ctime()}") + print(f"Success Frame: {log_success}") + print(f"Failed Frame: {log_failed}") + print(f"Max Series Failed Frame: {log_failedseries}") + time.sleep(time_out) + finally: + time_end = time.time() + print("Test Result: ") + print(f"Time Start: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_start))}, \tTime End: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_end))}") + print(f"Time Elapsed: {time_end - time_start}") + print(f"Success Rate: {log_success / (log_success + log_failed) * 100}%") + dev_lamina.flag_print = saveconfig_print + +if __name__=='__main__': + mode_config = { + "Log": {'com_name': None, + # 'addr_645': [0x01, 0x00, 0x00, 0x00, 0x00, 0x40], + }, + "Debug": {'com_name': 'COM8', 'baudrate': 250000, 'parity': 'E', 'bytesize': 8, 'stopbits': 1, + 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06], + 'frame_print': True, + 'time_out': 0.2, 'retry': 1, 'retry_sub': 10}, + } + + + dev_lamina = LaminaController(**mode_config['Debug']) + + dev_lamina.frame_read(0x0100, 0x20) + + if 0: + dev_lamina.frame_write_str(0x82, [0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]) + dev_lamina.frame_write_str(0x82, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + dev_lamina.frame_write_str(0x82, [0x0A, 0x00, 0x00, 0x00, 0x00, 0x00]) + dev_lamina.frame_write_str(0x82, [0xFF, 0x00, 0x00, 0x00, 0x00, 0x00]) + dev_lamina.frame_write_str(0x82, [0xA1, 0x00, 0x00, 0x00, 0x00, 0x00]) + dev_lamina.frame_write_str(0x82, [0x1A, 0x00, 0x00, 0x00, 0x00, 0x00]) + dev_lamina.frame_write_str(0x82, [0x99, 0x99, 0x99, 0x99, 0x99, 0x99]) + dev_lamina.frame_write_str(0x82, [0x00, 0x00, 0x00, 0x00, 0x00, 0x01]) + if 0: + dev_lamina.frame_write_one(0x52, 0x01) + time.sleep(0.5) + dev_lamina.frame_write_one(0x50, 0x00) + if 0: + while 1: + code_mes = input("扫描数据") + temp = code_mes[5:7] + code_mes[-10:] + code_addr = [int(temp[i:i+2], 16) for i in range(0, len(temp), 2)] + print(f"扫描结果: {code_mes}") + print(f"载波地址: {trans_list_to_str(code_addr)}") + dev_lamina.frame_read(0x100, 0x20) + time.sleep(0.5) + dev_lamina.frame_write_str(0x0180, list(code_mes)) + time.sleep(0.5) + dev_lamina.frame_read(0x180, 0x10) + time.sleep(0.5) + dev_lamina.frame_write_str(0x82, code_addr) + time.sleep(0.5) + dev_lamina.frame_read(0x80, 0x08) + time.sleep(0.5) + dev_lamina.frame_write_one(0xA3, 0x01) + time.sleep(0.5) + dev_lamina.frame_write_one(0x51, 0x01) + time.sleep(5) + dev_lamina.frame_read(0x0E, 0x13) + time.sleep(5) + dev_lamina.frame_read(0x0E, 0x13) + time.sleep(0.5) + dev_lamina.frame_write_one(0x50, 0x00) + + if 0: + dev_lamina.frame_write_one(0x0054, 0x01) + dev_lamina.frame_read(0x000E, 0x02) + dev_lamina.frame_write_one(0x0054, 0x03) + dev_lamina.frame_read(0x000E, 0x02) + dev_lamina.frame_write_one(0x0054, 0x07) + dev_lamina.frame_read(0x000E, 0x02) + dev_lamina.frame_write_one(0x0054, 0x06) + dev_lamina.frame_read(0x000E, 0x02) + dev_lamina.frame_write_one(0x0054, 0x04) + dev_lamina.frame_read(0x000E, 0x02) + dev_lamina.frame_write_one(0x0054, 0x00) + dev_lamina.frame_read(0x000E, 0x02) + if 0: + dev_lamina.frame_read(0x0E, 0x14) + if 0: + dev_lamina.frame_read(0x60, 0x60) + if 0: + dev_lamina.frame_read(0x70, 0x02) + dev_lamina.frame_write_one(0x70, 2100) + dev_lamina.frame_write_one(0x71, 2200) + dev_lamina.frame_read(0x70, 0x02) + dev_lamina.frame_write_one(0x70, 2300) + dev_lamina.frame_write_one(0x71, 2100) + dev_lamina.frame_read(0x70, 0x02) + if 0: + dev_lamina.frame_write_str(0x0170, list("SN202405201117-1")) + dev_lamina.frame_write_str(0x0180, list("MES202405201117-2")) + dev_lamina.frame_write_str(0x0190, list("D202405211500-3")) + time.sleep(2) + dev_lamina.frame_read(0x0170, 0x30) + + if not hasattr(__builtins__,"__IPYTHON__"): + path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240625_2030_V1.04.bin") + # path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240520_0000_T1.11.bin") + # 生产镜像版本 + # path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240525_1800_V1.12.bin") + # 江苏发货产品灌装版本 + # path_bin = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\tools\upgrade\SLCP001_240603_2100_V1.18.bin") + dev_lamina.frame_update(path_bin) + + time.sleep(6) + + dev_lamina.frame_read(0x0100, 0x20) + + dev_lamina.frame_write_one(0x52, 0x01) + +