添加升级流程代码;

调整程序调用结构;
This commit is contained in:
何 泽隆
2024-04-16 17:34:50 +08:00
parent 147aad00a5
commit 281e1b04a8
2 changed files with 436 additions and 275 deletions

329
source/func_frame.py Normal file
View File

@@ -0,0 +1,329 @@
from crc import Calculator, Crc16
from utl import display_hex, trans_list_to_str
modbus_map = {
# 1 - Hex
# 2 - Int16
# 3 - lnt32
# 4 - str
# 5 - addr
0x0E: ["故障字1", 1],
0x0F: ["故障字2", 1],
0x10: ["MPPT工作状态", 1],
0x11: ["系统工作状态", 1],
0x12: ["系统工作模式", 1],
0x13: ["输入电压", 2],
0x14: ["电感电流", 2],
0x15: ["12V电压", 2],
0x16: ["输出电压", 2],
0x17: ["输入电流", 2],
0x18: ["温度1", 2],
0x19: ["温度2", 2],
0x1A: ["输入功率", 3],
0x1C: ["设备温度", 2],
0x1D: ["开关机状态", 1],
0x1E: ["保留", 1],
0x1F: ["保留", 1],
0x60: ["光伏通道使能", 1],
0x61: ["最小启动输入电压", 2],
0x62: ["最大启动输入电压", 2],
0x63: ["最小停止输入电压", 2],
0x64: ["最大停止输入电压", 2],
0x65: ["最小MPPT电压", 2],
0x66: ["最大MPPT电压", 2],
0x67: ["最小启动输出电压", 2],
0x68: ["最大启动输出电压", 2],
0x69: ["最小停止输出电压", 2],
0x6A: ["最大停止输出电压", 2],
0x6B: ["输入过压保护值", 2],
0x6C: ["输出过压保护值", 2],
0x6D: ["输出欠压保护值", 2],
0x6E: ["电感过流保护值", 2],
0x6F: ["输入过流保护值", 2],
0x70: ["最小电感电流限值", 2],
0x71: ["最大电感电流限值", 2],
0x72: ["浮充电压阈值", 2],
0x73: ["三点法中间阈值", 2],
0x74: ["恒压充电电压", 2],
0x75: ["过温故障值", 2],
0x76: ["过温告警值", 2],
0x77: ["温度恢复值", 2],
0x78: ["最低满载电压", 2],
0x79: ["最高满载电压", 2],
0x7A: ["输入过载保护值", 3],
0x7C: ["最小功率限值", 3],
0x7E: ["最大功率限值", 3],
0x80: ["最大功率限值存储值", 3],
0x82: ["载波通信地址", 5],
0x85: ["电压环out_max", 2],
0x86: ["电压环out_min", 2],
0x87: ["电流环out_max", 2],
0x88: ["电流环out_min", 2],
0x89: ["MPPT扰动系数k_d_vin", 2],
0x8A: ["dmin", 2],
0x8B: ["dmax", 2],
0x8C: ["扫描电压偏移scanvolt_offset", 2],
0x8D: ["电压环Kp", 3],
0x8F: ["电压环Ki", 3],
0x91: ["电流环Kp", 3],
0x93: ["电流环Ki", 3],
0x95: ["日志级别", 1],
0x96: ["日志输出方式", 1],
0x97: ["采样校准volt_in_a", 2],
0x98: ["采样校准volt_in_b", 2],
0x99: ["采样校准volt_out_a", 2],
0x9A: ["采样校准volt_out_b", 2],
0x9B: ["采样校准curr_in_a", 2],
0x9C: ["采样校准curr_in_b", 2],
0x9D: ["采样校准curr_induc_a", 2],
0x9E: ["采样校准curr_induc_b", 2],
0x9F: ["采样校准volt_12V_a", 2],
0xA0: ["采样校准volt_12V_b", 2],
0xA1: ["温度补偿temp1_b", 2],
0xA2: ["温度补偿temp2_b", 2],
0xA3: ["系统工作模式", 2],
0xA4: ["电感电流给定值curr_set", 2],
0xA5: ["抖动频率上限", 2],
0xA6: ["抖动频率下限", 2],
0xA7: ["保留", 1],
0xA8: ["保留", 1],
0xA9: ["保留", 1],
0xAA: ["保留", 1],
0xAB: ["保留", 1],
0xAC: ["保留", 1],
0xAD: ["保留", 1],
0xAE: ["保留", 1],
0xAF: ["保留", 1],
0x100: ["版本", 4],
0x110: ["型号", 4],
}
def make_frame_modbus(block:dict):
""" modbus 生成函数"""
frame = []
calculator = Calculator(Crc16.MODBUS)
frame.append(block['addr_dev'])
if block['type'] == 'update':
""" 升级系列报文 """
frame.append(0x07)
if len(block['file']) <= block["header_offset"]:
raise("Modbus Update error, file too small.")
if block['step'] == 'start':
frame.append(0x01)
frame.append(0x00)
frame.append(block['header_offset'] // 256)
frame.append(block['header_offset'] % 256)
frame += list(block['file'][:block['header_offset']])
elif block['step'] == 'trans':
file_offset = block["header_offset"] + block['index'] * block['file_block_size']
file_block = block['file'][file_offset: file_offset + block['file_block_size']]
frame.append(0x02)
frame.append(0x00)
frame.append(block['index'] // 256)
frame.append(block['index'] % 256)
frame.append(len(file_block) // 256)
frame.append(len(file_block) % 256)
frame += list(file_block)
elif block['step'] == 'end':
frame.append(0x03)
frame.append(0x00)
else:
raise("Modbus Update Frame Step Error.")
else:
""" 数据读取系列报文 """
data_addr = block['data_addr']
if block['type'] == "read":
frame.append(0x03)
data_len = block['data_len']
frame.append(data_addr // 256 % 256)
frame.append(data_addr % 256)
frame.append(data_len // 256 % 256)
frame.append(data_len % 256)
elif block['type'] == "write_one":
frame.append(0x06)
data_val = block['data_val']
if data_val < 0:
data_val += 0x1_0000
frame.append(data_addr // 256 % 256)
frame.append(data_addr % 256)
frame.append(data_val // 256 % 256)
frame.append(data_val % 256)
elif block['type'] == "write_dual":
frame.append(0x10)
data_val = block['data_val']
if data_val < 0:
data_val += 0x1_0000_0000
frame.append(data_addr // 256 % 256)
frame.append(data_addr % 256)
frame.append(0x00)
frame.append(0x02)
frame.append(0x04)
frame.append(data_val // 256 % 256)
frame.append(data_val % 256)
data_val //= 0x1_0000
frame.append(data_val // 256 % 256)
frame.append(data_val % 256)
elif block['type'] == "write_str":
frame.append(0x10)
data_len = len(block['data_val'])
data_val = block['data_val']
if data_len > 0x0100:
raise("modbus data len oversize.")
frame.append(data_addr // 256 % 256)
frame.append(data_addr % 256)
frame.append(0x00)
frame.append(data_len // 2)
frame.append(data_len)
for i in range(data_len//2):
frame.append(data_val[2*i + 1])
frame.append(data_val[2*i])
else:
raise("Modbus Frame Type Error.")
crc = calculator.checksum(bytearray(frame))
frame.append(crc % 256)
frame.append(crc // 256)
return frame
def make_frame_dlt645(block:dict):
""" dlt645 生成函数"""
frame = []
if block['type'] == 'modbus':
""" Modbus 透传帧 """
ctrl_code = 0x1F # Modbus 透传帧功能码
data_frame = make_frame_modbus(block["data"])
len_data = len(data_frame)
else:
raise("Unknown dlt645 frame type.")
frame.append(0x68)
frame += block['addr'][:6]
frame.append(0x68)
frame.append(ctrl_code)
frame.append(len_data)
frame += data_frame
frame.append(sum(frame) % 256)
frame.append(0x16)
return frame
def display_data(address: int, data: list):
""" 格式化表示数据 """
def display_str(data, len_data):
item = bytearray(data[:2 * len_data])
for i in range(len_data):
t = item[2*i]
item[2*i] = item[2*i+1]
item[2*i+1] = t
return item
output = "Parse Result: \n"
idx = address
len_max = 0
while data:
if idx not in modbus_map.keys():
output_line = "\t Error, doesn`t match item.\n"
output += output_line
break
len_data = 1
current_map = modbus_map[idx]
if current_map[1] == 1:
""" Hex字符表示 """
item = data[0] * 0x0100 + data[1]
item = display_hex(item, 4)
elif current_map[1] == 2:
""" 16位数值表示 """
item = data[0] * 0x0100 + data[1]
if item > 0x8000:
item -= 0x1_0000
elif current_map[1] == 3:
""" 32位数值表示 """
item = data[2] * 0x0100 + data[3]
item *= 0x10000
item += data[0] * 0x0100 + data[1]
if data[2] > 0x80:
item -= 0x1_0000_0000
len_data = 2
elif current_map[1] == 4:
""" 字符串表示 """
len_data = 16
item = display_str(data, len_data)
item = item.decode()
elif current_map[1] == 5:
""" 载波地址表示 """
len_data = 3
item = display_str(data, len_data)
item = trans_list_to_str(item)
if len_max < len(current_map[0]):
len_max = len(current_map[0])
len_remain = len_max - len(current_map[0])
output_line = "\t".join([display_hex(idx, 4), current_map[0] + " " * len_remain, str(item)])
idx += len_data
del data[0:len_data * 2]
output += output_line + "\n"
return output
def check_frame_modbus(frame, block=None):
""" 校验modbus帧回复 """
calculator = Calculator(Crc16.MODBUS)
if calculator.checksum(frame[:-2]) != (frame[-1] * 0x100 + frame[-2]):
raise Exception("Frame Crc check failed.")
elif frame[0] != 0x01:
raise Exception("Frame device address error.")
code_func = frame[1]
code_subfunc = frame[2]
if code_func == 0x07:
""" 升级回复帧 """
if code_subfunc == 0x01 and frame[3] == 0x00:
return frame[4] * 256 + frame[5]
elif code_subfunc == 0x02 and frame[3] == 0x00:
update_index = frame[4] * 256 + frame[5]
if type(block) is dict and block['index'] != update_index:
raise Exception("Error slave response.")
return update_index
elif code_subfunc == 0x03 and frame[3] == 0x00:
return frame[4] * 256 + frame[5]
elif code_func == 0x03 or code_func == 0x04:
""" 数据读取帧 """
if frame[2] == len(frame[3:-2]):
if type(block) is dict:
data_addr = block['data_addr']
return display_data(data_addr, list(frame[3:-2]))
else:
return list(frame[3:-2])
else:
raise("Frame read error.")
elif code_func == 0x06 and frame[2] == len(frame[3:-2]):
""" 单个数据写入帧 """
return frame[13:-4]
elif code_func == 0x10 and frame[2] == len(frame[3:-2]):
""" 多个数据写入帧 """
return frame[13:-4]
else:
raise(f"Frame Date error. func={code_func}, func_sub={code_subfunc}, len={len(frame)}")
def check_frame_dlt645(frame, block=None):
""" 校验dlt645帧回复 """
if sum(frame[:-2]) % 0x100 != frame[-2]:
raise Exception("DLT645 Frame Verify error.")
elif len(frame[10:-2]) != frame[9]:
raise Exception("DLT645 Frame len error.")
code_func = frame[8]
if code_func == 0x9F:
block = block['data']
return check_frame_modbus(frame[10:-2], block)
else:
raise Exception("DLT645 Frame type error.")

View File

@@ -2,269 +2,9 @@ import time
from webui import webui from webui import webui
from pathlib import Path from pathlib import Path
from serial import Serial from serial import Serial
from crc import Calculator, Crc16 from utl import trans_list_to_str
from utl import trans_list_to_str, trans_str_to_list, display_hex from func_frame import make_frame_dlt645, check_frame_dlt645
root = Path(".")
com_name = "COM16"
modbus_map = {
# 1 - Hex
# 2 - Int16
# 3 - lnt32
# 4 - str
# 5 - addr
0x0E: ["故障字1", 1],
0x0F: ["故障字2", 1],
0x10: ["MPPT工作状态", 1],
0x11: ["系统工作状态", 1],
0x12: ["系统工作模式", 1],
0x13: ["输入电压", 2],
0x14: ["电感电流", 2],
0x15: ["12V电压", 2],
0x16: ["输出电压", 2],
0x17: ["输入电流", 2],
0x18: ["温度1", 2],
0x19: ["温度2", 2],
0x1A: ["输入功率", 3],
0x1C: ["设备温度", 2],
0x1D: ["开关机状态", 1],
0x1E: ["保留", 1],
0x1F: ["保留", 1],
0x60: ["光伏通道使能", 1],
0x61: ["最小启动输入电压", 2],
0x62: ["最大启动输入电压", 2],
0x63: ["最小停止输入电压", 2],
0x64: ["最大停止输入电压", 2],
0x65: ["最小MPPT电压", 2],
0x66: ["最大MPPT电压", 2],
0x67: ["最小启动输出电压", 2],
0x68: ["最大启动输出电压", 2],
0x69: ["最小停止输出电压", 2],
0x6A: ["最大停止输出电压", 2],
0x6B: ["输入过压保护值", 2],
0x6C: ["输出过压保护值", 2],
0x6D: ["输出欠压保护值", 2],
0x6E: ["电感过流保护值", 2],
0x6F: ["输入过流保护值", 2],
0x70: ["最小电感电流限值", 2],
0x71: ["最大电感电流限值", 2],
0x72: ["浮充电压阈值", 2],
0x73: ["三点法中间阈值", 2],
0x74: ["恒压充电电压", 2],
0x75: ["过温故障值", 2],
0x76: ["过温告警值", 2],
0x77: ["温度恢复值", 2],
0x78: ["最低满载电压", 2],
0x79: ["最高满载电压", 2],
0x7A: ["输入过载保护值", 3],
0x7C: ["最小功率限值", 3],
0x7E: ["最大功率限值", 3],
0x80: ["最大功率限值存储值", 3],
0x82: ["载波通信地址", 5],
0x85: ["电压环out_max", 2],
0x86: ["电压环out_min", 2],
0x87: ["电流环out_max", 2],
0x88: ["电流环out_min", 2],
0x89: ["MPPT扰动系数k_d_vin", 2],
0x8A: ["dmin", 2],
0x8B: ["dmax", 2],
0x8C: ["扫描电压偏移scanvolt_offset", 2],
0x8D: ["电压环Kp", 3],
0x8F: ["电压环Ki", 3],
0x91: ["电流环Kp", 3],
0x93: ["电流环Ki", 3],
0x95: ["日志级别", 1],
0x96: ["日志输出方式", 1],
0x97: ["采样校准volt_in_a", 2],
0x98: ["采样校准volt_in_b", 2],
0x99: ["采样校准volt_out_a", 2],
0x9A: ["采样校准volt_out_b", 2],
0x9B: ["采样校准curr_in_a", 2],
0x9C: ["采样校准curr_in_b", 2],
0x9D: ["采样校准curr_induc_a", 2],
0x9E: ["采样校准curr_induc_b", 2],
0x9F: ["采样校准volt_12V_a", 2],
0xA0: ["采样校准volt_12V_b", 2],
0xA1: ["温度补偿temp1_b", 2],
0xA2: ["温度补偿temp2_b", 2],
0xA3: ["系统工作模式", 1],
0xA4: ["电感电流给定值curr_set", 2],
0xA5: ["抖动频率上限", 2],
0xA6: ["抖动频率下限", 2],
0xA7: ["保留", 1],
0xA8: ["保留", 1],
0xA9: ["保留", 1],
0xAA: ["保留", 1],
0xAB: ["保留", 1],
0xAC: ["保留", 1],
0xAD: ["保留", 1],
0xAE: ["保留", 1],
0xAF: ["保留", 1],
}
def make_frame_modbus(block:dict):
""" modbus 生成函数"""
frame = []
calculator = Calculator(Crc16.MODBUS)
func_code = 0x1F # Modbus 透传帧
frame.append(block['addr_dev'])
data_addr = block['data_addr']
if block['type'] == "read":
frame.append(0x03)
data_len = block['data_len']
frame.append(data_addr // 256 % 256)
frame.append(data_addr % 256)
frame.append(data_len // 256 % 256)
frame.append(data_len % 256)
elif block['type'] == "write_one":
frame.append(0x06)
data_val = block['data_val']
if data_val < 0:
data_val += 0x1_0000
frame.append(data_addr // 256 % 256)
frame.append(data_addr % 256)
frame.append(data_val // 256 % 256)
frame.append(data_val % 256)
elif block['type'] == "write_dual":
frame.append(0x10)
data_val = block['data_val']
if data_val < 0:
data_val += 0x1_0000_0000
frame.append(data_addr // 256 % 256)
frame.append(data_addr % 256)
frame.append(0x00)
frame.append(0x02)
frame.append(0x04)
frame.append(data_val // 256 % 256)
frame.append(data_val % 256)
data_val //= 0x1_0000
frame.append(data_val // 256 % 256)
frame.append(data_val % 256)
elif block['type'] == "write_str":
frame.append(0x10)
data_len = len(block['data_val'])
data_val = block['data_val']
if data_len > 0x0100:
raise("modbus data len oversize.")
frame.append(data_addr // 256 % 256)
frame.append(data_addr % 256)
frame.append(0x00)
frame.append(data_len // 2)
frame.append(data_len)
for i in range(data_len//2):
frame.append(data_val[2*i + 1])
frame.append(data_val[2*i])
crc = calculator.checksum(bytearray(frame))
frame.append(crc % 256)
frame.append(crc // 256)
return frame
def make_frame_dlt645(block:dict):
""" dlt645 生成函数"""
frame = []
len_data = len(block['data'])
ctrl_code = 0x1F # Modbus 透传帧
frame.append(0x68)
frame += block['addr'][:6]
frame.append(0x68)
frame.append(ctrl_code)
frame.append(len_data)
frame += block['data']
frame.append(sum(frame) % 256)
frame.append(0x16)
return frame
def display_data(address: int, data: list):
""" 格式化表示数据 """
output = "Phase Result: \n"
idx = address
len_max = 0
while data:
if idx not in modbus_map.keys():
output_line = "\t Error, doesn`t match item.\n"
output += output_line
break
len_data = 1
current_map = modbus_map[idx]
if current_map[1] == 1:
""" Hex字符表示 """
item = data[0] * 0x0100 + data[1]
item = display_hex(item, 4)
elif current_map[1] == 2:
""" 16位数值表示 """
item = data[0] * 0x0100 + data[1]
if item > 0x8000:
item -= 0x1_0000
elif current_map[1] == 3:
""" 32位数值表示 """
item = data[2] * 0x0100 + data[3]
item *= 0x10000
item += data[0] * 0x0100 + data[1]
if data[2] > 0x80:
item -= 0x1_0000_0000
len_data = 2
elif current_map[1] == 4:
""" 字符串表示 """
item = bytearray(data[:16])
item = item[4:6] + item[2:4] + item[0:2]
len_data = 16
elif current_map[1] == 5:
""" 载波地址表示 """
item = trans_list_to_str(data[:6])
len_data = 3
if len_max < len(current_map[0]):
len_max = len(current_map[0])
len_remain = len_max - len(current_map[0])
output_line = "\t".join([display_hex(idx, 4), current_map[0] + " " * len_remain, str(item)])
idx += len_data
del data[0:len_data * 2]
output += output_line + "\n"
return output
def test1():
""" 测试1 """
frame = "68 01 02 03 04 05 06 68 9F 29 01 03 24 20 30 00 00 00 00 00 04 00 00 00 9A 00 00 00 00 00 00 00 00 01 02 01 10 00 01 00 00 01 10 00 00 FF FF FF FF 5D 3B 7D 16 "
frame1 = trans_str_to_list(frame)
print(frame1)
frame2 = frame1[13:-4]
print(frame2)
output_text = display_data(0x0e, frame2)
print(output_text)
def test2():
""" 测试2 """
block_modbus = {
'addr_dev' : 0x01,
'data_addr' : 0x0E,
'data_len' : 0x12,
'type' : 'read_one',
}
frame1 = make_frame_modbus(block_modbus)
block_dlt645 = {
'addr' : [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'data' : frame1,
}
frame2 = make_frame_dlt645(block_dlt645)
frame_string = trans_list_to_str(frame2)
print(frame_string)
def frame_read(daddr=0x60, dlen=0x50, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]): def frame_read(daddr=0x60, dlen=0x50, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]):
block_modbus = { block_modbus = {
@@ -273,10 +13,10 @@ def frame_read(daddr=0x60, dlen=0x50, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x
'data_len' : dlen, 'data_len' : dlen,
'type' : 'read', 'type' : 'read',
} }
frame1 = make_frame_modbus(block_modbus)
block_dlt645 = { block_dlt645 = {
'addr' : dev_addr, 'addr' : dev_addr,
'data' : frame1, 'type' : 'modbus',
'data' : block_modbus,
} }
frame2 = make_frame_dlt645(block_dlt645) frame2 = make_frame_dlt645(block_dlt645)
if com is None: if com is None:
@@ -286,8 +26,7 @@ def frame_read(daddr=0x60, dlen=0x50, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x
com.write(bytearray(frame2)) com.write(bytearray(frame2))
time.sleep(0.5) time.sleep(0.5)
frame3 = com.read_all() frame3 = com.read_all()
frame4 = list(frame3[13:-4]) output_text = check_frame_dlt645(frame3, block=block_dlt645)
output_text = display_data(block_modbus['data_addr'], frame4)
print(output_text) print(output_text)
def frame_write_one(daddr=0x85, dval=-900, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]): def frame_write_one(daddr=0x85, dval=-900, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]):
@@ -297,12 +36,15 @@ def frame_write_one(daddr=0x85, dval=-900, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xA
'data_val' : dval, 'data_val' : dval,
'type' : 'write_one', 'type' : 'write_one',
} }
frame1 = make_frame_modbus(block_modbus)
block_dlt645 = { block_dlt645 = {
'addr' : dev_addr, 'addr' : dev_addr,
'data' : frame1, 'type' : 'modbus',
'data' : block_modbus,
} }
frame2 = make_frame_dlt645(block_dlt645) frame2 = make_frame_dlt645(block_dlt645)
if com is None:
print(trans_list_to_str(frame2))
return
com.write(bytearray(frame2)) com.write(bytearray(frame2))
def frame_write_dual(daddr=0x91, dval=600, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]): def frame_write_dual(daddr=0x91, dval=600, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]):
@@ -312,12 +54,15 @@ def frame_write_dual(daddr=0x91, dval=600, dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xA
'data_val' : dval, 'data_val' : dval,
'type' : 'write_dual', 'type' : 'write_dual',
} }
frame1 = make_frame_modbus(block_modbus)
block_dlt645 = { block_dlt645 = {
'addr' : dev_addr, 'addr' : dev_addr,
'data' : frame1, 'type' : 'modbus',
'data' : block_modbus,
} }
frame2 = make_frame_dlt645(block_dlt645) frame2 = make_frame_dlt645(block_dlt645)
if com is None:
print(trans_list_to_str(frame2))
return
com.write(bytearray(frame2)) com.write(bytearray(frame2))
def frame_write_str(daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01], dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]): def frame_write_str(daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01], dev_addr=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]):
@@ -327,14 +72,88 @@ def frame_write_str(daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01], dev_a
'data_val' : dval, 'data_val' : dval,
'type' : 'write_str', 'type' : 'write_str',
} }
frame1 = make_frame_modbus(block_modbus)
block_dlt645 = { block_dlt645 = {
'addr' : dev_addr, 'addr' : dev_addr,
'data' : frame1, 'type' : 'modbus',
'data' : block_modbus,
} }
frame2 = make_frame_dlt645(block_dlt645) frame2 = make_frame_dlt645(block_dlt645)
if com is None:
print(trans_list_to_str(frame2))
return
com.write(bytearray(frame2)) com.write(bytearray(frame2))
def frame_update(com, path_bin, dev_addr):
""" 程序升级 """
block_modbus = {
'addr_dev' : 0x01,
'type' : 'update',
'step' : 'start',
'index' : 0,
'file' : Path(path_bin).read_bytes(),
'header_offset': 128,
}
block_dlt645 = {
'addr' : dev_addr,
'type' : 'modbus',
'data' : block_modbus,
}
# 启动帧
frame_master = bytearray(make_frame_dlt645(block_dlt645))
# 等待擦除完成返回
try_times = 3000
com.read_all()
while try_times:
time.sleep(0.4)
com.write(frame_master)
frame_slave = com.read_all()
if not frame_slave:
try_times -= 1
continue
block_dlt645["data"]['file_block_size'] = check_frame_dlt645(frame_slave, block_dlt645)
break
if block_dlt645["data"]['file_block_size'] == 0:
raise Exception("Error slave response.")
# 避免接收到延迟返回报文
time.sleep(0.4)
# 文件传输
block_dlt645["data"]['step'] = 'trans'
data_remain = len(block_dlt645["data"]['file']) - block_dlt645['data']['header_offset']
while data_remain > 0:
frame_master = bytearray(make_frame_dlt645(block_dlt645))
com.read_all()
com.write(frame_master)
time.sleep(0.2)
frame_slave = None
while not frame_slave:
frame_slave = com.read_all()
check_frame_dlt645(frame_slave, block_dlt645)
block_dlt645["data"]['index'] += 1
data_remain -= block_dlt645["data"]['file_block_size']
# 结束升级
block_dlt645["data"]['step'] = 'end'
frame_master = bytearray(make_frame_dlt645(block_dlt645))
com.read_all()
com.write(frame_master)
time.sleep(0.1)
frame_slave = None
while not frame_slave:
frame_slave = com.read_all()
check_frame_dlt645(frame_slave[:18], block_dlt645)
return com
def my_function(e : webui.event): def my_function(e : webui.event):
""" WebUI回调函数 """ """ WebUI回调函数 """
global events global events
@@ -343,15 +162,15 @@ def my_function(e : webui.event):
print("Data from JavaScript: " + e.window.get_str(e, 0)) # Message from JS print("Data from JavaScript: " + e.window.get_str(e, 0)) # Message from JS
frame = e.window.get_str(e, 0) frame = e.window.get_str(e, 0)
frame1 = trans_str_to_list(frame) block_dlt645 = e.window.get_str(e, 1)
frame2 = frame1[13:-4] output_text = check_frame_dlt645(frame, block=block_dlt645)
output_text = display_data(0x0e, frame2)
return output_text return output_text
events = [] events = []
def main_webui(): def main_webui():
myWindow = webui.window() myWindow = webui.window()
root = Path(".")
file_main = root / 'resource\main.html' file_main = root / 'resource\main.html'
@@ -369,6 +188,19 @@ com = None
if __name__ == "__main__": if __name__ == "__main__":
com = Serial("Com16", baudrate=115200, parity='N', timeout=2) com = Serial("Com16", baudrate=115200, parity='N', timeout=2)
address = [0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA] address = [0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA]
path_bin1 = "D:\\WorkingProject\\LightStackAdapter\\software\\lamina_adapter\\tools\\upgrade\\DGAPD_240415_1000_V1.05.bin"
path_bin2 = "D:\\WorkingProject\\LightStackAdapter\\software\\lamina_adapter\\tools\\upgrade\\DGAPD_240416_1400_V1.05.bin"
frame_read(0x100, 0x20)
if not hasattr(__builtins__,"__IPYTHON__"):
path_bin = path_bin1
frame_update(com, path_bin, address)
time.sleep(1)
com.read_all()
frame_read(0x100, 0x20)
pass pass