积累更新

This commit is contained in:
何 泽隆
2024-10-31 09:01:49 +08:00
parent eeb476a538
commit 73a36c35bb
13 changed files with 175 additions and 165 deletions

View File

@@ -1,11 +1,11 @@
import time
from pathlib import Path
from device.LaminaAdapter import LaminaAdapter
from source.device.LaminaAdapter import GeneratePackage_SLCP101_p460
from source.device.LaminaAdapter import GenerateImage_SLCP101_p460
from source.device.LaminaAdapter import GeneratePackage_DLSY001_p460
from source.device.LaminaAdapter import GenerateImage_DLSY001_p460
from function.tools.ByteConv import trans_list_to_str
from device.LaminaAdapter import GeneratePackage_SLCP101_p460
from device.LaminaAdapter import GenerateImage_SLCP101_p460
from device.LaminaAdapter import GeneratePackage_DLSY001_p460
from device.LaminaAdapter import GenerateImage_DLSY001_p460
from device.tools.ByteConv import trans_list_to_str
def test_communication(time_out=2):
""" 通信成功率测试 """
@@ -37,7 +37,7 @@ def make_Pakeage(fp: Path):
file_package = fp.parent / f'{hex_update.stem}.dat'
file_update_bin = fp.parent / f'{hex_update.stem}.bin'
data_package, data_update_bin = GeneratePackage_SLCP101_p460(hex_update)
data_package, data_update_bin = GeneratePackage_DLSY001_p460(hex_update)
file_package.write_bytes(data_package)
file_update_bin.write_bytes(data_update_bin)
@@ -239,45 +239,47 @@ if __name__=='__main__':
'time_out': 3, 'time_gap': 0.1, 'retry': 3, 'retry_sub': 10},
}
Process1()
exit(0)
# Process1()
# Process2()
# exit(0)
dev_lamina = LaminaAdapter(**mode_config['HPLC'])
dev_lamina = LaminaAdapter(**mode_config['Debug'])
dev_lamina.frame_read(0x0100, 0x20)
if not hasattr(__builtins__,"__IPYTHON__"):
# 工程-即时转换
file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex")
# file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex")
file_hex = Path(r"D:\WorkingProject\LightStackOptimizer\software\lamina_optimizer\lamina_optimizer\Debug\lamina_optimizer.hex")
if not file_hex.exists():
raise Exception("工程编译目标文件不存在.")
file_package = make_Pakeage(file_hex)
version = "DLSY001_240911_1600_V1.01"
version = "DLSY001_241030_1430_V1.02"
addr = [0x24, 0x09, 0x12, 0x00, 0x00, 0x00]
while True:
""" 自动检测升级流程(需要调试) """
ret = False
while not ret or ('Regs' not in dev_lamina.output.keys()) or (version == dev_lamina.output['Regs'][0x0100][1]):
dev_lamina.frame_read(0x82, 3)
while not ret or ('Regs' not in dev_lamina.output.keys()) or (version == dev_lamina.output['Regs'][0x0100][1].strip('\000')):
# dev_lamina.frame_read(0x82, 3)
ret = dev_lamina.frame_read(0x0100, 0x20)
time.sleep(1)
dev_lamina.frame_update(file_package)
time.sleep(6)
time.sleep(3)
ret = dev_lamina.frame_read(0x0100, 0x20)
if ret and (version == dev_lamina.output['Regs'][0x0100][1]):
dev_lamina.frame_write_one(0x52, 0x01)
print(f"address: {' '.join(map(lambda x: ('000' + hex(x)[2:])[-2:], addr))}")
dev_lamina.frame_write_str(0x82, addr)
dev_lamina.frame_read(0x82, 3)
addr[5] += 1
if addr[5] & 0x0F >= 10:
addr[5] += 0x10 - 10
# print(f"address: {' '.join(map(lambda x: ('000' + hex(x)[2:])[-2:], addr))}")
# dev_lamina.frame_write_str(0x82, addr)
# dev_lamina.frame_read(0x82, 3)
# addr[5] += 1
# if addr[5] & 0x0F >= 10:
# addr[5] += 0x10 - 10

View File

@@ -5,8 +5,8 @@ from tenacity import retry, stop_after_attempt, wait_fixed
from device.LaminaController import LaminaController
from device.LaminaController import GeneratePackage_DLSP001_p280039
from device.LaminaController import GenerateImage_DLSP001_p280039
from function.tools.ByteConv import trans_list_to_str
from tools.IntelHex import file_Bin_to_IntelHex
from device.tools.ByteConv import trans_list_to_str
from device.tools.IntelHex import file_Bin_to_IntelHex
def test_communication(time_out=2):
""" 通信成功率测试 """

View File

@@ -1,8 +1,8 @@
import time
from serial import Serial
from function.tools.ByteConv import conv_int_to_array, trans_list_to_str
from function.frame import make_frame_modbus, check_frame_modbus, print_display
from . import tools
from . import function
class DeviceSerial:
""" 串口通信设备原型
@@ -52,20 +52,20 @@ class DeviceSerial:
flag_frame = True
frame_recv += bytes_read
try:
self.output = check_frame_modbus(frame_recv, self.block['data'])
self.output = function.frame.check_frame_modbus(frame_recv, self.block['data'])
if self.flag_print:
print("Read Frame: ", trans_list_to_str(frame_recv))
print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv))
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , trans_list_to_str(frame_recv))
print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
def __transfer_data(self, frame: bytearray) -> bool:
""" 串口收发报文, 包含重试逻辑与数据打印 """
if self.__com is None:
print(trans_list_to_str(frame))
print(tools.ByteConv.trans_list_to_str(frame))
return False
fail_count = 0
@@ -77,12 +77,12 @@ class DeviceSerial:
if self.flag_print and frame_discard:
print("Discard Data: " , frame_discard)
if self.flag_print:
print("Send Frame: ", trans_list_to_str(frame))
print("Send Frame: ", tools.ByteConv.trans_list_to_str(frame))
time.sleep(10 * self.time_gap)
if self.__read_frame():
if (self.flag_print is not None) and 'Regs' in self.output.keys():
print_display(self.output['Regs'])
function.frame.print_display(self.output['Regs'])
self.log['read'] += 1
break
else:

View File

@@ -3,9 +3,9 @@ import socket
import random
import hashlib
from pathlib import Path
from function.tools.ByteConv import conv_int_to_array, trans_list_to_str
from function.frame import make_frame_modbus, check_frame_modbus, print_display
from function.file_upgrade import build_header, file_encryption
from . import tools
from . import function
ParamMap_EnergyRouter = {
0x00: ['编译日期', 4, 6],
@@ -46,7 +46,7 @@ class EnergyRouter:
def __transfer_data(self, frame: bytes) -> bool:
""" 数据传输处理函数 """
if self.tcp_socket is None:
print(trans_list_to_str(frame))
print(tools.ByteConv.trans_list_to_str(frame))
return False
try:
@@ -54,15 +54,15 @@ class EnergyRouter:
time.sleep(self.time_out)
frame_recv = self.tcp_socket.recv(128)
self.output = check_frame_modbus(frame_recv, self.block)
self.output = function.frame.check_frame_modbus(frame_recv, self.block)
if self.flag_print:
print("Read Frame: ", trans_list_to_str(frame_recv))
print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv))
if 'Regs' in self.output.keys():
print_display(self.output['Regs'])
function.frame.print_display(self.output['Regs'])
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , trans_list_to_str(frame_recv))
print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
@@ -71,7 +71,7 @@ class EnergyRouter:
self.block['type'] = 'read'
self.block['data_addr'] = daddr
self.block['data_len'] = dlen
frame = make_frame_modbus(self.block)
frame = function.frame.make_frame_modbus(self.block)
return self.__transfer_data(frame)
@@ -86,7 +86,7 @@ class EnergyRouter:
self.block['file'] = Path(path_bin).read_bytes()
self.block['header_offset'] = 128
# 启动帧
frame_master = make_frame_modbus(self.block)
frame_master = function.frame.make_frame_modbus(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
@@ -116,7 +116,7 @@ class EnergyRouter:
continue
seq_window[i] = 1
self.block['index'] = seq_offset + i
seq_frame_master[i] = make_frame_modbus(self.block)
seq_frame_master[i] = function.frame.make_frame_modbus(self.block)
self.tcp_socket.send(seq_frame_master[i])
# 接收帧回复
tmp = list(zip(range(len(seq_window)), seq_window))
@@ -126,7 +126,7 @@ class EnergyRouter:
# 接收到空数据, 对端已关闭连接
if seq_frame_slave[i] == '':
raise Exception("TCP closed.")
self.output = check_frame_modbus(seq_frame_slave[i], None)
self.output = function.frame.check_frame_modbus(seq_frame_slave[i], None)
seq_current, seq_hope = self.output['upgrade']['index'], self.output['upgrade']['hope']
if seq_current < seq_offset:
raise Exception("Error.")
@@ -152,14 +152,14 @@ class EnergyRouter:
# 结束升级
self.block['step'] = 'end'
frame_master = make_frame_modbus(self.block)
frame_master = function.frame.make_frame_modbus(self.block)
while self.output['result'] is False:
self.tcp_socket.send(frame_master)
frame_slave = self.tcp_socket.recv(8)
if frame_slave == '':
raise Exception("TCP closed.")
self.output = check_frame_modbus(frame_slave[:18], self.block)
self.output = function.frame.check_frame_modbus(frame_slave[:18], self.block)
def GeneratePackage_Demo_Xilinx(path_bin: Path):
@@ -188,19 +188,19 @@ def GeneratePackage_Demo_Xilinx(path_bin: Path):
md5_ctx = hashlib.md5()
md5_ctx.update(data_bin)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(data_bin))
config['file_length'] = tools.ByteConv.conv_int_to_array(len(data_bin))
config['hex_name'] = list(path_bin.name.encode())[:80]
if (header:= build_header(config, 128)) is None:
if (header:= function.file_upgrade.build_header(config, 128)) is None:
raise Exception("Header tag oversize. ")
if (header_512:= build_header(config, 512)) is None:
if (header_512:= function.file_upgrade.build_header(config, 512)) is None:
raise Exception("Header tag oversize. ")
data_encrypt = file_encryption(data_bin)
data_encrypt = function.file_upgrade.file_encryption(data_bin)
print("Upgrade file generated successfully.")
print(f"\t header_length={len(header)}, bin_length={len(data_bin)}[{hex(len(data_bin))}]")
print(f"\t file md5: {trans_list_to_str(config['md5'])}")
print(f"\t file md5: {tools.ByteConv.trans_list_to_str(config['md5'])}")
file1 = path_bin.parent / (path_bin.stem + '.dat')
file1.write_bytes(header + data_bin)
file2 = path_bin.parent / (path_bin.stem + '_h512.dat')

View File

@@ -4,11 +4,9 @@ from math import ceil
from tqdm import tqdm
from pathlib import Path
from serial import Serial
from function.file_upgrade import build_header_new, file_encryption
from function.tools.ByteConv import conv_int_to_array, trans_list_to_str
from function.tools.IntelHex import file_IntelHex_to_Bin
from function.frame import check_frame_dlt645, make_frame_dlt645, print_display
from . import tools
from . import function
ParamMap_LaminaAdapter = {
# 1 - Hex
@@ -17,6 +15,7 @@ ParamMap_LaminaAdapter = {
# 4 - str
# 5 - addr
# 6 - float
0x00: ["硬件版本识别电压", 2, 1000],
0x0E: ["故障字1", 1],
0x0F: ["故障字2", 1],
0x10: ["MPPT工作状态", 1],
@@ -111,13 +110,14 @@ ParamMap_LaminaAdapter = {
0x110: ["型号", 4, 16],
0x120: ["载波芯片地址", 4, 16],
0x130: ["厂商", 4, 8],
0x138: ["保留", 4, 8],
0x140: ["保留", 4, 16],
0x138: ["ODM版本", 4, 16],
0x148: ["保留", 4, 8],
0x150: ["保留", 4, 16],
0x160: ["硬件", 4, 16],
0x170: ["SN", 4, 16],
0x180: ["MES", 4, 16],
0x190: ["Datetime", 4, 16],
0x1A0: ["版本前缀", 4, 5],
}
@@ -178,13 +178,13 @@ class LaminaAdapter:
flag_frame = True
frame_recv += bytes_read
try:
self.output = check_frame_dlt645(frame_recv, self.block)
self.output = function.frame.check_frame_dlt645(frame_recv, self.block)
if self.flag_print:
print("Read Frame: ", trans_list_to_str(frame_recv))
print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv))
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , trans_list_to_str(frame_recv))
print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
@@ -192,7 +192,7 @@ class LaminaAdapter:
def __transfer_data(self, frame: bytearray) -> bool:
""" 报文数据传输 """
if self.__com is None:
print(trans_list_to_str(frame))
print(tools.ByteConv.trans_list_to_str(frame))
return False
fail_count = 0
@@ -204,11 +204,11 @@ class LaminaAdapter:
if self.flag_print and frame_discard:
print("Discard Data: " , frame_discard)
if self.flag_print:
print("Send Frame: ", trans_list_to_str(frame))
print("Send Frame: ", tools.ByteConv.trans_list_to_str(frame))
if self.__read_frame():
if 'Regs' in self.output.keys():
print_display(self.output['Regs'])
function.frame.print_display(self.output['Regs'])
self.log['read'] += 1
break
else:
@@ -223,7 +223,7 @@ class LaminaAdapter:
self.block['data']['type'] = 'read'
self.block['data']['data_addr'] = daddr
self.block['data']['data_len'] = dlen
frame = make_frame_dlt645(self.block)
frame = function.frame.make_frame_dlt645(self.block)
return self.__transfer_data(frame)
@@ -231,7 +231,7 @@ class LaminaAdapter:
self.block['data']['type'] = 'write_one'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_dlt645(self.block)
frame = function.frame.make_frame_dlt645(self.block)
return self.__transfer_data(frame)
@@ -239,7 +239,7 @@ class LaminaAdapter:
self.block['data']['type'] = 'write_dual'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_dlt645(self.block)
frame = function.frame.make_frame_dlt645(self.block)
return self.__transfer_data(frame)
@@ -247,7 +247,7 @@ class LaminaAdapter:
self.block['data']['type'] = 'write_str'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_dlt645(self.block)
frame = function.frame.make_frame_dlt645(self.block)
return self.__transfer_data(frame)
@@ -265,7 +265,7 @@ class LaminaAdapter:
self.block['data']['file'] = Path(path_bin).read_bytes()
self.block['data']['header_offset'] = 184
# 启动帧
frame_master = bytearray(make_frame_dlt645(self.block))
frame_master = bytearray(function.frame.make_frame_dlt645(self.block))
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
@@ -284,7 +284,7 @@ class LaminaAdapter:
frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size'])
for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx
frame_master = make_frame_dlt645(self.block)
frame_master = function.frame.make_frame_dlt645(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: trans data in {idx}')
@@ -294,7 +294,7 @@ class LaminaAdapter:
self.time_out = 1
self.block["data"]['step'] = 'end'
self.block["data"]['index'] += 1
frame_master = make_frame_dlt645(self.block)
frame_master = function.frame.make_frame_dlt645(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: end')
@@ -313,16 +313,16 @@ def GeneratePackage_SLCP101_p460(path_hex: Path):
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = file_encryption(bin_main)
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = function.file_upgrade.file_encryption(bin_main)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
if (main_header:=function.file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
@@ -349,28 +349,28 @@ def GenerateImage_SLCP001_p4a0(path_boot: Path, path_main: Path, path_back: Path
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x010000)
bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=0x0CC000)
bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=0x040000)
encrypt_main = file_encryption(bin_main)
encrypt_back = file_encryption(bin_back)
bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x010000)
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x0CC000)
bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x040000)
encrypt_main = function.file_upgrade.file_encryption(bin_main)
encrypt_back = function.file_upgrade.file_encryption(bin_back)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
if (main_header:=function.file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_back))
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=build_header_new(config)) is None:
if (back_header:=function.file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.")
@@ -404,28 +404,28 @@ def GenerateImage_SLCP101_p460(path_boot: Path, path_main: Path, path_back: Path
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = file_encryption(bin_main)
encrypt_back = file_encryption(bin_back)
bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = function.file_upgrade.file_encryption(bin_main)
encrypt_back = function.file_upgrade.file_encryption(bin_back)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
if (main_header:=function.file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_back))
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=build_header_new(config)) is None:
if (back_header:=function.file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.")
@@ -459,16 +459,16 @@ def GeneratePackage_DLSY001_p460(path_hex: Path):
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = file_encryption(bin_main)
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = function.file_upgrade.file_encryption(bin_main)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
if (main_header:=function.file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
@@ -495,28 +495,28 @@ def GenerateImage_DLSY001_p460(path_boot: Path, path_main: Path, path_back: Path
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = file_encryption(bin_main)
encrypt_back = file_encryption(bin_back)
bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = function.file_upgrade.file_encryption(bin_main)
encrypt_back = function.file_upgrade.file_encryption(bin_back)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
if (main_header:=function.file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_back))
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=build_header_new(config)) is None:
if (back_header:=function.file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.")

View File

@@ -1,13 +1,12 @@
import time
import hashlib
from math import ceil
from tqdm import tqdm
from pathlib import Path
from serial import Serial
from function.file_upgrade import build_header_new, file_encryption, parser_version_info
from function.tools.ByteConv import conv_int_to_array, trans_list_to_str
from function.tools.IntelHex import file_IntelHex_to_Bin
from function.frame import check_frame_modbus, make_frame_modbus, print_display
from . import tools
from . import function
ParamMap_LaminaController = {
# 1 - Hex
@@ -168,20 +167,20 @@ class LaminaController:
flag_frame = True
frame_recv += bytes_read
try:
self.output = check_frame_modbus(frame_recv, self.block['data'])
self.output = function.frame.check_frame_modbus(frame_recv, self.block['data'])
if self.flag_print:
print("Read Frame: ", trans_list_to_str(frame_recv))
print("Read Frame: ", tools.ByteConv.trans_list_to_str(frame_recv))
except Exception as ex:
print("Error Info: ", ex)
if self.flag_print and frame_recv:
print("Fail Data: " , trans_list_to_str(frame_recv))
print("Fail Data: " , tools.ByteConv.trans_list_to_str(frame_recv))
self.output['result'] = False
return self.output['result']
def __transfer_data(self, frame: bytearray) -> bool:
""" 串口收发报文, 包含重试逻辑与数据打印 """
if self.__com is None:
print(trans_list_to_str(frame))
print(tools.ByteConv.trans_list_to_str(frame))
return False
fail_count = 0
@@ -193,12 +192,12 @@ class LaminaController:
if self.flag_print and frame_discard:
print("Discard Data: " , frame_discard)
if self.flag_print:
print("Send Frame: ", trans_list_to_str(frame))
print("Send Frame: ", tools.ByteConv.trans_list_to_str(frame))
time.sleep(10 * self.time_gap)
if self.__read_frame():
if (self.flag_print is not None) and 'Regs' in self.output.keys():
print_display(self.output['Regs'])
function.frame.print_display(self.output['Regs'])
self.log['read'] += 1
break
else:
@@ -213,7 +212,7 @@ class LaminaController:
self.block['data']['type'] = 'read'
self.block['data']['data_addr'] = daddr
self.block['data']['data_len'] = dlen
frame = make_frame_modbus(self.block['data'])
frame = function.frame.make_frame_modbus(self.block['data'])
return self.__transfer_data(frame)
@@ -222,7 +221,7 @@ class LaminaController:
self.block['data']['data_addr'] = daddr
item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
self.block['data']['data_val'] = int(dval * item_coff)
frame = make_frame_modbus(self.block['data'])
frame = function.frame.make_frame_modbus(self.block['data'])
return self.__transfer_data(frame)
@@ -231,7 +230,7 @@ class LaminaController:
self.block['data']['data_addr'] = daddr
item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
self.block['data']['data_val'] = int(dval * item_coff)
frame = make_frame_modbus(self.block['data'])
frame = function.frame.make_frame_modbus(self.block['data'])
return self.__transfer_data(frame)
@@ -239,7 +238,7 @@ class LaminaController:
self.block['data']['type'] = 'write_str'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
frame = make_frame_modbus(self.block['data'])
frame = function.frame.make_frame_modbus(self.block['data'])
return self.__transfer_data(frame)
@@ -254,7 +253,7 @@ class LaminaController:
# 读取config
self.block['data']['type'] = 'record_cfg'
self.block['data']['step'] = 'start'
frame_master = make_frame_modbus(self.block['data'])
frame_master = function.frame.make_frame_modbus(self.block['data'])
ret, pbar = True, None
frame_data_cfg = []
@@ -264,7 +263,7 @@ class LaminaController:
if self.output['record']['seq'] == 0:
pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading")
self.block['data']['step'] = 'next'
frame_master = make_frame_modbus(self.block['data'])
frame_master = function.frame.make_frame_modbus(self.block['data'])
elif (self.output['record']['seq']) >= self.output['record']['total']:
ret = False
pbar and pbar.update()
@@ -273,7 +272,7 @@ class LaminaController:
# 读取data
self.block['data']['type'] = 'record_data'
self.block['data']['step'] = 'start'
frame_master = make_frame_modbus(self.block['data'])
frame_master = function.frame.make_frame_modbus(self.block['data'])
ret, pbar = True, None
frame_data_record = []
@@ -283,7 +282,7 @@ class LaminaController:
if self.output['record']['seq'] == 0:
pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading")
self.block['data']['step'] = 'next'
frame_master = make_frame_modbus(self.block['data'])
frame_master = function.frame.make_frame_modbus(self.block['data'])
elif (self.output['record']['seq']) >= self.output['record']['total']:
ret = False
pbar and pbar.update()
@@ -441,7 +440,7 @@ class LaminaController:
# 启动帧
self.block['data']['step'] = 'start'
self.block['data']['index'] = 0
frame_master = make_frame_modbus(self.block['data'])
frame_master = function.frame.make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print('Upgrade Fail: start')
@@ -459,7 +458,7 @@ class LaminaController:
frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size'])
for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx
frame_master = make_frame_modbus(self.block['data'])
frame_master = function.frame.make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: trans data in {idx}')
@@ -469,7 +468,7 @@ class LaminaController:
self.time_out = 1
self.block["data"]['step'] = 'end'
self.block["data"]['index'] += 1
frame_master = make_frame_modbus(self.block['data'])
frame_master = function.frame.make_frame_modbus(self.block['data'])
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: end')
@@ -490,17 +489,17 @@ def GeneratePackage_DLSP001_p280039(path_hex: Path):
'upgrade_type': [0x00, 0x00], # 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份)
}
bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=2 * 0x014000, conv_end=False)
encrypt_main = file_encryption(bin_main)
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=2 * 0x014000, conv_end=False)
encrypt_main = function.file_upgrade.file_encryption(bin_main)
info_version = parser_version_info(bin_main, 0x88000)
info_version = function.file_upgrade.parser_version_info(bin_main, 0x88000)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = conv_int_to_array(len(bin_main))
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(info_version['name'])[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
if (main_header:=function.file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
@@ -529,28 +528,28 @@ def GenerateImage_DLSP001_p280039(path_boot: Path, path_main: Path, path_back: P
'upgrade_type': [0x00, 0x00], # 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份)
}
bin_boot = file_IntelHex_to_Bin(path_boot.read_text(), len_max=2 * 0x004000, conv_end=False)
bin_main = file_IntelHex_to_Bin(path_main.read_text(), len_max=2 * 0x014000, conv_end=False)
bin_back = file_IntelHex_to_Bin(path_back.read_text(), len_max=2 * 0x014000, conv_end=False)
bin_boot = tools.IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=2 * 0x004000, conv_end=False)
bin_main = tools.IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=2 * 0x014000, conv_end=False)
bin_back = tools.IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=2 * 0x014000, conv_end=False)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['upgrade_type'] = [0x00, 0x00] # 主程序-片外缓冲
config['file_length'] = conv_int_to_array(len(bin_main))
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=build_header_new(config)) is None:
if (main_header:=function.file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['upgrade_type'] = [0x02, 0x00] # 备份程序
config['file_length'] = conv_int_to_array(len(bin_back))
config['file_length'] = tools.ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=build_header_new(config)) is None:
if (back_header:=function.file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
main_header_buffer = bytearray()
@@ -559,8 +558,8 @@ def GenerateImage_DLSP001_p280039(path_boot: Path, path_main: Path, path_back: P
back_header_buffer = bytearray()
for byte_org in back_header:
back_header_buffer.extend(bytearray([0x00, byte_org]))
main_encrypt = file_encryption(bin_main)
back_encrypt = file_encryption(bin_back)
main_encrypt = function.file_upgrade.file_encryption(bin_main)
back_encrypt = function.file_upgrade.file_encryption(bin_back)
print("Merge Image generated successfully.")
print(f"Main File:")

View File

@@ -1,2 +1,5 @@
from . import tools
from . import function
from . import EnergyRouter
from . import LaminaAdapter
from . import LaminaController

View File

@@ -0,0 +1,2 @@
from . import frame
from . import file_upgrade

View File

@@ -1,10 +1,6 @@
import hashlib
from pathlib import Path
from datetime import datetime
from crc import Calculator, Crc16
from tools.IntelHex import file_IntelHex_to_Bin, file_Bin_to_IntelHex
Header_Tag = {
'file_type': [0x00, 2], # 0 - 文件类型; 2byte
@@ -183,19 +179,6 @@ def parser_version_info(file_bin: bytearray, base_addr:int):
return block_version
def test1(fp: Path):
""" bin文件生成测试 """
file1 = Path(fp)
path1 = file1.parent / (file1.stem + ".bin")
data1 = file1.read_text()
data2 = file_IntelHex_to_Bin(data1, 0x3F0100)
path1.write_bytes(data2)
# 测试Bin到IntelHex
bin = Path(fp).read_bytes()
result = file_Bin_to_IntelHex(bin, 0x80000, memory_width=2)
(fp.parent / (fp.stem + ".hex")).write_text(result)
def test2():
""" 校验, 加密测试 """
@@ -237,6 +220,6 @@ def task5():
if __name__ == "__main__":
# path_bin = Path("F:\\Work\\FPGA\\Test\\Vivado\\test_update\\test_update.vitis\\upgrade_system\\Debug\\sd_card\\BOOT.BIN")
# GeneratePackage_Demo_Xilinx(path_bin)
import hashlib
from pathlib import Path
pass

View File

@@ -1,6 +1,7 @@
import struct
from crc import Calculator, Crc16
from tools.ByteConv import trans_list_to_str, trans_str_to_list, display_hex
from .. import tools
modbus_map = {
# 1 - Hex
@@ -332,14 +333,14 @@ def display_data(address: int, data: bytes, modbus_map: dict=modbus_map) -> dict
data_label, data_len = "未知数据", 1
if idx not in modbus_map.keys():
item = convert_regs_to_int(data, 1, signed=False)
item = display_hex(item, 4)
item = tools.ByteConv.display_hex(item, 4)
else:
current_map = modbus_map[idx]
data_label = current_map[0]
if current_map[1] == 1:
""" Hex字符表示 """
item = convert_regs_to_int(data, 1, signed=False)
item = display_hex(item, 4)
item = tools.ByteConv.display_hex(item, 4)
elif current_map[1] == 2:
""" 16位数值表示 """
item = convert_regs_to_int(data, 1)
@@ -359,12 +360,12 @@ def display_data(address: int, data: bytes, modbus_map: dict=modbus_map) -> dict
item = item.decode()
except Exception as ex:
item_len = sum([any(item[i:]) for i in range(len(item))])
item = trans_list_to_str(item[:item_len])
item = tools.ByteConv.trans_list_to_str(item[:item_len])
elif current_map[1] == 5:
""" 载波地址表示 """
data_len = current_map[2]
item = swapping_words(data, data_len)
item = trans_list_to_str(item)
item = tools.ByteConv.trans_list_to_str(item)
elif current_map[1] == 6:
""" 浮点数值表示 """
data_len = 2
@@ -388,7 +389,7 @@ def print_display(output_data: dict):
for key, value in output_data.items():
label = value[0]
data = "-".join(map(str, value[1])) if type(value) == list else value[1]
print(f"{display_hex(key, 4)}: {data:<{data_len_max}} {label:<{label_len_max}}")
print(f"{tools.ByteConv.display_hex(key, 4)}: {data:<{data_len_max}} {label:<{label_len_max}}")

View File

@@ -1,3 +1,6 @@
from pathlib import Path
def calculate_checksum(data):
""" 计算校验域 """
checksum = 0
@@ -12,7 +15,7 @@ def file_IntelHex_to_Bin(file_data, base_address=0, len_max=1, **kwargs):
"""
if base_address == 0:
if file_data[8] == '2':
base_address = int(file_data[9: 13], 16) * 0x100
base_address = int(file_data[9: 13], 16) * 0x10
offset_begin = 16
elif file_data[8] == '4':
base_address = int(file_data[9: 13], 16) * 0x10000
@@ -139,4 +142,19 @@ def file_Bin_to_IntelHex(file_data: bytearray, base_address=0, **kwargs):
hex_record = f':00000001FF\n'
result += hex_record
return result
return result
def test1(fp: Path):
""" bin文件生成测试 """
file1 = Path(fp)
path1 = file1.parent / (file1.stem + ".bin")
data1 = file1.read_text()
data2 = file_IntelHex_to_Bin(data1, 0x3F0100)
path1.write_bytes(data2)
# 测试Bin到IntelHex
bin = Path(fp).read_bytes()
result = file_Bin_to_IntelHex(bin, 0x80000, memory_width=2)
(fp.parent / (fp.stem + ".hex")).write_text(result)

View File

@@ -0,0 +1,2 @@
from . import IntelHex
from . import ByteConv