pyspark 计算各个区县 2024 年 每天的 最高气温,最低气温,平均气温

This commit is contained in:
xtao 2024-06-01 10:30:14 +08:00
parent 19c06ec71e
commit 2f75f646dd
3 changed files with 70 additions and 25 deletions

View File

@@ -1,25 +0,0 @@
from pyspark.sql.types import DecimalType
from pyspark.sql import functions
from client import spark_app
from data.data import data_hour_hdfs, mysql_jdbc_uri, mysql_jdbc_prop
def passed_rain_analyse():
    """Aggregate each city's accumulated rainfall over the past 24 hours.

    Reads hourly observations from HDFS, sums the hourly rainfall per city,
    writes the ranked totals to MySQL, and returns the top 20 rows.
    """
    print("计算各个城市过去24小时累积雨量")
    session = spark_app("passed_rain_analyse")
    print("start")
    hourly = session.read.csv(data_hour_hdfs, header=True)
    # Keep only the needed columns and drop invalid readings (values >= 1000).
    valid_rain = hourly.select(
        hourly['city_index'],
        hourly['city_name'],
        hourly['rain1h'].cast(DecimalType(scale=1)),
    ).filter(hourly['rain1h'] < 1000)
    # Group per city, sum the hourly rainfall, order by total descending.
    totals = (
        valid_rain.groupBy("city_index", "city_name")
        .agg(functions.sum("rain1h").alias("rain24h"))
        .sort(functions.desc("rain24h"))
    )
    totals.cache()
    totals.coalesce(1).write.jdbc(mysql_jdbc_uri, "passed_rain_analyse", "ignore", mysql_jdbc_prop)
    print("各个城市过去24小时累积雨量计算完毕")
    return totals.head(20)


if __name__ == "__main__":
    passed_rain_analyse()

30
spark/rain_24h_analyse.py Normal file
View File

@@ -0,0 +1,30 @@
from pyspark.sql.types import DecimalType
from pyspark.sql import functions
from client import spark_app
from data.data import data_hour_hdfs, mysql_jdbc_uri, mysql_jdbc_prop
def rain_24h_analyse():
    """Aggregate each Chongqing district's accumulated rainfall over 24 hours.

    Reads hourly observations from HDFS, sums the hourly rainfall per
    district, writes the ranked totals to MySQL, and returns the top 20 rows.
    """
    print("计算重庆市各个区县过去24小时累积雨量")
    session = spark_app("rain_24h_analyse")
    print("创建应用完成,开始读取数据")
    hourly = session.read.csv(data_hour_hdfs, header=True)
    print("读取数据完成,开始处理数据")
    # Keep only the needed columns and drop sentinel/invalid readings (>= 9999).
    valid_rain = hourly.select(
        hourly['city_index'],
        hourly['city_name'],
        hourly['rain1h'].cast(DecimalType(scale=1)),
    ).filter(hourly['rain1h'] < 9999)
    # Group per district, sum the hourly rainfall, order by total descending.
    totals = (
        valid_rain.groupBy("city_index", "city_name")
        .agg(functions.sum("rain1h").alias("rain24h"))
        .sort(functions.desc("rain24h"))
    )
    totals.cache()
    print("处理完成,保存数据到数据库")
    totals.coalesce(1).write.jdbc(mysql_jdbc_uri, "rain_24h_analyse", "ignore", mysql_jdbc_prop)
    print("各个城市过去24小时累积雨量计算完毕")
    return totals.head(20)


if __name__ == "__main__":
    rain_24h_analyse()

40
spark/tem_2024_analyse.py Normal file
View File

@@ -0,0 +1,40 @@
from pyspark.sql.types import IntegerType
from pyspark.sql import functions
from client import spark_app
from data.data import data_day_hdfs, mysql_jdbc_uri, mysql_jdbc_prop
def tem_2024_analyse():
    """Compute per-district max / min / average temperature stats for 2024.

    Reads daily observations from HDFS, aggregates them per city and period,
    writes the result to MySQL, and returns the top 20 rows.

    NOTE(review): the messages say "每天" (daily), but 'date' is truncated to
    'yyyyMM' before grouping, so the aggregation is per month — confirm intent.
    """
    print("计算重庆市各个区县 2024 年 每天的 最高气温,最低气温,平均气温")
    session = spark_app("tem_2024_analyse")
    print("创建应用完成,开始读取数据")
    daily = session.read.csv(data_day_hdfs, header=True)
    print("读取数据完成,开始处理数据")
    # Keep the needed columns; cast the temperature extremes to integers.
    temps = daily.select(
        daily['city'],
        daily['date'],
        daily['hmax'].cast(IntegerType()),
        daily['hmin'].cast(IntegerType()),
    )
    # Approximate the day's mean as the midpoint of its max and min.
    temps = temps.withColumn("havg", (functions.col("hmax") + functions.col("hmin")) / 2)
    # Reformat 'date' from yyyyMMdd text to an integer yyyyMM key.
    temps = temps.withColumn(
        'date',
        functions.date_format(
            functions.from_unixtime(functions.unix_timestamp('date', 'yyyyMMdd')),
            'yyyyMM',
        ).cast(IntegerType()),
    )
    # Aggregate per city and period; newest period first within each city.
    stats = (
        temps.groupBy("city", "date")
        .agg(
            functions.max("hmax").alias("hmax"),
            functions.min("hmin").alias("hmin"),
            functions.round(functions.avg("havg"), 2).alias("havg"),
        )
        .sort("city", functions.desc("date"))
    )
    stats.cache()
    print("处理完成,保存数据到数据库")
    stats.coalesce(1).write.jdbc(mysql_jdbc_uri, "tem_2024_analyse", "ignore", mysql_jdbc_prop)
    print("各个区县 2024 年 每天的 最高气温,最低气温,平均气温计算完毕")
    return stats.head(20)


if __name__ == "__main__":
    tem_2024_analyse()