import socket
import time
from pathlib import Path

from utl import trans_list_to_str
from func_frame import make_frame_modbus, check_frame_modbus

modbus_map = {
}

# Maximum number of bytes requested per receive call. 128 is the size the
# original frame_read() used; NOTE(review): confirm it is large enough for
# the upgrade responses as well.
_RECV_BUFSIZE = 128


class EnergyRouter:
    """TCP client that exchanges Modbus-style frames with an energy router.

    Frames are built by ``make_frame_modbus`` and responses are interpreted by
    ``check_frame_modbus`` (both from ``func_frame``). The connection is opened
    in the constructor; call :meth:`close` (or use the instance as a context
    manager) to release it.
    """

    def __init__(self, ip="192.168.100.10", port=7, adddr_modbus=0x01):
        """Open a TCP connection to the device.

        Args:
            ip: IPv4 address of the device.
            port: TCP port of the device.
            adddr_modbus: Device address placed in every frame as
                ``addr_dev``. The spelling of this keyword is kept as-is
                so existing callers keep working.

        Raises:
            OSError: if the connection cannot be established.
        """
        self._ip = ip
        self._port = port
        self._addr_modbus = adddr_modbus
        self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.tcp_socket.connect((self._ip, self._port))
        except OSError:
            # Do not leak the descriptor when the device is unreachable.
            self.tcp_socket.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the TCP connection. Safe to call more than once."""
        if self.tcp_socket is not None:
            self.tcp_socket.close()

    def _recv_pending(self, timeout=0.0, bufsize=_RECV_BUFSIZE):
        """Receive up to ``bufsize`` bytes, waiting at most ``timeout`` seconds.

        Returns:
            The received bytes, ``b""`` if the peer closed the connection, or
            ``None`` if nothing arrived within ``timeout``.
        """
        sock = self.tcp_socket
        saved_timeout = sock.gettimeout()
        sock.settimeout(timeout)
        try:
            return sock.recv(bufsize)
        except (socket.timeout, BlockingIOError):
            # A zero timeout raises BlockingIOError, a positive one
            # socket.timeout; both mean that no data is available yet.
            return None
        finally:
            # Restore the previous mode so other methods are unaffected.
            sock.settimeout(saved_timeout)

    def _drain(self):
        """Discard any bytes already waiting in the receive buffer."""
        while self._recv_pending():
            pass

    def _wait_response(self, poll_interval=0.05):
        """Wait until the device sends something and return those bytes.

        Raises:
            ConnectionError: if the peer closes the connection while waiting.
        """
        while True:
            frame = self._recv_pending(timeout=poll_interval)
            if frame:
                return frame
            if frame is not None:
                # recv() returned b"": the peer closed the connection, so
                # no response will ever arrive.
                raise ConnectionError("Connection closed by slave.")

    def frame_read(self, daddr=0x00, dlen=0x10):
        """Send a read request and print the checked response.

        Args:
            daddr: Value sent as ``data_addr`` in the request.
            dlen: Value sent as ``data_len`` in the request.

        If no socket is attached (``tcp_socket`` is ``None``) the request
        frame is only printed and nothing is sent.
        """
        block_modbus = {
            'addr_dev': self._addr_modbus,
            'data_addr': daddr,
            'data_len': dlen,
            'type': 'read',
        }

        frame = make_frame_modbus(block_modbus)

        if self.tcp_socket is None:
            print(trans_list_to_str(frame))
            return

        # sendall() guarantees the whole frame is written; send() may not.
        self.tcp_socket.sendall(bytearray(frame))
        time.sleep(0.5)
        frame_recv = self.tcp_socket.recv(_RECV_BUFSIZE)
        output_text = check_frame_modbus(frame_recv, block_modbus)
        print(output_text)

    def frame_update(self, path_bin):
        """Upgrade the device program from a binary file.

        The procedure has three steps, selected through ``block_modbus['step']``:
        ``'start'`` (repeated until the device answers), ``'trans'`` (one frame
        per file block) and ``'end'``.

        Args:
            path_bin: Path of the binary image to transfer.

        Raises:
            RuntimeError: if the device never answers the start frame or
                reports a block size of zero.
            ConnectionError: if the device closes the connection while a
                response is awaited.
            OSError: on file or socket errors.
        """
        block_modbus = {
            'addr_dev': self._addr_modbus,
            'type': 'update',
            'step': 'start',
            'index': 0,
            'file': Path(path_bin).read_bytes(),
            'header_offset': 128,
        }

        # Start frame.
        frame_master = bytearray(make_frame_modbus(block_modbus))

        # Wait for the response that follows the erase: the start frame is
        # re-sent periodically until the device answers.
        try_times = 3000
        block_size = 0
        self._drain()
        while try_times:
            time.sleep(0.4)
            self.tcp_socket.sendall(frame_master)
            frame_slave = self._recv_pending()
            if not frame_slave:
                try_times -= 1
                continue
            # For the start step the checker's return value is used as the
            # size of each file block.
            block_size = check_frame_modbus(frame_slave, block_modbus)
            break

        if not block_size:
            # Covers both "no answer at all" and an unusable block size,
            # which would otherwise make the transfer loop never finish.
            raise RuntimeError("Error slave response.")
        block_modbus['file_block_size'] = block_size

        # Give delayed responses to repeated start frames time to arrive so
        # that the drain below removes them.
        time.sleep(0.4)

        # File transfer.
        block_modbus['step'] = 'trans'
        data_remain = len(block_modbus['file']) - block_modbus['header_offset']
        while data_remain > 0:
            frame_master = bytearray(make_frame_modbus(block_modbus))
            self._drain()
            self.tcp_socket.sendall(frame_master)
            time.sleep(0.2)
            frame_slave = self._wait_response()
            check_frame_modbus(frame_slave, block_modbus)
            block_modbus['index'] += 1
            data_remain -= block_modbus['file_block_size']

        # Finish the upgrade.
        block_modbus['step'] = 'end'
        frame_master = bytearray(make_frame_modbus(block_modbus))
        self._drain()
        self.tcp_socket.sendall(frame_master)
        time.sleep(0.1)
        frame_slave = self._wait_response()
        # Only the first 18 bytes of the final response are checked.
        check_frame_modbus(frame_slave[:18], block_modbus)


if __name__ == "__main__":
    # Upgrade test.
    path_file = Path("F:\\Work\\FPGA\\Test\\Vivado\\test_update\\test_update.vitis\\upgrade_system\\Debug\\sd_card\\BOOT.dat")
    with EnergyRouter() as dev_ep:
        dev_ep.frame_read()
        dev_ep.frame_update(path_file)