积累修改;

This commit is contained in:
何 泽隆
2024-12-12 22:02:09 +08:00
parent 38d6ff30a3
commit 810bac464f
12 changed files with 599 additions and 234 deletions

View File

@@ -1,33 +1,36 @@
import time import time
import datetime import datetime
import requests import requests
import numpy as np
import pandas as pd import pandas as pd
from pathlib import Path from pathlib import Path
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
import pandas as pd import pandas as pd
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import matplotlib.dates as mdates import matplotlib.dates as mdates
from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table, Column, String, Float, Integer
API_URL = "https://energy-iot.chinatowercom.cn/api/device/device/historyPerformance" API_URL = "https://energy-iot.chinatowercom.cn/api/device/device/historyPerformance"
headers = { API_HEADER = {
"Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Encoding": "gzip, deflate, br, zstd",
"Connection": "keep-alive", "Connection": "keep-alive",
"Content-Length": "211", "Content-Length": "170",
"Cookie": "HWWAFSESTIME=1732173820506; HWWAFSESID=1739685743c73769ff; dc04ed2361044be8a9355f6efb378cf2=WyIzNTI0NjE3OTgzIl0", "Cookie": "HWWAFSESTIME=1732496660747; HWWAFSESID=880ee8eff3a2d23536; dc04ed2361044be8a9355f6efb378cf2=WyIzNTI0NjE3OTgzIl0",
"Host": "energy-iot.chinatowercom.cn", "authorization": "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiIl0sInVzZXJfbmFtZSI6IndlYl9tYW5hZ2V8d2FuZ2xlaTQiLCJzY29wZSI6WyJhbGwiXSwiZXhwIjoxNzMyNTQ4Nzc5LCJ1c2VySWQiOjI0Mjg1LCJqdGkiOiI1Y2I3ZDA2ZC1hNzczLTQ5ZDQtYmRiMy05ZjdiOWNmMjY3ZTYiLCJjbGllbnRfaWQiOiJ3ZWJfbWFuYWdlIn0.Gpk9eYc3W4gcGLTG4aLSEL7E-VD-qPYsELcyyPuZ29e9STfa18WNcbosdIDulbH-5LDqY6i6aFAWpVm41bIgwzLZpAhlp2dthJcoBn9a90wWO4nz-Xi-MK9MVhNR0C28fyxv7h8jP_LRQkBi-pAzm2j65gkSTL3Vakky6znawZ5pY42U74lF-SnYdGyNFK3Ryy3xeMG4N1Pu_OJFdjXdGANUJZcXg1gh8WrERPo2SvuiCzOVTuUQZO7lRc8qGyILTKC-snz5CoonHaCWMaRzcV5VtCGd-yBncpyMQN9rGPqCiiLmuyTG29aZh5UfR_mloGCQHacqRmYSq3uGkPodBg",
"Origin": "https://energy-iot.chinatowercom.cn", "Host": "energy-iot.chinatowercom.cn",
"Sec-Fetch-Dest": "empty", "Origin": "https://energy-iot.chinatowercom.cn",
"Sec-Fetch-Mode": "cors", "Sec-Fetch-Dest": "empty",
"Sec-Fetch-Site": "same-origin", "Sec-Fetch-Mode": "cors",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0", "Sec-Fetch-Site": "same-origin",
"accept": "application/json, text/plain, */*", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", "accept": "application/json, text/plain, */*",
"authorization": "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiIl0sInVzZXJfbmFtZSI6IndlYl9tYW5hZ2V8d2FuZ2xlaTQiLCJzY29wZSI6WyJhbGwiXSwiZXhwIjoxNzMyMjc4NDA5LCJ1c2VySWQiOjI0Mjg1LCJqdGkiOiJkODE0YTZhYy05YmJmLTQ0ZjQtYWRhYi0wMzAzNjUzNmNhNWIiLCJjbGllbnRfaWQiOiJ3ZWJfbWFuYWdlIn0.VhJaDKwzjekwOCsw_jOF_jvg7sX45okFcxkLyWtbfFVGWVWANhKNhVqj5Dn0Qb3wUXH3e-w74sDN1RI9QADngMOGP_H7aTwI_nukj6VmjpFA7kEtOBwa6ouvPZQMa1qa3UWl21Ac6GoLu14T4TIf4kQAMTdaYAMFrwDAXAkqvIDmKKjZbnDFUjUIcj-J_Y-LfHCEBjtcz7Rp_wMO-PMA5wII6kbcNoSFiYb0djcFQyeBcIUSUTRPixPcTYBkS-IhNrsOePIWlpNYMHbPxZdrZkV4M65BmBn4A9MUjWYHm7iIut8WVMdCXR4Sxp9m0mJHXR_IPWES4O7aBcuMkOmjyw", "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"content-type": "application/json;charset=UTF-8", "content-type": "application/json;charset=UTF-8",
"sec-ch-ua": "\"Microsoft Edge\";v=\"131\", \"Chromium\";v=\"131\", \"Not_A Brand\";v=\"24\"", "sec-ch-ua": "\"Microsoft Edge\";v=\"131\", \"Chromium\";v=\"131\", \"Not_A Brand\";v=\"24\"",
"sec-ch-ua-mobile": "?0", "sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "Windows", "sec-ch-ua-platform": "Windows",
} }
SemaMap = { SemaMap = {
@@ -39,73 +42,148 @@ SemaMap = {
'apt_power_out': ('0305122001', 'adapter', "输出功率"), 'apt_power_out': ('0305122001', 'adapter', "输出功率"),
} }
def get_history_data(device_id, data_type, times: tuple[int, int]): class Lamina_Data(object):
""" 读取信号量历史数据, 返回接口json数据 """ """ 叠光主站数据分析 """
body = { def __init__(self, database="sqlite:///:memory", header=None):
"startTimestamp": times[0], """ 初始化 """
"endTimestamp": times[1], self.engine = create_engine(database)
"deviceCode": f"{device_id}",
"mid": f"{data_type}",
"businessType": "7",
"pageNum": 2,
"pageSize": 5,
"total": 0
}
req = requests.post(API_URL, data=body, headers=headers)
return req.json()
def adapter_status_graphs(device_id, times: tuple[int, int]): metadata = MetaData()
""" 获取数据, 绘制图表 """ metadata.reflect(bind=self.engine)
data_volt_in = get_history_data(device_id, SemaMap["apt_volt_in"], times) if 'history' not in metadata.tables:
data_curr_in = get_history_data(device_id, SemaMap["apt_curr_in"], times) history_table = Table(
data_volt_out = get_history_data(device_id, SemaMap["apt_volt_out"], times) 'history', metadata,
data_power_out = get_history_data(device_id, SemaMap["apt_power_out"], times) Column('dev', String(50)),
Column('mid', String(50)),
Column('time', Integer),
Column('value', Float)
)
metadata.create_all(self.engine)
data_apt = [] self.data = {
for item in zip(data_volt_in['data'], data_volt_out['data'], data_curr_in['data'], data_power_out['data']): 'history': pd.read_sql_table('history', self.engine),
print(item)
piont_time = time.mktime(time.strptime(item[0]['collectTime'], r"%Y-%m-%d %H:%M:%S"))
point_apt = {
'time': int(piont_time),
'volt_in': item[0]['value'],
'volt_out': item[1]['value'],
'curr_in': item[2]['value'],
'power_out': item[3]['value'],
} }
data_apt.append(point_apt)
table_apt = pd.DataFrame(data_apt)
# 图表绘制 self.api_origin = {
chart_apt(table_apt) 'domain': 'https://energy-iot.chinatowercom.cn/api',
'header': API_HEADER,
}
def chart_apt(table_apt): def save_history_data(self):
""" 绘制适配器信息图表 """ """ 保存历史数据 """
fig, ax1 = plt.subplots(figsize=(12, 6))
ax1.plot(table_apt['time'], table_apt['volt_in'], color='green', label='Input Voltage')
ax1.plot(table_apt['time'], table_apt['volt_out'], color='red', label='Output Voltage')
# ax1.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H:%M:%S'))
# 设置x轴的主要刻度定位器为自动日期定位器这使得x轴上的刻度根据数据自动选择最合适的日期格式 data_memory = self.data['history']
ax1.xaxis.set_major_locator(mdates.AutoDateLocator()) data_file = pd.read_sql_table('history', self.engine)
ax2 = ax1.twinx() # 合并新数据和现有数据
# 绘制斜线阴影 combined_df = pd.concat([data_file, data_memory], ignore_index=True)
for i in range(len(table_apt) - 1):
ax1.fill_between( # 通过 'dev', 'mid', 'time' 三列进行去重
[table_apt['time'].iloc[i], table_apt['time'].iloc[i + 1]], combined_df.drop_duplicates(subset=['dev', 'mid', 'time'], inplace=True)
[table_apt['power_out'].iloc[i], table_apt['power_out'].iloc[i + 1]],
color='red', alpha=0.5) # 将去重后的数据插入数据库
combined_df.to_sql('history', self.engine, if_exists='replace', index=False)
lines, labels = ax1.get_legend_handles_labels() print(f"成功插入 {len(combined_df)} 条数据")
shadows, shadow_labels = ax2.get_legend_handles_labels()
ax1.legend(lines + shadows, labels + shadow_labels, loc='upper left')
ax1.set_title('Device Data Visualization') return len(combined_df)
ax1.set_xlabel('Time')
ax1.set_ylabel('Voltage (V)')
ax2.set_ylabel('Power (W)')
plt.show() def get_histoty_data(self, device_id, data_type, time_start:int, time_end:int):
""" 读取历史数据 """
flag = 0
time_Interval = 20 * 60
database = self.data['history']
filter_data = database[database['dev'] == device_id &
database['mid'] == SemaMap[data_type][0] &
database['time'].between(time_start, time_end)]
if filter_data.shape[1] == 0 or (filter_data['time'].min() - time_start > time_Interval) or (time_end - filter_data['time'].max() > time_Interval):
""" 需要从网页获取数据 """
flag = 1
return filter_data
def get_history_data_by_net(self, device_id, data_type, time_start:int, time_end:int, header=None):
""" 读取信号量历史数据, 返回接口json数据 """
if header is None:
header = self.api_origin['header']
body = {
"businessType": "7",
"startTimestamp": int(time_start * 1000),
"endTimestamp": int(time_end * 1000),
"deviceCode": f"{device_id}",
"mid": f"{data_type[0]}",
"pageNum": 1,
"pageSize": 10,
"total": 0
}
req = requests.post(API_URL, json=body, headers=header)
json_data = req.json()
if json_data['code'] == 200:
""" 数据读取成功 """
print(f"Get data success, len={len(json_data['data'])}")
table_data = pd.DataFrame(json_data['data'], columns=['collectTime', 'mid', 'midName', 'value'])
table_data['time'] = table_data['collectTime'].apply(lambda x: int(time.mktime(time.strptime(x, r"%Y-%m-%d %H:%M:%S"))))
table_data['dev'] = device_id
return table_data[['dev', 'mid', 'time', 'value']]
else:
print(f"Get data fail, code={json_data['code']}, msg=\n\t{json_data['msg']}")
return pd.DataFrame([], columns=['dev', 'mid', 'time', 'value'])
def graphs_adapter(self, device_id, time_start:int|str, time_end:int|str):
""" 绘制图表-适配器数据 """
if type(time_start) is str:
time_start = time.mktime(time.strptime(time_start, r"%Y-%m-%d %H:%M:%S"))
if type(time_end) is str:
time_end = time.mktime(time.strptime(time_end, r"%Y-%m-%d %H:%M:%S"))
data_volt_in = self.get_history_data_by_net(device_id, SemaMap["apt_volt_in"], time_start, time_end)
data_curr_in = self.get_history_data_by_net(device_id, SemaMap["apt_curr_in"], time_start, time_end)
data_volt_out = self.get_history_data_by_net(device_id, SemaMap["apt_volt_out"], time_start, time_end)
data_power_out = self.get_history_data_by_net(device_id, SemaMap["apt_power_out"], time_start, time_end)
combined_df = pd.concat([data_volt_in, data_curr_in, data_volt_out, data_power_out], ignore_index=True)
self.data['history'] = pd.concat([self.data['history'], combined_df], ignore_index=True)
self.save_history_data()
data_adapter = pd.DataFrame([], columns=['time', 'volt_in', 'volt_out', 'curr_in', 'power_out'])
data_adapter.time = data_volt_in.time
data_adapter.volt_in = data_volt_in.value
data_adapter.volt_out = data_volt_out.value
data_adapter.curr_in = data_curr_in.value
data_adapter.power_out = data_power_out.value
return data_adapter
def chart_adapter(self, data_adapter):
""" 绘制适配器信息图表 """
fig, ax1 = plt.subplots()
ax1.plot(data_adapter['time'], data_adapter['volt_in'], color='green', label='Input Voltage')
ax1.plot(data_adapter['time'], data_adapter['volt_out'], color='red', label='Output Voltage')
ax2 = ax1.twinx()
ax2.plot(data_adapter['time'], data_adapter['power_out'], color='gray', label='Output Power')
# # 绘制斜线阴影
# for i in range(len(table_apt) - 1):
# ax1.fill_between(
# [table_apt['time'].iloc[i], table_apt['time'].iloc[i + 1]],
# [table_apt['power_out'].iloc[i], table_apt['power_out'].iloc[i + 1]],
# color='red', alpha=0.5)
lines, labels = ax1.get_legend_handles_labels()
shadows, shadow_labels = ax2.get_legend_handles_labels()
ax1.legend(lines + shadows, labels + shadow_labels, loc='upper left')
ax1.set_title('Device Data Visualization')
ax1.set_xlabel('Time')
ax1.set_ylabel('Voltage (V)')
ax2.set_ylabel('Power (W)')
plt.ioff()
plt.show()
plt.savefig('output.png')
# plt.close()
plt.ion()
def sim_data_apt(times:tuple[int, int]): def sim_data_apt(times:tuple[int, int]):
""" 模拟数据 """ """ 模拟数据 """
@@ -124,13 +202,17 @@ def sim_data_apt(times:tuple[int, int]):
return pd.DataFrame(data) return pd.DataFrame(data)
if __name__=='__main__': if __name__=='__main__':
import numpy as np
data = sim_data_apt(('2024-10-1 00:00:00', '2024-10-1 12:00:00')) # API_HEADER['authorization'] = 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiIl0sInVzZXJfbmFtZSI6IndlYl9tYW5hZ2V8d2FuZ2xlaTQiLCJzY29wZSI6WyJhbGwiXSwiZXhwIjoxNzMyNTQzMzIyLCJ1c2VySWQiOjI0Mjg1LCJqdGkiOiI2MGZlZjYxNC0zM2QyLTQ5NjMtYjM5Mi1mNTJlYzJhM2RmYTEiLCJjbGllbnRfaWQiOiJ3ZWJfbWFuYWdlIn0.jumI9T8swSPCKITrCU06DWRea0tpvwCidw6jFP-F26JH0fRCp801fbzPs5kHkrgh-sm_ayzO2mD9NCkC2hoC-YDQs8HCvXHZXwtYpn3qJ6vpWRQrMr8Vwk8HnuxbFUn-8Ccs88hHOa8rtrEVsglfl0gqU1C8LL1rsHxgYI3MAwwU_eHo74jNjSHuIn79iq4Gyxc6lial6Pb6WWOw1erqKRGRQPcfUtAXMjwXbQ03vFk6G-As_u353yKvznsGR4A70hFHkJJGugK3en8kJO3_g3sE6ddcx0CYicucRLckro4Md_S8mIPkw2RgGEK38vvEo2fqiyT8E67dC5D4_zMw5A'
data_lamina = Lamina_Data('sqlite:///result/chinatowercom.db')
chart_apt(data) # data = sim_data_apt(('2024-10-1 00:00:00', '2024-10-1 12:00:00'))
# chart_apt(data)
plt.waitforbuttonpress() table_apt = data_lamina.graphs_adapter('TTE0102DX2406180988', '2024-11-23 00:00:00', '2024-11-26 00:00:00')
pass data_lamina.chart_adapter(table_apt)
while True:
plt.waitforbuttonpress()

89
source/data_inverter.py Normal file
View File

@@ -0,0 +1,89 @@
import numpy as np
import pandas as pd
from pathlib import Path
import matplotlib.pyplot as plt
def fft_anlyies(data, time_step=0.0002) -> pd.DataFrame:
""" 生成FFT数据 """
frequencies = np.fft.fftfreq(data.shape[0], d=time_step) # 计算频率轴
result = pd.DataFrame([], index=frequencies)
for col_name in data:
""" 分列处理 """
fft_result = np.fft.fft(data[col_name])
fft_magnitude = np.abs(fft_result) # 获取幅度谱
result[col_name + '频谱'] = fft_magnitude
return result
def RecordingAnalysis(data_origin: pd.DataFrame):
""" 光伏逆变器录波数据分析 """
graph_map = {
'电网电压': ['A相电网电压', 'B相电网电压', 'C相电网电压'],
'滤波电容电压': ['A相滤波电容电压', 'B相滤波电容电压', 'C相滤波电容电压'],
'母线电压': ['上母线电压', '下母线电压'],
'母线电流': ['上母线电流', '下母线电流'],
'逆变电流': ['A相逆变电流', 'B相逆变电流', 'C相逆变电流'],
'负载电流': ['A相负载电流', 'B相负载电流', 'C相负载电流'],
'调制比': ['A相调制比', 'B相调制比', 'C相调制比'],
'电压前馈': ['A相电压前馈', 'B相电压前馈', 'C相电压前馈'],
'逆变电流参考值': ['A相逆变电流参考值', 'B相逆变电流参考值', 'C相逆变电流参考值'],
'谐振控制输出': ['谐振控制输出'],
'有源阻尼': ['有源阻尼'],
'零序电压': ['零序电压'],
'相角': ['相角'],
'故障状态': ['状态标签', '故障录波标志位'],
}
data_origin.index = data_origin.index * time_step
for key, value in graph_map.items():
ax = data_origin[value].plot(title=key)
if key == '故障状态':
threshold = 1e-5
diff = data_origin[value].iloc[:-1].reset_index(drop=True) - data_origin[value].iloc[1:].reset_index(drop=True)
diff = diff.replace(0, pd.NA).dropna(how='all')
for x, d in diff.T.items():
if threshold is None or d > threshold:
Y = data_origin.loc[x, value]
for y in Y:
ax.annotate(f'{y:.2f}', (x, y), textcoords="jump points", xytext=(0,5), ha='center')
if key == '电网电压' or key == '滤波电容电压':
""" 三相运行数据 """
data_fft = fft_anlyies(data_origin[value], time_step=time_step)
data_fft.plot(title=(key + ' FFT频谱图'), xlabel = '频率 (Hz)', ylabel = '幅度')
ax_fft = data_fft.plot(title=(key + ' FFT频谱图'), xlabel = '频率 (Hz)', ylabel = '幅度', xlim=(-200, 200))
if __name__ == "__main__":
# plt中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
# 坐标轴负数显示
plt.rcParams['axes.unicode_minus'] = False
time_step = 1 / 5000
data = pd.read_excel(Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\pv_inviter\19__24_12_06_15_19_03.xlsx"))
# Index([
# 'A相电网电压', 'B相电网电压', 'C相电网电压',
# 'A相滤波电容电压', 'B相滤波电容电压', 'C相滤波电容电压',
# '上母线电压', '下母线电压',
# '上母线电流', '下母线电流',
# 'A相逆变电流', 'B相逆变电流', 'C相逆变电流',
# 'A相负载电流', 'B相负载电流', 'C相负载电流',
# 'A相调制比', 'B相调制比', 'C相调制比',
# 'A相电压前馈', 'B相电压前馈', 'C相电压前馈',
# 'A相逆变电流参考值', 'B相逆变电流参考值', 'C相逆变电流参考值',
# '谐振控制输出',
# '有源阻尼',
# '零序电压',
# '相角',
# '状态标签', '故障录波标志位'
# ],
# dtype='object')
RecordingAnalysis(data)
if not hasattr(__builtins__,"__IPYTHON__"):
plt.waitforbuttonpress()

View File

@@ -139,25 +139,29 @@ if __name__=='__main__':
"Debug": {'com_name': 'COM3', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1, "Debug": {'com_name': 'COM3', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06], # 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True, 'frame_print': True,
'time_out': 0.1, 'retry': 1, 'retry_sub': 10}, 'time_out': 0.5, 'retry': 1, 'retry_sub': 10},
"HPLC": {'com_name': 'COM9', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1, "HPLC": {'com_name': 'COM9', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x11, 0x01, 0x18, 0x06, 0x24, 0x02], 'addr_645': trans_str_to_list("02 01 00 00 24 20"),
'frame_print': True, 'frame_print': True,
'time_out': 3, 'time_gap': 0.1, 'retry': 3, 'retry_sub': 10}, 'time_out': 3, 'time_gap': 0.1, 'retry': 3, 'retry_sub': 10},
} }
dev_lamina = LaminaAdapter(**mode_config['Debug']) dev_lamina = LaminaAdapter(type_dev="SLCP001", **mode_config['Debug'])
dev_lamina.frame_read(0x0100, 0x20) dev_lamina.frame_read(0x0100, 0x20)
# dev_lamina.frame_write_one(0x51, 0x01)
# dev_lamina.frame_read(0x1A0, 0x20)
# dev_lamina.frame_log()
# dev_lamina.frame_read(0x1A0, 0x20)
if not hasattr(__builtins__,"__IPYTHON__"): if not hasattr(__builtins__,"__IPYTHON__"):
# 工程-即时转换 # 工程-即时转换
file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex") # file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex")
file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\release\device_V1\lamina_adapter\Debug\lamina_adapter.hex")
# file_hex = Path(r"D:\WorkingProject\LightStackOptimizer\software\lamina_optimizer\lamina_optimizer\Debug\lamina_optimizer.hex") # file_hex = Path(r"D:\WorkingProject\LightStackOptimizer\software\lamina_optimizer\lamina_optimizer\Debug\lamina_optimizer.hex")
if not file_hex.exists(): if not file_hex.exists():
raise Exception("工程编译目标文件不存在.") raise Exception("工程编译目标文件不存在.")
file_package = make_Pakeage(file_hex, GeneratePackage_SLCP102_p460)
version = "SLCP101_241030_2000_V2.03" version = "SLCP101_241030_2000_V2.03"
addr = [0x24, 0x09, 0x12, 0x00, 0x00, 0x00] addr = [0x24, 0x09, 0x12, 0x00, 0x00, 0x00]
@@ -170,7 +174,7 @@ if __name__=='__main__':
ret = dev_lamina.frame_read(0x0100, 0x20) ret = dev_lamina.frame_read(0x0100, 0x20)
time.sleep(1) time.sleep(1)
dev_lamina.frame_update(file_package) dev_lamina.frame_update(file_hex, makefile=True)
time.sleep(3) time.sleep(3)

View File

@@ -2,7 +2,7 @@ import time
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
from tenacity import retry, stop_after_attempt, wait_fixed from tenacity import retry, stop_after_attempt, wait_fixed
from device.LaminaController import LaminaController from device.LaminaController import LaminaController, LaminaController_new
from device.LaminaController import GeneratePackage_DLSP001_p280039 from device.LaminaController import GeneratePackage_DLSP001_p280039
from device.LaminaController import GenerateImage_DLSP001_p280039 from device.LaminaController import GenerateImage_DLSP001_p280039
from device.tools.ByteConv import trans_list_to_str from device.tools.ByteConv import trans_list_to_str
@@ -67,10 +67,10 @@ def test_record(path_CFG=None, path_data=None):
text_cfg += f"1\r\n" text_cfg += f"1\r\n"
path_CFG.write_text(text_cfg) path_CFG.write_text(text_cfg)
# Data File # Data File
text_record = "" text_record = ""
for point in data: for point in data:
# line_record = f"{point['index']},{point['timestamp'] * 1000}," # line_record = f"{point['index']},{point['timestamp'] * 1000},"
line_record = f"{point['index']},{30}," line_record = f"{point['index']},{30},"
for data_alg in point['ChAlg']: for data_alg in point['ChAlg']:
line_record += f"{data_alg}," line_record += f"{data_alg},"
@@ -233,17 +233,17 @@ if __name__=='__main__':
"Debug": {'com_name': 'COM8', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1, "Debug": {'com_name': 'COM8', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1,
# 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06], # 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
'frame_print': True, 'frame_print': True,
'time_out': 0.5, 'retry': 3}, 'time_out': 0.5, 'time_gap': 0.02, 'retry': 3},
} }
dev_lamina = LaminaController(**mode_config['Debug']) dev_lamina = LaminaController_new(**mode_config['Debug'])
dev_lamina.frame_read(0x0100, 0x20) dev_lamina.frame_read(0x0100, 0x20)
# dev_lamina.frame_read(0x0A, 0x20) # dev_lamina.frame_read(0x0A, 0x20)
# dev_lamina.frame_read(0x60, 0x60) # dev_lamina.frame_read(0x60, 0x60)
if not hasattr(__builtins__,"__IPYTHON__") and 0: # if not hasattr(__builtins__,"__IPYTHON__"): # and 0
""" 读取故障录波数据 """ """ 读取故障录波数据 """
path_CFG = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039\result\record4.cfg") path_CFG = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039\result\record4.cfg")
path_data = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039\result\record4.dat") path_data = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039\result\record4.dat")
@@ -262,7 +262,7 @@ if __name__=='__main__':
dev_lamina.frame_write_one(0x60, 0) dev_lamina.frame_write_one(0x60, 0)
time.sleep(1) time.sleep(1)
if dev_lamina.frame_update(file_hex, makefile=True): if dev_lamina.frame_update(file_hex, makefile=True):
time.sleep(6) time.sleep(2)
dev_lamina.frame_read(0x0100, 0x20) dev_lamina.frame_read(0x0100, 0x20)
# dev_lamina.frame_write_one(0x52, 0x01) # dev_lamina.frame_write_one(0x52, 0x01)

View File

@@ -128,6 +128,10 @@ class LaminaStation(DeviceMQTT):
'data_define': ParamMap_LaminaCombiner, 'data_define': ParamMap_LaminaCombiner,
} }
def frame_read_adapter(self, id, daddr, dlen) -> bool:
""" 读取适配器数据 """
return self.frame_read(0x4000 + id * 0x200 + daddr, dlen)
def frame_read(self, daddr=0x60, dlen=0x30) -> bool: def frame_read(self, daddr=0x60, dlen=0x30) -> bool:
self.block['type'] = 'read' self.block['type'] = 'read'
self.block['data_addr'] = daddr self.block['data_addr'] = daddr
@@ -137,15 +141,15 @@ class LaminaStation(DeviceMQTT):
def frame_write_one(self, daddr=0x85, dval=-900) -> bool: def frame_write_one(self, daddr=0x85, dval=-900) -> bool:
self.block['type'] = 'write_one' self.block['type'] = 'write_one'
self.block['data_addr'] = daddr self.block['data_addr'] = daddr
item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 # item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1
self.block['data_val'] = int(dval * item_coff) self.block['data_val'] = int(dval) # * item_coff
return self._transfer_data() return self._transfer_data()
def frame_write_dual(self, daddr=0x91, dval=600) -> bool: def frame_write_dual(self, daddr=0x91, dval=600) -> bool:
self.block['type'] = 'write_dual' self.block['type'] = 'write_dual'
self.block['data_addr'] = daddr self.block['data_addr'] = daddr
item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 # item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1
self.block['data_val'] = int(dval * item_coff) self.block['data_val'] = int(dval) # * item_coff
return self._transfer_data() return self._transfer_data()
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool: def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool:
@@ -163,17 +167,40 @@ if __name__ == '__main__':
"dev2": {'device_id': 'TTE0101DX2409230113', # 常来东-光伏 "dev2": {'device_id': 'TTE0101DX2409230113', # 常来东-光伏
'frame_print': True, 'frame_print': True,
'time_out': 4, 'retry': 1}, 'time_out': 4, 'retry': 1},
"dev3": {'device_id': 'TTE0101DX2406270041', # 大丰市镇区补点139 "dev3": {'device_id': 'TTE0101DX2406270041', # 大丰市镇区补点139
'frame_print': True, 'frame_print': True,
'time_out': 4, 'retry': 1}, 'time_out': 4, 'retry': 1},
"dev4": {'device_id': 'TTE0101DX2407020114', # 大丰大龙南 "dev4": {'device_id': 'TTE0101DX2407020114', # 大丰大龙南
'frame_print': True, 'frame_print': True,
'time_out': 4, 'retry': 1}, 'time_out': 4, 'retry': 1},
"dev5": {'device_id': 'TTE0101DX2407010082', # 大丰草堰镇双垛村
'frame_print': True,
'time_out': 4, 'retry': 1},
"dev6": {'device_id': 'TTE0101DX2406140040', # 张家港泗港公落地内爬单管塔
'frame_print': True,
'time_out': 4, 'retry': 1},
"dev7": {'device_id': 'TTE0101DX2406260040', # 常熟河坝落地外爬单管塔
'frame_print': True,
'time_out': 4, 'retry': 1},
"dev8": {'device_id': 'TTE0101DX2407080036', # 昆山好孩子西落地外爬单管塔
'frame_print': True,
'time_out': 4, 'retry': 1},
"dev9": {'device_id': 'TTE0101DX2406140009', # 市区东山镇
'frame_print': True,
'time_out': 4, 'retry': 1},
"dev10": {'device_id': 'TTE0101DX2406270018', # 张家港兆丰北单管塔
'frame_print': True,
'time_out': 4, 'retry': 1},
} }
dev_lamina = LaminaStation(**mode_config["dev4"]) dev_lamina = LaminaStation(**mode_config["dev8"])
dev_lamina.frame_read(0x0000, 0x20) dev_lamina.frame_read(0x0000, 0x20)
dev_lamina.frame_read(0x4100, 0x20) # dev_lamina.frame_read(0x4100, 0x20)
if not hasattr(__builtins__,"__IPYTHON__"): if not hasattr(__builtins__,"__IPYTHON__"):
pass pass
dev_lamina.frame_read(0x400E + 0x200 * (6-1), 0x20)
dev_lamina.frame_read(0x4072 + 0x200 * (6-1), 0x03)

View File

@@ -1,6 +1,8 @@
import time import time
from pathlib import Path
from serial import Serial from serial import Serial
from serial.tools import list_ports from serial.tools import list_ports
from abc import ABC, abstractmethod
from .tools import ByteConv from .tools import ByteConv
from .function import protocols from .function import protocols
@@ -119,6 +121,7 @@ class DeviceSerial:
'bytesize': 8, 'bytesize': 8,
'stopbits': 1, 'stopbits': 1,
} }
kwargs = kwargs if kwargs else None
serial_close = lambda com: com.close() if com.isOpen() else None serial_close = lambda com: com.close() if com.isOpen() else None
serial_port_check = lambda port: port.upper() in (com.name for com in list_ports.comports()) serial_port_check = lambda port: port.upper() in (com.name for com in list_ports.comports())
@@ -126,9 +129,13 @@ class DeviceSerial:
match (self.__com, port, kwargs): match (self.__com, port, kwargs):
case (None, str() as port, None): case (None, str() as port, None):
""" 使用默认参数打开串口 """ """ 使用默认参数打开串口 """
if not serial_port_check(port):
raise ValueError("无效串口端口: %s" % port)
self.__com = Serial(port, timeout=0, **com_config) self.__com = Serial(port, timeout=0, **com_config)
case (None, str() as port, dict() as kwargs): case (None, str() as port, dict() as kwargs):
""" 使用指定参数打开串口 """ """ 使用指定参数打开串口 """
if not serial_port_check(port):
raise ValueError("无效串口端口: %s" % port)
com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else com_config['baudrate'] com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else com_config['baudrate']
com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else com_config['parity'] com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else com_config['parity']
com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else com_config['bytesize'] com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else com_config['bytesize']
@@ -136,19 +143,19 @@ class DeviceSerial:
self.__com = Serial(port, timeout=0, **com_config) self.__com = Serial(port, timeout=0, **com_config)
case (Serial() as com, None, None): case (Serial() as com, None, None):
""" 无参数重开串口 """ """ 无参数重开串口 """
serial_close(self.__com) serial_close(com)
com.open() com.open()
case (Serial() as com, port, None): case (Serial() as com, port, None):
""" 重新指定端口号并打开串口 """ """ 重新指定端口号并打开串口 """
serial_close(self.__com) serial_close(com)
if serial_port_check(port): if not serial_port_check(port):
raise ValueError("无效串口端口: %s" % port) raise ValueError("无效串口端口: %s" % port)
com.port = port com.port = port
com.open() com.open()
case (None, str() as port, dict() as kwargs): case (Serial() as com, str() as port, dict() as kwargs):
""" 重新指定端口号与配置并打开串口 """ """ 重新指定端口号与配置并打开串口 """
serial_close(self.__com) serial_close(com)
if serial_port_check(port): if not serial_port_check(port):
raise ValueError("无效串口端口: %s" % port) raise ValueError("无效串口端口: %s" % port)
com.port = port com.port = port
com.baudrate = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else com_config['baudrate'] com.baudrate = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else com_config['baudrate']
@@ -162,6 +169,26 @@ class DeviceSerial:
return self.__com.is_open return self.__com.is_open
@abstractmethod
def frame_read(self, daddr=0x60, dlen=0x50) -> bool:
pass
@abstractmethod
def frame_write_one(self, daddr=0x85, dval=-900) -> bool:
pass
@abstractmethod
def frame_write_dual(self, daddr=0x91, dval=600) -> bool:
pass
@abstractmethod
def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool:
pass
@abstractmethod
def frame_update(self, path_file: Path, makefile: bool = False) -> bool:
pass

View File

@@ -16,6 +16,7 @@ ParamMap_LaminaAdapter = {
# 4 - str # 4 - str
# 5 - addr # 5 - addr
# 6 - float # 6 - float
# 8 - func_callback
0x00: ["硬件版本识别电压", 2, 1000], 0x00: ["硬件版本识别电压", 2, 1000],
0x01: ["输出电容电电压", 2, 10], 0x01: ["输出电容电电压", 2, 10],
0x02: ["参考电压", 2, 10], 0x02: ["参考电压", 2, 10],
@@ -128,6 +129,13 @@ ParamMap_LaminaAdapter = {
0x170: ["SN", 4, 16], 0x170: ["SN", 4, 16],
0x180: ["MES", 4, 16], 0x180: ["MES", 4, 16],
0x190: ["Datetime", 4, 16], 0x190: ["Datetime", 4, 16],
0x1A0: ["事件记录1", 8, 16],
0x1B0: ["事件记录2", 8, 16],
0x1C0: ["事件记录3", 8, 16],
0x1D0: ["事件记录4", 8, 16],
0x1E0: ["事件记录5", 8, 16],
0x1F0: ["事件记录6", 8, 16],
0x200: ["事件记录-Log", 8, 16],
} }
MemoryMap_SLCP001 = { MemoryMap_SLCP001 = {
@@ -221,6 +229,11 @@ class LaminaAdapter(DeviceSerial):
self.block['data']['data_val'] = dval self.block['data']['data_val'] = dval
return self._transfer_data() return self._transfer_data()
def frame_log(self, step='next') -> bool:
self.block['data']['type'] = 'log'
self.block['data']['step'] = step
return self._transfer_data()
def frame_update(self, path_file: Path, makefile: bool = False) -> bool: def frame_update(self, path_file: Path, makefile: bool = False) -> bool:
""" 程序升级 """ 程序升级
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程; 注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
@@ -761,4 +774,4 @@ def GenerateImage_DLSY001_p460(path_boot: Path, path_main: Path, path_back: Path
offset_image = 0x056000 offset_image = 0x056000
Image[offset_image: offset_image + len(back_header)] = back_header Image[offset_image: offset_image + len(back_header)] = back_header
return bytearray(Image), main_header, back_header return bytearray(Image), main_header, back_header

View File

@@ -21,6 +21,10 @@ ParamMap_LaminaController = {
# 7 - numberList # 7 - numberList
0x00: ["绝缘检测正电阻", 6], 0x00: ["绝缘检测正电阻", 6],
0x02: ["绝缘检测负电阻", 6], 0x02: ["绝缘检测负电阻", 6],
0x04: ["负对地绝缘检测电压(动作)" , 2, 100],
0x05: ["负对地绝缘检测电压(未动作)" , 2, 100],
0x06: ["正对地绝缘检测电压(动作)" , 2, 100],
0x07: ["正对地绝缘检测电压(未动作)" , 2, 100],
0x0B: ["事件标志", 1], 0x0B: ["事件标志", 1],
0x0C: ["告警字1", 1], 0x0C: ["告警字1", 1],
0x0D: ["告警字2", 1], 0x0D: ["告警字2", 1],
@@ -117,16 +121,17 @@ ParamMap_LaminaController = {
MemoryMap_280039 = { MemoryMap_280039 = {
'image_size': 2*0x030000, # 镜像文件大小 'image_size': 2*0x030000, # 镜像文件大小
'app_size': 2*0x004000, # 应用程序大小 'boot_size': 2*0x004000, # Boot程序大小
'boot_size': 2*0x014000, # Boot程序大小 'app_size': 2*0x014000, # 应用程序大小
'boot_addr': 0x000000, # Boot程序地址 'boot_addr': 0x000000, # Boot程序地址
'main_header': 0x006000, # main信息地址 'main_header': 0x006000, # main信息地址
'back_header': 0x007000, # back信息地址 'back_header': 0x007000, # back信息地址
'main_info': 0x088000, # main版本地址
'back_info': 0x007000, # back版本地址
'main_addr': 0x008000, # main程序地址 'main_addr': 0x008000, # main程序地址
'back_addr': 0x01C000, # back程序地址 'back_addr': 0x01C000, # back程序地址
'main_info': 0x088000, # main版本地址
'back_info': 0x09C000, # back版本地址
} }
class LaminaController: class LaminaController:
@@ -553,44 +558,49 @@ class LaminaController_new(DeviceSerial):
self.flag_print = False self.flag_print = False
self.retry = 3 self.retry = 3
self.time_out = 1.5 self.time_out = 1.5
self.block['file_block_size'] = 240 try:
# 读取config status = 'init' # 初始化
self.block['type'] = 'record_cfg' self.block['file_block_size'] = 240
self.block['step'] = 'start'
frame_master = function.protocols.make_frame_modbus(self.block)
ret, pbar = True, None status = 'read_config' # 读取config
frame_data_cfg = [] self.block['type'] = 'record_cfg'
while ret: self.block['step'] = 'start'
if ret := self.__transfer_data(frame_master): ret, pbar = True, None
frame_data_cfg.append(self.output['record']) frame_data_cfg = []
if self.output['record']['seq'] == 0: while ret:
pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading") if ret := self._transfer_data():
self.block['step'] = 'next' frame_data_cfg.append(self.output['record'])
frame_master = function.protocols.make_frame_modbus(self.block) if self.output['record']['seq'] == 0:
elif (self.output['record']['seq']) >= self.output['record']['total']: pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading")
ret = False self.block['step'] = 'next'
pbar and pbar.update() elif (self.output['record']['seq']) >= self.output['record']['total']:
pbar and pbar.close() ret = False
pbar and pbar.update()
pbar and pbar.close()
# 读取data status = 'read_data' # 读取data
self.block['type'] = 'record_data' self.block['type'] = 'record_data'
self.block['step'] = 'start' self.block['step'] = 'start'
frame_master = function.protocols.make_frame_modbus(self.block) ret, pbar = True, None
frame_data_record = []
ret, pbar = True, None while ret:
frame_data_record = [] if ret := self._transfer_data():
while ret: frame_data_record.append(self.output['record'])
if ret := self.__transfer_data(frame_master): if self.output['record']['seq'] == 0:
frame_data_record.append(self.output['record']) pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading")
if self.output['record']['seq'] == 0: self.block['step'] = 'next'
pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading") elif (self.output['record']['seq']) >= self.output['record']['total']:
self.block['step'] = 'next' ret = False
frame_master = function.protocols.make_frame_modbus(self.block) pbar and pbar.update()
elif (self.output['record']['seq']) >= self.output['record']['total']: pbar and pbar.close()
ret = False except Exception as ex:
pbar and pbar.update() """ 通用异常处理 """
pbar and pbar.close() self.flag_print, self.retry, self.time_out = param_saved
report = f'Record Fail: {status}'
report += f', Frame in {self.block["record"]["seq"]}' if status == 'read_config' or status == 'read_data' else ''
report += f'\n Error by {ex}' if not isinstance(ex, AssertionError) else ''
print(report)
return False
self.flag_print, self.retry, self.time_out = param_saved self.flag_print, self.retry, self.time_out = param_saved
if (len(frame_data_record) != 32) or (len(frame_data_cfg) != 3): if (len(frame_data_record) != 32) or (len(frame_data_cfg) != 3):
@@ -602,7 +612,7 @@ class LaminaController_new(DeviceSerial):
config_record = {'LinePos': []} config_record = {'LinePos': []}
pos = 0 pos = 0
if data_cfg_bare[pos+1] != 0xF1 or data_cfg_bare[pos] != 0xF1: if data_cfg_bare[pos+1] != 0xF1 or data_cfg_bare[pos] != 0xF1:
Warning("config 配置文件格式异常") raise Warning("config 配置文件格式异常")
pos += 2 pos += 2
config_record['LinePos'].append(pos) config_record['LinePos'].append(pos)
len_faultword = (data_cfg_bare[3] * 0x100 + data_cfg_bare[2]) len_faultword = (data_cfg_bare[3] * 0x100 + data_cfg_bare[2])
@@ -621,7 +631,7 @@ class LaminaController_new(DeviceSerial):
config_record['Standard'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] config_record['Standard'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2 pos += 2
if data_cfg_bare[pos+1] != 0xF2 or data_cfg_bare[pos] != 0xF2: if data_cfg_bare[pos+1] != 0xF2 or data_cfg_bare[pos] != 0xF2:
Warning("config 配置文件格式异常") raise Warning("config 配置文件格式异常")
pos += 2 pos += 2
config_record['LinePos'].append(pos) config_record['LinePos'].append(pos)
config_record['ChannelNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] config_record['ChannelNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
@@ -631,7 +641,7 @@ class LaminaController_new(DeviceSerial):
config_record['ChNum_Dgt'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] config_record['ChNum_Dgt'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2 pos += 2
if data_cfg_bare[pos+1] != 0xF3 or data_cfg_bare[pos] != 0xF3: if data_cfg_bare[pos+1] != 0xF3 or data_cfg_bare[pos] != 0xF3:
Warning("config 配置文件格式异常") raise Warning("config 配置文件格式异常")
pos += 2 pos += 2
config_record['LinePos'].append(pos) config_record['LinePos'].append(pos)
config_record['ChannelDescription'] = [] config_record['ChannelDescription'] = []
@@ -648,7 +658,7 @@ class LaminaController_new(DeviceSerial):
config_record['ChannelDescription'].append(bytearray(ch_desc).decode()) config_record['ChannelDescription'].append(bytearray(ch_desc).decode())
config_record['ChannelCoefficient'].append(ch_coe) config_record['ChannelCoefficient'].append(ch_coe)
if data_cfg_bare[pos+1] != 0xF4 or data_cfg_bare[pos] != 0xF4: if data_cfg_bare[pos+1] != 0xF4 or data_cfg_bare[pos] != 0xF4:
Warning("config 配置文件格式异常") raise Warning("config 配置文件格式异常")
pos += 2 pos += 2
config_record['LinePos'].append(pos) config_record['LinePos'].append(pos)
config_record['SysFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] config_record['SysFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
@@ -664,7 +674,7 @@ class LaminaController_new(DeviceSerial):
config_record['TimeFactor'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] config_record['TimeFactor'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos]
pos += 2 pos += 2
if data_cfg_bare[pos+1] != 0xF5 or data_cfg_bare[pos] != 0xF5: if data_cfg_bare[pos+1] != 0xF5 or data_cfg_bare[pos] != 0xF5:
Warning("config 配置文件格式异常") raise Warning("config 配置文件格式异常")
pos += 2 pos += 2
config_record['LinePos'].append(pos) config_record['LinePos'].append(pos)
time_stamp = { time_stamp = {
@@ -726,56 +736,57 @@ class LaminaController_new(DeviceSerial):
return True return True
def frame_update(self, path_bin: Path, makefile: bool = False) -> bool: def frame_update(self, path_file: Path, makefile: bool = False) -> bool:
""" 程序升级 """ 程序升级
注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程; 注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程;
""" """
if makefile:
self.block['file'], file_bin = GeneratePackage_DLSP001_p280039(path_bin)
else:
self.block['file'] = path_bin.read_bytes()
self.block['header_offset'] = 184
self.block['type'] = 'update'
param_saved = self.flag_print, self.retry, self.time_out param_saved = self.flag_print, self.retry, self.time_out
self.retry = 5 self.retry = 3
self.time_out = 0.5 self.time_out = 0.5
self.flag_print = False self.flag_print = False
try:
status = 'init' # 初始化
if not path_file.exists():
raise Exception("工程编译目标文件不存在.")
if makefile and self.make_package is not None:
self.block['file'] = self.make_package(path_file)
else:
self.block['file'] = path_file.read_bytes()
# 启动帧 self.block['type'] = 'update'
self.block['step'] = 'start' self.block['header_offset'] = 184
self.block['index'] = 0
frame_master = function.protocols.make_frame_modbus(self.block) status = 'start' # 启动帧
if not self.__transfer_data(frame_master): self.block['step'] = 'start'
self.block['index'] = 0
assert self._transfer_data()
self.block['file_block_size'] = self.output['upgrade']['length']
# 避免接收到延迟返回报文
time.sleep(self.time_out)
status = 'trans' # 文件传输
self.retry = 3
self.time_out = 1.5
self.block['step'] = 'trans'
self.block['index'] = 0
frame_total = ceil((len(self.block['file']) - self.block['header_offset']) / self.block['file_block_size'])
for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block['index'] = idx
assert self._transfer_data()
status = 'end' # 结束升级
self.time_out = 1
self.block['step'] = 'end'
self.block['index'] += 1
assert self._transfer_data()
except Exception as ex:
""" 通用异常处理 """
self.flag_print, self.retry, self.time_out = param_saved self.flag_print, self.retry, self.time_out = param_saved
print('Upgrade Fail: start') report = f'Upgrade Fail: {status}'
return False report += f', Frame in {self.block["index"]}' if status == 'trans' else ''
self.block["data"]['file_block_size'] = self.output['upgrade']['length'] report += f'\n Error by {ex}' if not isinstance(ex, AssertionError) else ''
print(report)
# 避免接收到延迟返回报文
time.sleep(self.time_out)
# 文件传输
self.retry = 3
self.time_out = 1.5
self.block["data"]['step'] = 'trans'
self.block['index'] = 0
frame_total = ceil((len(self.block["data"]['file']) - self.block['header_offset']) / self.block["data"]['file_block_size'])
for idx in tqdm(range(frame_total), desc="File Transmitting"):
self.block["data"]['index'] = idx
frame_master = function.protocols.make_frame_modbus(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: trans data in {idx}')
return False
# 结束升级
self.time_out = 1
self.block["data"]['step'] = 'end'
self.block["data"]['index'] += 1
frame_master = function.protocols.make_frame_modbus(self.block)
if not self.__transfer_data(frame_master):
self.flag_print, self.retry, self.time_out = param_saved
print(f'Upgrade Fail: end')
return False return False
self.flag_print, self.retry, self.time_out = param_saved self.flag_print, self.retry, self.time_out = param_saved
@@ -814,7 +825,7 @@ def GeneratePackage(type_dev: str, path_hex: Path, **kwargs) -> bytearray:
print("Package generated successfully.") print("Package generated successfully.")
print(f"File name: {path_hex.name}") print(f"File name: {path_hex.name}")
print(f"File Info:") print(f"File Info:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main) // 2)}]")
# 组装镜像 # 组装镜像
Image = [0xFF] * (len(main_header) + len(encrypt_main)) Image = [0xFF] * (len(main_header) + len(encrypt_main))
@@ -883,9 +894,9 @@ def GenerateImage(type_dev: str, path_boot: Path, path_main: Path, path_back: Pa
# Log打印 # Log打印
print("Merge Image generated successfully.") print("Merge Image generated successfully.")
print(f"Main File:") print(f"Main File:")
print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main) // 2)}]")
print(f"Back File:") print(f"Back File:")
print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]") print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back) // 2)}]")
# 额外文件 # 额外文件
if 'output_header' in kwargs.keys() and kwargs['output_header']: if 'output_header' in kwargs.keys() and kwargs['output_header']:

View File

@@ -29,6 +29,7 @@ frame_modbus = {
0x04: [5, 1, 2], 0x04: [5, 1, 2],
0x06: [8, 0], 0x06: [8, 0],
0x07: [8, 3, 2, {0x01: 2, 0x02: 0, 0x03: 0}], 0x07: [8, 3, 2, {0x01: 2, 0x02: 0, 0x03: 0}],
0x08: [5, 3, 2, {0x01: 1, 0x02: 0x22, 0x03: 1}],
0x10: [8, 0], 0x10: [8, 0],
0x11: [11, 2, 7] 0x11: [11, 2, 7]
} }
@@ -94,6 +95,19 @@ def make_frame_modbus(block:dict) -> bytearray:
frame.append(block['file_block_size'] // 256) frame.append(block['file_block_size'] // 256)
frame.append(block['file_block_size'] % 256) frame.append(block['file_block_size'] % 256)
elif block['type'] == 'log':
""" 事件记录系列报文 """
frame.append(0x08)
if block['step'] == 'start':
frame.append(0x01)
frame.append(block['data_len'] // 0x100 % 0x100)
frame.append(block['data_len'] % 0x100)
elif block['step'] == 'next':
frame.append(0x02)
elif block['step'] == 'end':
frame.append(0x03)
else:
raise Exception("Modbus Log Frame Step Error.")
else: else:
""" 数据读取系列报文 """ """ 数据读取系列报文 """
data_addr = block['data_addr'] data_addr = block['data_addr']
@@ -217,6 +231,12 @@ def check_frame_modbus(frame:bytes, block:dict) -> dict:
else: else:
raise Exception("Frame read data length error.") raise Exception("Frame read data length error.")
output['result'] = True output['result'] = True
elif output['code_func'] == 0x08:
if frame[2] == 0x02:
output['Regs'] = display_data(0x200, frame[5:-2], block['data_define'])
else:
raise Exception(" Log SubFunc code error.")
output['result'] = True
elif output['code_func'] == 0x06: elif output['code_func'] == 0x06:
""" 单个数据写入帧 """ """ 单个数据写入帧 """
output['result'] = True output['result'] = True
@@ -374,6 +394,32 @@ def display_data(address: int, data: bytes, modbus_map: dict=modbus_map) -> dict
""" 正序数值表示 """ """ 正序数值表示 """
data_len = current_map[2] data_len = current_map[2]
item = list(map(lambda x: x, data[:2 * data_len])) item = list(map(lambda x: x, data[:2 * data_len]))
elif current_map[1] == 8:
""" 事件记录解析 """
data_len = current_map[2]
time_stamp, event_id, event_num = struct.unpack('<IBB', data[:6])
item = f"Event: {event_id}\n"
item += f"\ttime: {time_stamp}\n"
item += f"\tnumber: {event_num}\n"
if event_id == 2:
""" 故障记录事件 """
info_len, status, flag_fault, flag_alarm, volt_in, volt_out, curr_in, curr_induc, cbox_power_limit, flag_control = struct.unpack('<BBHHhhhhhH', data[6:6+18])
info = data[6+18:6+18+info_len].decode().strip('\x00')
item += f"\tData: {info}\n"
item += f"\t\tstatus: {status:x}\n"
item += f"\t\tflag_fault: {flag_fault:x}\n"
item += f"\t\tflag_alarm: {flag_alarm:x}\n"
item += f"\t\tvolt_in: {volt_in / 10}\n"
item += f"\t\tvolt_out: {volt_out / 10}\n"
item += f"\t\tcurr_in: {curr_in / 100}\n"
item += f"\t\tcurr_induc: {curr_induc / 100}\n"
item += f"\t\tcbox_power_limit: {cbox_power_limit / 10}\n"
item += f"\t\tflag_control: {flag_control:b}\n"
else:
info_len, = struct.unpack('<B', data[6:6+1])
info = data[7:7+info_len].decode().strip('\x00')
item += f"\tData: {info}\n"
output_data[idx] = data_label, item output_data[idx] = data_label, item
idx += data_len idx += data_len

View File

@@ -1,7 +1,7 @@
def trans_list_to_str(data: list) -> str: def trans_list_to_str(data: list, word_len=2, prefix='') -> str:
""" 标准串口字符串表示 """ """ 标准串口字符串表示 """
func_trans = lambda x: ('00' + hex(x % 256)[2:])[-2:].upper() func_trans_word = lambda x: prefix + ('0' * word_len + hex(x % (0x10 ** word_len))[2:])[-word_len:].upper()
return " ".join(map(func_trans, data)) return " ".join(map(func_trans_word, data))
def trans_str_to_list(data: str) -> list: def trans_str_to_list(data: str) -> list:
@@ -26,4 +26,14 @@ def conv_int_to_array(num: int, big_end=False):
def display_hex(data:int, length:int=2) -> str: def display_hex(data:int, length:int=2) -> str:
""" Hex字符固定最小长度表示 """ """ Hex字符固定最小长度表示 """
return f"0x{data:0{length}X}" return f"0x{data:0{length}X}"
def trans_list_to_str_by_word(data: list, big_end=True) -> str:
""" 将数据以word格式显示 """
s = [data[i+1] * 0x100 + data[i] for i in range(0, len(data), 2)]
r = " ".join(map(hex, s))
print(data)
print(trans_list_to_str(data))
print(r)
return r

View File

@@ -199,62 +199,91 @@ def Process2():
file_image3.write_bytes(data_image) file_image3.write_bytes(data_image)
def Process(type_dev: str, path_boot: Path, path_project: Path): def Process(type_dev: str, path_boot: Path, path_project: Path, makePackages: bool =True, makeImage: bool = True):
""" 镜像生成流程 """ """ 镜像生成流程 """
root_boot = path_boot root_boot = path_boot
root_main = path_project root_main = path_project
result = root_main result = root_main
# 程序文件名适配
if type_dev[:4] == 'DLSY':
program_name = 'lamina_optimizer'
else:
program_name = 'lamina_adapter'
# 正常启动镜像 # 正常启动镜像
hex_boot = root_boot / "bootloader.hex" hex_boot = root_boot / "bootloader.hex"
hex_main = root_main / "lamina_adapter.hex" hex_main = root_main / f"{program_name}.hex"
hex_back = root_main / "lamina_adapter_back.hex" hex_back = root_main / f"{program_name}_back.hex"
hex_update = hex_main hex_update = hex_main
if (not hex_boot.exists()) or (not hex_main.exists()) or (not hex_back.exists()):
raise Exception("缺失必要程序文件")
file_image = result / f'{hex_main.stem}_ROM.bin' file_image = result / f'{hex_main.stem}_ROM.bin'
file_package = result / f'{hex_update.stem}.dat' file_package = result / f'{hex_update.stem}.dat'
dev_lamina = LaminaAdapter(None, type_dev=type_dev) dev_lamina = LaminaAdapter(None, type_dev=type_dev)
data_package = dev_lamina.make_package(hex_update) if makePackages:
data_image = dev_lamina.make_image(hex_boot, hex_main, hex_back, output_header=True, output_bin=True) if not hex_main.exists():
raise Exception("缺失必要程序文件")
data_package = dev_lamina.make_package(hex_update)
file_package.write_bytes(data_package)
if makeImage:
if (not hex_boot.exists()) or (not hex_main.exists()) or (not hex_back.exists()):
raise Exception("缺失必要程序文件")
data_image = dev_lamina.make_image(hex_boot, hex_main, hex_back, output_header=True, output_bin=True)
file_image.write_bytes(data_image)
file_image.write_bytes(data_image) # 异常镜像-主分区md5错误
file_package.write_bytes(data_package) file_image1 = result / f'{file_image.stem}_b1.bin'
data_image_copy = data_image
if type_dev == 'SLCP001':
data_image_copy[0x0FC018: 0x0FC01A] = [0x00, 0x01]
else:
data_image_copy[0x054018: 0x05401A] = [0x00, 0x01]
file_image1.write_bytes(data_image_copy)
# 异常镜像-分区md5错误 # 异常镜像-备份分区md5错误
file_image1 = result / f'{file_image.stem}_b1.bin' file_image2 = result / f'{file_image.stem}_b2.bin'
data_image_copy = data_image data_image_copy = data_image
data_image_copy[0x054018: 0x05401A] = [0x00, 0x01] if type_dev == 'SLCP001':
file_image1.write_bytes(data_image_copy) data_image_copy[0x0FE018: 0x0FE01A] = [0x00, 0x01]
else:
data_image_copy[0x056018: 0x05601A] = [0x00, 0x01]
file_image2.write_bytes(data_image_copy)
# 异常镜像-备份分区md5错误 # 异常镜像-分区md5错误
file_image2 = result / f'{file_image.stem}_b2.bin' file_image3 = result / f'{file_image.stem}_b3.bin'
data_image_copy = data_image data_image_copy = data_image
data_image_copy[0x056018: 0x05601A] = [0x00, 0x01] if type_dev == 'SLCP001':
file_image2.write_bytes(data_image_copy) data_image_copy[0x0FE018: 0x0FE01A] = [0x00, 0x01]
data_image_copy[0x0FC018: 0x0FC01A] = [0x00, 0x01]
# 异常镜像-双分区md5错误 else:
file_image3 = result / f'{file_image.stem}_b3.bin' data_image_copy[0x054018: 0x05401A] = [0x00, 0x01]
data_image_copy = data_image data_image_copy[0x056018: 0x05601A] = [0x00, 0x01]
data_image_copy[0x054018: 0x05401A] = [0x00, 0x01] file_image3.write_bytes(data_image_copy)
data_image_copy[0x056018: 0x05601A] = [0x00, 0x01]
file_image3.write_bytes(data_image_copy)
if __name__ == "__main__": if __name__ == "__main__":
path_boot1 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\4A0-PROJ_STACKLIGHT_PARALLEL_ADAPTOR") path_boot1 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\4A0-PROJ_STACKLIGHT_PARALLEL_ADAPTOR")
path_boot2 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\460-PROJ_STACKLIGHT_PARALLEL_ADAPTOR") path_boot2 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\460-PROJ_STACKLIGHT_PARALLEL_ADAPTOR")
path_main = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug") path_boot3 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\460-PROJ_STACKLIGHT_OPTIMIZER")
path_main1 = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug")
path_main2 = Path(r"D:\WorkingProject\LightStackOptimizer\software\lamina_optimizer\lamina_optimizer\Debug")
path_main3 = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\release\device_V1\lamina_adapter\Debug")
path_main4 = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\release\device_V3.01\lamina_adapter\Debug")
path_main5 = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\release\device_V2.03\lamina_adapter\Debug")
mode_config = {
'SLCP001': ('SLCP001', path_boot1, path_main3),
'SLCP101': ('SLCP101', path_boot2, path_main5),
'SLCP102': ('SLCP102', path_boot2, path_main4),
'DLSY001': ('DLSY001', path_boot3, path_main2),
'SLCP102_error': ('SLCP101', path_boot2, path_main4),
}
# Process0(path_boot1, path_main) # 适配器SLCP001 # Process0(path_boot1, path_main) # 适配器SLCP001
Process1(path_boot2, path_main) # 适配器SLCP101 # Process1(path_boot2, path_main) # 适配器SLCP101
# Process1_v2(path_boot2, path_main) # 适配器SLCP102 # Process1_v2(path_boot2, path_main) # 适配器SLCP102
# Process2() # Process2()
path_boot1 = Path(r"resource/460-PROJ_STACKLIGHT_OPTIMIZER") Process(*mode_config['SLCP101'])
path_boot2 = Path(r"resource/460-PROJ_STACKLIGHT_PARALLEL_ADAPTOR")
path_project = Path(r"resource\SLCP101")
Process('SLCP101', path_boot2, path_project)

27
source/test_parameter.py Normal file
View File

@@ -0,0 +1,27 @@
import time
from device.DeviceSerial import DeviceSerial
from device.LaminaAdapter import ParamMap_LaminaAdapter
def test_communication(device: DeviceSerial, time_out=2):
""" 通信成功率测试 """
time_start = time.time()
param_saved = device.flag_print, device.retry, device.time_out
device.flag_print = False
device.retry = 1
try:
while True:
device.frame_read(0x0C, 0x20)
print(f"Time Stamp: {time.ctime()}")
print(f"Success Frame: {device.log['read']}")
print(f"Failed Frame: {device.log['send'] - device.log['read']}")
print(f"Max Series Failed Frame: {device.log['keep-fail']}")
time.sleep(time_out)
finally:
time_end = time.time()
print("Test Result: ")
print(f"Time Start: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_start))}, \tTime End: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_end))}")
print(f"Time Elapsed: {time_end - time_start}")
print(f"Success Rate: {device.log['read'] / device.log['send'] * 100}%")
device.flag_print, device.retry, device.time_out = param_saved