This commit is contained in:
xtao 2024-06-01 12:39:58 +08:00
parent 6baccbf23d
commit 5303741624
4 changed files with 14 additions and 13 deletions

View File

@ -1,4 +1,4 @@
from pyspark.sql.types import IntegerType, FloatType from pyspark.sql.types import FloatType
from client import spark_app from client import spark_app
from data.data import data_now_hdfs, mysql_jdbc_uri, mysql_jdbc_prop from data.data import data_now_hdfs, mysql_jdbc_uri, mysql_jdbc_prop

View File

@ -1,4 +1,5 @@
import os import os
os.environ["JAVA_HOME"] = "/opt/modules/jdk1.8.0_212" os.environ["JAVA_HOME"] = "/opt/modules/jdk1.8.0_212"
os.environ["YARN_CONF_DIR"] = "/usr/hdp/2.6.3.0-235/hadoop-yarn/etc/hadoop/" os.environ["YARN_CONF_DIR"] = "/usr/hdp/2.6.3.0-235/hadoop-yarn/etc/hadoop/"

View File

@ -1,5 +1,5 @@
from pyspark.sql.types import DecimalType
from pyspark.sql import functions from pyspark.sql import functions
from pyspark.sql.types import DecimalType
from client import spark_app from client import spark_app
from data.data import data_hour_hdfs, mysql_jdbc_uri, mysql_jdbc_prop from data.data import data_hour_hdfs, mysql_jdbc_uri, mysql_jdbc_prop
@ -17,8 +17,8 @@ def rain_24h_analyse():
).filter(df['rain1h'] < 9999) ).filter(df['rain1h'] < 9999)
# 分组、聚合、排序 # 分组、聚合、排序
df_rain_sum_data = (df_rain_data.groupBy("city_index", "city_name") df_rain_sum_data = (df_rain_data.groupBy("city_index", "city_name")
.agg(functions.sum("rain1h").alias("rain24h")) .agg(functions.sum("rain1h").alias("rain24h"))
.sort(functions.desc("rain24h"))) .sort(functions.desc("rain24h")))
df_rain_sum_data.cache() df_rain_sum_data.cache()
print("处理完成,保存数据到数据库") print("处理完成,保存数据到数据库")
df_rain_sum_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "rain_24h_analyse", "ignore", mysql_jdbc_prop) df_rain_sum_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "rain_24h_analyse", "ignore", mysql_jdbc_prop)

View File

@ -1,12 +1,12 @@
from pyspark.sql.types import IntegerType
from pyspark.sql import functions from pyspark.sql import functions
from pyspark.sql.types import IntegerType
from client import spark_app from client import spark_app
from data.data import data_day_hdfs, mysql_jdbc_uri, mysql_jdbc_prop from data.data import data_day_hdfs, mysql_jdbc_uri, mysql_jdbc_prop
def tem_2024_analyse(): def tem_2024_analyse():
print("计算重庆市各个区县 2024 年 每的 最高气温,最低气温,平均气温") print("计算重庆市各个区县 2024 年 每的 最高气温,最低气温,平均气温")
app = spark_app("tem_2024_analyse") app = spark_app("tem_2024_analyse")
print("创建应用完成,开始读取数据") print("创建应用完成,开始读取数据")
df = app.read.csv(data_day_hdfs, header=True) df = app.read.csv(data_day_hdfs, header=True)
@ -20,19 +20,19 @@ def tem_2024_analyse():
# 转换 'date' 列的日期格式 # 转换 'date' 列的日期格式
df_rain_data = df_rain_data.withColumn( df_rain_data = df_rain_data.withColumn(
'date', 'date',
functions.date_format(functions.from_unixtime( functions.date_format(functions.from_unixtime(
functions.unix_timestamp('date', 'yyyyMMdd')), 'yyyyMM').cast(IntegerType()) functions.unix_timestamp('date', 'yyyyMMdd')), 'yyyyMM').cast(IntegerType())
) )
# 分组、聚合 # 分组、聚合
df_rain_data = (df_rain_data.groupBy("city", "date") df_rain_data = (df_rain_data.groupBy("city", "date")
.agg(functions.max("hmax").alias("hmax"), .agg(functions.max("hmax").alias("hmax"),
functions.min("hmin").alias("hmin"), functions.min("hmin").alias("hmin"),
functions.round(functions.avg("havg"), 2).alias("havg")) functions.round(functions.avg("havg"), 2).alias("havg"))
.sort("city", df_rain_data.date.desc())) .sort("city", df_rain_data.date.desc()))
df_rain_data.cache() df_rain_data.cache()
print("处理完成,保存数据到数据库") print("处理完成,保存数据到数据库")
df_rain_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "tem_2024_analyse", "ignore", mysql_jdbc_prop) df_rain_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "tem_2024_analyse", "ignore", mysql_jdbc_prop)
print("各个区县 2024 年 每的 最高气温,最低气温,平均气温计算完毕") print("各个区县 2024 年 每的 最高气温,最低气温,平均气温计算完毕")
return df_rain_data.head(20) return df_rain_data.head(20)