系统功能介绍
系统技术介绍
大数据框架:Hadoop+Spark(本次没用Hive,支持定制)
开发语言:Python+Java(两个版本都支持)
后端框架:Django+Spring Boot(Spring+SpringMVC+MyBatis)(两个版本都支持)
前端:Vue+ElementUI+ECharts+HTML+CSS+JavaScript+jQuery
详细技术点:Hadoop、HDFS、Spark、Spark SQL、Pandas、NumPy
数据库:MySQL
系统背景与意义
系统演示视频
系统演示图片
大屏上
大屏下
车辆保值率分析
地区热门车型分析
新能源车市场分析
整体市场行情分析
系统部分代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, desc, regexp_extract, when, split
from pyspark.sql.types import DoubleType
import mysql.connector

# Connection settings shared by every analysis in this module.
# NOTE(review): the credentials are hard-coded; move them to configuration
# or environment variables before running this outside a local demo.
_JDBC_URL = "jdbc:mysql://localhost:3306/car_analysis"
_SOURCE_TABLE = "car_data"
_DB_HOST = 'localhost'
_DB_USER = "root"
_DB_PASSWORD = "123456"
_DB_NAME = 'car_analysis'

# The brand is taken to be the leading run of non-whitespace characters of
# the listing title.
_BRAND_PATTERN = r"^([^\s]+)"


def _load_car_data(spark):
    """Read the ``car_data`` MySQL table into a Spark DataFrame over JDBC."""
    return (
        spark.read.format("jdbc")
        .option("url", _JDBC_URL)
        .option("dbtable", _SOURCE_TABLE)
        .option("user", _DB_USER)
        .option("password", _DB_PASSWORD)
        .load()
    )


def _with_brand(df):
    """Return ``df`` with a ``brand`` column extracted from ``title``."""
    return df.withColumn("brand", regexp_extract(col("title"), _BRAND_PATTERN, 1))


def _round_or_none(value, digits=2):
    """Round ``value`` to ``digits`` places, passing ``None`` (SQL NULL) through.

    Spark aggregates such as ``avg`` yield ``None`` when every input in the
    group is NULL; calling ``round(None, 2)`` directly would raise TypeError.
    """
    return None if value is None else round(value, digits)


def _replace_table_rows(delete_sql, insert_sql, rows):
    """Empty a result table and insert ``rows`` in a single transaction.

    Args:
        delete_sql: Statement that clears the target table.
        insert_sql: Parameterized ``INSERT`` statement (``%s`` placeholders).
        rows: List of parameter tuples, one per row to insert.

    The cursor and connection are always closed, even if a statement fails;
    in that case nothing is committed by this function and the error
    propagates to the caller.
    """
    conn = mysql.connector.connect(
        host=_DB_HOST, user=_DB_USER, password=_DB_PASSWORD, database=_DB_NAME
    )
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(delete_sql)
            if rows:
                cursor.executemany(insert_sql, rows)
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()


def analyze_brand_market_share():
    """Compute the top-20 brands by listing count and store them in MySQL.

    Brands with 50 or fewer listings are dropped. ``market_share_percent`` is
    each brand's listing count divided by the count of *all* rows (including
    the dropped brands), times 100. The result replaces the contents of
    ``brand_analysis_result``.

    Returns:
        list: The collected Spark ``Row`` objects with fields ``brand``,
        ``vehicle_count`` and ``market_share_percent``.
    """
    spark = (
        SparkSession.builder.appName("CarDataAnalysis")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .getOrCreate()
    )
    try:
        car_df_with_brand = _with_brand(_load_car_data(spark))

        brand_count = (
            car_df_with_brand.groupBy("brand")
            .agg(count("*").alias("vehicle_count"))
            .filter(col("vehicle_count") > 50)
        )
        total_vehicles = car_df_with_brand.count()
        brand_market_share = brand_count.withColumn(
            "market_share_percent",
            (col("vehicle_count") / total_vehicles * 100).cast(DoubleType()),
        )
        top_brands = brand_market_share.orderBy(desc("vehicle_count")).limit(20)

        # NOTE(review): the two breakdowns below are only defined (Spark is
        # lazy) and never collected or persisted; wire them to an output or
        # remove them.
        brand_level_analysis = (
            car_df_with_brand.groupBy("brand", "vehicle_level")
            .agg(count("*").alias("level_count"))
            .filter(col("level_count") > 10)
        )
        brand_fuel_analysis = (
            car_df_with_brand.groupBy("brand", "fuel_type")
            .agg(count("*").alias("fuel_count"))
            .filter(col("fuel_count") > 5)
        )

        result_data = top_brands.collect()
        _replace_table_rows(
            "DELETE FROM brand_analysis_result",
            "INSERT INTO brand_analysis_result (brand_name, vehicle_count, market_share) VALUES (%s, %s, %s)",
            [
                (row['brand'], row['vehicle_count'], row['market_share_percent'])
                for row in result_data
            ],
        )
        return result_data
    finally:
        # Stop the session even when the analysis or the MySQL write fails.
        spark.stop()


def calculate_brand_retention_rate():
    """Compute the 15 brands with the highest average value retention.

    Only rows with ``0 < price <= new_car_price`` are used. The retention rate
    of a row is ``price / new_car_price * 100``. Brands need at least 30
    samples and an average retention between 10 and 95 (inclusive) to be
    kept. The result replaces the contents of ``retention_analysis_result``,
    with the average rounded to two decimals.

    Returns:
        list: The collected Spark ``Row`` objects with fields ``brand``,
        ``avg_retention_rate`` (unrounded) and ``sample_count``.
    """
    spark = (
        SparkSession.builder.appName("CarRetentionAnalysis")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()
    )
    try:
        car_df = _load_car_data(spark)
        filtered_df = car_df.filter(
            (col("price") > 0)
            & (col("new_car_price") > 0)
            & (col("price") <= col("new_car_price"))
        )
        retention_df = _with_brand(filtered_df).withColumn(
            "retention_rate",
            (col("price") / col("new_car_price") * 100).cast(DoubleType()),
        )

        brand_retention = (
            retention_df.groupBy("brand")
            .agg(
                avg("retention_rate").alias("avg_retention_rate"),
                count("*").alias("sample_count"),
            )
            .filter(col("sample_count") >= 30)
        )
        brand_retention_filtered = brand_retention.filter(
            col("avg_retention_rate").between(10, 95)
        )
        top_retention_brands = brand_retention_filtered.orderBy(
            desc("avg_retention_rate")
        ).limit(15)

        # NOTE(review): the age and mileage breakdowns below are only defined
        # (Spark is lazy) and never collected or persisted; wire them to an
        # output or remove them.
        age_retention_analysis = retention_df.withColumn(
            "car_age_group",
            when(col("car_age") <= 1, "0-1年")
            .when(col("car_age") <= 3, "1-3年")
            .when(col("car_age") <= 5, "3-5年")
            .otherwise("5年以上"),
        )
        age_brand_retention = (
            age_retention_analysis.groupBy("brand", "car_age_group")
            .agg(
                avg("retention_rate").alias("age_avg_retention"),
                count("*").alias("age_sample_count"),
            )
            .filter(col("age_sample_count") >= 10)
        )
        mileage_retention_analysis = retention_df.withColumn(
            "mileage_group",
            when(col("mileage") <= 30000, "低里程")
            .when(col("mileage") <= 80000, "中里程")
            .otherwise("高里程"),
        )
        mileage_brand_retention = (
            mileage_retention_analysis.groupBy("brand", "mileage_group")
            .agg(
                avg("retention_rate").alias("mileage_avg_retention"),
                count("*").alias("mileage_sample_count"),
            )
            .filter(col("mileage_sample_count") >= 10)
        )

        result_data = top_retention_brands.collect()
        _replace_table_rows(
            "DELETE FROM retention_analysis_result",
            "INSERT INTO retention_analysis_result (brand_name, avg_retention_rate, sample_count) VALUES (%s, %s, %s)",
            [
                (
                    row['brand'],
                    _round_or_none(row['avg_retention_rate']),
                    row['sample_count'],
                )
                for row in result_data
            ],
        )
        return result_data
    finally:
        # Stop the session even when the analysis or the MySQL write fails.
        spark.stop()


def analyze_new_energy_market():
    """Summarize new-energy listings per fuel type and store them in MySQL.

    Rows are restricted to ``fuel_type`` in ``纯电动`` / ``插电混动`` / ``增程式``.
    For each fuel type the listing count, the average price and the share of
    all new-energy listings (percent) are computed. The result replaces the
    contents of ``new_energy_analysis_result``, with price and share rounded
    to two decimals (NULL averages are stored as NULL).

    Returns:
        list: The collected Spark ``Row`` objects with fields ``fuel_type``,
        ``vehicle_count``, ``avg_price`` and ``market_share_percent``
        (unrounded), ordered by ``vehicle_count`` descending.
    """
    spark = (
        SparkSession.builder.appName("NewEnergyCarAnalysis")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.skewJoin.enabled", "true")
        .getOrCreate()
    )
    try:
        car_df = _load_car_data(spark)
        new_energy_df = car_df.filter(
            col("fuel_type").isin(["纯电动", "插电混动", "增程式"])
        )

        fuel_type_analysis = (
            new_energy_df.groupBy("fuel_type")
            .agg(count("*").alias("vehicle_count"), avg("price").alias("avg_price"))
            .orderBy(desc("vehicle_count"))
        )
        total_new_energy = new_energy_df.count()
        fuel_market_share = fuel_type_analysis.withColumn(
            "market_share_percent",
            (col("vehicle_count") / total_new_energy * 100).cast(DoubleType()),
        )

        # NOTE(review): every DataFrame from here down to the collect() call
        # is only defined (Spark is lazy) and never collected or persisted;
        # wire them to an output or remove them.
        new_energy_with_brand = _with_brand(new_energy_df)
        brand_fuel_analysis = (
            new_energy_with_brand.groupBy("brand", "fuel_type")
            .agg(
                count("*").alias("brand_fuel_count"),
                avg("price").alias("brand_avg_price"),
            )
            .filter(col("brand_fuel_count") >= 5)
        )
        pure_electric_df = new_energy_df.filter(col("fuel_type") == "纯电动").filter(
            col("pure_electric_range") > 0
        )
        range_price_analysis = pure_electric_df.withColumn(
            "range_group",
            when(col("pure_electric_range") <= 300, "短续航")
            .when(col("pure_electric_range") <= 500, "中续航")
            .otherwise("长续航"),
        )
        range_analysis_result = range_price_analysis.groupBy("range_group").agg(
            count("*").alias("range_count"),
            avg("price").alias("range_avg_price"),
            avg("pure_electric_range").alias("avg_range"),
        )
        battery_capacity_analysis = new_energy_df.filter(
            col("battery_capacity") > 0
        ).withColumn(
            "capacity_group",
            when(col("battery_capacity") <= 50, "小容量")
            .when(col("battery_capacity") <= 80, "中容量")
            .otherwise("大容量"),
        )
        capacity_analysis_result = battery_capacity_analysis.groupBy(
            "capacity_group"
        ).agg(
            count("*").alias("capacity_count"),
            avg("price").alias("capacity_avg_price"),
            avg("battery_capacity").alias("avg_capacity"),
        )
        new_energy_retention = new_energy_with_brand.filter(
            (col("price") > 0) & (col("new_car_price") > 0)
        ).withColumn(
            "retention_rate",
            (col("price") / col("new_car_price") * 100).cast(DoubleType()),
        )
        brand_retention_new_energy = (
            new_energy_retention.groupBy("brand")
            .agg(
                avg("retention_rate").alias("ne_avg_retention"),
                count("*").alias("ne_sample_count"),
            )
            .filter(col("ne_sample_count") >= 10)
            .orderBy(desc("ne_avg_retention"))
        )

        result_data = fuel_market_share.collect()
        _replace_table_rows(
            "DELETE FROM new_energy_analysis_result",
            "INSERT INTO new_energy_analysis_result (fuel_type, vehicle_count, avg_price, market_share) VALUES (%s, %s, %s, %s)",
            [
                (
                    row['fuel_type'],
                    row['vehicle_count'],
                    _round_or_none(row['avg_price']),
                    _round_or_none(row['market_share_percent']),
                )
                for row in result_data
            ],
        )
        return result_data
    finally:
        # Stop the session even when the analysis or the MySQL write fails.
        spark.stop()


