41 lines
1.7 KiB
Python
41 lines
1.7 KiB
Python
from pyspark.sql import functions
|
|
from pyspark.sql.types import IntegerType
|
|
|
|
from client import spark_app
|
|
from data.data import data_day_hdfs, mysql_jdbc_uri, mysql_jdbc_prop
|
|
|
|
|
|
def tem_2024_analyse():
|
|
print("计算重庆市各个区县 2024 年 每月的 最高气温,最低气温,平均气温")
|
|
app = spark_app("tem_2024_analyse")
|
|
print("创建应用完成,开始读取数据")
|
|
df = app.read.csv(data_day_hdfs, header=True)
|
|
print("读取数据完成,开始处理数据")
|
|
# 取出数据
|
|
df_rain_data = df.select(
|
|
df['city'], df['date'], df['hmax'].cast(IntegerType()), df['hmin'].cast(IntegerType())
|
|
)
|
|
# 求平均
|
|
df_rain_data = df_rain_data.withColumn("havg", (functions.col("hmax") + functions.col("hmin")) / 2)
|
|
# 转换 'date' 列的日期格式
|
|
df_rain_data = df_rain_data.withColumn(
|
|
'date',
|
|
functions.date_format(functions.from_unixtime(
|
|
functions.unix_timestamp('date', 'yyyyMMdd')), 'yyyyMM').cast(IntegerType())
|
|
)
|
|
# 分组、聚合
|
|
df_rain_data = (df_rain_data.groupBy("city", "date")
|
|
.agg(functions.max("hmax").alias("hmax"),
|
|
functions.min("hmin").alias("hmin"),
|
|
functions.round(functions.avg("havg"), 2).alias("havg"))
|
|
.sort("city", df_rain_data.date.desc()))
|
|
df_rain_data.cache()
|
|
print("处理完成,保存数据到数据库")
|
|
df_rain_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "tem_2024_analyse", "ignore", mysql_jdbc_prop)
|
|
print("各个区县 2024 年 每月的 最高气温,最低气温,平均气温计算完毕")
|
|
return df_rain_data.head(20)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
tem_2024_analyse()
|