赞
踩
这是我第一次在csdn上写文章,做的东西只能算是学校的一些作业,如果有什么问题请大家多多指教。
这次做的是一个结合图像和对应短文本的无监督分类模型,使用的数据集来源于NUS-WIDE,这是一个采集自Flickr的数据集,图片和文本一一对应,并且数据已经做了分类。
在文本特征方面,使用变分自编码器和BiLSTM结合来提取文本的特征。将文本先用gensim转变为向量,每一条文本经过词嵌入的过程后变为形状为(40,100)的矩阵,将矩阵输入变分自编码器中,进行1000轮的训练,获取编码器的输出,即可获得压缩为(100,)的文本特征。将此时的z_mean作为kmeans的输入进行分类,准确率大概是60%不到,效果不能说是非常差,但总的来说挺不咋地。
下面展示一些乱七八糟的代码。
# Text branch: BiLSTM variational autoencoder over embedded captions, followed
# by KMeans clustering of the latent means.
import matplotlib
import keras.callbacks
from keras import metrics
from keras.optimizers import adadelta_v2, adam_v2
from keras.layers import (
    Conv2D, Conv2DTranspose, Input, Conv1D, Conv1DTranspose, MaxPooling1D,
    UpSampling1D, Flatten, Dense, Lambda, Reshape, LSTM, RepeatVector,
    TimeDistributed, Bidirectional, Dropout,
)
from keras.losses import binary_crossentropy
from keras.activations import tanh, softmax
from keras.models import Model
from keras.datasets import mnist
from keras import backend as K
import numpy as np
import matplotlib.pyplot as plt
from keras.saving.save import load_model
from matplotlib import style
from sklearn.cluster import KMeans
from tensorflow.python.framework.ops import disable_eager_execution
from sklearn.model_selection import train_test_split

# The loss layer below closes over symbolic tensors, so graph mode is used.
disable_eager_execution()
matplotlib.use('TKAgg')


class LossHistory(keras.callbacks.Callback):
    """Keras callback that records loss values per batch and per epoch."""

    def on_train_begin(self, logs=None):
        """Reset the recorded histories at the start of training."""
        self.losses = {'batch': [], 'epoch': []}
        self.val_loss = {'batch': [], 'epoch': []}

    def on_batch_end(self, batch, logs=None):
        """Append the batch-level loss values found in ``logs``."""
        logs = logs or {}
        self.losses['batch'].append(logs.get('loss'))
        self.val_loss['batch'].append(logs.get('val_loss'))

    def on_epoch_end(self, batch, logs=None):
        """Append the epoch-level loss values found in ``logs``.

        The first positional argument keeps its original name ``batch``.
        """
        logs = logs or {}
        self.losses['epoch'].append(logs.get('loss'))
        self.val_loss['epoch'].append(logs.get('val_loss'))

    def loss_plot(self, loss_type):
        """Plot the recorded losses.

        Args:
            loss_type: ``'batch'`` or ``'epoch'``; selects which history to
                draw. The validation curve is only drawn for ``'epoch'``.
        """
        iters = range(len(self.losses[loss_type]))
        plt.figure()
        plt.plot(iters, self.losses[loss_type], 'r', label='train loss')
        recorded_val = self.val_loss[loss_type]
        # Skip the validation curve when training ran without validation data
        # (every entry is None in that case).
        if loss_type == 'epoch' and any(v is not None for v in recorded_val):
            plt.plot(iters, recorded_val, 'b', label='val loss')
        plt.grid(True)
        plt.xlabel(loss_type)
        plt.ylabel('loss')
        plt.legend(loc="upper right")
        plt.show()


# ---------------------------------------------------------------- data ----
# Both paths were left blank in the original post; fill them in before running.
TEXT_FEATURES_PATH = ''
TEXT_LABELS_PATH = ''

train_x = np.load(TEXT_FEATURES_PATH).astype("float32")
# Keep the rows in their original order for prediction so that the encoder
# outputs line up with train_y, which is loaded separately and never shuffled.
# (The original script referenced test_x without ever defining it.)
test_x = train_x
# np.random.permutation returns a shuffled copy; test_x is left untouched.
train_x = np.random.permutation(train_x)
print(train_x.shape)
train_y = np.load(TEXT_LABELS_PATH)

# ------------------------------------------------------------- encoder ----
input_shape = (train_x.shape[1], train_x.shape[2])
print(input_shape)
latent_dim = 3

input_txt = Input(shape=input_shape, name='encoder_input')
x = Dropout(rate=0.2)(input_txt)
x = Bidirectional(
    LSTM(50, activation='relu', recurrent_activation='sigmoid',
         recurrent_dropout=0, unroll=False, use_bias=False, name='LSTM')
)(x)
z_mu = Dense(latent_dim, name='latent_mu')(x)        # latent mean
# Softmax over the latent mean, exposed as an extra encoder output.
z2 = Lambda(softmax, output_shape=(latent_dim,), name='z2')(z_mu)
# Used as a log-variance by sample_z and by the KL term below.
z_sigma = Dense(latent_dim, name='latent_sigma')(x)


def sample_z(args):
    """Draw a latent sample with the reparameterisation trick.

    Args:
        args: pair ``(mean, log_variance)`` of tensors.

    Returns:
        ``mean + exp(log_variance / 2) * eps`` with ``eps`` drawn from
        ``K.random_normal``.
    """
    mean, log_var = args
    eps = K.random_normal(shape=(K.shape(mean)[0], K.int_shape(mean)[1]))
    return mean + K.exp(log_var / 2) * eps


z = Lambda(sample_z, output_shape=(latent_dim,), name='z')([z_mu, z_sigma])

encoder = Model(input_txt, [x, z_mu, z_sigma, z, z2], name='encoder')
encoder.summary()

# ------------------------------------------------------------- decoder ----
decoder_input = Input(shape=(latent_dim,), name='decoder_input')
# Repeat the latent vector once per time step before the recurrent decoder.
decoded = RepeatVector(train_x.shape[1], name='EmbeddingtoTimeSeries')(decoder_input)
decoded = Bidirectional(
    LSTM(50, name='DecoderLSTM1', activation='relu', return_sequences=True)
)(decoded)
decoded = Dropout(rate=0.2)(decoded)
decoded = TimeDistributed(Dense(train_x.shape[2]))(decoded)

decoder = Model(decoder_input, decoded, name='decoder')
decoder.summary()

# Apply the decoder to the latent sample.
z_decoded = decoder(z)


# ---------------------------------------------------------------- loss ----
class CustomLayer(keras.layers.Layer):
    """Pass-through layer that attaches the VAE loss to the model."""

    def vae_loss(self, inputs, decoded):
        """Return reconstruction loss plus the weighted KL term.

        The KL term reads the module-level ``z_mu`` and ``z_sigma`` tensors
        and is scaled by 5e-2.
        """
        flat_inputs = K.flatten(inputs)
        flat_decoded = K.flatten(decoded)
        xent_loss = binary_crossentropy(flat_inputs, flat_decoded)
        kl_loss = -5e-2 * K.mean(
            1 + z_sigma - K.square(z_mu) - K.exp(z_sigma), axis=-1)
        return K.mean(xent_loss + kl_loss)

    def call(self, inputs):
        """Register the loss for ``[original, reconstruction]``; return the original."""
        original = inputs[0]
        reconstruction = inputs[1]
        loss = self.vae_loss(original, reconstruction)
        self.add_loss(loss, inputs=inputs)
        return original


y = CustomLayer()([input_txt, z_decoded])
print(type(y))
vae = Model(input_txt, y, name='vae')

# The loss is supplied by CustomLayer, hence loss=None.
# Author's note: adam seemed to work best.
vae.compile(optimizer='adam', loss=None)
vae.summary()

# ------------------------------------------------------------ training ----
history = LossHistory()
vae.fit(train_x, y=None, epochs=500, batch_size=64, callbacks=[history],
        validation_split=0.2)
history.loss_plot('epoch')

# ------------------------------------------------------- visualisation ----
# Encoder outputs: x is the BiLSTM output (the text feature), mu the latent mean.
x, mu, _, zz, sof_z = encoder.predict(test_x)

if latent_dim == 2:
    plt.figure(figsize=(6, 6))
    plt.scatter(mu[:, 0], mu[:, 1], c=train_y, cmap='brg')
    plt.xlabel('dim 1')
    plt.ylabel('dim 2')
    plt.colorbar()
    plt.show()
else:
    style.use('ggplot')
    fig = plt.figure()
    ax1 = fig.add_subplot(111, projection='3d')
    ax1.scatter(mu[:, 0], mu[:, 1], mu[:, 2], c=train_y, marker='o')
    ax1.set_xlabel('x-axis')
    ax1.set_ylabel('y-axis')
    ax1.set_zlabel('z-axis')
    plt.show()

# ---------------------------------------------------------- clustering ----
est = KMeans(n_clusters=3)
est.fit(mu)
k_label = est.labels_
np.savetxt('D:/vae_txt_k_label.txt', k_label)
图像特征提取就直接用了预训练的resnet50,先用权重为imagenet的resnet50提取出长度2048的图像特征(这里要把resnet最后的dense层去掉),但这长度和文本特征实在是不搭,所以用自编码器给压缩一下,个人感觉用pca效果不如自编码器,所以用了自编码器,也懒得认真对比了,学校作业就随便搞搞了,总之把图像特征搞成100的长度。
下面是代码。
# Image branch: dense autoencoder that compresses 2048-d image features, plus
# KMeans clustering of the 3-d bottleneck.
import matplotlib
from matplotlib import style
import keras.callbacks
from keras import metrics
from keras.optimizer_v2 import adam
from keras.optimizers import adadelta_v2, adam_v2
from keras.layers import (
    Conv2D, Conv2DTranspose, Input, Flatten, Dense, Lambda, Reshape, LSTM,
    RepeatVector, Bidirectional, TimeDistributed, Dropout, Conv1D,
    MaxPooling1D, UpSampling1D, Conv1DTranspose,
)
from keras.losses import (
    binary_crossentropy, sparse_categorical_crossentropy,
    categorical_crossentropy, mae, mse,
)
from keras.activations import tanh, relu
from keras.models import Model, Sequential
from keras.datasets import mnist
from keras import backend as K
import numpy as np
import matplotlib.pyplot as plt
from keras.saving.save import load_model
from sklearn.cluster import KMeans
from tensorflow.python.framework.ops import disable_eager_execution
from sklearn.model_selection import train_test_split

disable_eager_execution()
matplotlib.use('TKAgg')

# Paths were left blank in the original post; fill them in before running.
IMAGE_FEATURES_PATH = ""
WEIGHTS_PATH = ""
# Columns [0, 2048) hold the image feature, column 2048 the label.
FEATURE_DIM = 2048

# ---------------------------------------------------------------- data ----
feature = np.load(IMAGE_FEATURES_PATH).astype("float32")
# Rows are shuffled as a whole, so features and labels stay paired.
feature = np.random.permutation(feature)
print(feature.shape)
train_x = feature[:, 0:FEATURE_DIM]
print(train_x.shape)
train_y = feature[:, FEATURE_DIM:FEATURE_DIM + 1]
print(train_y.shape)


class LossHistory(keras.callbacks.Callback):
    """Keras callback that records loss values per batch and per epoch."""

    def on_train_begin(self, logs=None):
        """Reset the recorded histories at the start of training."""
        self.losses = {'batch': [], 'epoch': []}
        self.val_loss = {'batch': [], 'epoch': []}

    def on_batch_end(self, batch, logs=None):
        """Append the batch-level loss values found in ``logs``."""
        logs = logs or {}
        self.losses['batch'].append(logs.get('loss'))
        self.val_loss['batch'].append(logs.get('val_loss'))

    def on_epoch_end(self, batch, logs=None):
        """Append the epoch-level loss values found in ``logs``.

        The first positional argument keeps its original name ``batch``.
        """
        logs = logs or {}
        self.losses['epoch'].append(logs.get('loss'))
        self.val_loss['epoch'].append(logs.get('val_loss'))

    def loss_plot(self, loss_type):
        """Plot the recorded losses.

        Args:
            loss_type: ``'batch'`` or ``'epoch'``; selects which history to
                draw. The validation curve is only drawn for ``'epoch'``.
        """
        iters = range(len(self.losses[loss_type]))
        plt.figure()
        plt.plot(iters, self.losses[loss_type], 'r', label='train loss')
        recorded_val = self.val_loss[loss_type]
        # model.fit below runs without validation data, so every val_loss
        # entry is None; only draw the curve when something was recorded.
        if loss_type == 'epoch' and any(v is not None for v in recorded_val):
            plt.plot(iters, recorded_val, 'b', label='val loss')
        plt.grid(True)
        plt.xlabel(loss_type)
        plt.ylabel('loss')
        plt.legend(loc="upper right")
        plt.show()


# --------------------------------------------------------------- model ----
input_shape = (FEATURE_DIM,)

model = Sequential()
model.add(Input(shape=input_shape, name="input"))
# NOTE: this layer is a plain Dense layer despite being named 'bidirectional';
# the name is kept because the lookups below rely on it.
model.add(Dense(100, activation='relu', name='bidirectional'))
model.add(Dense(3, name='dense1'))            # 3-d bottleneck
model.add(Dense(100, activation='relu'))
model.add(Dense(FEATURE_DIM))
model.compile(optimizer='adam', loss=mse)
model.summary()

# ------------------------------------------------------------ training ----
history = LossHistory()
model.fit(train_x, train_x, epochs=500, callbacks=[history], batch_size=64)
history.loss_plot('epoch')

# Sub-models sharing the trained layers:
#   encoder  -> 3-d bottleneck, used for plotting and clustering
#   encoder2 -> 100-d first hidden layer, used as the compressed image feature
first_layer = model.get_layer('bidirectional')
encoder = Model(inputs=first_layer.input, outputs=model.get_layer('dense1').output)
encoder2 = Model(inputs=first_layer.input, outputs=first_layer.output)

# The original call was model.save_weights("", encoder2): the second positional
# parameter of save_weights is `overwrite`, so the Model object only acted as a
# truthy flag. Relying on the default overwrite=True behaves the same.
model.save_weights(WEIGHTS_PATH)

# ------------------------------------------------------- visualisation ----
mu = encoder.predict(train_x)
style.use('ggplot')
fig = plt.figure()
ax1 = fig.add_subplot(111, projection='3d')
ax1.scatter(mu[:, 0], mu[:, 1], mu[:, 2], c=train_y, marker='o')
ax1.set_xlabel('x-axis')
ax1.set_ylabel('y-axis')
ax1.set_zlabel('z-axis')
plt.show()

# ---------------------------------------------------- feature export ----
# Reload the features without shuffling and compute their 100-d encoding.
feature = np.load(IMAGE_FEATURES_PATH).astype("float32")
x = feature[:, 0:FEATURE_DIM]
ae_pic_feature = encoder2.predict(x)

# ---------------------------------------------------------- clustering ----
# mu comes from the shuffled train_x, so the labels follow that row order.
est = KMeans(n_clusters=3)
est.fit(mu)
k_label = est.labels_
先把上面两个模型的输出用concatenate拼接一下,让长度变成200,然后丢到vae里面,再把输出的结果用kmeans聚个类,基本上就搞定了
下面是vae代码。
# Fusion branch: BiLSTM variational autoencoder over the concatenated
# text + image features, followed by KMeans on the latent means.
import keras
import matplotlib
import tensorflow as tf
from keras import metrics
from keras.backend import softmax
from keras.optimizers import adadelta_v2, adam_v2
from keras.layers import (
    Conv2D, Conv2DTranspose, Input, Flatten, Dense, Lambda, Reshape, LSTM,
    RepeatVector, TimeDistributed, Bidirectional, Dropout,
)
from keras.losses import binary_crossentropy
from keras.activations import tanh
from keras.models import Model
from keras.datasets import mnist
from keras import backend as K
from mpl_toolkits.mplot3d import axes3d
import matplotlib.pyplot as plt
from matplotlib import style
import numpy as np
from keras import callbacks
from keras.saving.save import load_model
from sklearn.cluster import KMeans
from sklearn.preprocessing import MinMaxScaler
from tensorflow.python.framework.ops import disable_eager_execution
from sklearn.model_selection import train_test_split

# The loss layer below closes over symbolic tensors, so graph mode is used.
disable_eager_execution()
matplotlib.use('TKAgg')

# Paths were left blank in the original post; fill them in before running.
FUSED_FEATURES_PATH = ""
LABELS_OUT_PATH = ''
# Columns [0, 200) hold the fused feature; the remaining columns hold labels.
FEATURE_DIM = 200


class LossHistory(keras.callbacks.Callback):
    """Keras callback that records loss values per batch and per epoch."""

    def on_train_begin(self, logs=None):
        """Reset the recorded histories at the start of training."""
        self.losses = {'batch': [], 'epoch': []}
        self.val_loss = {'batch': [], 'epoch': []}

    def on_batch_end(self, batch, logs=None):
        """Append the batch-level loss values found in ``logs``."""
        logs = logs or {}
        self.losses['batch'].append(logs.get('loss'))
        self.val_loss['batch'].append(logs.get('val_loss'))

    def on_epoch_end(self, batch, logs=None):
        """Append the epoch-level loss values found in ``logs``.

        The first positional argument keeps its original name ``batch``.
        """
        logs = logs or {}
        self.losses['epoch'].append(logs.get('loss'))
        self.val_loss['epoch'].append(logs.get('val_loss'))

    def loss_plot(self, loss_type):
        """Plot the recorded losses.

        Args:
            loss_type: ``'batch'`` or ``'epoch'``; selects which history to
                draw. The validation curve is only drawn for ``'epoch'``.
        """
        iters = range(len(self.losses[loss_type]))
        plt.figure()
        plt.plot(iters, self.losses[loss_type], 'r', label='train loss')
        recorded_val = self.val_loss[loss_type]
        # Skip the validation curve when no validation loss was recorded.
        if loss_type == 'epoch' and any(v is not None for v in recorded_val):
            plt.plot(iters, recorded_val, 'b', label='val loss')
        plt.grid(True)
        plt.xlabel(loss_type)
        plt.ylabel('acc-loss')
        plt.legend(loc="upper right")
        plt.show()


def _scatter3d(points, colors):
    """Show a 3-D scatter of the first three columns of ``points``."""
    style.use('ggplot')
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')
    ax.scatter(points[:, 0], points[:, 1], points[:, 2], c=colors, marker='o')
    ax.set_xlabel('x-axis')
    ax.set_ylabel('y-axis')
    ax.set_zlabel('z-axis')
    plt.show()


# ---------------------------------------------------------------- data ----
# Load the previously built fused features and cast them to float32.
feature = np.load(FUSED_FEATURES_PATH).astype("float32")

# Evaluation copy, taken before shuffling so rows stay aligned with test_y.
# reshape(-1, ...) replaces the hard-coded row count 1457 of the original.
test_x = feature[:, 0:FEATURE_DIM].reshape((-1, 1, FEATURE_DIM))
test_y = feature[:, FEATURE_DIM:]
print(feature.shape)

feature = np.random.permutation(feature)
train = feature[:, 0:FEATURE_DIM]
train_x = train.reshape((-1, 1, FEATURE_DIM))

# ------------------------------------------------------------- encoder ----
input_shape = (train_x.shape[1], train_x.shape[2])   # (1, 200)
print(input_shape)
latent_dim = 3   # number of latent dimensions, i.e. the desired number of topics

input_txt = Input(shape=input_shape, name='encoder_input')
x = Dropout(rate=0.3)(input_txt)
x = Bidirectional(
    LSTM(100, activation=tanh, recurrent_activation='sigmoid',
         recurrent_dropout=0, unroll=False, use_bias=False,
         input_shape=input_shape, name='LSTM')
)(x)
z_mu = Dense(latent_dim, name='latent_mu')(x)        # latent mean
# Used as a log-variance by sample_z and by the KL term below.
z_sigma = Dense(latent_dim, name='latent_sigma')(x)


def sample_z(args):
    """Draw a latent sample with the reparameterisation trick.

    Args:
        args: pair ``(mean, log_variance)`` of tensors.

    Returns:
        ``mean + exp(log_variance / 2) * eps`` with ``eps`` drawn from
        ``K.random_normal``.
    """
    mean, log_var = args
    eps = K.random_normal(shape=(K.shape(mean)[0], K.int_shape(mean)[1]))
    return mean + K.exp(log_var / 2) * eps


z = Lambda(sample_z, output_shape=(latent_dim,), name='z')([z_mu, z_sigma])

# The original listed `t` and `z2` as encoder outputs without defining them,
# which raises NameError. They are defined here as softmax projections of the
# sampled z and of the latent mean respectively.
# NOTE(review): inferred from the text-branch script and the article's closing
# remark about adding a softmax after z; confirm this matches the intent.
t = Lambda(softmax, output_shape=(latent_dim,), name='t')(z)
z2 = Lambda(softmax, output_shape=(latent_dim,), name='z2')(z_mu)

encoder = Model(input_txt, [z_mu, z_sigma, z, t, z2], name='encoder')
encoder.summary()

# ------------------------------------------------------------- decoder ----
decoder_input = Input(shape=(latent_dim,), name='decoder_input')
# Repeat the latent vector once per time step before the recurrent decoder.
decoded = RepeatVector(train_x.shape[1], name='EmbeddingtoTimeSeries')(decoder_input)
decoded = Bidirectional(
    LSTM(100, name='DecoderLSTM1', activation=tanh, return_sequences=True)
)(decoded)
decoded = Dropout(rate=0.3)(decoded)
decoded = TimeDistributed(Dense(train_x.shape[2]))(decoded)

decoder = Model(decoder_input, decoded, name='decoder')
decoder.summary()

# Apply the decoder to the latent sample.
z_decoded = decoder(z)


# ---------------------------------------------------------------- loss ----
class CustomLayer(keras.layers.Layer):
    """Pass-through layer that attaches the VAE loss to the model."""

    def vae_loss(self, inputs, decoded):
        """Return reconstruction loss plus the weighted KL term.

        The KL term reads the module-level ``z_mu`` and ``z_sigma`` tensors
        and is scaled by 5e-2.
        """
        flat_inputs = K.flatten(inputs)
        flat_decoded = K.flatten(decoded)
        xent_loss = binary_crossentropy(flat_inputs, flat_decoded)
        kl_loss = -5e-2 * K.mean(
            1 + z_sigma - K.square(z_mu) - K.exp(z_sigma), axis=-1)
        return K.mean(xent_loss + kl_loss)

    def call(self, inputs):
        """Register the loss for ``[original, reconstruction]``; return the original."""
        original = inputs[0]
        reconstruction = inputs[1]
        loss = self.vae_loss(original, reconstruction)
        self.add_loss(loss, inputs=inputs)
        return original


y = CustomLayer()([input_txt, z_decoded])
print(type(y))
vae = Model(input_txt, y, name='vae')

# The loss is supplied by CustomLayer, hence loss=None.
# Author's note: adam seemed to work best.
vae.compile(optimizer='adam', loss=None)
vae.summary()

# ------------------------------------------------------------ training ----
history = LossHistory()
vae.fit(train_x, y=None, epochs=1000, callbacks=[history], batch_size=64,
        validation_split=0.2)
history.loss_plot('epoch')

# ---------------------------------------------------------- clustering ----
# Encoder outputs, in order: latent mean, latent log-variance, sampled z,
# softmax(z), softmax(latent mean).
mu, _, zz, t, z_2 = encoder.predict(test_x)

est = KMeans(n_clusters=3)
est.fit(mu)
labels = est.labels_
print(labels)
np.savetxt(LABELS_OUT_PATH, labels)

# ------------------------------------------------------- visualisation ----
# Same points twice: coloured by test_y, then by the KMeans cluster labels.
_scatter3d(z_2, test_y)
_scatter3d(z_2, labels)
最后和本身数据集的标签对比下,分类的准确率在0.845,效果就那样吧,只是想对这个做个记录。
在VAE的encoder的z后面加个softmax函数,把z拍到一个平面上就可以当主题模型用了,可以看出不同类别主题的占比
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。