This post implements the binomial logistic regression model, so it uses the preprocessed two-class dataset mnist_binary.csv, in which digits 0~4 of the original handwritten data are taken as the negative class -1 and digits 5~9 as the positive class +1.
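A minimal sketch of how such a two-class file could be produced from the original MNIST csv; the source path ../data/mnist.csv and the label-in-first-column layout are assumptions, not part of the original post:

import pandas as pd

# Hypothetical original MNIST csv, label in the first column, pixels after it
mnist = pd.read_csv("../data/mnist.csv")
labels = mnist.iloc[:, 0]

# Digits 0~4 become the negative class -1, digits 5~9 the positive class +1
mnist.iloc[:, 0] = (labels >= 5).astype(int) * 2 - 1

mnist.to_csv("../data/mnist_binary.csv", index=False)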
Per the definition of the logistic regression model as a conditional probability distribution:
P(Y=1|x) = \frac{exp(w \cdot x)}{1 + exp(w \cdot x)}
P(Y=0|x) = \frac{1}{1 + exp(w \cdot x)}
Since Y must take the values 0 and 1, the -1 labels in the table have to be converted to 0 before training. In addition, because the exponential function is evaluated, overly large feature values would make exp(w \cdot x) overflow, so the image data also needs to be binarized. Both steps are done directly in the code, so no separate dataset is generated.
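A minimal self-contained sketch of these two preprocessing steps on toy data (the real program further below applies the same idea to the full dataset):

import numpy as np
from sklearn.preprocessing import Binarizer

# Toy labels in {-1, +1} and toy "pixel" features, standing in for the real data
y = np.array([-1, 1, 1, -1])
X = np.array([[0, 200, 30], [255, 10, 128], [90, 180, 0], [127, 128, 255]])

# Map the -1 labels to 0 so that Y takes values in {0, 1}
y = y * (y == 1)
print(y)            # [0 1 1 0]

# Binarize pixels at threshold 127 so w·x stays small and exp(w·x) cannot overflow
X_binary = Binarizer(threshold=127).fit_transform(X)
print(X_binary)     # every pixel becomes 0 or 1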
The conditional probability definition of the logistic regression model given above can be viewed as the model converting the linear function w \cdot x into a probability through this expression:
P(Y=1|x) = \frac{exp(w \cdot x)}{1 + exp(w \cdot x)}
The expression above gives the probability that the event occurs: as the linear function tends to positive infinity, the probability approaches 1; as it tends to negative infinity, the probability approaches 0. The graph of this function is the familiar S-shaped sigmoid curve, and the model's decision point lies where the linear function is zero, at which the conditional probability equals 0.5.
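This boundary behaviour can be checked numerically with a few values of w \cdot x; the small sketch below is purely illustrative and not part of the original program:

import numpy as np

def p_y1(wx):
    # P(Y=1|x) = exp(w·x) / (1 + exp(w·x)), i.e. the sigmoid of w·x
    return np.exp(wx) / (1 + np.exp(wx))

for wx in [-10.0, -1.0, 0.0, 1.0, 10.0]:
    print(wx, p_y1(wx))
# As w·x -> -inf the probability tends to 0, as w·x -> +inf it tends to 1,
# and at w·x = 0 it equals exactly 0.5 (the decision boundary).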
The logistic regression model can also be generalized to multi-class classification; see the summary section.
Rewrite the logistic regression model above in the following form:
P(Y=1|x) = \pi(x), \quad P(Y=0|x) = 1 - \pi(x)
Its likelihood function is:
\prod_{i=1}^{N} [\pi(x_i)]^{y_i} [1 - \pi(x_i)]^{1 - y_i}
The log-likelihood function is:
L(w) = \sum_{i=1}^{N} [y_i log\pi(x_i) + (1 - y_i) log(1 - \pi(x_i))]
= \sum_{i=1}^{N} [y_i log\frac{\pi(x_i)}{1 - \pi(x_i)} + log(1 - \pi(x_i))]
= \sum_{i=1}^{N} [y_i (w \cdot x_i) - log(1 + exp(w \cdot x_i))]

For stochastic gradient descent, only a single sample's contribution is used in each update:

y_i (w \cdot x_i) - log(1 + exp(w \cdot x_i))
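The per-sample term above can be evaluated directly; the helper below is an illustrative sketch with made-up numbers, not part of the original program:

import numpy as np

def log_likelihood_i(w, x_i, y_i):
    # Per-sample log-likelihood: y_i * (w·x_i) - log(1 + exp(w·x_i))
    wx = np.dot(w, x_i)
    return y_i * wx - np.log(1 + np.exp(wx))

w = np.array([0.1, -0.2, 0.05])
x_i = np.array([1.0, 0.0, 1.0])
print(log_likelihood_i(w, x_i, y_i=1))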
The algorithm is optimized with stochastic gradient descent; differentiating with respect to the weights in vector form gives:
\frac{\partial L(w)}{\partial w} = y_i x_i - \frac{x_i \cdot exp(w \cdot x_i)}{1 + exp(w \cdot x_i)} = x_i \left[ y_i - \frac{exp(w \cdot x_i)}{1 + exp(w \cdot x_i)} \right]
The weight parameters are updated in each iteration:
w = w + \alpha \frac{\partial L(w)}{\partial w}
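A minimal sketch of one such stochastic update on a single sample; the variable names are illustrative, and the full program below instead fuses the bias into the weight vector:

import numpy as np

def sgd_step(w, x_i, y_i, lr=0.001):
    # Gradient of the per-sample log-likelihood:
    # dL/dw = x_i * (y_i - exp(w·x_i) / (1 + exp(w·x_i)))
    exp_wx = np.exp(np.dot(w, x_i))
    grad = x_i * (y_i - exp_wx / (1 + exp_wx))
    # Ascent on the log-likelihood: w = w + α * dL/dw
    return w + lr * grad

w = np.zeros(3)
w = sgd_step(w, x_i=np.array([1.0, 0.0, 1.0]), y_i=1)
print(w)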
Following the algorithm steps above, binomial logistic regression trained with stochastic gradient descent and the perceptron model trained with gradient descent have essentially the same learning flow; the difference lies in how the parameters are updated. They also differ in the decision rule: the perceptron uses the sign function, while logistic regression uses the logistic (sigmoid) function. See the original perceptron learning algorithm for comparison.
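The difference in the decision rule can be seen side by side in a small illustrative sketch (the value of w \cdot x here is made up):

import numpy as np

wx = 0.8  # some value of the linear function w·x

# Perceptron: decide by the sign of w·x
perceptron_label = 1 if np.sign(wx) >= 0 else -1

# Logistic regression: decide by whether the sigmoid of w·x exceeds 0.5
logistic_label = 1 if 1 / (1 + np.exp(-wx)) > 0.5 else 0

print(perceptron_label, logistic_label)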
The full implementation is as follows:
# @Author: phd
# @Date: 2019-08-18
# @Site: github.com/phdsky
# @Description: NULL

import time
import logging
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import Binarizer


def log(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        ret = func(*args, **kwargs)

        end_time = time.time()
        logging.debug('%s() cost %s seconds' % (func.__name__, end_time - start_time))

        return ret
    return wrapper


def calc_accuracy(y_pred, y_truth):
    assert len(y_pred) == len(y_truth)
    n = len(y_pred)

    hit_count = 0
    for i in range(0, n):
        if y_pred[i] == y_truth[i]:
            hit_count += 1

    print("Predicting accuracy %f" % (hit_count / n))


class LogisticRegression(object):
    def __init__(self, w, b, learning_rate, max_epoch, learning_period, learning_ratio):
        self.weight = w
        self.bias = b
        self.lr_rate = learning_rate
        self.max_epoch = max_epoch
        self.lr_period = learning_period
        self.lr_ratio = learning_ratio

    def calculate(self, feature):
        # wx = sum([self.weight[j] * feature[j] for j in range(len(self.weight))])
        wx = np.dot(self.weight.transpose(), feature)
        exp_wx = np.exp(wx)

        predicted = 0 if (1 / (1 + exp_wx)) > 0.5 else 1

        return predicted, exp_wx

    @log
    def train(self, X_train, y_train):
        # Fuse weight with bias
        self.weight = np.full((len(X_train[0]), 1), self.weight, dtype=float)
        self.weight = np.row_stack((self.weight, self.bias))

        epoch = 0
        while epoch < self.max_epoch:
            hit_count = 0
            data_count = len(X_train)
            for i in range(data_count):
                feature = X_train[i].reshape([len(X_train[i]), 1])
                feature = np.row_stack((feature, 1))
                label = y_train[i]

                predicted, exp_wx = self.calculate(feature)

                if predicted == label:
                    hit_count += 1
                    continue

                # for k in range(len(self.weight)):
                #     self.weight[k] += self.lr_rate * (label*feature[k] - ((feature[k] * exp_wx) / (1 + exp_wx)))
                self.weight += self.lr_rate * feature * (label - (exp_wx / (1 + exp_wx)))

            epoch += 1
            print("\rEpoch %d, lr_rate=%f, Acc = %f" % (epoch, self.lr_rate, hit_count / data_count), end='')

            # Decay learning rate
            if epoch % self.lr_period == 0:
                self.lr_rate /= self.lr_ratio

            # Stop training
            if self.lr_rate <= 1e-6:
                print("\nLearning rate is too low, Early stopping...\n")
                break

    @log
    def predict(self, X_test):
        n = len(X_test)
        predict_label = np.full(n, -1)

        for i in range(0, n):
            to_predict = X_test[i].reshape([len(X_test[i]), 1])
            vec_predict = np.row_stack((to_predict, 1))
            predict_label[i], _ = self.calculate(vec_predict)

        return predict_label


if __name__ == "__main__":
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    mnist_data = pd.read_csv("../data/mnist_binary.csv")
    mnist_values = mnist_data.values

    images = mnist_values[::, 1::]
    labels = mnist_values[::, 0]

    X_train, X_test, y_train, y_test = train_test_split(
        images, labels, test_size=0.33, random_state=42
    )

    # Handle all -1 in y_train to 0
    y_train = y_train * (y_train == 1)
    y_test = y_test * (y_test == 1)

    # Binary the image to avoid predict_probability gets 0
    binarizer_train = Binarizer(threshold=127).fit(X_train)
    X_train_binary = binarizer_train.transform(X_train)

    binarizer_test = Binarizer(threshold=127).fit(X_test)
    X_test_binary = binarizer_test.transform(X_test)

    lr = LogisticRegression(w=0, b=1, learning_rate=0.001, max_epoch=100,
                            learning_period=10, learning_ratio=3)

    print("Logistic regression training...")
    lr.train(X_train=X_train_binary, y_train=y_train)
    print("\nTraining done...")

    print("Testing on %d samples..." % len(X_test))
    y_predicted = lr.predict(X_test=X_test_binary)

    calc_accuracy(y_pred=y_predicted, y_truth=y_test)
Code output:
/Users/phd/Softwares/anaconda3/bin/python /Users/phd/Desktop/ML/logistic_regression/logistic_regression.py
Logistic regression training...
Epoch 70, lr_rate=0.000001, Acc = 0.818479
Learning rate is too low, Early stopping...
Training done...
Testing on 13860 samples...
DEBUG:root:train() cost 38.08758902549744 seconds
Predicting accuracy 0.831097
DEBUG:root:predict() cost 0.2131938934326172 seconds
Process finished with exit code 0
The results show that, with the images binarized, logistic regression reaches over 80% accuracy in both training and testing, so the algorithm performs reasonably well; the prediction result is better than the perceptron model trained directly on the raw data.
The multi-class generalization mentioned earlier, the multinomial logistic regression model, is defined as:

P(Y=k|x) = \frac{exp(w_k \cdot x)}{1 + \sum_{k=1}^{K-1} exp(w_k \cdot x)}, \quad k = 1, 2, ..., K-1

P(Y=K|x) = \frac{1}{1 + \sum_{k=1}^{K-1} exp(w_k \cdot x)}
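A minimal sketch of how these K-class probabilities could be computed; the weight matrix shape and all numeric values are illustrative assumptions:

import numpy as np

def multinomial_probs(W, x):
    # W has K-1 rows, one weight vector w_k per class k = 1, ..., K-1
    scores = np.exp(W @ x)              # exp(w_k · x) for k = 1, ..., K-1
    denom = 1 + scores.sum()            # 1 + sum_k exp(w_k · x)
    p_first = scores / denom            # P(Y=k|x) for k = 1, ..., K-1
    p_last = 1 / denom                  # P(Y=K|x)
    return np.append(p_first, p_last)   # all K probabilities, summing to 1

W = np.array([[0.2, -0.1], [0.05, 0.3]])   # K=3 classes, 2 features (toy values)
x = np.array([1.0, 2.0])
print(multinomial_probs(W, x))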