diff --git a/source/data_analysis.py b/source/data_analysis.py index 5c5ec7c..7e3dc4f 100644 --- a/source/data_analysis.py +++ b/source/data_analysis.py @@ -1,33 +1,36 @@ import time import datetime import requests +import numpy as np import pandas as pd from pathlib import Path from bs4 import BeautifulSoup import pandas as pd import matplotlib.pyplot as plt import matplotlib.dates as mdates +from sqlalchemy import create_engine +from sqlalchemy import MetaData, Table, Column, String, Float, Integer API_URL = "https://energy-iot.chinatowercom.cn/api/device/device/historyPerformance" -headers = { - "Accept-Encoding": "gzip, deflate, br, zstd", - "Connection": "keep-alive", - "Content-Length": "211", - "Cookie": "HWWAFSESTIME=1732173820506; HWWAFSESID=1739685743c73769ff; dc04ed2361044be8a9355f6efb378cf2=WyIzNTI0NjE3OTgzIl0", - "Host": "energy-iot.chinatowercom.cn", - "Origin": "https://energy-iot.chinatowercom.cn", - "Sec-Fetch-Dest": "empty", - "Sec-Fetch-Mode": "cors", - "Sec-Fetch-Site": "same-origin", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0", - "accept": "application/json, text/plain, */*", - "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", - "authorization": "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiIl0sInVzZXJfbmFtZSI6IndlYl9tYW5hZ2V8d2FuZ2xlaTQiLCJzY29wZSI6WyJhbGwiXSwiZXhwIjoxNzMyMjc4NDA5LCJ1c2VySWQiOjI0Mjg1LCJqdGkiOiJkODE0YTZhYy05YmJmLTQ0ZjQtYWRhYi0wMzAzNjUzNmNhNWIiLCJjbGllbnRfaWQiOiJ3ZWJfbWFuYWdlIn0.VhJaDKwzjekwOCsw_jOF_jvg7sX45okFcxkLyWtbfFVGWVWANhKNhVqj5Dn0Qb3wUXH3e-w74sDN1RI9QADngMOGP_H7aTwI_nukj6VmjpFA7kEtOBwa6ouvPZQMa1qa3UWl21Ac6GoLu14T4TIf4kQAMTdaYAMFrwDAXAkqvIDmKKjZbnDFUjUIcj-J_Y-LfHCEBjtcz7Rp_wMO-PMA5wII6kbcNoSFiYb0djcFQyeBcIUSUTRPixPcTYBkS-IhNrsOePIWlpNYMHbPxZdrZkV4M65BmBn4A9MUjWYHm7iIut8WVMdCXR4Sxp9m0mJHXR_IPWES4O7aBcuMkOmjyw", - "content-type": "application/json;charset=UTF-8", - "sec-ch-ua": "\"Microsoft Edge\";v=\"131\", \"Chromium\";v=\"131\", \"Not_A Brand\";v=\"24\"", - "sec-ch-ua-mobile": "?0", - "sec-ch-ua-platform": "Windows", +API_HEADER = { + "Accept-Encoding": "gzip, deflate, br, zstd", + "Connection": "keep-alive", + "Content-Length": "170", + "Cookie": "HWWAFSESTIME=1732496660747; HWWAFSESID=880ee8eff3a2d23536; dc04ed2361044be8a9355f6efb378cf2=WyIzNTI0NjE3OTgzIl0", + "authorization": "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiIl0sInVzZXJfbmFtZSI6IndlYl9tYW5hZ2V8d2FuZ2xlaTQiLCJzY29wZSI6WyJhbGwiXSwiZXhwIjoxNzMyNTQ4Nzc5LCJ1c2VySWQiOjI0Mjg1LCJqdGkiOiI1Y2I3ZDA2ZC1hNzczLTQ5ZDQtYmRiMy05ZjdiOWNmMjY3ZTYiLCJjbGllbnRfaWQiOiJ3ZWJfbWFuYWdlIn0.Gpk9eYc3W4gcGLTG4aLSEL7E-VD-qPYsELcyyPuZ29e9STfa18WNcbosdIDulbH-5LDqY6i6aFAWpVm41bIgwzLZpAhlp2dthJcoBn9a90wWO4nz-Xi-MK9MVhNR0C28fyxv7h8jP_LRQkBi-pAzm2j65gkSTL3Vakky6znawZ5pY42U74lF-SnYdGyNFK3Ryy3xeMG4N1Pu_OJFdjXdGANUJZcXg1gh8WrERPo2SvuiCzOVTuUQZO7lRc8qGyILTKC-snz5CoonHaCWMaRzcV5VtCGd-yBncpyMQN9rGPqCiiLmuyTG29aZh5UfR_mloGCQHacqRmYSq3uGkPodBg", + "Host": "energy-iot.chinatowercom.cn", + "Origin": "https://energy-iot.chinatowercom.cn", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "same-origin", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0", + "accept": "application/json, text/plain, */*", + "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", + "content-type": "application/json;charset=UTF-8", + "sec-ch-ua": "\"Microsoft Edge\";v=\"131\", \"Chromium\";v=\"131\", \"Not_A Brand\";v=\"24\"", + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": "Windows", } SemaMap = { @@ -39,73 +42,148 @@ SemaMap = { 'apt_power_out': ('0305122001', 'adapter', "输出功率"), } -def get_history_data(device_id, data_type, times: tuple[int, int]): - """ 读取信号量历史数据, 返回接口json数据 """ - body = { - "startTimestamp": times[0], - "endTimestamp": times[1], - "deviceCode": f"{device_id}", - "mid": f"{data_type}", - "businessType": "7", - "pageNum": 2, - "pageSize": 5, - "total": 0 - } - req = requests.post(API_URL, data=body, headers=headers) - return req.json() +class Lamina_Data(object): + """ 叠光主站数据分析 """ + def __init__(self, database="sqlite:///:memory", header=None): + """ 初始化 """ + self.engine = create_engine(database) -def adapter_status_graphs(device_id, times: tuple[int, int]): - """ 获取数据, 绘制图表 """ - data_volt_in = get_history_data(device_id, SemaMap["apt_volt_in"], times) - data_curr_in = get_history_data(device_id, SemaMap["apt_curr_in"], times) - data_volt_out = get_history_data(device_id, SemaMap["apt_volt_out"], times) - data_power_out = get_history_data(device_id, SemaMap["apt_power_out"], times) + metadata = MetaData() + metadata.reflect(bind=self.engine) + if 'history' not in metadata.tables: + history_table = Table( + 'history', metadata, + Column('dev', String(50)), + Column('mid', String(50)), + Column('time', Integer), + Column('value', Float) + ) + metadata.create_all(self.engine) - data_apt = [] - for item in zip(data_volt_in['data'], data_volt_out['data'], data_curr_in['data'], data_power_out['data']): - print(item) - piont_time = time.mktime(time.strptime(item[0]['collectTime'], r"%Y-%m-%d %H:%M:%S")) - point_apt = { - 'time': int(piont_time), - 'volt_in': item[0]['value'], - 'volt_out': item[1]['value'], - 'curr_in': item[2]['value'], - 'power_out': item[3]['value'], + self.data = { + 'history': pd.read_sql_table('history', self.engine), } - data_apt.append(point_apt) - table_apt = pd.DataFrame(data_apt) - # 图表绘制 - chart_apt(table_apt) + self.api_origin = { + 'domain': 'https://energy-iot.chinatowercom.cn/api', + 'header': API_HEADER, + } -def chart_apt(table_apt): - """ 绘制适配器信息图表 """ - fig, ax1 = plt.subplots(figsize=(12, 6)) - ax1.plot(table_apt['time'], table_apt['volt_in'], color='green', label='Input Voltage') - ax1.plot(table_apt['time'], table_apt['volt_out'], color='red', label='Output Voltage') - # ax1.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H:%M:%S')) + def save_history_data(self): + """ 保存历史数据 """ - # 设置x轴的主要刻度定位器为自动日期定位器,这使得x轴上的刻度根据数据自动选择最合适的日期格式 - ax1.xaxis.set_major_locator(mdates.AutoDateLocator()) + data_memory = self.data['history'] + data_file = pd.read_sql_table('history', self.engine) - ax2 = ax1.twinx() - # 绘制斜线阴影 - for i in range(len(table_apt) - 1): - ax1.fill_between( - [table_apt['time'].iloc[i], table_apt['time'].iloc[i + 1]], - [table_apt['power_out'].iloc[i], table_apt['power_out'].iloc[i + 1]], - color='red', alpha=0.5) + # 合并新数据和现有数据 + combined_df = pd.concat([data_file, data_memory], ignore_index=True) + + # 通过 'dev', 'mid', 'time' 三列进行去重 + combined_df.drop_duplicates(subset=['dev', 'mid', 'time'], inplace=True) + + # 将去重后的数据插入数据库 + combined_df.to_sql('history', self.engine, if_exists='replace', index=False) - lines, labels = ax1.get_legend_handles_labels() - shadows, shadow_labels = ax2.get_legend_handles_labels() - ax1.legend(lines + shadows, labels + shadow_labels, loc='upper left') + print(f"成功插入 {len(combined_df)} 条数据") - ax1.set_title('Device Data Visualization') - ax1.set_xlabel('Time') - ax1.set_ylabel('Voltage (V)') - ax2.set_ylabel('Power (W)') + return len(combined_df) - plt.show() + def get_histoty_data(self, device_id, data_type, time_start:int, time_end:int): + """ 读取历史数据 """ + flag = 0 + time_Interval = 20 * 60 + database = self.data['history'] + filter_data = database[database['dev'] == device_id & + database['mid'] == SemaMap[data_type][0] & + database['time'].between(time_start, time_end)] + if filter_data.shape[1] == 0 or (filter_data['time'].min() - time_start > time_Interval) or (time_end - filter_data['time'].max() > time_Interval): + """ 需要从网页获取数据 """ + flag = 1 + + return filter_data + + def get_history_data_by_net(self, device_id, data_type, time_start:int, time_end:int, header=None): + """ 读取信号量历史数据, 返回接口json数据 """ + if header is None: + header = self.api_origin['header'] + + body = { + "businessType": "7", + "startTimestamp": int(time_start * 1000), + "endTimestamp": int(time_end * 1000), + "deviceCode": f"{device_id}", + "mid": f"{data_type[0]}", + "pageNum": 1, + "pageSize": 10, + "total": 0 + } + + req = requests.post(API_URL, json=body, headers=header) + json_data = req.json() + if json_data['code'] == 200: + """ 数据读取成功 """ + print(f"Get data success, len={len(json_data['data'])}") + table_data = pd.DataFrame(json_data['data'], columns=['collectTime', 'mid', 'midName', 'value']) + table_data['time'] = table_data['collectTime'].apply(lambda x: int(time.mktime(time.strptime(x, r"%Y-%m-%d %H:%M:%S")))) + table_data['dev'] = device_id + return table_data[['dev', 'mid', 'time', 'value']] + else: + print(f"Get data fail, code={json_data['code']}, msg=\n\t{json_data['msg']}") + return pd.DataFrame([], columns=['dev', 'mid', 'time', 'value']) + + def graphs_adapter(self, device_id, time_start:int|str, time_end:int|str): + """ 绘制图表-适配器数据 """ + if type(time_start) is str: + time_start = time.mktime(time.strptime(time_start, r"%Y-%m-%d %H:%M:%S")) + if type(time_end) is str: + time_end = time.mktime(time.strptime(time_end, r"%Y-%m-%d %H:%M:%S")) + data_volt_in = self.get_history_data_by_net(device_id, SemaMap["apt_volt_in"], time_start, time_end) + data_curr_in = self.get_history_data_by_net(device_id, SemaMap["apt_curr_in"], time_start, time_end) + data_volt_out = self.get_history_data_by_net(device_id, SemaMap["apt_volt_out"], time_start, time_end) + data_power_out = self.get_history_data_by_net(device_id, SemaMap["apt_power_out"], time_start, time_end) + + combined_df = pd.concat([data_volt_in, data_curr_in, data_volt_out, data_power_out], ignore_index=True) + self.data['history'] = pd.concat([self.data['history'], combined_df], ignore_index=True) + self.save_history_data() + + data_adapter = pd.DataFrame([], columns=['time', 'volt_in', 'volt_out', 'curr_in', 'power_out']) + data_adapter.time = data_volt_in.time + data_adapter.volt_in = data_volt_in.value + data_adapter.volt_out = data_volt_out.value + data_adapter.curr_in = data_curr_in.value + data_adapter.power_out = data_power_out.value + + return data_adapter + + def chart_adapter(self, data_adapter): + """ 绘制适配器信息图表 """ + fig, ax1 = plt.subplots() + ax1.plot(data_adapter['time'], data_adapter['volt_in'], color='green', label='Input Voltage') + ax1.plot(data_adapter['time'], data_adapter['volt_out'], color='red', label='Output Voltage') + + ax2 = ax1.twinx() + ax2.plot(data_adapter['time'], data_adapter['power_out'], color='gray', label='Output Power') + # # 绘制斜线阴影 + # for i in range(len(table_apt) - 1): + # ax1.fill_between( + # [table_apt['time'].iloc[i], table_apt['time'].iloc[i + 1]], + # [table_apt['power_out'].iloc[i], table_apt['power_out'].iloc[i + 1]], + # color='red', alpha=0.5) + + lines, labels = ax1.get_legend_handles_labels() + shadows, shadow_labels = ax2.get_legend_handles_labels() + ax1.legend(lines + shadows, labels + shadow_labels, loc='upper left') + + ax1.set_title('Device Data Visualization') + ax1.set_xlabel('Time') + ax1.set_ylabel('Voltage (V)') + ax2.set_ylabel('Power (W)') + + plt.ioff() + plt.show() + plt.savefig('output.png') + # plt.close() + plt.ion() def sim_data_apt(times:tuple[int, int]): """ 模拟数据 """ @@ -124,13 +202,17 @@ def sim_data_apt(times:tuple[int, int]): return pd.DataFrame(data) + if __name__=='__main__': - import numpy as np - data = sim_data_apt(('2024-10-1 00:00:00', '2024-10-1 12:00:00')) + # API_HEADER['authorization'] = 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiIl0sInVzZXJfbmFtZSI6IndlYl9tYW5hZ2V8d2FuZ2xlaTQiLCJzY29wZSI6WyJhbGwiXSwiZXhwIjoxNzMyNTQzMzIyLCJ1c2VySWQiOjI0Mjg1LCJqdGkiOiI2MGZlZjYxNC0zM2QyLTQ5NjMtYjM5Mi1mNTJlYzJhM2RmYTEiLCJjbGllbnRfaWQiOiJ3ZWJfbWFuYWdlIn0.jumI9T8swSPCKITrCU06DWRea0tpvwCidw6jFP-F26JH0fRCp801fbzPs5kHkrgh-sm_ayzO2mD9NCkC2hoC-YDQs8HCvXHZXwtYpn3qJ6vpWRQrMr8Vwk8HnuxbFUn-8Ccs88hHOa8rtrEVsglfl0gqU1C8LL1rsHxgYI3MAwwU_eHo74jNjSHuIn79iq4Gyxc6lial6Pb6WWOw1erqKRGRQPcfUtAXMjwXbQ03vFk6G-As_u353yKvznsGR4A70hFHkJJGugK3en8kJO3_g3sE6ddcx0CYicucRLckro4Md_S8mIPkw2RgGEK38vvEo2fqiyT8E67dC5D4_zMw5A' + data_lamina = Lamina_Data('sqlite:///result/chinatowercom.db') - chart_apt(data) + # data = sim_data_apt(('2024-10-1 00:00:00', '2024-10-1 12:00:00')) + # chart_apt(data) - plt.waitforbuttonpress() + table_apt = data_lamina.graphs_adapter('TTE0102DX2406180988', '2024-11-23 00:00:00', '2024-11-26 00:00:00') - pass \ No newline at end of file + data_lamina.chart_adapter(table_apt) + while True: + plt.waitforbuttonpress() diff --git a/source/data_inverter.py b/source/data_inverter.py new file mode 100644 index 0000000..d574c4f --- /dev/null +++ b/source/data_inverter.py @@ -0,0 +1,89 @@ +import numpy as np +import pandas as pd +from pathlib import Path +import matplotlib.pyplot as plt + + +def fft_anlyies(data, time_step=0.0002) -> pd.DataFrame: + """ 生成FFT数据 """ + frequencies = np.fft.fftfreq(data.shape[0], d=time_step) # 计算频率轴 + result = pd.DataFrame([], index=frequencies) + for col_name in data: + """ 分列处理 """ + fft_result = np.fft.fft(data[col_name]) + fft_magnitude = np.abs(fft_result) # 获取幅度谱 + result[col_name + '频谱'] = fft_magnitude + return result + +def RecordingAnalysis(data_origin: pd.DataFrame): + """ 光伏逆变器录波数据分析 """ + graph_map = { + '电网电压': ['A相电网电压', 'B相电网电压', 'C相电网电压'], + '滤波电容电压': ['A相滤波电容电压', 'B相滤波电容电压', 'C相滤波电容电压'], + '母线电压': ['上母线电压', '下母线电压'], + '母线电流': ['上母线电流', '下母线电流'], + '逆变电流': ['A相逆变电流', 'B相逆变电流', 'C相逆变电流'], + '负载电流': ['A相负载电流', 'B相负载电流', 'C相负载电流'], + '调制比': ['A相调制比', 'B相调制比', 'C相调制比'], + '电压前馈': ['A相电压前馈', 'B相电压前馈', 'C相电压前馈'], + '逆变电流参考值': ['A相逆变电流参考值', 'B相逆变电流参考值', 'C相逆变电流参考值'], + '谐振控制输出': ['谐振控制输出'], + '有源阻尼': ['有源阻尼'], + '零序电压': ['零序电压'], + '相角': ['相角'], + '故障状态': ['状态标签', '故障录波标志位'], + } + data_origin.index = data_origin.index * time_step + for key, value in graph_map.items(): + ax = data_origin[value].plot(title=key) + if key == '故障状态': + threshold = 1e-5 + diff = data_origin[value].iloc[:-1].reset_index(drop=True) - data_origin[value].iloc[1:].reset_index(drop=True) + diff = diff.replace(0, pd.NA).dropna(how='all') + for x, d in diff.T.items(): + + if threshold is None or d > threshold: + Y = data_origin.loc[x, value] + for y in Y: + ax.annotate(f'{y:.2f}', (x, y), textcoords="jump points", xytext=(0,5), ha='center') + + if key == '电网电压' or key == '滤波电容电压': + """ 三相运行数据 """ + data_fft = fft_anlyies(data_origin[value], time_step=time_step) + data_fft.plot(title=(key + ' FFT频谱图'), xlabel = '频率 (Hz)', ylabel = '幅度') + ax_fft = data_fft.plot(title=(key + ' FFT频谱图'), xlabel = '频率 (Hz)', ylabel = '幅度', xlim=(-200, 200)) + + + + +if __name__ == "__main__": + # plt中文显示 + plt.rcParams['font.sans-serif'] = ['SimHei'] + # 坐标轴负数显示 + plt.rcParams['axes.unicode_minus'] = False + + time_step = 1 / 5000 + data = pd.read_excel(Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\pv_inviter\19__24_12_06_15_19_03.xlsx")) + # Index([ + # 'A相电网电压', 'B相电网电压', 'C相电网电压', + # 'A相滤波电容电压', 'B相滤波电容电压', 'C相滤波电容电压', + # '上母线电压', '下母线电压', + # '上母线电流', '下母线电流', + # 'A相逆变电流', 'B相逆变电流', 'C相逆变电流', + # 'A相负载电流', 'B相负载电流', 'C相负载电流', + # 'A相调制比', 'B相调制比', 'C相调制比', + # 'A相电压前馈', 'B相电压前馈', 'C相电压前馈', + # 'A相逆变电流参考值', 'B相逆变电流参考值', 'C相逆变电流参考值', + # '谐振控制输出', + # '有源阻尼', + # '零序电压', + # '相角', + # '状态标签', '故障录波标志位' + # ], + # dtype='object') + + RecordingAnalysis(data) + + if not hasattr(__builtins__,"__IPYTHON__"): + plt.waitforbuttonpress() + diff --git a/source/dev_LaminaAdapter.py b/source/dev_LaminaAdapter.py index 24df4ce..9838f7c 100644 --- a/source/dev_LaminaAdapter.py +++ b/source/dev_LaminaAdapter.py @@ -139,25 +139,29 @@ if __name__=='__main__': "Debug": {'com_name': 'COM3', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1, # 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06], 'frame_print': True, - 'time_out': 0.1, 'retry': 1, 'retry_sub': 10}, + 'time_out': 0.5, 'retry': 1, 'retry_sub': 10}, "HPLC": {'com_name': 'COM9', 'baudrate': 9600, 'parity': 'E', 'bytesize': 8, 'stopbits': 1, - # 'addr_645': [0x11, 0x01, 0x18, 0x06, 0x24, 0x02], + 'addr_645': trans_str_to_list("02 01 00 00 24 20"), 'frame_print': True, 'time_out': 3, 'time_gap': 0.1, 'retry': 3, 'retry_sub': 10}, } - dev_lamina = LaminaAdapter(**mode_config['Debug']) + dev_lamina = LaminaAdapter(type_dev="SLCP001", **mode_config['Debug']) dev_lamina.frame_read(0x0100, 0x20) + # dev_lamina.frame_write_one(0x51, 0x01) + # dev_lamina.frame_read(0x1A0, 0x20) + # dev_lamina.frame_log() + # dev_lamina.frame_read(0x1A0, 0x20) if not hasattr(__builtins__,"__IPYTHON__"): # 工程-即时转换 - file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex") + # file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug\lamina_adapter.hex") + file_hex = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\release\device_V1\lamina_adapter\Debug\lamina_adapter.hex") # file_hex = Path(r"D:\WorkingProject\LightStackOptimizer\software\lamina_optimizer\lamina_optimizer\Debug\lamina_optimizer.hex") if not file_hex.exists(): raise Exception("工程编译目标文件不存在.") - file_package = make_Pakeage(file_hex, GeneratePackage_SLCP102_p460) version = "SLCP101_241030_2000_V2.03" addr = [0x24, 0x09, 0x12, 0x00, 0x00, 0x00] @@ -170,7 +174,7 @@ if __name__=='__main__': ret = dev_lamina.frame_read(0x0100, 0x20) time.sleep(1) - dev_lamina.frame_update(file_package) + dev_lamina.frame_update(file_hex, makefile=True) time.sleep(3) diff --git a/source/dev_LaminaController.py b/source/dev_LaminaController.py index da43ecc..10e4a49 100644 --- a/source/dev_LaminaController.py +++ b/source/dev_LaminaController.py @@ -2,7 +2,7 @@ import time from pathlib import Path from datetime import datetime from tenacity import retry, stop_after_attempt, wait_fixed -from device.LaminaController import LaminaController +from device.LaminaController import LaminaController, LaminaController_new from device.LaminaController import GeneratePackage_DLSP001_p280039 from device.LaminaController import GenerateImage_DLSP001_p280039 from device.tools.ByteConv import trans_list_to_str @@ -67,10 +67,10 @@ def test_record(path_CFG=None, path_data=None): text_cfg += f"1\r\n" path_CFG.write_text(text_cfg) - # Data File + # Data File text_record = "" for point in data: - # line_record = f"{point['index']},{point['timestamp'] * 1000}," + # line_record = f"{point['index']},{point['timestamp'] * 1000}," line_record = f"{point['index']},{30}," for data_alg in point['ChAlg']: line_record += f"{data_alg}," @@ -233,17 +233,17 @@ if __name__=='__main__': "Debug": {'com_name': 'COM8', 'baudrate': 115200, 'parity': 'N', 'bytesize': 8, 'stopbits': 1, # 'addr_645': [0x01, 0x02, 0x03, 0x04, 0x05, 0x06], 'frame_print': True, - 'time_out': 0.5, 'retry': 3}, + 'time_out': 0.5, 'time_gap': 0.02, 'retry': 3}, } - dev_lamina = LaminaController(**mode_config['Debug']) + dev_lamina = LaminaController_new(**mode_config['Debug']) dev_lamina.frame_read(0x0100, 0x20) # dev_lamina.frame_read(0x0A, 0x20) # dev_lamina.frame_read(0x60, 0x60) - if not hasattr(__builtins__,"__IPYTHON__") and 0: # + if not hasattr(__builtins__,"__IPYTHON__"): # and 0 """ 读取故障录波数据 """ path_CFG = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039\result\record4.cfg") path_data = Path(r"D:\WorkSpace\UserTool\SelfTool\FrameParser\test\p280039\result\record4.dat") @@ -262,7 +262,7 @@ if __name__=='__main__': dev_lamina.frame_write_one(0x60, 0) time.sleep(1) if dev_lamina.frame_update(file_hex, makefile=True): - time.sleep(6) + time.sleep(2) dev_lamina.frame_read(0x0100, 0x20) # dev_lamina.frame_write_one(0x52, 0x01) diff --git a/source/dev_station.py b/source/dev_station.py index dc4fb28..ceeae27 100644 --- a/source/dev_station.py +++ b/source/dev_station.py @@ -128,6 +128,10 @@ class LaminaStation(DeviceMQTT): 'data_define': ParamMap_LaminaCombiner, } + def frame_read_adapter(self, id, daddr, dlen) -> bool: + """ 读取适配器数据 """ + return self.frame_read(0x4000 + id * 0x200 + daddr, dlen) + def frame_read(self, daddr=0x60, dlen=0x30) -> bool: self.block['type'] = 'read' self.block['data_addr'] = daddr @@ -137,15 +141,15 @@ class LaminaStation(DeviceMQTT): def frame_write_one(self, daddr=0x85, dval=-900) -> bool: self.block['type'] = 'write_one' self.block['data_addr'] = daddr - item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 - self.block['data_val'] = int(dval * item_coff) + # item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 + self.block['data_val'] = int(dval) # * item_coff return self._transfer_data() def frame_write_dual(self, daddr=0x91, dval=600) -> bool: self.block['type'] = 'write_dual' self.block['data_addr'] = daddr - item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 - self.block['data_val'] = int(dval * item_coff) + # item_coff = self.block['data_define'][daddr][2] if len(self.block['data_define'][daddr]) > 2 else 1 + self.block['data_val'] = int(dval) # * item_coff return self._transfer_data() def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool: @@ -163,17 +167,40 @@ if __name__ == '__main__': "dev2": {'device_id': 'TTE0101DX2409230113', # 常来东-光伏 'frame_print': True, 'time_out': 4, 'retry': 1}, + "dev3": {'device_id': 'TTE0101DX2406270041', # 大丰市镇区补点139 'frame_print': True, 'time_out': 4, 'retry': 1}, "dev4": {'device_id': 'TTE0101DX2407020114', # 大丰大龙南 'frame_print': True, 'time_out': 4, 'retry': 1}, + "dev5": {'device_id': 'TTE0101DX2407010082', # 大丰草堰镇双垛村 + 'frame_print': True, + 'time_out': 4, 'retry': 1}, + + "dev6": {'device_id': 'TTE0101DX2406140040', # 张家港泗港公落地内爬单管塔 + 'frame_print': True, + 'time_out': 4, 'retry': 1}, + "dev7": {'device_id': 'TTE0101DX2406260040', # 常熟河坝落地外爬单管塔 + 'frame_print': True, + 'time_out': 4, 'retry': 1}, + "dev8": {'device_id': 'TTE0101DX2407080036', # 昆山好孩子西落地外爬单管塔 + 'frame_print': True, + 'time_out': 4, 'retry': 1}, + "dev9": {'device_id': 'TTE0101DX2406140009', # 市区东山镇 + 'frame_print': True, + 'time_out': 4, 'retry': 1}, + "dev10": {'device_id': 'TTE0101DX2406270018', # 张家港兆丰北单管塔 + 'frame_print': True, + 'time_out': 4, 'retry': 1}, } - dev_lamina = LaminaStation(**mode_config["dev4"]) + dev_lamina = LaminaStation(**mode_config["dev8"]) dev_lamina.frame_read(0x0000, 0x20) - dev_lamina.frame_read(0x4100, 0x20) + # dev_lamina.frame_read(0x4100, 0x20) if not hasattr(__builtins__,"__IPYTHON__"): pass + dev_lamina.frame_read(0x400E + 0x200 * (6-1), 0x20) + + dev_lamina.frame_read(0x4072 + 0x200 * (6-1), 0x03) diff --git a/source/device/DeviceSerial.py b/source/device/DeviceSerial.py index 49f3428..75a02ef 100644 --- a/source/device/DeviceSerial.py +++ b/source/device/DeviceSerial.py @@ -1,6 +1,8 @@ import time +from pathlib import Path from serial import Serial from serial.tools import list_ports +from abc import ABC, abstractmethod from .tools import ByteConv from .function import protocols @@ -119,6 +121,7 @@ class DeviceSerial: 'bytesize': 8, 'stopbits': 1, } + kwargs = kwargs if kwargs else None serial_close = lambda com: com.close() if com.isOpen() else None serial_port_check = lambda port: port.upper() in (com.name for com in list_ports.comports()) @@ -126,9 +129,13 @@ class DeviceSerial: match (self.__com, port, kwargs): case (None, str() as port, None): """ 使用默认参数打开串口 """ + if not serial_port_check(port): + raise ValueError("无效串口端口: %s" % port) self.__com = Serial(port, timeout=0, **com_config) case (None, str() as port, dict() as kwargs): """ 使用指定参数打开串口 """ + if not serial_port_check(port): + raise ValueError("无效串口端口: %s" % port) com_config['baudrate'] = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else com_config['baudrate'] com_config['parity'] = kwargs['parity'] if 'parity' in kwargs.keys() else com_config['parity'] com_config['bytesize'] = kwargs['bytesize'] if 'bytesize' in kwargs.keys() else com_config['bytesize'] @@ -136,19 +143,19 @@ class DeviceSerial: self.__com = Serial(port, timeout=0, **com_config) case (Serial() as com, None, None): """ 无参数重开串口 """ - serial_close(self.__com) + serial_close(com) com.open() case (Serial() as com, port, None): """ 重新指定端口号并打开串口 """ - serial_close(self.__com) - if serial_port_check(port): + serial_close(com) + if not serial_port_check(port): raise ValueError("无效串口端口: %s" % port) com.port = port com.open() - case (None, str() as port, dict() as kwargs): + case (Serial() as com, str() as port, dict() as kwargs): """ 重新指定端口号与配置并打开串口 """ - serial_close(self.__com) - if serial_port_check(port): + serial_close(com) + if not serial_port_check(port): raise ValueError("无效串口端口: %s" % port) com.port = port com.baudrate = kwargs['baudrate'] if 'baudrate' in kwargs.keys() else com_config['baudrate'] @@ -162,6 +169,26 @@ class DeviceSerial: return self.__com.is_open + @abstractmethod + def frame_read(self, daddr=0x60, dlen=0x50) -> bool: + pass + + @abstractmethod + def frame_write_one(self, daddr=0x85, dval=-900) -> bool: + pass + + @abstractmethod + def frame_write_dual(self, daddr=0x91, dval=600) -> bool: + pass + + @abstractmethod + def frame_write_str(self, daddr=0x82, dval=[0x06, 0x05, 0x04, 0x03, 0x02, 0x01]) -> bool: + pass + + @abstractmethod + def frame_update(self, path_file: Path, makefile: bool = False) -> bool: + pass + diff --git a/source/device/LaminaAdapter.py b/source/device/LaminaAdapter.py index f722aba..7985de2 100644 --- a/source/device/LaminaAdapter.py +++ b/source/device/LaminaAdapter.py @@ -16,6 +16,7 @@ ParamMap_LaminaAdapter = { # 4 - str # 5 - addr # 6 - float + # 8 - func_callback 0x00: ["硬件版本识别电压", 2, 1000], 0x01: ["输出电容电电压", 2, 10], 0x02: ["参考电压", 2, 10], @@ -128,6 +129,13 @@ ParamMap_LaminaAdapter = { 0x170: ["SN", 4, 16], 0x180: ["MES", 4, 16], 0x190: ["Datetime", 4, 16], + 0x1A0: ["事件记录1", 8, 16], + 0x1B0: ["事件记录2", 8, 16], + 0x1C0: ["事件记录3", 8, 16], + 0x1D0: ["事件记录4", 8, 16], + 0x1E0: ["事件记录5", 8, 16], + 0x1F0: ["事件记录6", 8, 16], + 0x200: ["事件记录-Log", 8, 16], } MemoryMap_SLCP001 = { @@ -221,6 +229,11 @@ class LaminaAdapter(DeviceSerial): self.block['data']['data_val'] = dval return self._transfer_data() + def frame_log(self, step='next') -> bool: + self.block['data']['type'] = 'log' + self.block['data']['step'] = step + return self._transfer_data() + def frame_update(self, path_file: Path, makefile: bool = False) -> bool: """ 程序升级 注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程; @@ -761,4 +774,4 @@ def GenerateImage_DLSY001_p460(path_boot: Path, path_main: Path, path_back: Path offset_image = 0x056000 Image[offset_image: offset_image + len(back_header)] = back_header - return bytearray(Image), main_header, back_header \ No newline at end of file + return bytearray(Image), main_header, back_header diff --git a/source/device/LaminaController.py b/source/device/LaminaController.py index e5f36f7..ce22b71 100644 --- a/source/device/LaminaController.py +++ b/source/device/LaminaController.py @@ -21,6 +21,10 @@ ParamMap_LaminaController = { # 7 - numberList 0x00: ["绝缘检测正电阻", 6], 0x02: ["绝缘检测负电阻", 6], + 0x04: ["负对地绝缘检测电压(动作)" , 2, 100], + 0x05: ["负对地绝缘检测电压(未动作)" , 2, 100], + 0x06: ["正对地绝缘检测电压(动作)" , 2, 100], + 0x07: ["正对地绝缘检测电压(未动作)" , 2, 100], 0x0B: ["事件标志", 1], 0x0C: ["告警字1", 1], 0x0D: ["告警字2", 1], @@ -117,16 +121,17 @@ ParamMap_LaminaController = { MemoryMap_280039 = { 'image_size': 2*0x030000, # 镜像文件大小 - 'app_size': 2*0x004000, # 应用程序大小 - 'boot_size': 2*0x014000, # Boot程序大小 + 'boot_size': 2*0x004000, # Boot程序大小 + 'app_size': 2*0x014000, # 应用程序大小 'boot_addr': 0x000000, # Boot程序地址 'main_header': 0x006000, # main信息地址 'back_header': 0x007000, # back信息地址 - 'main_info': 0x088000, # main版本地址 - 'back_info': 0x007000, # back版本地址 'main_addr': 0x008000, # main程序地址 'back_addr': 0x01C000, # back程序地址 + + 'main_info': 0x088000, # main版本地址 + 'back_info': 0x09C000, # back版本地址 } class LaminaController: @@ -553,44 +558,49 @@ class LaminaController_new(DeviceSerial): self.flag_print = False self.retry = 3 self.time_out = 1.5 - self.block['file_block_size'] = 240 - # 读取config - self.block['type'] = 'record_cfg' - self.block['step'] = 'start' - frame_master = function.protocols.make_frame_modbus(self.block) + try: + status = 'init' # 初始化 + self.block['file_block_size'] = 240 - ret, pbar = True, None - frame_data_cfg = [] - while ret: - if ret := self.__transfer_data(frame_master): - frame_data_cfg.append(self.output['record']) - if self.output['record']['seq'] == 0: - pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading") - self.block['step'] = 'next' - frame_master = function.protocols.make_frame_modbus(self.block) - elif (self.output['record']['seq']) >= self.output['record']['total']: - ret = False - pbar and pbar.update() - pbar and pbar.close() + status = 'read_config' # 读取config + self.block['type'] = 'record_cfg' + self.block['step'] = 'start' + ret, pbar = True, None + frame_data_cfg = [] + while ret: + if ret := self._transfer_data(): + frame_data_cfg.append(self.output['record']) + if self.output['record']['seq'] == 0: + pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Config Reading") + self.block['step'] = 'next' + elif (self.output['record']['seq']) >= self.output['record']['total']: + ret = False + pbar and pbar.update() + pbar and pbar.close() - # 读取data - self.block['type'] = 'record_data' - self.block['step'] = 'start' - frame_master = function.protocols.make_frame_modbus(self.block) - - ret, pbar = True, None - frame_data_record = [] - while ret: - if ret := self.__transfer_data(frame_master): - frame_data_record.append(self.output['record']) - if self.output['record']['seq'] == 0: - pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading") - self.block['step'] = 'next' - frame_master = function.protocols.make_frame_modbus(self.block) - elif (self.output['record']['seq']) >= self.output['record']['total']: - ret = False - pbar and pbar.update() - pbar and pbar.close() + status = 'read_data' # 读取data + self.block['type'] = 'record_data' + self.block['step'] = 'start' + ret, pbar = True, None + frame_data_record = [] + while ret: + if ret := self._transfer_data(): + frame_data_record.append(self.output['record']) + if self.output['record']['seq'] == 0: + pbar = tqdm(total=self.output['record']['total'] + 1, desc="Record Data Reading") + self.block['step'] = 'next' + elif (self.output['record']['seq']) >= self.output['record']['total']: + ret = False + pbar and pbar.update() + pbar and pbar.close() + except Exception as ex: + """ 通用异常处理 """ + self.flag_print, self.retry, self.time_out = param_saved + report = f'Record Fail: {status}' + report += f', Frame in {self.block["record"]["seq"]}' if status == 'read_config' or status == 'read_data' else '' + report += f'\n Error by {ex}' if not isinstance(ex, AssertionError) else '' + print(report) + return False self.flag_print, self.retry, self.time_out = param_saved if (len(frame_data_record) != 32) or (len(frame_data_cfg) != 3): @@ -602,7 +612,7 @@ class LaminaController_new(DeviceSerial): config_record = {'LinePos': []} pos = 0 if data_cfg_bare[pos+1] != 0xF1 or data_cfg_bare[pos] != 0xF1: - Warning("config 配置文件格式异常") + raise Warning("config 配置文件格式异常") pos += 2 config_record['LinePos'].append(pos) len_faultword = (data_cfg_bare[3] * 0x100 + data_cfg_bare[2]) @@ -621,7 +631,7 @@ class LaminaController_new(DeviceSerial): config_record['Standard'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] pos += 2 if data_cfg_bare[pos+1] != 0xF2 or data_cfg_bare[pos] != 0xF2: - Warning("config 配置文件格式异常") + raise Warning("config 配置文件格式异常") pos += 2 config_record['LinePos'].append(pos) config_record['ChannelNum'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] @@ -631,7 +641,7 @@ class LaminaController_new(DeviceSerial): config_record['ChNum_Dgt'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] pos += 2 if data_cfg_bare[pos+1] != 0xF3 or data_cfg_bare[pos] != 0xF3: - Warning("config 配置文件格式异常") + raise Warning("config 配置文件格式异常") pos += 2 config_record['LinePos'].append(pos) config_record['ChannelDescription'] = [] @@ -648,7 +658,7 @@ class LaminaController_new(DeviceSerial): config_record['ChannelDescription'].append(bytearray(ch_desc).decode()) config_record['ChannelCoefficient'].append(ch_coe) if data_cfg_bare[pos+1] != 0xF4 or data_cfg_bare[pos] != 0xF4: - Warning("config 配置文件格式异常") + raise Warning("config 配置文件格式异常") pos += 2 config_record['LinePos'].append(pos) config_record['SysFreq'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] @@ -664,7 +674,7 @@ class LaminaController_new(DeviceSerial): config_record['TimeFactor'] = data_cfg_bare[pos+1] * 0x100 + data_cfg_bare[pos] pos += 2 if data_cfg_bare[pos+1] != 0xF5 or data_cfg_bare[pos] != 0xF5: - Warning("config 配置文件格式异常") + raise Warning("config 配置文件格式异常") pos += 2 config_record['LinePos'].append(pos) time_stamp = { @@ -726,56 +736,57 @@ class LaminaController_new(DeviceSerial): return True - def frame_update(self, path_bin: Path, makefile: bool = False) -> bool: + def frame_update(self, path_file: Path, makefile: bool = False) -> bool: """ 程序升级 注意: 在使用单板升级测试时, 需要关闭低电压检测功能, 否则无法启动升级流程; """ - if makefile: - self.block['file'], file_bin = GeneratePackage_DLSP001_p280039(path_bin) - else: - self.block['file'] = path_bin.read_bytes() - self.block['header_offset'] = 184 - self.block['type'] = 'update' param_saved = self.flag_print, self.retry, self.time_out - self.retry = 5 + self.retry = 3 self.time_out = 0.5 self.flag_print = False + try: + status = 'init' # 初始化 + if not path_file.exists(): + raise Exception("工程编译目标文件不存在.") + if makefile and self.make_package is not None: + self.block['file'] = self.make_package(path_file) + else: + self.block['file'] = path_file.read_bytes() - # 启动帧 - self.block['step'] = 'start' - self.block['index'] = 0 - frame_master = function.protocols.make_frame_modbus(self.block) - if not self.__transfer_data(frame_master): + self.block['type'] = 'update' + self.block['header_offset'] = 184 + + status = 'start' # 启动帧 + self.block['step'] = 'start' + self.block['index'] = 0 + assert self._transfer_data() + + self.block['file_block_size'] = self.output['upgrade']['length'] + # 避免接收到延迟返回报文 + time.sleep(self.time_out) + + status = 'trans' # 文件传输 + self.retry = 3 + self.time_out = 1.5 + self.block['step'] = 'trans' + self.block['index'] = 0 + frame_total = ceil((len(self.block['file']) - self.block['header_offset']) / self.block['file_block_size']) + for idx in tqdm(range(frame_total), desc="File Transmitting"): + self.block['index'] = idx + assert self._transfer_data() + + status = 'end' # 结束升级 + self.time_out = 1 + self.block['step'] = 'end' + self.block['index'] += 1 + assert self._transfer_data() + except Exception as ex: + """ 通用异常处理 """ self.flag_print, self.retry, self.time_out = param_saved - print('Upgrade Fail: start') - return False - self.block["data"]['file_block_size'] = self.output['upgrade']['length'] - - # 避免接收到延迟返回报文 - time.sleep(self.time_out) - - # 文件传输 - self.retry = 3 - self.time_out = 1.5 - self.block["data"]['step'] = 'trans' - self.block['index'] = 0 - frame_total = ceil((len(self.block["data"]['file']) - self.block['header_offset']) / self.block["data"]['file_block_size']) - for idx in tqdm(range(frame_total), desc="File Transmitting"): - self.block["data"]['index'] = idx - frame_master = function.protocols.make_frame_modbus(self.block) - if not self.__transfer_data(frame_master): - self.flag_print, self.retry, self.time_out = param_saved - print(f'Upgrade Fail: trans data in {idx}') - return False - - # 结束升级 - self.time_out = 1 - self.block["data"]['step'] = 'end' - self.block["data"]['index'] += 1 - frame_master = function.protocols.make_frame_modbus(self.block) - if not self.__transfer_data(frame_master): - self.flag_print, self.retry, self.time_out = param_saved - print(f'Upgrade Fail: end') + report = f'Upgrade Fail: {status}' + report += f', Frame in {self.block["index"]}' if status == 'trans' else '' + report += f'\n Error by {ex}' if not isinstance(ex, AssertionError) else '' + print(report) return False self.flag_print, self.retry, self.time_out = param_saved @@ -814,7 +825,7 @@ def GeneratePackage(type_dev: str, path_hex: Path, **kwargs) -> bytearray: print("Package generated successfully.") print(f"File name: {path_hex.name}") print(f"File Info:") - print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") + print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main) // 2)}]") # 组装镜像 Image = [0xFF] * (len(main_header) + len(encrypt_main)) @@ -883,9 +894,9 @@ def GenerateImage(type_dev: str, path_boot: Path, path_main: Path, path_back: Pa # Log打印 print("Merge Image generated successfully.") print(f"Main File:") - print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main))}]") + print(f"\t header_length={len(main_header)}, bin_length={len(bin_main)}[{hex(len(bin_main) // 2)}]") print(f"Back File:") - print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back))}]") + print(f"\t header_length={len(back_header)}, bin_length={len(bin_back)}[{hex(len(bin_back) // 2)}]") # 额外文件 if 'output_header' in kwargs.keys() and kwargs['output_header']: diff --git a/source/device/function/protocols.py b/source/device/function/protocols.py index d80dfb5..3611941 100644 --- a/source/device/function/protocols.py +++ b/source/device/function/protocols.py @@ -29,6 +29,7 @@ frame_modbus = { 0x04: [5, 1, 2], 0x06: [8, 0], 0x07: [8, 3, 2, {0x01: 2, 0x02: 0, 0x03: 0}], + 0x08: [5, 3, 2, {0x01: 1, 0x02: 0x22, 0x03: 1}], 0x10: [8, 0], 0x11: [11, 2, 7] } @@ -94,6 +95,19 @@ def make_frame_modbus(block:dict) -> bytearray: frame.append(block['file_block_size'] // 256) frame.append(block['file_block_size'] % 256) + elif block['type'] == 'log': + """ 事件记录系列报文 """ + frame.append(0x08) + if block['step'] == 'start': + frame.append(0x01) + frame.append(block['data_len'] // 0x100 % 0x100) + frame.append(block['data_len'] % 0x100) + elif block['step'] == 'next': + frame.append(0x02) + elif block['step'] == 'end': + frame.append(0x03) + else: + raise Exception("Modbus Log Frame Step Error.") else: """ 数据读取系列报文 """ data_addr = block['data_addr'] @@ -217,6 +231,12 @@ def check_frame_modbus(frame:bytes, block:dict) -> dict: else: raise Exception("Frame read data length error.") output['result'] = True + elif output['code_func'] == 0x08: + if frame[2] == 0x02: + output['Regs'] = display_data(0x200, frame[5:-2], block['data_define']) + else: + raise Exception(" Log SubFunc code error.") + output['result'] = True elif output['code_func'] == 0x06: """ 单个数据写入帧 """ output['result'] = True @@ -374,6 +394,32 @@ def display_data(address: int, data: bytes, modbus_map: dict=modbus_map) -> dict """ 正序数值表示 """ data_len = current_map[2] item = list(map(lambda x: x, data[:2 * data_len])) + elif current_map[1] == 8: + """ 事件记录解析 """ + data_len = current_map[2] + time_stamp, event_id, event_num = struct.unpack(' str: +def trans_list_to_str(data: list, word_len=2, prefix='') -> str: """ 标准串口字符串表示 """ - func_trans = lambda x: ('00' + hex(x % 256)[2:])[-2:].upper() - return " ".join(map(func_trans, data)) + func_trans_word = lambda x: prefix + ('0' * word_len + hex(x % (0x10 ** word_len))[2:])[-word_len:].upper() + return " ".join(map(func_trans_word, data)) def trans_str_to_list(data: str) -> list: @@ -26,4 +26,14 @@ def conv_int_to_array(num: int, big_end=False): def display_hex(data:int, length:int=2) -> str: """ Hex字符固定最小长度表示 """ - return f"0x{data:0{length}X}" \ No newline at end of file + return f"0x{data:0{length}X}" + +def trans_list_to_str_by_word(data: list, big_end=True) -> str: + """ 将数据以word格式显示 """ + s = [data[i+1] * 0x100 + data[i] for i in range(0, len(data), 2)] + r = " ".join(map(hex, s)) + print(data) + print(trans_list_to_str(data)) + print(r) + + return r diff --git a/source/post_work.py b/source/post_work.py index 8251150..60e04ec 100644 --- a/source/post_work.py +++ b/source/post_work.py @@ -199,62 +199,91 @@ def Process2(): file_image3.write_bytes(data_image) -def Process(type_dev: str, path_boot: Path, path_project: Path): +def Process(type_dev: str, path_boot: Path, path_project: Path, makePackages: bool =True, makeImage: bool = True): """ 镜像生成流程 """ root_boot = path_boot root_main = path_project result = root_main + # 程序文件名适配 + if type_dev[:4] == 'DLSY': + program_name = 'lamina_optimizer' + else: + program_name = 'lamina_adapter' + # 正常启动镜像 hex_boot = root_boot / "bootloader.hex" - hex_main = root_main / "lamina_adapter.hex" - hex_back = root_main / "lamina_adapter_back.hex" + hex_main = root_main / f"{program_name}.hex" + hex_back = root_main / f"{program_name}_back.hex" hex_update = hex_main - if (not hex_boot.exists()) or (not hex_main.exists()) or (not hex_back.exists()): - raise Exception("缺失必要程序文件") file_image = result / f'{hex_main.stem}_ROM.bin' file_package = result / f'{hex_update.stem}.dat' dev_lamina = LaminaAdapter(None, type_dev=type_dev) - data_package = dev_lamina.make_package(hex_update) - data_image = dev_lamina.make_image(hex_boot, hex_main, hex_back, output_header=True, output_bin=True) + if makePackages: + if not hex_main.exists(): + raise Exception("缺失必要程序文件") + data_package = dev_lamina.make_package(hex_update) + file_package.write_bytes(data_package) + if makeImage: + if (not hex_boot.exists()) or (not hex_main.exists()) or (not hex_back.exists()): + raise Exception("缺失必要程序文件") + data_image = dev_lamina.make_image(hex_boot, hex_main, hex_back, output_header=True, output_bin=True) + file_image.write_bytes(data_image) - file_image.write_bytes(data_image) - file_package.write_bytes(data_package) + # 异常镜像-主分区md5错误 + file_image1 = result / f'{file_image.stem}_b1.bin' + data_image_copy = data_image + if type_dev == 'SLCP001': + data_image_copy[0x0FC018: 0x0FC01A] = [0x00, 0x01] + else: + data_image_copy[0x054018: 0x05401A] = [0x00, 0x01] + file_image1.write_bytes(data_image_copy) - # 异常镜像-主分区md5错误 - file_image1 = result / f'{file_image.stem}_b1.bin' - data_image_copy = data_image - data_image_copy[0x054018: 0x05401A] = [0x00, 0x01] - file_image1.write_bytes(data_image_copy) + # 异常镜像-备份分区md5错误 + file_image2 = result / f'{file_image.stem}_b2.bin' + data_image_copy = data_image + if type_dev == 'SLCP001': + data_image_copy[0x0FE018: 0x0FE01A] = [0x00, 0x01] + else: + data_image_copy[0x056018: 0x05601A] = [0x00, 0x01] + file_image2.write_bytes(data_image_copy) - # 异常镜像-备份分区md5错误 - file_image2 = result / f'{file_image.stem}_b2.bin' - data_image_copy = data_image - data_image_copy[0x056018: 0x05601A] = [0x00, 0x01] - file_image2.write_bytes(data_image_copy) - - # 异常镜像-双分区md5错误 - file_image3 = result / f'{file_image.stem}_b3.bin' - data_image_copy = data_image - data_image_copy[0x054018: 0x05401A] = [0x00, 0x01] - data_image_copy[0x056018: 0x05601A] = [0x00, 0x01] - file_image3.write_bytes(data_image_copy) + # 异常镜像-双分区md5错误 + file_image3 = result / f'{file_image.stem}_b3.bin' + data_image_copy = data_image + if type_dev == 'SLCP001': + data_image_copy[0x0FE018: 0x0FE01A] = [0x00, 0x01] + data_image_copy[0x0FC018: 0x0FC01A] = [0x00, 0x01] + else: + data_image_copy[0x054018: 0x05401A] = [0x00, 0x01] + data_image_copy[0x056018: 0x05601A] = [0x00, 0x01] + file_image3.write_bytes(data_image_copy) if __name__ == "__main__": - path_boot1 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\4A0-PROJ_STACKLIGHT_PARALLEL_ADAPTOR") - path_boot2 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\460-PROJ_STACKLIGHT_PARALLEL_ADAPTOR") - path_main = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug") + path_boot1 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\4A0-PROJ_STACKLIGHT_PARALLEL_ADAPTOR") + path_boot2 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\460-PROJ_STACKLIGHT_PARALLEL_ADAPTOR") + path_boot3 = Path(r"D:\WorkingProject\LightStackAdapter\software\umon\460-PROJ_STACKLIGHT_OPTIMIZER") + path_main1 = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\lamina_adapter\Debug") + path_main2 = Path(r"D:\WorkingProject\LightStackOptimizer\software\lamina_optimizer\lamina_optimizer\Debug") + path_main3 = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\release\device_V1\lamina_adapter\Debug") + path_main4 = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\release\device_V3.01\lamina_adapter\Debug") + path_main5 = Path(r"D:\WorkingProject\LightStackAdapter\software\lamina_adapter\release\device_V2.03\lamina_adapter\Debug") + + mode_config = { + 'SLCP001': ('SLCP001', path_boot1, path_main3), + 'SLCP101': ('SLCP101', path_boot2, path_main5), + 'SLCP102': ('SLCP102', path_boot2, path_main4), + 'DLSY001': ('DLSY001', path_boot3, path_main2), + 'SLCP102_error': ('SLCP101', path_boot2, path_main4), + } # Process0(path_boot1, path_main) # 适配器SLCP001 - Process1(path_boot2, path_main) # 适配器SLCP101 + # Process1(path_boot2, path_main) # 适配器SLCP101 # Process1_v2(path_boot2, path_main) # 适配器SLCP102 # Process2() - path_boot1 = Path(r"resource/460-PROJ_STACKLIGHT_OPTIMIZER") - path_boot2 = Path(r"resource/460-PROJ_STACKLIGHT_PARALLEL_ADAPTOR") - path_project = Path(r"resource\SLCP101") - Process('SLCP101', path_boot2, path_project) \ No newline at end of file + Process(*mode_config['SLCP101']) diff --git a/source/test_parameter.py b/source/test_parameter.py new file mode 100644 index 0000000..1a25f59 --- /dev/null +++ b/source/test_parameter.py @@ -0,0 +1,27 @@ +import time +from device.DeviceSerial import DeviceSerial +from device.LaminaAdapter import ParamMap_LaminaAdapter + + +def test_communication(device: DeviceSerial, time_out=2): + """ 通信成功率测试 """ + time_start = time.time() + param_saved = device.flag_print, device.retry, device.time_out + device.flag_print = False + device.retry = 1 + try: + while True: + device.frame_read(0x0C, 0x20) + print(f"Time Stamp: {time.ctime()}") + print(f"Success Frame: {device.log['read']}") + print(f"Failed Frame: {device.log['send'] - device.log['read']}") + print(f"Max Series Failed Frame: {device.log['keep-fail']}") + time.sleep(time_out) + finally: + time_end = time.time() + print("Test Result: ") + print(f"Time Start: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_start))}, \tTime End: {time.strftime(r'%Y-%m-%d %H:%M:%S', time.localtime(time_end))}") + print(f"Time Elapsed: {time_end - time_start}") + print(f"Success Rate: {device.log['read'] / device.log['send'] * 100}%") + device.flag_print, device.retry, device.time_out = param_saved +