482 lines
20 KiB
Python
482 lines
20 KiB
Python
import time
|
|
from math import ceil
|
|
from tqdm import tqdm
|
|
from pathlib import Path
|
|
from serial import Serial
|
|
from tools.ByteConv import trans_list_to_str
|
|
from func_frame import make_frame_dlt645, check_frame_dlt645, print_display
|
|
from func_upgrade import GeneratePackage_SLCP101_p460
|
|
|
|
modbus_map = {
|
|
# 1 - Hex
|
|
# 2 - Int16
|
|
# 3 - lnt32
|
|
# 4 - str
|
|
# 5 - addr
|
|
# 6 - float
|
|
0x0E: ["故障字1", 1],
|
|
0x0F: ["故障字2", 1],
|
|
0x10: ["MPPT工作状态", 1],
|
|
0x11: ["系统工作状态", 1],
|
|
0x12: ["系统工作模式", 1],
|
|
0x13: ["输入电压", 2],
|
|
0x14: ["电感电流", 2],
|
|
0x15: ["12V电压", 2],
|
|
0x16: ["输出电压", 2],
|
|
0x17: ["输入电流", 2],
|
|
0x18: ["温度1", 2],
|
|
0x19: ["温度2", 2],
|
|
0x1A: ["输入功率", 3],
|
|
0x1C: ["设备温度", 2],
|
|
0x1D: ["开关机状态", 1],
|
|
0x1E: ["电池电压", 2],
|
|
0x1F: ["并机功率限值", 3],
|
|
|
|
0x60: ["光伏通道使能", 1],
|
|
0x61: ["最小启动输入电压", 2],
|
|
0x62: ["最大启动输入电压", 2],
|
|
0x63: ["最小停止输入电压", 2],
|
|
0x64: ["最大停止输入电压", 2],
|
|
0x65: ["最小MPPT电压", 2],
|
|
0x66: ["最大MPPT电压", 2],
|
|
0x67: ["最小启动输出电压", 2],
|
|
0x68: ["最大启动输出电压", 2],
|
|
0x69: ["最小停止输出电压", 2],
|
|
0x6A: ["最大停止输出电压", 2],
|
|
0x6B: ["输入过压保护值", 2],
|
|
0x6C: ["输出过压保护值", 2],
|
|
0x6D: ["输出欠压保护值", 2],
|
|
0x6E: ["电感过流保护值", 2],
|
|
0x6F: ["输入过流保护值", 2],
|
|
0x70: ["最小电感电流限值", 2],
|
|
0x71: ["最大电感电流限值", 2],
|
|
0x72: ["浮充电压阈值", 2],
|
|
0x73: ["三点法中间阈值", 2],
|
|
0x74: ["恒压充电电压", 2],
|
|
0x75: ["过温故障值", 2],
|
|
0x76: ["过温告警值", 2],
|
|
0x77: ["温度恢复值", 2],
|
|
0x78: ["最低满载电压", 2],
|
|
0x79: ["最高满载电压", 2],
|
|
0x7A: ["输入过载保护值", 3],
|
|
0x7C: ["最小功率限值", 3],
|
|
0x7E: ["最大功率限值", 3],
|
|
0x80: ["最大功率限值存储值", 3],
|
|
0x82: ["载波通信地址", 5, 3],
|
|
0x85: ["电压环out_max", 2],
|
|
0x86: ["电压环out_min", 2],
|
|
0x87: ["电流环out_max", 2],
|
|
0x88: ["电流环out_min", 2],
|
|
0x89: ["MPPT扰动系数k_d_vin", 2],
|
|
0x8A: ["dmin", 2],
|
|
0x8B: ["dmax", 2],
|
|
0x8C: ["扫描电压偏移scanvolt_offset", 2],
|
|
0x8D: ["电压环Kp", 3],
|
|
0x8F: ["电压环Ki", 3],
|
|
0x91: ["电流环Kp", 3],
|
|
0x93: ["电流环Ki", 3],
|
|
0x95: ["日志级别", 1],
|
|
0x96: ["日志输出方式", 1],
|
|
0x97: ["采样校准volt_in_a", 2],
|
|
0x98: ["采样校准volt_in_b", 2],
|
|
0x99: ["采样校准volt_out_a", 2],
|
|
0x9A: ["采样校准volt_out_b", 2],
|
|
0x9B: ["采样校准curr_in_a", 2],
|
|
0x9C: ["采样校准curr_in_b", 2],
|
|
0x9D: ["采样校准curr_induc_a", 2],
|
|
0x9E: ["采样校准curr_induc_b", 2],
|
|
0x9F: ["采样校准volt_12V_a", 2],
|
|
0xA0: ["采样校准volt_12V_b", 2],
|
|
0xA1: ["温度补偿temp1_b", 2],
|
|
0xA2: ["温度补偿temp2_b", 2],
|
|
0xA3: ["系统工作模式", 2],
|
|
0xA4: ["电感电流给定值curr_set", 2],
|
|
0xA5: ["抖动频率上限", 2],
|
|
0xA6: ["抖动频率下限", 2],
|
|
0xA7: ["电池电压判断限值", 2],
|
|
0xA8: ["MPPT追踪模式", 1],
|
|
0xA9: ["ADC参考电压", 2],
|
|
0xAA: ["保留", 1],
|
|
0xAB: ["保留", 1],
|
|
0xAC: ["保留", 1],
|
|
0xAD: ["保留", 1],
|
|
0xAE: ["保留", 1],
|
|
0xAF: ["保留", 1],
|
|
0x100: ["版本", 4, 16],
|
|
0x110: ["型号", 4, 16],
|
|
0x120: ["载波芯片地址", 4, 16],
|
|
0x130: ["厂商", 4, 8],
|
|
0x138: ["保留", 4, 8],
|
|
0x140: ["保留", 4, 16],
|
|
0x150: ["保留", 4, 16],
|
|
0x160: ["硬件", 4, 16],
|
|
0x170: ["SN", 4, 16],
|
|
0x180: ["MES", 4, 16],
|
|
0x190: ["Datetime", 4, 16],
|
|
}
|
|
|
|
|
|
class LaminaAdapter:
|
|
""" 叠光适配器\优化器
|
|
"""
|
|
def __init__(self, com_name="COM16", addr_645=[0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA], addr_modbus=0x01, **kwargs):
|
|
# 初始化串口通信
|
|
if com_name is not None:
|
|
com_config = {}
|
|
com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else 115200
|
|
com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else 'N'
|
|
com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else 8
|
|
com_config['stopbits'] = kwargs['stopbits'] if 'stopbits' in kwargs.keys() else 1
|
|
self.__com = Serial(com_name, **com_config)
|
|
else:
|
|
self.__com =None
|
|
|
|
self.flag_print = 'frame_print' in kwargs.keys()
|
|
self.time_out = kwargs['time_out'] if 'time_out' in kwargs.keys() else 1
|
|
self.time_gap = kwargs['time_gap'] if 'time_gap' in kwargs.keys() else 0.01
|
|
self.retry = kwargs['retry'] if 'retry' in kwargs.keys() else 1
|
|
self.retry_sub = kwargs['retry_sub'] if 'retry_sub' in kwargs.keys() else 1
|
|
|
|
self.block = {
|
|
'addr' : addr_645,
|
|
'type' : 'modbus',
|
|
'data' : {
|
|
'addr_dev' : addr_modbus,
|
|
'data_define': modbus_map,
|
|
},
|
|
}
|
|
self.output = {
|
|
'result': False,
|
|
'code_func': 0x00,
|
|
}
|
|
self.log = {
|
|
'send': 0,
|
|
'read': 0,
|
|
'keep-fail': 0,
|
|
'record': {
|
|
'config': None,
|
|
'data': None,
|
|
},
|
|
}
|
|
|
|
def __read_frame(self) -> bool:
|
|
""" 使用帧字节超时策略读报文帧, 并进行解析数据, 打印异常 """
|
|
frame_recv = b''
|
|
time_start, time_current, flag_frame = time.time(), time.time(), False
|
|
while (time_current - time_start) < self.time_out:
|
|
time.sleep(self.time_gap)
|
|
bytes_read = self.__com.read_all()
|
|
time_current = time.time()
|
|
if flag_frame and len(bytes_read) == 0:
|
|
break
|
|
elif len(bytes_read):
|
|
flag_frame = True
|
|
frame_recv += bytes_read
|
|
try:
|
|
self.output = check_frame_dlt645(frame_recv, self.block['data'])
|
|
if self.flag_print:
|
|
print("Read Frame: ", trans_list_to_str(frame_recv))
|
|
except Exception as ex:
|
|
print("Error Info: ", ex)
|
|
if self.flag_print and frame_recv:
|
|
print("Fail Data: " , trans_list_to_str(frame_recv))
|
|
self.output['result'] = False
|
|
|
|
return self.output['result']
|
|
|
|
def __transfer_data(self, frame: bytearray) -> bool:
|
|
""" 报文数据传输 """
|
|
if self.__com is None:
|
|
print(trans_list_to_str(frame))
|
|
return False
|
|
|
|
fail_count = 0
|
|
while fail_count < self.retry:
|
|
frame_discard = self.__com.read_all()
|
|
self.__com.write(frame)
|
|
self.log['send'] += 1
|
|
|
|
if self.flag_print and frame_discard:
|
|
print("Discard Data: " , frame_discard)
|
|
if self.flag_print:
|
|
print("Send Frame: ", trans_list_to_str(frame))
|
|
|
|
if self.__read_frame():
|
|
if 'Regs' in self.output.keys():
|
|
print_display(self.output['Regs'])
|
|
self.log['read'] += 1
|
|
break
|
|
else:
|
|
fail_count += 1
|
|
if fail_count >= self.log['keep-fail']:
|
|
self.log['keep-fail'] = fail_count
|
|
time.sleep(2*self.time_out)
|
|
|
|
return fail_count < self.retry
|
|
|
|
def frame_read(self, daddr=0x60, dlen=0x50):
|
|
self.block['data']['type'] = 'read'
|
|
self.block['data']['data_addr'] = daddr
|
|
self.block['data']['data_len'] = dlen
|
|
frame = make_frame_dlt645(self.block)
|
|
|
|
return self.__transfer_data(frame)
|
|
|
|
def frame_write_one(self, daddr=0x85, dval=-900):
|
|
self.block['data']['type'] = 'write_one'
|
|
self.block['data']['data_addr'] = daddr
|
|
self.block['data']['data_val'] = dval
|
|
frame = make_frame_dlt645(self.block)
|
|
|
|
return self.__transfer_data(frame)
|
|
|
|
def frame_write_dual(self, daddr=0x91, dval=600):
|
|
self.block['data']['type'] = 'write_dual'
|
|
self.block['data']['data_addr'] = daddr
|
|
self.block['data']['data_val'] = dval
|
|
frame = make_frame_dlt645(self.block)
|
|
|
|
return self.__transfer_data(frame)
|
|
|
|
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]):
|
|
self.block['data']['type'] = 'write_str'
|
|
self.block['data']['data_addr'] = daddr
|
|
self.block['data']['data_val'] = dval
|
|
frame = make_frame_dlt645(self.block)
|
|
|
|
return self.__transfer_data(frame)
|
|
|
|
def frame_update(self, path_bin):
|
|
""" 程序升级
|
|
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
|
|
|
|
"""
|
|
param_saved = self.flag_print, self.retry, self.time_out
|
|
|
|
self.block['data']['type'] = 'update'
|
|
self.block['data']['step'] = 'start'
|
|
self.block['data']['index'] = 0
|
|
self.block['data']['file'] = Path(path_bin).read_bytes()
|
|
self.block['data']['header_offset'] = 184
|
|
# 启动帧
|
|
frame_master = bytearray(make_frame_dlt645(self.block))
|
|
|
|
if not self.__transfer_data(frame_master):
|
|
self.flag_print, self.retry, self.time_out = param_saved
|
|
print('Upgrade Fail: start')
|
|
return False
|
|
self.block["data"]['file_block_size'] = self.output['upgrade']['length']
|
|
|
|
# 避免接收到延迟返回报文
|
|
time.sleep(self.time_out)
|
|
|
|
# 文件传输
|
|
self.retry = 3
|
|
self.time_out = 1.5
|
|
self.block["data"]['step'] = 'trans'
|
|
self.block['data']['index'] = 0
|
|
frame_total = ceil((len(self.block["data"]['file']) - self.block['data']['header_offset']) / self.block["data"]['file_block_size'])
|
|
for idx in tqdm(range(frame_total), desc="File Transmitting"):
|
|
self.block["data"]['index'] = idx
|
|
frame_master = make_frame_dlt645(self.block['data'])
|
|
if not self.__transfer_data(frame_master):
|
|
self.flag_print, self.retry, self.time_out = param_saved
|
|
print(f'Upgrade Fail: trans data in {idx}')
|
|
return False
|
|
|
|
# 结束升级
|
|
self.time_out = 1
|
|
self.block["data"]['step'] = 'end'
|
|
self.block["data"]['index'] += 1
|
|
frame_master = make_frame_dlt645(self.block['data'])
|
|
if not self.__transfer_data(frame_master):
|
|
self.flag_print, self.retry, self.time_out = param_saved
|
|
print(f'Upgrade Fail: end')
|
|
return False
|
|
|
|
self.flag_print, self.retry, self.time_out = param_saved
|
|
return True
|
|
|
|
|
|
def test_communication(time_out=2):
|
|
""" 通信成功率测试 """
|
|
time_start = time.time()
|
|
param_saved = dev_lamina.flag_print, dev_lamina.retry, dev_lamina.time_out
|
|
dev_lamina.flag_print = False
|
|
dev_lamina.retry = 1
|
|
try:
|
|
while True:
|
|
dev_lamina.frame_read(0x0C, 0x20)
|
|
print(f"Time Stamp: {time.ctime()}")
|
|
print(f"Success Frame: {dev_lamina.log['read']}")
|
|
print(f"Failed Frame: {dev_lamina.log['send'] - dev_lamina.log['read']}")
|
|
print(f"Max Series Failed Frame: {dev_lamina.log['keep-fail']}")
|
|
time.sleep(time_out)
|
|
finally:
|
|
time_end = time.time()
|
|
print("Test Result: ")
|
|
print(f"Time Start: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_start))}, \tTime End: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_end))}")
|
|
print(f"Time Elapsed: {time_end - time_start}")
|
|
print(f"Success Rate: {dev_lamina.log['read'] / dev_lamina.log['send'] * 100}%")
|
|
dev_lamina.flag_print, dev_lamina.retry, dev_lamina.time_out = param_saved
|
|
|
|
|
|
def make_Pakeage(fp: Path):
|
|
""" 生成升级包 """
|
|
|
|
hex_update = fp
|
|
file_package = fp.parent / f'{hex_update.stem}.dat'
|
|
file_update_bin = fp.parent / f'{hex_update.stem}.bin'
|
|
|
|
data_package, data_update_bin = GeneratePackage_SLCP101_p460(hex_update)
|
|
|
|
file_package.write_bytes(data_package)
|
|
file_update_bin.write_bytes(data_update_bin)
|
|
|
|
return file_package
|
|
|
|
|
|
def test():
|
|
if 0:
|
|
dev_lamina.frame_read(0xA9, 1) # 读ADC参考电压
|
|
dev_lamina.frame_write_one(0xA9, 2000) # 写ADC参考电压:2.0V
|
|
dev_lamina.frame_write_one(0xA9, 3300) # 写ADC参考电压:3.3V
|
|
dev_lamina.frame_read(0x13, 1) # 读输入电压
|
|
dev_lamina.frame_read(0x16, 1) # 读输出电压
|
|
dev_lamina.frame_read(0x97, 4) # 读校准参数: Vin_a, Vin_b, Vout_a, Vout_b
|
|
dev_lamina.frame_write_one(0x97, 1000) # 写校准参数Vin_a: 1.000
|
|
dev_lamina.frame_write_one(0x97, 1500) # 写校准参数Vin_a: 1.500
|
|
dev_lamina.frame_write_one(0x98, 100) # 写校准参数Vin_b: 1.00
|
|
dev_lamina.frame_write_one(0x98, 150) # 写校准参数Vin_b: 1.50
|
|
dev_lamina.frame_write_one(0x99, 1000) # 写校准参数Vout_a: 1.000
|
|
dev_lamina.frame_write_one(0x99, 1500) # 写校准参数Vout_a: 1.500
|
|
dev_lamina.frame_write_one(0x9A, 1000) # 写校准参数Vout_a: 1.00
|
|
dev_lamina.frame_write_one(0x9A, 1500) # 写校准参数Vout_a: 1.50
|
|
if 0:
|
|
dev_lamina.frame_write_str(0x82, [0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])
|
|
dev_lamina.frame_write_str(0x82, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
|
|
dev_lamina.frame_write_str(0x82, [0x0A, 0x00, 0x00, 0x00, 0x00, 0x00])
|
|
dev_lamina.frame_write_str(0x82, [0xFF, 0x00, 0x00, 0x00, 0x00, 0x00])
|
|
dev_lamina.frame_write_str(0x82, [0xA1, 0x00, 0x00, 0x00, 0x00, 0x00])
|
|
dev_lamina.frame_write_str(0x82, [0x1A, 0x00, 0x00, 0x00, 0x00, 0x00])
|
|
dev_lamina.frame_write_str(0x82, [0x99, 0x99, 0x99, 0x99, 0x99, 0x99])
|
|
dev_lamina.frame_write_str(0x82, [0x00, 0x00, 0x00, 0x00, 0x00, 0x01])
|
|
if 0:
|
|
dev_lamina.frame_write_one(0x52, 0x01)
|
|
time.sleep(0.5)
|
|
dev_lamina.frame_write_one(0x50, 0x00)
|
|
if 0:
|
|
while 1:
|
|
code_mes = input("扫描数据")
|
|
temp = code_mes[5:7] + code_mes[-10:]
|
|
code_addr = [int(temp[i:i+2], 16) for i in range(0, len(temp), 2)]
|
|
print(f"扫描结果: {code_mes}")
|
|
print(f"载波地址: {trans_list_to_str(code_addr)}")
|
|
dev_lamina.frame_read(0x100, 0x20)
|
|
time.sleep(0.5)
|
|
dev_lamina.frame_write_str(0x0180, list(code_mes))
|
|
time.sleep(0.5)
|
|
dev_lamina.frame_read(0x180, 0x10)
|
|
time.sleep(0.5)
|
|
dev_lamina.frame_write_str(0x82, code_addr)
|
|
time.sleep(0.5)
|
|
dev_lamina.frame_read(0x80, 0x08)
|
|
time.sleep(0.5)
|
|
dev_lamina.frame_write_one(0xA3, 0x01)
|
|
time.sleep(0.5)
|
|
dev_lamina.frame_write_one(0x51, 0x01)
|
|
time.sleep(5)
|
|
dev_lamina.frame_read(0x0E, 0x13)
|
|
time.sleep(5)
|
|
dev_lamina.frame_read(0x0E, 0x13)
|
|
time.sleep(0.5)
|
|
dev_lamina.frame_write_one(0x50, 0x00)
|
|
if 0:
|
|
dev_lamina.frame_write_one(0x0054, 0x01)
|
|
dev_lamina.frame_read(0x000E, 0x02)
|
|
dev_lamina.frame_write_one(0x0054, 0x03)
|
|
dev_lamina.frame_read(0x000E, 0x02)
|
|
dev_lamina.frame_write_one(0x0054, 0x07)
|
|
dev_lamina.frame_read(0x000E, 0x02)
|
|
dev_lamina.frame_write_one(0x0054, 0x06)
|
|
dev_lamina.frame_read(0x000E, 0x02)
|
|
dev_lamina.frame_write_one(0x0054, 0x04)
|
|
dev_lamina.frame_read(0x000E, 0x02)
|
|
dev_lamina.frame_write_one(0x0054, 0x00)
|
|
dev_lamina.frame_read(0x000E, 0x02)
|
|
if 0:
|
|
dev_lamina.frame_read(0x0E, 0x14)
|
|
if 0:
|
|
dev_lamina.frame_read(0x60, 0x60)
|
|
if 0:
|
|
dev_lamina.frame_read(0x70, 0x02)
|
|
dev_lamina.frame_write_one(0x70, 2100)
|
|
dev_lamina.frame_write_one(0x71, 2200)
|
|
dev_lamina.frame_read(0x70, 0x02)
|
|
dev_lamina.frame_write_one(0x70, 2300)
|
|
dev_lamina.frame_write_one(0x71, 2100)
|
|
dev_lamina.frame_read(0x70, 0x02)
|
|
if 0:
|
|
dev_lamina.frame_write_str(0x0170, list("SN202405201117-1"))
|
|
dev_lamina.frame_write_str(0x0180, list("MES202405201117-2"))
|
|
dev_lamina.frame_write_str(0x0190, list("D202405211500-3"))
|
|
time.sleep(2)
|
|
dev_lamina.frame_read(0x0170, 0x30)
|
|
|
|
|
|
if __name__=='__main__':
|
|
mode_config = {
|
|
"Log": {'com_name': None,
|
|
# 'addr_645': [0x01, 0x00, 0x00, 0x00, 0x00, 0x40],
|
|
},
|
|
"Debug": {'com_name': 'COM8', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1,
|
|
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
|
|
'frame_print': True,
|
|
'time_out': 0.1, 'retry': 1, 'retry_sub': 10},
|
|
"HPLC": {'com_name': 'COM10', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1,
|
|
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
|
|
'frame_print': True,
|
|
'time_out': 0.5, 'retry': 3, 'retry_sub': 10},
|
|
}
|
|
|
|
dev_lamina = LaminaAdapter(**mode_config['Debug'])
|
|
|
|
dev_lamina.frame_read(0x0100, 0x20)
|
|
|
|
|
|
if not hasattr(__builtins__,"__IPYTHON__"):
|
|
# 工程-即时转换
|
|
file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex")
|
|
if not file_hex.exists():
|
|
raise Exception("工程编译目标文件不存在.")
|
|
file_package = make_Pakeage(file_hex)
|
|
|
|
version = "DLSY001_240911_1600_V1.01"
|
|
addr = [0x24, 0x09, 0x12, 0x00, 0x00, 0x00]
|
|
|
|
while True:
|
|
""" 自动检测升级流程(需要调试) """
|
|
ret = False
|
|
while not ret or ('Reg' not in dev_lamina.output.keys()) or (version == dev_lamina.output['Reg'][0x0100][1]):
|
|
dev_lamina.frame_read(0x82, 3)
|
|
ret = dev_lamina.frame_read(0x0100, 0x20)
|
|
time.sleep(1)
|
|
|
|
dev_lamina.frame_update(file_package)
|
|
|
|
time.sleep(6)
|
|
|
|
ret = dev_lamina.frame_read(0x0100, 0x20)
|
|
|
|
if ret and (version == dev_lamina.output['Reg'][0x0100][1]):
|
|
dev_lamina.frame_write_one(0x52, 0x01)
|
|
|
|
print(f"address: {' '.join(map(lambda x: ('000' + hex(x)[2:])[-2:], addr))}")
|
|
dev_lamina.frame_write_str(0x82, addr)
|
|
dev_lamina.frame_read(0x82, 3)
|
|
addr[5] += 1
|
|
if addr[5] & 0x0F >= 10:
|
|
addr[5] += 0x10 - 10
|
|
|
|
|