import time import datetime import requests import numpy as np import pandas as pd from pathlib import Path from bs4 import BeautifulSoup import pandas as pd import matplotlib.pyplot as plt import matplotlib.dates as mdates from sqlalchemy import create_engine from sqlalchemy import MetaData, Table, Column, String, Float, Integer API_URL = "https://energy-iot.chinatowercom.cn/api/device/device/historyPerformance" API_HEADER = { "accept": "application/json, text/plain, */*", "Accept-Encoding": "gzip, deflate, br, zstd", "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", "Connection": "keep-alive", "Content-Length": "170", "content-type": "application/json;charset=UTF-8", "Cookie": "HWWAFSESID=455f2793ca86a3aaf0; HWWAFSESTIME=1734509454212; dc04ed2361044be8a9355f6efb378cf2=WyIyODM4MDM2MDcxIl0", "authorization": "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiIl0sInVzZXJfbmFtZSI6IndlYl9tYW5hZ2V8d2FuZ2xlaTQiLCJzY29wZSI6WyJhbGwiXSwiZXhwIjoxNzM0NjYzNDQ5LCJ1c2VySWQiOjI0Mjg1LCJqdGkiOiJhZmZhNmY1My05ZDA4LTQ2ODUtODU3MS05YzA5ODAxMGJjZWYiLCJjbGllbnRfaWQiOiJ3ZWJfbWFuYWdlIn0.q0X4qrgL4wRUTZL8c_5oTIUGW0Lsivxw8pYQ1iMIqLnyJrUeS7IQKNRavMhc4NEdQ9uG6ZgFVHIj80HbzH8DHCxssCPLdv9_TXksI5podU2aU6Vjh6AaN1THFYAE2uflj1saBnQ5_gKiK0-cAcXDeJNSt_u6Cd9hI1ejEUPPzO_hLg-NLzch7yIB-HPvhoDNnl0n5pSYoQpT8XaKT14HezL3VQrLII69Vme38S2dMmmkiAeIyhHQi56kXZ11K45Lu5bHXv6fDg2Mfr9VgVuTleZldiO69BAmG0h1-HqTQuGE39jtGWrrCnFduRZR6VsaOWWJy3qyqUbXWMOli1Yy1g", "Host": "energy-iot.chinatowercom.cn", "Origin": "https://energy-iot.chinatowercom.cn", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "sec-ch-ua": "\"Microsoft Edge\";v=\"131\", \"Chromium\";v=\"131\", \"Not_A Brand\";v=\"24\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "Windows", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0", } SemaMap_adapter = { 'facturer': ('0305113001', 'adapter', False, "厂家"), 'version': ('0305114001', 'adapter', False, "软件版本"), 'model': ('0305115001', 'adapter', False, "型号"), 'status': ('0305116001', 'adapter', False, "开关机状态"), 'temp': ('0305117001', 'adapter', True, "温度"), 'volt_in': ('0305118001', 'adapter', True, "输入电压"), 'curr_in': ('0305119001', 'adapter', True, "输入电流"), 'volt_out': ('0305120001', 'adapter', True, "输出电压"), 'curr_out': ('0305121001', 'adapter', True, "输出电流"), 'power_out': ('0305122001', 'adapter', True, "输出功率"), } SemaMap_meter = { 'mtr_id': ('0305123001', 'meter', False, "电表号"), 'mtr_volt': ('0305124001', 'meter', True, "直流电压"), 'mtr_curr': ('0436101001', 'meter', True, "直流总电流"), 'mtr_power': ('0436102001', 'meter', True, "总有功功率"), 'mtr_energy_total': ('0305125001', 'meter', True, "总有功电能"), 'mtr_energy_daily': ('0305126001', 'meter', True, "日有功电能"), 'mtr_energy_total_T': ('0305127001', 'meter', True, "尖时段总正向有功电能"), 'mtr_energy_total_P': ('0305128001', 'meter', True, "峰时段总正向有功电能"), 'mtr_energy_total_F': ('0305129001', 'meter', True, "平时段总正向有功电能"), 'mtr_energy_total_V': ('0305130001', 'meter', True, "谷时段总正向有功电能"), 'mtr_energy_daily_T': ('0305131001', 'meter', True, "尖时段日正向有功电能"), 'mtr_energy_daily_P': ('0305132001', 'meter', True, "峰时段日正向有功电能"), 'mtr_energy_daily_F': ('0305133001', 'meter', True, "平时段日正向有功电能"), 'mtr_energy_daily_V': ('0305134001', 'meter', True, "谷时段日正向有功电能"), } API_Map = { 'search_dev': ['https://energy-iot.chinatowercom.cn/api/device/device/page', None], 'dev_info': ['https://energy-iot.chinatowercom.cn/api/device/device/devInfo', None], 'perf_real': ['https://energy-iot.chinatowercom.cn/api/device/device/perfReal', None], 'history': ['https://energy-iot.chinatowercom.cn/api/device/device/historyPerformance', [SemaMap_adapter, SemaMap_meter]], 'page': ['https://energy-iot.chinatowercom.cn/api/device/device/page', None], 'station': ['https://energy-iot.chinatowercom.cn/api/device/station/detail/', None], } class Lamina_Data(object): """ 叠光主站数据分析 """ def __init__(self, database="sqlite:///:memory", header=None): """ 初始化 """ self.engine = create_engine(database) metadata = MetaData() metadata.reflect(bind=self.engine) if 'history' not in metadata.tables: history_table = Table( 'history', metadata, Column('dev', String(50)), Column('mid', String(50)), Column('time', Integer), Column('value', Float) ) metadata.create_all(self.engine) if 'log' not in metadata.tables: log_table = Table( 'log', metadata, Column('dev', String(50)), Column('mid', String(50)), Column('Timestamp_start', Integer), Column('Timestamp_end', Integer), ) metadata.create_all(self.engine) self.data = { 'history': pd.read_sql_table('history', self.engine), } self.api_origin = { 'domain': 'https://energy-iot.chinatowercom.cn/api', 'header': API_HEADER, } def save_history_data(self): """ 保存历史数据 """ data_memory = self.data['history'] data_file = pd.read_sql_table('history', self.engine) merged_df = pd.merge(data_memory, data_file[['dev', 'mid', 'time']], on=['dev', 'mid', 'time'], how='left', indicator=True) filtered_data_memory = merged_df[merged_df['_merge'] == 'left_only'].drop(columns='_merge') filtered_data_memory.to_sql('history', self.engine, if_exists='append', index=False) print(f"成功插入 {len(filtered_data_memory)} 条数据") return len(filtered_data_memory) def save_data(func): """ 保存函数返回数据 """ def wrapper(*args, **kwds): self: Lamina_Data = args[0] result = func(*args, **kwds) if isinstance(result, pd.DataFrame): self.data['history'] = pd.concat([self.data['history'], result], ignore_index=True) self.save_history_data() return result return wrapper def get_histoty_data_by_database(self, device_id, data_type, time_start:int, time_end:int): """ 读取历史数据 """ database = self.data['history'] filter_data = database[database['dev'] == device_id & database['mid'] == SemaMap_adapter[data_type][0] & database['time'].between(time_start, time_end)] return filter_data @save_data def get_history_data_by_net(self, device_id, data_type, time_start:int, time_end:int, header=None): """ 读取信号量历史数据, 返回接口json数据 """ if header is None: header = self.api_origin['header'] body = { "businessType": "7", "startTimestamp": int(time_start * 1000), "endTimestamp": int(time_end * 1000), "deviceCode": f"{device_id}", "mid": f"{data_type[0]}", "pageNum": 1, "pageSize": 10, "total": 0 } req = requests.post(API_URL, json=body, headers=header) json_data = req.json() if json_data['code'] == 200: """ 数据读取成功 """ print(f"Get data success, mid={data_type[0]}, len={len(json_data['data'])}") table_data = pd.DataFrame(json_data['data'], columns=['collectTime', 'mid', 'midName', 'value']) table_data['time'] = table_data['collectTime'].apply(lambda x: int(time.mktime(time.strptime(x, r"%Y-%m-%d %H:%M:%S")))) table_data['dev'] = device_id return table_data[['dev', 'mid', 'time', 'value']] else: print(f"Get data fail, code={json_data['code']}, msg=\n\t{json_data['message']}") return pd.DataFrame([], columns=['dev', 'mid', 'time', 'value']) def get_real_data_by_net(self, device_id, fsu_id=None, header=None): """ 读取设备当前数据, 返回接口json数据 """ if header is None: header = self.api_origin['header'] body = { "businessType": "7", "devType": device_id[3:7], "deviceCodes": device_id, "type": "遥测" } if device_id[3:7] != "0101": if fsu_id is None: raise ValueError(f"Missing required parameters: fsu_id={fsu_id}") body["fsuCode"] = fsu_id req = requests.post(API_Map['perf_real'][0], json=body, headers=header) json_data = req.json() if json_data['code'] == 200: """ 数据读取成功 """ print(f"Get data success, len={len(json_data['data'])}") table_data = pd.DataFrame(json_data['data']) column_name = sorted(table_data.columns) table_data['time'] = table_data['updateTime'].apply(lambda x: int(time.mktime(time.strptime(x, r"%Y-%m-%d %H:%M:%S")))) table_data = table_data[['time', *column_name]].drop(columns='updateTime') return table_data else: print(f"Get data fail, code={json_data['code']}, msg=\n\t{json_data['message']}") return pd.DataFrame([]) def get_devinfo_data_by_net(self, device_id, data_type, time_start:int, time_end:int, header=None): """ 读取设备信息, 返回接口json数据 """ if header is None: header = self.api_origin['header'] body = { "businessType": "7", "id": int(data_type), } req = requests.post(API_Map['dev_info'][0], json=body, headers=header) json_data = req.json() if json_data['code'] == 200: """ 数据读取成功 """ print(f"Get data success, len={len(json_data['data'])}") table_data = pd.DataFrame(json_data['data']) return table_data else: print(f"Get data fail, code={json_data['code']}, msg=\n\t{json_data['message']}") return pd.DataFrame([], columns=['dev', 'mid', 'time', 'value']) def spider_adapter(self, device_id:str, time_start:int, time_end:int): """ 爬取适配器数据 """ result = {} columns_adapter = list(filter(lambda x: SemaMap_adapter[x][2], SemaMap_adapter.keys())) data_adapter = pd.DataFrame([], columns=['time', *columns_adapter]) for k in columns_adapter: result[k] = self.get_history_data_by_net(device_id, SemaMap_adapter[k], time_start, time_end) if data_adapter.empty: data_adapter.time = result[k].time data_adapter[k] = result[k].value.apply(float) return data_adapter def spider_meter(self, device_id:str, time_start:int, time_end:int): """ 爬取电表数据 """ result = {} columns_meter = list(map(lambda x: x[4:], filter(lambda x: SemaMap_meter[x][2], SemaMap_meter.keys()))) data_meter = pd.DataFrame([], columns=['time', *columns_meter]) for k, v in SemaMap_meter.items(): if v[2]: result[k] = self.get_history_data_by_net(device_id, v, time_start, time_end) if data_meter.empty: data_meter.time = result[k].time data_meter[k[4:]] = result[k].value.apply(float) return data_meter def spider_station(self, device_id:str, time_start:int, time_end:int, header=None): """ 爬取站点数据 """ if header is None: header = self.api_origin['header'] body = { "devType": "", "accessPointId": "", "pageNum": 1, "pageSize": 10, "businessType": "7", "devCode": device_id, "deptIds": [] } req = requests.post(API_Map['search_dev'][0], json=body, headers=header) json_data = req.json() if json_data['code'] != 200: """ 数据读取失败 """ print(f"Get data fail, code={json_data['code']}, msg=\n\t{json_data['message']}") return pd.DataFrame([]) elif search_dev := json_data['rows']: print(f"Search device success, len={len(search_dev)}") station_id = search_dev[0]['stationCode'] else: print(f"Search device fail.") return pd.DataFrame([]) body = { "businessType": "7", "stationCode": station_id, } time.sleep(0.5) print(f"Get Data for Station: {station_id}") req = requests.post(API_Map['page'][0], json=body, headers=header) json_data = req.json() for dev in sorted(json_data['rows'], key=lambda x: x['devCode']): print(f"Dev: {dev['devTypeName']}, id={dev['devCode']}") time.sleep(0.5) fsu_id = dev['parentCode'] if 'parentCode' in dev.keys() else None self.get_real_data_by_net(dev['devCode'], fsu_id, header=header) time.sleep(0.5) match dev['devType']: case "0101": fsu_id = dev['devCode'] case "0102": self.spider_adapter(dev['devCode'], time_start, time_end) case "0103": self.spider_meter(dev['devCode'], time_start, time_end) print(f"Station Done.") def spider(self, device_id:str, time_start:int, time_end:int): """ 通用爬虫 """ if device_id[:8] == "TTE0102DX": """ 适配器数据 """ self.spider_adapter(device_id, time_start, time_end) elif device_id[:8] == "TTE0103DX": """ 电表数据 """ self.spider_meter(device_id, time_start, time_end) def graphs_adapter(self, device_id, time_start:int|str, time_end:int|str): """ 绘制图表-适配器数据 """ if type(time_start) is str: time_start = time.mktime(time.strptime(time_start, r"%Y-%m-%d %H:%M:%S")) if type(time_end) is str: time_end = time.mktime(time.strptime(time_end, r"%Y-%m-%d %H:%M:%S")) data = self.spider_adapter(device_id, time_start, time_end) self.chart_adapter(data) return data def chart_adapter(self, data_adapter): """ 绘制适配器信息图表 """ fig, ax1 = plt.subplots() ax1.plot(data_adapter['time'], data_adapter['volt_in'], color='green', label='Input Voltage') ax1.plot(data_adapter['time'], data_adapter['volt_out'], color='red', label='Output Voltage') ax2 = ax1.twinx() ax2.plot(data_adapter['time'], data_adapter['power_out'], color='gray', label='Output Power') # # 绘制斜线阴影 # for i in range(len(table_apt) - 1): # ax1.fill_between( # [table_apt['time'].iloc[i], table_apt['time'].iloc[i + 1]], # [table_apt['power_out'].iloc[i], table_apt['power_out'].iloc[i + 1]], # color='red', alpha=0.5) lines, labels = ax1.get_legend_handles_labels() shadows, shadow_labels = ax2.get_legend_handles_labels() ax1.legend(lines + shadows, labels + shadow_labels, loc='upper left') ax1.set_title('Device Data Visualization') ax1.set_xlabel('Time') ax1.set_ylabel('Voltage (V)') ax2.set_ylabel('Power (W)') plt.ioff() plt.show() plt.savefig('output.png') # plt.close() plt.ion() def sim_data_apt(times:tuple[int, int]): """ 模拟数据 """ t_start = time.mktime(time.strptime(times[0], r"%Y-%m-%d %H:%M:%S")) t_end = time.mktime(time.strptime(times[1], r"%Y-%m-%d %H:%M:%S")) count_data = (t_end - t_start) / (10 * 60) time_list = range(int(t_start), int(t_end), 20 * 60) time_list = tuple(map(lambda x: time.strftime(r"%Y-%m-%d %H:%M:%S", time.localtime(x)), time_list)) data = { 'time': time_list, 'volt_in': 10 + 10 * np.random.random(len(time_list)), 'curr_in': 1 + 2 * np.random.random(len(time_list)), 'volt_out': 54 + 2 * np.random.random(len(time_list)), } data['power_out'] = tuple(map(lambda x: x[0] * x[1], zip(data['volt_in'],data['curr_in']))) return pd.DataFrame(data) if __name__=='__main__': """ 主体调用流程 """ if hasattr(__builtins__,"__IPYTHON__"): path_db = '../result/chinatowercom.db' else: path_db = 'result/chinatowercom.db' API_HEADER['Cookie'] = "HWWAFSESTIME=1735108780906; HWWAFSESID=1c91597e07b0014c4d; dc04ed2361044be8a9355f6efb378cf2=WyIzNTI0NjE3OTgzIl0" API_HEADER['authorization'] = 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiIl0sInVzZXJfbmFtZSI6IndlYl9tYW5hZ2V8d2FuZ2xlaTQiLCJzY29wZSI6WyJhbGwiXSwiZXhwIjoxNzM1MjI2NDAyLCJ1c2VySWQiOjI0Mjg1LCJqdGkiOiJmNzA1ZTlkZC1mNTA5LTQwYzUtODFhNi0zYzdlMzhhZjE0ODgiLCJjbGllbnRfaWQiOiJ3ZWJfbWFuYWdlIn0.e8p-hKWoFyLJbtqKEvlIbLpz-_OB7Ak32d8qdTHNZEny12lUrUE0YYrWQTu0gGtT-eRNDJ62q51IUYOkM_5Ou0Qk2HLouR9-iygtgtjIno72466bv_ao5wvD2PZihXKaKet_c9mnpOqDpvaaApAU4_rk0u6Pg7uJG4stV-akaaMMqRLR-cK5ARePeyHophyGUx80kkSlnhYfGP2rJjEFva36iPaCzM6oiezObMoXWtAPw67vPS-5saTWnjYLrzxr3_s5Idk1pwWPNWfSa6Rl_YMKKiTdtWAEepyrxxOWVfMaeAQYt-ndHhxyBPjRluDTSwUViWmDidoFxkKPMQixVw' data_lamina = Lamina_Data('sqlite:///' + path_db) today = datetime.datetime.today() yesterday = today - datetime.timedelta(days=1) today_midnight = today.replace(hour=0, minute=0, second=0, microsecond=0) yesterday_midnight = yesterday.replace(hour=0, minute=0, second=0, microsecond=0) today_midnight_timestamp = time.mktime(today_midnight.timetuple()) yesterday_midnight_timestamp = time.mktime(yesterday_midnight.timetuple()) # 依据站点内设备爬取整个站点的实时与历史数据 data = data_lamina.spider_station('TTE0102DX2406272727', yesterday_midnight_timestamp, today_midnight_timestamp) # data = sim_data_apt(('2024-10-1 00:00:00', '2024-10-1 12:00:00')) # chart_apt(data) if not hasattr(__builtins__,"__IPYTHON__"): table_apt = data_lamina.graphs_adapter('TTE0102DX2406180988', '2024-11-23 00:00:00', '2024-11-26 00:00:00') while True: plt.waitforbuttonpress()