更新数据项地址;
添加通信函数返回值; 添加通信成功率测试函数;
This commit is contained in:
@@ -26,8 +26,8 @@ modbus_map = {
|
||||
0x1A: ["输入功率", 3],
|
||||
0x1C: ["设备温度", 2],
|
||||
0x1D: ["开关机状态", 1],
|
||||
0x1E: ["电池电压", 6],
|
||||
0x20: ["并机功率限值", 6],
|
||||
0x1E: ["电池电压", 2],
|
||||
0x1F: ["并机功率限值", 3],
|
||||
|
||||
0x60: ["光伏通道使能", 1],
|
||||
0x61: ["最小启动输入电压", 2],
|
||||
@@ -90,7 +90,7 @@ modbus_map = {
|
||||
0xA4: ["电感电流给定值curr_set", 2],
|
||||
0xA5: ["抖动频率上限", 2],
|
||||
0xA6: ["抖动频率下限", 2],
|
||||
0xA7: ["保留", 1],
|
||||
0xA7: ["电池电压判断限值", 2],
|
||||
0xA8: ["保留", 1],
|
||||
0xA9: ["保留", 1],
|
||||
0xAA: ["保留", 1],
|
||||
@@ -187,49 +187,59 @@ class LaminaAdapter:
|
||||
print(trans_list_to_str(frame))
|
||||
print(trans_list_to_str(frame_recv))
|
||||
|
||||
return cnt < self.retry
|
||||
|
||||
def frame_read(self, daddr=0x60, dlen=0x50):
|
||||
self.block['data']['type'] = 'read'
|
||||
self.block['data']['data_addr'] = daddr
|
||||
self.block['data']['data_len'] = dlen
|
||||
frame = make_frame_dlt645(self.block)
|
||||
|
||||
ret = False
|
||||
if self.com is None:
|
||||
print(trans_list_to_str(frame))
|
||||
else:
|
||||
self._transfer_data(frame)
|
||||
ret = self._transfer_data(frame)
|
||||
return ret
|
||||
|
||||
def frame_write_one(self, daddr=0x85, dval=-900):
|
||||
self.block['data']['type'] = 'write_one'
|
||||
self.block['data']['data_addr'] = daddr
|
||||
self.block['data']['data_val'] = dval
|
||||
frame = make_frame_dlt645(self.block)
|
||||
|
||||
ret = False
|
||||
if self.com is None:
|
||||
print(trans_list_to_str(frame))
|
||||
return
|
||||
else:
|
||||
self._transfer_data(frame)
|
||||
ret = self._transfer_data(frame)
|
||||
return ret
|
||||
|
||||
def frame_write_dual(self, daddr=0x91, dval=600):
|
||||
self.block['data']['type'] = 'write_dual'
|
||||
self.block['data']['data_addr'] = daddr
|
||||
self.block['data']['data_val'] = dval
|
||||
frame = make_frame_dlt645(self.block)
|
||||
|
||||
ret = False
|
||||
if self.com is None:
|
||||
print(trans_list_to_str(frame))
|
||||
return
|
||||
else:
|
||||
self._transfer_data(frame)
|
||||
ret = self._transfer_data(frame)
|
||||
return ret
|
||||
|
||||
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]):
|
||||
self.block['data']['type'] = 'write_str'
|
||||
self.block['data']['data_addr'] = daddr
|
||||
self.block['data']['data_val'] = dval
|
||||
frame = make_frame_dlt645(self.block)
|
||||
|
||||
ret = False
|
||||
if self.com is None:
|
||||
print(trans_list_to_str(frame))
|
||||
return
|
||||
else:
|
||||
self._transfer_data(frame)
|
||||
ret = self._transfer_data(frame)
|
||||
return ret
|
||||
|
||||
def frame_update(self, path_bin):
|
||||
""" 程序升级
|
||||
@@ -331,6 +341,36 @@ class LaminaAdapter:
|
||||
if cnt >= self.retry:
|
||||
raise Exception("Error, Retry failed.")
|
||||
|
||||
|
||||
def test_communication():
|
||||
""" 通信成功率测试 """
|
||||
log_success = 0
|
||||
log_failed = 0
|
||||
log_failedseries = 0
|
||||
cnt_failedseries = 0
|
||||
time_start = time.time()
|
||||
try:
|
||||
while 1:
|
||||
if dev_lamina.frame_read(0x0E, 0x13):
|
||||
log_success += 1
|
||||
cnt_failedseries = 0
|
||||
else:
|
||||
log_failed += 1
|
||||
cnt_failedseries += 1
|
||||
if log_failedseries <= cnt_failedseries:
|
||||
log_failedseries = cnt_failedseries
|
||||
print(f"Time Stamp: {time.ctime()}")
|
||||
print(f"Success Frame: {log_success}")
|
||||
print(f"Failed Frame: {log_failed}")
|
||||
print(f"Max Series Failed Frame: {log_failedseries}")
|
||||
time.sleep(3)
|
||||
finally:
|
||||
time_end = time.time()
|
||||
print("Test Result: ")
|
||||
print(f"Time Start: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_start))}, \tTime End: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_end))}")
|
||||
print(f"Time Elapsed: {time_end - time_start}")
|
||||
print(f"Success Rate: {log_success / (log_success + log_failed) * 100}%")
|
||||
|
||||
if __name__=='__main__':
|
||||
mode_config = {
|
||||
"Log": {'com_name': None},
|
||||
|
||||
Reference in New Issue
Block a user