概念:线性回归是一种通过线性组合来建模因变量和自变量之间关系的方法。
原理:假设因变量Y与自变量X之间存在线性关系,即Y = β0 + β1X1 + ... + βnXn + ε,其中ε为误差项。通过最小二乘法估计参数β。
思想:找到一条直线(或超平面),使得所有样本点到该直线的距离(残差)的平方和最小。
应用:广泛应用于经济学、金融学、生物学等领域,用于预测和因果关系分析。
-数据预处理:
-模型构建:
-训练:
-评估:
-可视化:
-保存结果:
定量变量预测的机器学习模型可分为传统统计模型、树基集成模型、核方法和深度学习模型四大类,每类模型通过不同机制捕捉数据模式,适用于从线性到复杂非线性关系的预测任务。
代码涵盖了从数据准备到结果保存的自动化过程,包括数据预处理、模型配置、性能评估和报告生成。
# pip install pandas numpy matplotlib seaborn scikit-learn statsmodels openpyxl reportlab
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.preprocessing import LabelEncoder
from statsmodels.stats.outliers_influence import variance_inflation_factor
import statsmodels.api as sm
import os
import warnings
from pathlib import Path
import scipy.stats as stats
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib import colors
import matplotlib.font_manager as fm
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun']
plt.rcParams['axes.unicode_minus'] = False
# 忽略警告
warnings.filterwarnings('ignore')
class LinearRegressionAnalysis:
def __init__(self, data_path, output_dir):
self.data_path = data_path
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.data = None
self.model = None
self.X_train = None
self.X_test = None
self.y_train = None
self.y_test = None
self.train_predictions = None
self.test_predictions = None
def load_data(self):
"""读取数据"""
self.data = pd.read_excel(self.data_path, sheet_name='示例数据')
print("数据形状:", self.data.shape)
print("\n数据前5行:")
print(self.data.head())
print("\n数据基本信息:")
print(self.data.info())
print("\n数据描述性统计:")
print(self.data.describe())
def preprocess_data(self):
"""数据预处理"""
# 处理分类变量
categorical_columns = ['结局', '肥胖程度', '教育水平', '血型', '指标8']
label_encoders = {}
for col in categorical_columns:
if col in self.data.columns:
le = LabelEncoder()
self.data[col] = le.fit_transform(self.data[col].astype(str))
label_encoders[col] = le
# 检查缺失值
missing_values = self.data.isnull().sum().sum()
if missing_values > 0:
print(f"删除 {missing_values} 个含有缺失值的行")
self.data = self.data.dropna()
return label_encoders
def split_data(self, test_size=0.3):
"""划分训练集和测试集"""
# 移除序号列
features = self.data.drop(['序号', '指标1'], axis=1, errors='ignore')
target = self.data['指标1']
self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
features, target, test_size=test_size, random_state=42
)
print(f"训练集样本量: {len(self.X_train)}")
print(f"测试集样本量: {len(self.X_test)}")
def calculate_mse(pred, actual):
return mean_squared_error(actual, pred)
performance_metrics = pd.DataFrame({
'Dataset': ['Training', 'Testing'],
'R2': [
calculate_r2(self.train_predictions, self.y_train),
calculate_r2(self.test_predictions, self.y_test)
],
'RMSE': [
calculate_rmse(self.train_predictions, self.y_train),
calculate_rmse(self.test_predictions, self.y_test)
],
'MAE': [
calculate_mae(self.train_predictions, self.y_train),
calculate_mae(self.test_predictions, self.y_test)
],
'MSE': [
calculate_mse(self.train_predictions, self.y_train),
calculate_mse(self.test_predictions, self.y_test)
]
})
return performance_metrics
def plot_residual_analysis(self):
"""残差分析可视化"""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
# 训练集残差
train_residuals = self.y_train - self.train_predictions
axes[0, 0].scatter(self.train_predictions, train_residuals, alpha=0.6)
axes[0, 0].axhline(y=0, color='red', linestyle='--')
axes[0, 0].set_title('Training Set - Residual Plot')
axes[0, 0].set_xlabel('Fitted Values')
axes[0, 0].set_ylabel('Residuals')
# 测试集残差
test_residuals = self.y_test - self.test_predictions
axes[0, 1].scatter(self.test_predictions, test_residuals, alpha=0.6, color='orange')
axes[0, 1].axhline(y=0, color='red', linestyle='--')
axes[0, 1].set_title('Test Set - Residual Plot')
axes[0, 1].set_xlabel('Fitted Values')
axes[0, 1].set_ylabel('Residuals')
# QQ图 - 训练集
stats.probplot(train_residuals, dist="norm", plot=axes[0, 2])
axes[0, 2].set_title('Training Set - Q-Q Plot of Residuals')
# QQ图 - 测试集
stats.probplot(test_residuals, dist="norm", plot=axes[1, 0])
axes[1, 0].set_title('Test Set - Q-Q Plot of Residuals')
# 残差直方图 - 训练集
axes[1, 1].hist(train_residuals, bins=30, density=True, alpha=0.7, color='lightblue')
axes[1, 1].set_title('Training Set - Distribution of Residuals')
axes[1, 1].set_xlabel('Residuals')
axes[1, 1].set_ylabel('Density')
# 残差直方图 - 测试集
axes[1, 2].hist(test_residuals, bins=30, density=True, alpha=0.7, color='lightcoral')
axes[1, 2].set_title('Test Set - Distribution of Residuals')
axes[1, 2].set_xlabel('Residuals')
axes[1, 2].set_ylabel('Density')
plt.tight_layout()
plt.savefig(self.output_dir / 'residual_analysis.jpg', dpi=300, bbox_inches='tight')
plt.close()
return train_residuals, test_residuals
def plot_prediction_vs_actual(self):
"""预测值 vs 实际值可视化"""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# 训练集
ax1.scatter(self.y_train, self.train_predictions, alpha=0.6, color='blue')
ax1.plot([self.y_train.min(), self.y_train.max()],
[self.y_train.min(), self.y_train.max()], 'red', linestyle='--')
ax1.set_title('Training Set - Predicted vs Actual Values')
ax1.set_xlabel('Actual Values')
ax1.set_ylabel('Predicted Values')
# 测试集
ax2.scatter(self.y_test, self.test_predictions, alpha=0.6, color='orange')
ax2.plot([self.y_test.min(), self.y_test.max()],
[self.y_test.min(), self.y_test.max()], 'red', linestyle='--')
ax2.set_title('Test Set - Predicted vs Actual Values')
ax2.set_xlabel('Actual Values')
ax2.set_ylabel('Predicted Values')
plt.tight_layout()
plt.savefig(self.output_dir / 'predicted_vs_actual.jpg', dpi=300, bbox_inches='tight')
plt.close()
def plot_performance_comparison(self, performance_metrics):
"""性能指标比较图"""
performance_long = performance_metrics.melt(id_vars=['Dataset'],
var_name='Metric',
value_name='Value')
plt.figure(figsize=(10, 6))
sns.barplot(data=performance_long, x='Metric', y='Value', hue='Dataset')
plt.title('Model Performance Comparison')
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(self.output_dir / 'performance_comparison.jpg', dpi=300, bbox_inches='tight')
plt.close()
def plot_coefficients(self):
"""系数重要性可视化"""
coef_data = pd.DataFrame({
'Variable': ['const'] + list(self.X_train.columns),
'Coefficient': [self.sm_model.params[0]] + list(self.sm_model.params[1:]),
'Std_Error': [self.sm_model.bse[0]] + list(self.sm_model.bse[1:])
})
# 移除截距项进行可视化
coef_plot_data = coef_data[coef_data['Variable'] != 'const'].sort_values('Coefficient')
plt.figure(figsize=(10, 8))
y_pos = np.arange(len(coef_plot_data))
plt.barh(y_pos, coef_plot_data['Coefficient'], xerr=coef_plot_data['Std_Error'],
alpha=0.7, color='steelblue')
plt.axvline(x=0, color='red', linestyle='--')
plt.yticks(y_pos, coef_plot_data['Variable'])
plt.xlabel('Coefficient Estimate')
plt.title('Variable Coefficient Estimates (Training Set)')
plt.tight_layout()
plt.savefig(self.output_dir / 'coefficient_plot.jpg', dpi=300, bbox_inches='tight')
plt.close()
return coef_data
def calculate_vif(self):
"""计算VIF"""
# 添加常数项
X_with_const = sm.add_constant(self.X_train)
vif_data = pd.DataFrame()
vif_data['Variable'] = X_with_const.columns
vif_data['VIF'] = [variance_inflation_factor(X_with_const.values, i)
for i in range(X_with_const.shape[1])]
# 可视化VIF
plt.figure(figsize=(10, 6))
vif_plot_data = vif_data[vif_data['Variable'] != 'const'].sort_values('VIF')
sns.barplot(data=vif_plot_data, y='Variable', x='VIF', palette='viridis')
plt.axvline(x=5, color='red', linestyle='--', label='VIF=5')
plt.axvline(x=10, color='darkred', linestyle='--', label='VIF=10')
plt.title('Variance Inflation Factor (VIF)')
plt.legend()
plt.tight_layout()
plt.savefig(self.output_dir / 'vif_plot.jpg', dpi=300, bbox_inches='tight')
plt.close()
return vif_data
def plot_learning_curve(self):
"""学习曲线分析"""
learning_curve_data = []
sample_sizes = np.linspace(0.1, 0.9, 9)
for size in sample_sizes:
n_samples = int(size * len(self.X_train))
if n_samples > 0:
# 随机抽取子集
indices = np.random.choice(len(self.X_train), n_samples, replace=False)
X_subset = self.X_train.iloc[indices]
y_subset = self.y_train.iloc[indices]
# 训练模型
temp_model = LinearRegression()
temp_model.fit(X_subset, y_subset)
# 训练集性能
train_pred = temp_model.predict(X_subset)
train_rmse = np.sqrt(mean_squared_error(y_subset, train_pred))
# 测试集性能
test_pred = temp_model.predict(self.X_test)
test_rmse = np.sqrt(mean_squared_error(self.y_test, test_pred))
learning_curve_data.extend([
{'SampleSize': n_samples, 'Dataset': 'Training', 'RMSE': train_rmse},
{'SampleSize': n_samples, 'Dataset': 'Testing', 'RMSE': test_rmse}
])
learning_df = pd.DataFrame(learning_curve_data)
plt.figure(figsize=(10, 6))
for dataset in ['Training', 'Testing']:
data = learning_df[learning_df['Dataset'] == dataset]
plt.plot(data['SampleSize'], data['RMSE'], marker='o', label=dataset, linewidth=2)
plt.title('Learning Curve')
plt.xlabel('Training Sample Size')
plt.ylabel('RMSE')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(self.output_dir / 'learning_curve.jpg', dpi=300, bbox_inches='tight')
plt.close()
return learning_df
def plot_error_distribution(self, train_residuals, test_residuals):
"""预测误差分布比较"""
error_comparison = pd.DataFrame({
'Error': np.concatenate([train_residuals, test_residuals]),
'Dataset': (['Training'] * len(train_residuals) +
['Testing'] * len(test_residuals))
})
plt.figure(figsize=(10, 6))
sns.kdeplot(data=error_comparison, x='Error', hue='Dataset', fill=True, alpha=0.5)
plt.title('Prediction Error Distribution Comparison')
plt.xlabel('Prediction Error')
plt.ylabel('Density')
plt.tight_layout()
plt.savefig(self.output_dir / 'error_distribution.jpg', dpi=300, bbox_inches='tight')
plt.close()
def save_results(self, performance_metrics, coef_data, vif_data):
"""保存结果到CSV文件"""
# 保存性能指标
performance_metrics.to_csv(self.output_dir / '模型性能指标.csv',
index=False, encoding='utf-8-sig')
# 保存系数结果
coef_data.to_csv(self.output_dir / '线性回归系数结果.csv',
index=False, encoding='utf-8-sig')
# 保存VIF结果
vif_data.to_csv(self.output_dir / 'VIF结果.csv',
index=False, encoding='utf-8-sig')
# 保存详细性能指标
performance_details = pd.DataFrame({
'Metric': ['R-squared', 'Adjusted R-squared', 'RMSE', 'MAE', 'MSE'],
'Training': [
performance_metrics.loc[0, 'R2'],
self.sm_model.rsquared_adj,
performance_metrics.loc[0, 'RMSE'],
performance_metrics.loc[0, 'MAE'],
performance_metrics.loc[0, 'MSE']
],
'Testing': [
performance_metrics.loc[1, 'R2'],
np.nan, # 测试集没有调整R方
performance_metrics.loc[1, 'RMSE'],
performance_metrics.loc[1, 'MAE'],
performance_metrics.loc[1, 'MSE']
]
})
performance_details.to_csv(self.output_dir / '详细性能指标.csv',
index=False, encoding='utf-8-sig')
def generate_report(self, performance_metrics, coef_data, vif_data):
"""生成分析报告"""
doc = SimpleDocTemplate(str(self.output_dir / "机器学习线性回归分析报告.pdf"),
pagesize=letter)
styles = getSampleStyleSheet()
story = []
# 标题
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontSize=16,
spaceAfter=30,
alignment=1 # 居中
)
story.append(Paragraph("机器学习线性回归分析报告", title_style))
story.append(Paragraph(f"生成日期: {pd.Timestamp.now().strftime('%Y-%m-%d')}", styles["Normal"]))
story.append(Spacer(1, 12))
# 数据概述
story.append(Paragraph("数据概述", styles["Heading2"]))
data_info = [
["总样本量:", str(len(self.data))],
["训练集样本量:", str(len(self.X_train))],
["测试集样本量:", str(len(self.X_test))],
["变量数量:", str(len(self.X_train.columns))]
]
data_table = Table(data_info)
data_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 12),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
story.append(data_table)
story.append(Spacer(1, 12))
# 模型性能摘要
story.append(Paragraph("模型性能摘要", styles["Heading2"]))
perf_data = [performance_metrics.columns.tolist()] + performance_metrics.values.tolist()
perf_table = Table(perf_data)
perf_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 12),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
story.append(perf_table)
story.append(Spacer(1, 12))
# 回归系数
story.append(Paragraph("回归系数", styles["Heading2"]))
coef_display = coef_data.round(4)
coef_list = [coef_display.columns.tolist()] + coef_display.values.tolist()
coef_table = Table(coef_list)
coef_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 10),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black),
('FONTSIZE', (0, 1), (-1, -1), 8)
]))
story.append(coef_table)
story.append(Spacer(1, 12))
# 多重共线性检查
if len(vif_data) > 0:
story.append(Paragraph("多重共线性检查(VIF)", styles["Heading2"]))
vif_display = vif_data.round(4)
vif_list = [vif_display.columns.tolist()] + vif_display.values.tolist()
vif_table = Table(vif_list)
vif_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 12),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
story.append(vif_table)
story.append(Spacer(1, 12))
# 结论
story.append(Paragraph("结论", styles["Heading2"]))
story.append(Paragraph("基于机器学习线性回归分析,主要发现如下:", styles["Normal"]))
train_r2 = performance_metrics.loc[0, 'R2']
test_r2 = performance_metrics.loc[1, 'R2']
train_test_gap = train_r2 - test_r2
conclusions = [
f"- 训练集R平方: {train_r2:.4f}",
f"- 测试集R平方: {test_r2:.4f}",
f"- 训练集RMSE: {performance_metrics.loc[0, 'RMSE']:.4f}",
f"- 测试集RMSE: {performance_metrics.loc[1, 'RMSE']:.4f}",
"模型泛化能力分析:"
]
for conclusion in conclusions:
story.append(Paragraph(conclusion, styles["Normal"]))
if train_test_gap > 0.1:
story.append(Paragraph("- 存在明显的过拟合现象,训练集性能远高于测试集", styles["Normal"]))
elif train_test_gap < 0.05:
story.append(Paragraph("- 模型泛化能力良好,训练集和测试集性能接近", styles["Normal"]))
else:
story.append(Paragraph("- 模型具有一定的泛化能力,但仍有改进空间", styles["Normal"]))
story.append(Paragraph("建议进一步进行特征工程和模型调优。", styles["Normal"]))
doc.build(story)
print("PDF报告生成成功!")
def run_analysis(self):
"""运行完整分析流程"""
print("=== 开始机器学习线性回归分析 ===")
# 1. 加载数据
print("1. 加载数据...")
self.load_data()
# 2. 数据预处理
print("2. 数据预处理...")
self.preprocess_data()
# 3. 划分数据集
print("3. 划分数据集...")
self.split_data()
# 4. 训练模型
print("4. 训练模型...")
self.train_model()
# 5. 计算性能指标
print("5. 计算性能指标...")
performance_metrics = self.calculate_metrics()
# 6. 可视化分析
print("6. 生成可视化图表...")
train_residuals, test_residuals = self.plot_residual_analysis()
self.plot_prediction_vs_actual()
self.plot_performance_comparison(performance_metrics)
coef_data = self.plot_coefficients()
vif_data = self.calculate_vif()
learning_curve_data = self.plot_learning_curve()
self.plot_error_distribution(train_residuals, test_residuals)
# 7. 保存结果
print("7. 保存结果...")
self.save_results(performance_metrics, coef_data, vif_data)
# 8. 生成报告
print("8. 生成分析报告...")
self.generate_report(performance_metrics, coef_data, vif_data)
# 获取训练集和测试集的R²值
train_r2 = performance_metrics.loc[0, 'R2']
test_r2 = performance_metrics.loc[1, 'R2']
train_test_gap = train_r2 - test_r2
# 输出关键性能指标
print("\n=== 机器学习线性回归分析完成 ===")
print("所有结果已保存到:", self.output_dir)
print("\n关键性能指标:")
print(f"训练集R平方: {train_r2:.4f}")
print(f"测试集R平方: {test_r2:.4f}")
print(f"性能差异: {train_test_gap:.4f}")
if train_test_gap > 0.1:
print("模型状态: 存在过拟合")
elif train_test_gap < 0.05:
print("模型状态: 泛化能力良好")
else:
print("模型状态: 有一定泛化能力")
# 主程序
if __name__ == "__main__":
# 设置路径
desktop_path = Path.home() / "Desktop"
data_path = desktop_path / "示例数据.xlsx"
output_dir = desktop_path / "Results模型-ML-Linear"
# 检查数据文件是否存在
if not data_path.exists():
print(f"错误: 数据文件不存在: {data_path}")
print("请确保桌面有 '示例数据.xlsx' 文件")
else:
# 运行分析
analysis = LinearRegressionAnalysis(data_path, output_dir)
analysis.run_analysis()
1. 线性回归(Linear Regression)
原理:通过线性方程 \( y = \beta_0 + \beta_1x_1 + \cdots + \beta_nx_n + \epsilon \) 拟合因变量与自变量的关系,最小化残差平方和(OLS)。
核心思想:假设变量间存在线性因果关系,适用于特征与目标变量呈直线趋势的场景。
应用范围:房价预测、销量预估等低维数据,如通过广告投入预测产品销量,模型形式简单且可解释性强。
局限:无法捕捉非线性关系,对异常值敏感。
医学统计数据分析分享交流SPSS、R语言、Python、ArcGis、Geoda、GraphPad、数据分析图表制作等心得。承接数据分析,论文返修,医学统计,机器学习,生存分析,空间分析,问卷分析业务。若有投稿和数据分析代做需求,可以直接联系我,谢谢!
“医学统计数据分析”公众号右下角;
找到“联系作者”,
可加我微信,邀请入粉丝群!
有临床流行病学数据分析
如(t检验、方差分析、χ2检验、logistic回归)、
(重复测量方差分析与配对T检验、ROC曲线)、
(非参数检验、生存分析、样本含量估计)、
(筛检试验:灵敏度、特异度、约登指数等计算)、
(绘制柱状图、散点图、小提琴图、列线图等)、
机器学习、深度学习、生存分析
等需求的同仁们,加入【临床】粉丝群。
疾控,公卫岗位的同仁,可以加一下【公卫】粉丝群,分享生态学研究、空间分析、时间序列、监测数据分析、时空面板技巧等工作科研自动化内容。
有实验室数据分析需求的同仁们,可以加入【生信】粉丝群,交流NCBI(基因序列)、UniProt(蛋白质)、KEGG(通路)、GEO(公共数据集)等公共数据库、基因组学转录组学蛋白组学代谢组学表型组学等数据分析和可视化内容。
或者可扫码直接加微信进群!!!
精品视频课程-“医学统计数据分析”视频号付费合集
在“医学统计数据分析”视频号-付费合集兑换相应课程后,获取课程理论课PPT、代码、基础数据等相关资料,请大家在【医学统计数据分析】公众号右下角,找到“联系作者”,加我微信后打包发送。感谢您的支持!!

