概念:这些是线性回归的正则化版本,用于处理多重共线性和过拟合问题。
原理:
岭回归:在损失函数中加入L2正则项,即∑(y_i - ŷ_i)^2 + λ∑β_j^2,防止系数过大。
套索回归:加入L1正则项,即∑(y_i - ŷ_i)^2 + λ∑|β_j|,可以使一些系数为零,实现特征选择。
弹性网络:结合L1和L2正则项,即∑(y_i - ŷ_i)^2 + λ1∑|β_j| + λ2∑β_j^2。
思想:通过正则化约束模型复杂度,避免过拟合,同时套索回归可以进行特征选择。
应用:高维数据、特征选择、共线性数据。
分析流程包括以下步骤:数据预处理、模型构建、训练、评估、可视化、保存结果。
定量变量预测的机器学习模型可分为传统统计模型、树基集成模型、核方法和深度学习模型四大类,每类模型通过不同机制捕捉数据模式,适用于从线性到复杂非线性关系的预测任务。
代码涵盖了从数据准备到结果保存的自动化过程,包括数据预处理、模型配置、性能评估和报告生成。
# pip install pandas numpy matplotlib seaborn scikit-learn joblib openpyxl reportlab
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.linear_model import RidgeCV, LassoCV, ElasticNetCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
import os
import warnings
from pathlib import Path
import scipy.stats as stats
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib import colors
import matplotlib.font_manager as fm
import joblib
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun']
plt.rcParams['axes.unicode_minus'] = False
# 忽略警告
warnings.filterwarnings('ignore')
class RegularizationAnalysis:
    """End-to-end comparison of OLS, Ridge, Lasso and Elastic Net regression.

    The pipeline loads an Excel sheet, label-encodes categorical columns,
    splits and standardises the data, tunes each regularised model by cross
    validation, then writes comparison plots, CSV tables, a PDF report and
    the fitted models into ``output_dir``.
    """

    def __init__(self, data_path, output_dir):
        """Remember the input workbook path and prepare the output folder."""
        self.data_path = data_path
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # Filled in by load_data() / split_data().
        self.data = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.X_train_scaled = None
        self.X_test_scaled = None
        self.scaler = None
        # Fitted estimators keyed by a short name ('Linear', 'Ridge', ...).
        self.models = {}
        # One metrics row per trained model (accumulated in run_analysis).
        self.performance_comparison = pd.DataFrame()

    def load_data(self):
        """Read the '示例数据' sheet and print a quick overview."""
        self.data = pd.read_excel(self.data_path, sheet_name='示例数据')
        print("数据形状:", self.data.shape)
        print("\n数据前5行:")
        print(self.data.head())
        print("\n数据基本信息:")
        print(self.data.info())
        print("\n数据描述性统计:")
        print(self.data.describe())

    def preprocess_data(self):
        """Label-encode the known categorical columns and drop rows with NaNs.

        Returns:
            dict: column name -> fitted LabelEncoder, so the integer codes
            can be inverted later if needed.
        """
        categorical_columns = ['结局', '肥胖程度', '教育水平', '血型', '指标8']
        label_encoders = {}
        for col in categorical_columns:
            if col in self.data.columns:
                le = LabelEncoder()
                # astype(str) makes mixed-type / NaN cells encodable.
                self.data[col] = le.fit_transform(self.data[col].astype(str))
                label_encoders[col] = le
        missing_values = self.data.isnull().sum().sum()
        if missing_values > 0:
            print(f"删除 {missing_values} 个含有缺失值的行")
            self.data = self.data.dropna()
        return label_encoders

    def split_data(self, test_size=0.3):
        """Split into train/test sets and standardise using train-fold statistics."""
        # '序号' is a row id and '指标1' is the target, so neither is a feature.
        features = self.data.drop(['序号', '指标1'], axis=1, errors='ignore')
        target = self.data['指标1']
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            features, target, test_size=test_size, random_state=42
        )
        # Fit the scaler on the training fold only to avoid test-set leakage.
        self.scaler = StandardScaler()
        self.X_train_scaled = self.scaler.fit_transform(self.X_train)
        self.X_test_scaled = self.scaler.transform(self.X_test)
        print(f"训练集样本量: {len(self.X_train)}")
        print(f"测试集样本量: {len(self.X_test)}")
        print(f"特征数量: {self.X_train.shape[1]}")

    def calculate_metrics(self, y_true, y_pred):
        """Return (R², RMSE, MAE, MSE) for one prediction vector."""
        r2 = r2_score(y_true, y_pred)
        mse = mean_squared_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_true, y_pred)
        return r2, rmse, mae, mse

    def _coefficient_table(self, model):
        """Return a DataFrame with the intercept plus one row per feature."""
        return pd.DataFrame({
            'Variable': ['Intercept'] + list(self.X_train.columns),
            'Coefficient': [model.intercept_] + list(model.coef_)
        })

    def _evaluate(self, model_label, model, params, scaled=True):
        """Score a fitted linear model on both splits and build its metrics row.

        Args:
            model_label: display name used in tables and reports.
            model: fitted estimator exposing ``predict`` and ``coef_``.
            params: human-readable hyper-parameter summary string.
            scaled: use the standardised matrices (regularised models) or
                the raw ones (plain linear regression).
        """
        X_tr = self.X_train_scaled if scaled else self.X_train
        X_te = self.X_test_scaled if scaled else self.X_test
        train_r2, train_rmse, train_mae, _ = self.calculate_metrics(
            self.y_train, model.predict(X_tr))
        test_r2, test_rmse, test_mae, _ = self.calculate_metrics(
            self.y_test, model.predict(X_te))
        return {
            'Model': model_label,
            'Train_R2': train_r2,
            'Test_R2': test_r2,
            'Train_RMSE': train_rmse,
            'Test_RMSE': test_rmse,
            'Train_MAE': train_mae,
            'Test_MAE': test_mae,
            'Parameters': params,
            # Variables whose coefficient was not shrunk to exactly zero.
            'Selected_Vars': np.sum(model.coef_ != 0)
        }

    def train_linear_regression(self):
        """Fit the unregularised OLS baseline on the raw (unscaled) features."""
        print("正在训练线性回归模型...")
        model = LinearRegression()
        model.fit(self.X_train, self.y_train)
        performance = self._evaluate('Linear Regression', model, 'None', scaled=False)
        self.models['Linear'] = model
        return performance, self._coefficient_table(model)

    def train_ridge_regression(self):
        """Fit Ridge regression, choosing alpha by 10-fold cross validation."""
        print("正在训练岭回归模型...")
        # Grid spans strong-to-weak regularisation on a log scale.
        alphas = np.logspace(-6, 2, 30)
        ridge_cv = RidgeCV(alphas=alphas, cv=10)
        ridge_cv.fit(self.X_train_scaled, self.y_train)
        best_alpha = ridge_cv.alpha_
        print(f"岭回归最佳alpha: {best_alpha}")
        # Refit a plain Ridge with the selected alpha as the final model.
        model = Ridge(alpha=best_alpha)
        model.fit(self.X_train_scaled, self.y_train)
        performance = self._evaluate('Ridge Regression', model,
                                     f'alpha={best_alpha:.6f}')
        self.models['Ridge'] = model
        self.ridge_cv = ridge_cv  # kept for the CV-curve plot
        return performance, self._coefficient_table(model)

    def train_lasso_regression(self):
        """Fit Lasso regression, choosing alpha by 10-fold cross validation."""
        print("正在训练Lasso回归模型...")
        alphas = np.logspace(-6, 1, 20)
        lasso_cv = LassoCV(alphas=alphas, cv=10, random_state=42)
        lasso_cv.fit(self.X_train_scaled, self.y_train)
        best_alpha = lasso_cv.alpha_
        print(f"Lasso回归最佳alpha: {best_alpha}")
        model = Lasso(alpha=best_alpha, random_state=42)
        model.fit(self.X_train_scaled, self.y_train)
        performance = self._evaluate('Lasso Regression', model,
                                     f'alpha={best_alpha:.6f}')
        self.models['Lasso'] = model
        self.lasso_cv = lasso_cv  # kept for the CV-curve plot
        return performance, self._coefficient_table(model)

    def train_elastic_net(self):
        """Fit Elastic Net, tuning alpha and l1_ratio jointly via ElasticNetCV."""
        print("正在训练弹性网络模型...")
        l1_ratios = [0.1, 0.3, 0.5, 0.7, 0.9]
        enet_cv = ElasticNetCV(l1_ratio=l1_ratios, cv=5, random_state=42, n_jobs=-1)
        enet_cv.fit(self.X_train_scaled, self.y_train)
        best_alpha = enet_cv.alpha_
        best_l1_ratio = enet_cv.l1_ratio_
        print(f"弹性网络最佳alpha: {best_alpha}, 最佳l1_ratio: {best_l1_ratio}")
        model = ElasticNet(alpha=best_alpha, l1_ratio=best_l1_ratio, random_state=42)
        model.fit(self.X_train_scaled, self.y_train)
        performance = self._evaluate(
            'Elastic Net', model,
            f'alpha={best_alpha:.2f}, l1_ratio={best_l1_ratio:.2f}')
        self.models['ElasticNet'] = model
        self.enet_cv = enet_cv  # kept for the CV-curve plot
        return performance, self._coefficient_table(model)

    def plot_performance_comparison(self):
        """Bar-plot train/test R² and RMSE side by side for every model."""
        performance_long = pd.melt(
            self.performance_comparison,
            id_vars=['Model'],
            value_vars=['Train_R2', 'Test_R2', 'Train_RMSE', 'Test_RMSE'],
            var_name='Metric_Dataset',
            value_name='Value'
        )
        # 'Train_R2' -> Dataset='Train', Metric='R2' (column names are
        # '<Dataset>_<Metric>', so the split yields Dataset first).
        performance_long[['Metric', 'Dataset']] = (
            performance_long['Metric_Dataset'].str.split('_', expand=True))
        fig, axes = plt.subplots(1, 2, figsize=(15, 6))
        r2_data = performance_long[performance_long['Metric'] == 'R2']
        sns.barplot(data=r2_data, x='Model', y='Value', hue='Dataset', ax=axes[0])
        axes[0].set_title('R-squared Comparison')
        axes[0].set_ylabel('R-squared')
        axes[0].tick_params(axis='x', rotation=45)
        rmse_data = performance_long[performance_long['Metric'] == 'RMSE']
        sns.barplot(data=rmse_data, x='Model', y='Value', hue='Dataset', ax=axes[1])
        axes[1].set_title('RMSE Comparison')
        axes[1].set_ylabel('RMSE')
        axes[1].tick_params(axis='x', rotation=45)
        plt.tight_layout()
        plt.savefig(self.output_dir / 'performance_comparison.jpg', dpi=300,
                    bbox_inches='tight')
        plt.close()

    def plot_test_predictions_comparison(self):
        """Scatter actual-vs-predicted test values for every trained model."""
        test_predictions = {'Actual': self.y_test}
        for name, model in self.models.items():
            # The OLS baseline was fitted on raw features; the regularised
            # models expect the standardised matrix.
            if name == 'Linear':
                pred = model.predict(self.X_test)
            else:
                pred = model.predict(self.X_test_scaled)
            test_predictions[name] = pred
        test_df = pd.DataFrame(test_predictions)
        test_long = pd.melt(test_df, id_vars=['Actual'],
                            var_name='Model', value_name='Predicted')
        plt.figure(figsize=(10, 8))
        sns.scatterplot(data=test_long, x='Actual', y='Predicted', hue='Model',
                        alpha=0.6)
        # Identity line: perfect predictions would lie on it.
        plt.plot([self.y_test.min(), self.y_test.max()],
                 [self.y_test.min(), self.y_test.max()], 'k--', alpha=0.5)
        plt.title('Test Set Predictions Comparison')
        plt.xlabel('Actual Values')
        plt.ylabel('Predicted Values')
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.tight_layout()
        plt.savefig(self.output_dir / 'test_predictions_comparison.jpg', dpi=300,
                    bbox_inches='tight')
        plt.close()

    def plot_coefficient_comparison(self, coef_data_dict):
        """Compare coefficients of the 10 largest-magnitude variables per model."""
        # assign() returns copies, so the caller's DataFrames stay untouched.
        all_coefs = [coef_df.assign(Model=model_name)
                     for model_name, coef_df in coef_data_dict.items()]
        coef_comparison = pd.concat(all_coefs, ignore_index=True)
        coef_comparison = coef_comparison[coef_comparison['Variable'] != 'Intercept']
        # Rank variables by their largest absolute coefficient across models.
        top_vars = (coef_comparison.groupby('Variable')['Coefficient']
                    .apply(lambda x: max(abs(x)))
                    .nlargest(10)
                    .index)
        coef_filtered = coef_comparison[coef_comparison['Variable'].isin(top_vars)]
        plt.figure(figsize=(12, 8))
        sns.barplot(data=coef_filtered, x='Variable', y='Coefficient', hue='Model')
        plt.title('Coefficient Comparison Across Models')
        plt.xlabel('Variables')
        plt.ylabel('Coefficient Value')
        plt.xticks(rotation=45)
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.tight_layout()
        plt.savefig(self.output_dir / 'coefficient_comparison.jpg', dpi=300,
                    bbox_inches='tight')
        plt.close()

    def plot_overfitting_analysis(self):
        """Plot train-vs-test R² and the gap between them; return the table."""
        overfitting_data = self.performance_comparison.copy()
        overfitting_data['Overfitting_Gap'] = (
            overfitting_data['Train_R2'] - overfitting_data['Test_R2'])
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        x = range(len(overfitting_data))
        width = 0.35
        ax1.bar([i - width / 2 for i in x], overfitting_data['Train_R2'], width,
                label='Training R²', alpha=0.7)
        ax1.bar([i + width / 2 for i in x], overfitting_data['Test_R2'], width,
                label='Testing R²', alpha=0.7)
        ax1.set_xlabel('Model')
        ax1.set_ylabel('R-squared')
        ax1.set_title('Training vs Testing R-squared')
        ax1.set_xticks(x)
        ax1.set_xticklabels(overfitting_data['Model'], rotation=45)
        ax1.legend()
        ax2.bar(overfitting_data['Model'], overfitting_data['Overfitting_Gap'],
                alpha=0.7)
        # 0.1 is the heuristic gap above which we flag overfitting.
        ax2.axhline(y=0.1, color='r', linestyle='--', alpha=0.7,
                    label='Overfitting Threshold')
        ax2.set_xlabel('Model')
        ax2.set_ylabel('Overfitting Gap (Train R² - Test R²)')
        ax2.set_title('Overfitting Analysis')
        ax2.tick_params(axis='x', rotation=45)
        ax2.legend()
        plt.tight_layout()
        plt.savefig(self.output_dir / 'overfitting_analysis.jpg', dpi=300,
                    bbox_inches='tight')
        plt.close()
        return overfitting_data

    def plot_variable_selection(self):
        """Plot how many variables each model kept; return the selection table."""
        selection_data = self.performance_comparison[['Model', 'Selected_Vars']].copy()
        selection_data['Total_Variables'] = len(self.X_train.columns)
        plt.figure(figsize=(10, 6))
        ax = sns.barplot(data=selection_data, x='Model', y='Selected_Vars')
        plt.title('Variable Selection Results')
        plt.xlabel('Model')
        plt.ylabel('Number of Selected Variables')
        plt.xticks(rotation=45)
        # Annotate each bar with its count.
        for i, v in enumerate(selection_data['Selected_Vars']):
            ax.text(i, v + 0.1, str(v), ha='center', va='bottom')
        plt.tight_layout()
        plt.savefig(self.output_dir / 'variable_selection.jpg', dpi=300,
                    bbox_inches='tight')
        plt.close()
        return selection_data

    def plot_cv_curves(self):
        """Plot the cross-validation curves of the three regularised models."""
        fig, axes = plt.subplots(1, 3, figsize=(18, 6))
        # Ridge: RidgeCV does not expose a CV path, so re-score each alpha.
        if hasattr(self, 'ridge_cv'):
            alphas = self.ridge_cv.alphas
            cv_scores = []
            for alpha in alphas:
                model = Ridge(alpha=alpha)
                scores = cross_val_score(model, self.X_train_scaled, self.y_train,
                                         cv=5, scoring='neg_mean_squared_error')
                cv_scores.append(-np.mean(scores))
            axes[0].plot(alphas, cv_scores, 'b-', label='Ridge CV')
            axes[0].axvline(self.ridge_cv.alpha_, color='r', linestyle='--',
                            label=f'Best alpha: {self.ridge_cv.alpha_:.6f}')
            axes[0].set_xscale('log')
            axes[0].set_xlabel('Alpha')
            axes[0].set_ylabel('Mean Squared Error')
            axes[0].set_title('Ridge Regression CV Curve')
            axes[0].legend()
            axes[0].grid(True, alpha=0.3)
        # Lasso: LassoCV stores the per-fold MSE path directly.
        if hasattr(self, 'lasso_cv'):
            axes[1].plot(self.lasso_cv.alphas_,
                         self.lasso_cv.mse_path_.mean(axis=1), 'g-',
                         label='Lasso CV')
            axes[1].axvline(self.lasso_cv.alpha_, color='r', linestyle='--',
                            label=f'Best alpha: {self.lasso_cv.alpha_:.6f}')
            axes[1].set_xscale('log')
            axes[1].set_xlabel('Alpha')
            axes[1].set_ylabel('Mean Squared Error')
            axes[1].set_title('Lasso Regression CV Curve')
            axes[1].legend()
            axes[1].grid(True, alpha=0.3)
        # Elastic Net: show the best achievable MSE for each l1_ratio.
        if hasattr(self, 'enet_cv'):
            l1_ratios = self.enet_cv.l1_ratio
            if np.isscalar(l1_ratios):
                l1_ratios = [l1_ratios]
            # mse_path_ has shape (n_l1_ratios, n_alphas, n_folds):
            # average over folds, then take the per-l1_ratio minimum over alphas.
            mean_mse = np.mean(self.enet_cv.mse_path_, axis=2)
            min_mse = np.min(mean_mse, axis=1)
            best_idx = np.argmin(min_mse)
            best_l1_ratio = l1_ratios[best_idx]
            axes[2].plot(l1_ratios, min_mse, 'purple', label='ElasticNet CV')
            axes[2].axvline(best_l1_ratio, color='r', linestyle='--',
                            label=f'Best l1_ratio: {best_l1_ratio:.2f}')
            axes[2].set_xlabel('L1 Ratio')
            axes[2].set_ylabel('Mean Squared Error')
            axes[2].set_title('ElasticNet CV Curve')
            axes[2].legend()
            axes[2].grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(self.output_dir / 'cv_curve_comparison.jpg', dpi=300,
                    bbox_inches='tight')
        plt.close()

    def save_results(self, coef_data_dict):
        """Write the performance table and each model's coefficients to CSV."""
        # utf-8-sig adds a BOM so Excel opens the Chinese headers correctly.
        self.performance_comparison.to_csv(
            self.output_dir / '模型性能比较.csv',
            index=False, encoding='utf-8-sig'
        )
        for model_name, coef_df in coef_data_dict.items():
            filename = f"{model_name.replace(' ', '_')}_coefficients.csv"
            coef_df.to_csv(
                self.output_dir / filename,
                index=False, encoding='utf-8-sig'
            )

    def generate_report(self, overfitting_data, selection_data):
        """Assemble the PDF report: data overview, metrics, conclusions."""
        doc = SimpleDocTemplate(str(self.output_dir / "正则化回归模型综合分析报告.pdf"),
                                pagesize=letter)
        styles = getSampleStyleSheet()
        story = []
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontSize=16,
            spaceAfter=30,
            alignment=1  # centred
        )
        story.append(Paragraph("正则化回归模型综合分析报告", title_style))
        story.append(Paragraph(f"生成日期: {pd.Timestamp.now().strftime('%Y-%m-%d')}",
                               styles["Normal"]))
        story.append(Spacer(1, 12))
        # Data overview table.
        story.append(Paragraph("数据概述", styles["Heading2"]))
        data_info = [
            ["总样本量:", str(len(self.data))],
            ["训练集样本量:", str(len(self.X_train))],
            ["测试集样本量:", str(len(self.X_test))],
            ["特征数量:", str(len(self.X_train.columns))]
        ]
        data_table = Table(data_info)
        data_table.setStyle(self._table_style('LEFT'))
        story.append(data_table)
        story.append(Spacer(1, 12))
        # Model performance table.
        story.append(Paragraph("模型性能比较", styles["Heading2"]))
        perf_data = ([self.performance_comparison.columns.tolist()]
                     + self.performance_comparison.values.tolist())
        perf_table = Table(perf_data)
        perf_table.setStyle(self._table_style('CENTER', body_font=8))
        story.append(perf_table)
        story.append(Spacer(1, 12))
        # Best model (by test-set R²).
        best_test_r2 = self.performance_comparison['Test_R2'].max()
        best_model = self.performance_comparison.loc[
            self.performance_comparison['Test_R2'].idxmax(), 'Model']
        story.append(Paragraph("最佳模型识别", styles["Heading3"]))
        story.append(Paragraph(f"基于测试集R平方,最佳模型为: {best_model}", styles["Normal"]))
        story.append(Paragraph(f"测试集R平方: {best_test_r2:.4f}", styles["Normal"]))
        story.append(Spacer(1, 12))
        # Overfitting table.
        story.append(Paragraph("过拟合分析", styles["Heading3"]))
        overfitting_display = overfitting_data[
            ['Model', 'Train_R2', 'Test_R2', 'Overfitting_Gap']].round(4)
        overfitting_list = ([overfitting_display.columns.tolist()]
                            + overfitting_display.values.tolist())
        overfitting_table = Table(overfitting_list)
        overfitting_table.setStyle(self._table_style('CENTER', body_font=8))
        story.append(overfitting_table)
        story.append(Spacer(1, 12))
        # Variable-selection table.
        story.append(Paragraph("变量选择总结", styles["Heading3"]))
        selection_display = selection_data.round(4)
        selection_list = ([selection_display.columns.tolist()]
                          + selection_display.values.tolist())
        selection_table = Table(selection_list)
        selection_table.setStyle(self._table_style('CENTER', body_font=8))
        story.append(selection_table)
        story.append(Spacer(1, 12))
        # Conclusions and recommendations.
        story.append(Paragraph("结论与建议", styles["Heading2"]))
        story.append(Paragraph("基于综合分析,主要发现如下:", styles["Normal"]))
        if best_model == "Linear Regression":
            story.append(Paragraph("- 线性回归表现最佳,数据可能没有严重的多重共线性问题", styles["Normal"]))
        elif best_model == "Ridge Regression":
            story.append(Paragraph("- 岭回归表现最佳,数据可能存在多重共线性,L2正则化有效", styles["Normal"]))
        elif best_model == "Lasso Regression":
            story.append(Paragraph("- Lasso回归表现最佳,特征选择对模型性能有积极影响", styles["Normal"]))
        else:
            story.append(Paragraph("- 弹性网络表现最佳,结合了L1和L2正则化的优势", styles["Normal"]))
        avg_overfitting_gap = overfitting_data['Overfitting_Gap'].mean()
        if avg_overfitting_gap > 0.1:
            story.append(Paragraph("- 存在明显的过拟合现象,建议加强正则化", styles["Normal"]))
        elif avg_overfitting_gap < 0.05:
            story.append(Paragraph("- 模型泛化能力良好", styles["Normal"]))
        else:
            story.append(Paragraph("- 模型具有一定的泛化能力,但仍有改进空间", styles["Normal"]))
        lasso_selected_vars = selection_data[
            selection_data['Model'] == 'Lasso Regression']['Selected_Vars'].values[0]
        total_vars = len(self.X_train.columns)
        if lasso_selected_vars < total_vars * 0.5:
            story.append(Paragraph("- Lasso成功进行了特征选择,减少了特征数量", styles["Normal"]))
        story.append(Paragraph("推荐使用策略:", styles["Normal"]))
        story.append(Paragraph(f"1. 生产环境推荐使用: {best_model} 模型", styles["Normal"]))
        story.append(Paragraph("2. 如需特征选择,可考虑Lasso或弹性网络", styles["Normal"]))
        story.append(Paragraph("3. 如数据存在多重共线性,推荐岭回归或弹性网络", styles["Normal"]))
        story.append(Paragraph("4. 可进一步尝试其他机器学习算法进行比较", styles["Normal"]))
        doc.build(story)
        print("PDF报告生成成功!")

    @staticmethod
    def _table_style(align, header_font=None, body_font=None):
        """Shared ReportLab table style (grey header, beige body, full grid)."""
        commands = [
            ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
            ('ALIGN', (0, 0), (-1, -1), align),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTSIZE', (0, 0), (-1, 0), header_font or (12 if align == 'LEFT' else 10)),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
            ('BACKGROUND', (0, 1), (-1, -1), colors.beige),
            ('GRID', (0, 0), (-1, -1), 1, colors.black)
        ]
        if body_font is not None:
            commands.append(('FONTSIZE', (0, 1), (-1, -1), body_font))
        return TableStyle(commands)

    def run_analysis(self):
        """Execute the whole pipeline: load, train, plot, save, report."""
        print("=== 开始正则化回归模型分析 ===")
        print("1. 加载数据...")
        self.load_data()
        print("2. 数据预处理...")
        self.preprocess_data()
        print("3. 划分数据集...")
        self.split_data()
        print("4. 训练所有模型...")
        coef_data_dict = {}
        # Train each model, collecting its metrics row and coefficient table.
        for coef_key, trainer in [
            ('Linear_Regression', self.train_linear_regression),
            ('Ridge_Regression', self.train_ridge_regression),
            ('Lasso_Regression', self.train_lasso_regression),
            ('Elastic_Net', self.train_elastic_net),
        ]:
            performance, coef_df = trainer()
            self.performance_comparison = pd.concat(
                [self.performance_comparison, pd.DataFrame([performance])],
                ignore_index=True)
            coef_data_dict[coef_key] = coef_df
        print("5. 生成可视化图表...")
        self.plot_performance_comparison()
        self.plot_test_predictions_comparison()
        self.plot_coefficient_comparison(coef_data_dict)
        overfitting_data = self.plot_overfitting_analysis()
        selection_data = self.plot_variable_selection()
        self.plot_cv_curves()
        print("6. 保存结果...")
        self.save_results(coef_data_dict)
        print("7. 生成分析报告...")
        self.generate_report(overfitting_data, selection_data)
        # Persist the fitted estimators and scaler for reuse.
        joblib.dump(self.models, self.output_dir / 'all_models.pkl')
        joblib.dump(self.scaler, self.output_dir / 'scaler.pkl')
        print("\n=== 正则化回归模型分析完成 ===")
        print("所有结果已保存到:", self.output_dir)
        # Console summary of the best model.
        best_test_r2 = self.performance_comparison['Test_R2'].max()
        best_model = self.performance_comparison.loc[
            self.performance_comparison['Test_R2'].idxmax(), 'Model']
        best_model_selected_vars = selection_data[
            selection_data['Model'] == best_model]['Selected_Vars'].values[0]
        print(f"\n最佳模型信息:")
        print(f"模型: {best_model}")
        print(f"测试集R平方: {best_test_r2:.4f}")
        print(f"选择的变量数: {best_model_selected_vars}")
        print(f"\n各模型测试集R平方:")
        for _, row in self.performance_comparison.iterrows():
            print(f"{row['Model']}: {row['Test_R2']:.4f}")
        print(f"\n性能差异分析:")
        print(f"训练集平均R平方: {self.performance_comparison['Train_R2'].mean():.4f}")
        print(f"测试集平均R平方: {self.performance_comparison['Test_R2'].mean():.4f}")
        print(f"平均过拟合差距: {overfitting_data['Overfitting_Gap'].mean():.4f}")
        print(f"\n模型选择建议:")
        if best_model == "Elastic Net":
            print("弹性网络表现最佳,推荐在生产环境中使用。")
            print("该模型平衡了特征选择和系数收缩的优势。")
        elif best_model == "Lasso Regression":
            print("Lasso回归表现良好,特别适合特征选择任务。")
        elif best_model == "Ridge Regression":
            print("岭回归表现稳定,适合处理多重共线性数据。")
        else:
            print("线性回归表现尚可,但正则化模型可能有改进空间。")
# Script entry point: run the full analysis against the workbook on the desktop.
if __name__ == "__main__":
    desktop = Path.home() / "Desktop"
    excel_path = desktop / "示例数据.xlsx"
    results_dir = desktop / "Results模型-Regularization"
    if excel_path.exists():
        # Input found: build the analyser and execute the whole pipeline.
        RegularizationAnalysis(excel_path, results_dir).run_analysis()
    else:
        # Missing input workbook: tell the user what to provide and where.
        print(f"错误: 数据文件不存在: {excel_path}")
        print("请确保桌面有 '示例数据.xlsx' 文件")
2. 套索回归(Lasso Regression)、岭回归(Ridge Regression)、弹性网络(Elastic Net)
概念
正则化线性回归方法,通过添加惩罚项防止过拟合。
原理
- 岭回归:L2正则化,$\min \sum(y_i - \hat{y}_i)^2 + \lambda\sum\beta_j^2$
- 套索回归:L1正则化,$\min \sum(y_i - \hat{y}_i)^2 + \lambda\sum|\beta_j|$
- 弹性网络:结合L1和L2,$\min \sum(y_i - \hat{y}_i)^2 + \lambda_1\sum|\beta_j| + \lambda_2\sum\beta_j^2$
思想
通过惩罚大系数来简化模型,提高泛化能力。
应用
- 高维数据特征选择
- 多重共线性处理
- 基因组学、文本挖掘
医学统计数据分析分享交流SPSS、R语言、Python、ArcGis、Geoda、GraphPad、数据分析图表制作等心得。承接数据分析,论文返修,医学统计,机器学习,生存分析,空间分析,问卷分析业务。若有投稿和数据分析代做需求,可以直接联系我,谢谢!
“医学统计数据分析”公众号右下角;
找到“联系作者”,
可加我微信,邀请入粉丝群!
有临床流行病学数据分析
如(t检验、方差分析、χ2检验、logistic回归)、
(重复测量方差分析与配对T检验、ROC曲线)、
(非参数检验、生存分析、样本含量估计)、
(筛检试验:灵敏度、特异度、约登指数等计算)、
(绘制柱状图、散点图、小提琴图、列线图等)、
机器学习、深度学习、生存分析
等需求的同仁们,加入【临床】粉丝群。
疾控,公卫岗位的同仁,可以加一下【公卫】粉丝群,分享生态学研究、空间分析、时间序列、监测数据分析、时空面板技巧等工作科研自动化内容。
有实验室数据分析需求的同仁们,可以加入【生信】粉丝群,交流NCBI(基因序列)、UniProt(蛋白质)、KEGG(通路)、GEO(公共数据集)等公共数据库、基因组学转录组学蛋白组学代谢组学表型组学等数据分析和可视化内容。
或者可扫码直接加微信进群!!!
精品视频课程-“医学统计数据分析”视频号付费合集
在“医学统计数据分析”视频号-付费合集兑换相应课程后,获取课程理论课PPT、代码、基础数据等相关资料,请大家在【医学统计数据分析】公众号右下角,找到“联系作者”,加我微信后打包发送。感谢您的支持!!

