from pyspark.sql import functions
from pyspark.sql.types import DecimalType

from client import spark_app
from data.data import data_hour_hdfs, mysql_jdbc_uri, mysql_jdbc_prop


def rain_24h_analyse():
    print("Computing the 24-hour cumulative rainfall for each district and county of Chongqing")
    app = spark_app("rain_24h_analyse")
    print("Application created; reading data")
    df = app.read.csv(data_hour_hdfs, header=True)
    print("Data loaded; processing")
    # Keep only valid readings (9999 is the sentinel for a missing value),
    # then select the needed columns, casting rainfall to one decimal place.
    df_rain_data = df.filter(df['rain1h'] < 9999).select(
        df['city_index'],
        df['city_name'],
        df['rain1h'].cast(DecimalType(scale=1))
    )
    # Group by city, sum the hourly readings into a 24-hour total,
    # and sort cities by heaviest rainfall first.
    df_rain_sum_data = (df_rain_data.groupBy("city_index", "city_name")
                        .agg(functions.sum("rain1h").alias("rain24h"))
                        .sort(functions.desc("rain24h")))
    # Cache: the result is consumed twice (JDBC write below, then head()).
    df_rain_sum_data.cache()
    print("Processing complete; saving to the database")
    # Write as a single partition; mode "ignore" skips the write if the table already exists.
    df_rain_sum_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "rain_24h_analyse", "ignore", mysql_jdbc_prop)
    print("24-hour cumulative rainfall for each city computed")
    return df_rain_sum_data.head(20)


if __name__ == "__main__":
    rain_24h_analyse()
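
# ---------------------------------------------------------------------------
# Hedged sketch (not part of the original script): the imports above pull in
# spark_app from client.py and the connection constants from data/data.py,
# neither of which is shown here. A minimal version of what those modules
# might contain, assuming spark_app is a thin wrapper around
# SparkSession.builder and data.data holds plain configuration values; all
# hostnames, paths, and credentials below are hypothetical placeholders.
#
# client.py (assumed):
#
#     from pyspark.sql import SparkSession
#
#     def spark_app(name: str) -> SparkSession:
#         """Create (or reuse) a SparkSession with the given application name."""
#         return SparkSession.builder.appName(name).getOrCreate()
#
# data/data.py (assumed; placeholder values):
#
#     # HDFS glob for the hourly weather CSV dump read by app.read.csv(...)
#     data_hour_hdfs = "hdfs://namenode:9000/weather/hour/*.csv"
#
#     # JDBC target used by df.write.jdbc(...)
#     mysql_jdbc_uri = "jdbc:mysql://localhost:3306/weather"
#     mysql_jdbc_prop = {
#         "user": "weather",
#         "password": "change-me",
#         "driver": "com.mysql.cj.jdbc.Driver",
#     }
# ---------------------------------------------------------------------------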