diff --git a/spark/aqi_now_analyse.py b/spark/aqi_now_analyse.py new file mode 100644 index 0000000..a89d8df --- /dev/null +++ b/spark/aqi_now_analyse.py @@ -0,0 +1,27 @@ +from pyspark.sql.types import IntegerType, FloatType + +from client import spark_app +from data.data import data_now_hdfs, mysql_jdbc_uri, mysql_jdbc_prop + + +def aqi_now_analyse(): + print("计算重庆市各个区县 现在 空气质量") + app = spark_app("aqi_now_analyse") + print("创建应用完成,开始读取数据") + df = app.read.csv(data_now_hdfs, header=True) + print("读取数据完成,开始处理数据") + # 取出数据 + df_rain_data = df.select( + df['city_index'], df['city_name'], + df['aqi'].cast(FloatType()), + df['sqiText'], + ).filter(df['aqi'] < 9999) + df_rain_data.cache() + print("处理完成,保存数据到数据库") + df_rain_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "aqi_now_analyse", "ignore", mysql_jdbc_prop) + print("各个区县 现在 空气质量 计算完毕") + return df_rain_data.head(20) + + +if __name__ == "__main__": + aqi_now_analyse() diff --git a/spark/tem_now_analyse.py b/spark/tem_now_analyse.py new file mode 100644 index 0000000..3e03fba --- /dev/null +++ b/spark/tem_now_analyse.py @@ -0,0 +1,29 @@ +from pyspark.sql.types import IntegerType, FloatType + +from client import spark_app +from data.data import data_now_hdfs, mysql_jdbc_uri, mysql_jdbc_prop + + +def tem_now_analyse(): + print("计算重庆市各个区县 现在 气温、湿度、风速") + app = spark_app("tem_now_analyse") + print("创建应用完成,开始读取数据") + df = app.read.csv(data_now_hdfs, header=True) + print("读取数据完成,开始处理数据") + # 取出数据 + df_rain_data = df.select( + df['city_index'], df['city_name'], + df['temperature'].cast(FloatType()), + df['humidity'].cast(IntegerType()), + df['wind_speed'].cast(FloatType()), + df['info'], + ).filter(df['info'] != "-") + df_rain_data.cache() + print("处理完成,保存数据到数据库") + df_rain_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "tem_now_analyse", "ignore", mysql_jdbc_prop) + print("各个区县 现在 气温、湿度、风速计算完毕") + return df_rain_data.head(20) + + +if __name__ == "__main__": + tem_now_analyse()