from pyspark.sql.types import FloatType from client import spark_app from data.data import data_now_hdfs, mysql_jdbc_uri, mysql_jdbc_prop def aqi_now_analyse(): print("计算重庆市各个区县 现在 空气质量") app = spark_app("aqi_now_analyse") print("创建应用完成,开始读取数据") df = app.read.csv(data_now_hdfs, header=True) print("读取数据完成,开始处理数据") # 取出数据 df_rain_data = df.select( df['city_index'], df['city_name'], df['aqi'].cast(FloatType()), df['sqiText'], ).filter(df['aqi'] < 9999) df_rain_data.cache() print("处理完成,保存数据到数据库") df_rain_data.coalesce(1).write.jdbc(mysql_jdbc_uri, "aqi_now_analyse", "ignore", mysql_jdbc_prop) print("各个区县 现在 空气质量 计算完毕") return df_rain_data.head(20) if __name__ == "__main__": aqi_now_analyse()