主站分析脚本;

This commit is contained in:
何 泽隆
2025-01-21 10:07:18 +08:00
parent f3847f4f34
commit 34c9c67a00

View File

@@ -1,10 +1,10 @@
import time
import logging
import datetime
import requests
import numpy as np
import pandas as pd
from pathlib import Path
from bs4 import BeautifulSoup
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
@@ -34,30 +34,29 @@ API_HEADER = {
}
SemaMap_adapter = {
'facturer': ('0305113001', 'adapter', False, "厂家"),
'version': ('0305114001', 'adapter', False, "软件版本"),
'model': ('0305115001', 'adapter', False, "型号"),
'status': ('0305116001', 'adapter', False, "开关机状态"),
'temp': ('0305117001', 'adapter', True, "温度"),
'volt_in': ('0305118001', 'adapter', True, "输入电压"),
'curr_in': ('0305119001', 'adapter', True, "输入电流"),
'volt_out': ('0305120001', 'adapter', True, "输出电压"),
'curr_out': ('0305121001', 'adapter', True, "输出电流"),
'power_out': ('0305122001', 'adapter', True, "输出功率"),
'apt_facturer': ('0305113001', 'adapter', False, "厂家"),
'apt_version': ('0305114001', 'adapter', False, "软件版本"),
'apt_model': ('0305115001', 'adapter', False, "型号"),
'apt_status': ('0305116001', 'adapter', False, "开关机状态"),
'apt_temp': ('0305117001', 'adapter', True, "温度"),
'apt_volt_in': ('0305118001', 'adapter', True, "输入电压"),
'apt_curr_in': ('0305119001', 'adapter', True, "输入电流"),
'apt_volt_out': ('0305120001', 'adapter', True, "输出电压"),
'apt_curr_out': ('0305121001', 'adapter', True, "输出电流"),
'apt_power_out': ('0305122001', 'adapter', True, "输出功率"),
}
semamap_combiner = {
'IMSI': ('0305102001', 'combiner', False, "IMSI"),
'ICCID': ('0305103001', 'combiner', False, "SIM卡ICCID"),
'MSISDN': ('0305104001', 'combiner', False, "MSISDN"),
'dev_type': ('0305101001', 'combiner', False, "系统类型"),
'facturer': ('0305107001', 'combiner', False, "汇流箱厂家"),
'model': ('0305108001', 'combiner', False, "汇流箱型号"),
'ver_software': ('0305105001', 'combiner', False, "件版本"),
'ver_hardware': ('0305106001', 'combiner', False, "硬件版本"),
'power_total': ('0305109001', 'combiner', True, "系统总功率"),
'energy_total': ('0305110001', 'combiner', True, "系统累计发电量"),
'energy_daily': ('0305111001', 'combiner', True, "系统日发电量"),
'cbr_IMSI': ('0305102001', 'combiner', False, "IMSI"),
'cbr_ICCID': ('0305103001', 'combiner', False, "SIM卡ICCID"),
'cbr_MSISDN': ('0305104001', 'combiner', False, "MSISDN"),
'cbr_dev_type': ('0305101001', 'combiner', False, "系统类型"),
'cbr_facturer': ('0305107001', 'combiner', False, "汇流箱厂家"),
'cbr_model': ('0305108001', 'combiner', False, "汇流箱型号"),
'cbr_ver_software': ('0305105001', 'combiner', False, "软件版本"),
'cbr_ver_hardware': ('0305106001', 'combiner', False, "件版本"),
'cbr_power_total': ('0305109001', 'combiner', True, "系统总功率"),
'cbr_energy_total': ('0305110001', 'combiner', True, "系统累计发电量"),
'cbr_energy_daily': ('0305111001', 'combiner', True, "系统发电量"),
}
SemaMap_meter = {
'mtr_id': ('0305123001', 'meter', False, "电表号"),
@@ -76,6 +75,44 @@ SemaMap_meter = {
'mtr_energy_daily_V': ('0305134001', 'meter', True, "谷时段日正向有功电能"),
}
Sema_Map = {
'cbr_IMSI': ('0305102001', 'TTE0101', False, "IMSI"),
'cbr_ICCID': ('0305103001', 'TTE0101', False, "SIM卡ICCID"),
'cbr_MSISDN': ('0305104001', 'TTE0101', False, "MSISDN"),
'cbr_dev_type': ('0305101001', 'TTE0101', False, "系统类型"),
'cbr_facturer': ('0305107001', 'TTE0101', False, "汇流箱厂家"),
'cbr_model': ('0305108001', 'TTE0101', False, "汇流箱型号"),
'cbr_ver_software': ('0305105001', 'TTE0101', False, "软件版本"),
'cbr_ver_hardware': ('0305106001', 'TTE0101', False, "硬件版本"),
'cbr_power_total': ('0305109001', 'TTE0101', True, "系统总功率"),
'cbr_energy_total': ('0305110001', 'TTE0101', True, "系统累计发电量"),
'cbr_energy_daily': ('0305111001', 'TTE0101', True, "系统日发电量"),
'apt_facturer': ('0305113001', 'TTE0102', False, "厂家"),
'apt_version': ('0305114001', 'TTE0102', False, "软件版本"),
'apt_model': ('0305115001', 'TTE0102', False, "型号"),
'apt_status': ('0305116001', 'TTE0102', False, "开关机状态"),
'apt_volt_in': ('0305118001', 'TTE0102', True, "输入电压"),
'apt_curr_in': ('0305119001', 'TTE0102', True, "输入电流"),
'apt_volt_out': ('0305120001', 'TTE0102', True, "输出电压"),
'apt_curr_out': ('0305121001', 'TTE0102', True, "输出电流"),
'apt_power_out': ('0305122001', 'TTE0102', True, "输出功率"),
'apt_temp': ('0305117001', 'TTE0102', True, "温度"),
'mtr_id': ('0305123001', 'TTE0103', False, "电表号"),
'mtr_volt': ('0305124001', 'TTE0103', True, "直流电压"),
'mtr_curr': ('0436101001', 'TTE0103', True, "直流总电流"),
'mtr_energy_total': ('0305125001', 'TTE0103', True, "总有功电能"),
'mtr_energy_daily': ('0305126001', 'TTE0103', True, "日有功电能"),
'mtr_power': ('0436102001', 'TTE0103', True, "总有功功率"),
'mtr_energy_total_T': ('0305127001', 'TTE0103', True, "尖时段总正向有功电能"),
'mtr_energy_total_P': ('0305128001', 'TTE0103', True, "峰时段总正向有功电能"),
'mtr_energy_total_F': ('0305129001', 'TTE0103', True, "平时段总正向有功电能"),
'mtr_energy_total_V': ('0305130001', 'TTE0103', True, "谷时段总正向有功电能"),
'mtr_energy_daily_T': ('0305131001', 'TTE0103', True, "尖时段日正向有功电能"),
'mtr_energy_daily_P': ('0305132001', 'TTE0103', True, "峰时段日正向有功电能"),
'mtr_energy_daily_F': ('0305133001', 'TTE0103', True, "平时段日正向有功电能"),
'mtr_energy_daily_V': ('0305134001', 'TTE0103', True, "谷时段日正向有功电能"),
}
API_Map = {
'refreshToken': ['https://energy-iot.chinatowercom.cn/api/auth/refreshToken', None],
'search_stn': ['https://energy-iot.chinatowercom.cn/api/device/station/list', None],
@@ -134,7 +171,7 @@ class Lamina_Data(object):
filtered_data_memory = merged_df[merged_df['_merge'] == 'left_only'].drop(columns='_merge')
filtered_data_memory.to_sql('history', self.engine, if_exists='append', index=False)
print(f"成功插入 {len(filtered_data_memory)} 条数据")
logging.critical(f"成功插入 {len(filtered_data_memory)} 条数据")
return len(filtered_data_memory)
@@ -149,14 +186,6 @@ class Lamina_Data(object):
return result
return wrapper
def get_histoty_data_by_database(self, device_id, data_type, time_start:int, time_end:int):
""" 读取历史数据 """
database = self.data['history']
filter_data = database[database['dev'] == device_id &
database['mid'] == SemaMap_adapter[data_type][0] &
database['time'].between(time_start, time_end)]
return filter_data
@save_data
def get_history_data_by_net(self, device_id, data_type, time_start:int, time_end:int, header=None):
""" 读取信号量历史数据, 返回接口json数据 """
@@ -241,32 +270,18 @@ class Lamina_Data(object):
print(f"Get data fail, code={json_data['code']}, msg=\n\t{json_data['message']}")
raise ValueError(f"{json_data['message']}")
def spider_adapter(self, device_id:str, time_start:int, time_end:int):
""" 爬取适配器数据 """
def spider_device(self, device_id:str, time_start:int, time_end:int):
""" 爬取设备数据 """
result = {}
columns_adapter = list(filter(lambda x: SemaMap_adapter[x][2], SemaMap_adapter.keys()))
data_adapter = pd.DataFrame([], columns=['time', 'device', *columns_adapter])
for k in columns_adapter:
result[k] = self.get_history_data_by_net(device_id, SemaMap_adapter[k], time_start, time_end)
if data_adapter.empty:
data_adapter.time = result[k].time
data_adapter.device = device_id
data_adapter[k] = result[k].value.apply(float)
return data_adapter
def spider_meter(self, device_id:str, time_start:int, time_end:int):
""" 爬取电表数据 """
result = {}
columns_meter = list(map(lambda x: x[4:], filter(lambda x: SemaMap_meter[x][2], SemaMap_meter.keys())))
data_meter = pd.DataFrame([], columns=['time', 'device', *columns_meter])
for k, v in SemaMap_meter.items():
if v[2]:
result[k] = self.get_history_data_by_net(device_id, v, time_start, time_end)
if data_meter.empty:
data_meter.time = result[k].time
data_meter.device = device_id
data_meter[k[4:]] = result[k].value.apply(float)
return data_meter
key_list = list(filter(lambda x: Sema_Map[x][1] == device_id[:7] and Sema_Map[x][2], Sema_Map.keys()))
data_device = pd.DataFrame([], columns=['time', 'device', *map(lambda s: s.split('_', 1)[1], key_list)])
for key in key_list:
result[key] = self.get_history_data_by_net(device_id, Sema_Map[key], time_start, time_end)
if data_device.empty:
data_device.time = result[key].time
data_device.device = device_id
data_device[key[4:]] = result[key].value.apply(float)
return data_device
def spider_search_devices(self, device_id:str, header=None):
if header is None:
@@ -360,6 +375,7 @@ class Lamina_Data(object):
dev_meter = []
dev_adapter = []
dev_combiner = []
dev_info = []
try:
for dev in sorted(json_data['rows'], key=lambda x: x['devCode']):
@@ -371,10 +387,11 @@ class Lamina_Data(object):
match dev['devType']:
case "0101":
fsu_id = dev['devCode']
dev_combiner.append(self.spider_device(dev['devCode'], time_start, time_end))
case "0102":
dev_adapter.append(self.spider_adapter(dev['devCode'], time_start, time_end))
dev_adapter.append(self.spider_device(dev['devCode'], time_start, time_end))
case "0103":
dev_meter.append(self.spider_meter(dev['devCode'], time_start, time_end))
dev_meter.append(self.spider_device(dev['devCode'], time_start, time_end))
self.save_history_data()
except Exception as e:
print(f"Get data fail, msg=\n\t{e}")
@@ -383,81 +400,13 @@ class Lamina_Data(object):
'result': True,
'station': station_id,
'information': pd.concat(dev_info, ignore_index=True),
'combiner': pd.concat(dev_combiner, ignore_index=True),
'adapter': pd.concat(dev_adapter, ignore_index=True),
'meter': pd.concat(dev_meter, ignore_index=True),
}
print(f"Station Done.")
return result
def spider(self, device_id:str, time_start:int, time_end:int):
""" 通用爬虫 """
if device_id[:8] == "TTE0102DX":
""" 适配器数据 """
self.spider_adapter(device_id, time_start, time_end)
elif device_id[:8] == "TTE0103DX":
""" 电表数据 """
self.spider_meter(device_id, time_start, time_end)
def graphs_adapter(self, device_id, time_start:int|str, time_end:int|str):
""" 绘制图表-适配器数据 """
if type(time_start) is str:
time_start = time.mktime(time.strptime(time_start, r"%Y-%m-%d %H:%M:%S"))
if type(time_end) is str:
time_end = time.mktime(time.strptime(time_end, r"%Y-%m-%d %H:%M:%S"))
data = self.spider_adapter(device_id, time_start, time_end)
self.chart_adapter(data)
return data
def chart_adapter(self, data_adapter):
""" 绘制适配器信息图表 """
fig, ax1 = plt.subplots()
ax1.plot(data_adapter['time'], data_adapter['volt_in'], color='green', label='Input Voltage')
ax1.plot(data_adapter['time'], data_adapter['volt_out'], color='red', label='Output Voltage')
ax2 = ax1.twinx()
ax2.plot(data_adapter['time'], data_adapter['power_out'], color='gray', label='Output Power')
# # 绘制斜线阴影
# for i in range(len(table_apt) - 1):
# ax1.fill_between(
# [table_apt['time'].iloc[i], table_apt['time'].iloc[i + 1]],
# [table_apt['power_out'].iloc[i], table_apt['power_out'].iloc[i + 1]],
# color='red', alpha=0.5)
lines, labels = ax1.get_legend_handles_labels()
shadows, shadow_labels = ax2.get_legend_handles_labels()
ax1.legend(lines + shadows, labels + shadow_labels, loc='upper left')
ax1.set_title('Device Data Visualization')
ax1.set_xlabel('Time')
ax1.set_ylabel('Voltage (V)')
ax2.set_ylabel('Power (W)')
plt.ioff()
plt.show()
plt.savefig('output.png')
# plt.close()
plt.ion()
def sim_data_apt(times:tuple[int, int]):
""" 模拟数据 """
t_start = time.mktime(time.strptime(times[0], r"%Y-%m-%d %H:%M:%S"))
t_end = time.mktime(time.strptime(times[1], r"%Y-%m-%d %H:%M:%S"))
count_data = (t_end - t_start) / (10 * 60)
time_list = range(int(t_start), int(t_end), 20 * 60)
time_list = tuple(map(lambda x: time.strftime(r"%Y-%m-%d %H:%M:%S", time.localtime(x)), time_list))
data = {
'time': time_list,
'volt_in': 10 + 10 * np.random.random(len(time_list)),
'curr_in': 1 + 2 * np.random.random(len(time_list)),
'volt_out': 54 + 2 * np.random.random(len(time_list)),
}
data['power_out'] = tuple(map(lambda x: x[0] * x[1], zip(data['volt_in'],data['curr_in'])))
return pd.DataFrame(data)
def save_station_by_file1(data_lamina: Lamina_Data):
""" 依据文件爬取所需站点数据 """
time_start = datetime.datetime(2024, 12, 24, 0, 0, 0)
@@ -523,7 +472,7 @@ def save_station_by_file2(data_lamina: Lamina_Data, file_path):
dataset = []
df_input = df_input.set_index('点位名称')
for name in remain_station['点位名称']:
print(f"Station: {name}")
logging.info(f"Station: {name}")
time_start_timestamp = df_input['开始时间'][name].tz_localize('Asia/Shanghai').timestamp()
time_end_timestamp = df_input['结束时间'][name].tz_localize('Asia/Shanghai').timestamp()
data = data_lamina.spider_station(name, time_start_timestamp, time_end_timestamp)
@@ -535,7 +484,7 @@ def save_station_by_file2(data_lamina: Lamina_Data, file_path):
""" Token 失效 """
data_lamina.api_origin['header']['authorization'] = data['token']
print(f"Done.")
logging.info(f"All Station Done.")
# 使用 ExcelWriter 将多个 DataFrame 保存到不同的工作表中
df_station = pd.DataFrame([], columns=['station', '点位名称'])
@@ -555,7 +504,7 @@ def save_station_by_file2(data_lamina: Lamina_Data, file_path):
df_adapter.to_excel(writer, sheet_name='Adatper', index=False, columns=column_adapter)
df_meter.to_excel(writer, sheet_name='Meter', index=False, columns=column_meter)
print(f"数据已成功保存到 {file_output}")
logging.info(f"数据已成功保存到 {file_output}")
return result
def analysis_info(df_station: pd.DataFrame):
@@ -566,6 +515,7 @@ def analysis_info(df_station: pd.DataFrame):
for k, v in SemaMap_meter.items():
map_mid[v[0]] = v[3]
map_dev = {
'TTE0101': 'Combiner',
'TTE0102': 'Adapter',
'TTE0103': 'Meter',
}
@@ -587,28 +537,26 @@ def analysis_info(df_station: pd.DataFrame):
def analysis_info1(data_station: dict):
""" 站点spider返回数据分析 """
define_dev = {
'TTE0101': ('combiner', (3,1)),
'TTE0102': ('adapter', (3,2)),
'TTE0103': ('meter', (3,2)),
}
# 创建双色颜色过渡
color_map = mcolors.LinearSegmentedColormap.from_list("mycmap", ["blue", "red"])
alpha = 0.5
for dev_id in data_station['information']['dev'].unique():
data_dev = data_station['information'].loc[data_station['information']['dev'] == dev_id]
print(f"Device: {dev_id}")
match dev_id[:7]:
case "TTE0101": # 汇流箱
pass
case "TTE0102": # 适配器
pass
history_dev = data_station['adapter'].assign(
history_dev = data_station[define_dev[dev_id[:7]][0]].loc[
lambda df: df['device'] == dev_id
].assign(
date = lambda df: df['time'].apply(lambda x: x.date()),
id_group = lambda df: df['date'].apply(lambda x: x.toordinal()).diff().fillna(0).cumsum(),
)
case "TTE0103": # 电表
pass
history_dev = data_station['meter'].assign(
date = lambda df: df['time'].apply(lambda x: x.date()),
id_group = lambda df: df['date'].diff().ne(0).cumsum(),
)
# 按日期分组并绘制折线图
fig, axs = plt.subplots(3, 1)
logging.debug(f"Device: {dev_id}")
logging.debug(history_dev.head())
fig, axs = plt.subplots(*define_dev[dev_id[:7]][1])
axs = axs.flatten()
for date, group in history_dev.groupby('date'):
# 计算当天的起始时间
@@ -617,23 +565,16 @@ def analysis_info1(data_station: dict):
adjusted_time = group['time'] - start_time
# 计算颜色和不透明度
color = color_map(group['id_group'] / history_dev['id_group'][-1])
alpha = 0.5
color = color_map(group['id_group'].min() / history_dev['id_group'].max())
for index, key in enumerate(history_dev.columns[2:-2]):
if index >= len(axs):
break
group.set_index(adjusted_time)[key].plot(ax=axs[index], label=str(date), color=color, alpha=alpha)
axs[index-2].set_title(f"{key.replace('_', ' ').title()}")
group.set_index(adjusted_time)['volt'].plot(ax=axs[0], label=str(date), color=color, alpha=alpha)
group.set_index(adjusted_time)['curr'].plot(ax=axs[1], label=str(date), color=color, alpha=alpha)
group.set_index(adjusted_time)['power'].plot(ax=axs[2], label=str(date), color=color, alpha=alpha)
# 添加图例
axs[0].legend(title='Date')
# 添加标题和标签
axs[0].set_title('Value over Time by Date')
axs[0].set_xlabel('Timestamp')
axs[0].set_ylabel('Value')
fig.suptitle(f"{data_station['station']}_{define_dev[dev_id[:7]][0].title()} Device: {dev_id}", fontsize=16)
plt.show()
plt.savefig(Path(f"result\Analysis\{dev_id}.png"))
print(data_dev.head())
plt.savefig(Path(f"result\Analysis\{data_station['station']}_{dev_id}.png"))
if __name__=='__main__':
""" 主体调用流程 """
@@ -644,7 +585,7 @@ if __name__=='__main__':
if not hasattr(__builtins__,"__IPYTHON__") and 0:
import pickle
path_data1 = Path(r"result\Analysis\station_data1.pkl")
path_data1 = Path(r"result\Analysis\station_data2.pkl")
with open(path_data1, 'rb') as f:
loaded_data = pickle.load(f)
analysis_info1(loaded_data)
@@ -658,7 +599,7 @@ if __name__=='__main__':
file_db.touch()
API_HEADER['Cookie'] = "HWWAFSESTIME=1737167522632; HWWAFSESID=6cb0288b7bc75e5a66; dc04ed2361044be8a9355f6efb378cf2=WyIzNTI0NjE3OTgzIl0"
API_HEADER['authorization'] = 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiIl0sInVzZXJfbmFtZSI6IndlYl9tYW5hZ2V8d2FuZ2xlaTQiLCJzY29wZSI6WyJhbGwiXSwiZXhwIjoxNzM3MzQxNDg4LCJ1c2VySWQiOjI0Mjg1LCJqdGkiOiIwOGFlZDdjYy1hZGE2LTQ4ZWQtYmQyZS0xYjY3NGRkZmVmMWMiLCJjbGllbnRfaWQiOiJ3ZWJfbWFuYWdlIn0.CnfJh2ie0D0dOG1yELiQPuXCwez_nzeYD8rXTL0ILSeq31kmTnhOJJTA6aI8JTEtDVgFyqC084uDR1KvDgwKL5aXXzKwCNqBxziJQbA2AuBRdDgdWXM0r_3qrBGL-0MuYB2jygJaNwue2GIh_3PYsMQGRqHBeyJ9JUgdiWYUVpmbYInSyOlY2l_QtzQTFlz8L7eUC0sDeAWSPNamvYczLas0MtuQquH6JM_-WaFfc-6TblmFp6qSxZHJT-0dy7LLTw5zpXbh3QnbjgBARCaOvzLaDtfArgU20Hq3AqAIwvTVOQFeI4jChFIRvyXwnnUDX-IrFru_sOYLX1jcc88cPA'
API_HEADER['authorization'] = 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiIl0sInVzZXJfbmFtZSI6IndlYl9tYW5hZ2V8d2FuZ2xlaTQiLCJzY29wZSI6WyJhbGwiXSwiZXhwIjoxNzM3NDI3OTAwLCJ1c2VySWQiOjI0Mjg1LCJqdGkiOiIwYzliOTk2ZC01OTU3LTQ5MjMtOTAzNC03YzlkMDQ4YWU3MzQiLCJjbGllbnRfaWQiOiJ3ZWJfbWFuYWdlIn0.JcPqvOoVv06gi7l_ownVl1ubwDn1dYgkqB082gjrQlHqveXpyqeiF6MUUjlhcUFgNArttog9ZnI82jqmiBfSOkc-gdjvM-AHUvXc3DRN4dvlY9eXdeDeMTLFQh5rfmlYHEd9fgI7eRBvLAiUbDpiNuxyU2N2VV72vxGdvp5f1GeUPEmLj6lwBch5L2sWSYi2p9PwCBYX0sm5EwnL--nui1Iv2PHNos02y4h_m2C-x96L3chXV-h_vKoRWrztiEX6O40zaNwzlIcm_rSmX6GEOF4darGB9hU7aFzKBfM4wTcj-ZKae7dx3ttkuc1HD_eFL8xpDr0pvWycFzrgSlLtkw'
data_lamina = Lamina_Data('sqlite:///' + path_db)
# 依据站点内设备爬取整个站点的实时与历史数据