
Datawhale AI Summer Camp: Machine Translation Challenge Based on Terminology Dictionary Intervention

Import the required libraries

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn.utils import clip_grad_norm_
from torchtext.data.metrics import bleu_score
from torch.utils.data import Dataset, DataLoader
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from typing import List, Tuple
import jieba
import random
from torch.nn.utils.rnn import pad_sequence
import sacrebleu
import time
import math

Data preprocessing

# Define tokenizers
en_tokenizer = get_tokenizer('spacy', language='en_core_web_trf')
zh_tokenizer = lambda x: list(jieba.cut(x))  # use jieba for Chinese word segmentation

# Read a text file into a list of stripped lines
def read_data(file_path: str) -> List[str]:
    with open(file_path, 'r', encoding='utf-8') as f:
        return [line.strip() for line in f]

# Tokenize, lowercase (English side) and truncate parallel sentences
def preprocess_data(en_data: List[str], zh_data: List[str]) -> List[Tuple[List[str], List[str]]]:
    processed_data = []
    for en, zh in zip(en_data, zh_data):
        en_tokens = en_tokenizer(en.lower())[:MAX_LENGTH]
        zh_tokens = zh_tokenizer(zh)[:MAX_LENGTH]
        if en_tokens and zh_tokens:  # keep the pair only if both sides are non-empty
            processed_data.append((en_tokens, zh_tokens))
    return processed_data

# Build the source and target vocabularies from the training data
def build_vocab(data: List[Tuple[List[str], List[str]]]):
    en_vocab = build_vocab_from_iterator(
        (en for en, _ in data),
        specials=['<unk>', '<pad>', '<bos>', '<eos>']
    )
    zh_vocab = build_vocab_from_iterator(
        (zh for _, zh in data),
        specials=['<unk>', '<pad>', '<bos>', '<eos>']
    )
    en_vocab.set_default_index(en_vocab['<unk>'])
    zh_vocab.set_default_index(zh_vocab['<unk>'])
    return en_vocab, zh_vocab

class TranslationDataset(Dataset):
    def __init__(self, data: List[Tuple[List[str], List[str]]], en_vocab, zh_vocab):
        self.data = data
        self.en_vocab = en_vocab
        self.zh_vocab = zh_vocab

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        en, zh = self.data[idx]
        en_indices = [self.en_vocab['<bos>']] + [self.en_vocab[token] for token in en] + [self.en_vocab['<eos>']]
        zh_indices = [self.zh_vocab['<bos>']] + [self.zh_vocab[token] for token in zh] + [self.zh_vocab['<eos>']]
        return en_indices, zh_indices

def collate_fn(batch):
    en_batch, zh_batch = [], []
    for en_item, zh_item in batch:
        if en_item and zh_item:  # keep only pairs where both sequences are non-empty
            en_batch.append(torch.tensor(en_item))
            zh_batch.append(torch.tensor(zh_item))
        else:
            print("empty sequence found")
    if not en_batch or not zh_batch:  # if the whole batch is empty, return empty tensors
        return torch.tensor([]), torch.tensor([])
    en_batch = nn.utils.rnn.pad_sequence(en_batch, batch_first=True, padding_value=en_vocab['<pad>'])
    zh_batch = nn.utils.rnn.pad_sequence(zh_batch, batch_first=True, padding_value=zh_vocab['<pad>'])
    return en_batch, zh_batch

# Load the data and build the DataLoaders
def load_data(train_path: str, dev_en_path: str, dev_zh_path: str, test_en_path: str):
    # Read the training data (tab-separated English/Chinese pairs)
    train_data = read_data(train_path)
    train_en, train_zh = zip(*(line.split('\t') for line in train_data))
    # Read the dev and test sets
    dev_en = read_data(dev_en_path)
    dev_zh = read_data(dev_zh_path)
    test_en = read_data(test_en_path)
    # Preprocess
    train_processed = preprocess_data(train_en, train_zh)
    dev_processed = preprocess_data(dev_en, dev_zh)
    test_processed = [(en_tokenizer(en.lower())[:MAX_LENGTH], []) for en in test_en if en.strip()]
    # Build the vocabularies
    global en_vocab, zh_vocab
    en_vocab, zh_vocab = build_vocab(train_processed)
    # Create the datasets
    train_dataset = TranslationDataset(train_processed, en_vocab, zh_vocab)
    dev_dataset = TranslationDataset(dev_processed, en_vocab, zh_vocab)
    test_dataset = TranslationDataset(test_processed, en_vocab, zh_vocab)
    from torch.utils.data import Subset
    # Optionally subsample the training set: keep only the first N samples
    indices = list(range(N))
    train_dataset = Subset(train_dataset, indices)
    # Create the data loaders
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn, drop_last=True)
    dev_loader = DataLoader(dev_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn, drop_last=True)
    test_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, drop_last=True)
    return train_loader, dev_loader, test_loader, en_vocab, zh_vocab
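
To make the pipeline above concrete, here is a minimal, self-contained sketch (the toy sentences are invented for illustration, not competition data) of what build_vocab_from_iterator, the <bos>/<eos> wrapping in TranslationDataset.__getitem__, and the padding in collate_fn produce for a small batch:

# Toy walk-through of vocabulary lookup and padding (illustration only)
import torch
from torch.nn.utils.rnn import pad_sequence
from torchtext.vocab import build_vocab_from_iterator

toy_sentences = [["machine", "translation"], ["terminology", "matters"]]
vocab = build_vocab_from_iterator(toy_sentences,
                                  specials=['<unk>', '<pad>', '<bos>', '<eos>'])
vocab.set_default_index(vocab['<unk>'])

# Map tokens to indices and wrap with <bos>/<eos>, as TranslationDataset.__getitem__ does
seqs = [torch.tensor([vocab['<bos>']] + [vocab[t] for t in s] + [vocab['<eos>']])
        for s in toy_sentences]

# Pad to the longest sequence in the batch, as collate_fn does
batch = pad_sequence(seqs, batch_first=True, padding_value=vocab['<pad>'])
print(batch.shape)  # (2, 4): both toy sequences have 2 tokens plus <bos>/<eos>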

Model construction

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)  # (max_len, 1, d_model), sequence-first
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

class TransformerModel(nn.Module):
    def __init__(self, src_vocab, tgt_vocab, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout):
        super(TransformerModel, self).__init__()
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)
        self.src_embedding = nn.Embedding(len(src_vocab), d_model)
        self.tgt_embedding = nn.Embedding(len(tgt_vocab), d_model)
        self.positional_encoding = PositionalEncoding(d_model, dropout)
        self.fc_out = nn.Linear(d_model, len(tgt_vocab))
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.d_model = d_model

    def forward(self, src, tgt):
        # Switch src and tgt to (seq_len, batch_size), which nn.Transformer expects by default
        src = src.transpose(0, 1)
        tgt = tgt.transpose(0, 1)

        src_mask = self.transformer.generate_square_subsequent_mask(src.size(0)).to(src.device)
        tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.size(0)).to(tgt.device)
        src_padding_mask = (src == self.src_vocab['<pad>']).transpose(0, 1)
        tgt_padding_mask = (tgt == self.tgt_vocab['<pad>']).transpose(0, 1)

        src_embedded = self.positional_encoding(self.src_embedding(src) * math.sqrt(self.d_model))
        tgt_embedded = self.positional_encoding(self.tgt_embedding(tgt) * math.sqrt(self.d_model))

        output = self.transformer(src_embedded, tgt_embedded,
                                  src_mask, tgt_mask, None,
                                  src_padding_mask, tgt_padding_mask, src_padding_mask)
        return self.fc_out(output).transpose(0, 1)  # back to (batch_size, seq_len, vocab)

def initialize_model(src_vocab, tgt_vocab, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048, dropout=0.1):
    model = TransformerModel(src_vocab, tgt_vocab, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)
    return model
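
As a quick sanity check of the shapes flowing through TransformerModel, the following sketch builds a tiny model on a made-up two-word vocabulary (the sizes and tokens are assumptions for illustration only) and confirms that the output carries one distribution over the target vocabulary per target position:

# Shape sanity check on a toy vocabulary (illustration only)
from torchtext.vocab import build_vocab_from_iterator

toy_vocab = build_vocab_from_iterator([["hello", "world"]],
                                      specials=['<unk>', '<pad>', '<bos>', '<eos>'])
toy_vocab.set_default_index(toy_vocab['<unk>'])

toy_model = initialize_model(toy_vocab, toy_vocab, d_model=32, nhead=4,
                             num_encoder_layers=1, num_decoder_layers=1,
                             dim_feedforward=64, dropout=0.1)

# Indices 4 and 5 are the two real tokens; 0-3 are the specials
src = torch.randint(4, len(toy_vocab), (2, 7))   # (batch_size, src_len)
tgt = torch.randint(4, len(toy_vocab), (2, 5))   # (batch_size, tgt_len)
logits = toy_model(src, tgt)
print(logits.shape)  # expected: (2, 5, 6) = (batch, tgt_len, target vocab size)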

Training

# Optimizer
def initialize_optimizer(model, learning_rate=0.001):
    return optim.Adam(model.parameters(), lr=learning_rate)

# Elapsed time per epoch
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    for i, batch in enumerate(iterator):
        src, tgt = batch
        if src.numel() == 0 or tgt.numel() == 0:
            continue  # skip empty batches
        src, tgt = src.to(DEVICE), tgt.to(DEVICE)
        optimizer.zero_grad()
        output = model(src, tgt[:, :-1])  # decoder input: target shifted right (teacher forcing)
        output_dim = output.shape[-1]
        output = output.contiguous().view(-1, output_dim)
        tgt = tgt[:, 1:].contiguous().view(-1)  # gold labels: target shifted left
        loss = criterion(output, tgt)
        loss.backward()
        clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)

def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src, tgt = batch
            if src.numel() == 0 or tgt.numel() == 0:
                continue
            src, tgt = src.to(DEVICE), tgt.to(DEVICE)
            output = model(src, tgt[:, :-1])
            output_dim = output.shape[-1]
            output = output.contiguous().view(-1, output_dim)
            tgt = tgt[:, 1:].contiguous().view(-1)
            loss = criterion(output, tgt)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)

def translate_sentence(src_indexes, src_vocab, tgt_vocab, model, device, max_length=50):
    model.eval()
    src_tensor = src_indexes.unsqueeze(0).to(device)  # add a batch dimension
    with torch.no_grad():
        # Note: these encoder outputs are computed but not reused below; the full model is re-run at each step
        encoder_outputs = model.transformer.encoder(model.positional_encoding(model.src_embedding(src_tensor) * math.sqrt(model.d_model)))
    trg_indexes = [tgt_vocab['<bos>']]
    for i in range(max_length):
        trg_tensor = torch.LongTensor(trg_indexes).unsqueeze(0).to(device)
        with torch.no_grad():
            output = model(src_tensor, trg_tensor)
        pred_token = output.argmax(2)[:, -1].item()  # greedy choice for the last position
        trg_indexes.append(pred_token)
        if pred_token == tgt_vocab['<eos>']:
            break
    trg_tokens = [tgt_vocab.get_itos()[i] for i in trg_indexes]
    return trg_tokens[1:-1]  # drop the <bos> and <eos> tokens

def calculate_bleu(dev_loader, src_vocab, tgt_vocab, model, device):
    model.eval()
    translations = []
    references = []
    with torch.no_grad():
        for src, tgt in dev_loader:
            src = src.to(device)
            for sentence in src:
                translated = translate_sentence(sentence, src_vocab, tgt_vocab, model, device)
                translations.append(' '.join(translated))
            for reference in tgt:
                ref_tokens = [tgt_vocab.get_itos()[idx] for idx in reference if idx not in [tgt_vocab['<bos>'], tgt_vocab['<eos>'], tgt_vocab['<pad>']]]
                references.append(' '.join(ref_tokens))
    # sacrebleu expects a list of reference streams, each aligned with the hypotheses
    bleu = sacrebleu.corpus_bleu(translations, [references])
    return bleu.score

# Main training loop
def train_model(model, train_iterator, valid_iterator, optimizer, criterion, N_EPOCHS=10, CLIP=1, save_path='../model/best-model_transformer.pt'):
    best_valid_loss = float('inf')
    for epoch in range(N_EPOCHS):
        start_time = time.time()
        train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
        valid_loss = evaluate(model, valid_iterator, criterion)
        end_time = time.time()
        epoch_mins, epoch_secs = epoch_time(start_time, end_time)
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), save_path)
        print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
        print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
        print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')

# Constants
MAX_LENGTH = 100  # maximum sentence length in tokens
BATCH_SIZE = 32
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
N = 148363  # number of training samples to use (148363 at most)

train_path = '../dataset/train.txt'
dev_en_path = '../dataset/dev_en.txt'
dev_zh_path = '../dataset/dev_zh.txt'
test_en_path = '../dataset/test_en.txt'

train_loader, dev_loader, test_loader, en_vocab, zh_vocab = load_data(
    train_path, dev_en_path, dev_zh_path, test_en_path
)

print(f"English vocabulary size: {len(en_vocab)}")
print(f"Chinese vocabulary size: {len(zh_vocab)}")
print(f"Training set size: {len(train_loader.dataset)}")
print(f"Dev set size: {len(dev_loader.dataset)}")
print(f"Test set size: {len(test_loader.dataset)}")

# Main entry point
if __name__ == '__main__':
    # Model hyperparameters
    D_MODEL = 256
    NHEAD = 8
    NUM_ENCODER_LAYERS = 3
    NUM_DECODER_LAYERS = 3
    DIM_FEEDFORWARD = 512
    DROPOUT = 0.1
    N_EPOCHS = 5
    CLIP = 1
    # Initialize the model
    model = initialize_model(en_vocab, zh_vocab, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD, DROPOUT).to(DEVICE)
    print(f'The model has {sum(p.numel() for p in model.parameters() if p.requires_grad):,} trainable parameters')
    # Loss function (padding positions are ignored)
    criterion = nn.CrossEntropyLoss(ignore_index=zh_vocab['<pad>'])
    # Optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
    # Train
    save_path = '../model/best-model_transformer.pt'
    train_model(model, train_loader, dev_loader, optimizer, criterion, N_EPOCHS, CLIP, save_path=save_path)
    print(f"Training finished! Model saved to: {save_path}")
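
One detail worth calling out: train and evaluate feed tgt[:, :-1] to the decoder and compare its predictions against tgt[:, 1:], which is the standard one-token shift for teacher forcing. A toy illustration with made-up indices:

# Illustration of the one-token shift used for teacher forcing (toy indices only)
tgt = torch.tensor([[2, 10, 11, 12, 3]])  # <bos> w1 w2 w3 <eos>

decoder_input = tgt[:, :-1]   # [[2, 10, 11, 12]]  -> what the decoder reads
gold_output   = tgt[:, 1:]    # [[10, 11, 12, 3]]  -> what the decoder must predict
# At position i the decoder sees decoder_input[:, :i+1] (enforced by tgt_mask)
# and is trained to output gold_output[:, i], i.e. the next token.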

Running the model on the test set

# Load the best checkpoint
model.load_state_dict(torch.load('../model/best-model_transformer.pt'))

save_dir = '../results/submit_task3.txt'
with open(save_dir, 'w') as f:
    translated_sentences = []
    for batch in test_loader:  # iterate over the whole test set
        src, _ = batch
        src = src.to(DEVICE)
        translated = translate_sentence(src[0], en_vocab, zh_vocab, model, DEVICE)  # translation result
        results = "".join(translated)
        f.write(results + '\n')  # write one translation per line
print(f"Translation finished; results saved to {save_dir}")
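
Note that the pipeline above never actually uses the terminology dictionary that gives the challenge its name. One very naive way to intervene is to post-edit each output so that mandated term translations appear. The sketch below assumes a tab-separated "english_term<TAB>chinese_term" dictionary file; the file format and the helper names are assumptions for illustration, not part of the original code:

# Naive terminology-dictionary intervention via post-editing (a sketch, not the official method)
def load_term_dict(path):
    term_dict = {}
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split('\t')
            if len(parts) == 2:
                term_dict[parts[0].lower()] = parts[1]
    return term_dict

def apply_terms(src_sentence, translation, term_dict):
    # If a dictionary term occurs in the English source but its required Chinese
    # rendering is missing from the output, append the required rendering as a
    # crude fallback (a real system would use constrained decoding or alignment).
    for en_term, zh_term in term_dict.items():
        if en_term in src_sentence.lower() and zh_term not in translation:
            translation = translation + zh_term
    return translation

To plug this into the loop above, one would keep the raw English line alongside each test batch and call apply_terms on results before writing it to the submission file.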
