Named Entity Recognition (NER) is the task of identifying entities with specific meaning in text, mainly person names, place names, organization names and other proper nouns.
It usually involves two parts:
(1) detecting entity boundaries;
(2) determining the entity type (person, location, organization, or other).
For example: "小明在北京上班" ("Xiao Ming works in Beijing")
| Entity type | Entity |
| --- | --- |
| LOC (location) | 北京 |
| PER (person) | 小明 |
There are many annotation schemes for NER data, such as IOB1, IOB2, IOE1, IOE2 and IOBES.
Let's first look at the IOB2 scheme:
I marks the inside of an entity, O marks tokens outside any entity, and B marks the beginning of an entity.
Tags take the form B/I-XXX, where XXX is the concrete entity type.
For example:
```python
# The IOB2 tag list: ['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC']
#   O      outside any entity
#   B-PER  beginning of a person name         I-PER  inside a person name
#   B-ORG  beginning of an organization name  I-ORG  inside an organization name
#   B-LOC  beginning of a location name       I-LOC  inside a location name
# The annotated data actually stores the index of each tag in the list above.
# For example, 【厦门】 is annotated as 【5, 6】, i.e. 【B-LOC, I-LOC】.
# A full example:
['海', '钓', '比', '赛', '地', '点', '在', '厦', '门', '与', '金', '门', '之', '间', '的', '海', '域', '。']
[ 0,    0,    0,    0,    0,    0,   0,    5,    6,   0,    5,    6,    0,    0,    0,    0,    0,   0]
```
Next, the IOBES scheme:
I marks the inside of an entity, O marks tokens outside any entity, B marks the beginning of an entity, E marks the end of an entity, and S marks a single token that forms an entity on its own.
Sometimes M is used in place of I, but the meaning is the same.
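For example, under IOBES the two-character location 【厦门】 from the sentence above would be tagged B-LOC E-LOC (rather than B-LOC I-LOC), a three-character location such as 【北京市】 would be B-LOC I-LOC E-LOC, and a location entity consisting of a single character would be tagged S-LOC.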
The results can be evaluated with entity-level Precision, Recall and F1.
For example:
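Suppose (purely invented numbers, just to illustrate the computation) the reference annotation of a text contains 3 entities and the model predicts 4 entity spans, 2 of which exactly match a reference entity in both boundaries and type. Then Precision = 2/4 = 0.5, Recall = 2/3 ≈ 0.667, and F1 = 2PR/(P+R) ≈ 0.571.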
The model class used for the NER task is AutoModelForTokenClassification, i.e. a classifier over each token.
As for padding: the text classification task used DataCollatorWithPadding, whereas here we use DataCollatorForTokenClassification, a collator designed specifically for token classification.
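As a minimal sketch (the feature values below are made up; the checkpoint name is simply the one this post uses later), the collator pads input_ids and attention_mask as usual and additionally pads the labels with -100, so that padded positions are ignored by the loss:

```python
from transformers import AutoTokenizer, DataCollatorForTokenClassification

tokenizer = AutoTokenizer.from_pretrained("hfl/chinese-macbert-base")
collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

# two toy features of different lengths (ids and labels are placeholders)
features = [
    {"input_ids": [101, 1, 2, 102], "labels": [-100, 5, 6, -100]},
    {"input_ids": [101, 3, 102], "labels": [-100, 0, -100]},
]
batch = collator(features)
print(batch["labels"])
# expected (roughly): tensor([[-100,    5,    6, -100],
#                             [-100,    0, -100, -100]])
```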
We can inspect the relevant source code in transformers' BertForTokenClassification:
```python
class BertForTokenClassification(BertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels

        self.bert = BertModel(config, add_pooling_layer=False)
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.dropout = nn.Dropout(classifier_dropout)
        # linear classifier over num_labels classes
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

        # Initialize weights and apply final processing
        self.post_init()

    @add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], TokenClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # run BERT first
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # sequence_output (the last_hidden_state) has shape [batch_size, seq_len, hidden_size]
        # (hidden_size = 768 for the base model); it is fed into the linear classifier
        sequence_output = outputs[0]

        sequence_output = self.dropout(sequence_output)
        # logits has shape [batch_size, seq_len, num_labels]
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            # cross-entropy loss over the tokens
            loss_fct = CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return TokenClassifierOutput(
            loss=loss,      # loss of this forward pass
            logits=logits,  # unnormalized per-token scores over the num_labels classes
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
```
The model is chinese-macbert-base, open-sourced by HFL (the HIT & iFLYTEK joint lab).
The dataset is the People's Daily NER corpus; the task is named entity recognition, and the dataset is available on the Hugging Face Hub under the name peoples_daily_ner.
import evaluate
from transformers import AutoTokenizer, AutoModelForTokenClassification
from transformers import TrainingArguments, Trainer, DataCollatorForTokenClassification
import warnings
warnings.filterwarnings('ignore')
# If you have internet access, load the dataset directly with load_dataset (requires `from datasets import load_dataset`)
#ner_datasets = load_dataset("peoples_daily_ner", cache_dir="./data")
# Load the data offline instead
from datasets import DatasetDict
ner_datasets = DatasetDict.load_from_disk("ner_data")
# The dataset loads successfully and already comes with standard train/validation/test splits, so no extra splitting is needed.
ner_datasets
DatasetDict({
train: Dataset({
features: ['id', 'tokens', 'ner_tags'],
num_rows: 20865
})
validation: Dataset({
features: ['id', 'tokens', 'ner_tags'],
num_rows: 2319
})
test: Dataset({
features: ['id', 'tokens', 'ner_tags'],
num_rows: 4637
})
})
# The data has already been split into tokens, stored in the tokens field.
# The labels are in the ner_tags field, but as integer ids; we still need to find out what each id means.
print(ner_datasets["train"][0])
{
'id': '0',
'tokens': ['海', '钓', '比', '赛', '地', '点', '在', '厦', '门', '与', '金', '门', '之', '间', '的', '海', '域', '。'],
'ner_tags': [0, 0, 0, 0, 0, 0, 0, 5, 6, 0, 5, 6, 0, 0, 0, 0, 0, 0]
}
# features contains the metadata of each field
ner_datasets["train"].features
{
'id': Value(dtype='string', id=None),
'tokens': Sequence(feature=Value(dtype='string', id=None), length=-1, id=None),
'ner_tags': Sequence(feature=ClassLabel(names=['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC'], id=None), length=-1, id=None)
}
# The information we need is in the ner_tags feature; we can extract it directly
label_list = ner_datasets["train"].features["ner_tags"].feature.names
label_list
['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC']
# Note: downloading from the Hub may require a proxy in some regions
# tokenizer = AutoTokenizer.from_pretrained("hfl/chinese-macbert-base")
model_path = '/root/autodl-fs/models/chinese-macbert-base'
tokenizer = AutoTokenizer.from_pretrained(model_path)
# For data that has already been split into words/tokens, set is_split_into_words=True
tokenizer(ner_datasets["train"][0]["tokens"], is_split_into_words=True)
{
'input_ids': [101, 3862, 7157, 3683, 6612, 1765, 4157, 1762, 1336, 7305, 680, 7032, 7305, 722, 7313, 4638, 3862, 1818, 511, 102],
'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
}
```python
# For the labels: positions whose word_ids() value is None (the special tokens) are set to -100
'''
Example:
The (hypothetical) word pair [shang, hai] has ner_tags = [5, 6], where 5 is 'B-LOC' and 6 is 'I-LOC'.
Suppose the tokenizer splits [shang] into 4 sub-tokens and [hai] into 1 sub-token.
Then word_ids is  [None, 0, 0, 0, 0, 1, None]
and input_ids is  [101, 6, 7, 8, 9, 10, 102]   (special tokens: 101 = [CLS], 102 = [SEP]),
so the aligned labels should be [-100, 5, 5, 5, 5, 6, -100].
'''
def process_function(examples):
    tokenized_examples = tokenizer(examples["tokens"], max_length=128, truncation=True, is_split_into_words=True)
    labels = []
    for i, label in enumerate(examples["ner_tags"]):
        word_ids = tokenized_examples.word_ids(batch_index=i)
        label_ids = []
        for word_id in word_ids:
            if word_id is None:
                label_ids.append(-100)
            else:
                label_ids.append(label[word_id])
        labels.append(label_ids)
    tokenized_examples["labels"] = labels
    return tokenized_examples
```
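Before mapping this over the whole dataset, a quick check (a small sketch reusing the tokenizer and the first training example loaded above) shows what word_ids() returns; the None entries mark [CLS] and [SEP], which is exactly why those positions receive -100:

```python
sample = ner_datasets["train"][0]
enc = tokenizer(sample["tokens"], is_split_into_words=True)
# each Chinese character is a single token here, so the word ids simply count up,
# with None at the [CLS] and [SEP] positions
print(enc.word_ids())
# [None, 0, 1, 2, ..., 17, None]  (abbreviated)
```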
# With the processing function defined, we can apply it to the whole dataset with map.
# Don't forget to set batched=True, which speeds up the processing.
tokenized_datasets = ner_datasets.map(process_function, batched=True)
tokenized_datasets
DatasetDict({
train: Dataset({
features: ['id', 'tokens', 'ner_tags', 'input_ids', 'token_type_ids', 'attention_mask', 'labels'],
num_rows: 20865
})
validation: Dataset({
features: ['id', 'tokens', 'ner_tags', 'input_ids', 'token_type_ids', 'attention_mask', 'labels'],
num_rows: 2319
})
test: Dataset({
features: ['id', 'tokens', 'ner_tags', 'input_ids', 'token_type_ids', 'attention_mask', 'labels'],
num_rows: 4637
})
})
print(tokenized_datasets["train"][0])
{
'id': '0',
'tokens': ['海', '钓', '比', '赛', '地', '点', '在', '厦', '门', '与', '金', '门', '之', '间', '的', '海', '域', '。'],
'ner_tags': [0, 0, 0, 0, 0, 0, 0, 5, 6, 0, 5, 6, 0, 0, 0, 0, 0, 0],
'input_ids': [101, 3862, 7157, 3683, 6612, 1765, 4157, 1762, 1336, 7305, 680, 7032, 7305, 722, 7313, 4638, 3862, 1818, 511, 102],
'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
'labels': [-100, 0, 0, 0, 0, 0, 0, 0, 5, 6, 0, 5, 6, 0, 0, 0, 0, 0, 0, -100]
}
# Before configuring the trainer, load the model; it must be the same checkpoint as the tokenizer above.
# Besides the model path, we also pass num_labels, the number of distinct label values.
# model = AutoModelForTokenClassification.from_pretrained("hfl/chinese-macbert-base", num_labels=len(label_list))
model = AutoModelForTokenClassification.from_pretrained(model_path, num_labels=len(label_list))
model.config.num_labels # 7
# pip install seqeval
# seqeval = evaluate.load("seqeval")
# here we load the metric from a local script instead
seqeval = evaluate.load("seqeval_metric.py")
seqeval
```
EvaluationModule(name: "seqeval", module_type: "metric", features: {'predictions': Sequence(feature=Value(dtype='string', id='label'), length=-1, id='sequence'), 'references': Sequence(feature=Value(dtype='string', id='label'), length=-1, id='sequence')}, usage: """
Produces labelling scores along with its sufficient statistics from a source against one or more references.

Args:
    predictions: List of List of predicted labels (Estimated targets as returned by a tagger)
    references: List of List of reference labels (Ground truth (correct) target values)
    suffix: True if the IOB prefix is after type, False otherwise. default: False
    scheme: Specify target tagging scheme. Should be one of ["IOB1", "IOB2", "IOE1", "IOE2", "IOBES", "BILOU"]. default: None
    mode: Whether to count correct entity labels with incorrect I/B tags as true positives or not. If you want to only count exact matches, pass mode="strict". default: None.
    sample_weight: Array-like of shape (n_samples,), weights for individual samples. default: None
    zero_division: Which value to substitute as a metric value when encountering zero division. Should be on of 0, 1, "warn". "warn" acts as 0, but the warning is raised.

Returns:
    'scores': dict. Summary of the scores for overall and per type
        Overall:
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': F1 score, also known as balanced F-score or F-measure,
        Per type:
            'precision': precision,
            'recall': recall,
            'f1': F1 score, also known as balanced F-score or F-measure

Examples:
    >>> predictions = [['O', 'O', 'B-MISC', 'I-MISC', 'I-MISC', 'I-MISC', 'O'], ['B-PER', 'I-PER', 'O']]
    >>> references = [['O', 'O', 'O', 'B-MISC', 'I-MISC', 'I-MISC', 'O'], ['B-PER', 'I-PER', 'O']]
    >>> seqeval = evaluate.load("seqeval")
    >>> results = seqeval.compute(predictions=predictions, references=references)
    >>> print(list(results.keys()))
    ['MISC', 'PER', 'overall_precision', 'overall_recall', 'overall_f1', 'overall_accuracy']
    >>> print(results["overall_f1"])
    0.5
    >>> print(results["PER"]["f1"])
    1.0
```
```python
import numpy as np

# seqeval expects BIO-style string labels, so the model predictions must be converted first;
# positions whose true label is -100 (special tokens) are simply filtered out
def eval_metric(pred):
    predictions, labels = pred
    predictions = np.argmax(predictions, axis=-1)

    # map label ids back to their original string labels
    true_predictions = [
        [label_list[p] for p, l in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    true_labels = [
        [label_list[l] for p, l in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    result = seqeval.compute(predictions=true_predictions, references=true_labels, mode="strict", scheme="IOB2")

    return {
        "f1": result["overall_f1"]
    }
```
```python
'''
Training arguments:
output directory models_for_ner,
learning rate 2e-5,
batch size 16 for training and 16 for evaluation,
evaluation after every epoch,
checkpoint saving after every epoch as well,
f1 as the metric for selecting the best model,
load the best model at the end of training,
weight decay 0.01,
logging every 50 steps (i.e. every 50 batches),
3 training epochs.
'''
args = TrainingArguments(
    learning_rate=2e-5,
    output_dir="models_for_ner",        # output directory
    per_device_train_batch_size=16,     # batch size for training
    per_device_eval_batch_size=16,      # batch size for evaluation
    evaluation_strategy="epoch",        # evaluation strategy
    save_strategy="epoch",              # checkpoint saving strategy
    metric_for_best_model="f1",         # metric used to pick the best model
    load_best_model_at_end=True,        # load the best model after training
    weight_decay=0.01,                  # weight decay
    logging_steps=50,                   # logging frequency
    num_train_epochs=3                  # number of training epochs
)
```
```python
'''
With the training arguments ready, we can build the Trainer:
first the model,
then the training arguments,
then the train and eval datasets (the validation split is used for evaluation),
then the metric function,
and finally a DataCollatorForTokenClassification instance as data_collator.
'''
trainer = Trainer(
    model=model,                                     # pretrained model
    args=args,                                       # training arguments
    train_dataset=tokenized_datasets["train"],       # training set
    eval_dataset=tokenized_datasets["validation"],   # validation set
    compute_metrics=eval_metric,                     # metric function
    data_collator=DataCollatorForTokenClassification(tokenizer=tokenizer)  # pads each batch to its longest sequence
)
```
trainer.train()
from transformers import pipeline
# To run inference with pipeline, id2label must be set on the model config
model.config.id2label = {idx: label for idx, label in enumerate(label_list)}
# If the model was trained on a GPU, specify the device for inference as well
# For NER, aggregation_strategy="simple" returns whole entities instead of individual tokens
ner_pipe = pipeline("token-classification", model=model, tokenizer=tokenizer, device=0, aggregation_strategy="simple")
line = '近一年多的时间内,北京市与上海市同时通车了有轨电车项目,这对于现代有轨电车这种技术在中国的发展,正是一个非常大的激励。--央视李明报道'
res = ner_pipe(line)
res
[{'entity_group': 'LOC', 'score': 0.9993908, 'word': '北 京 市', 'start': 9, 'end': 12},
 {'entity_group': 'LOC', 'score': 0.9993803, 'word': '上 海 市', 'start': 13, 'end': 16},
 {'entity_group': 'LOC', 'score': 0.9997332, 'word': '中 国', 'start': 42, 'end': 44},
 {'entity_group': 'ORG', 'score': 0.8498471, 'word': '央 视', 'start': 61, 'end': 63},
 {'entity_group': 'PER', 'score': 0.985396, 'word': '李 明', 'start': 63, 'end': 65}]
# The word field joins tokens with spaces, so use start and end to slice the actual entity text from the input string
ner_result = {}
for r in res:
if r["entity_group"] not in ner_result:
ner_result[r["entity_group"]] = []
ner_result[r["entity_group"]].append(line[r["start"]: r["end"]])
ner_result
{'LOC': ['北京市', '上海市', '中国'], 'ORG': ['央视'], 'PER': ['李明']}
Multiple Choice is one form of machine reading comprehension; compared with the general reading-comprehension setting, the multiple-choice task additionally provides candidate answers.
That is, given one or more passages P, a question Q and several candidate answers, the task is to output the answer A to question Q, where A is one of the given candidates.
```
Context:
女: 怎么样?买到票了吗?
男: 火车站好多人啊,我排了整整一天的队,等排到我了,他们说没票了,要等过了年才有。
女: 没关系,过了年回来也是一样的。
男: 公司初六就上班了,我怕过了年来不及。要不今年您和我爸来上海过年吧?
女: 我这老胳膊老腿的不想折腾了。
男: 一点儿不折腾,等我帮你们买好票,你们直接过来就行。

Question: 男的遇到了什么困难?

Choices:
A. 公司不放假
B. 过年东西贵
C. 没买到车票 ✔
D. 找不到车站
```
We combine each candidate choice with the context and the question, so every choice yields one (context, question + choice) pair.
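For the example above, this produces four pairs (written out here just for illustration; the concatenation with a space matches the processing code below):

```
(context, "男的遇到了什么困难? 公司不放假")
(context, "男的遇到了什么困难? 过年东西贵")
(context, "男的遇到了什么困难? 没买到车票")
(context, "男的遇到了什么困难? 找不到车站")
```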
The code below shows the concrete implementation:
```python
from transformers import AutoTokenizer
import numpy as np

model_path = r'D:\python\models\chinese-macbert-base'
tokenizer = AutoTokenizer.from_pretrained(model_path)


def process_function(examples):
    # examples is a dict with keys: ["context", "question", "choice", "answer"]
    context = []
    question_choice = []
    labels = []
    for idx in range(len(examples["context"])):
        ctx = "\n".join(examples["context"][idx])
        question = examples["question"][idx]
        choices = examples["choice"][idx]
        for choice in choices:
            context.append(ctx)
            question_choice.append(question + " " + choice)
        # if there are fewer than 4 choices, pad with a dummy choice
        if len(choices) < 4:
            for _ in range(4 - len(choices)):
                context.append(ctx)
                question_choice.append(question + " " + "不知道")
        labels.append(choices.index(examples["answer"][idx]))
    # after the tokenizer, input_ids has shape 8 * 30 (2 examples * 4 choices each)
    # truncation="only_first" truncates only the context (the first sequence)
    tokenized_examples = tokenizer(context, question_choice, truncation="only_first", max_length=30, padding="max_length")
    # add a num_choices dimension: the shape becomes 2 * 4 * 30
    tokenized_examples = {k: [v[i: i + 4] for i in range(0, len(v), 4)] for k, v in tokenized_examples.items()}
    tokenized_examples["labels"] = labels
    return tokenized_examples


if __name__ == '__main__':
    l = {
        'id': [0, 1],
        'context': [
            ['男:你今天晚上有时间吗?我们一起去看电影吧?', '女:你喜欢恐怖片和爱情片,但是我喜欢喜剧片,科幻片一般。所以……'],
            ['男:足球比赛是明天上午八点开始吧?', '女:因为天气不好,比赛改到后天下午三点了。']
        ],
        'question': ['女的最喜欢哪种电影?', '根据对话,可以知道什么?'],
        'choice': [
            ['恐怖片', '爱情片', '喜剧片', '科幻片'],
            ['今天天气不好', '比赛时间变了', '校长忘了时间']
        ],
        'answer': ['喜剧片', '比赛时间变了']
    }
    print(np.asarray(process_function(l)['input_ids']).shape)  # (2, 4, 30)
```
Each pair is then encoded, the [CLS] (pooled) representation is projected to a single score, the scores of all choices are gathered, and a softmax over the choices gives the final answer.
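A minimal sketch of that last step (batch = 2 and num_choices = 4 to match the toy example above; the logit values are random placeholders):

```python
import torch

# per-pair [CLS] scores come out of the classifier with shape (batch * num_choices, 1)
logits = torch.randn(2 * 4, 1)
# regroup the choices of each example and normalise over them
reshaped_logits = logits.view(-1, 4)            # (2, 4)
probs = torch.softmax(reshaped_logits, dim=-1)  # (2, 4)
print(probs.argmax(dim=-1))                     # predicted choice index for each example
```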
The model class for the multiple-choice task is AutoModelForMultipleChoice; let's look at the BertForMultipleChoice source:
```python
class BertForMultipleChoice(BertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)

        self.bert = BertModel(config)
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.dropout = nn.Dropout(classifier_dropout)
        # one score per (context, question + choice) pair
        self.classifier = nn.Linear(config.hidden_size, 1)

        # Initialize weights and apply final processing
        self.post_init()

    @add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, num_choices, sequence_length"))
    @add_code_sample_docstrings(
        checkpoint=_CHECKPOINT_FOR_DOC,
        output_type=MultipleChoiceModelOutput,
        config_class=_CONFIG_FOR_DOC,
    )
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], MultipleChoiceModelOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the multiple choice classification loss. Indices should be in `[0, ...,
            num_choices-1]` where `num_choices` is the size of the second dimension of the input tensors. (See
            `input_ids` above)
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        # input_ids originally has shape [batch, num_choices, seq_len]
        # number of candidate choices
        num_choices = input_ids.shape[1] if input_ids is not None else inputs_embeds.shape[1]

        # flatten input_ids (and the other inputs) to [batch * num_choices, seq_len]
        input_ids = input_ids.view(-1, input_ids.size(-1)) if input_ids is not None else None
        attention_mask = attention_mask.view(-1, attention_mask.size(-1)) if attention_mask is not None else None
        token_type_ids = token_type_ids.view(-1, token_type_ids.size(-1)) if token_type_ids is not None else None
        position_ids = position_ids.view(-1, position_ids.size(-1)) if position_ids is not None else None
        inputs_embeds = (
            inputs_embeds.view(-1, inputs_embeds.size(-2), inputs_embeds.size(-1))
            if inputs_embeds is not None
            else None
        )

        # run BERT
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # outputs[1] is pooled_output with shape [batch * num_choices, hidden_dim]
        # pooled_output is the [CLS] token passed through a dense layer + Tanh activation,
        # used as the feature vector of the whole sequence; the multiple-choice head
        # feeds this [CLS]-based vector into the linear classifier
        pooled_output = outputs[1]

        # shape [batch * num_choices, hidden_dim]
        pooled_output = self.dropout(pooled_output)
        # shape [batch * num_choices, 1]
        logits = self.classifier(pooled_output)
        # shape [batch, num_choices]
        reshaped_logits = logits.view(-1, num_choices)

        loss = None
        if labels is not None:
            loss_fct = CrossEntropyLoss()
            loss = loss_fct(reshaped_logits, labels)

        if not return_dict:
            output = (reshaped_logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return MultipleChoiceModelOutput(
            loss=loss,
            logits=reshaped_logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
```
The model is again chinese-macbert-base from HFL.
The dataset this time is C3, which can be downloaded from the Hugging Face Hub.
import evaluate
from datasets import DatasetDict
from transformers import AutoTokenizer, AutoModelForMultipleChoice, TrainingArguments, Trainer
import warnings
warnings.filterwarnings("ignore")
c3 = DatasetDict.load_from_disk("./c3/")
c3
DatasetDict({
test: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer'],
num_rows: 1625
})
train: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer'],
num_rows: 11869
})
validation: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer'],
num_rows: 3816
})
})
c3["train"][:2]
{
'id': [0, 1],
'context': [
['男:你今天晚上有时间吗?我们一起去看电影吧?', '女:你喜欢恐怖片和爱情片,但是我喜欢喜剧片,科幻片一般。所以……'],
['男:足球比赛是明天上午八点开始吧?', '女:因为天气不好,比赛改到后天下午三点了。']
],
'question': ['女的最喜欢哪种电影?', '根据对话,可以知道什么?'],
'choice': [
['恐怖片', '爱情片', '喜剧片', '科幻片'],
['今天天气不好', '比赛时间变了', '校长忘了时间']
],
'answer': ['喜剧片', '比赛时间变了']
}
c3.pop("test")
c3
DatasetDict({
train: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer'],
num_rows: 11869
})
validation: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer'],
num_rows: 3816
})
})
```python
model_path = '/root/autodl-fs/models/chinese-macbert-base'
tokenizer = AutoTokenizer.from_pretrained(model_path)


def process_function(examples):
    # examples is a dict with keys: ["context", "question", "choice", "answer"]
    # with batched=True, about 1000 examples arrive per call
    context = []
    question_choice = []
    labels = []
    for idx in range(len(examples["context"])):
        ctx = "\n".join(examples["context"][idx])
        question = examples["question"][idx]
        choices = examples["choice"][idx]
        for choice in choices:
            context.append(ctx)
            question_choice.append(question + " " + choice)
        # if there are fewer than 4 choices, pad with a dummy choice
        if len(choices) < 4:
            for _ in range(4 - len(choices)):
                context.append(ctx)
                question_choice.append(question + " " + "不知道")
        labels.append(choices.index(examples["answer"][idx]))
    # input_ids: 4000 * 256
    tokenized_examples = tokenizer(context, question_choice, truncation="only_first", max_length=256, padding="max_length")
    # regroup into 1000 * 4 * 256
    tokenized_examples = {k: [v[i: i + 4] for i in range(0, len(v), 4)] for k, v in tokenized_examples.items()}
    tokenized_examples["labels"] = labels
    return tokenized_examples
```
tokenized_c3 = c3.map(process_function, batched=True)
tokenized_c3
DatasetDict({
train: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer', 'input_ids', 'token_type_ids', 'attention_mask', 'labels'],
num_rows: 11869
})
validation: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer', 'input_ids', 'token_type_ids', 'attention_mask', 'labels'],
num_rows: 3816
})
})
import numpy as np
model = AutoModelForMultipleChoice.from_pretrained(model_path)
# load the accuracy metric offline here
accuracy_path = '/root/autodl-tmp/transformers-code/metrics/accuracy'
accuracy = evaluate.load(accuracy_path)
def compute_metric(pred):
predictions, labels = pred
predictions = np.argmax(predictions, axis=-1)
return accuracy.compute(predictions=predictions, references=labels)
```python
args = TrainingArguments(
    output_dir="./multiple_choice",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    num_train_epochs=1,
    logging_steps=50,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    fp16=True
)

trainer = Trainer(
    model=model,
    args=args,
    train_dataset=tokenized_c3["train"],
    eval_dataset=tokenized_c3["validation"],
    compute_metrics=compute_metric
)
```
trainer.train()
```python
from typing import Any
import torch


# a custom pipeline for the multiple-choice task
class MultipleChoicePipeline:

    def __init__(self, model, tokenizer) -> None:
        self.model = model
        self.tokenizer = tokenizer
        self.device = model.device

    def preprocess(self, context, question, choices):
        cs, qcs = [], []
        for choice in choices:
            cs.append(context)
            qcs.append(question + " " + choice)
        return self.tokenizer(cs, qcs, truncation="only_first", max_length=256, return_tensors="pt")

    def predict(self, inputs):
        # add the batch dimension and move the tensors onto the model's device
        inputs = {k: v.unsqueeze(0).to(self.device) for k, v in inputs.items()}
        return self.model(**inputs).logits

    def postprocess(self, logits, choices):
        prediction = torch.argmax(logits, dim=-1).cpu().item()
        return choices[prediction]

    def __call__(self, context, question, choices) -> Any:
        # preprocessing
        inputs = self.preprocess(context, question, choices)
        # model prediction
        logits = self.predict(inputs)
        # postprocessing
        result = self.postprocess(logits, choices)
        return result
```
pipe = MultipleChoicePipeline(model, tokenizer)
pipe("小明在北京上班", "小明在哪里上班?", ["北京", "上海", "河北", "海南", "河北", "海南"])
'北京'