diff --git a/spark/aqi_now_analyse.py b/spark/aqi_now_analyse.py index a89d8df..9898fce 100644 --- a/spark/aqi_now_analyse.py +++ b/spark/aqi_now_analyse.py @@ -1,4 +1,4 @@ -from pyspark.sql.types import IntegerType, FloatType +from pyspark.sql.types import FloatType from client import spark_app from data.data import data_now_hdfs, mysql_jdbc_uri, mysql_jdbc_prop diff --git a/spark/client.py b/spark/client.py index d12d670..ce6900a 100644 --- a/spark/client.py +++ b/spark/client.py @@ -1,4 +1,5 @@ import os + os.environ["JAVA_HOME"] = "/opt/modules/jdk1.8.0_212" os.environ["YARN_CONF_DIR"] = "/usr/hdp/2.6.3.0-235/hadoop-yarn/etc/hadoop/" diff --git a/spark/rain_24h_analyse.py b/spark/rain_24h_analyse.py index 901242b..0712108 100644 --- a/spark/rain_24h_analyse.py +++ b/spark/rain_24h_analyse.py @@ -1,5 +1,5 @@ -from pyspark.sql.types import DecimalType from pyspark.sql import functions +from pyspark.sql.types import DecimalType from client import spark_app from data.data import data_hour_hdfs, mysql_jdbc_uri, mysql_jdbc_prop @@ -17,8 +17,8 @@ def rain_24h_analyse(): ).filter(df['rain1h'] < 9999) # 分组、聚合、排序 df_rain_sum_data = (df_rain_data.groupBy("city_index", "city_name") - .agg(functions.sum("rain1h").alias("rain24h")) - .sort(functions.desc("rain24h"))) + .agg(functions.sum("rain1h").alias("rain24h")) + .sort(functions.desc("rain24h"))) df_rain_sum_data.cache() print("处理完成,保存数据到数据库") df_rain_sum_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "rain_24h_analyse", "ignore", mysql_jdbc_prop) diff --git a/spark/tem_2024_analyse.py b/spark/tem_2024_analyse.py index 4ae80ca..22f292e 100644 --- a/spark/tem_2024_analyse.py +++ b/spark/tem_2024_analyse.py @@ -1,12 +1,12 @@ -from pyspark.sql.types import IntegerType from pyspark.sql import functions +from pyspark.sql.types import IntegerType from client import spark_app from data.data import data_day_hdfs, mysql_jdbc_uri, mysql_jdbc_prop def tem_2024_analyse(): - print("计算重庆市各个区县 2024 年 每天的 最高气温,最低气温,平均气温") + print("计算重庆市各个区县 2024 年 每月的 最高气温,最低气温,平均气温") app = spark_app("tem_2024_analyse") print("创建应用完成,开始读取数据") df = app.read.csv(data_day_hdfs, header=True) @@ -20,19 +20,19 @@ def tem_2024_analyse(): # 转换 'date' 列的日期格式 df_rain_data = df_rain_data.withColumn( 'date', - functions.date_format(functions.from_unixtime( - functions.unix_timestamp('date', 'yyyyMMdd')), 'yyyyMM').cast(IntegerType()) + functions.date_format(functions.from_unixtime( + functions.unix_timestamp('date', 'yyyyMMdd')), 'yyyyMM').cast(IntegerType()) ) # 分组、聚合 df_rain_data = (df_rain_data.groupBy("city", "date") - .agg(functions.max("hmax").alias("hmax"), - functions.min("hmin").alias("hmin"), - functions.round(functions.avg("havg"), 2).alias("havg")) - .sort("city", df_rain_data.date.desc())) + .agg(functions.max("hmax").alias("hmax"), + functions.min("hmin").alias("hmin"), + functions.round(functions.avg("havg"), 2).alias("havg")) + .sort("city", df_rain_data.date.desc())) df_rain_data.cache() print("处理完成,保存数据到数据库") df_rain_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "tem_2024_analyse", "ignore", mysql_jdbc_prop) - print("各个区县 2024 年 每天的 最高气温,最低气温,平均气温计算完毕") + print("各个区县 2024 年 每月的 最高气温,最低气温,平均气温计算完毕") return df_rain_data.head(20)