python_weather/zfm/get_weather.py

113 lines
3.8 KiB
Python

import csv
import json
import time
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
import httpx
# 获取城市信息
def get_cities(cities_json):
cities_dict = {}
for city, provinces in cities_json.items():
city_list = []
for _, province_mes in provinces.items():
for province_name in province_mes:
area_id = cities_json[city][_][province_name]['AREAID']
city_list.append({province_name: area_id})
cities_dict[city] = city_list
return cities_dict
# 生成日期列表
def generate_year_and_month_list(start_year, start_month):
current_date = datetime.now()
start_date = datetime(start_year, start_month, 1)
year_and_month_list = []
while start_date <= current_date:
year = start_date.year
month = start_date.month
year_and_month_str = f"{year}{month:02d}"
year_and_month_list.append(year_and_month_str)
if start_date.month == 12:
start_date = datetime(start_date.year + 1, 1, 1)
else:
start_date += timedelta(days=31)
return year_and_month_list
# 发送请求
def send_request(url, params, headers):
while True:
resp = httpx.get(url, params=params, headers=headers, verify=False)
content = resp.text
if "setTimeout" in content:
# 增加延时
time.sleep(.05)
continue
encode = resp.encoding
decoded_html_content = content.encode(encode).decode('utf-8')
return decoded_html_content
# 获取天气数据
def fetch_weather_data_for_city(root_url, city, province, area_id, all_dates_str):
headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Referer': 'http://www.weather.com.cn/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
}
timestamp = str(int(datetime.timestamp(datetime.now())))
params = {'_': timestamp}
with open('weather_data.csv', mode='a', newline='', encoding='utf-8') as csv_file:
fieldnames = ['province', 'date', 'hgl', 'hmax', 'hmin']
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
for date in all_dates_str:
uri = f"/{date[:4]}/{area_id}_{date}.html"
content = send_request(root_url + uri, params, headers)
json_str = content[len("var fc40 = "):]
data_dict = json.loads(json_str)
for item in data_dict:
row = {
'province': province,
'date': item["date"],
'hgl': item["hgl"],
'hmax': item["hmax"],
'hmin': item["hmin"]
}
writer.writerow(row)
# 主程序入口
if __name__ == "__main__":
response = httpx.get('https://j.i8tq.com/weather2020/search/city.js')
cities_str = "\n".join(response.text.split("\n")[1:])
cities_json = json.loads(cities_str)
cities_dict = get_cities(cities_json)
all_dates_str = generate_year_and_month_list(2023, 1)
root_url = "http://d1.weather.com.cn/calendar_new"
with open('weather_data.csv', mode='w', newline='', encoding='utf-8') as csv_file:
fieldnames = ['city', 'date', 'hgl', 'hmax', 'hmin']
csv_file.write(",".join(fieldnames) + "\n")
with ThreadPoolExecutor(max_workers=10) as executor: # 使用最多 10 个线程
for city, provinces in cities_dict.items():
if city == "重庆":
for province in provinces:
for area_id in province.values():
executor.submit(fetch_weather_data_for_city, root_url, city, list(province.keys())[0], area_id,
all_dates_str)