from pyspark.sql import functions
from pyspark.sql.types import DecimalType

from client import spark_app
from data.data import data_hour_hdfs, mysql_jdbc_uri, mysql_jdbc_prop
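# The imports above are project-local helpers. As orientation (assumed shapes,
# not the actual code): data_hour_hdfs is the HDFS path of the hourly-observation
# CSV files, mysql_jdbc_uri is a JDBC URL such as "jdbc:mysql://host:3306/weather",
# and mysql_jdbc_prop is a dict like
# {"user": ..., "password": ..., "driver": "com.mysql.cj.jdbc.Driver"}.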


def rain_24h_analyse():
    print("Computing the cumulative rainfall over the past 24 hours for each district and county of Chongqing")
    # Create the Spark application via the project helper
    app = spark_app("rain_24h_analyse")
    print("Application created; starting to read data")
    # Read the hourly observations from HDFS; header=True takes column names from the first row
    df = app.read.csv(data_hour_hdfs, header=True)
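    # Note: read.csv defaults to inferSchema=False, so every column arrives as a
    # string; that is why rain1h is cast explicitly below.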
    print("Data loaded; starting processing")
    # Extract the needed columns; rain1h is cast from string to Decimal(10, 1).
    # Readings of 9999 are sentinel values for missing data and are filtered out.
    df_rain_data = df.select(
        df['city_index'], df['city_name'], df['rain1h'].cast(DecimalType(scale=1))
    ).filter(df['rain1h'] < 9999)
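    # Illustrative effect (hypothetical values): a raw reading "0.3" becomes the
    # Decimal(10, 1) value 0.3, while a sentinel "9999" row is dropped.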
    # Group by district, sum the hourly readings into a 24-hour total, sort descending
    df_rain_sum_data = (df_rain_data.groupBy("city_index", "city_name")
                        .agg(functions.sum("rain1h").alias("rain24h"))
                        .sort(functions.desc("rain24h")))
    # Cache the result: it is consumed twice below (the JDBC write and head())
    df_rain_sum_data.cache()
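    # At this point each row is (city_index, city_name, rain24h), one per
    # district, ordered from wettest to driest.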
    print("Processing finished; saving results to the database")
    # coalesce(1) funnels the write through a single partition/connection; save
    # mode "ignore" skips the write if the rain_24h_analyse table already exists
    df_rain_sum_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "rain_24h_analyse", "ignore", mysql_jdbc_prop)
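    # (Design note: with "ignore", re-running the job never refreshes an existing
    # table; save mode "overwrite" would replace it instead.)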
    print("Finished computing the 24-hour cumulative rainfall for every city")
    # Return the top 20 districts for the caller to display
    return df_rain_sum_data.head(20)


if __name__ == "__main__":
    rain_24h_analyse()
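
# For reference, a minimal sketch of what the spark_app helper imported from
# client might look like (an assumption; the real helper may also configure the
# master URL or the MySQL JDBC driver package):
#
#   from pyspark.sql import SparkSession
#
#   def spark_app(name):
#       return SparkSession.builder.appName(name).getOrCreate()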