python_weather/spark/aqi_now_analyse.py
2024-06-01 11:02:02 +08:00

28 lines
939 B
Python

from pyspark.sql.types import IntegerType, FloatType
from client import spark_app
from data.data import data_now_hdfs, mysql_jdbc_uri, mysql_jdbc_prop
def aqi_now_analyse():
print("计算重庆市各个区县 现在 空气质量")
app = spark_app("aqi_now_analyse")
print("创建应用完成,开始读取数据")
df = app.read.csv(data_now_hdfs, header=True)
print("读取数据完成,开始处理数据")
# 取出数据
df_rain_data = df.select(
df['city_index'], df['city_name'],
df['aqi'].cast(FloatType()),
df['sqiText'],
).filter(df['aqi'] < 9999)
df_rain_data.cache()
print("处理完成,保存数据到数据库")
df_rain_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "aqi_now_analyse", "ignore", mysql_jdbc_prop)
print("各个区县 现在 空气质量 计算完毕")
return df_rain_data.head(20)
if __name__ == "__main__":
aqi_now_analyse()