From 9876c81d5c46fa9f701402028635aa5362129900 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=20=E6=B3=BD=E9=9A=86?= Date: Fri, 4 Oct 2024 21:18:55 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E5=B8=A7=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=87=BD=E6=95=B0;=20=E4=BF=AE=E6=94=B9Hex=E5=AD=97=E7=AC=A6?= =?UTF-8?q?=E6=98=BE=E7=A4=BA=E5=87=BD=E6=95=B0;=20=E6=B7=BB=E5=8A=A0Bin?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E7=89=88=E6=9C=AC=E4=BF=A1=E6=81=AF=E8=A7=A3?= =?UTF-8?q?=E6=9E=90=E5=87=BD=E6=95=B0;(=E9=9C=80=E8=A6=81=E7=89=B9?= =?UTF-8?q?=E6=AE=8A=E4=BB=A3=E7=A0=81=E6=AE=B5=E7=BB=93=E6=9E=84)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- source/func_frame_re.py | 391 +++++++++++++++++++++++++++++++++++++++ source/func_upgrade.py | 22 ++- source/tools/ByteConv.py | 8 +- 3 files changed, 415 insertions(+), 6 deletions(-) create mode 100644 source/func_frame_re.py diff --git a/source/func_frame_re.py b/source/func_frame_re.py new file mode 100644 index 0000000..ac876b8 --- /dev/null +++ b/source/func_frame_re.py @@ -0,0 +1,391 @@ +import re +import struct +from crc import Calculator, Crc16 +from tools.ByteConv import trans_list_to_str, trans_str_to_list, display_hex + +modbus_map = { + # 1 - Hex + # 2 - Int16 + # 3 - lnt32 + # 4 - str + # 5 - addr + # 6 - float + 0x01: ["Hex示例", 1], + 0x02: ["Int示例", 2], + 0x03: ["Int32示例", 3], + 0x04: ["str示例", 4, 16], + 0x10: ["addr示例", 5, 6], + 0x20: ["Float示例", 6], +} + +frame_modbus = { + # func: len_base, type, idx + # type = 0, fixed length + # type = 1, add uint8 length by idx + # type = 2, add uint16 length by idx + # type = 3, check sub func code by idx, choose fixed length + 0x03: [5, 1, 2], + 0x04: [5, 1, 2], + 0x06: [8, 0], + 0x07: [8, 3, 2, {0x01: 2, 0x02: 0, 0x03: 0}], + 0x10: [8, 0], + 0x11: [11, 2, 7] +} + + +def make_frame_modbus(block:dict) -> bytearray: + """ modbus 生成函数""" + frame = [] + calculator = Calculator(Crc16.MODBUS) + + frame.append(block['addr_dev']) + if block['type'] == 'update': + """ 升级系列报文 """ + frame.append(0x07) + if len(block['file']) <= block["header_offset"]: + raise Exception("Modbus Update error, file too small.") + + if block['step'] == 'start': + frame.append(0x01) + frame.append(0x00) + frame.append(0x00) + frame.append(0x00) + frame.append(block['header_offset'] // 256) + frame.append(block['header_offset'] % 256) + frame += list(block['file'][:block['header_offset']]) + elif block['step'] == 'trans': + file_offset = block["header_offset"] + block['index'] * block['file_block_size'] + file_block = block['file'][file_offset: file_offset + block['file_block_size']] + frame.append(0x02) + frame.append(0x00) + frame.append(block['index'] // 256) + frame.append(block['index'] % 256) + frame.append(len(file_block) // 256) + frame.append(len(file_block) % 256) + frame += list(file_block) + elif block['step'] == 'end': + frame.append(0x03) + frame.append(0x00) + frame.append(block['index'] // 256) + frame.append(block['index'] % 256) + frame.append(0x00) + frame.append(0x00) + else: + raise Exception("Modbus Update Frame Step Error.") + elif block['type'][:6] == 'record': + """ 录波系列报文 """ + frame.append(0x11) + if block['type'][7:10] == 'cfg': + frame.append(0x01) + elif block['type'][7:11] == 'data': + frame.append(0x02) + else: + raise Exception("Modbus Record Frame Type Error.") + + if block['step'] == 'start': + frame.append(0x00) + elif block['step'] == 'next': + frame.append(0x01) + elif block['step'] == 'repeat': + frame.append(0x02) + else: + raise Exception("Modbus Record Frame Step Error.") + + frame.append(block['file_block_size'] // 256) + frame.append(block['file_block_size'] % 256) + else: + """ 数据读取系列报文 """ + data_addr = block['data_addr'] + if block['type'] == "read": + frame.append(0x03) + data_len = block['data_len'] + frame.append(data_addr // 256 % 256) + frame.append(data_addr % 256) + frame.append(data_len // 256 % 256) + frame.append(data_len % 256) + elif block['type'] == "write_one": + frame.append(0x06) + data_val = block['data_val'] + if data_val < 0: + data_val += 0x1_0000 + frame.append(data_addr // 256 % 256) + frame.append(data_addr % 256) + frame.append(data_val // 256 % 256) + frame.append(data_val % 256) + elif block['type'] == "write_dual": + frame.append(0x10) + data_val = block['data_val'] + if data_val < 0: + data_val += 0x1_0000_0000 + frame.append(data_addr // 256 % 256) + frame.append(data_addr % 256) + frame.append(0x00) + frame.append(0x02) + frame.append(0x04) + frame.append(data_val // 256 % 256) + frame.append(data_val % 256) + data_val //= 0x1_0000 + frame.append(data_val // 256 % 256) + frame.append(data_val % 256) + elif block['type'] == "write_str": + frame.append(0x10) + data_len = len(block['data_val']) + item_len = 2 * block['data_define'][data_addr][2] + data_val = block['data_val'] + if data_len > item_len: + raise Exception("Modbus data len oversize.") + elif data_len < item_len: + data_val += '\000' * (item_len - data_len) + data_len = len(data_val) + frame.append(data_addr // 256 % 256) + frame.append(data_addr % 256) + frame.append(0x00) + frame.append(data_len // 2) + frame.append(data_len) + + if type(data_val[0]) == str: + data_val = list(map(lambda x: x.encode()[0], data_val)) + for i in range(data_len//2): + frame.append(data_val[2*i + 1]) + frame.append(data_val[2*i]) + else: + raise Exception("Modbus Frame Type Error.") + + crc = calculator.checksum(bytearray(frame)) + frame.append(crc % 256) + frame.append(crc // 256) + + return bytearray(frame) + + +def make_frame_dlt645(block:dict) -> bytearray: + """ dlt645 生成函数""" + frame = [] + + if block['type'] == 'modbus': + """ Modbus 透传帧 """ + ctrl_code = 0x1F # Modbus 透传帧功能码 + data_frame = make_frame_modbus(block["data"]) + len_data = len(data_frame) + else: + raise Exception("Unknown dlt645 frame type.") + + frame.append(0x68) + frame += block['addr'][:6] + frame.append(0x68) + frame.append(ctrl_code) + frame.append(len_data) + frame += data_frame + frame.append(sum(frame) % 256) + frame.append(0x16) + + return bytearray(frame) + + +def check_frame_modbus(frame:bytes, block:dict) -> dict: + """ 校验modbus帧回复 """ + if len(frame:=find_frame_modbus(frame, block['addr_dev'])) == 0: + raise Exception("No frame data") + + output = { + 'result': False, + 'code_func': frame[1], + 'code_sub': frame[2], + } + if output['code_func'] == 0x07: + """ 升级回复帧 """ + output['upgrade'] = {} + output['upgrade']['index'] = frame[4] * 256 + frame[5] + if output['code_sub'] == 0x01: + """ 升级开始, 处理文件头 """ + output['upgrade']['length'] = frame[6] * 256 + frame[7] + elif output['code_sub'] == 0x02: + """ 升级传输, 解析帧序号及返回值, 不做序列号校验 """ + pass + elif output['code_sub'] == 0x03: + """ 升级结束 """ + pass + else: + raise Exception(" Upgrade SubFunc code error.") + output['result'] = True if frame[3] == 0x00 else False + output['code_error'] = frame[3] + elif output['code_func'] == 0x03 or output['code_func'] == 0x04: + """ 数据读取帧 """ + if frame[2] == len(frame[3:-2]): + output['Regs'] = display_data(block['data_addr'], frame[3:-2], block['data_define']) + else: + raise Exception("Frame read data length error.") + output['result'] = True + elif output['code_func'] == 0x06: + """ 单个数据写入帧 """ + output['result'] = True + elif output['code_func'] == 0x10: + """ 多个数据写入帧 """ + output['result'] = True + elif output['code_func'] == 0x11: + """ 录波功能帧 """ + data_record = {} + if frame[2] == 0x01: + data_record['type'] = 'config' + elif frame[2] == 0x02: + data_record["type"] = 'data' + else: + raise Exception("Unknow data type") + data_record['seq'] = frame[5] * 0x100 + frame[6] + data_record['total'] = frame[3] * 0x100 + frame[4] + data_record['data'] = frame[9:-2] + output['record'] = data_record + output['result'] = True + elif output['code_func'] & 0x80: + """ 错误返回帧 """ + output['code_error'] = frame[2] if output['code_func'] == 0x87 else frame[3] + else: + raise Exception(f"Frame Date error. func={output['code_func']}, func_sub={output['code_sub']}, len={len(frame)}") + + return output + + +def check_frame_dlt645(frame:bytes, block:dict) -> dict: + """ 校验dlt645帧回复 """ + if len(frame:=find_frame_dlt645(frame, block['addr'])) == 0: + raise Exception("No frame data") + + code_func = frame[8] + if code_func == 0x9F: + return check_frame_modbus(frame[10:-2], block['data']) + else: + raise Exception("DLT645 Frame type error.") + + +def find_frame_modbus(buffer:bytes, address:int, frame_defines: dict=frame_modbus) -> bytes: + """ 搜索合法modbus帧子串 """ + len_buffer = len(buffer) + + pos_frame, len_frame = 0, 0 + calculator = Calculator(Crc16.MODBUS) + for i in range(len_buffer): + if buffer[i] != address: + continue + if (buffer[i+1] & 0x7F) not in frame_defines.keys(): + continue + + frame_define = frame_defines[buffer[i+1] & 0x7F] + j = frame_define[0] + if buffer[i+1] & 0x80: + j = 5 + elif frame_define[1] == 0: + pass + elif frame_define[1] == 1: + j += buffer[i+frame_define[2]] + elif frame_define[1] == 2: + j += buffer[i+frame_define[2]] * 0x100 + buffer[i+frame_define[2]+1] + elif frame_define[1] == 3: + if buffer[i+frame_define[2]] not in frame_define[3].keys(): + continue + j += frame_define[3][buffer[i+frame_define[2]]] + else: + raise Exception("Unknow Modbus Define type.") + + if ((i+j) <= len_buffer) and calculator.checksum(buffer[i:i+j-2]) == (buffer[i+j-1] * 0x100 + buffer[i+j-2]): + pos_frame, len_frame = i, j + break + + return buffer[pos_frame: pos_frame+len_frame] + + +def find_frame_dlt645(buffer:bytes, address: list) -> bytes: + """ 搜索合法645帧子串 """ + len_buffer = len(buffer) + + pos_frame, len_frame = 0, 0 + for i in range(len_buffer): + if buffer[i] != 0x68 or buffer[i+7] != 0x68: + continue + if address[0] != 0xAA and buffer[i+1:i+7] == bytes(address): + continue + + j = buffer[i+9] + 12 + if sum(buffer[i:i+j-2]) % 0x100 == buffer[i+j-2]: + pos_frame, len_frame = i, j + break + + return buffer[pos_frame: pos_frame+len_frame] + + +def display_data(address: int, data: bytes, modbus_map: dict=modbus_map) -> dict: + """ 格式化表示数据, 得到显示数据字典 """ + def swapping_words(data:bytes, length:int) -> bytearray: + item = bytearray(data[:2 * length]) + for i in range(length): + item[2*i], item[2*i+1] = item[2*i+1], item[2*i] + return item + def convert_regs_to_int(data:bytes, length:int, signed: bool=True) -> int: + result = 0 + for idx_data, byte_data in enumerate(swapping_words(data, length)): + result += byte_data * 2 ** (8 * idx_data) + result -= 2 ** (16 * length) if signed and result >= (2 ** (16 * length - 1)) else 0 + return result + + output_data = {} + idx = address + while data: + data_label, data_len = "未知数据", 1 + if idx not in modbus_map.keys(): + item = convert_regs_to_int(data, 1, signed=False) + item = display_hex(item, 4) + else: + current_map = modbus_map[idx] + data_label = current_map[0] + if current_map[1] == 1: + """ Hex字符表示 """ + item = convert_regs_to_int(data, 1, signed=False) + item = display_hex(item, 4) + elif current_map[1] == 2: + """ 16位数值表示 """ + item = convert_regs_to_int(data, 1) + if len(current_map) > 2: + item /= current_map[2] + elif current_map[1] == 3: + """ 32位数值表示 """ + data_len = 2 + item = convert_regs_to_int(data, 2) + if len(current_map) > 2: + item /= current_map[2] + elif current_map[1] == 4: + """ 字符串表示 """ + data_len = current_map[2] + item = swapping_words(data, data_len) + item = item.replace(b'\xff', b' 0xFF').decode() + elif current_map[1] == 5: + """ 载波地址表示 """ + data_len = current_map[2] + item = swapping_words(data, data_len) + item = trans_list_to_str(item) + elif current_map[1] == 6: + """ 浮点数值表示 """ + data_len = 2 + item = struct.unpack('>f', bytes([data[2], data[3], data[0], data[1]]))[0] + elif current_map[1] == 7: + """ 正序数值表示 """ + data_len = current_map[2] + item = list(map(lambda x: x, data[:2 * data_len])) + + output_data[idx] = data_label, item + idx += data_len + data = data[2*data_len:] + return output_data + + +def print_display(output_data: dict): + """ 格式化表示输出数据 """ + print("Parse Result:") + label_len_max = max(map(lambda x: len(x[0]), output_data.values())) + data_len_max = max(map(lambda x: len(str(x[1])), output_data.values())) + for key, value in output_data.items(): + label = value[0] + data = "-".join(map(str, value[1])) if type(value) == list else value[1] + print(f"{display_hex(key, 4)}: {data:<{data_len_max}} {label:<{label_len_max}}") + + + + diff --git a/source/func_upgrade.py b/source/func_upgrade.py index 4561663..9c13e1d 100644 --- a/source/func_upgrade.py +++ b/source/func_upgrade.py @@ -166,6 +166,25 @@ def build_header_new(config: dict): return m_file_header +def parser_version_info(file_bin: bytearray, base_addr:int): + """ 解析Bin文件内的版本信息结构体 """ + address_block = (file_bin[6] << 24) + (file_bin[7] << 16) + (file_bin[4] << 8) + file_bin[5] + address_block -= base_addr + address_block *= 2 + + offset = address_block + block_version = {} + block_version['name'] = file_bin[offset + 1 : offset + 64 : 2] + offset += 64 + block_version['device'] = file_bin[offset + 1 : offset + 64 : 2] + offset += 64 + block_version['factory'] = file_bin[offset + 1 : offset + 32 : 2] + offset += 32 + block_version['hardware'] = file_bin[offset + 1 : offset + 64 : 2] + offset += 64 + + return block_version + def test1(fp: Path): """ bin文件生成测试 """ file1 = Path(fp) @@ -581,11 +600,12 @@ def GeneratePackage_DLSP001_p280039(path_hex: Path): bin_main = file_IntelHex_to_Bin(path_hex.read_text(), len_max=2 * 0x014000, conv_end=False) encrypt_main = file_encryption(bin_main) + info_version = parser_version_info(bin_main, 0x88000) md5_ctx = hashlib.md5() md5_ctx.update(bin_main) config["md5"] = list(md5_ctx.digest()) config['file_length'] = conv_int_to_array(len(bin_main)) - config['hex_name'] = list(path_hex.name.encode())[:64] + config['hex_name'] = list(info_version['name'])[:64] config['hex_name'] += [0] * (64 - len(config['hex_name'])) if (main_header:=build_header_new(config)) is None: raise Exception("Header tag oversize. ") diff --git a/source/tools/ByteConv.py b/source/tools/ByteConv.py index e2c6293..0aedbc1 100644 --- a/source/tools/ByteConv.py +++ b/source/tools/ByteConv.py @@ -24,8 +24,6 @@ def conv_int_to_array(num: int, big_end=False): return result -def display_hex(data, len) -> str: - """ Hex字符表示 """ - data %= 2 ** (4 * len) - result = "0" * len + hex(data)[2:] - return "0x" + result[-len:].upper() \ No newline at end of file +def display_hex(data:int, length:int=2) -> str: + """ Hex字符固定最小长度表示 """ + return f"0x{data:0{length}X}" \ No newline at end of file