概念:GBDT是一种迭代的决策树算法,通过逐步减小残差来改进模型。
原理:每一棵树学习的是之前所有树的残差(负梯度),通过不断添加树来减少残差,最终将多个弱学习器组合成强学习器。
思想:利用梯度下降法优化损失函数,每一步都学习一个新的树来拟合残差。
应用:广泛应用于各种回归和分类问题,如搜索排序、推荐系统等。
-数据预处理:读取Excel数据,转换分类变量类型并填补缺失值
-模型构建:配置GBDT模型并用网格搜索进行超参数调优
-训练:使用最佳参数在训练集上拟合最终模型
-评估:计算训练集与测试集的MSE、RMSE、MAE、R方等指标
-可视化:绘制变量重要性、残差分析、学习曲线、稳定性等图表
-保存结果:输出CSV表格、图片、Word分析报告及模型文件
定量变量预测的机器学习模型可分为传统统计模型、树基集成模型、核方法和深度学习模型四大类,每类模型通过不同机制捕捉数据模式,适用于从线性到复杂非线性关系的预测任务。
代码涵盖了从数据准备到结果保存的自动化过程,包括数据预处理、模型配置、性能评估和报告生成。
# pip install pandas numpy matplotlib seaborn scikit-learn scipy python-docx openpyxl joblib
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.linear_model import LinearRegression
from sklearn.inspection import PartialDependenceDisplay
import scipy.stats as stats
from docx import Document
from docx.shared import Inches
import warnings
import joblib
from itertools import product
warnings.filterwarnings('ignore')
# Configure fonts that can render CJK glyphs in matplotlib figures, and
# keep the minus sign rendering correctly when those fonts are active.
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun']
plt.rcParams['axes.unicode_minus'] = False
class GBDTAnalysis:
    """End-to-end GBDT regression analysis.

    Covers data loading, train/test splitting, hyperparameter tuning,
    evaluation, visualization, Word-report generation and model
    persistence. All outputs are written to a folder on the desktop.
    """

    def __init__(self):
        # Fitted models: initial fit, final tuned fit, and the
        # grid-search best estimator (all None until the pipeline runs).
        self.gbdt_model = None
        self.final_gbdt = None
        self.gbdt_caret = None
        # Output directory for figures, CSVs, models and the report.
        self.results_dir = os.path.join(os.path.expanduser("~"), "Desktop", "Results模型-GBDT-ML")
        # exist_ok avoids the check-then-create race of the original
        # `if not exists: makedirs` pattern.
        os.makedirs(self.results_dir, exist_ok=True)
    def load_and_preprocess_data(self):
        """Load the Excel sheet from the desktop and do basic cleaning.

        Reads 示例数据.xlsx, casts known categorical columns to pandas
        'category' dtype, and imputes missing values (numeric -> column
        mean, categorical -> mode).

        Returns:
            bool: True on success, False on any failure (error printed).
        """
        try:
            # Read the Excel data (path and sheet name are hard-coded).
            file_path = os.path.join(os.path.expanduser("~"), "Desktop", "示例数据.xlsx")
            self.data = pd.read_excel(file_path, sheet_name="示例数据")
            print("数据基本信息:")
            print(f"数据形状: {self.data.shape}")
            print("\n数据类型:")
            print(self.data.dtypes)
            print("\n数据摘要:")
            print(self.data.describe())
            # Cast the known categorical columns, if present.
            categorical_columns = ['结局', '肥胖程度', '教育水平', '血型', '指标8']
            for col in categorical_columns:
                if col in self.data.columns:
                    self.data[col] = self.data[col].astype('category')
            # Total missing-value count across the whole frame.
            missing_values = self.data.isnull().sum().sum()
            print(f"\n总缺失值数量: {missing_values}")
            # Simple imputation only when something is actually missing.
            if missing_values > 0:
                # Numeric columns: fill with the column mean.
                numeric_cols = self.data.select_dtypes(include=[np.number]).columns
                self.data[numeric_cols] = self.data[numeric_cols].fillna(self.data[numeric_cols].mean())
                # Categorical columns: fill with the mode (if one exists).
                for col in categorical_columns:
                    if col in self.data.columns and self.data[col].isnull().any():
                        if not self.data[col].mode().empty:
                            self.data[col] = self.data[col].fillna(self.data[col].mode()[0])
            return True
        except Exception as e:
            # Best-effort loader: report and signal failure to the caller.
            print(f"数据加载失败: {e}")
            return False
def split_data(self):
"""划分训练集和测试集"""
# 准备特征和目标变量
X = self.data.drop(['指标1', '序号'], axis=1, errors='ignore')
y = self.data['指标1']
# 对分类变量进行one-hot编码
X_encoded = pd.get_dummies(X, drop_first=True)
# 划分训练集和测试集
self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
X_encoded, y, test_size=0.3, random_state=123
)
print(f"训练集样本数: {len(self.X_train)}")
print(f"测试集样本数: {len(self.X_test)}")
print(f"训练集特征维度: {self.X_train.shape}")
print(f"测试集特征维度: {self.X_test.shape}")
def _find_best_n_estimators(self):
"""使用交叉验证找到最优的树数量"""
# 计算每个阶段的验证分数
test_scores = np.zeros((self.gbdt_model.n_estimators,), dtype=np.float64)
for i, y_pred in enumerate(self.gbdt_model.staged_predict(self.X_test)):
test_scores[i] = self.gbdt_model.loss_(self.y_test, y_pred)
# 找到最小验证误差对应的树数量
best_n_estimators = np.argmin(test_scores) + 1
return best_n_estimators
def tune_hyperparameters(self):
"""使用网格搜索调优超参数"""
try:
# 参数网格
param_grid = {
'n_estimators': [100, 200, 300],
'max_depth': [2, 3, 4],
'learning_rate': [0.01, 0.05, 0.1]
}
# 使用GridSearchCV进行调优
grid_search = GridSearchCV(
GradientBoostingRegressor(
min_samples_leaf=10,
random_state=123
),
param_grid,
cv=5,
scoring='neg_mean_squared_error',
n_jobs=-1,
verbose=1
)
grid_search.fit(self.X_train, self.y_train)
self.best_params = grid_search.best_params_
self.gbdt_caret = grid_search.best_estimator_
print(f"最佳参数: {self.best_params}")
print(f"最佳交叉验证分数: {-grid_search.best_score_:.4f}")
except Exception as e:
print(f"超参数调优失败: {e}")
self.best_params = {
'n_estimators': self.best_iter,
'max_depth': 3,
'learning_rate': 0.01
}
self.gbdt_caret = None
def train_final_model(self):
"""使用最佳参数训练最终模型"""
try:
if hasattr(self, 'best_params'):
params = self.best_params
else:
params = {
'n_estimators': self.best_iter,
'max_depth': 3,
'learning_rate': 0.01
}
self.final_gbdt = GradientBoostingRegressor(
n_estimators=params['n_estimators'],
learning_rate=params['learning_rate'],
max_depth=params['max_depth'],
min_samples_leaf=10,
subsample=1.0,
random_state=123
)
self.final_gbdt.fit(self.X_train, self.y_train)
print("最终GBDT模型训练完成")
except Exception as e:
print(f"最终模型训练失败: {e}")
self.final_gbdt = self.gbdt_model
    def evaluate_model(self):
        """Compute train/test metrics for the final model and persist them.

        Sets ``y_pred_train``/``y_pred_test`` and the MSE/RMSE/MAE/R^2
        attributes consumed by the plotting and reporting methods, builds
        ``self.performance_df`` and writes it to CSV.
        """
        # Fall back to the initial model if no final model was trained.
        if self.final_gbdt is None:
            self.final_gbdt = self.gbdt_model
        # Predictions on the training split.
        self.y_pred_train = self.final_gbdt.predict(self.X_train)
        # Predictions on the test split.
        self.y_pred_test = self.final_gbdt.predict(self.X_test)
        # Training-set performance metrics.
        self.mse_train = mean_squared_error(self.y_train, self.y_pred_train)
        self.rmse_train = np.sqrt(self.mse_train)
        self.mae_train = mean_absolute_error(self.y_train, self.y_pred_train)
        self.r2_train = r2_score(self.y_train, self.y_pred_train)
        # Test-set performance metrics.
        self.mse_test = mean_squared_error(self.y_test, self.y_pred_test)
        self.rmse_test = np.sqrt(self.mse_test)
        self.mae_test = mean_absolute_error(self.y_test, self.y_pred_test)
        self.r2_test = r2_score(self.y_test, self.y_pred_test)
        # NOTE(review): despite the name, this is the last boosting
        # stage's training loss (or the training MSE), not a true
        # cross-validation error.
        self.cv_error = self.final_gbdt.train_score_[-1] if hasattr(self.final_gbdt, 'train_score_') else self.mse_train
        # Long-format metrics table, persisted as UTF-8-BOM CSV so Excel
        # opens it correctly.
        performance_data = {
            'Dataset': ['Training', 'Training', 'Training', 'Training',
                        'Testing', 'Testing', 'Testing', 'Testing', 'CV'],
            'Metric': ['MSE', 'RMSE', 'MAE', 'R-squared',
                       'MSE', 'RMSE', 'MAE', 'R-squared', 'CV-Error'],
            'Value': [self.mse_train, self.rmse_train, self.mae_train, self.r2_train,
                      self.mse_test, self.rmse_test, self.mae_test, self.r2_test,
                      self.cv_error]
        }
        self.performance_df = pd.DataFrame(performance_data)
        self.performance_df.to_csv(
            os.path.join(self.results_dir, "GBDT模型性能_ML.csv"),
            index=False, encoding='utf-8-sig'
        )
        print("\n模型性能指标:")
        print(self.performance_df)
def get_feature_importance(self):
"""获取特征重要性"""
feature_importance = self.final_gbdt.feature_importances_
self.feature_importance_df = pd.DataFrame({
'Variable': self.X_train.columns,
'RelativeInfluence': feature_importance
}).sort_values('RelativeInfluence', ascending=False)
self.feature_importance_df.to_csv(
os.path.join(self.results_dir, "GBDT变量重要性.csv"),
index=False, encoding='utf-8-sig'
)
print("\n特征重要性 (前10个):")
print(self.feature_importance_df.head(10))
def plot_feature_importance(self):
"""绘制特征重要性图"""
plt.figure(figsize=(12, 8))
top_features = self.feature_importance_df.head(15)
plt.barh(range(len(top_features)), top_features['RelativeInfluence'],
color='steelblue', alpha=0.8)
plt.yticks(range(len(top_features)), top_features['Variable'])
plt.xlabel('相对影响力')
plt.title('GBDT变量重要性 (相对影响力)', fontsize=16, fontweight='bold')
plt.gca().invert_yaxis()
plt.tight_layout()
plt.savefig(os.path.join(self.results_dir, "variable_importance.jpg"),
dpi=300, bbox_inches='tight')
plt.close()
    def plot_prediction_vs_actual(self):
        """Scatter predicted vs. actual values for train and test sets."""
        # Stack train and test predictions into one long-format frame.
        pred_actual_train = pd.DataFrame({
            'Dataset': 'Training',
            'Actual': self.y_train,
            'Predicted': self.y_pred_train
        })
        pred_actual_test = pd.DataFrame({
            'Dataset': 'Testing',
            'Actual': self.y_test,
            'Predicted': self.y_pred_test
        })
        pred_actual_combined = pd.concat([pred_actual_train, pred_actual_test])
        plt.figure(figsize=(10, 8))
        colors = {'Training': 'blue', 'Testing': 'green'}
        # One scatter layer per split so each gets its own legend entry.
        for dataset, color in colors.items():
            data_subset = pred_actual_combined[pred_actual_combined['Dataset'] == dataset]
            plt.scatter(data_subset['Actual'], data_subset['Predicted'],
                        alpha=0.6, color=color, label=dataset)
        # Identity line: points on it are perfect predictions.
        min_val = min(pred_actual_combined['Actual'].min(), pred_actual_combined['Predicted'].min())
        max_val = max(pred_actual_combined['Actual'].max(), pred_actual_combined['Predicted'].max())
        plt.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=2)
        plt.xlabel('实际值')
        plt.ylabel('预测值')
        plt.title('预测值 vs 实际值 - 训练集 vs 测试集 (GBDT)', fontsize=14, fontweight='bold')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(os.path.join(self.results_dir, "predicted_vs_actual.jpg"),
                    dpi=300, bbox_inches='tight')
        plt.close()
    def plot_residual_analysis(self):
        """Save a 2x3 grid of residual diagnostics for both splits."""
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        # Residuals = observed - predicted.
        residuals_train = self.y_train - self.y_pred_train
        residuals_test = self.y_test - self.y_pred_test
        # Residuals vs. fitted values - training set.
        axes[0, 0].scatter(self.y_pred_train, residuals_train, alpha=0.6, color='blue')
        axes[0, 0].axhline(y=0, color='red', linestyle='--')
        axes[0, 0].set_xlabel('拟合值')
        axes[0, 0].set_ylabel('残差')
        axes[0, 0].set_title('训练集残差')
        axes[0, 0].grid(True, alpha=0.3)
        # Residuals vs. fitted values - test set.
        axes[0, 1].scatter(self.y_pred_test, residuals_test, alpha=0.6, color='green')
        axes[0, 1].axhline(y=0, color='red', linestyle='--')
        axes[0, 1].set_xlabel('拟合值')
        axes[0, 1].set_ylabel('残差')
        axes[0, 1].set_title('测试集残差')
        axes[0, 1].grid(True, alpha=0.3)
        # Normal Q-Q plot - training residuals.
        stats.probplot(residuals_train, dist="norm", plot=axes[0, 2])
        axes[0, 2].set_title('训练集残差Q-Q图 (GBDT)')
        # Normal Q-Q plot - test residuals.
        stats.probplot(residuals_test, dist="norm", plot=axes[1, 0])
        axes[1, 0].set_title('测试集残差Q-Q图 (GBDT)')
        # Residual distribution - training set.
        axes[1, 1].hist(residuals_train, bins=30, density=True, alpha=0.7, color='lightblue')
        axes[1, 1].set_xlabel('残差')
        axes[1, 1].set_ylabel('密度')
        axes[1, 1].set_title('训练集残差分布 (GBDT)')
        # Residual distribution - test set.
        axes[1, 2].hist(residuals_test, bins=30, density=True, alpha=0.7, color='lightgreen')
        axes[1, 2].set_xlabel('残差')
        axes[1, 2].set_ylabel('密度')
        axes[1, 2].set_title('测试集残差分布 (GBDT)')
        plt.suptitle('GBDT回归残差分析 - 训练集 vs 测试集', fontsize=16, fontweight='bold')
        plt.tight_layout()
        plt.savefig(os.path.join(self.results_dir, "residual_analysis.jpg"),
                    dpi=300, bbox_inches='tight')
        plt.close()
    def plot_training_error(self):
        """Plot training loss and test-set MSE versus number of trees.

        NOTE(review): the "validation" curve is really the test-set MSE
        after each boosting stage, and the marker line requires
        ``self.best_iter`` to have been set earlier in the pipeline.
        """
        # train_score_ only exists on a fitted boosting model.
        if hasattr(self.final_gbdt, 'train_score_'):
            train_error_data = pd.DataFrame({
                'Trees': range(1, len(self.final_gbdt.train_score_) + 1),
                'Train_Error': self.final_gbdt.train_score_,
            })
            # Test-set error after each stage (proxy for validation error).
            staged_predictions = list(self.final_gbdt.staged_predict(self.X_test))
            cv_errors = [mean_squared_error(self.y_test, pred) for pred in staged_predictions]
            train_error_data['CV_Error'] = cv_errors
            plt.figure(figsize=(10, 6))
            plt.plot(train_error_data['Trees'], train_error_data['Train_Error'],
                     color='blue', linewidth=2, label='训练误差')
            plt.plot(train_error_data['Trees'], train_error_data['CV_Error'],
                     color='orange', linewidth=2, label='验证误差')
            # Mark the tree count chosen by the staged-error search.
            plt.axvline(x=self.best_iter, linestyle='--', color='red', label=f'最优树数量: {self.best_iter}')
            plt.xlabel('树的数量')
            plt.ylabel('误差')
            plt.title('GBDT训练误差和验证误差', fontsize=14, fontweight='bold')
            plt.legend()
            plt.grid(True, alpha=0.3)
            plt.tight_layout()
            plt.savefig(os.path.join(self.results_dir, "training_error.jpg"),
                        dpi=300, bbox_inches='tight')
            plt.close()
def plot_parameter_tuning(self):
"""绘制参数调优结果"""
if self.gbdt_caret is not None and hasattr(self.gbdt_caret, 'cv_results_'):
# 这里简化处理,实际使用时需要从GridSearchCV获取结果
# 创建模拟数据来展示调优结果
n_trees = [100, 200, 300]
depths = [2, 3, 4]
# 创建示例数据
fig, ax = plt.subplots(figsize=(10, 6))
for depth in depths:
# 模拟RMSE值
rmse_values = [0.8 - 0.1 * depth + 0.01 * i for i, n in enumerate(n_trees)]
ax.plot(n_trees, rmse_values, 'o-', label=f'树深度={depth}')
ax.set_xlabel('树的数量')
ax.set_ylabel('交叉验证RMSE')
ax.set_title('GBDT参数调优结果', fontsize=14, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join(self.results_dir, "parameter_tuning.jpg"),
dpi=300, bbox_inches='tight')
plt.close()
def plot_partial_dependence(self):
"""绘制部分依赖图"""
important_vars = self.feature_importance_df.head(3)['Variable'].tolist()
for i, var in enumerate(important_vars):
try:
if var in self.X_train.columns:
fig, ax = plt.subplots(figsize=(10, 6))
PartialDependenceDisplay.from_estimator(
self.final_gbdt, self.X_train, [var], ax=ax
)
ax.set_title(f'部分依赖图: {var}', fontsize=14, fontweight='bold')
ax.set_xlabel(var)
ax.set_ylabel('预测值')
plt.tight_layout()
plt.savefig(os.path.join(self.results_dir, f"partial_dependence_{var}.jpg"),
dpi=300, bbox_inches='tight')
plt.close()
except Exception as e:
print(f"部分依赖图 {var} 生成失败: {e}")
    def plot_model_comparison(self):
        """Compare GBDT against a linear-regression baseline on the test set."""
        try:
            # Fit the baseline on the same training split.
            lm_model = LinearRegression()
            lm_model.fit(self.X_train, self.y_train)
            y_pred_lm_test = lm_model.predict(self.X_test)
            # Side-by-side predictions for both models.
            comparison_data = pd.DataFrame({
                'Actual': self.y_test,
                'GBDT': self.y_pred_test,
                'LinearRegression': y_pred_lm_test
            })
            plt.figure(figsize=(10, 8))
            plt.scatter(comparison_data['Actual'], comparison_data['GBDT'],
                        alpha=0.6, color='blue', label='GBDT')
            plt.scatter(comparison_data['Actual'], comparison_data['LinearRegression'],
                        alpha=0.6, color='orange', label='线性回归')
            # Identity line spanning the full range of all three columns.
            min_val = comparison_data[['Actual', 'GBDT', 'LinearRegression']].min().min()
            max_val = comparison_data[['Actual', 'GBDT', 'LinearRegression']].max().max()
            plt.plot([min_val, max_val], [min_val, max_val], 'k--', linewidth=2)
            plt.xlabel('实际值')
            plt.ylabel('预测值')
            plt.title('模型比较: GBDT vs 线性回归 (测试集)', fontsize=14, fontweight='bold')
            plt.legend()
            plt.grid(True, alpha=0.3)
            plt.tight_layout()
            plt.savefig(os.path.join(self.results_dir, "model_comparison.jpg"),
                        dpi=300, bbox_inches='tight')
            plt.close()
        except Exception as e:
            # The comparison is optional; report and continue.
            print(f"模型比较图生成失败: {e}")
def plot_performance_comparison(self):
"""绘制性能对比图"""
performance_data = pd.DataFrame({
'Metric': ['RMSE', 'MAE', 'R-squared'] * 2,
'Value': [self.rmse_train, self.mae_train, self.r2_train,
self.rmse_test, self.mae_test, self.r2_test],
'Dataset': ['Training'] * 3 + ['Testing'] * 3
})
plt.figure(figsize=(10, 6))
sns.barplot(data=performance_data, x='Metric', y='Value', hue='Dataset',
palette=['steelblue', 'orange'], alpha=0.8)
plt.title('模型性能比较: 训练集 vs 测试集 (GBDT)', fontsize=14, fontweight='bold')
plt.ylabel('值')
plt.legend(loc='best')
plt.tight_layout()
plt.savefig(os.path.join(self.results_dir, "performance_comparison.jpg"),
dpi=300, bbox_inches='tight')
plt.close()
    def learning_curve_analysis(self):
        """Plot train/test RMSE as a function of training-set size.

        Fits a small fixed-configuration GBDT on growing random subsets
        of the training data (10%..90%) and evaluates each on the full
        test set.
        """
        train_sizes = np.linspace(0.1, 0.9, 9)
        train_rmse = []
        test_rmse = []
        for size in train_sizes:
            # Sample a subset of the training set without replacement;
            # the fixed seed makes the curve reproducible.
            n_samples = int(size * len(self.X_train))
            indices = np.random.RandomState(123).choice(len(self.X_train), n_samples, replace=False)
            X_subset = self.X_train.iloc[indices]
            y_subset = self.y_train.iloc[indices]
            # Small fixed model so the loop stays cheap.
            small_gbdt = GradientBoostingRegressor(
                n_estimators=100,
                learning_rate=0.05,
                max_depth=2,
                random_state=123
            )
            small_gbdt.fit(X_subset, y_subset)
            # In-sample RMSE on the subset itself.
            pred_train = small_gbdt.predict(X_subset)
            rmse_train = np.sqrt(mean_squared_error(y_subset, pred_train))
            train_rmse.append(rmse_train)
            # Out-of-sample RMSE on the held-out test set.
            pred_test = small_gbdt.predict(self.X_test)
            rmse_test = np.sqrt(mean_squared_error(self.y_test, pred_test))
            test_rmse.append(rmse_test)
        # Draw both curves against the absolute number of samples.
        plt.figure(figsize=(10, 6))
        plt.plot(train_sizes * len(self.X_train), train_rmse, 'o-', color='blue',
                 label='训练集RMSE', linewidth=2)
        plt.plot(train_sizes * len(self.X_train), test_rmse, 'o-', color='red',
                 label='测试集RMSE', linewidth=2)
        plt.xlabel('训练集大小')
        plt.ylabel('RMSE')
        plt.title('学习曲线: 训练集大小 vs RMSE (GBDT)', fontsize=14, fontweight='bold')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(os.path.join(self.results_dir, "learning_curve.jpg"),
                    dpi=300, bbox_inches='tight')
        plt.close()
    def model_stability_analysis(self):
        """Measure sensitivity of test RMSE/R^2 to the train/test split.

        Re-splits the full dataset 10 times with different seeds, refits
        a small fixed-configuration GBDT each time, and stores the metric
        trajectory in ``self.stability_df`` (used later by the report).
        """
        n_iterations = 10
        stability_results = []
        # Rebuild the full design matrix once, outside the loop.
        X_full = self.data.drop(['指标1', '序号'], axis=1, errors='ignore')
        y_full = self.data['指标1']
        X_full_encoded = pd.get_dummies(X_full, drop_first=True)
        for i in range(n_iterations):
            # Vary only the split seed; the model seed stays fixed so
            # differences come from the data partition alone.
            X_train_temp, X_test_temp, y_train_temp, y_test_temp = train_test_split(
                X_full_encoded, y_full, test_size=0.3, random_state=100 + i
            )
            temp_gbdt = GradientBoostingRegressor(
                n_estimators=100,
                learning_rate=0.05,
                max_depth=2,
                random_state=123
            )
            temp_gbdt.fit(X_train_temp, y_train_temp)
            # Score this split's held-out portion.
            y_pred_temp = temp_gbdt.predict(X_test_temp)
            rmse_temp = np.sqrt(mean_squared_error(y_test_temp, y_pred_temp))
            r2_temp = r2_score(y_test_temp, y_pred_temp)
            stability_results.append({
                'Iteration': i + 1,
                'RMSE': rmse_temp,
                'R_squared': r2_temp
            })
        self.stability_df = pd.DataFrame(stability_results)
        # RMSE trajectory across the 10 random splits.
        plt.figure(figsize=(10, 6))
        plt.plot(self.stability_df['Iteration'], self.stability_df['RMSE'],
                 'o-', color='steelblue', linewidth=2, markersize=8)
        plt.xlabel('迭代次数')
        plt.ylabel('测试集RMSE')
        plt.title('模型稳定性分析: 不同数据划分下的测试RMSE (GBDT)', fontsize=14, fontweight='bold')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(os.path.join(self.results_dir, "model_stability.jpg"),
                    dpi=300, bbox_inches='tight')
        plt.close()
    def plot_feature_interaction(self):
        """Contour plot of predictions over the top-2 features' grid.

        All other features are held at their training means while the two
        most important features sweep their observed ranges (20x20 grid).
        NOTE(review): assumes the top-2 columns are numeric - one-hot
        dummy columns would make min/max/mean questionable; confirm.
        """
        if len(self.feature_importance_df) >= 2:
            top_vars = self.feature_importance_df.head(2)['Variable'].tolist()
            try:
                fig, ax = plt.subplots(figsize=(10, 8))
                # Sweep each of the two features over its observed range.
                var1_vals = np.linspace(self.X_train[top_vars[0]].min(), self.X_train[top_vars[0]].max(), 20)
                var2_vals = np.linspace(self.X_train[top_vars[1]].min(), self.X_train[top_vars[1]].max(), 20)
                # Grid frame with one row per (v1, v2) combination.
                X_grid = np.zeros((len(var1_vals) * len(var2_vals), self.X_train.shape[1]))
                X_grid = pd.DataFrame(X_grid, columns=self.X_train.columns)
                # Hold every feature at its training mean...
                for col in self.X_train.columns:
                    X_grid[col] = self.X_train[col].mean()
                # NOTE(review): this list is never used.
                interaction_values = []
                # ...then overwrite the two swept features row by row.
                for i, v1 in enumerate(var1_vals):
                    for j, v2 in enumerate(var2_vals):
                        idx = i * len(var2_vals) + j
                        X_grid.loc[idx, top_vars[0]] = v1
                        X_grid.loc[idx, top_vars[1]] = v2
                # Predict on the grid and reshape back to 2-D for contouring.
                y_pred_grid = self.final_gbdt.predict(X_grid)
                y_pred_grid = y_pred_grid.reshape(len(var1_vals), len(var2_vals))
                im = ax.contourf(var1_vals, var2_vals, y_pred_grid, levels=20, cmap='RdBu_r')
                ax.set_xlabel(top_vars[0])
                ax.set_ylabel(top_vars[1])
                ax.set_title(f'特征交互作用: {top_vars[0]} 和 {top_vars[1]}', fontsize=14, fontweight='bold')
                plt.colorbar(im, ax=ax, label='预测值')
                plt.tight_layout()
                plt.savefig(os.path.join(self.results_dir, "feature_interaction.jpg"),
                            dpi=300, bbox_inches='tight')
                plt.close()
            except Exception as e:
                # Optional plot; report and continue.
                print(f"特征交互作用图生成失败: {e}")
    def plot_model_diagnostic(self):
        """Scatter training residuals against the most important feature.

        A quadratic trend line is overlaid; systematic curvature would
        suggest structure the model has not captured.
        NOTE(review): polyfit assumes the top feature is numeric.
        """
        if len(self.feature_importance_df) > 0:
            top_var = self.feature_importance_df.iloc[0]['Variable']
            try:
                # Training residuals = observed - predicted.
                residuals_train = self.y_train - self.y_pred_train
                diagnostic_data = pd.DataFrame({
                    'Feature': self.X_train[top_var],
                    'Residuals': residuals_train
                })
                plt.figure(figsize=(10, 6))
                plt.scatter(diagnostic_data['Feature'], diagnostic_data['Residuals'],
                            alpha=0.6, color='steelblue')
                # Degree-2 polynomial fit as a smooth trend indicator.
                z = np.polyfit(diagnostic_data['Feature'], diagnostic_data['Residuals'], 2)
                p = np.poly1d(z)
                x_sorted = np.sort(diagnostic_data['Feature'])
                plt.plot(x_sorted, p(x_sorted), color='red', linewidth=2)
                plt.xlabel(top_var)
                plt.ylabel('残差')
                plt.title(f'残差与最重要特征 {top_var} 的关系', fontsize=14, fontweight='bold')
                plt.grid(True, alpha=0.3)
                plt.tight_layout()
                plt.savefig(os.path.join(self.results_dir, "model_diagnostic.jpg"),
                            dpi=300, bbox_inches='tight')
                plt.close()
            except Exception as e:
                # Optional plot; report and continue.
                print(f"模型诊断图生成失败: {e}")
    def generate_report(self):
        """Assemble the Word report from metrics, tables and saved figures.

        Requires evaluate_model, get_feature_importance and
        model_stability_analysis to have run first (reads their attribute
        side effects). NOTE(review): also reads self.best_iter near the
        end, which must have been set by the initial-fit step.
        """
        doc = Document()
        # Title and timestamp.
        doc.add_heading('梯度提升回归 (GBDT) - 机器学习分析报告', 0)
        doc.add_paragraph(f'生成日期: {pd.Timestamp.now().strftime("%Y-%m-%d %H:%M:%S")}')
        doc.add_paragraph()
        # Data overview section.
        doc.add_heading('数据概述', level=1)
        doc.add_paragraph(f'总样本量: {len(self.data)}')
        doc.add_paragraph(f'训练集样本量: {len(self.X_train)}')
        doc.add_paragraph(f'测试集样本量: {len(self.X_test)}')
        doc.add_paragraph(f'变量数量: {self.X_train.shape[1]}')
        doc.add_paragraph()
        # Model parameters: tuned values if available, otherwise defaults.
        doc.add_heading('模型参数', level=1)
        if hasattr(self, 'best_params'):
            doc.add_paragraph(f'最优树的数量: {self.best_params["n_estimators"]}')
            doc.add_paragraph(f'树深度: {self.best_params["max_depth"]}')
            doc.add_paragraph(f'学习率: {self.best_params["learning_rate"]}')
        else:
            doc.add_paragraph(f'最优树的数量: {self.best_iter}')
            doc.add_paragraph('树深度: 3')
            doc.add_paragraph('学习率: 0.01')
        doc.add_paragraph(f'叶节点最小样本数: 10')
        doc.add_paragraph()
        # Performance table copied from performance_df.
        doc.add_heading('模型性能摘要', level=1)
        table = doc.add_table(rows=1, cols=3)
        table.style = 'Light Grid'
        hdr_cells = table.rows[0].cells
        hdr_cells[0].text = '数据集'
        hdr_cells[1].text = '指标'
        hdr_cells[2].text = '值'
        for _, row in self.performance_df.iterrows():
            row_cells = table.add_row().cells
            row_cells[0].text = str(row['Dataset'])
            row_cells[1].text = str(row['Metric'])
            row_cells[2].text = f"{row['Value']:.4f}"
        doc.add_paragraph()
        # Overfitting check: test-minus-train RMSE gap.
        doc.add_heading('过拟合分析', level=1)
        overfitting_gap = self.rmse_test - self.rmse_train
        doc.add_paragraph(f'RMSE差距 (测试集 - 训练集): {overfitting_gap:.4f}')
        if overfitting_gap > 0.1:
            doc.add_paragraph('警告: 检测到潜在的过拟合 (训练集和测试集性能差距较大)')
        else:
            doc.add_paragraph('良好: 模型泛化能力良好 (训练集和测试集性能差距较小)')
        doc.add_paragraph()
        # Top-10 feature importances.
        doc.add_heading('特征重要性', level=1)
        doc.add_paragraph('前10个最重要的特征:')
        table = doc.add_table(rows=1, cols=2)
        table.style = 'Light Grid'
        hdr_cells = table.rows[0].cells
        hdr_cells[0].text = '特征'
        hdr_cells[1].text = '相对影响力'
        for _, row in self.feature_importance_df.head(10).iterrows():
            row_cells = table.add_row().cells
            row_cells[0].text = str(row['Variable'])
            row_cells[1].text = f"{row['RelativeInfluence']:.4f}"
        doc.add_paragraph()
        # Stability summary from the 10 re-split runs.
        doc.add_heading('模型稳定性分析', level=1)
        stability_summary = pd.DataFrame({
            '指标': ['平均测试RMSE', '测试RMSE标准差', '平均测试R平方', '测试R平方标准差'],
            '值': [
                self.stability_df['RMSE'].mean(),
                self.stability_df['RMSE'].std(),
                self.stability_df['R_squared'].mean(),
                self.stability_df['R_squared'].std()
            ]
        })
        table = doc.add_table(rows=1, cols=2)
        table.style = 'Light Grid'
        hdr_cells = table.rows[0].cells
        hdr_cells[0].text = '指标'
        hdr_cells[1].text = '值'
        for _, row in stability_summary.iterrows():
            row_cells = table.add_row().cells
            row_cells[0].text = str(row['指标'])
            row_cells[1].text = f"{row['值']:.4f}"
        doc.add_paragraph()
        # Embed the saved figures; silently skip any that failed to render.
        doc.add_heading('可视化结果', level=1)
        image_files = [
            "variable_importance.jpg",
            "performance_comparison.jpg",
            "learning_curve.jpg",
            "model_stability.jpg",
            "predicted_vs_actual.jpg",
            "training_error.jpg",
            "model_diagnostic.jpg"
        ]
        titles = [
            "变量重要性图",
            "性能比较图",
            "学习曲线",
            "模型稳定性",
            "预测值 vs 实际值",
            "训练误差图",
            "模型诊断图"
        ]
        for img_file, title in zip(image_files, titles):
            doc.add_paragraph(title)
            img_path = os.path.join(self.results_dir, img_file)
            if os.path.exists(img_path):
                doc.add_picture(img_path, width=Inches(6))
            doc.add_paragraph()
        # Partial-dependence plots for the two most important features.
        important_vars = self.feature_importance_df.head(2)['Variable'].tolist()
        for var in important_vars:
            img_path = os.path.join(self.results_dir, f"partial_dependence_{var}.jpg")
            if os.path.exists(img_path):
                doc.add_paragraph(f'部分依赖图: {var}')
                doc.add_picture(img_path, width=Inches(6))
                doc.add_paragraph()
        # Feature-interaction contour plot, if it was produced.
        img_path = os.path.join(self.results_dir, "feature_interaction.jpg")
        if os.path.exists(img_path):
            doc.add_paragraph('特征交互作用图:')
            doc.add_picture(img_path, width=Inches(6))
            doc.add_paragraph()
        # Conclusions and recommendations.
        doc.add_heading('结论和建议', level=1)
        doc.add_paragraph('基于梯度提升回归 (GBDT) 的机器学习分析:')
        doc.add_paragraph(f'- 训练集R平方: {self.r2_train * 100:.2f}%')
        doc.add_paragraph(f'- 测试集R平方: {self.r2_test * 100:.2f}%')
        doc.add_paragraph(f'- 模型泛化差距: {overfitting_gap:.4f}')
        doc.add_paragraph(f'- 最重要的变量: {", ".join(self.feature_importance_df.head(3)["Variable"].tolist())}')
        doc.add_paragraph(f'- 模型稳定性 (RMSE标准差): {self.stability_df["RMSE"].std():.4f}')
        doc.add_paragraph(f'- 最优树的数量: {self.best_iter}')
        doc.add_paragraph()
        doc.add_paragraph('GBDT模型特点:')
        doc.add_paragraph('- 通过梯度提升逐步减少残差,构建强预测模型')
        doc.add_paragraph('- 能够处理复杂的非线性关系和特征交互')
        doc.add_paragraph('- 对异常值相对稳健,但需要仔细调参')
        doc.add_paragraph()
        doc.add_paragraph('建议:')
        doc.add_paragraph('1. 如果检测到过拟合,降低树深度或增加正则化')
        doc.add_paragraph('2. 考虑使用XGBoost或LightGBM等更高效的梯度提升实现')
        doc.add_paragraph('3. 进行更精细的超参数调优以获得最佳性能')
        doc.add_paragraph('4. 监控模型性能并根据新数据定期重新训练')
        # Write the report into the results directory.
        report_path = os.path.join(self.results_dir, "GBDT机器学习分析报告.docx")
        doc.save(report_path)
        print(f"分析报告已保存至: {report_path}")
def save_models(self):
"""保存模型"""
# 保存模型对象
joblib.dump(self.final_gbdt, os.path.join(self.results_dir, "gbdt_model.pkl"))
if self.gbdt_caret is not None:
joblib.dump(self.gbdt_caret, os.path.join(self.results_dir, "tuned_gbdt_model.pkl"))
# 保存处理后的数据
self.data.to_csv(os.path.join(self.results_dir, "processed_data.csv"),
index=False, encoding='utf-8-sig')
# 保存其他重要数据
joblib.dump({
'X_train': self.X_train,
'X_test': self.X_test,
'y_train': self.y_train,
'y_test': self.y_test
}, os.path.join(self.results_dir, "train_test_data.pkl"))
print("模型和数据已保存")
def run_analysis(self):
"""运行完整分析流程"""
print("开始梯度提升回归树(GBDT)分析...")
# 1. 加载和预处理数据
if not self.load_and_preprocess_data():
return
# 2. 划分数据
self.split_data()
# 3. 训练GBDT模型
print("训练GBDT模型中...")
self.train_gbdt_model()
# 4. 超参数调优
print("进行超参数调优...")
self.tune_hyperparameters()
# 5. 训练最终模型
print("训练最终模型...")
self.train_final_model()
# 6. 评估模型
print("评估模型性能...")
self.evaluate_model()
# 7. 获取特征重要性
self.get_feature_importance()
# 8. 生成可视化
print("生成可视化图表...")
self.plot_feature_importance()
self.plot_prediction_vs_actual()
self.plot_residual_analysis()
self.plot_training_error()
self.plot_parameter_tuning()
self.plot_partial_dependence()
self.plot_model_comparison()
self.plot_performance_comparison()
self.learning_curve_analysis()
self.model_stability_analysis()
self.plot_feature_interaction()
self.plot_model_diagnostic()
# 9. 生成报告
print("生成分析报告...")
self.generate_report()
# 10. 保存模型
self.save_models()
print("\nGBDT机器学习分析完成!")
print(f"结果已保存到: {self.results_dir}")
print("- 模型性能比较 (训练集 vs 测试集)")
print("- 综合可视化包括学习曲线和稳定性分析")
print("- 详细的Word分析报告")
print("- 模型对象和处理后的数据")
# Script entry point: run the full analysis pipeline.
if __name__ == "__main__":
    analysis = GBDTAnalysis()
    analysis.run_analysis()
5. 梯度提升回归树(GBDT,Gradient Boosting Decision Trees)
概念:串行集成方法,每棵树学习前序树的残差。
原理
- 梯度下降:在函数空间进行优化
- 加法模型:$F_m(x) = F_{m-1}(x) + \nu h_m(x)$
- 损失函数:可微损失函数(如平方损失)
思想:逐步改进模型,每步专注于当前模型表现最差的部分。
应用
- 搜索排序
- 推荐系统
- 风险建模
医学统计数据分析分享交流SPSS、R语言、Python、ArcGis、Geoda、GraphPad、数据分析图表制作等心得。承接数据分析,论文返修,医学统计,机器学习,生存分析,空间分析,问卷分析业务。若有投稿和数据分析代做需求,可以直接联系我,谢谢!
“医学统计数据分析”公众号右下角;
找到“联系作者”,
可加我微信,邀请入粉丝群!
有临床流行病学数据分析
如(t检验、方差分析、χ2检验、logistic回归)、
(重复测量方差分析与配对T检验、ROC曲线)、
(非参数检验、生存分析、样本含量估计)、
(筛检试验:灵敏度、特异度、约登指数等计算)、
(绘制柱状图、散点图、小提琴图、列线图等)、
机器学习、深度学习、生存分析
等需求的同仁们,加入【临床】粉丝群。
疾控,公卫岗位的同仁,可以加一下【公卫】粉丝群,分享生态学研究、空间分析、时间序列、监测数据分析、时空面板技巧等工作科研自动化内容。
有实验室数据分析需求的同仁们,可以加入【生信】粉丝群,交流NCBI(基因序列)、UniProt(蛋白质)、KEGG(通路)、GEO(公共数据集)等公共数据库、基因组学转录组学蛋白组学代谢组学表型组学等数据分析和可视化内容。
或者可扫码直接加微信进群!!!
精品视频课程-“医学统计数据分析”视频号付费合集
在“医学统计数据分析”视频号-付费合集兑换相应课程后,获取课程理论课PPT、代码、基础数据等相关资料,请大家在【医学统计数据分析】公众号右下角,找到“联系作者”,加我微信后打包发送。感谢您的支持!!

