积累修改;

This commit is contained in:
何 泽隆
2025-01-20 11:13:57 +08:00
parent 52406c45b9
commit f3847f4f34
12 changed files with 44493 additions and 470 deletions

35739
source/Interactive-2.ipynb Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -7,7 +7,7 @@ from pathlib import Path
from bs4 import BeautifulSoup
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.colors as mcolors
from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table, Column, String, Float, Integer, DateTime
@@ -45,6 +45,20 @@ SemaMap_adapter = {
'curr_out': ('0305121001', 'adapter', True, "输出电流"),
'power_out': ('0305122001', 'adapter', True, "输出功率"),
}
semamap_combiner = {
'IMSI': ('0305102001', 'combiner', False, "IMSI"),
'ICCID': ('0305103001', 'combiner', False, "SIM卡ICCID"),
'MSISDN': ('0305104001', 'combiner', False, "MSISDN"),
'dev_type': ('0305101001', 'combiner', False, "系统类型"),
'facturer': ('0305107001', 'combiner', False, "汇流箱厂家"),
'model': ('0305108001', 'combiner', False, "汇流箱型号"),
'ver_software': ('0305105001', 'combiner', False, "软件版本"),
'ver_hardware': ('0305106001', 'combiner', False, "硬件版本"),
'power_total': ('0305109001', 'combiner', True, "系统总功率"),
'energy_total': ('0305110001', 'combiner', True, "系统累计发电量"),
'energy_daily': ('0305111001', 'combiner', True, "系统日发电量"),
}
SemaMap_meter = {
'mtr_id': ('0305123001', 'meter', False, "电表号"),
'mtr_volt': ('0305124001', 'meter', True, "直流电压"),
@@ -198,8 +212,9 @@ class Lamina_Data(object):
print(f"Get data success, len={len(json_data['data'])}")
table_data = pd.DataFrame(json_data['data'])
column_name = sorted(table_data.columns)
table_data['dev'] = device_id
table_data['time'] = table_data['updateTime'].apply(lambda x: int(time.mktime(time.strptime(x, r"%Y-%m-%d %H:%M:%S"))))
table_data = table_data[['time', *column_name]].drop(columns='updateTime')
table_data = table_data[['time', 'dev', *column_name]].drop(columns='updateTime')
return table_data
else:
print(f"Get data fail, code={json_data['code']}, msg=\n\t{json_data['message']}")
@@ -345,12 +360,13 @@ class Lamina_Data(object):
dev_meter = []
dev_adapter = []
dev_info = []
try:
for dev in sorted(json_data['rows'], key=lambda x: x['devCode']):
print(f"Dev: {dev['devTypeName']}, id={dev['devCode']}")
time.sleep(0.5)
fsu_id = dev['parentCode'] if 'parentCode' in dev.keys() else None
self.get_real_data_by_net(dev['devCode'], fsu_id, header=header)
dev_info.append(self.get_real_data_by_net(dev['devCode'], fsu_id, header=header))
time.sleep(0.5)
match dev['devType']:
case "0101":
@@ -366,6 +382,7 @@ class Lamina_Data(object):
result = {
'result': True,
'station': station_id,
'information': pd.concat(dev_info, ignore_index=True),
'adapter': pd.concat(dev_adapter, ignore_index=True),
'meter': pd.concat(dev_meter, ignore_index=True),
}
@@ -493,8 +510,6 @@ def save_station_by_file2(data_lamina: Lamina_Data, file_path):
file_input = Path(file_path)
file_output = file_input.parent / (file_input.stem + '_output.xlsx')
df_input = pd.read_excel(file_input)
time_start_timestamp = df_input['开始时间'][0].tz_localize('Asia/Shanghai').timestamp()
time_end_timestamp = df_input['结束时间'][0].tz_localize('Asia/Shanghai').timestamp()
if file_output.exists():
finished_station = pd.read_excel(file_output, sheet_name=None)
finished_station["Station"]['station'] = finished_station["Station"]['station'].astype('str')
@@ -506,11 +521,16 @@ def save_station_by_file2(data_lamina: Lamina_Data, file_path):
remain_station = df_input
dataset = []
df_input = df_input.set_index('点位名称')
for name in remain_station['点位名称']:
print(f"Station: {name}")
time_start_timestamp = df_input['开始时间'][name].tz_localize('Asia/Shanghai').timestamp()
time_end_timestamp = df_input['结束时间'][name].tz_localize('Asia/Shanghai').timestamp()
data = data_lamina.spider_station(name, time_start_timestamp, time_end_timestamp)
if data['result']:
dataset.append(data)
analysis_info1(data)
plt.waitforbuttonpress()
elif data['token']:
""" Token 失效 """
data_lamina.api_origin['header']['authorization'] = data['token']
@@ -536,9 +556,99 @@ def save_station_by_file2(data_lamina: Lamina_Data, file_path):
df_meter.to_excel(writer, sheet_name='Meter', index=False, columns=column_meter)
print(f"数据已成功保存到 {file_output}")
return result
def analysis_info(df_station: pd.DataFrame):
""" 站点Log数据分析 """
map_mid = {}
for k, v in SemaMap_adapter.items():
map_mid[v[0]] = v[3]
for k, v in SemaMap_meter.items():
map_mid[v[0]] = v[3]
map_dev = {
'TTE0102': 'Adapter',
'TTE0103': 'Meter',
}
data = df_station.assign(
timestamp = lambda df: pd.to_datetime(df['time'], unit='s', utc=True).apply(lambda x: x.tz_convert('Asia/Shanghai')),
type = lambda df: df['dev'].apply(lambda x: map_dev[x[:7]]),
date = lambda df: df['timestamp'].apply(lambda x: x.date()),
name = lambda df: df['mid'].map(map_mid),
value = lambda df: pd.to_numeric(df['value'])
)
data_daliy = data.loc[(data['dev'] == 'TTE0102DX2406272727') & (data['date'] == np.datetime64('2024-12-25')) & (data['type'] == 'Adapter')]
fig, axes = plt.subplots(3, 2)
axes = axes.flatten()
i = 0
for name, df_plot in data_daliy.set_index('timestamp').sort_index()[['name', 'value']].groupby('name'):
df_plot.plot(ax=axes[i], title=name)
i += 1
plt.show()
def analysis_info1(data_station: dict):
""" 站点spider返回数据分析 """
# 创建双色颜色过渡
color_map = mcolors.LinearSegmentedColormap.from_list("mycmap", ["blue", "red"])
for dev_id in data_station['information']['dev'].unique():
data_dev = data_station['information'].loc[data_station['information']['dev'] == dev_id]
print(f"Device: {dev_id}")
match dev_id[:7]:
case "TTE0101": # 汇流箱
pass
case "TTE0102": # 适配器
pass
history_dev = data_station['adapter'].assign(
date = lambda df: df['time'].apply(lambda x: x.date()),
)
case "TTE0103": # 电表
pass
history_dev = data_station['meter'].assign(
date = lambda df: df['time'].apply(lambda x: x.date()),
id_group = lambda df: df['date'].diff().ne(0).cumsum(),
)
# 按日期分组并绘制折线图
fig, axs = plt.subplots(3, 1)
axs = axs.flatten()
for date, group in history_dev.groupby('date'):
# 计算当天的起始时间
start_time = pd.Timestamp(date)
# 调整时间索引,使其从当天的起始时间开始
adjusted_time = group['time'] - start_time
# 计算颜色和不透明度
color = color_map(group['id_group'] / history_dev['id_group'][-1])
alpha = 0.5
group.set_index(adjusted_time)['volt'].plot(ax=axs[0], label=str(date), color=color, alpha=alpha)
group.set_index(adjusted_time)['curr'].plot(ax=axs[1], label=str(date), color=color, alpha=alpha)
group.set_index(adjusted_time)['power'].plot(ax=axs[2], label=str(date), color=color, alpha=alpha)
# 添加图例
axs[0].legend(title='Date')
# 添加标题和标签
axs[0].set_title('Value over Time by Date')
axs[0].set_xlabel('Timestamp')
axs[0].set_ylabel('Value')
plt.show()
plt.savefig(Path(f"result\Analysis\{dev_id}.png"))
print(data_dev.head())
if __name__=='__main__':
""" 主体调用流程 """
# plt中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
# 坐标轴负数显示
plt.rcParams['axes.unicode_minus'] = False
if not hasattr(__builtins__,"__IPYTHON__") and 0:
import pickle
path_data1 = Path(r"result\Analysis\station_data1.pkl")
with open(path_data1, 'rb') as f:
loaded_data = pickle.load(f)
analysis_info1(loaded_data)
if hasattr(__builtins__,"__IPYTHON__"):
path_db = '../result/chinatowercom.db'
else:
@@ -547,22 +657,22 @@ if __name__=='__main__':
if not (file_db:= Path(path_db)).exists():
file_db.touch()
API_HEADER['Cookie'] = "HWWAFSESID=7e7df7972959068f88; HWWAFSESTIME=1736230263315; dc04ed2361044be8a9355f6efb378cf2=WyIzNTI0NjE3OTgzIl0"
API_HEADER['authorization'] = 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiIl0sInVzZXJfbmFtZSI6IndlYl9tYW5hZ2V8d2FuZ2xlaTQiLCJzY29wZSI6WyJhbGwiXSwiZXhwIjoxNzM2MjM3NTMxLCJ1c2VySWQiOjI0Mjg1LCJqdGkiOiIzMWRlNDRiMy05ZTNjLTQwOTEtOWUzMS0wYWFjNTYzZDljZWIiLCJjbGllbnRfaWQiOiJ3ZWJfbWFuYWdlIn0.INV4DumSZkZZ68TW0DTF1XlIIFuoHD90_JefmmOBxcHPDxHAZPzG4JX9BEcEPrRLfSENtfYW7XNCluzB9nxs_pBTT9iu--tPZwlLAiPD7LZ552VdoAFEsYaigFmwxtedTLTzzm2GVbUReInd1dARjgaK0mKxljkKfGkTJURobpHC9Aw5mu25fSWjv7U9sZ0gOmpCuFr_OukEssi0hV8lvztfN5Ax_E1NObbteY2e8tUh6xVj49pHwDPnQScofGTaSviuMO46zmim6X3AKUJ-jDa95dOygKhk704AiA2nVCHXrlVkJI7zYLZB_zZTw3EhyonpksYS8NPp9wLlearWqg'
API_HEADER['Cookie'] = "HWWAFSESTIME=1737167522632; HWWAFSESID=6cb0288b7bc75e5a66; dc04ed2361044be8a9355f6efb378cf2=WyIzNTI0NjE3OTgzIl0"
API_HEADER['authorization'] = 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiIl0sInVzZXJfbmFtZSI6IndlYl9tYW5hZ2V8d2FuZ2xlaTQiLCJzY29wZSI6WyJhbGwiXSwiZXhwIjoxNzM3MzQxNDg4LCJ1c2VySWQiOjI0Mjg1LCJqdGkiOiIwOGFlZDdjYy1hZGE2LTQ4ZWQtYmQyZS0xYjY3NGRkZmVmMWMiLCJjbGllbnRfaWQiOiJ3ZWJfbWFuYWdlIn0.CnfJh2ie0D0dOG1yELiQPuXCwez_nzeYD8rXTL0ILSeq31kmTnhOJJTA6aI8JTEtDVgFyqC084uDR1KvDgwKL5aXXzKwCNqBxziJQbA2AuBRdDgdWXM0r_3qrBGL-0MuYB2jygJaNwue2GIh_3PYsMQGRqHBeyJ9JUgdiWYUVpmbYInSyOlY2l_QtzQTFlz8L7eUC0sDeAWSPNamvYczLas0MtuQquH6JM_-WaFfc-6TblmFp6qSxZHJT-0dy7LLTw5zpXbh3QnbjgBARCaOvzLaDtfArgU20Hq3AqAIwvTVOQFeI4jChFIRvyXwnnUDX-IrFru_sOYLX1jcc88cPA'
data_lamina = Lamina_Data('sqlite:///' + path_db)
# 依据站点内设备爬取整个站点的实时与历史数据
# today = datetime.datetime.today()
# yesterday = today - datetime.timedelta(days=1)
# yesterday = today - datetime.timedelta(days=30)
# today_midnight = today.replace(hour=0, minute=0, second=0, microsecond=0)
# yesterday_midnight = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
# today_midnight_timestamp = time.mktime(today_midnight.timetuple())
# yesterday_midnight_timestamp = time.mktime(yesterday_midnight.timetuple())
# data = data_lamina.spider_station('TTE0102DX2410091439', yesterday_midnight_timestamp, today_midnight_timestamp)
# data = data_lamina.spider_station("乐亭后庞河村", yesterday_midnight_timestamp, today_midnight_timestamp)
# 读取站点历史数据
# save_station_by_file1(data_lamina)
save_station_by_file2(data_lamina, "result\station_Q0107.xlsx")
result = save_station_by_file2(data_lamina, "result\station_Q0120.xlsx")
# 网站令牌更新
body = {

View File

@@ -1,8 +1,7 @@
import time
import warnings
from pathlib import Path
from device.LaminaAdapter import LaminaAdapter
from device.LaminaAdapter import GeneratePackage_SLCP001_p4a0, GeneratePackage_SLCP101_p460, GeneratePackage_DLSY001_p460
from device.LaminaAdapter import GeneratePackage_SLCP102_p460
from device.tools.ByteConv import trans_list_to_str, trans_str_to_list
def test_communication(time_out=2):
@@ -192,6 +191,42 @@ def test():
dev_lamina.frame_read(0x0E, 0x20)
dev_lamina.flag_print = True
if 0: # 程序升级
dev_lamina.frame_update(file_hex, makefile=True)
time.sleep(4.5)
dev_lamina.frame_read(0x100, 0x20)
if 0: # 曲线扫描
dev_lamina.flag_print = False
action_list = [(0x50, 1), (0x50, 0), (0xA8, 0), (0x50, 1)]
dev_lamina.frame_write_one(0x50, 0)
time.sleep(0.5)
dev_lamina.frame_write_one(0x52, 1)
time.sleep(4.5)
dev_lamina.frame_read(0x60, 0x60)
step = 0
time_start = time.time()
time_interval = 120
while True:
time.sleep(1)
print(time.ctime())
dev_lamina.frame_read(0x0E, 0x20)
if time.time() - time_start > time_interval:
if step >= len(action_list):
break
time.sleep(0.5)
time_start = time.time()
result = dev_lamina.frame_write_one(*action_list[step])
print(f"Write Value: {action_list[step][1]} in Addr: 0x{action_list[step][0]:x} by Time: {time.ctime(time_start)}. \n\tresult: {'Seccusss' if result else 'Fail'}.")
step += 1
dev_lamina.flag_print = True
if 0: # 数据读写验证
addr, value = 0x61, 29.9
dlen = 1
result = dev_lamina.frame_write(addr, dlen, value)
print(f"Write Result: \n0x{addr:04x}:\t{value}\t{'Seccusss' if result else 'Fail'}.")
time.sleep(0.5)
dev_lamina.frame_read(addr, dlen)
if __name__=='__main__':
mode_config = {
"Log": {'com_name': None,
@@ -201,19 +236,15 @@ if __name__=='__main__':
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True,
'time_out': 0.5, 'retry': 1, 'retry_sub': 10},
"HPLC": {'com_name': 'COM9', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1,
"HPLC": {'com_name': 'COM8', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1,
'addr_645': trans_str_to_list("02 01 00 00 24 20"),
'frame_print': True,
'time_out': 3, 'time_gap': 0.1, 'retry': 3, 'retry_sub': 10},
}
dev_lamina = LaminaAdapter(type_dev="SLCP101", **mode_config['Debug'])
dev_lamina = LaminaAdapter(type_dev="SLCP001", **mode_config['Debug'])
dev_lamina.frame_read(0x0100, 0x20)
# dev_lamina.frame_write_one(0x51, 0x01)
# dev_lamina.frame_read(0x1A0, 0x20)
# dev_lamina.frame_log()
# dev_lamina.frame_read(0x1A0, 0x20)
dev_lamina.frame_read(0x0100, 0x30)
# 工程-即时转换
if dev_lamina.device == 'SLCP001':
@@ -227,7 +258,9 @@ if __name__=='__main__':
else:
file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex")
if not file_hex.exists():
raise Exception("工程编译目标文件不存在.")
warnings.warn("工程编译目标文件不存在.", UserWarning)
if dev_lamina.output['Regs'][0x100][1].split('_')[0] != dev_lamina.device:
warnings.warn("设备型号不匹配.", UserWarning)
print(dev_lamina.device)
print(file_hex)

File diff suppressed because it is too large Load Diff

View File

@@ -269,13 +269,30 @@ if __name__ == '__main__':
"dev22": {'device_id': 'TTE0101DX2406300067', # (新版限功率升级)
'frame_print': True,
'time_out': 6, 'retry': 1},
"dev23": {'device_id': 'TTE0101HP2411260059', # 孟村董林小区/移动
'frame_print': True,
'time_out': 6, 'retry': 1},
"dev24": {'device_id': 'TTE0101DX2406280016', # 复兴村-光伏
'frame_print': True,
'time_out': 6, 'retry': 1},
"dev25": {'device_id': 'TTE0101HP2411180035', # 霸州-纸房头
'frame_print': True,
'time_out': 6, 'retry': 1},
"dev26": {'device_id': 'TTE0101DX2408010159', # 句容市小衣庄基站机房
'frame_print': True,
'time_out': 6, 'retry': 1},
"dev27": {'device_id': 'TTE0101HP2411180003', # 文安何庄村新建1
'frame_print': True,
'time_out': 6, 'retry': 1},
}
dev_lamina = LaminaStation(**mode_config["dev22"])
dev_lamina = LaminaStation(**mode_config["dev27"])
dev_lamina.frame_read(0x0000, 0x20)
time.sleep(2)
dev_lamina.frame_read(0x857, 0x40)
dev_lamina.frame_read_adapter(0, 0x0E, 0x20)
if not hasattr(__builtins__,"__IPYTHON__"):
pass
dev_lamina.frame_read(0x400E + 0x200 * (6-1), 0x20)

View File

@@ -52,7 +52,7 @@ class DeviceMQTT:
},
}
def open_connection(self, broker, port, account=None, **kwargs) ->bool:
def open_connection(self, broker, port, account=None, **kwargs):
""" 创建链接 """
def on_connect(client, userdata, flags, rc, properties):
""" 回调函数-创建链接 """
@@ -87,15 +87,18 @@ class DeviceMQTT:
if account is not None:
client.username_pw_set(account[0], account[1])
client.on_connect = on_connect if 'func_on_connect' not in kwargs.keys() else kwargs['func_on_connect']
# client.on_disconnect = on_disconnect if 'func_on_disconnect' not in kwargs.keys() else kwargs['func_on_disconnect']
client.on_disconnect = on_disconnect if 'func_on_disconnect' not in kwargs.keys() else kwargs['func_on_disconnect']
client.on_message = on_message if 'func_on_message' not in kwargs.keys() else kwargs['func_on_message']
client.connect(broker, port)
client.loop_start()
return client
def close_connection(self) ->bool:
""" 关闭连接 """
self.client.loop_close()
self.client.disconnect()
return True
def _subscribe(self, device_id):
""" 订阅主题 """

View File

@@ -174,15 +174,7 @@ class DeviceSerial:
pass
@abstractmethod
def frame_write_one(self, daddr=0x85, dval=-900) -> bool:
pass
@abstractmethod
def frame_write_dual(self, daddr=0x91, dval=600) -> bool:
pass
@abstractmethod
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool:
def frame_write(self, daddr, dlen=1, dval=None) -> bool:
pass
@abstractmethod

View File

@@ -1,4 +1,5 @@
import time
import warnings
import hashlib
from math import ceil
from tqdm import tqdm
@@ -113,7 +114,7 @@ ParamMap_LaminaAdapter = {
0xA7: ["电池电压判断限值", 2, 10],
0xA8: ["MPPT追踪模式", 1],
0xA9: ["ADC参考电压", 2, 1000],
0xAA: ["保留", 1],
0xAA: ["硬件版本", 1],
0xAB: ["保留", 1],
0xAC: ["保留", 1],
0xAD: ["保留", 1],
@@ -204,25 +205,44 @@ class LaminaAdapter(DeviceSerial):
}
def frame_read(self, daddr=0x60, dlen=0x50) -> bool:
""" 通用参数读取 """
self.block['data']['type'] = 'read'
self.block['data']['data_addr'] = daddr
self.block['data']['data_len'] = dlen
return self._transfer_data()
def frame_write(self, daddr, dlen=1, dval=None) -> bool:
""" 通用参数写入 """
if daddr in self.block['data']['data_define'].keys():
param_define = self.block['data']['data_define'][daddr]
else:
param_define = [f'未知参数{daddr}', 1]
match param_define[1]:
case 1 | 2:
return self.frame_write_one(daddr, 0 if dval is None else dval)
case 3:
return self.frame_write_dual(daddr, 0 if dval is None else dval)
case 4 | 5 | 6:
return self.frame_write_str(daddr, [] if dval is None else dval)
case 6 | 7 | 8:
type_param_define = next(v for k, v in protocols.modbus_map.items() if v == param_define[1])
warnings.warn(f"DataType unsupport write: {type_param_define}")
return False
case _:
return self.frame_write_one(daddr, 0 if dval is None else dval)
def frame_write_one(self, daddr=0x85, dval=-900) -> bool:
self.block['data']['type'] = 'write_one'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
self.block['data_val'] = int(dval * item_coff)
self.block['data']['data_val'] = int(dval * item_coff)
return self._transfer_data()
def frame_write_dual(self, daddr=0x91, dval=600) -> bool:
self.block['data']['type'] = 'write_dual'
self.block['data']['data_addr'] = daddr
self.block['data']['data_val'] = dval
item_coff = self.block['data']['data_define'][daddr][2] if len(self.block['data']['data_define'][daddr]) > 2 else 1
self.block['data_val'] = int(dval * item_coff)
self.block['data']['data_val'] = int(dval * item_coff)
return self._transfer_data()
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool:
@@ -302,6 +322,8 @@ def GeneratePackage(type_dev: str, path_hex: Path, **kwargs) -> bytearray:
'prog_id': list(type_dev.encode('ascii')), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
# 升级方式(0-片外缓冲, 1-片内缓冲, 2-升级备份)
'upgrade_type': [0x02, 0x00] if 'upgrade_backup' in kwargs.keys() else [0x00, 0x00],
}
match type_dev:
case 'SLCP001':
@@ -421,277 +443,6 @@ def GenerateImage(type_dev: str, path_boot: Path, path_main: Path, path_back: Pa
return bytearray(Image)
def GeneratePackage_SLCP001_p4a0(path_hex: Path):
""" 叠光适配器-460平台版本 生成升级包 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"SLCP001"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x040000)
encrypt_main = file_upgrade.file_encryption(bin_main)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
print(f"File name: {path_hex.name}")
print(f"File Info:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
# 组装镜像
Image = [0xFF] * (len(main_header) + len(encrypt_main))
offset_image = 0
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image += len(main_header)
Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main
return bytearray(Image), bin_main
def GeneratePackage_SLCP101_p460(path_hex: Path):
""" 叠光适配器-460平台版本 生成升级包 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"SLCP101"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = file_upgrade.file_encryption(bin_main)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
print(f"File name: {path_hex.name}")
print(f"File Info:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
# 组装镜像
Image = [0xFF] * (len(main_header) + len(encrypt_main))
offset_image = 0
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image += len(main_header)
Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main
return bytearray(Image), bin_main
def GeneratePackage_SLCP102_p460(path_hex: Path):
""" 叠光适配器-460平台版本 生成升级包 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"SLCP102"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_main = IntelHex.file_IntelHex_to_Bin(path_hex.read_text(), len_max=0x024000)
encrypt_main = file_upgrade.file_encryption(bin_main)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_hex.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Package generated successfully.")
print(f"File name: {path_hex.name}")
print(f"File Info:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
# 组装镜像
Image = [0xFF] * (len(main_header) + len(encrypt_main))
offset_image = 0
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image += len(main_header)
Image[offset_image: offset_image + len(encrypt_main)] = encrypt_main
return bytearray(Image), bin_main
def GenerateImage_SLCP001_p4a0(path_boot: Path, path_main: Path, path_back: Path):
""" 叠光适配器-4A0平台版本 镜像生成 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"SLCP001"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x010000)
bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x0CC000)
bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x040000)
encrypt_main = file_upgrade.file_encryption(bin_main)
encrypt_back = file_upgrade.file_encryption(bin_back)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 组装镜像
Image = [0xFF] * 0x100000
offset_image = 0
Image[offset_image: offset_image + len(bin_boot)] = bin_boot
offset_image = 0x010000
Image[offset_image: offset_image + len(bin_main)] = bin_main
offset_image = 0x0BC000
Image[offset_image: offset_image + len(bin_back)] = bin_back
offset_image = 0x0FC000
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image = 0x0FE000
Image[offset_image: offset_image + len(back_header)] = back_header
return bytearray(Image), main_header, back_header
def GenerateImage_SLCP101_p460(path_boot: Path, path_main: Path, path_back: Path):
""" 叠光适配器-460平台版本 镜像生成 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"SLCP101"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = file_upgrade.file_encryption(bin_main)
encrypt_back = file_upgrade.file_encryption(bin_back)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 组装镜像
Image = [0xFF] * 0x058000
offset_image = 0
Image[offset_image: offset_image + len(bin_boot)] = bin_boot
offset_image = 0x00C000
Image[offset_image: offset_image + len(bin_main)] = bin_main
offset_image = 0x030000
Image[offset_image: offset_image + len(bin_back)] = bin_back
offset_image = 0x054000
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image = 0x056000
Image[offset_image: offset_image + len(back_header)] = back_header
return bytearray(Image), main_header, back_header
def GenerateImage_SLCP102_p460(path_boot: Path, path_main: Path, path_back: Path):
""" 叠光适配器-460平台版本 镜像生成 """
config = {
'prod_type': [0x45, 0x00], # 产品类型
'method_compress': False, # 文件压缩
'prog_id': list(b"SLCP102"), # 程序识别号
'prog_type': 'app', # 程序类型
'area_code': [0x00, 0x00], # 地区
}
bin_boot = IntelHex.file_IntelHex_to_Bin(path_boot.read_text(), len_max=0x00C000)
bin_main = IntelHex.file_IntelHex_to_Bin(path_main.read_text(), len_max=0x024000)
bin_back = IntelHex.file_IntelHex_to_Bin(path_back.read_text(), len_max=0x024000)
encrypt_main = file_upgrade.file_encryption(bin_main)
encrypt_back = file_upgrade.file_encryption(bin_back)
md5_ctx = hashlib.md5()
md5_ctx.update(bin_main)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_main))
config['hex_name'] = list(path_main.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (main_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
md5_ctx = hashlib.md5()
md5_ctx.update(bin_back)
config["md5"] = list(md5_ctx.digest())
config['file_length'] = ByteConv.conv_int_to_array(len(bin_back))
config['hex_name'] = list(path_back.name.encode())[:64]
config['hex_name'] += [0] * (64 - len(config['hex_name']))
if (back_header:=file_upgrade.build_header_new(config)) is None:
raise Exception("Header tag oversize. ")
print("Merge Image generated successfully.")
print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]")
print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]")
# 组装镜像
Image = [0xFF] * 0x058000
offset_image = 0
Image[offset_image: offset_image + len(bin_boot)] = bin_boot
offset_image = 0x00C000
Image[offset_image: offset_image + len(bin_main)] = bin_main
offset_image = 0x030000
Image[offset_image: offset_image + len(bin_back)] = bin_back
offset_image = 0x054000
Image[offset_image: offset_image + len(main_header)] = main_header
offset_image = 0x056000
Image[offset_image: offset_image + len(back_header)] = back_header
return bytearray(Image), main_header, back_header
def GeneratePackage_DLSY001_p460(path_hex: Path):
""" 叠光优化器-460平台版本 生成升级包 """

View File

@@ -14,9 +14,11 @@ modbus_map = {
0x01: ["Hex示例", 1],
0x02: ["Int示例", 2],
0x03: ["Int32示例", 3],
0x04: ["str示例", 4, 16],
0x10: ["addr示例", 5, 6],
0x20: ["Float示例", 6],
0x05: ["str示例", 4, 16],
0x15: ["addr示例", 5, 6],
0x1B: ["Float示例", 6],
0x1D: ["numberList示例", 3],
0x20: ["callback示例", 16],
}
frame_modbus = {

View File

@@ -1,158 +1,8 @@
from pathlib import Path
from device.LaminaAdapter import LaminaAdapter
from device.LaminaAdapter import GenerateImage_SLCP001_p4a0, GeneratePackage_SLCP001_p4a0
from device.LaminaAdapter import GenerateImage_SLCP101_p460, GeneratePackage_SLCP101_p460
from device.LaminaAdapter import GenerateImage_SLCP102_p460, GeneratePackage_SLCP102_p460
from device.LaminaAdapter import GenerateImage_DLSY001_p460, GeneratePackage_DLSY001_p460
def Process0(path_boot: Path, path_project: Path):
""" 适配器-SLCP001 镜像生成流程 """
root_boot = path_boot
root_main = path_project
result = root_main
# 正常启动镜像
hex_boot = root_boot / "bootloader.hex"
hex_main = root_main / "lamina_adapter.hex"
hex_back = root_main / "lamina_adapter_back.hex"
hex_update = hex_main
if (not hex_boot.exists()) or (not hex_main.exists()) or (not hex_back.exists()):
raise Exception("缺失必要程序文件")
file_image = result / f'{hex_main.stem}_ROM.bin'
file_main_header = result / 'SLCP001_main.header'
file_back_header = result / 'SLCP001_back.header'
file_package = result / f'{hex_update.stem}.dat'
file_bin = result / f'{hex_update.stem}.bin'
data_bins = GenerateImage_SLCP001_p4a0(hex_boot, hex_main, hex_back)
data_package, data_bin = GeneratePackage_SLCP001_p4a0(hex_update)
file_image.write_bytes(data_bins[0].copy())
file_main_header.write_bytes(data_bins[1].copy())
file_back_header.write_bytes(data_bins[2].copy())
file_package.write_bytes(data_package)
file_bin.write_bytes(data_bin)
# 异常镜像-主分区md5错误
file_image1 = result / f'{file_image.stem}_b1.bin'
data_image = data_bins[0].copy()
data_image[0x0FC018: 0x0FC01A] = [0x00, 0x01]
file_image1.write_bytes(data_image)
# 异常镜像-备份分区md5错误
file_image2 = result / f'{file_image.stem}_b2.bin'
data_image = data_bins[0].copy()
data_image[0x0FE018: 0x0FE01A] = [0x00, 0x01]
file_image2.write_bytes(data_image)
# 异常镜像-双分区md5错误
file_image3 = result / f'{file_image.stem}_b3.bin'
data_image = data_bins[0].copy()
data_image[0x0FC018: 0x0FC01A] = [0x00, 0x01]
data_image[0x0FE018: 0x0FE01A] = [0x00, 0x01]
file_image3.write_bytes(data_image)
def Process1(path_boot: Path, path_project: Path):
""" 适配器-SLCP101 镜像生成流程 """
root_boot = path_boot
root_main = path_project
result = root_main
# 正常启动镜像
hex_boot = root_boot / "bootloader.hex"
hex_main = root_main / "lamina_adapter.hex"
hex_back = root_main / "lamina_adapter_back.hex"
hex_update = hex_main
if (not hex_boot.exists()) or (not hex_main.exists()) or (not hex_back.exists()):
raise Exception("缺失必要程序文件")
file_image = result / f'{hex_main.stem}_ROM.bin'
file_main_header = result / 'SLCP101_main.header'
file_back_header = result / 'SLCP101_back.header'
file_package = result / f'{hex_update.stem}.dat'
file_bin = result / f'{hex_update.stem}.bin'
data_bins = GenerateImage_SLCP101_p460(hex_boot, hex_main, hex_back)
data_package, data_bin = GeneratePackage_SLCP101_p460(hex_update)
file_image.write_bytes(data_bins[0].copy())
file_main_header.write_bytes(data_bins[1].copy())
file_back_header.write_bytes(data_bins[2].copy())
file_package.write_bytes(data_package)
file_bin.write_bytes(data_bin)
# 异常镜像-主分区md5错误
file_image1 = result / f'{file_image.stem}_b1.bin'
data_image = data_bins[0].copy()
data_image[0x054018: 0x05401A] = [0x00, 0x01]
file_image1.write_bytes(data_image)
# 异常镜像-备份分区md5错误
file_image2 = result / f'{file_image.stem}_b2.bin'
data_image = data_bins[0].copy()
data_image[0x056018: 0x05601A] = [0x00, 0x01]
file_image2.write_bytes(data_image)
# 异常镜像-双分区md5错误
file_image3 = result / f'{file_image.stem}_b3.bin'
data_image = data_bins[0].copy()
data_image[0x054018: 0x05401A] = [0x00, 0x01]
data_image[0x056018: 0x05601A] = [0x00, 0x01]
file_image3.write_bytes(data_image)
def Process1_v2(path_boot: Path, path_project: Path):
""" 适配器-SLCP102 镜像生成流程 """
root_boot = path_boot
root_main = path_project
result = root_main
# 正常启动镜像
hex_boot = root_boot / "bootloader.hex"
hex_main = root_main / "lamina_adapter.hex"
hex_back = root_main / "lamina_adapter_back.hex"
hex_update = hex_main
if (not hex_boot.exists()) or (not hex_main.exists()) or (not hex_back.exists()):
raise Exception("缺失必要程序文件")
file_image = result / f'{hex_main.stem}_ROM.bin'
file_main_header = result / 'SLCP102_main.header'
file_back_header = result / 'SLCP102_back.header'
file_package = result / f'{hex_update.stem}.dat'
file_bin = result / f'{hex_update.stem}.bin'
data_bins = GenerateImage_SLCP102_p460(hex_boot, hex_main, hex_back)
data_package, data_bin = GeneratePackage_SLCP102_p460(hex_update)
file_image.write_bytes(data_bins[0].copy())
file_main_header.write_bytes(data_bins[1].copy())
file_back_header.write_bytes(data_bins[2].copy())
file_package.write_bytes(data_package)
file_bin.write_bytes(data_bin)
# 异常镜像-主分区md5错误
file_image1 = result / f'{file_image.stem}_b1.bin'
data_image = data_bins[0].copy()
data_image[0x054018: 0x05401A] = [0x00, 0x01]
file_image1.write_bytes(data_image)
# 异常镜像-备份分区md5错误
file_image2 = result / f'{file_image.stem}_b2.bin'
data_image = data_bins[0].copy()
data_image[0x056018: 0x05601A] = [0x00, 0x01]
file_image2.write_bytes(data_image)
# 异常镜像-双分区md5错误
file_image3 = result / f'{file_image.stem}_b3.bin'
data_image = data_bins[0].copy()
data_image[0x054018: 0x05401A] = [0x00, 0x01]
data_image[0x056018: 0x05601A] = [0x00, 0x01]
file_image3.write_bytes(data_image)
def Process2():
""" 优化器-DLSY001 镜像生成流程 """
root = Path(r"D:\WorkingProject\LightStackOptimizer\software\lamina_optimizer\lamina_optimizer\Debug")
@@ -215,18 +65,20 @@ def Process(type_dev: str, path_boot: Path, path_project: Path, makePackages: bo
hex_boot = root_boot / "bootloader.hex"
hex_main = root_main / f"{program_name}.hex"
hex_back = root_main / f"{program_name}_back.hex"
hex_update = hex_main
file_image = result / f'{hex_main.stem}_ROM.bin'
file_package = result / f'{hex_update.stem}.dat'
file_package = result / f'{hex_main.stem}.dat'
file_package_backup = result / f'{hex_back.stem}.dat'
dev_lamina = LaminaAdapter(None, type_dev=type_dev)
if makePackages:
if not hex_main.exists():
raise Exception("缺失必要程序文件")
data_package = dev_lamina.make_package(hex_update)
file_package.write_bytes(data_package)
if hex_main.exists():
data_package = dev_lamina.make_package(hex_main)
file_package.write_bytes(data_package)
if hex_back.exists():
data_package_backup = dev_lamina.make_package(hex_back, upgrade_backup=True)
file_package_backup.write_bytes(data_package_backup)
if makeImage:
if (not hex_boot.exists()) or (not hex_main.exists()) or (not hex_back.exists()):
raise Exception("缺失必要程序文件")
@@ -286,4 +138,4 @@ if __name__ == "__main__":
# Process1_v2(path_boot2, path_main) # 适配器SLCP102
# Process2()
Process(*mode_config['SLCP001'])
Process(*mode_config['SLCP101'])

View File

@@ -1,5 +1,5 @@
import time
from function.tools.ByteConv import display_hex
from device.tools.ByteConv import display_hex
from device.LaminaController import LaminaController
from device.LaminaController import ParamMap_LaminaController

View File

@@ -1,6 +1,196 @@
import time
import pytest
import logging
from typing import Tuple, Generator, List, Optional, Union
from device.DeviceSerial import DeviceSerial
from device.LaminaAdapter import ParamMap_LaminaAdapter
from device.LaminaAdapter import LaminaAdapter
from device.tools.ByteConv import display_hex
TestCase_Controller = {
# 测试用例定义
# 0 - 正常写入数据序列; [(写入数据, 读取数据)]
# 1 - 范围限制测试; (最小值, 最大值)
# 2 - 相关大小限制测试; ((0-等于, 1-小于, 2-大于), 相关值地址, 死区范围)
0x60: {0: [(0, '0x0000'), (1, '0x0001'),
(2, '0x0001'), (0xFF, '0x0001'), (0xFFFF, '0x0001')]}, # 整机运行使能
0x61: {0: [(60.0, 60.0)], 1: (40, 100), 2: (1, 0x62, 0.5)}, # 最小启动允许输入电压
0x62: {0: [(440.0, 440.0)], 1: (350, 560), 2: (2, 0x61, 0.5)}, # 最大启动允许输入电压
0x63: {0: [(58, 58.0)], 1: (38, 98), 2: (1, 0x64, 0.5)}, # 最小停机输入电压
0x64: {0: [(442, 442.0)], 1: (352, 562), 2: (2, 0x63, 0.5)}, # 最大停机输入电压
0x65: {0: [(41, 41.0)], 1: (30, 60), 2: (1, 0x66, 0.5)}, # 最小启动允许输出电压
0x66: {0: [(57, 57.0)], 1: (50, 80), 2: (2, 0x65, 0.5)}, # 最大启动允许输出电压
0x67: {0: [(40, 40.0)], 1: (28, 58), 2: (1, 0x68, 0.5)}, # 最小停止允许输出电压
0x68: {0: [(58, 58.0)], 1: (52, 82), 2: (2, 0x67, 0.5)}, # 最大停止允许输出电压
0x69: {0: [(0.1, 0.1)], 1: (0, 5), 2: (1, 0x6A, 1.0)}, # 最小MPPT电流限值
0x6A: {0: [(22, 22.0)], 1: (5, 30), 2: (2, 0x69, 1.0)}, # 最大MPPT电流限值
0x6C: {0: [(6000, 6000)], 1: (3000, 7999.999),2: (0, 0x6E, 0.0)}, # 最大功率限值(由于转换误差, 无法写入8000W)
0x6E: {0: [(6000, 6000)], 1: (3000, 7999.999), }, # 最大功率限值存储值(由于转换误差, 无法写入8000W)
0x70: {0: [(500, 500)], 1: (400, 800), }, # Boost输入过压保护值
0x71: {0: [(460, 460)], 1: (300, 600), }, # Boost输出过压保护值
0x72: {0: [(60, 60)], 1: (40, 80), 2: (2, 0x73, 0.5)}, # LLC输出过压保护值
0x73: {0: [(40, 40)], 1: (20, 50), 2: (1, 0x72, 0.5)}, # LLC输出欠压保护值
0x74: {0: [(20, 20)], 1: (6, 30), }, # Boost电感过流保护值
0x75: {0: [(20, 20)], 1: (10, 30), }, # LLC输出电流均值保护值
0x76: {0: [(20, 20)], 1: (10, 30), }, # LLC输出电流峰值保护值
0x78: {0: [(6200, 6200)], 1: (4500, 9000), }, # 过载保护值
0x7A: {0: [(105, 105)], 1: (30, 150), 2: (2, 0x7B, 1.0)}, # 过温故障值
0x7B: {0: [(95, 95)], 1: (20, 150), 2: (1, 0x7A, 1.0)}, # 过温告警值
0x7C: {0: [(85, 85)], 1: (0, 120), 2: (1, 0x7A, 1.0)}, # 过温恢复值
0x7D: {0: [(10, 10)], 1: (0, 60), }, # 输出继电器故障判断差值
0x7E: {0: [(55, 55)], 1: (35, 60), }, # LLC输出电压给定值
0x7F: {0: [(420, 420)], 1: (320, 460), }, # Boost输出电压给定值
0x80: {0: [(56, 56)], 1: (10, 80), }, # 三点法中间阈值
0x81: {0: [(57, 57)], 1: (10, 80), }, # 浮充电压
0x82: {0: [(56, 56)], 1: (10, 80), }, # 恒压充电电压
0x83: {0: [(380, 380)], 1: (0, 450), }, # llc软起开始电压
0x84: {0: [(395, 395)], 1: (0, 450), 2: (1, 0x85, 0.5)}, # boost开始运行电压
0x85: {0: [(410, 410)], 1: (0, 450), 2: (2, 0x84, 0.5)}, # boost停止运行电压
0x86: {0: [(15000, 15000)], 1: (0, 30000), }, # 绝缘检测正阻抗限值
0x88: {0: [(15000, 15000)], 1: (0, 30000), }, # 绝缘检测负阻抗限值
0x8A: {0: [(123, 123), (137, 137)], 1: (50, 200), 2: (1, 0x8B, 0.2)}, # 抖动频率下限(精度误差严重, 难以正常测试)
0x8B: {0: [(137, 137), (123, 123)], 1: (50, 200), 2: (2, 0x8A, -0.2)}, # 抖动频率上限(精度误差严重, 难以正常测试)
0x170: {0: [(b'TTE0102DX20241001120001', 'TTE0102DX20241001120001\x00\x00\x00\x00\x00\x00\x00\x00\x00')]}, # 设备序列号
0x180: {0: [([0x24, 0x10, 0x01, 0x12, 0x00, 0x01], '$\x10\x01\x12\x00\x01' + 26 * '\x00')]}, # 设备MES码
0x190: {0: [(b'241001120001', '241001120001' + 20 * '\000')]}, # 出厂日期批次
}
TestCase_Adapter = {
# 测试用例定义
# 0 - 正常写入数据序列; [(写入数据, 读取数据)]
# 1 - 范围限制测试; (最小值, 最大值)
# 2 - 相关大小限制测试; ((0-等于, 1-小于, 2-大于), 相关值地址, 死区范围)
0x60: {0: [(0, '0x0000'), (1, '0x0001'),
(2, '0x0001'), (0xFF, '0x0001'), (0xFFFF, '0x0001')]}, # 整机运行使能
0x61: {0: [(60.0, 60.0)], 1: (40, 100), 2: (1, 0x62, 0.5)}, # 最小启动允许输入电压
0x62: {0: [(440.0, 440.0)], 1: (350, 560), 2: (2, 0x61, 0.5)}, # 最大启动允许输入电压
0x63: {0: [(58, 58.0)], 1: (38, 98), 2: (1, 0x64, 0.5)}, # 最小停机输入电压
0x64: {0: [(442, 442.0)], 1: (352, 562), 2: (2, 0x63, 0.5)}, # 最大停机输入电压
0x65: {0: [(41, 41.0)], 1: (30, 60), 2: (1, 0x66, 0.5)}, # 最小启动允许输出电压
0x66: {0: [(57, 57.0)], 1: (50, 80), 2: (2, 0x65, 0.5)}, # 最大启动允许输出电压
0x67: {0: [(40, 40.0)], 1: (28, 58), 2: (1, 0x68, 0.5)}, # 最小停止允许输出电压
0x68: {0: [(58, 58.0)], 1: (52, 82), 2: (2, 0x67, 0.5)}, # 最大停止允许输出电压
0x69: {0: [(0.1, 0.1)], 1: (0, 5), 2: (1, 0x6A, 1.0)}, # 最小MPPT电流限值
0x6A: {0: [(22, 22.0)], 1: (5, 30), 2: (2, 0x69, 1.0)}, # 最大MPPT电流限值
0x6C: {0: [(6000, 6000)], 1: (3000, 7999.999),2: (0, 0x6E, 0.0)}, # 最大功率限值(由于转换误差, 无法写入8000W)
0x6E: {0: [(6000, 6000)], 1: (3000, 7999.999), }, # 最大功率限值存储值(由于转换误差, 无法写入8000W)
0x70: {0: [(500, 500)], 1: (400, 800), }, # Boost输入过压保护值
0x71: {0: [(460, 460)], 1: (300, 600), }, # Boost输出过压保护值
0x72: {0: [(60, 60)], 1: (40, 80), 2: (2, 0x73, 0.5)}, # LLC输出过压保护值
0x73: {0: [(40, 40)], 1: (20, 50), 2: (1, 0x72, 0.5)}, # LLC输出欠压保护值
0x74: {0: [(20, 20)], 1: (6, 30), }, # Boost电感过流保护值
0x75: {0: [(20, 20)], 1: (10, 30), }, # LLC输出电流均值保护值
0x76: {0: [(20, 20)], 1: (10, 30), }, # LLC输出电流峰值保护值
0x78: {0: [(6200, 6200)], 1: (4500, 9000), }, # 过载保护值
0x7A: {0: [(105, 105)], 1: (30, 150), 2: (2, 0x7B, 1.0)}, # 过温故障值
0x7B: {0: [(95, 95)], 1: (20, 150), 2: (1, 0x7A, 1.0)}, # 过温告警值
0x7C: {0: [(85, 85)], 1: (0, 120), 2: (1, 0x7A, 1.0)}, # 过温恢复值
0x7D: {0: [(10, 10)], 1: (0, 60), }, # 输出继电器故障判断差值
0x7E: {0: [(55, 55)], 1: (35, 60), }, # LLC输出电压给定值
0x7F: {0: [(420, 420)], 1: (320, 460), }, # Boost输出电压给定值
0x80: {0: [(56, 56)], 1: (10, 80), }, # 三点法中间阈值
0x81: {0: [(57, 57)], 1: (10, 80), }, # 浮充电压
0x82: {0: [(56, 56)], 1: (10, 80), }, # 恒压充电电压
0x83: {0: [(380, 380)], 1: (0, 450), }, # llc软起开始电压
0x84: {0: [(395, 395)], 1: (0, 450), 2: (1, 0x85, 0.5)}, # boost开始运行电压
0x85: {0: [(410, 410)], 1: (0, 450), 2: (2, 0x84, 0.5)}, # boost停止运行电压
0x86: {0: [(15000, 15000)], 1: (0, 30000), }, # 绝缘检测正阻抗限值
0x88: {0: [(15000, 15000)], 1: (0, 30000), }, # 绝缘检测负阻抗限值
0x8A: {0: [(123, 123), (137, 137)], 1: (50, 200), 2: (1, 0x8B, 0.2)}, # 抖动频率下限(精度误差严重, 难以正常测试)
0x8B: {0: [(137, 137), (123, 123)], 1: (50, 200), 2: (2, 0x8A, -0.2)}, # 抖动频率上限(精度误差严重, 难以正常测试)
0x170: {0: [(b'TTE0102DX20241001120001', 'TTE0102DX20241001120001\x00\x00\x00\x00\x00\x00\x00\x00\x00')]}, # 设备序列号
0x180: {0: [([0x24, 0x10, 0x01, 0x12, 0x00, 0x01], '$\x10\x01\x12\x00\x01' + 26 * '\x00')]}, # 设备MES码
0x190: {0: [(b'241001120001', '241001120001' + 20 * '\000')]}, # 出厂日期批次
}
def frame_write(device:DeviceSerial, address, length, value):
""" 整合参数写入接口 """
return device.frame_write(address, length, value)
def frame_read(device:DeviceSerial, address, length=None):
""" 整合参数读取接口 """
if length is None:
info = device.block['data']['data_define'][address]
length = 2 if info[1] in [3, 6] else 1
length = info[2] if info[1] in [4, 5, 7] else length
return device.output['Regs'][address][1] if device.frame_read(address, length) else None
def value_case_generator(device:DeviceSerial, address:int, itemCase:dict) -> Generator[object, object, bool]:
""" 依据地址与参数配置生成测试用例 """
addr_relate = None
list_case_normal = []
match itemCase.keys():
case 0: # 常规读写用例测试
for data_write, data_read in itemCase[0]:
logging.info(f"Param Case:\taddr={display_hex(address)}, \tvalue={data_write}")
yield data_write, data_read, True
case 2: # 存在大小约束相关项
assert device.block['data']['data_define'][address][1] in [2, 3]
type_relate, addr_relate, dead_zone = itemCase[2]
val_relate = frame_read(device, addr_relate)
yield val_relate, val_relate, False
pass
case 1: # 写入范围用例测试
val_min, val_max = itemCase[1]
pass
if 1 in itemCase.keys():
""" 写入范围用例测试 """
testzone = 0.7 if info_param[1] in [2, 3] and len(info_param) < 3 else 7 / info_param[2]
accuracy = 0.20001 if info_param[1] in [2, 3] and len(info_param) < 3 else 2.0001 / info_param[2]
val_min, val_max = itemCase[1]
if 2 in itemCase.keys():
""" 存在大小约束相关项 """
list_case_relate = [] # 对约束相关项的修改
list_case_late = [] # 存在约束相关项影响后的测试用例
mode_relate, addr_relate, deadzone = itemCase[2]
val_relate = frame_read(device, addr_relate, ParamCase[addr_relate])
if mode_relate == 1:
if (val_relate - deadzone) < val_max:
""" 约束项已限制写入范围 """
list_case_relate.append((val_max + 2 * abs(deadzone), val_max + 2 * abs(deadzone), True))
list_case_late.append((val_max + testzone, val_max + testzone, False))
list_case_late.append((val_max, val_max, True))
list_case_late.append((val_max - testzone, val_max - testzone, True))
val_max = val_relate - deadzone
else:
""" 约束项未限制写入范围 """
val_relate = val_max
list_case_relate.append((val_relate, val_relate, True))
list_case_late.append((val_relate - deadzone + testzone, val_relate - deadzone + testzone, False))
list_case_late.append((val_relate - deadzone, val_relate - deadzone, True))
list_case_late.append((val_relate - deadzone - testzone, val_relate - deadzone - testzone, True))
elif mode_relate == 2:
if (val_relate + deadzone) > val_min:
""" 约束项已限制写入范围 """
list_case_relate.append((val_min - 2 * abs(deadzone), val_min - 2 * abs(deadzone), True))
list_case_late.append((val_min - testzone, val_min - testzone, False))
list_case_late.append((val_min, val_min, True))
list_case_late.append((val_min + testzone, val_min + testzone, True))
val_min = val_relate + deadzone
else:
""" 约束项未限制写入范围 """
val_relate = val_min
list_case_relate.append((val_relate, val_relate, True))
list_case_late.append((val_relate + deadzone - testzone, val_relate + deadzone - testzone, False))
list_case_late.append((val_relate + deadzone, val_relate + deadzone, True))
list_case_late.append((val_relate + deadzone + testzone, val_relate + deadzone + testzone, True))
list_case_normal.append((val_min - testzone, val_min - testzone, False))
list_case_normal.append((val_min, val_min, True))
list_case_normal.append((val_min + testzone, val_min + testzone, True))
list_case_normal.append((val_max + testzone, val_max + testzone, False))
list_case_normal.append((val_max, val_max, True))
list_case_normal.append((val_max - testzone, val_max - testzone, True))
print(f"Param Case:\taddr={display_hex(addr_param)}")
pass
def test_communication(device: DeviceSerial, time_out=2):
@@ -25,3 +215,150 @@ def test_communication(device: DeviceSerial, time_out=2):
print(f"Success Rate: {device.log['read'] / device.log['send'] * 100}%")
device.flag_print, device.retry, device.time_out = param_saved
@pytest.fixture
def device():
mode_config = {
"Debug": {'com_name': 'COM3',
'frame_print': None,
'time_out': 0.5, 'retry': 3},
}
device = LaminaAdapter(**mode_config['Debug'])
yield device
device.close_connection()
@pytest.mark.parametrize("address, case_value", *TestCase_Adapter.items())
def test_parameters(device:DeviceSerial, address, itemCase):
""" 参数读写测试 """
addr_relate = None
itemCase = ParamCase[addr_param]
list_case_normal = []
if 0 in itemCase.keys():
""" 常规读写用例测试 """
for data_write, data_read in itemCase[0]:
list_case_normal.append((data_write, data_read, True))
if 1 in itemCase.keys():
""" 写入范围用例测试 """
testzone = 0.7 if info_param[1] in [2, 3] and len(info_param) < 3 else 7 / info_param[2]
accuracy = 0.20001 if info_param[1] in [2, 3] and len(info_param) < 3 else 2.0001 / info_param[2]
val_min, val_max = itemCase[1]
if 2 in itemCase.keys():
""" 存在大小约束相关项 """
list_case_relate = [] # 对约束相关项的修改
list_case_late = [] # 存在约束相关项影响后的测试用例
mode_relate, addr_relate, deadzone = itemCase[2]
val_relate = frame_read(device, addr_relate, ParamCase[addr_relate])
if mode_relate == 1:
if (val_relate - deadzone) < val_max:
""" 约束项已限制写入范围 """
list_case_relate.append((val_max + 2 * abs(deadzone), val_max + 2 * abs(deadzone), True))
list_case_late.append((val_max + testzone, val_max + testzone, False))
list_case_late.append((val_max, val_max, True))
list_case_late.append((val_max - testzone, val_max - testzone, True))
val_max = val_relate - deadzone
else:
""" 约束项未限制写入范围 """
val_relate = val_max
list_case_relate.append((val_relate, val_relate, True))
list_case_late.append((val_relate - deadzone + testzone, val_relate - deadzone + testzone, False))
list_case_late.append((val_relate - deadzone, val_relate - deadzone, True))
list_case_late.append((val_relate - deadzone - testzone, val_relate - deadzone - testzone, True))
elif mode_relate == 2:
if (val_relate + deadzone) > val_min:
""" 约束项已限制写入范围 """
list_case_relate.append((val_min - 2 * abs(deadzone), val_min - 2 * abs(deadzone), True))
list_case_late.append((val_min - testzone, val_min - testzone, False))
list_case_late.append((val_min, val_min, True))
list_case_late.append((val_min + testzone, val_min + testzone, True))
val_min = val_relate + deadzone
else:
""" 约束项未限制写入范围 """
val_relate = val_min
list_case_relate.append((val_relate, val_relate, True))
list_case_late.append((val_relate + deadzone - testzone, val_relate + deadzone - testzone, False))
list_case_late.append((val_relate + deadzone, val_relate + deadzone, True))
list_case_late.append((val_relate + deadzone + testzone, val_relate + deadzone + testzone, True))
list_case_normal.append((val_min - testzone, val_min - testzone, False))
list_case_normal.append((val_min, val_min, True))
list_case_normal.append((val_min + testzone, val_min + testzone, True))
list_case_normal.append((val_max + testzone, val_max + testzone, False))
list_case_normal.append((val_max, val_max, True))
list_case_normal.append((val_max - testzone, val_max - testzone, True))
print(f"Param Case:\taddr={display_hex(addr_param)}")
last_value = frame_read(device, addr_param, info_param)
for case_test in list_case_normal:
print(f"\tnormal case={case_test}")
result = frame_write(device, addr_param, info_param, case_test[0])
assert result == case_test[2]
current_value = frame_read(device, addr_param, info_param)
if current_value is None:
raise Exception("Param Read Fail")
elif result:
if type(current_value) is float:
if abs(current_value - case_test[1]) > accuracy:
raise Exception("Param Check Fail")
else:
if current_value != case_test[1]:
raise Exception("Param Check Fail")
elif (not result) and current_value != last_value:
raise Exception("Param Check Fail")
last_value = current_value
if list_case_normal and list_case_normal[0][1] != last_value:
""" 为参数写入首个测试用例数据(一般为参数默认值), 避免影响后续参数测试 """
case_test = list_case_normal[0]
result = frame_write(device, addr_param, info_param, case_test[0])
assert result == case_test[2]
last_value = frame_read(device, addr_param, info_param)
if addr_relate:
""" 存在关联测试项 """
for case_relate in list_case_relate:
print(f"\trelate state: addr={display_hex(addr_relate)}, value={case_relate[0]}")
result = frame_write(device, addr_relate, info_param, case_relate[0])
if result == case_relate[2]:
for case_test in list_case_late:
print(f"\t\tstate case={case_test}")
result = frame_write(device, addr_param, info_param, case_test[0])
assert result == case_test[2]
current_value = frame_read(device, addr_param, info_param)
if current_value is None:
raise Exception("Param Read Fail")
elif result:
if type(current_value) is float:
if abs(current_value - case_test[1]) > accuracy:
raise Exception("Param Check Fail")
else:
if current_value != case_test[1]:
raise Exception("Param Check Fail")
elif (not result) and current_value != last_value:
raise Exception("Param Check Fail")
last_value = current_value
if list_case_normal and list_case_normal[0][1] != last_value:
""" 为参数写入首个测试用例数据(一般为参数默认值), 避免影响后续参数测试 """
case_test = list_case_normal[0]
result = frame_write(device, addr_param, info_param, case_test[0])
assert result == case_test[2]
def main():
mode_config = {
"Log": {'com_name': None,
# 'addr_645': [0x01, 0x00, 0x00, 0x00, 0x00, 0x40],
},
"Debug": {'com_name': 'COM3',
'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True,
'time_out': 0.5, 'retry': 3},
}
dev_lamina = LaminaAdapter(**mode_config['Debug'])
test_parameters(dev_lamina, dev_lamina.block['data']['data_define'], TestCase)
if __name__== "__main__":
pytest.main()