diff --git a/source/dev_EnergyRouter.py b/source/dev_EnergyRouter.py new file mode 100644 index 0000000..00ff4a9 --- /dev/null +++ b/source/dev_EnergyRouter.py @@ -0,0 +1,24 @@ +import time +import socket +from pathlib import Path +from utl import trans_list_to_str +from func_frame import make_frame_modbus, check_frame_modbus + + +class EnergyRouter: + def __init__(self, ip="192.168.100.10", port=7, adddr_modbus=0x01): + self._ip = ip + self._port = port + self._adddr_modbus =adddr_modbus + + self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.tcp_socket.connect((self._ip, self._port)) + + def frame_update(fp): + pass + + +if __name__ == "__main__": + dev_ep = EnergyRouter() + + \ No newline at end of file diff --git a/source/dev_Lamina.py b/source/dev_Lamina.py new file mode 100644 index 0000000..44f7c61 --- /dev/null +++ b/source/dev_Lamina.py @@ -0,0 +1,173 @@ +import time +from pathlib import Path +from serial import Serial +from utl import trans_list_to_str +from func_frame import make_frame_dlt645, check_frame_dlt645 + +class LaminaAdapter: + def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01): + # 初始化串口通信 + if com_name is not None: + self.com = Serial(com_name, baudrate=115200, parity='N', timeout=2) + else: + self.com =None + + # 设置645协议地址 + self.addr_645 = addr_645 + # 设置Modbus地址 + self.addr_modbus = addr_modbus + + def frame_read(self, daddr=0x60, dlen=0x50): + block_modbus = { + 'addr_dev' : self.addr_modbus, + 'data_addr' : daddr, + 'data_len' : dlen, + 'type' : 'read', + } + block_dlt645 = { + 'addr' : self.addr_645, + 'type' : 'modbus', + 'data' : block_modbus, + } + frame = make_frame_dlt645(block_dlt645) + + if self.com is None: + print(trans_list_to_str(frame)) + return + + self.com.read_all() + self.com.write(bytearray(frame)) + time.sleep(0.5) + frame_recv = self.com.read_all() + output_text = check_frame_dlt645(frame_recv, block=block_dlt645) + print(output_text) + + def frame_write_one(self, daddr=0x85, dval=-900): + block_modbus = { + 'addr_dev' : self.addr_modbus, + 'data_addr' : daddr, + 'data_val' : dval, + 'type' : 'write_one', + } + block_dlt645 = { + 'addr' : self.addr_645, + 'type' : 'modbus', + 'data' : block_modbus, + } + frame = make_frame_dlt645(block_dlt645) + if self.com is None: + print(trans_list_to_str(frame)) + return + + self.com.write(bytearray(frame)) + + def frame_write_dual(self, daddr=0x91, dval=600): + block_modbus = { + 'addr_dev' : self.addr_modbus, + 'data_addr' : daddr, + 'data_val' : dval, + 'type' : 'write_dual', + } + block_dlt645 = { + 'addr' : self.addr_645, + 'type' : 'modbus', + 'data' : block_modbus, + } + frame = make_frame_dlt645(block_dlt645) + if self.com is None: + print(trans_list_to_str(frame)) + return + + self.com.write(bytearray(frame)) + + def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]): + block_modbus = { + 'addr_dev' : self.addr_modbus, + 'data_addr' : daddr, + 'data_val' : dval, + 'type' : 'write_str', + } + block_dlt645 = { + 'addr' : self.addr_645, + 'type' : 'modbus', + 'data' : block_modbus, + } + frame = make_frame_dlt645(block_dlt645) + if self.com is None: + print(trans_list_to_str(frame)) + return + + self.com.write(bytearray(frame)) + + def frame_update(self, path_bin): + """ 程序升级 """ + block_modbus = { + 'addr_dev' : self.addr_modbus, + 'type' : 'update', + 'step' : 'start', + 'index' : 0, + 'file' : Path(path_bin).read_bytes(), + 'header_offset': 128, + } + block_dlt645 = { + 'addr' : self.addr_645, + 'type' : 'modbus', + 'data' : block_modbus, + } + # 启动帧 + frame_master = bytearray(make_frame_dlt645(block_dlt645)) + + # 等待擦除完成返回 + try_times = 3000 + self.com.read_all() + while try_times: + time.sleep(0.4) + self.com.write(frame_master) + frame_slave = self.com.read_all() + if not frame_slave: + try_times -= 1 + continue + + block_dlt645["data"]['file_block_size'] = check_frame_dlt645(frame_slave, block_dlt645) + break + + if block_dlt645["data"]['file_block_size'] == 0: + raise Exception("Error slave response.") + + # 避免接收到延迟返回报文 + time.sleep(0.4) + + # 文件传输 + block_dlt645["data"]['step'] = 'trans' + data_remain = len(block_dlt645["data"]['file']) - block_dlt645['data']['header_offset'] + while data_remain > 0: + frame_master = bytearray(make_frame_dlt645(block_dlt645)) + + self.com.read_all() + self.com.write(frame_master) + time.sleep(0.2) + frame_slave = None + while not frame_slave: + frame_slave = self.com.read_all() + check_frame_dlt645(frame_slave, block_dlt645) + + block_dlt645["data"]['index'] += 1 + data_remain -= block_dlt645["data"]['file_block_size'] + + # 结束升级 + block_dlt645["data"]['step'] = 'end' + frame_master = bytearray(make_frame_dlt645(block_dlt645)) + + self.com.read_all() + self.com.write(frame_master) + time.sleep(0.1) + frame_slave = None + while not frame_slave: + frame_slave = self.com.read_all() + check_frame_dlt645(frame_slave[:18], block_dlt645) + +if __name__=='__main__': + dev_lamina = LaminaAdapter() + + dev_lamina.frame_read(0x0100, 0x20) + diff --git a/source/main.py b/source/main.py index ab6c23d..461ff61 100644 --- a/source/main.py +++ b/source/main.py @@ -1,157 +1,9 @@ import time from webui import webui from pathlib import Path -from serial import Serial -from utl import trans_list_to_str -from func_frame import make_frame_dlt645, check_frame_dlt645 - - -def frame_read(daddr=0x60, dlen=0x50, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]): - block_modbus = { - 'addr_dev' : 0x01, - 'data_addr' : daddr, - 'data_len' : dlen, - 'type' : 'read', - } - block_dlt645 = { - 'addr' : dev_addr, - 'type' : 'modbus', - 'data' : block_modbus, - } - frame2 = make_frame_dlt645(block_dlt645) - if com is None: - print(trans_list_to_str(frame2)) - return - com.read_all() - com.write(bytearray(frame2)) - time.sleep(0.5) - frame3 = com.read_all() - output_text = check_frame_dlt645(frame3, block=block_dlt645) - print(output_text) - -def frame_write_one(daddr=0x85, dval=-900, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]): - block_modbus = { - 'addr_dev' : 0x01, - 'data_addr' : daddr, - 'data_val' : dval, - 'type' : 'write_one', - } - block_dlt645 = { - 'addr' : dev_addr, - 'type' : 'modbus', - 'data' : block_modbus, - } - frame2 = make_frame_dlt645(block_dlt645) - if com is None: - print(trans_list_to_str(frame2)) - return - com.write(bytearray(frame2)) - -def frame_write_dual(daddr=0x91, dval=600, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]): - block_modbus = { - 'addr_dev' : 0x01, - 'data_addr' : daddr, - 'data_val' : dval, - 'type' : 'write_dual', - } - block_dlt645 = { - 'addr' : dev_addr, - 'type' : 'modbus', - 'data' : block_modbus, - } - frame2 = make_frame_dlt645(block_dlt645) - if com is None: - print(trans_list_to_str(frame2)) - return - com.write(bytearray(frame2)) - -def frame_write_str(daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01], dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]): - block_modbus = { - 'addr_dev' : 0x01, - 'data_addr' : daddr, - 'data_val' : dval, - 'type' : 'write_str', - } - block_dlt645 = { - 'addr' : dev_addr, - 'type' : 'modbus', - 'data' : block_modbus, - } - frame2 = make_frame_dlt645(block_dlt645) - if com is None: - print(trans_list_to_str(frame2)) - return - com.write(bytearray(frame2)) - -def frame_update(com, path_bin, dev_addr): - """ 程序升级 """ - block_modbus = { - 'addr_dev' : 0x01, - 'type' : 'update', - 'step' : 'start', - 'index' : 0, - 'file' : Path(path_bin).read_bytes(), - 'header_offset': 128, - } - block_dlt645 = { - 'addr' : dev_addr, - 'type' : 'modbus', - 'data' : block_modbus, - } - # 启动帧 - frame_master = bytearray(make_frame_dlt645(block_dlt645)) - - # 等待擦除完成返回 - try_times = 3000 - com.read_all() - while try_times: - time.sleep(0.4) - com.write(frame_master) - frame_slave = com.read_all() - if not frame_slave: - try_times -= 1 - continue - - block_dlt645["data"]['file_block_size'] = check_frame_dlt645(frame_slave, block_dlt645) - break - - if block_dlt645["data"]['file_block_size'] == 0: - raise Exception("Error slave response.") - - # 避免接收到延迟返回报文 - time.sleep(0.4) - - # 文件传输 - block_dlt645["data"]['step'] = 'trans' - data_remain = len(block_dlt645["data"]['file']) - block_dlt645['data']['header_offset'] - while data_remain > 0: - frame_master = bytearray(make_frame_dlt645(block_dlt645)) - - com.read_all() - com.write(frame_master) - time.sleep(0.2) - frame_slave = None - while not frame_slave: - frame_slave = com.read_all() - check_frame_dlt645(frame_slave, block_dlt645) - - block_dlt645["data"]['index'] += 1 - data_remain -= block_dlt645["data"]['file_block_size'] - - # 结束升级 - block_dlt645["data"]['step'] = 'end' - frame_master = bytearray(make_frame_dlt645(block_dlt645)) - - com.read_all() - com.write(frame_master) - time.sleep(0.1) - frame_slave = None - while not frame_slave: - frame_slave = com.read_all() - check_frame_dlt645(frame_slave[:18], block_dlt645) - - return com - +from func_frame import check_frame_dlt645 +from dev_Lamina import LaminaAdapter +from dev_EnergyRouter import EnergyRouter def my_function(e : webui.event): @@ -184,23 +36,22 @@ def main_webui(): print(events) myWindow.show(str(file_main)) -com = None if __name__ == "__main__": - com = Serial("Com16", baudrate=115200, parity='N', timeout=2) - address = [0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA] path_bin1 = "D:\\WorkingProject\\LightStackAdapter\\software\\lamina_adapter\\tools\\upgrade\\DGAPD_240415_1000_V1.05.bin" - path_bin2 = "D:\\WorkingProject\\LightStackAdapter\\software\\lamina_adapter\\tools\\upgrade\\DGAPD_240416_1400_V1.05.bin" + path_bin2 = "D:\\WorkingProject\\LightStackAdapter\\software\\Laminaadapter\\tools\\upgrade\\DGAPD_240416_1400_V1.05.bin" + path_bin3 = "F:\\Work\\FPGA\\Test\\Vivado\\test_update\\test_update.vitis\\upgrade_system\\Debug\\sd_card\\BOOT_data_v0.bin" - frame_read(0x100, 0x20) + dev_la = LaminaAdapter("COM16") + dev_la.frame_read(0x0100, 0x20) if not hasattr(__builtins__,"__IPYTHON__"): - path_bin = path_bin1 - frame_update(com, path_bin, address) + path_bin = path_bin3 + + dev_la.frame_update(path_bin) time.sleep(1) - com.read_all() - - frame_read(0x100, 0x20) + dev_la.frame_read(0x0100, 0x20) + pass