diff --git a/spark/passed_rain_analyse.py b/spark/passed_rain_analyse.py
deleted file mode 100644
index 4a1997b..0000000
--- a/spark/passed_rain_analyse.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from pyspark.sql.types import DecimalType
-from pyspark.sql import functions
-
-from client import spark_app
-from data.data import data_hour_hdfs, mysql_jdbc_uri, mysql_jdbc_prop
-
-
-def passed_rain_analyse():
-    print("Computing the cumulative rainfall over the past 24 hours for each city")
-    app = spark_app("passed_rain_analyse")
-    print("start")
-    df = app.read.csv(data_hour_hdfs, header=True)
-    df_rain = df.select(df['city_index'], df['city_name'], df['rain1h'].cast(DecimalType(scale=1))) \
-        .filter(df['rain1h'] < 1000)  # filter the data, dropping invalid readings
-    df_rain_sum = df_rain.groupBy("city_index", "city_name") \
-        .agg(functions.sum("rain1h").alias("rain24h")) \
-        .sort(functions.desc("rain24h"))  # group, sum, and sort
-    df_rain_sum.cache()
-    df_rain_sum.coalesce(1).write.jdbc(mysql_jdbc_uri, "passed_rain_analyse", "ignore", mysql_jdbc_prop)
-    print("Finished computing the 24-hour cumulative rainfall for each city")
-    return df_rain_sum.head(20)
-
-
-if __name__ == "__main__":
-    passed_rain_analyse()
diff --git a/spark/rain_24h_analyse.py b/spark/rain_24h_analyse.py
new file mode 100644
index 0000000..901242b
--- /dev/null
+++ b/spark/rain_24h_analyse.py
@@ -0,0 +1,30 @@
+from pyspark.sql.types import DecimalType
+from pyspark.sql import functions
+
+from client import spark_app
+from data.data import data_hour_hdfs, mysql_jdbc_uri, mysql_jdbc_prop
+
+
+def rain_24h_analyse():
+    print("Computing the cumulative rainfall over the past 24 hours for each district and county of Chongqing")
+    app = spark_app("rain_24h_analyse")
+    print("Application created, reading data")
+    df = app.read.csv(data_hour_hdfs, header=True)
+    print("Data loaded, processing")
+    # Select the relevant columns and drop invalid readings
+    df_rain_data = df.select(
+        df['city_index'], df['city_name'], df['rain1h'].cast(DecimalType(scale=1))
+    ).filter(df['rain1h'] < 9999)
+    # Group by district, sum the hourly rainfall, and sort descending
+    df_rain_sum_data = (df_rain_data.groupBy("city_index", "city_name")
+                        .agg(functions.sum("rain1h").alias("rain24h"))
+                        .sort(functions.desc("rain24h")))
+    df_rain_sum_data.cache()
+    print("Processing complete, saving results to the database")
+    df_rain_sum_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "rain_24h_analyse", "ignore", mysql_jdbc_prop)
+    print("Finished computing the 24-hour cumulative rainfall for each district and county")
+    return df_rain_sum_data.head(20)
+
+
+if __name__ == "__main__":
+    rain_24h_analyse()
diff --git a/spark/tem_2024_analyse.py b/spark/tem_2024_analyse.py
new file mode 100644
index 0000000..4ae80ca
--- /dev/null
+++ b/spark/tem_2024_analyse.py
@@ -0,0 +1,40 @@
+from pyspark.sql.types import IntegerType
+from pyspark.sql import functions
+
+from client import spark_app
+from data.data import data_day_hdfs, mysql_jdbc_uri, mysql_jdbc_prop
+
+
+def tem_2024_analyse():
+    print("Computing the 2024 monthly maximum, minimum, and average temperatures for each district and county of Chongqing")
+    app = spark_app("tem_2024_analyse")
+    print("Application created, reading data")
+    df = app.read.csv(data_day_hdfs, header=True)
+    print("Data loaded, processing")
+    # Select the relevant columns and cast the temperatures to integers
+    df_tem_data = df.select(
+        df['city'], df['date'], df['hmax'].cast(IntegerType()), df['hmin'].cast(IntegerType())
+    )
+    # Daily mean temperature as the average of the daily max and min
+    df_tem_data = df_tem_data.withColumn("havg", (functions.col("hmax") + functions.col("hmin")) / 2)
+    # Convert the 'date' column from yyyyMMdd to a yyyyMM month key
+    df_tem_data = df_tem_data.withColumn(
+        'date',
+        functions.date_format(functions.from_unixtime(
+            functions.unix_timestamp('date', 'yyyyMMdd')), 'yyyyMM').cast(IntegerType())
+    )
+    # Group by district and month, then aggregate
+    df_tem_data = (df_tem_data.groupBy("city", "date")
+                   .agg(functions.max("hmax").alias("hmax"),
+                        functions.min("hmin").alias("hmin"),
+                        functions.round(functions.avg("havg"), 2).alias("havg"))
+                   .sort("city", functions.desc("date")))
+    df_tem_data.cache()
+    print("Processing complete, saving results to the database")
+    df_tem_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "tem_2024_analyse", "ignore", mysql_jdbc_prop)
+    print("Finished computing the 2024 monthly max, min, and average temperatures for each district and county")
+    return df_tem_data.head(20)
+
+
+if __name__ == "__main__":
+    tem_2024_analyse()
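
Note for reviewers: both new scripts depend on spark_app from client and on the path/JDBC constants from data.data, none of which appear in this diff. The sketch below shows what they are assumed to provide; every path, URI, and credential in it is an illustrative placeholder, not the project's actual configuration.

# client.py (assumed): spark_app() is taken to return a SparkSession
# named after the job, since the scripts call app.read.csv(...) on it.
from pyspark.sql import SparkSession


def spark_app(name):
    # Build (or reuse) a session with the given application name.
    return SparkSession.builder.appName(name).getOrCreate()


# data/data.py (assumed): HDFS input paths plus the JDBC target passed to
# DataFrame.write.jdbc(url, table, mode, properties). All values below are
# hypothetical placeholders.
data_hour_hdfs = "hdfs://localhost:9000/weather/data_hour.csv"
data_day_hdfs = "hdfs://localhost:9000/weather/data_day.csv"
mysql_jdbc_uri = "jdbc:mysql://localhost:3306/weather?useSSL=false"
mysql_jdbc_prop = {
    "user": "root",
    "password": "change_me",
    "driver": "com.mysql.cj.jdbc.Driver",
}

One design consequence worth flagging: mode "ignore" in write.jdbc silently skips the write when the target table already exists, so re-running a job will not refresh its table.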
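
The yyyyMMdd-to-yyyyMM conversion in tem_2024_analyse() round-trips through unix_timestamp/from_unixtime. A quick standalone sanity check of the resulting month key (session name and sample rows here are illustrative):

from pyspark.sql import SparkSession, functions
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName("month_key_check").getOrCreate()
df = spark.createDataFrame([("20240115",), ("20241231",)], ["date"])
# Same conversion as in tem_2024_analyse(): parse yyyyMMdd, format as yyyyMM.
df = df.withColumn(
    "month_key",
    functions.date_format(functions.from_unixtime(
        functions.unix_timestamp("date", "yyyyMMdd")), "yyyyMM").cast(IntegerType())
)
df.show()
# Expected: 20240115 -> 202401, 20241231 -> 202412

A shorter equivalent, if preferred, is functions.date_format(functions.to_date("date", "yyyyMMdd"), "yyyyMM"), which avoids the timestamp round-trip.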