赞
踩
Keras == 2.2.4
Opencv == 3.3.1
Tensorflow == 1.10.0
Matplotlib == 3.1.3
Numpy == 1.19.1
# -*- coding: utf-8 -*-
import os
import cv2
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from keras import initializers
from keras.layers import SpatialDropout2D,Input, Conv2D, MaxPooling2D, Conv2DTranspose, concatenate,AveragePooling2D, UpSampling2D, BatchNormalization, Activation, add,Dropout,Permute,ZeroPadding2D,Add, Reshape
from keras.models import Model, model_from_json
from keras.optimizers import Adam
from keras.layers.advanced_activations import ELU, LeakyReLU, ReLU, PReLU
from keras.utils.vis_utils import plot_model
from keras import backend as K
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from keras import applications, optimizers, callbacks
import matplotlib
import keras
import tensorflow as tf
from keras.layers import *
def conv2d_bn(x, filters, num_row, num_col, padding='same', strides=(1, 1), activation='relu', name=None):
    '''
    2D convolution (without bias) followed by batch normalization and an
    optional activation.

    Arguments:
        x {keras layer} -- input layer
        filters {int} -- number of filters
        num_row {int} -- number of rows in filters
        num_col {int} -- number of columns in filters

    Keyword Arguments:
        padding {str} -- mode of padding (default: {'same'})
        strides {tuple} -- stride of convolution operation (default: {(1, 1)})
        activation {str} -- activation function; pass None to return the
            batch-normalized output without any activation (default: {'relu'})
        name {str} -- name given to the last layer created by this call: the
            activation layer, or the batch-normalization layer when
            activation is None (default: {None})

    Returns:
        [keras layer] -- [output layer]
    '''
    # The convolution bias is redundant because batch normalization follows
    # immediately and subtracts the mean anyway.
    x = Conv2D(filters, (num_row, num_col), strides=strides, padding=padding, use_bias=False)(x)

    if activation is None:
        # No activation layer will exist, so the requested name goes on the
        # normalization layer instead of being silently dropped.
        return BatchNormalization(axis=3, scale=False, name=name)(x)

    x = BatchNormalization(axis=3, scale=False)(x)
    return Activation(activation, name=name)(x)
def trans_conv2d_bn(x, filters, num_row, num_col, padding='same', strides=(2, 2), name=None):
    '''
    2D transposed convolution followed by batch normalization.

    Arguments:
        x {keras layer} -- input layer
        filters {int} -- number of filters
        num_row {int} -- number of rows in filters
        num_col {int} -- number of columns in filters

    Keyword Arguments:
        padding {str} -- mode of padding (default: {'same'})
        strides {tuple} -- stride of convolution operation (default: {(2, 2)})
        name {str} -- name given to the final (batch-normalization) layer
            (default: {None})

    Returns:
        [keras layer] -- [output layer]
    '''
    x = Conv2DTranspose(filters, (num_row, num_col), strides=strides, padding=padding)(x)
    # `name` used to be accepted and ignored; it now labels the output layer.
    # Passing None keeps Keras' automatic naming, i.e. the previous behaviour.
    return BatchNormalization(axis=3, scale=False, name=name)(x)
def _dc_branch(inp, widths):
    '''
    Build one branch of a DC block: a chain of 3x3 conv2d_bn stages (one per
    entry in `widths`), whose outputs are all concatenated along axis 3 and
    then batch-normalized.
    '''
    stage_outputs = []
    tensor = inp
    for width in widths:
        # Each stage consumes the previous stage's output, so later stages
        # see a progressively larger receptive field.
        tensor = conv2d_bn(tensor, width, 3, 3, activation='relu', padding='same')
        stage_outputs.append(tensor)
    merged = concatenate(stage_outputs, axis=3)
    return BatchNormalization(axis=3)(merged)


def DCBlock(U, inp, alpha = 1.67):
    '''
    DC Block: two structurally identical three-stage convolution branches are
    applied to the same input and summed. There is no shortcut connection
    from the input to the output.

    Arguments:
        U {int} -- Number of filters in a corresponding UNet stage
        inp {keras layer} -- input layer

    Keyword Arguments:
        alpha {float} -- multiplier applied to U to obtain the total filter
            budget W of one branch (default: {1.67})

    Returns:
        [keras layer] -- [output layer]
    '''
    W = alpha * U
    # The three stages of a branch receive roughly 1/6, 1/3 and 1/2 of W.
    widths = (int(W*0.167), int(W*0.333), int(W*0.5))

    first_branch = _dc_branch(inp, widths)
    second_branch = _dc_branch(inp, widths)

    fused = add([first_branch, second_branch])
    fused = Activation('relu')(fused)
    return BatchNormalization(axis=3)(fused)
def ResPath(filters, length, inp):
    '''
    ResPath: a chain of residual units. Each unit sums a 1x1 convolution
    (no activation) and a 3x3 convolution (ReLU) of the same tensor, then
    applies ReLU and batch normalization.

    Arguments:
        filters {int} -- number of filters used by every convolution
        length {int} -- length of ResPath (number of residual units); at
            least one unit is always built, even if length is below 1
        inp {keras layer} -- input layer

    Returns:
        [keras layer] -- [output layer]
    '''
    out = inp
    # max(..., 1) keeps the guarantee that one unit is built unconditionally.
    for _ in range(max(length, 1)):
        shortcut = conv2d_bn(out, filters, 1, 1, activation=None, padding='same')
        residual = conv2d_bn(out, filters, 3, 3, activation='relu', padding='same')
        out = add([shortcut, residual])
        out = Activation('relu')(out)
        out = BatchNormalization(axis=3)(out)
    return out
def DCUNet(height, width, channels):
    '''
    DC-UNet: a four-level encoder/decoder built from DCBlock stages, with
    ResPath-processed skip connections and a single-channel sigmoid output.

    Arguments:
        height {int} -- height of image
        width {int} -- width of image
        channels {int} -- number of channels in image

    Returns:
        [keras model] -- DC-UNet model
    '''
    base_filters = 32
    # ResPath length per encoder level, shallowest level first.
    respath_lengths = (4, 3, 2, 1)

    inputs = Input((height, width, channels))

    # Encoder: DCBlock -> 2x2 max-pool, keeping a ResPath-refined copy of
    # each DCBlock output for the matching decoder level.
    skips = []
    tensor = inputs
    for level, respath_length in enumerate(respath_lengths):
        level_filters = base_filters * 2 ** level
        encoded = DCBlock(level_filters, tensor)
        tensor = MaxPooling2D(pool_size=(2, 2))(encoded)
        skips.append((level_filters, ResPath(level_filters, respath_length, encoded)))

    # Bottleneck.
    tensor = DCBlock(base_filters * 16, tensor)

    # Decoder: upsample, concatenate with the skip tensor, DCBlock;
    # deepest level first.
    for level_filters, skip in reversed(skips):
        upsampled = Conv2DTranspose(
            level_filters, (2, 2), strides=(2, 2), padding='same')(tensor)
        merged = concatenate([upsampled, skip], axis=3)
        tensor = DCBlock(level_filters, merged)

    # 1x1 convolution + batch norm + sigmoid gives one value per pixel.
    outputs = conv2d_bn(tensor, 1, 1, 1, activation='sigmoid')

    return Model(inputs=[inputs], outputs=[outputs])
# -*- coding: utf-8 -*-
import os
import cv2
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from keras import initializers
from keras.layers import SpatialDropout2D,Input, Conv2D, MaxPooling2D, Conv2DTranspose, concatenate,AveragePooling2D, UpSampling2D, BatchNormalization, Activation, add,Dropout,Permute,ZeroPadding2D,Add, Reshape
from keras.models import Model, model_from_json
from keras.optimizers import Adam
from keras.layers.advanced_activations import ELU, LeakyReLU, ReLU, PReLU
from keras.utils.vis_utils import plot_model
from keras import backend as K
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from keras import applications, optimizers, callbacks
import matplotlib
import keras
import tensorflow as tf
from keras.layers import *
from model import DCUNet
# prepare training and testing set
IMAGE_DIR = 'E:\\DC-UNet-main\\data\\CVC-ClinicDB\\images\\'
MASK_DIR = 'E:\\DC-UNet-main\\data\\CVC-ClinicDB\\masks\\'
NUM_SAMPLES = 612  # files are named 1.png ... 612.png
# cv2.resize takes (width, height); the resulting arrays are 96 rows x 128 cols.
TARGET_SIZE = (128, 96)


def _load_resized(path, read_flag):
    """Read one image with OpenCV and resize it to TARGET_SIZE.

    cv2.imread does not raise on a missing or undecodable file; it returns
    None, which would otherwise surface as an obscure error inside
    cv2.resize. Fail early with the offending path instead.
    """
    img = cv2.imread(path, read_flag)
    if img is None:
        raise IOError('Could not read image: ' + path)
    return cv2.resize(img, TARGET_SIZE, interpolation=cv2.INTER_CUBIC)


# Inputs are read as 3-channel colour images, masks as single-channel grayscale.
X = np.array([
    _load_resized(IMAGE_DIR + str(i + 1) + '.png', cv2.IMREAD_COLOR)
    for i in range(NUM_SAMPLES)
])
Y = np.array([
    _load_resized(MASK_DIR + str(i + 1) + '.png', cv2.IMREAD_GRAYSCALE)
    for i in range(NUM_SAMPLES)
])

X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=5)

# Give the masks an explicit trailing channel axis: (N, H, W) -> (N, H, W, 1).
Y_train = Y_train.reshape((Y_train.shape[0], Y_train.shape[1], Y_train.shape[2], 1))
Y_test = Y_test.reshape((Y_test.shape[0], Y_test.shape[1], Y_test.shape[2], 1))

# Scale 8-bit pixel values to [0, 1].
X_train = X_train / 255
X_test = X_test / 255

# Masks are additionally rounded so that values blurred by the cubic
# interpolation become hard 0/1 labels again.
Y_train = np.round(Y_train / 255, 0)
Y_test = np.round(Y_test / 255, 0)

for split in (X_train, Y_train, X_test, Y_test):
    print(split.shape)
# different loss functions
# Set-similarity measure, commonly used to compute how similar two samples are
def dice_coef(y_true, y_pred):
    """Return the smoothed Dice coefficient computed over all elements of
    `y_true` and `y_pred` after flattening both tensors."""
    smooth = 1.0  # keeps the denominator from being zero
    truth_flat = K.flatten(y_true)  # collapse every dimension into one
    pred_flat = K.flatten(y_pred)
    overlap = K.sum(truth_flat * pred_flat)
    numerator = 2. * overlap + smooth
    denominator = K.sum(truth_flat) + K.sum(pred_flat) + smooth
    return numerator / denominator
#
def jacard(y_true, y_pred):
    """Return the Jaccard index (intersection over union) computed over all
    elements of `y_true` and `y_pred` after flattening both tensors.

    A Keras epsilon is added to numerator and denominator so that two
    all-zero tensors give 1 (perfect agreement) instead of 0/0 = NaN, which
    would otherwise propagate into `iou_loss` and the gradients. For any
    non-empty union the effect on the value is negligible.
    """
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    overlap = y_true_f * y_pred_f
    intersection = K.sum(overlap)
    union = K.sum(y_true_f + y_pred_f - overlap)
    eps = K.epsilon()
    return (intersection + eps) / (union + eps)
def dice_coef_loss(y_true,y_pred):
    """Return the Dice loss, i.e. one minus `dice_coef`."""
    similarity = dice_coef(y_true, y_pred)
    return 1 - similarity
def iou_loss(y_true,y_pred):
    """Return the IoU loss, i.e. one minus the `jacard` index."""
    iou = jacard(y_true, y_pred)
    return 1 - iou
def tversky(y_true, y_pred):
    """Return the smoothed Tversky index computed over all elements of
    `y_true` and `y_pred`.

    False negatives are weighted by alpha = 0.75 and false positives by
    1 - alpha = 0.25; a smoothing term of 1 is added to numerator and
    denominator.
    """
    alpha = 0.75
    smooth = 1

    truth = K.flatten(y_true)
    pred = K.flatten(y_pred)

    true_pos = K.sum(truth * pred)
    false_neg = K.sum(truth * (1-pred))
    false_pos = K.sum((1-truth)*pred)

    numerator = true_pos + smooth
    denominator = true_pos + alpha*false_neg + (1-alpha)*false_pos + smooth
    return numerator/denominator
def tversky_loss(y_true, y_pred):
    """Return the Tversky loss, i.e. one minus the `tversky` index."""
    index = tversky(y_true, y_pred)
    return 1 - index
def focal_tversky(y_true,y_pred):
    """Return the focal Tversky loss: (1 - tversky index) raised to the
    power gamma, with gamma fixed at 0.85."""
    gamma = 0.85
    tversky_index = tversky(y_true, y_pred)
    return K.pow((1-tversky_index), gamma)
def binary_focal_loss(gamma=2, alpha=0.25):
    """
    Binary form of focal loss, for two-class problems.

        focal_loss(p_t) = -alpha_t * (1 - p_t)**gamma * log(p_t)

    where p = sigmoid(x), and p_t = p or 1 - p depending on whether the label
    is 1 or 0, respectively.

    Arguments:
        gamma -- focusing exponent applied to (1 - p_t) (default: 2)
        alpha -- weight of the positive class; the negative class is weighted
            by 1 - alpha (default: 0.25)

    Returns:
        A loss function taking (y_true, y_pred), suitable for model.compile.

    References:
        https://arxiv.org/pdf/1708.02002.pdf

    Usage:
        model.compile(loss=[binary_focal_loss(alpha=.25, gamma=2)], metrics=["accuracy"], optimizer=adam)
    """
    alpha = tf.constant(alpha, dtype=tf.float32)
    gamma = tf.constant(gamma, dtype=tf.float32)

    def binary_focal_loss_fixed(y_true, y_pred):
        """
        Mean focal loss over all elements.

        y_pred must already be a probability (computed after sigmoid).
        The original author notes that y_true should have shape (None, 1).
        """
        y_true = tf.cast(y_true, tf.float32)
        alpha_t = y_true * alpha + (1.0 - y_true) * (1.0 - alpha)
        p_t = y_true * y_pred + (1.0 - y_true) * (1.0 - y_pred)
        # Clip rather than add epsilon: p_t + epsilon can exceed 1, which
        # makes (1 - p_t) negative and K.pow return NaN for non-integer
        # gamma. Clipping still protects log() from a zero argument.
        p_t = K.clip(p_t, K.epsilon(), 1.0)
        focal_loss = -alpha_t * K.pow(1.0 - p_t, gamma) * K.log(p_t)
        return K.mean(focal_loss)

    return binary_focal_loss_fixed
# training
def saveModel(model):
    """Write the model to the 'models' directory.

    The architecture is written as JSON to models/modelP.json and the model
    is saved with `model.save` to models/modelW.h5. The directory is created
    if it does not exist yet.
    """
    model_json = model.to_json()

    # exist_ok only tolerates "already exists"; real failures such as a
    # permission error are no longer swallowed by a bare except.
    os.makedirs('models', exist_ok=True)

    # The context manager guarantees the JSON is flushed and the handle
    # closed, which the previous bare open()/write() never did.
    with open('models/modelP.json', 'w') as fp:
        fp.write(model_json)

    model.save('models/modelW.h5')
def _mask_overlap(pred, truth):
    """Return (intersection sum, union sum, Jaccard index, Dice coefficient)
    for one predicted mask and one ground-truth mask.

    When both masks are empty the ratios would be 0/0; that case is scored
    as 1.0 (perfect agreement) so a single NaN cannot poison the averages.
    """
    pred = pred.ravel()
    truth = truth.ravel()
    overlap = pred * truth
    inter_sum = np.sum(overlap)
    union_sum = np.sum(pred + truth - overlap)
    mass = np.sum(pred) + np.sum(truth)
    jaccard = inter_sum / union_sum if union_sum > 0 else 1.0
    dice = (2. * inter_sum) / mass if mass > 0 else 1.0
    return inter_sum, union_sum, jaccard, dice


def _save_preview(index, image, truth, pred):
    """Save a three-panel figure (input, ground truth, prediction) to
    outputs/<index>.png, titled with the sample's Jaccard index."""
    plt.figure(figsize=(20,10))

    plt.subplot(1,3,1)
    plt.imshow(image)
    plt.title('Input')

    plt.subplot(1,3,2)
    plt.imshow(truth.reshape(truth.shape[0], truth.shape[1]))
    plt.title('Ground Truth')

    plt.subplot(1,3,3)
    plt.imshow(pred.reshape(pred.shape[0], pred.shape[1]))
    plt.title('Prediction')

    inter_sum, union_sum, jaccard, _ = _mask_overlap(pred, truth)
    plt.suptitle('Jacard Index'+ str(inter_sum) +'/'+ str(union_sum) +'='+str(jaccard))

    plt.savefig('outputs/'+str(index)+'.png',format='png')
    plt.close()


def evaluateModel(model, X_test, Y_test, batchSize):
    """Evaluate `model` on the test set and keep the best checkpoint.

    Steps:
      1. Predict on X_test and round the predictions to 0/1.
      2. Save preview figures for up to the first 10 samples in 'outputs'.
      3. Print the mean Jaccard index and Dice coefficient over the test set.
      4. Append the mean Jaccard index to models/log.txt.
      5. If it beats the value stored in models/best.txt, update that file
         and save the model via saveModel.

    Arguments:
        model -- object providing predict(); passed on to saveModel
        X_test -- test inputs, indexable per sample
        Y_test -- ground-truth masks aligned with X_test
        batchSize -- batch size forwarded to model.predict
    """
    os.makedirs('outputs', exist_ok=True)
    os.makedirs('models', exist_ok=True)

    yp = model.predict(x=X_test, batch_size=batchSize, verbose=1)
    yp = np.round(yp,0)

    # Never index past the end when the test set has fewer than 10 samples.
    for i in range(min(10, len(Y_test))):
        _save_preview(i, X_test[i], Y_test[i], yp[i])

    jacard = 0
    dice = 0
    for pred, truth in zip(yp, Y_test):
        _, _, sample_jacard, sample_dice = _mask_overlap(pred, truth)
        jacard += sample_jacard
        dice += sample_dice
    jacard /= len(Y_test)
    dice /= len(Y_test)

    print('Jacard Index : '+str(jacard))
    print('Dice Coefficient : '+str(dice))

    with open('models/log.txt','a') as fp:
        fp.write(str(jacard)+'\n')

    # A missing best.txt means nothing has been recorded yet; use the same
    # sentinel the training script writes so the first evaluation wins.
    if os.path.exists('models/best.txt'):
        with open('models/best.txt','r') as fp:
            best = fp.read()
    else:
        best = '-1.0'

    if jacard > float(best):
        print('***********************************************')
        print('Jacard Index improved from '+str(best)+' to '+str(jacard))
        print('***********************************************')
        with open('models/best.txt','w') as fp:
            fp.write(str(jacard))
        saveModel(model)
def trainStep(model, X_train, Y_train, X_test, Y_test, epochs, batchSize):
    """Train `model` one epoch at a time, calling evaluateModel on the test
    data after every epoch, and return the model.

    Fitting with epochs=1 inside an outer loop (rather than a single
    fit(epochs=N) call) is what lets the evaluation run between epochs.
    """
    for epoch_number in range(1, epochs + 1):
        print('Epoch : {}'.format(epoch_number))
        model.fit(x=X_train, y=Y_train, batch_size=batchSize, epochs=1, verbose=1)
        evaluateModel(model, X_test, Y_test, batchSize)
    return model
# Build and compile the network for 96x128 RGB inputs, trained with
# binary cross-entropy.
model = DCUNet(height=96, width=128, channels=3)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['acc'])
model.summary()

# Saving the untrained model also creates the 'models' directory that the
# two bookkeeping files below are written into.
saveModel(model)

# Start with an empty score log ...
with open('models/log.txt','w'):
    pass

# ... and a best score below any reachable Jaccard index, so the first
# evaluation always counts as an improvement.
with open('models/best.txt','w') as best_file:
    best_file.write('-1.0')

trainStep(model, X_train, Y_train, X_test, Y_test, epochs=150, batchSize=4)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。