First, we need to import the required libraries:
```python
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import BertTokenizer, BertModel
from torchcrf import CRF  # provided by the pytorch-crf package (pip install pytorch-crf)
```
Then we define some hyperparameters and the model structure:
```python
# Hyperparameters
MAX_LEN = 128
BATCH_SIZE = 32
EPOCHS = 10
LEARNING_RATE = 0.001
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load the BERT model and tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
bert_model = BertModel.from_pretrained('bert-base-chinese')

class EntityModel(nn.Module):
    def __init__(self, bert_model, hidden_size, num_tags):
        super(EntityModel, self).__init__()
        self.bert = bert_model
        self.dropout = nn.Dropout(0.1)
        self.bilstm = nn.LSTM(bidirectional=True, input_size=hidden_size,
                              hidden_size=hidden_size // 2, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_tags)
        # batch_first=True because the emissions are shaped (batch, seq_len, num_tags)
        self.crf = CRF(num_tags, batch_first=True)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.bert(input_ids, attention_mask=attention_mask)
        sequence_output = outputs[0]                   # (batch, seq_len, hidden_size)
        sequence_output = self.dropout(sequence_output)
        lstm_output, _ = self.bilstm(sequence_output)  # (batch, seq_len, hidden_size)
        logits = self.fc(lstm_output)                  # (batch, seq_len, num_tags)
        if labels is not None:
            # The CRF returns a log-likelihood; negate it to obtain a loss
            loss = -self.crf(logits, labels, mask=attention_mask.bool())
            return loss
        else:
            tags = self.crf.decode(logits, mask=attention_mask.bool())
            return tags
```
Here we use the BERT model and a BiLSTM layer to extract features from the sentence, map them into the tag space with a fully connected layer, and model the tag sequence with a CRF layer.
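As a quick sanity check of the shapes involved, here is a minimal sketch that instantiates the model with a hypothetical tag count of 7 (hidden_size=768 matches bert-base-chinese) and runs one forward pass on random token ids; the names toy_model, dummy_input_ids, and dummy_mask are only for illustration:

```python
# Illustrative shape check only; num_tags=7 is an assumed tag-set size
toy_model = EntityModel(bert_model, hidden_size=768, num_tags=7)
dummy_input_ids = torch.randint(0, tokenizer.vocab_size, (2, 16))  # (batch=2, seq_len=16)
dummy_mask = torch.ones(2, 16, dtype=torch.long)

toy_tags = toy_model(dummy_input_ids, dummy_mask)  # no labels -> CRF decode
print(len(toy_tags), len(toy_tags[0]))             # 2 sequences, 16 tag ids each
```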
Next, we need to define some helper functions:
```python
def tokenize_and_preserve_labels(text, labels):
    """Tokenize word by word so that every sub-word keeps its original label."""
    tokenized_text = []
    token_labels = []
    for word, label in zip(text, labels):
        tokenized_word = tokenizer.tokenize(word)
        n_subwords = len(tokenized_word)

        tokenized_text.extend(tokenized_word)
        token_labels.extend([label] * n_subwords)

    return tokenized_text, token_labels

def pad_sequences(sequences, max_len, padding_value=0):
    """Pad (or truncate) every sequence to max_len so they can be batched."""
    padded_sequences = torch.full((len(sequences), max_len), padding_value, dtype=torch.long)
    for i, seq in enumerate(sequences):
        seq_len = len(seq)
        if seq_len <= max_len:
            padded_sequences[i, :seq_len] = torch.tensor(seq)
        else:
            padded_sequences[i, :] = torch.tensor(seq[:max_len])
    return padded_sequences

def train(model, optimizer, train_dataloader):
    model.train()
    total_loss = 0
    for step, batch in enumerate(train_dataloader):
        # TensorDataset yields tuples, so unpack positionally
        input_ids, attention_mask, labels = (t.to(device) for t in batch)

        loss = model(input_ids, attention_mask, labels)
        total_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    avg_train_loss = total_loss / len(train_dataloader)
    return avg_train_loss

def evaluate(model, eval_dataloader):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for step, batch in enumerate(eval_dataloader):
            input_ids, attention_mask, labels = (t.to(device) for t in batch)

            loss = model(input_ids, attention_mask, labels)
            total_loss += loss.item()

    avg_eval_loss = total_loss / len(eval_dataloader)
    return avg_eval_loss

def predict(model, text):
    model.eval()
    tokenized_text = tokenizer.tokenize(text)
    input_ids = torch.tensor([tokenizer.convert_tokens_to_ids(tokenized_text)])
    attention_mask = torch.ones_like(input_ids)

    with torch.no_grad():
        tags = model(input_ids.to(device), attention_mask.to(device))

    # id2label is the inverse of label2id, mapping tag ids back to tag strings
    tag_labels = [id2label[tag] for tag in tags[0]]
    return list(zip(tokenized_text, tag_labels))
```
Here we define a tokenization function that converts the raw text and labels into tokenized text with an aligned label sequence, and a padding function that pads the sequences so they can be batched. We then define the training, evaluation, and prediction functions.
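To make the label alignment concrete, here is a small usage example of tokenize_and_preserve_labels; the sentence and its BIO labels are made up for illustration, and with bert-base-chinese each Chinese character normally stays a single token, so the labels map one-to-one:

```python
# Hypothetical example: character-level input with BIO labels
words  = ['我', '爱', '北', '京']
labels = ['O', 'O', 'B-LOC', 'I-LOC']

tokens, token_labels = tokenize_and_preserve_labels(words, labels)
print(tokens)        # expected: ['我', '爱', '北', '京']
print(token_labels)  # expected: ['O', 'O', 'B-LOC', 'I-LOC']
```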
Next, we need to load the dataset and convert it into the format the model expects:
```python
# Load the training set
train_data = []
with open('train.txt', 'r', encoding='utf-8') as f:
    words = []
    labels = []
    for line in f:
        line = line.strip()
        if line == '':
            train_data.append((words, labels))
            words = []
            labels = []
        else:
            word, label = line.split()
            words.append(word)
            labels.append(label)

    if len(words) > 0:
        train_data.append((words, labels))

# Convert the dataset into the format the model expects
train_input_ids = []
train_attention_masks = []
train_labels = []

for words, labels in train_data:
    tokenized_text, token_labels = tokenize_and_preserve_labels(words, labels)
    input_ids = tokenizer.convert_tokens_to_ids(tokenized_text)
    attention_mask = [1] * len(input_ids)

    train_input_ids.append(input_ids)
    train_attention_masks.append(attention_mask)
    train_labels.append([label2id[label] for label in token_labels])

train_input_ids = pad_sequences(train_input_ids, MAX_LEN)
train_attention_masks = pad_sequences(train_attention_masks, MAX_LEN)
# Padded label positions are masked out by the attention mask, so the CRF ignores them
train_labels = pad_sequences(train_labels, MAX_LEN, padding_value=-1)

train_dataset = torch.utils.data.TensorDataset(train_input_ids, train_attention_masks, train_labels)
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Similarly, load the validation and test sets and convert them into the same format
# (the eval_dataloader used below is built in exactly this way)
```
Here we load a file containing the training data and convert it into the format the model expects, using the tokenization and padding functions defined above.
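Note that the code above relies on a label2id mapping (and predict relies on its inverse, id2label) that is never defined in this walkthrough. A minimal sketch, assuming a BIO tag set with PER/LOC/ORG entities and a CoNLL-style train.txt with one character and one tag per line and a blank line between sentences, might look like this:

```python
# Assumed tag set; replace with whatever tags actually appear in train.txt
label2id = {'O': 0, 'B-PER': 1, 'I-PER': 2, 'B-LOC': 3, 'I-LOC': 4, 'B-ORG': 5, 'I-ORG': 6}
id2label = {v: k for k, v in label2id.items()}

# Expected train.txt layout (illustrative):
#   巴 B-LOC
#   黎 I-LOC
#   很 O
#   美 O
#   (blank line between sentences)
```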
Finally, we can use the helper functions and datasets above to train, evaluate, and test the model:
```python
# Train the model
model = EntityModel(bert_model, hidden_size=768, num_tags=len(label2id))
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

for epoch in range(EPOCHS):
    avg_train_loss = train(model, optimizer, train_dataloader)
    avg_eval_loss = evaluate(model, eval_dataloader)
    print(f'Epoch {epoch + 1}: train_loss={avg_train_loss:.4f}, eval_loss={avg_eval_loss:.4f}')

# Test the model
test_sentences = ['今天是个好日子', '我喜欢中国菜', '巴黎是一座美丽的城市']
for sentence in test_sentences:
    tags = predict(model, sentence)
    print(tags)
```
Here we train the model with the Adam optimizer, using the negative log-likelihood of the CRF layer as the loss (rather than a plain cross-entropy loss). We monitor performance on the validation set after each epoch, and then use the trained model to predict the entities in a few new sentences.
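predict returns a flat list of (token, tag) pairs. To turn that output into entity spans you would typically merge consecutive B-/I- tags; the extract_entities helper below is a minimal sketch of such post-processing (it and the sample pairs are not part of the original code):

```python
# Hypothetical post-processing: merge BIO-tagged (token, tag) pairs into entity spans
def extract_entities(token_tag_pairs):
    entities = []
    current_text, current_type = '', None
    for token, tag in token_tag_pairs:
        if tag.startswith('B-'):
            if current_type is not None:
                entities.append((current_text, current_type))
            current_text, current_type = token, tag[2:]
        elif tag.startswith('I-') and current_type == tag[2:]:
            current_text += token
        else:
            if current_type is not None:
                entities.append((current_text, current_type))
            current_text, current_type = '', None
    if current_type is not None:
        entities.append((current_text, current_type))
    return entities

print(extract_entities([('巴', 'B-LOC'), ('黎', 'I-LOC'), ('很', 'O')]))  # [('巴黎', 'LOC')]
```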