赞
踩
Seq2Seq模型(Sequence-to-Sequence模型),也称为编码器-解码器模型,是一种用于处理序列到序列(Sequence-to-Sequence)任务的深度学习架构,最初由Sutskever等人在2014年提出并广泛应用于机器翻译任务。以下是Seq2Seq模型的基本原理:
基本结构
编码器(Encoder):
编码器的作用是将输入序列(如源语言句子)转换为一个上下文向量(context vector)或者一个固定长度的编码器输出。
典型的编码器使用循环神经网络(RNN,如LSTM或GRU)或者Transformer编码器来处理输入序列。编码器将每个时间步的输入转换为一个隐藏状态。
编码器的最终隐藏状态(或者多个时间步的隐藏状态的组合)被认为是整个输入序列的编码表示。
解码器(Decoder):
解码器接收编码器的输出(上下文向量或者编码器的隐藏状态)作为输入。
解码器的任务是生成目标序列(如目标语言句子),其长度通常与输入序列的长度不同。
解码器通常也是一个循环神经网络(如LSTM或GRU)或者Transformer解码器。它在每个时间步生成一个输出符号,直到生成特殊的终止符号(如)为止。
训练过程
输入输出对齐:
在训练阶段,Seq2Seq模型接收一对输入输出序列对(如源语言句子和目标语言句子)作为训练数据。
编码器将源语言句子编码成上下文向量或者隐藏状态,并将其传递给解码器。
解码器从开始符号(如)开始生成目标语言序列,直到生成结束符号(如)或者达到最大输出长度为止。
损失函数:
Seq2Seq模型的训练通常使用交叉熵损失函数,该损失函数用于衡量模型生成的序列与真实目标序列之间的差异。
在训练期间,解码器的输出与真实的目标序列逐步比较,以最小化预测输出与实际目标之间的差距。
推理过程
预测:
在推理阶段(或测试阶段),输入序列通过编码器得到其上下文表示。
解码器根据编码器的上下文向量开始生成目标序列,每步都根据前一步的输出和当前的隐藏状态来预测下一个输出符号,直到生成终止符号或者达到预定的最大输出长度为止。
应用领域
Seq2Seq模型广泛应用于机器翻译、对话系统、文本摘要、语音识别等需要将一个序列转换成另一个序列的任务中。
最近的研究还将其扩展到图像描述生成和其他需要序列生成的领域。
下载en_core_web_sm语言包(注意版本)
en_core_web_sm
这是英语分词包,不同于task1中的按照英文26个字母划分。是根据单词进行划分的。能够有效改善翻译能力。
将下载好的文件拖入资源管理…/dataset文件夹中,运行
!pip install -U pip setuptools wheel -i https://pypi.tuna.tsinghua.edu.cn/simple
!pip install -U 'spacy[cuda12x]' -i https://pypi.tuna.tsinghua.edu.cn/simple
!pip install ../dataset/en_core_web_trf-3.7.3-py3-none-any.whl
!python -m spacy download en_core_web_sm
分别使用en_core_web_sm,和jieba进行中英文的分词
# 定义tokenizer
en_tokenizer = get_tokenizer('spacy', language='en_core_web_trf')
zh_tokenizer = lambda x: list(jieba.cut(x)) # 使用jieba分词
# 读取数据函数 def read_data(file_path: str) -> List[str]: with open(file_path, 'r', encoding='utf-8') as f: return [line.strip() for line in f] # 数据预处理函数 def preprocess_data(en_data: List[str], zh_data: List[str]) -> List[Tuple[List[str], List[str]]]: processed_data = [] for en, zh in zip(en_data, zh_data): en_tokens = en_tokenizer(en.lower())[:MAX_LENGTH] zh_tokens = zh_tokenizer(zh)[:MAX_LENGTH] if en_tokens and zh_tokens: # 确保两个序列都不为空 processed_data.append((en_tokens, zh_tokens)) return processed_data # 构建词汇表 def build_vocab(data: List[Tuple[List[str], List[str]]]): en_vocab = build_vocab_from_iterator( (en for en, _ in data), specials=['<unk>', '<pad>', '<bos>', '<eos>'] ) zh_vocab = build_vocab_from_iterator( (zh for _, zh in data), specials=['<unk>', '<pad>', '<bos>', '<eos>'] ) en_vocab.set_default_index(en_vocab['<unk>']) zh_vocab.set_default_index(zh_vocab['<unk>']) return en_vocab, zh_vocab
# 数据加载函数 def load_data(train_path: str, dev_en_path: str, dev_zh_path: str, test_en_path: str): # 读取训练数据 train_data = read_data(train_path) train_en, train_zh = zip(*(line.split('\t') for line in train_data)) # 读取开发集和测试集 dev_en = read_data(dev_en_path) dev_zh = read_data(dev_zh_path) test_en = read_data(test_en_path) # 预处理数据 train_processed = preprocess_data(train_en, train_zh) dev_processed = preprocess_data(dev_en, dev_zh) test_processed = [(en_tokenizer(en.lower())[:MAX_LENGTH], []) for en in test_en if en.strip()] # 构建词汇表 global en_vocab, zh_vocab en_vocab, zh_vocab = build_vocab(train_processed) # 创建数据集 train_dataset = TranslationDataset(train_processed, en_vocab, zh_vocab) dev_dataset = TranslationDataset(dev_processed, en_vocab, zh_vocab) test_dataset = TranslationDataset(test_processed, en_vocab, zh_vocab) from torch.utils.data import Subset # 假设你有10000个样本,你只想用前1000个样本进行测试 indices = list(range(N)) train_dataset = Subset(train_dataset, indices) # 创建数据加载器 train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn, drop_last=True) dev_loader = DataLoader(dev_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn, drop_last=True) test_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, drop_last=True) return train_loader, dev_loader, test_loader, en_vocab, zh_vocab
模型构建见task1的代码,encoder、decoder等。
def train(model, iterator, optimizer, criterion, clip): model.train() epoch_loss = 0 for i, batch in enumerate(iterator): #print(f"Training batch {i}") src, trg = batch #print(f"Source shape before: {src.shape}, Target shape before: {trg.shape}") if src.numel() == 0 or trg.numel() == 0: #print("Empty batch detected, skipping...") continue # 跳过空的批次 src, trg = src.to(DEVICE), trg.to(DEVICE) optimizer.zero_grad() output = model(src, trg) output_dim = output.shape[-1] output = output[:, 1:].contiguous().view(-1, output_dim) trg = trg[:, 1:].contiguous().view(-1) loss = criterion(output, trg) loss.backward() clip_grad_norm_(model.parameters(), clip) optimizer.step() epoch_loss += loss.item() print(f"Average loss for this epoch: {epoch_loss / len(iterator)}") return epoch_loss / len(iterator) def evaluate(model, iterator, criterion): model.eval() epoch_loss = 0 with torch.no_grad(): for i, batch in enumerate(iterator): #print(f"Evaluating batch {i}") src, trg = batch if src.numel() == 0 or trg.numel() == 0: continue # 跳过空批次 src, trg = src.to(DEVICE), trg.to(DEVICE) output = model(src, trg, 0) # 关闭 teacher forcing output_dim = output.shape[-1] output = output[:, 1:].contiguous().view(-1, output_dim) trg = trg[:, 1:].contiguous().view(-1) loss = criterion(output, trg) epoch_loss += loss.item() return epoch_loss / len(iterator)
# 主训练循环 def train_model(model, train_iterator, valid_iterator, optimizer, criterion, N_EPOCHS=10, CLIP=1): best_valid_loss = float('inf') for epoch in range(N_EPOCHS): start_time = time.time() #print(f"Starting Epoch {epoch + 1}") train_loss = train(model, train_iterator, optimizer, criterion, CLIP) valid_loss = evaluate(model, valid_iterator, criterion) end_time = time.time() epoch_mins, epoch_secs = epoch_time(start_time, end_time) if valid_loss < best_valid_loss: best_valid_loss = valid_loss torch.save(model.state_dict(), '../model/best-model_transformer.pt') print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s') print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}') print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')
def calculate_bleu(dev_loader, src_vocab, trg_vocab, model, device): translated_sentences = [] references = [] for src, trg in dev_loader: src = src.to(device) translation = translate_sentence(src, src_vocab, trg_vocab, model, device) translated_sentences.append(' '.join(translation)) # Wrap in a list for BLEU calculation # 将每个参考翻译转换为字符串,并添加到references列表中 for t in trg: ref_str = ' '.join([trg_vocab.get_itos()[idx] for idx in t.tolist() if idx not in [trg_vocab['<bos>'], trg_vocab['<eos>'], trg_vocab['<pad>']]]) references.append(ref_str) print("translated_sentences",translated_sentences[:2]) print("references:",references[6:8]) # 使用`sacrebleu`计算BLEU分数 # 注意:sacrebleu要求references是一个列表的列表,其中每个子列表包含一个或多个参考翻译 bleu = sacrebleu.corpus_bleu(translated_sentences, [references]) # 打印BLEU分数 return bleu.score
# 加载最佳模型
model.load_state_dict(torch.load('../model/best-model_test.pt'))
with open('../results/submit_test.txt', 'w') as f:
translated_sentences = []
for batch in test_loader: # 遍历所有数据
src, _ = batch
src = src.to(DEVICE)
translated = translate_sentence(src[0], en_vocab, zh_vocab, model, DEVICE) #翻译结果
results = "".join(translated)
f.write(results + '\n') # 将结果写入文件
task2对baseline对英语分词形式的进行了改动,以及对seq2seq模型相关原理的介绍和解释,和对所给训练集数据的处理加工。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。