
Datawhale AI Summer Camp, Session 2, NLP Track: Task 2 Study Notes

Link to the previous Task 1 write-up:

http://t.csdnimg.cn/VJhcX

Overview:

Task 2 uses a Seq2Seq (encoder-decoder) model. The implementation walks through four stages: environment setup, data preprocessing, model training, and translation quality evaluation.

I. Environment Setup

Install the required libraries with the following commands:

!pip install torchtext
!pip install jieba
!pip install sacrebleu
To install spaCy and its English model, download the model package from
https://github.com/explosion/spacy-models/releases, upload the archive to the dataset directory,

and then run:

!pip install -U pip setuptools wheel -i https://pypi.tuna.tsinghua.edu.cn/simple
!pip install -U 'spacy[cuda12x]' -i https://pypi.tuna.tsinghua.edu.cn/simple
!pip install ./dataset/en_core_web_trf-3.7.3-py3-none-any.whl
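
After installation, an optional sanity check can confirm the model loads (this is a minimal sketch and assumes the wheel above installed successfully):

import spacy

nlp = spacy.load("en_core_web_trf")
print([t.text for t in nlp("Machine translation is fun.")])  # e.g. ['Machine', 'translation', 'is', 'fun', '.']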

Then import the libraries in the code:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn.utils import clip_grad_norm_
from torchtext.data.metrics import bleu_score
from torch.utils.data import Dataset, DataLoader
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from typing import List, Tuple
import jieba
import random
from torch.nn.utils.rnn import pad_sequence
import sacrebleu
import time
import math

II. Data Preprocessing

1. Define the tokenizers

# Define the tokenizers
en_tokenizer = get_tokenizer('spacy', language='en_core_web_trf')
zh_tokenizer = lambda x: list(jieba.cut(x))  # use jieba for Chinese word segmentation
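
For intuition, here is roughly what the two tokenizers produce (the outputs in the comments are illustrative and depend on the installed model and dictionary versions):

print(en_tokenizer("The quick brown fox."))  # e.g. ['The', 'quick', 'brown', 'fox', '.']
print(zh_tokenizer("我爱机器翻译"))           # e.g. ['我', '爱', '机器', '翻译']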

2. Data reading function

# Read a text file into a list of stripped lines
def read_data(file_path: str) -> List[str]:
    with open(file_path, 'r', encoding='utf-8') as f:
        return [line.strip() for line in f]
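
Each line of train.txt is expected to hold an English sentence and its Chinese translation separated by a tab, which is how load_data below splits it. A small sketch (the sample line is made up):

# A line in train.txt looks like: "Hello world!\t你好,世界!"
lines = read_data('./dataset/train.txt')
en, zh = lines[0].split('\t')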

3. Build the vocabularies

# Build the vocabularies
def build_vocab(data: List[Tuple[List[str], List[str]]]):
    en_vocab = build_vocab_from_iterator(
        (en for en, _ in data),
        specials=['<unk>', '<pad>', '<bos>', '<eos>']
    )
    zh_vocab = build_vocab_from_iterator(
        (zh for _, zh in data),
        specials=['<unk>', '<pad>', '<bos>', '<eos>']
    )
    en_vocab.set_default_index(en_vocab['<unk>'])
    zh_vocab.set_default_index(zh_vocab['<unk>'])
    return en_vocab, zh_vocab
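
A small sketch of how the resulting vocab objects behave (the printed index is illustrative; specials are inserted first, so <pad> is typically index 1):

# after: en_vocab, zh_vocab = build_vocab(train_processed)
print(en_vocab['<pad>'])           # e.g. 1
print(en_vocab['some-rare-word'])  # unseen tokens fall back to en_vocab['<unk>'] via set_default_index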

4. Translation dataset

class TranslationDataset(Dataset):
    def __init__(self, data: List[Tuple[List[str], List[str]]], en_vocab, zh_vocab):
        self.data = data
        self.en_vocab = en_vocab
        self.zh_vocab = zh_vocab

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        en, zh = self.data[idx]
        en_indices = [self.en_vocab['<bos>']] + [self.en_vocab[token] for token in en] + [self.en_vocab['<eos>']]
        zh_indices = [self.zh_vocab['<bos>']] + [self.zh_vocab[token] for token in zh] + [self.zh_vocab['<eos>']]
        return en_indices, zh_indices

5. Collate function (skip empty sequences)

def collate_fn(batch):
    en_batch, zh_batch = [], []
    for en_item, zh_item in batch:
        if en_item and zh_item:  # make sure neither sequence is empty
            en_batch.append(torch.tensor(en_item))
            zh_batch.append(torch.tensor(zh_item))
        else:
            print("Empty sequence found in batch")
    if not en_batch or not zh_batch:  # if the whole batch is empty, return empty tensors
        return torch.tensor([]), torch.tensor([])
    en_batch = nn.utils.rnn.pad_sequence(en_batch, batch_first=True, padding_value=en_vocab['<pad>'])
    zh_batch = nn.utils.rnn.pad_sequence(zh_batch, batch_first=True, padding_value=zh_vocab['<pad>'])
    return en_batch, zh_batch
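
For intuition, this is what pad_sequence does to variable-length sequences (a standalone sketch, not part of the pipeline):

batch = [torch.tensor([2, 5, 7, 3]), torch.tensor([2, 9, 3])]
padded = nn.utils.rnn.pad_sequence(batch, batch_first=True, padding_value=1)
print(padded)        # tensor([[2, 5, 7, 3], [2, 9, 3, 1]])
print(padded.shape)  # torch.Size([2, 4])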

6. Data loading function

# Load, preprocess, and wrap the data in DataLoaders
def load_data(train_path: str, dev_en_path: str, dev_zh_path: str, test_en_path: str):
    # Read the training data
    train_data = read_data(train_path)
    train_en, train_zh = zip(*(line.split('\t') for line in train_data))

    # Read the dev and test sets
    dev_en = read_data(dev_en_path)
    dev_zh = read_data(dev_zh_path)
    test_en = read_data(test_en_path)

    # Preprocess the data
    train_processed = preprocess_data(train_en, train_zh)
    dev_processed = preprocess_data(dev_en, dev_zh)
    test_processed = [(en_tokenizer(en.lower())[:MAX_LENGTH], []) for en in test_en if en.strip()]

    # Build the vocabularies
    global en_vocab, zh_vocab
    en_vocab, zh_vocab = build_vocab(train_processed)

    # Create the datasets
    train_dataset = TranslationDataset(train_processed, en_vocab, zh_vocab)
    dev_dataset = TranslationDataset(dev_processed, en_vocab, zh_vocab)
    test_dataset = TranslationDataset(test_processed, en_vocab, zh_vocab)

    from torch.utils.data import Subset

Note: N below is the number of training samples actually used. If you want to train on the full dataset, comment out these three lines inside load_data:

    N = 20000
    indices = list(range(N))
    train_dataset = Subset(train_dataset, indices)

    # Use only the first N training samples to keep the baseline fast;
    # comment out the three lines below to train on the full dataset
    N = 20000
    indices = list(range(N))
    train_dataset = Subset(train_dataset, indices)

    # Create the data loaders
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn, drop_last=True)
    dev_loader = DataLoader(dev_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn, drop_last=True)
    test_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, drop_last=True)

    return train_loader, dev_loader, test_loader, en_vocab, zh_vocab

III. Model Construction

1. Encoder

class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.gru = nn.GRU(emb_dim, hid_dim, n_layers, dropout=dropout, batch_first=True)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        # src = [batch size, src len]
        embedded = self.dropout(self.embedding(src))
        # embedded = [batch size, src len, emb dim]
        outputs, hidden = self.gru(embedded)
        # outputs = [batch size, src len, hid dim * n directions]
        # hidden = [n layers * n directions, batch size, hid dim]
        return outputs, hidden
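
A quick shape check of the encoder on random indices (toy dimensions chosen arbitrarily):

enc = Encoder(input_dim=100, emb_dim=32, hid_dim=64, n_layers=2, dropout=0.5)
src = torch.randint(0, 100, (4, 7))  # [batch size, src len]
outputs, hidden = enc(src)
print(outputs.shape)  # torch.Size([4, 7, 64])
print(hidden.shape)   # torch.Size([2, 4, 64])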

2. Attention mechanism

The attention mechanism lets the decoder "attend to" different parts of the source sentence at each decoding step, which makes the translation more accurate, especially for long sentences.

class Attention(nn.Module):
    def __init__(self, hid_dim):
        super().__init__()
        self.attn = nn.Linear(hid_dim * 2, hid_dim)
        self.v = nn.Linear(hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        # hidden = [1, batch size, hid dim]
        # encoder_outputs = [batch size, src len, hid dim]
        batch_size = encoder_outputs.shape[0]
        src_len = encoder_outputs.shape[1]
        hidden = hidden.repeat(src_len, 1, 1).transpose(0, 1)
        # hidden = [batch size, src len, hid dim]
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        # energy = [batch size, src len, hid dim]
        attention = self.v(energy).squeeze(2)
        # attention = [batch size, src len]
        return F.softmax(attention, dim=1)
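
A similar shape check for the attention module (random tensors, toy dimensions):

attn = Attention(hid_dim=64)
hidden = torch.randn(1, 4, 64)            # last decoder layer: [1, batch size, hid dim]
encoder_outputs = torch.randn(4, 10, 64)  # [batch size, src len, hid dim]
weights = attn(hidden, encoder_outputs)
print(weights.shape)       # torch.Size([4, 10])
print(weights.sum(dim=1))  # each row sums to 1: a distribution over source positions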

3. Decoder

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout, attention):
        super().__init__()
        self.output_dim = output_dim
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.attention = attention
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.gru = nn.GRU(hid_dim + emb_dim, hid_dim, n_layers, dropout=dropout, batch_first=True)
        self.fc_out = nn.Linear(hid_dim * 2 + emb_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, encoder_outputs):
        # input = [batch size]
        # hidden = [n layers, batch size, hid dim]
        # encoder_outputs = [batch size, src len, hid dim]
        input = input.unsqueeze(1)
        embedded = self.dropout(self.embedding(input))
        # embedded = [batch size, 1, emb dim]
        a = self.attention(hidden[-1:], encoder_outputs)
        # a = [batch size, src len]
        a = a.unsqueeze(1)
        # a = [batch size, 1, src len]
        weighted = torch.bmm(a, encoder_outputs)
        # weighted = [batch size, 1, hid dim]
        rnn_input = torch.cat((embedded, weighted), dim=2)
        # rnn_input = [batch size, 1, emb dim + hid dim]
        output, hidden = self.gru(rnn_input, hidden)
        # output = [batch size, 1, hid dim]
        # hidden = [n layers, batch size, hid dim]
        embedded = embedded.squeeze(1)
        output = output.squeeze(1)
        weighted = weighted.squeeze(1)
        prediction = self.fc_out(torch.cat((output, weighted, embedded), dim=1))
        # prediction = [batch size, output dim]
        return prediction, hidden

4. Seq2Seq model

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        # src = [batch size, src len]
        # trg = [batch size, trg len]
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.output_dim
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)
        encoder_outputs, hidden = self.encoder(src)
        input = trg[:, 0]
        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden, encoder_outputs)
            outputs[:, t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = trg[:, t] if teacher_force else top1
        return outputs

5. Model initialization function

# Initialize the model
def initialize_model(input_dim, output_dim, emb_dim, hid_dim, n_layers, dropout, device):
    attn = Attention(hid_dim)
    enc = Encoder(input_dim, emb_dim, hid_dim, n_layers, dropout)
    dec = Decoder(output_dim, emb_dim, hid_dim, n_layers, dropout, attn)
    model = Seq2Seq(enc, dec, device).to(device)
    return model
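
A minimal smoke test of the untrained model with random indices and toy dimensions, just to confirm that the tensor shapes line up (CPU only):

toy_model = initialize_model(input_dim=100, output_dim=120, emb_dim=32,
                             hid_dim=64, n_layers=2, dropout=0.5,
                             device=torch.device('cpu'))
src = torch.randint(0, 100, (2, 7))   # [batch size, src len]
trg = torch.randint(0, 120, (2, 9))   # [batch size, trg len]
out = toy_model(src, trg)
print(out.shape)                      # torch.Size([2, 9, 120]) = [batch, trg len, output dim]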

IV. Training

1. Define the optimizer

# Define the optimizer
def initialize_optimizer(model, learning_rate=0.001):
    return optim.Adam(model.parameters(), lr=learning_rate)

2. Epoch timing helper

# Measure elapsed time per epoch
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

3. Training function

def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    for i, batch in enumerate(iterator):
        src, trg = batch
        if src.numel() == 0 or trg.numel() == 0:
            continue  # skip empty batches
        src, trg = src.to(DEVICE), trg.to(DEVICE)
        optimizer.zero_grad()
        output = model(src, trg)
        output_dim = output.shape[-1]
        output = output[:, 1:].contiguous().view(-1, output_dim)
        trg = trg[:, 1:].contiguous().view(-1)
        loss = criterion(output, trg)
        loss.backward()
        clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    print(f"Average loss for this epoch: {epoch_loss / len(iterator)}")
    return epoch_loss / len(iterator)

4. Evaluation function

def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src, trg = batch
            if src.numel() == 0 or trg.numel() == 0:
                continue  # skip empty batches
            src, trg = src.to(DEVICE), trg.to(DEVICE)
            output = model(src, trg, 0)  # turn off teacher forcing
            output_dim = output.shape[-1]
            output = output[:, 1:].contiguous().view(-1, output_dim)
            trg = trg[:, 1:].contiguous().view(-1)
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)

5. Translation (inference) function

# Greedy decoding of a single sentence
def translate_sentence(sentence, src_vocab, trg_vocab, model, device, max_length=50):
    model.eval()
    if isinstance(sentence, str):
        tokens = [token for token in en_tokenizer(sentence)]
    else:
        # Note: if a tensor of indices is passed in, str(token) will not match
        # vocabulary entries, so most tokens fall back to <unk>
        tokens = [str(token) for token in sentence]
    tokens = ['<bos>'] + tokens + ['<eos>']
    src_indexes = [src_vocab[token] for token in tokens]
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(0).to(device)
    with torch.no_grad():
        encoder_outputs, hidden = model.encoder(src_tensor)
    trg_indexes = [trg_vocab['<bos>']]
    for i in range(max_length):
        trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)
        with torch.no_grad():
            output, hidden = model.decoder(trg_tensor, hidden, encoder_outputs)
        pred_token = output.argmax(1).item()
        trg_indexes.append(pred_token)
        if pred_token == trg_vocab['<eos>']:
            break
    trg_tokens = [trg_vocab.get_itos()[i] for i in trg_indexes]
    return trg_tokens[1:-1]  # strip <bos> and <eos>
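
An illustrative call after training (assumes model and DEVICE have been set up as in the later sections; the sentence is made up and the output quality depends entirely on the trained weights):

tokens = translate_sentence('The weather is nice today.', en_vocab, zh_vocab, model, DEVICE)
print(''.join(tokens))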

6. Compute the BLEU score

def calculate_bleu(dev_loader, src_vocab, trg_vocab, model, device):
    translated_sentences = []
    references = []
    for src, trg in dev_loader:
        src = src.to(device)
        translation = translate_sentence(src, src_vocab, trg_vocab, model, device)
        # Join the translated tokens into a single string
        translated_sentences.append(' '.join(translation))
        # Convert each reference translation to a string and add it to references
        for t in trg:
            ref_str = ' '.join([trg_vocab.get_itos()[idx] for idx in t.tolist() if idx not in [trg_vocab['<bos>'], trg_vocab['<eos>'], trg_vocab['<pad>']]])
            references.append(ref_str)
    print("translated_sentences:", translated_sentences[:2])
    print("references:", references[6:8])
    # Compute BLEU with sacrebleu.
    # Note: sacrebleu expects references as a list of lists, where each inner
    # list holds one set of reference translations; for a strict corpus-level
    # score, translated_sentences and references should have the same length.
    bleu = sacrebleu.corpus_bleu(translated_sentences, [references])
    return bleu.score
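
For reference, a tiny standalone sacrebleu example (the hypothesis and reference strings are made up); sacrebleu reports BLEU on a 0-100 scale:

hyps = ['今天 天气 很 好']
refs = [['今天 天气 真 好']]  # one inner list per reference set
print(sacrebleu.corpus_bleu(hyps, refs).score)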

7. Main training loop

# Main training loop
def train_model(model, train_iterator, valid_iterator, optimizer, criterion, N_EPOCHS=10, CLIP=1):
    best_valid_loss = float('inf')
    for epoch in range(N_EPOCHS):
        start_time = time.time()
        train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
        valid_loss = evaluate(model, valid_iterator, criterion)
        end_time = time.time()
        epoch_mins, epoch_secs = epoch_time(start_time, end_time)
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), './model/best-model_test.pt')
        print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
        print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
        print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')


8. Main script, part 1: constants and data loading

Define the constants, build the data loaders, and print the vocabulary and dataset sizes.

# Define constants
MAX_LENGTH = 100  # maximum sentence length
BATCH_SIZE = 32
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
N = 10000  # number of training samples to sample (note: load_data above uses its own N internally)

train_path = './dataset/train.txt'
dev_en_path = './dataset/dev_en.txt'
dev_zh_path = './dataset/dev_zh.txt'
test_en_path = './dataset/test_en.txt'

train_loader, dev_loader, test_loader, en_vocab, zh_vocab = load_data(
    train_path, dev_en_path, dev_zh_path, test_en_path
)

print(f"English vocabulary size: {len(en_vocab)}")
print(f"Chinese vocabulary size: {len(zh_vocab)}")
print(f"Training set size: {len(train_loader.dataset)}")
print(f"Dev set size: {len(dev_loader.dataset)}")
print(f"Test set size: {len(test_loader.dataset)}")

9. Main script, part 2: training

Note: N_EPOCHS is the number of training epochs; change its value to adjust how many epochs the model trains for.

if __name__ == '__main__':
    N_EPOCHS = 3
    CLIP = 1

    # Model hyperparameters
    INPUT_DIM = len(en_vocab)
    OUTPUT_DIM = len(zh_vocab)
    EMB_DIM = 128
    HID_DIM = 256
    N_LAYERS = 2
    DROPOUT = 0.5

    # Initialize the model
    model = initialize_model(INPUT_DIM, OUTPUT_DIM, EMB_DIM, HID_DIM, N_LAYERS, DROPOUT, DEVICE)
    print(f'The model has {sum(p.numel() for p in model.parameters() if p.requires_grad):,} trainable parameters')

    # Define the loss function (ignore padding positions)
    criterion = nn.CrossEntropyLoss(ignore_index=zh_vocab['<pad>'])

    # Initialize the optimizer
    optimizer = initialize_optimizer(model)

    # Train the model
    train_model(model, train_loader, dev_loader, optimizer, criterion, N_EPOCHS, CLIP)

V. Evaluation on the Dev Set

Note: you need to create a folder named model by hand before training; otherwise saving the checkpoint raises an error and interrupts the run.
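
Alternatively, a short sketch that creates the required directories from code (the paths match the ones used throughout this post):

import os

os.makedirs('./model', exist_ok=True)    # checkpoint directory used by train_model
os.makedirs('./results', exist_ok=True)  # output directory used in section VI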

# Load the best checkpoint
model.load_state_dict(torch.load('./model/best-model_test.pt'))

# Compute the BLEU score on the dev set
bleu = calculate_bleu(dev_loader, en_vocab, zh_vocab, model, DEVICE)
# sacrebleu already reports BLEU on a 0-100 scale, so no extra *100 is needed
print(f'BLEU score = {bleu:.2f}')

VI. Translating the Test Set

# Load the best checkpoint (uncomment if running this section on its own)
# model.load_state_dict(torch.load('./model/best-model_test.pt'))

# Note: the ./results directory must exist before running this block
with open('./results/submit_test.txt', 'w') as f:
    translated_sentences = []
    for batch in test_loader:  # iterate over the full test set
        src, _ = batch
        src = src.to(DEVICE)
        translated = translate_sentence(src[0], en_vocab, zh_vocab, model, DEVICE)  # translation result
        results = "".join(translated)
        f.write(results + '\n')  # write the result to the file

VII. Conclusion

Because this baseline generalizes poorly, it performs badly on both the dev and test sets, and its score is well below the Task 1 score. Validation PPL rises almost every epoch, and the final output mostly degenerates into repeating a few tokens.

Translation output: (screenshot omitted)

Submission score: (screenshot omitted)