Named entity recognition (NER) is one of the fundamental tasks in natural language processing: it extracts entities such as times, locations, organizations, and person names from text. Most current NER work is implemented by fine-tuning a pretrained model. This post uses Hugging Face's Transformers library, with tensorflow==2.4.0 as the framework.
First, use the BertTokenizer from transformers to build the data features: input_ids, token_type_ids, and attention_mask.
import numpy as np

def create_inputs_targets(sentences, tags, tag2id, max_len, tokenizer):
    dataset_dict = {
        "input_ids": [],
        "token_type_ids": [],
        "attention_mask": [],
        "tags": []
    }
    for sentence, tag in zip(sentences, tags):
        input_ids = []
        target_tags = []
        for idx, word in enumerate(sentence):
            # encode() returns a list of WordPiece ids; a single character can
            # map to several sub-tokens (e.g. a Korean character), so the label
            # must be repeated once per sub-token or the alignment breaks
            ids = tokenizer.encode(word, add_special_tokens=False)
            input_ids.extend(ids)
            num_tokens = len(ids)
            target_tags.extend([tag[idx]] * num_tokens)
        # truncate, then wrap the sentence with '[CLS]' (id 101) and '[SEP]' (id 102)
        input_ids = input_ids[:max_len - 2]
        target_tags = target_tags[:max_len - 2]
        input_ids = [101] + input_ids + [102]
        # label the [CLS]/[SEP] positions with the 'O' tag
        target_tags = [tag2id['O']] + target_tags + [tag2id['O']]
        token_type_ids = [0] * len(input_ids)
        attention_mask = [1] * len(input_ids)
        padding_len = max_len - len(input_ids)
        # [PAD] is id 0 in the BERT vocab
        input_ids = input_ids + ([0] * padding_len)
        attention_mask = attention_mask + ([0] * padding_len)
        token_type_ids = token_type_ids + ([0] * padding_len)
        # pad the targets with 'O' as well
        target_tags = target_tags + ([tag2id['O']] * padding_len)
        dataset_dict["input_ids"].append(input_ids)
        dataset_dict["token_type_ids"].append(token_type_ids)
        dataset_dict["attention_mask"].append(attention_mask)
        dataset_dict["tags"].append(target_tags)
        assert len(target_tags) == max_len, f'{len(input_ids)}, {len(target_tags)}'
    for key in dataset_dict:
        dataset_dict[key] = np.array(dataset_dict[key])
    x = [
        dataset_dict["input_ids"],
        dataset_dict["token_type_ids"],
        dataset_dict["attention_mask"],
    ]
    y = dataset_dict["tags"]
    return x, y
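The helper functions called later (get_tokenizer, load_data, label_encoder, save_dict) are not shown in the post. A minimal sketch of what they might look like, assuming BIO-style data with one "character tag" pair per line and a blank line between sentences; the data format and function bodies are assumptions, not the author's originals:

import json
from transformers import BertTokenizer

def get_tokenizer():
    # assumed: same checkpoint as the model below
    return BertTokenizer.from_pretrained("bert-base-chinese")

def load_data(path, tag2id):
    # assumed format: "char tag" per line, blank line between sentences
    sentences, labels = [], []
    sent, tags = [], []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                if sent:
                    sentences.append(sent)
                    labels.append(tags)
                    sent, tags = [], []
                continue
            char, tag = line.split()
            sent.append(char)
            tags.append(tag)
            if tag not in tag2id:
                tag2id[tag] = len(tag2id)
    if sent:
        sentences.append(sent)
        labels.append(tags)
    return sentences, labels, tag2id

def label_encoder(labels, tag2id):
    # map tag strings to integer ids, sentence by sentence
    return [[tag2id[t] for t in seq] for seq in labels]

def save_dict(d, path):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(d, f, ensure_ascii=False, indent=2)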
Next, fine-tune BERT. The code is as follows:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from transformers import TFBertModel

def create_model(num_tags, max_len):
    # BERT encoder
    encoder = TFBertModel.from_pretrained("bert-base-chinese")
    # NER model: three inputs, token-level softmax over the tag set
    # (max_len is kept for signature compatibility; the inputs accept dynamic length)
    input_ids = layers.Input(shape=(None,), dtype=tf.int32, name="input_ids")
    token_type_ids = layers.Input(shape=(None,), dtype=tf.int32, name="token_type_ids")
    attention_mask = layers.Input(shape=(None,), dtype=tf.int32, name="attention_mask")
    embedding = encoder(
        input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask
    )[0]
    embedding = layers.Dropout(0.3)(embedding)
    tag_logits = layers.Dense(num_tags, activation='softmax')(embedding)
    model = keras.Model(
        inputs=[input_ids, token_type_ids, attention_mask],
        outputs=[tag_logits],
    )
    optimizer = keras.optimizers.Adam(learning_rate=3e-5)
    # the Dense layer already applies softmax, hence from_logits=False
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=False, reduction=tf.keras.losses.Reduction.NONE
    )

    def masked_ce_loss(real, pred):
        # positions labeled 17 are excluded from the loss; this constant must
        # match the label id you reserve for ignorable positions in tag2id
        # (note that create_inputs_targets above pads targets with tag2id['O'],
        # which the original post says is 16, so adjust to your own mapping)
        mask = tf.math.logical_not(tf.math.equal(real, 17))
        loss_ = loss_object(real, pred)
        mask = tf.cast(mask, dtype=loss_.dtype)
        loss_ *= mask
        return tf.reduce_mean(loss_)

    model.compile(optimizer=optimizer, loss=masked_ce_loss, metrics=['accuracy'])
    return model
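As a quick smoke test of the model above (the first call downloads bert-base-chinese; the tag count of 18 is only an illustrative value, not the post's actual tag set):

import numpy as np

model = create_model(num_tags=18, max_len=64)
dummy = np.zeros((2, 64), dtype="int32")   # batch of 2 all-[PAD] sequences
out = model([dummy, dummy, dummy])
print(out.shape)  # (2, 64, 18): per-token probabilities over the tag set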
Using Keras, TensorFlow's high-level API, train the model and save it in both h5 and pb (SavedModel) formats.
import os

def main():
    train_file = "./data/train_example.txt"
    dev_file = "./data/dev_example.txt"
    tag2id_path = "./output/tag2id.json"
    output_path = "./output/"
    pb_path = "./output/1"
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    if not os.path.exists(pb_path):
        os.makedirs(pb_path)
    tag2id = dict()
    max_len = 64
    batch_size = 4
    epoch = 1
    # load data
    train_data, train_label, tag2id = load_data(train_file, tag2id)
    print("train data size: ", len(train_data))
    print("train label size: ", len(train_label))
    print("label dict: ", tag2id)
    dev_data, dev_label, tag2id = load_data(dev_file, tag2id)
    print("dev data size: ", len(dev_data))
    print("dev label size: ", len(dev_label))
    print("label dict: ", tag2id)
    # save tag2id
    save_dict(tag2id, tag2id_path)
    # encode tag strings as integer ids
    train_label = label_encoder(train_label, tag2id)
    print("train label: ", train_label[:3])
    dev_label = label_encoder(dev_label, tag2id)
    print("dev label: ", dev_label[:3])
    # get tokenizer
    tokenizer = get_tokenizer()
    # build the model inputs
    train_x, train_y = create_inputs_targets(train_data, train_label, tag2id, max_len, tokenizer)
    print("train data tokenizer: ", train_x[:3])
    dev_x, dev_y = create_inputs_targets(dev_data, dev_label, tag2id, max_len, tokenizer)
    print("dev data tokenizer: ", dev_x[:3])
    # create model
    model = create_model(len(tag2id), max_len)
    model.summary()
    history = model.fit(train_x, train_y,
                        epochs=epoch,
                        verbose=1,
                        batch_size=batch_size,
                        validation_data=(dev_x, dev_y),
                        validation_batch_size=batch_size)
    # save the weights as h5
    model_file = os.path.join(output_path, "ner_model.h5")
    model.save_weights(model_file, overwrite=True)
    # save the pb model (SavedModel format, for TensorFlow Serving)
    tf.keras.models.save_model(model, pb_path, save_format="tf")
    pred = model.predict(train_x, batch_size=batch_size)
    print("pred shape: ", pred.shape)

if __name__ == "__main__":
    main()
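Because pb_path ends in /1, the SavedModel lands in the numeric version directory that TensorFlow Serving expects, so the stock tensorflow/serving image can serve it directly; the model name ner and the host paths below are arbitrary choices:

docker run -p 8501:8501 \
  --mount type=bind,source=$(pwd)/output,target=/models/ner \
  -e MODEL_NAME=ner -t tensorflow/serving
# REST endpoint: http://localhost:8501/v1/models/ner:predict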
The main function above builds the model input features as described and trains the model. For offline prediction, load the saved h5 weights into the same model structure and map the predicted ids back to tag names:
def predict(test_data, max_len, tag2id):
    tokenizer = get_tokenizer()
    test_x, len_list = create_infer_inputs(test_data, max_len, tokenizer)
    print("test data tokenizer: ", test_x[:3])
    model = create_model(len(tag2id), max_len)
    model.load_weights("./output/ner_model.h5")
    pred_logits = model.predict(test_x)
    id2tag = {value: key for key, value in tag2id.items()}
    # pred shape: [batch_size, seq_len]
    pred = np.argmax(pred_logits, axis=2).tolist()
    predict_label = []
    for i in range(len(len_list)):
        temp = []
        temp_pred = pred[i]
        for j in range(min(len_list[i], max_len)):
            temp.append(id2tag[temp_pred[j]])
        predict_label.append(temp)
    print("predict label: ", predict_label)
    return predict_label
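create_infer_inputs is also not shown in the post. Below is a plausible reconstruction that mirrors create_inputs_targets without the tags, where len_list records how many sub-tokens each sentence produced; this contract is inferred from how predict() and the Flask handler use it, so treat it as an assumption:

def create_infer_inputs(sentences, max_len, tokenizer):
    # hypothetical reconstruction: same featurization as create_inputs_targets
    input_ids_all, token_type_all, attention_all, len_list = [], [], [], []
    for sentence in sentences:
        input_ids = []
        for word in sentence:
            input_ids.extend(tokenizer.encode(word, add_special_tokens=False))
        len_list.append(len(input_ids))  # sub-token count, used when decoding
        input_ids = [101] + input_ids[:max_len - 2] + [102]
        attention_mask = [1] * len(input_ids)
        padding_len = max_len - len(input_ids)
        input_ids += [0] * padding_len
        attention_mask += [0] * padding_len
        input_ids_all.append(input_ids)
        token_type_all.append([0] * max_len)
        attention_all.append(attention_mask)
    x = [np.array(input_ids_all), np.array(token_type_all), np.array(attention_all)]
    return x, len_list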
To make the model easy to call, expose it over HTTP with Flask. The handler below builds the features, forwards them to TensorFlow Serving, and decodes the response:
import json
import numpy as np
import requests
from flask import Flask, request, jsonify

app = Flask(__name__)
# tokenizer, max_len and id2tag are assumed to be initialized at startup,
# e.g. tokenizer = get_tokenizer() and id2tag loaded from ./output/tag2id.json

@app.route("/bert_ner_infer", methods=["POST"])  # route path is an assumption
def bert_ner_infer():
    params = json.loads(request.get_data())  # request body is UTF-8 JSON
    text = params["text"]
    url = params["url"]  # TensorFlow Serving address
    x, len_list = create_infer_inputs(text, max_len, tokenizer)
    print("len_list: ", len_list)
    input_ids = x[0].tolist()
    token_type_ids = x[1].tolist()
    attention_mask = x[2].tolist()
    data = json.dumps({"signature_name": "serving_default",
                       "inputs": {"input_ids": input_ids,
                                  "token_type_ids": token_type_ids,
                                  "attention_mask": attention_mask}})
    headers = {"content-type": "application/json"}
    result = requests.post(url, data=data, headers=headers)
    result = json.loads(result.text)
    # take the first (and only) sequence in the batch
    pred_logits = result["outputs"][0]
    pred = np.argmax(pred_logits, axis=1).tolist()
    print("pred: ", pred)
    predict_label = []
    for j in range(min(len_list[0], max_len)):
        predict_label.append(id2tag[pred[j]])
    return_result = {"predict": predict_label}
    return jsonify(return_result)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)  # port is illustrative
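Assuming the Flask app listens on port 5000 and TensorFlow Serving on 8501 as set up earlier, a call could look like this (route, ports, and the example output are assumptions):

import requests

resp = requests.post(
    "http://localhost:5000/bert_ner_infer",
    json={"text": ["今天北京天气很好"],
          "url": "http://localhost:8501/v1/models/ner:predict"})
print(resp.json())  # e.g. {"predict": ["O", "O", "B-LOC", "I-LOC", "O", ...]}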
That concludes this named entity recognition implementation based on transformers. If you spot any problems, corrections are welcome.