import time from pathlib import Path from serial import Serial from serial.tools import list_ports from abc import ABC, abstractmethod from .tools import ByteConv from .function import protocols class DeviceSerial: """ 串口通信设备原型 Note: 串口资源释放与重复开启 """ def __init__(self, com_name, callbacks, **kwargs): """ 初始化设备 """ self.__com = None if com_name is not None: self.open_connection(com_name, **kwargs) self.flag_print = kwargs['frame_print'] if 'frame_print' in kwargs.keys() else False self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1 self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01 self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1 match callbacks: case (maker, parser): self._frame_maker = maker if maker is not None else lambda self: '' self._frame_parser = parser if parser is not None else lambda self, frame: '' case _: self._frame_maker = lambda self: '' self._frame_parser = lambda self, frame: '' self.output = { 'result': False, 'code_func': 0x00, } self.log = { 'send': 0, 'read': 0, 'keep-fail': 0, 'record': { 'config': None, 'data': None, }, } def __read_frame(self) -> bool: """ 使用帧字节超时策略读报文帧, 并进行解析数据, 打印异常 """ frame_recv = b'' time_start, time_current, flag_frame = time.time(), time.time(), False while (time_current - time_start) < self.time_out: time.sleep(self.time_gap) time_current = time.time() bytes_read = self.__com.read_all() if flag_frame and len(bytes_read) == 0: break elif len(bytes_read): flag_frame = True frame_recv += bytes_read try: self.output = self._frame_parser(frame_recv) if self.flag_print: print("Read Frame: ", ByteConv.trans_list_to_str(frame_recv)) except Exception as ex: print("Error Info: ", ex) if self.flag_print and frame_recv: print("Fail Data: " , ByteConv.trans_list_to_str(frame_recv)) self.output['result'] = False return self.output['result'] def _transfer_data(self) -> bool: """ 串口收发报文, 包含重试逻辑与数据打印 """ # 生成发送帧 frame: bytearray = self._frame_maker() if self.__com is None: """ 无效通信接口, 打印报文后返回 """ print(ByteConv.trans_list_to_str(frame)) return False fail_count = 0 while fail_count < self.retry: frame_discard = self.__com.read_all() self.__com.write(frame) self.log['send'] += 1 if self.flag_print and frame_discard: print("Discard Data: " , frame_discard) if self.flag_print: print("Send Frame: ", ByteConv.trans_list_to_str(frame)) time.sleep(2 * self.time_gap) if self.__read_frame(): if (self.flag_print is not None) and 'Regs' in self.output.keys(): protocols.print_display(self.output['Regs']) self.log['read'] += 1 break fail_count += 1 self.log['keep-fail'] = fail_count if fail_count >= self.log['keep-fail'] else self.log['keep-fail'] time.sleep(2 * self.time_out) return fail_count < self.retry def close_connection(self) -> bool: """ 关闭连接, 释放通信资源 """ if self.__com is not None: self.__com.close() return self.__com is None or (not self.__com.is_open) def open_connection(self, port=None, **kwargs) -> bool: """ 打开连接, 更新或重新配置通信资源 """ com_config = { 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1, } kwargs = kwargs if kwargs else None serial_close = lambda com: com.close() if com.isOpen() else None serial_port_check = lambda port: port.upper() in (com.name for com in list_ports.comports()) match (self.__com, port, kwargs): case (None, str() as port, None): """ 使用默认参数打开串口 """ if not serial_port_check(port): raise ValueError("无效串口端口: %s" % port) self.__com = Serial(port, timeout=0, **com_config) case (None, str() as port, dict() as kwargs): """ 使用指定参数打开串口 """ if not serial_port_check(port): raise ValueError("无效串口端口: %s" % port) com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else com_config['baudrate'] com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else com_config['parity'] com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else com_config['bytesize'] com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else com_config['stopbits'] self.__com = Serial(port, timeout=0, **com_config) case (Serial() as com, None, None): """ 无参数重开串口 """ serial_close(com) com.open() case (Serial() as com, port, None): """ 重新指定端口号并打开串口 """ serial_close(com) if not serial_port_check(port): raise ValueError("无效串口端口: %s" % port) com.port = port com.open() case (Serial() as com, str() as port, dict() as kwargs): """ 重新指定端口号与配置并打开串口 """ serial_close(com) if not serial_port_check(port): raise ValueError("无效串口端口: %s" % port) com.port = port com.baudrate = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else com_config['baudrate'] com.parity = kwargs['parity'] if 'parity' in kwargs.keys() else com_config['parity'] com.bytesize = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else com_config['bytesize'] com.stopbits = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else com_config['stopbits'] com.open() case _: """ 匹配失败, 报错 """ raise ValueError("Invalid config.") return self.__com.is_open @abstractmethod def frame_read(self, daddr=0x60, dlen=0x50) -> bool: pass @abstractmethod def frame_write(self, daddr, dlen=1, dval=None) -> bool: pass @abstractmethod def frame_update(self, path_file: Path, makefile: bool = False) -> bool: pass