赞
踩
构建机器学习模型过程中,往往会涉及很多步骤:数据处理、特征构造、特征筛选、算法选取等等;下面介绍通过pipeline管道将这些常用步骤整合,快速搭建基线模型:
# Data preprocessing
from sklearn.impute import SimpleImputer, KNNImputer  # missing-value imputation
from category_encoders import CatBoostEncoder, OrdinalEncoder, CountEncoder, OneHotEncoder  # categorical encoding
from sklearn.preprocessing import PowerTransformer, QuantileTransformer  # skewness correction
from sklearn.preprocessing import MinMaxScaler, StandardScaler, MaxAbsScaler, RobustScaler  # scaling
# Feature construction
from sklearn.preprocessing import PolynomialFeatures  # polynomial features
# Feature selection
from sklearn.feature_selection import f_classif, chi2, f_regression, mutual_info_classif  # filter methods
from sklearn.feature_selection import VarianceThreshold, GenericUnivariateSelect, SelectKBest
from sklearn.feature_selection import RFE, RFECV  # wrapper methods
from sklearn.feature_selection import SelectFromModel  # embedded methods
from sklearn.decomposition import PCA  # dimensionality reduction
# Models
from sklearn.linear_model import LogisticRegression, Lasso, Ridge
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import CategoricalNB
from sklearn.svm import LinearSVC, SVR, SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.neural_network import MLPClassifier
import lightgbm as lgb
import xgboost as xgb
import catboost as ctb
# Hyper-parameter search
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
# Pipeline assembly
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.compose import ColumnTransformer


# Pipeline builder
class ModelPipeline:
    """Quickly assemble a baseline classification pipeline.

    Each public method builds on the previous one and returns a new,
    unfitted pipeline, so ``select_model`` alone yields the full chain:

        imputer          -> missing-value imputation (per column group)
        encoder_method   -> + categorical encoding
        concat           -> numeric/categorical branches merged (ColumnTransformer)
        scaler_method    -> + scaling
        normal_method    -> + skewness correction
        select_feature   -> + feature selection
        select_model     -> + final estimator

    Optional stages are skipped when their option is ``None``.  Any other
    unrecognised option raises ``ValueError`` instead of being silently
    ignored.
    """

    # Option tables map an option name to a zero-argument factory so that every
    # call gets fresh estimator instances.  The misspelled keys 'ordina' and
    # 'standar' are the original option names and are kept for backward
    # compatibility; the correctly spelled aliases are accepted as well.
    _ENCODERS = {
        'catboost': lambda: CatBoostEncoder(),
        'ordina': lambda: OrdinalEncoder(),
        'ordinal': lambda: OrdinalEncoder(),
        'count': lambda: CountEncoder(min_group_size=0),
        'onehot': lambda: OneHotEncoder(),
    }
    _SCALERS = {
        'minmax': lambda: MinMaxScaler(),
        'standar': lambda: StandardScaler(),
        'standard': lambda: StandardScaler(),
        'maxabs': lambda: MaxAbsScaler(),
        'robust': lambda: RobustScaler(),
    }
    _NORMALIZERS = {
        # PowerTransformer defaults to Yeo-Johnson, which accepts negative
        # values (Box-Cox would require strictly positive data).
        'power': lambda: PowerTransformer(),
        'quantile': lambda: QuantileTransformer(output_distribution="normal"),
    }
    _MODELS = {
        'lgb': lambda: lgb.LGBMClassifier(),
        'rfc': lambda: RandomForestClassifier(),
        'ctb': lambda: ctb.CatBoostClassifier(silent=True),
        'xgb': lambda: xgb.XGBClassifier(use_label_encoder=False, eval_metric="logloss"),
        'mlp': lambda: MLPClassifier(),
    }

    def __init__(self, num_cols, cat_cols):
        """Store the numeric and categorical column lists and print their sizes."""
        # Numeric and categorical features are routed through separate branches.
        self.num_cols = num_cols
        self.cat_cols = cat_cols
        print(f'数值特征共{len(self.num_cols)}个,离散特征共{len(self.cat_cols)}个')

    @staticmethod
    def _build(option, factories, stage):
        """Return a new estimator for ``option``, or ``None`` when the stage is skipped.

        Raises:
            ValueError: if ``option`` is neither ``None`` nor a key of ``factories``.
        """
        if option is None:
            return None
        try:
            factory = factories[option]
        except KeyError:
            raise ValueError(
                f"unknown {stage} option {option!r}; expected None or one of {sorted(factories)}"
            ) from None
        return factory()

    # Missing-value imputation
    def imputer(self, fill_na='simple'):
        """Return ``(num_pipe, cat_pipe)``, each holding a single imputer step.

        Args:
            fill_na: ``'simple'`` (median for numeric, most frequent for
                categorical), ``'use_value'`` (constant -1 for both) or
                ``'knn'`` (3-nearest-neighbour imputation for both).

        Raises:
            ValueError: if ``fill_na`` is not one of the options above.
        """
        if fill_na == 'simple':
            num_imputer = SimpleImputer(strategy='median')
            cat_imputer = SimpleImputer(strategy='most_frequent')
        elif fill_na == 'use_value':
            num_imputer = SimpleImputer(strategy='constant', fill_value=-1)
            cat_imputer = SimpleImputer(strategy='constant', fill_value=-1)
        elif fill_na == 'knn':
            # NOTE: KNNImputer is distance based and needs numeric input, so
            # this option only suits categorical columns that are already
            # numerically coded.
            num_imputer = KNNImputer(n_neighbors=3, weights="uniform")
            cat_imputer = KNNImputer(n_neighbors=3, weights="uniform")
        else:
            raise ValueError(
                f"unknown fill_na option {fill_na!r}; expected 'simple', 'use_value' or 'knn'"
            )
        num_pipe = Pipeline(steps=[('imputer', num_imputer)])
        cat_pipe = Pipeline(steps=[('imputer', cat_imputer)])
        return num_pipe, cat_pipe

    # Categorical encoding
    def encoder_method(self, fill_na='simple', encoder=None):
        """Return ``(num_pipe, cat_pipe)`` with an optional encoder appended to ``cat_pipe``.

        Args:
            fill_na: forwarded to :meth:`imputer`.
            encoder: ``None`` (no encoding), ``'catboost'``, ``'ordina'`` /
                ``'ordinal'``, ``'count'`` or ``'onehot'``.
        """
        num_pipe, cat_pipe = self.imputer(fill_na=fill_na)
        step = self._build(encoder, self._ENCODERS, 'encoder')
        if step is not None:
            cat_pipe.steps.append(('encoder', step))
        return num_pipe, cat_pipe

    # Merge the two branches
    def concat(self, fill_na='simple', encoder=None):
        """Return a pipeline whose single step applies both branches column-wise."""
        num_pipe, cat_pipe = self.encoder_method(fill_na=fill_na, encoder=encoder)
        column_pipe = ColumnTransformer([
            ('num', num_pipe, self.num_cols),
            ('cat', cat_pipe, self.cat_cols),
        ])
        return Pipeline(steps=[('column', column_pipe)])

    # Scaling
    def scaler_method(self, fill_na='simple', encoder=None, scaler=None):
        """Append an optional scaler, applied to every column of the merged output.

        Args:
            scaler: ``None``, ``'minmax'``, ``'standar'`` / ``'standard'``,
                ``'maxabs'`` or ``'robust'``.
        """
        concat_pipe = self.concat(fill_na=fill_na, encoder=encoder)
        step = self._build(scaler, self._SCALERS, 'scaler')
        if step is not None:
            concat_pipe.steps.append(('scaler', step))
        return concat_pipe

    # Skewness correction
    def normal_method(self, fill_na='simple', encoder=None, scaler=None, normal=None):
        """Append an optional distribution-normalising transform.

        Args:
            normal: ``None``, ``'power'`` or ``'quantile'``.
        """
        concat_pipe = self.scaler_method(fill_na=fill_na, encoder=encoder, scaler=scaler)
        step = self._build(normal, self._NORMALIZERS, 'normal')
        if step is not None:
            concat_pipe.steps.append(('normal', step))
        return concat_pipe

    # Feature selection
    def select_feature(self, fill_na='simple', encoder=None, scaler=None, normal=None,
                       select=None, n_features=50):
        """Append an optional feature-selection step.

        Args:
            select: ``None``, ``'filter'`` (univariate F-test, k best),
                ``'wraps'`` (recursive elimination with cross-validation) or
                ``'embed'`` (selection from logistic-regression coefficients).
            n_features: number of features to keep (``'filter'``), the minimum
                to keep (``'wraps'``) or the maximum to keep (``'embed'``).
                Defaults to 50, the value that used to be hard-coded.

        Raises:
            ValueError: if ``select`` is not one of the options above.
        """
        concat_pipe = self.normal_method(fill_na=fill_na, encoder=encoder, scaler=scaler, normal=normal)
        if select is None:
            return concat_pipe
        if select == 'filter':
            selector = GenericUnivariateSelect(f_classif, mode='k_best', param=n_features)
        elif select == 'wraps':
            # Recursive elimination ranks features by coef_ / feature_importances_.
            # An RBF-kernel SVC exposes neither, so a linear kernel is required.
            selector = RFECV(SVC(kernel='linear'), step=0.5,
                             min_features_to_select=n_features, cv=3)
        elif select == 'embed':
            selector = SelectFromModel(LogisticRegression(), max_features=n_features)
        else:
            raise ValueError(
                f"unknown select option {select!r}; expected None, 'filter', 'wraps' or 'embed'"
            )
        concat_pipe.steps.append(('select', selector))
        return concat_pipe

    # Model choice
    def select_model(self, fill_na='simple', encoder=None, scaler=None, normal=None,
                     select=None, model='lgb', n_features=50):
        """Return the full pipeline ending in the chosen classifier (step name ``'model'``).

        Args:
            model: ``'lgb'``, ``'rfc'``, ``'ctb'``, ``'xgb'`` or ``'mlp'``;
                ``None`` returns the preprocessing pipeline without an estimator.
            n_features: forwarded to :meth:`select_feature`.

        Raises:
            ValueError: if any option is not recognised.
        """
        concat_pipe = self.select_feature(fill_na=fill_na, encoder=encoder, scaler=scaler,
                                          normal=normal, select=select, n_features=n_features)
        estimator = self._build(model, self._MODELS, 'model')
        if estimator is not None:
            concat_pipe.steps.append(('model', estimator))
        return concat_pipe
调用上面的框架,并使用贝叶斯优化(hyperopt 的 TPE 算法)调节超参数:
# Hyper-parameter optimisation
import hyperopt
from hyperopt import hp, fmin, tpe, Trials, partial, STATUS_OK
from hyperopt.early_stop import no_progress_loss
from sklearn.metrics import f1_score  # used by the objectives below
from sklearn.model_selection import cross_val_score, cross_validate

# Instantiate the framework with the numeric / categorical feature lists.
model_class = ModelPipeline(num_cols=select_num, cat_cols=select_cat)
lgb_pipe = model_class.select_model(fill_na='simple', encoder='ordina', scaler=None, normal=None, select=None, model='lgb')
ctb_pipe = model_class.select_model(fill_na='simple', encoder='ordina', scaler=None, normal=None, select=None, model='ctb')
xgb_pipe = model_class.select_model(fill_na='simple', encoder='ordina', scaler=None, normal=None, select=None, model='xgb')


def _fit_and_score(pipe, estimator):
    """Swap ``estimator`` into ``pipe``, fit on the training split, score on validation.

    Returns the hyperopt result dict: the loss is the negated validation F1
    (hyperopt minimises), and ``'model'`` is the fitted pipeline object itself
    (the same object that was passed in, mutated in place).
    """
    pipe.set_params(model=estimator)  # replace the final 'model' step
    pipe.fit(new_train_data, y_train)
    val_f1 = f1_score(y_val, pipe.predict(new_val_data))
    return {'loss': -val_f1, 'status': STATUS_OK, 'model': pipe}


# Objective functions
# lgb
def hy_obj_lgb(params):
    """Hyperopt objective for LightGBM; ``params`` is one sample of ``param_grid_lgb``."""
    gbm = lgb.LGBMClassifier(
        boosting_type='dart',
        num_leaves=int(params['num_leaves']),  # quniform yields floats
        max_depth=int(params['max_depth']),
        learning_rate=params['learning_rate'],
        n_estimators=int(params['n_estimators']),
        subsample=0.8,
        min_child_samples=int(params['min_child_samples']),
        colsample_bytree=0.8,
        reg_alpha=int(params['reg_alpha']),
        class_weight=params['class_weight'],
        n_jobs=8,
    )
    return _fit_and_score(lgb_pipe, gbm)


# Search space
class_weight_list_lgb = [{0: 1, 1: i + 1} for i in range(5)]
param_grid_lgb = {
    'num_leaves': hp.quniform("num_leaves", 8, 16, 1),
    'max_depth': hp.quniform("max_depth", 4, 10, 1),
    'learning_rate': hp.uniform("learning_rate", 0.07, 0.3),
    'n_estimators': hp.quniform("n_estimators", 50, 200, 10),
    'min_child_samples': hp.quniform("min_child_samples", 100, 1000, 50),
    'reg_alpha': hp.quniform("reg_alpha", 100, 1000, 50),
    'class_weight': hp.choice("class_weight", class_weight_list_lgb),
}


# ctb
def hy_obj_ctb(params):
    """Hyperopt objective for CatBoost; ``params`` is one sample of ``param_grid_ctb``."""
    ctbm = ctb.CatBoostClassifier(
        iterations=int(params['iterations']),
        learning_rate=params['learning_rate'],
        depth=int(params['depth']),
        l2_leaf_reg=int(params['l2_leaf_reg']),
        class_weights=params['class_weight'],
        silent=True,
        thread_count=8,
    )
    return _fit_and_score(ctb_pipe, ctbm)


# Search space
class_weight_list_ctb = [{0: 1, 1: i + 1} for i in range(5)]
param_grid_ctb = {
    'iterations': hp.quniform("iterations", 30, 100, 5),
    'learning_rate': hp.uniform("learning_rate", 0.1, 0.3),
    'depth': hp.quniform("depth", 4, 8, 1),
    'l2_leaf_reg': hp.quniform("l2_leaf_reg", 100, 1000, 50),
    'class_weight': hp.choice("class_weight", class_weight_list_ctb),
}


# xgb
def hy_obj_xgb(params):
    """Hyperopt objective for XGBoost; ``params`` is one sample of ``param_grid_xgb``."""
    xgbc = xgb.XGBClassifier(
        n_estimators=int(params['n_estimators']),
        max_depth=int(params['max_depth']),
        learning_rate=params['learning_rate'],
        subsample=0.8,
        colsample_bytree=0.8,
        scale_pos_weight=int(params['scale_pos_weight']),  # weight of the positive class
        reg_alpha=int(params['reg_alpha']),
        reg_lambda=int(params['reg_lambda']),
        use_label_encoder=False,
        eval_metric="logloss",
        n_jobs=8,
    )
    return _fit_and_score(xgb_pipe, xgbc)


# Search space
param_grid_xgb = {
    'n_estimators': hp.quniform("n_estimators", 30, 100, 10),
    'learning_rate': hp.uniform("learning_rate", 0.08, 0.3),
    'max_depth': hp.quniform("max_depth", 4, 10, 1),
    'scale_pos_weight': hp.quniform("scale_pos_weight", 1, 5, 1),
    'reg_alpha': hp.quniform("reg_alpha", 50, 500, 100),
    'reg_lambda': hp.quniform("reg_lambda", 50, 500, 100),
}


# Optimisation driver
def param_hy_lgb(fn=None, space=None, max_evals=100, patience=100):
    """Minimise ``fn`` over ``space`` with TPE and return ``(best_params, trials)``.

    Despite the name, this driver is model-agnostic and is used for all three
    objectives.

    Args:
        fn: objective returning a hyperopt result dict.
        space: hyperopt search space.
        max_evals: maximum number of evaluations.
        patience: stop early after this many evaluations without improvement.
            With the default of 100, early stopping can only trigger when
            ``max_evals`` is larger than that.

    Note:
        For ``hp.choice`` dimensions the returned ``best_params`` holds the
        *index* of the chosen option, not the option itself.
    """
    trials = Trials()  # keeps the full search history
    params_best = fmin(
        fn=fn,                 # objective
        space=space,           # search space
        algo=tpe.suggest,      # surrogate model
        max_evals=max_evals,   # evaluation budget
        verbose=True,
        trials=trials,
        early_stop_fn=no_progress_loss(patience),
    )
    # fmin reports the best loss itself; print the best parameters as well.
    print("\n", "best params: ", params_best)
    return params_best, trials


# Run the searches; keep each history under its own name.
params_best_lgb, trials_lgb = param_hy_lgb(fn=hy_obj_lgb, space=param_grid_lgb, max_evals=200)
params_best_ctb, trials_ctb = param_hy_lgb(fn=hy_obj_ctb, space=param_grid_ctb, max_evals=100)
params_best_xgb, trials_xgb = param_hy_lgb(fn=hy_obj_xgb, space=param_grid_xgb, max_evals=100)
trials = trials_xgb  # original name: history of the last search
使用调参后的模型进行模型集成(软投票):
# Model ensembling: voting
from sklearn.ensemble import VotingClassifier
from sklearn.metrics import classification_report, confusion_matrix  # used by the reports below

# Rebuild the models with the best parameters found.
# For hp.choice dimensions hyperopt returns the index of the selected option,
# so map it back to the actual value.  The isinstance guard keeps this step
# idempotent: re-running it after the mapping has been applied is a no-op
# instead of a TypeError.
if not isinstance(params_best_lgb['class_weight'], dict):
    params_best_lgb['class_weight'] = class_weight_list_lgb[params_best_lgb['class_weight']]
if not isinstance(params_best_ctb['class_weight'], dict):
    params_best_ctb['class_weight'] = class_weight_list_ctb[params_best_ctb['class_weight']]

# Refit each pipeline with its best parameters.
dic_lgb = hy_obj_lgb(params=params_best_lgb)
dic_ctb = hy_obj_ctb(params=params_best_ctb)
dic_xgb = hy_obj_xgb(params=params_best_xgb)

# Fitted pipelines
model_lgb = dic_lgb['model']
model_ctb = dic_ctb['model']
model_xgb = dic_xgb['model']

# Base classifiers
estimators = [
    ('lgb', model_lgb),
    ('ctb', model_ctb),
    ('xgb', model_xgb),
]

# Soft voting averages the predicted class probabilities of the base models.
vclf = VotingClassifier(estimators=estimators, voting='soft', n_jobs=8)
vclf.fit(new_train_data, y_train)
print('-------------------------模型训练完毕------------------------')

print('训练集表现:')
prob = vclf.predict_proba(new_train_data)[:, 1]  # probability of the positive class
train_pred = [1 if i > 0.5 else 0 for i in prob]
print('混淆矩阵:\n', confusion_matrix(y_train, train_pred))
print('模型报告:\n', classification_report(y_train, train_pred))

print('验证集表现:')
prob = vclf.predict_proba(new_val_data)[:, 1]
val_pred = [1 if i > 0.5 else 0 for i in prob]
print('混淆矩阵:\n', confusion_matrix(y_val, val_pred))
print('模型报告:\n', classification_report(y_val, val_pred))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。