from flask import Flask, jsonify
import requests
import time
app = Flask(__name__)
# 可直接修改的变量
TOKEN_ONLINE = "your_token_online_value"
APP_ID = "your_app_id_value"
DEVICE_CODE = "your_device_code_value"
DESMOBIEL = "your_desmobiel_value"
HEARTBEAT_DATA = "your_heartbeat_data_value"
def simulate_login_and_get_data():
# 建立会话,自动保存和传递 cookies
session = requests.Session()
# 通用请求头
headers = {
"User-Agent": "ChinaUnicom.x CFNetwork iOS/18.0 unicom{version:[email protected]}",
"Accept": "*/*",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
}
# 登录接口的 URL
login_url = "https://m.client.10010.com/mobileService/onLine.htm"
# 登录 POST 请求的表单数据
login_data = {
"reqtime": "2025-02-20 14:44:14",
"version": "[email protected]",
"token_online": TOKEN_ONLINE, # 使用可变变量
"longitude": "113.225391",
"appId": APP_ID, # 使用可变变量
"deviceId": "5cf194efdf7acaa8e86b22d0add453b8ced10a2cac56fd2934fb16db75739ed2",
"latitude": "23.421746",
"deviceModel": "iPhone",
"step": "background",
"isFirstInstall": "1",
"deviceCode": DEVICE_CODE, # 使用可变变量
}
# 构造登录请求头(需指定 Content-Type)
login_headers = headers.copy()
login_headers.update({
"Content-Type": "application/x-www-form-urlencoded",
"Connection": "keep-alive"
})
# 发送登录请求
login_resp = session.post(login_url, headers=login_headers, data=login_data)
if login_resp.status_code != 200:
return None, "登录失败,状态码:" + str(login_resp.status_code)
# 登录后使用同一个 session 发送 GET 请求获取业务数据
data_url = "https://m.client.10010.com/mobileserviceimportant/home/queryUserInfoSeven"
params = {
"version": "[email protected]",
"desmobiel": DESMOBIEL, # 使用可变变量
"showType": "0"
}
data_resp = session.get(data_url, headers=headers, params=params)
if data_resp.status_code == 200:
try:
json_data = data_resp.json()
return json_data, None
except Exception as e:
return None, "解析 JSON 失败:" + str(e)
else:
return None, "数据获取失败,状态码:" + str(data_resp.status_code)
def send_heartbeat(session):
"""
发送心跳包请求,保持会话活跃
"""
heartbeat_url = "https://playback.newbuy.chinaunicom.cn/dcnew/webinsight/data"
params = {
"time": str(int(time.time() * 1000)) # 使用当前的时间戳(单位毫秒)
}
heartbeat_headers = {
"Host": "playback.newbuy.chinaunicom.cn",
"Accept": "*/*",
"Sec-Fetch-Site": "cross-site",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
"Cache-Control": "max-age=0",
"Sec-Fetch-Mode": "no-cors",
"Accept-Encoding": "gzip, deflate, br",
"Origin": "https://img.client.10010.com",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 17_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 unicom{version:[email protected]}",
"Referer": "https://img.client.10010.com/",
"Connection": "keep-alive",
"Content-Type": "text/plain;charset=UTF-8",
"Sec-Fetch-Dest": "empty"
}
heartbeat_data = HEARTBEAT_DATA # 使用可变变量
session.post(heartbeat_url, headers=heartbeat_headers, params=params, data=heartbeat_data)
@app.route('/api/get_data', methods=['GET'])
def get_data():
# 模拟登录,获取数据
session = requests.Session()
json_data, error = simulate_login_and_get_data()
if error:
return jsonify({"error": error}), 400
# 发送心跳包,保持会话活跃
send_heartbeat(session)
# 提取所需的字段
response_data = {
"flush_date_time": json_data.get("flush_date_time", ""),
"feePersent": json_data.get("feeResource", {}).get("feePersent", ""),
"flowPersent": json_data.get("flowResource", {}).get("flowPersent", "")
}
return jsonify(response_data)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9996)
二.步骤2.