赞
踩
模型:
Iterated Dilated Convolutions(IDCNN)
论文:
Fast and Accurate Entity Recognition with Iterated Dilated Convolutions
摘要:
对于序列标注来讲,普通CNN有一个劣势,就是卷积之后,末层神经元可能只是得到了原始输入数据中一小块的信息。而对NER来讲,整个句子的每个字都有可能都会对当前需要标注的字做出影响。为了覆盖到输入的全部信息就需要加入更多的卷积层, 导致层数越来越深,参数越来越多,而为了防止过拟合又要加入更多的Dropout之类的正则化,带来更多的超参数,整个模型变得庞大和难以训练。因为CNN这样的劣势,大部分序列标注问题人们还是使用biLSTM之类的网络结构,尽可能使用网络的记忆力记住全句的信息来对单个字做标注。
作者提出了一个dilated CNN的模型,意思是“膨胀的”CNN。想法其实很简单:正常CNN的filter,都是作用在输入矩阵一片连续的位置上,不断sliding做卷积。dilated CNN为这片filter增加了一个dilation width,作用在输入矩阵的时候,会skip掉所有dilation width中间的输入数据;而filter矩阵本身的大小仍然不变,这样filter获取到了更广阔的输入矩阵上的数据,看上去就像是“膨胀”了一般。
具体使用时,dilated width会随着层数的增加而指数增加。这样随着层数的增加,参数数量是线性增加的,而receptive field却是指数增加的,可以很快覆盖到全部的输入数据。
Tips:文章可能存在一些漏洞,欢迎留言指出
网络结构
感受野大小对比
普通卷积
膨胀卷积
import torch from torch import nn from torch.utils.data import Dataset from torchcrf import CRF class IDCNN_CRF(nn.Module): def __init__(self, vocab_size, embedding_dim, padding_idx, filters: int, kernel_size, tagset_size): super(IDCNN_CRF, self).__init__() # 词嵌入层 self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim, padding_idx=padding_idx) # IDCNN self.conv1_1 = nn.Conv1d(in_channels=embedding_dim, out_channels=filters, kernel_size=kernel_size, padding=kernel_size // 2, dilation=(1,)) self.conv1_2 = nn.Conv1d(in_channels=filters, out_channels=filters, kernel_size=kernel_size, padding=kernel_size // 2, dilation=(1,)) self.conv1_3 = nn.Conv1d(in_channels=filters, out_channels=filters, kernel_size=kernel_size, padding=kernel_size // 2 + 1, dilation=(2,)) self.conv2_1 = nn.Conv1d(in_channels=filters, out_channels=filters, kernel_size=kernel_size, padding=kernel_size // 2, dilation=(1,)) self.conv2_2 = nn.Conv1d(in_channels=filters, out_channels=filters, kernel_size=kernel_size, padding=kernel_size // 2, dilation=(1,)) self.conv2_3 = nn.Conv1d(in_channels=filters, out_channels=filters, kernel_size=kernel_size, padding=kernel_size // 2 + 1, dilation=(2,)) self.conv3_1 = nn.Conv1d(in_channels=filters, out_channels=filters, kernel_size=kernel_size, padding=kernel_size // 2, dilation=(1,)) self.conv3_2 = nn.Conv1d(in_channels=filters, out_channels=filters, kernel_size=kernel_size, padding=kernel_size // 2, dilation=(1,)) self.conv3_3 = nn.Conv1d(in_channels=filters, out_channels=filters, kernel_size=kernel_size, padding=kernel_size // 2 + 1, dilation=(2,)) self.conv4_1 = nn.Conv1d(in_channels=filters, out_channels=filters, kernel_size=kernel_size, padding=kernel_size // 2, dilation=(1,)) self.conv4_2 = nn.Conv1d(in_channels=filters, out_channels=filters, kernel_size=kernel_size, padding=kernel_size // 2, dilation=(1,)) self.conv4_3 = nn.Conv1d(in_channels=filters, out_channels=filters, kernel_size=kernel_size, padding=kernel_size // 2 + 1, dilation=(2,)) # 归一化层 self.norm = nn.LayerNorm(filters, elementwise_affine=False) # 全连接层 self.dense = nn.Linear(in_features=filters, out_features=tagset_size) # CRF层 self.crf = CRF(num_tags=tagset_size) def forward(self, texts, tags, masks): # [8, 60, 120] texts = self.embed(texts).permute(0, 2, 1) x = torch.relu(self.conv1_1(texts)).permute(0, 2, 1) x = self.norm(x).permute(0, 2, 1) x = torch.relu(self.conv1_2(x)).permute(0, 2, 1) x = self.norm(x).permute(0, 2, 1) x = torch.relu(self.conv1_3(x)).permute(0, 2, 1) x = self.norm(x).permute(0, 2, 1) x = torch.relu(self.conv2_1(x)).permute(0, 2, 1) x = self.norm(x).permute(0, 2, 1) x = torch.relu(self.conv2_2(x)).permute(0, 2, 1) x = self.norm(x).permute(0, 2, 1) x = torch.relu(self.conv2_3(x)).permute(0, 2, 1) x = self.norm(x).permute(0, 2, 1) x = torch.relu(self.conv3_1(x)).permute(0, 2, 1) x = self.norm(x).permute(0, 2, 1) x = torch.relu(self.conv3_2(x)).permute(0, 2, 1) x = self.norm(x).permute(0, 2, 1) x = torch.relu(self.conv3_3(x)).permute(0, 2, 1) x = self.norm(x).permute(0, 2, 1) x = torch.relu(self.conv4_1(x)).permute(0, 2, 1) x = self.norm(x).permute(0, 2, 1) x = torch.relu(self.conv4_2(x)).permute(0, 2, 1) x = self.norm(x).permute(0, 2, 1) x = torch.relu(self.conv4_3(x)).permute(0, 2, 1) x = self.norm(x) out = x.permute(1, 0, 2) feats = self.dense(out) if tags is not None: tags = tags.permute(1, 0) if masks is not None: masks = masks.permute(1, 0) # 计算损失值和概率 if tags is not None: loss = self.neg_log_likelihood(feats, tags, masks, 'mean') predictions = self.crf.decode(emissions=feats, mask=masks) # [batch, 任意数] return loss, predictions else: predictions = self.crf.decode(emissions=feats, mask=masks) return predictions # 负对数似然损失函数 def neg_log_likelihood(self, emissions, tags=None, mask=None, reduction=None): return -1 * self.crf(emissions=emissions, tags=tags, mask=mask, reduction=reduction)
设置batch_size=8,embddding_dim=200,filters=240,epoch=6
与BiLSTM-CRF模型对比,速度快不了多少,预测结果感觉差强人意,可能是因为没有设置并行的Block。
import pandas as pd import torch import pickle import os from torch import optim from torch.utils.data import DataLoader from tqdm import tqdm from idcnn_crf import IDCNN_CRF, NerDataset, NerDatasetTest from sklearn.metrics import f1_score from torchviz import make_dot # 路径 TRAIN_PATH = './dataset/train_data_public.csv' TEST_PATH = './dataset/test_public.csv' VOCAB_PATH = './data/vocab.txt' WORD2INDEX_PATH = './data/word2index.pkl' MODEL_PATH = './model/idcnn_crf.pkl' # 超参数 MAX_LEN = 60 BATCH_SIZE = 8 EMBEDDING_DIM = 200 FILTERS = 240 EPOCH = 6 # 预设 # 设备 DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu" # tag2index tag2index = { "O": 0, # 其他 "B-BANK": 1, "I-BANK": 2, # 银行实体 "B-PRODUCT": 3, "I-PRODUCT": 4, # 产品实体 "B-COMMENTS_N": 5, "I-COMMENTS_N": 6, # 用户评论,名词 "B-COMMENTS_ADJ": 7, "I-COMMENTS_ADJ": 8 # 用户评论,形容词 } # 预设标签 unk_flag = '[UNK]' # 未知 pad_flag = '[PAD]' # 填充 start_flag = '[STA]' # 开始 end_flag = '[END]' # 结束 # 生成word2index词典 def create_word2index(): if not os.path.exists(WORD2INDEX_PATH): # 如果文件不存在,则生成word2index word2index = dict() with open(VOCAB_PATH, 'r', encoding='utf8') as f: for word in f.readlines(): word2index[word.strip()] = len(word2index) + 1 with open(WORD2INDEX_PATH, 'wb') as f: pickle.dump(word2index, f) # 数据准备 def data_prepare(): # 加载训练集、测试集 train_dataset = pd.read_csv(TRAIN_PATH, encoding='utf8') test_dataset = pd.read_csv(TEST_PATH, encoding='utf8') return train_dataset, test_dataset # 文本、标签转化为索引(训练集) def text_tag_to_index(dataset, word2index): # 预设索引 unk_index = word2index.get(unk_flag) pad_index = word2index.get(pad_flag) start_index = word2index.get(start_flag, 2) end_index = word2index.get(end_flag, 3) texts, tags, masks = [], [], [] n_rows = len(dataset) # 行数 for row in tqdm(range(n_rows)): text = dataset.iloc[row, 1] tag = dataset.iloc[row, 2] # 文本对应的索引 text_index = [start_index] + [word2index.get(w, unk_index) for w in text] + [end_index] # 标签对应的索引 tag_index = [0] + [tag2index.get(t) for t in tag.split()] + [0] # 填充或截断句子至标准长度 if len(text_index) < MAX_LEN: # 句子短,填充 pad_len = MAX_LEN - len(text_index) text_index += pad_len * [pad_index] tag_index += pad_len * [0] elif len(text_index) > MAX_LEN: # 句子长,截断 text_index = text_index[:MAX_LEN - 1] + [end_index] tag_index = tag_index[:MAX_LEN - 1] + [0] # 转换为mask def _pad2mask(t): return 0 if t == pad_index else 1 # mask mask = [_pad2mask(t) for t in text_index] # 加入列表中 texts.append(text_index) tags.append(tag_index) masks.append(mask) # 转换为tensor texts = torch.LongTensor(texts) tags = torch.LongTensor(tags) masks = torch.tensor(masks, dtype=torch.uint8) return texts, tags, masks # 文本转化为索引(测试集) def text_to_index(dataset, word2index): # 预设索引 unk_index = word2index.get(unk_flag) pad_index = word2index.get(pad_flag) start_index = word2index.get(start_flag, 2) end_index = word2index.get(end_flag, 3) texts, masks = [], [] n_rows = len(dataset) # 行数 for row in tqdm(range(n_rows)): text = dataset.iloc[row, 1] # 文本对应的索引 text_index = [start_index] + [word2index.get(w, unk_index) for w in text] + [end_index] # 填充或截断句子至标准长度 if len(text_index) < MAX_LEN: # 句子短,填充 pad_len = MAX_LEN - len(text_index) text_index += pad_len * [pad_index] elif len(text_index) > MAX_LEN: # 句子长,截断 text_index = text_index[:MAX_LEN - 1] + [end_index] # 转换为mask def _pad2mask(t): return 0 if t == pad_index else 1 # mask mask = [_pad2mask(t) for t in text_index] masks.append(mask) # 加入列表中 texts.append(text_index) # 转换为tensor texts = torch.LongTensor(texts) masks = torch.tensor(masks, dtype=torch.uint8) return texts, masks # 训练 def train(train_dataloader, model, optimizer, epoch): for i, batch_data in enumerate(train_dataloader): texts = batch_data['texts'].to(DEVICE) tags = batch_data['tags'].to(DEVICE) masks = batch_data['masks'].to(DEVICE) loss, predictions = model(texts, tags, masks) optimizer.zero_grad() loss.backward() optimizer.step() if i % 200 == 0: micro_f1 = get_f1_score(tags, masks, predictions) print(f'Epoch:{epoch} | i:{i} | loss:{loss.item()} | Micro_F1:{micro_f1}') # 计算f1值 def get_f1_score(tags, masks, predictions): final_tags = [] final_predictions = [] tags = tags.to('cpu').data.numpy().tolist() masks = masks.to('cpu').data.numpy().tolist() for index in range(BATCH_SIZE): length = masks[index].count(1) # 未被mask的长度 final_tags += tags[index][1:length - 1] # 去掉头和尾,有效tag,最大max_len-2 final_predictions += predictions[index][1:length - 1] # 去掉头和尾,有效mask,最大max_len-2 f1 = f1_score(final_tags, final_predictions, average='micro') # 取微平均 return f1 # 执行流水线 def execute(): # 读取数据集 train_dataset, test_dataset = data_prepare() # 生成word2index字典 create_word2index() with open(WORD2INDEX_PATH, 'rb') as f: word2index = pickle.load(f) # 文本、标签转化为索引 texts, tags, masks = text_tag_to_index(train_dataset, word2index) # 数据集装载 train_dataset = NerDataset(texts, tags, masks) train_dataloader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0) # 构建模型 model = IDCNN_CRF(vocab_size=len(word2index), embedding_dim=EMBEDDING_DIM, padding_idx=1, filters=FILTERS, kernel_size=3, tagset_size=len(tag2index)).to(DEVICE) print(model) optimizer = optim.Adam(model.parameters(), lr=1e-5) print(f"GPU_NAME:{torch.cuda.get_device_name()} | Memory_Allocated:{torch.cuda.memory_allocated()}") # 模型训练 for i in range(EPOCH): train(train_dataloader, model, optimizer, i) # 保存模型 torch.save(model.state_dict(), MODEL_PATH) # 测试集预测实体标签 def test(): # 加载数据集 test_dataset = pd.read_csv(TEST_PATH, encoding='utf8') # 加载word2index文件 with open(WORD2INDEX_PATH, 'rb') as f: word2index = pickle.load(f) # 文本、标签转化为索引 texts, masks = text_to_index(test_dataset, word2index) # 装载测试集 dataset_test = NerDatasetTest(texts, masks) test_dataloader = DataLoader(dataset=dataset_test, batch_size=BATCH_SIZE, shuffle=False) # 构建模型 model = IDCNN_CRF(vocab_size=len(word2index), embedding_dim=EMBEDDING_DIM, padding_idx=1, filters=FILTERS, kernel_size=3, tagset_size=len(tag2index)).to(DEVICE) print(model) model.load_state_dict(torch.load(MODEL_PATH)) # 模型预测 model.eval() predictions_list = [] for i, batch_data in enumerate(test_dataloader): texts = batch_data['texts'].to(DEVICE) masks = batch_data['masks'].to(DEVICE) predictions = model(texts, None, masks) # print(len(texts), len(predictions)) predictions_list.extend(predictions) print(len(predictions_list)) print(len(test_dataset['text'])) # 将预测结果转换为文本格式 entity_tag_list = [] index2tag = {v: k for k, v in tag2index.items()} # 反转字典 for i, (text, predictions) in enumerate(zip(test_dataset['text'], predictions_list)): # 删除首位和最后一位 predictions.pop() predictions.pop(0) text_entity_tag = [] for c, t in zip(text, predictions): if t != 0: text_entity_tag.append(c + index2tag[t]) entity_tag_list.append(" ".join(text_entity_tag)) # 合并为str并加入列表中 print(len(entity_tag_list)) result_df = pd.DataFrame(data=entity_tag_list, columns=['result']) result_df.to_csv('./data/result_df4.csv') if __name__ == '__main__': execute()
质量分评分好玄学啊,老是说我文章字数过短。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。