
NLP (15) - Sequence Labeling Tasks


Preface

These notes are just a record of my learning process; questions and discussion are welcome.

When should a pooling layer be used:

  • If every character is labeled individually (token-level tagging), no pooling is needed; if the whole sentence is classified, pooling is required (see the sketch below).
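A minimal sketch of the shape difference (illustrative only; the tensor and variable names below are not from the project code):

import torch
import torch.nn as nn

batch_size, seq_len, hidden_size, num_tags = 2, 50, 64, 9
encoder_output = torch.randn(batch_size, seq_len, hidden_size)   # per-character encoder output

# Per-character tagging (NER): classify every position, no pooling
token_logits = nn.Linear(hidden_size, num_tags)(encoder_output)  # (batch_size, seq_len, num_tags)

# Whole-sentence classification: pool over the sequence dimension first
pooled = nn.MaxPool1d(seq_len)(encoder_output.transpose(1, 2)).squeeze(-1)  # (batch_size, hidden_size)
sentence_logits = nn.Linear(hidden_size, 3)(pooled)              # (batch_size, num_classes)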

NER (sequence annotation): B/M/E mark the left/middle/right boundary of an entity, combined with the entity type (e.g. location/organization/person); O marks characters that belong to no entity.
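For example, under this scheme a sentence such as "我在北京上学" would be tagged character by character as (an illustrative example, not taken from the dataset):

我/O  在/O  北/B-LOCATION  京/E-LOCATION  上/O  学/O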

CRF transition matrix: if a character's label is B_location, then the next character's label is very likely M_location or E_location, and has almost nothing to do with the B/M/E labels of the other entity types.

  • its shape is label * label (i.e. tag_size * tag_size)

Emission matrix: shape is sen_len * tag_size; it is essentially the matrix of per-character tag scores (the probability of each tag for every character).
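A minimal sketch of how the two matrices fit together with the pytorch-crf package used below (shapes and numbers are illustrative; the CRF layer learns its transition matrix internally as crf.transitions):

import torch
from torchcrf import CRF

num_tags, batch_size, seq_len = 9, 2, 5
crf = CRF(num_tags, batch_first=True)

# Emission matrix: one score per (character, tag) pair
emissions = torch.randn(batch_size, seq_len, num_tags)          # (batch_size, seq_len, num_tags)
tags = torch.randint(0, num_tags, (batch_size, seq_len))        # gold labels
mask = torch.ones(batch_size, seq_len, dtype=torch.bool)        # all positions are real here

print(crf.transitions.shape)        # transition matrix: torch.Size([9, 9]) == label * label

loss = -crf(emissions, tags, mask=mask, reduction="mean")       # negative log-likelihood
best_paths = crf.decode(emissions, mask=mask)                   # Viterbi decoding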

Results with BERT:

PERSON entities: precision 0.655738, recall 0.462428, F1 0.542368
LOCATION entities: precision 0.655462, recall 0.406250, F1 0.501603
TIME entities: precision 0.846847, recall 0.576687, F1 0.686127
ORGANIZATION entities: precision 0.448276, recall 0.305882, F1 0.363631
Macro-F1: 0.523432
Micro-F1: 0.543495

Results with LSTM:

PERSON entities: precision 0.432000, recall 0.312139, F1 0.362411
LOCATION entities: precision 0.512987, recall 0.411458, F1 0.456642
TIME entities: precision 0.721804, recall 0.588957, F1 0.648644
ORGANIZATION entities: precision 0.450000, recall 0.423529, F1 0.436359
Macro-F1: 0.476014
Micro-F1: 0.479633
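For reference, a small sketch of how the two summary scores differ: Macro-F1 averages the per-class F1 values, while Micro-F1 pools the raw counts over all classes first (the counts below are made up for illustration):

# per-class counts: (correctly recognized, recognized, gold entities) -- illustrative numbers
counts = {
    "PERSON":       (80, 122, 173),
    "LOCATION":     (78, 119, 192),
    "TIME":         (94, 111, 163),
    "ORGANIZATION": (26,  58,  85),
}

f1_scores = []
for correct, predicted, gold in counts.values():
    precision, recall = correct / predicted, correct / gold
    f1_scores.append(2 * precision * recall / (precision + recall))
macro_f1 = sum(f1_scores) / len(f1_scores)

correct, predicted, gold = (sum(c[i] for c in counts.values()) for i in range(3))
micro_p, micro_r = correct / predicted, correct / gold
micro_f1 = 2 * micro_p * micro_r / (micro_p + micro_r)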

Code

Implement NER code that assigns a B/M/E/O label to every character of each sentence.

config.py

"""
配置参数信息
"""
Config = {
    "model_path": "./output/",
    "model_name": "model.pt",
    "schema_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\schema.json",
    "train_data_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\train.txt",
    "valid_data_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\test.txt",
    "vocab_path": r"E:\Anlp\week9 序列标注问题\ner\chars.txt",
    "model_type": "bert",
    # 数据标注中计算loss
    "use_crf": True,
    # 文本向量大小
    "char_dim": 20,
    # 文本长度
    "max_len": 50,
    # 词向量大小
    "hidden_size": 64,
    # 训练 轮数
    "epoch_size": 15,
    # 批量大小
    "batch_size": 25,
    # 训练集大小
    "simple_size": 300,
    # 学习率
    "lr": 0.0001,
    # dropout
    "dropout": 0.5,
    # 优化器
    "optimizer": "adam",
    # 卷积核
    "kernel_size": 3,
    # 最大池 or 平均池
    "pooling_style": "max",
    # 模型层数
    "num_layers": 3,
    "bert_model_path": r"E:\Anlp\week6语言模型和预训练\bert-base-chinese",
    # 输出层大小
    "output_size": 9,
    # 随机数种子
    "seed": 987
}
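The code expects schema.json to map tag names to indices, with output_size = 9. Judging from the regular expressions in evaluate.py's decode() (B- tags 0-3, I- tags 4-7, O as 8), the file presumably looks like the following, but this exact content is an assumption rather than a copy of the actual file:

{
  "B-LOCATION": 0,
  "B-ORGANIZATION": 1,
  "B-PERSON": 2,
  "B-TIME": 3,
  "I-LOCATION": 4,
  "I-ORGANIZATION": 5,
  "I-PERSON": 6,
  "I-TIME": 7,
  "O": 8
}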




loader.py (data loading)

"""
数据加载
"""
import os
import numpy as np
import json
import re
import os
import torch
import torch.utils.data as Data
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer


# Load the character vocabulary
def load_vocab(path):
    vocab = {}
    with open(path, 'r', encoding='utf-8') as f:
        for index, line in enumerate(f):
            word = line.strip()
            # index 0 is reserved for padding, so indices start at 1
            vocab[word] = index + 1
        vocab['unk'] = len(vocab) + 1
    return vocab


class DataGenerator:
    def __init__(self, data_path, config):
        self.data_path = data_path
        self.config = config
        self.schema = self.load_schema(config["schema_path"])
        if self.config["model_type"] == "bert":
            self.tokenizer = BertTokenizer.from_pretrained(config["bert_model_path"])
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        # list of the original sentences
        self.sentence_list = []
        self.data = self.load_data()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

    def load_schema(self, path):
        with open(path, encoding="utf8") as f:
            return json.load(f)

    def load_data(self):
        dataset_x = []
        dataset_y = []
        with open(self.data_path, 'r', encoding='utf-8') as f:
            # the file is split into segments, one per sentence
            segments = f.read().split("\n\n")
            # each line of a segment holds one character and its label, e.g.: 你 0
            for segment in segments:
                sentences = []
                labels = []
                for line in segment.split("\n"):

                    if line.strip() == "":
                        continue
                    char, label = line.split()
                    sentences.append(char)
                    labels.append(self.schema[label])
                self.sentence_list.append(' '.join(sentences))
                input_id = self.sentence_to_index(sentences)
                # labels must be padded to the same length (-1 marks padding)
                labels = self.padding(labels, -1)
                # the text and its labels form one sample
                dataset_x.append(input_id)
                dataset_y.append(labels)
            data = Data.TensorDataset(torch.tensor(dataset_x), torch.tensor(dataset_y))

        return data

    # Text preprocessing: convert a sentence to an index vector
    def sentence_to_index(self, text):
        input_ids = []
        vocab = self.vocab
        if self.config["model_type"] == "bert":
            # convert the Chinese text into tokenizer ids
            input_ids = self.tokenizer.encode(text, padding="max_length", truncation=True,
                                              max_length=self.config["max_len"])
        else:
            for char in text:
                input_ids.append(vocab.get(char, vocab['unk']))
            # pad or truncate
            input_ids = self.padding(input_ids)
        return input_ids

    # Pad or truncate to max_len
    def padding(self, input_ids, padding_dot=0):
        length = self.config["max_len"]
        padded_input_ids = input_ids
        if len(input_ids) >= length:
            return input_ids[:length]
        else:
            padded_input_ids += [padding_dot] * (length - len(input_ids))
            return padded_input_ids


# Wrap the data with torch's DataLoader
def load_data_batch(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    # DataLoader wraps dg; besides the data, dg also carries extra information used later (e.g. sentence_list)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl


if __name__ == '__main__':
    from config import Config

    dg = DataGenerator(Config["train_data_path"], Config)
    print(len(dg))
    print(dg[0])




evaluate.py (model evaluation)

"""
模型效果测试
"""
import re
from collections import defaultdict

import numpy as np
import torch
from loader import load_data_batch


class Evaluator:
    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        # load the validation set
        self.dataset = load_data_batch(config["valid_data_path"], config, shuffle=False)
        # self.stats_dict = {"correct": 0, "wrong": 0}  # used to store test results

    def eval(self, epoch):
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        # switch to evaluation mode
        self.model.eval()
        self.stats_dict = {"LOCATION": defaultdict(int),
                           "TIME": defaultdict(int),
                           "PERSON": defaultdict(int),
                           "ORGANIZATION": defaultdict(int)}
        for index, batch_data in enumerate(self.dataset):
            # take the sentences corresponding to this batch
            sentences = self.dataset.dataset.sentence_list[
                        index * self.config["batch_size"]: (index + 1) * self.config["batch_size"]]
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            input_id, labels = batch_data
            with torch.no_grad():
                pred_results = self.model(input_id)  # no labels passed in: predict with the current model parameters
            self.write_stats(labels, pred_results, sentences)
        # report the accumulated statistics once after all batches
        self.show_stats()
        return

    def write_stats(self, labels, pred_results, sentences):
        assert len(labels) == len(pred_results) == len(sentences)
        if not self.config['use_crf']:
            pred_results = torch.argmax(pred_results, dim=-1)
        for true_label, pred_label, sentence in zip(labels, pred_results, sentences):
            if not self.config["use_crf"]:
                pred_label = pred_label.cpu().detach().tolist()
            true_label = true_label.cpu().detach().tolist()
            true_entities = self.decode(sentence, true_label)
            pred_entities = self.decode(sentence, pred_label)
            # precision = correctly recognized entities / recognized entities
            # recall = correctly recognized entities / entities in the gold data
            for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]:
                self.stats_dict[key]["正确识别"] += len([ent for ent in pred_entities[key] if ent in true_entities[key]])
                self.stats_dict[key]["样本实体数"] += len(true_entities[key])
                self.stats_dict[key]["识别出实体数"] += len(pred_entities[key])
        return

    def show_stats(self):
        F1_scores = []
        for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]:
            # precision = correctly recognized entities / recognized entities
            # recall = correctly recognized entities / entities in the gold data
            precision = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["识别出实体数"])
            recall = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["样本实体数"])
            F1 = (2 * precision * recall) / (precision + recall + 1e-5)
            F1_scores.append(F1)
            self.logger.info("%s类实体,准确率:%f, 召回率: %f, F1: %f" % (key, precision, recall, F1))
        self.logger.info("Macro-F1: %f" % np.mean(F1_scores))
        correct_pred = sum([self.stats_dict[key]["正确识别"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
        total_pred = sum([self.stats_dict[key]["识别出实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
        true_enti = sum([self.stats_dict[key]["样本实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
        micro_precision = correct_pred / (total_pred + 1e-5)
        micro_recall = correct_pred / (true_enti + 1e-5)
        micro_f1 = (2 * micro_precision * micro_recall) / (micro_precision + micro_recall + 1e-5)
        self.logger.info("Micro-F1 %f" % micro_f1)
        self.logger.info("--------------------")
        return

    # Decode a label sequence back into the entity spans of the sentence
    def decode(self, sentence, labels):
        labels = "".join([str(x) for x in labels[:len(sentence)]])
        results = defaultdict(list)
        for location in re.finditer("(04+)", labels):
            s, e = location.span()
            results["LOCATION"].append(sentence[s:e])
        for location in re.finditer("(15+)", labels):
            s, e = location.span()
            results["ORGANIZATION"].append(sentence[s:e])
        for location in re.finditer("(26+)", labels):
            s, e = location.span()
            results["PERSON"].append(sentence[s:e])
        for location in re.finditer("(37+)", labels):
            s, e = location.span()
            results["TIME"].append(sentence[s:e])
        return results
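To make the regular expressions in decode() concrete: the label ids are joined into a digit string, and each pattern matches one entity type, assuming the schema assigns 0-3 to the B- tags and 4-7 to the matching I- tags (consistent with the hypothetical schema.json shown above). An illustrative run:

labels = [8, 8, 0, 4, 4, 8, 2, 6, 8]          # hypothetical label ids for a 9-character sentence
label_str = "".join(str(x) for x in labels)   # "880448268"
# "(04+)" matches "044" at indices 2-4 -> a three-character LOCATION span (sentence[2:5])
# "(26+)" matches "26"  at indices 6-7 -> a two-character PERSON span (sentence[6:8])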


model.py

import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from transformers import BertModel
from torchcrf import CRF

"""
建立网络模型结构
"""


class TorchModel(nn.Module):
    def __init__(self, config):
        super(TorchModel, self).__init__()
        hidden_size = config["hidden_size"]
        vocab_size = config["vocab_size"] + 1
        output_size = config["output_size"]
        self.model_type = config["model_type"]
        num_layers = config["num_layers"]
        # self.use_bert = config["use_bert"]
        self.use_crf = config["use_crf"]

        self.emb = nn.Embedding(vocab_size + 1, hidden_size, padding_idx=0)
        if self.model_type == 'rnn':
            self.encoder = nn.RNN(input_size=hidden_size, hidden_size=hidden_size, num_layers=num_layers,
                                  batch_first=True)
        elif self.model_type == 'lstm':
            # bidirectional LSTM: the output size is hidden_size * 2 (num_layers should be set to 2)
            self.encoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, bidirectional=True, batch_first=True)
            hidden_size = hidden_size * 2

        elif self.model_type == 'bert':
            self.encoder = BertModel.from_pretrained(config["bert_model_path"])
            # use the pretrained model's hidden_size
            hidden_size = self.encoder.config.hidden_size
        elif self.model_type == 'cnn':
            self.encoder = CNN(config)
        elif self.model_type == "gated_cnn":
            self.encoder = GatedCNN(config)
        elif self.model_type == "bert_lstm":
            self.encoder = BertLSTM(config)
            # use the pretrained model's hidden_size (BertLSTM wraps BERT, so read it from encoder.bert)
            hidden_size = self.encoder.bert.config.hidden_size
        self.classify = nn.Linear(hidden_size, output_size)
        self.pooling_style = config["pooling_style"]
        self.crf_layer = CRF(output_size, batch_first=True)

        self.loss = nn.CrossEntropyLoss(ignore_index=-1)  # cross-entropy loss; the -1 padding labels are ignored (same fix as in the second version below)

    def forward(self, x, y=None):
        if self.model_type == 'bert':
            # input x: [batch_size, seq_len]
            # BERT returns (sequence_output, pooler_output)
            # sequence_output: batch_size, max_len, hidden_size
            # pooler_output:   batch_size, hidden_size
            x = self.encoder(x)[0]
        else:
            x = self.emb(x)
            x = self.encoder(x)
        # some encoders (e.g. LSTM) return a tuple; keep only the output tensor
        if isinstance(x, tuple):
            x = x[0]

        # # pooling layer (not needed for per-character labeling)
        # if self.pooling_style == "max":
        #     # shape[1] is the sequence length
        #     self.pooling_style = nn.MaxPool1d(x.shape[1])
        # elif self.pooling_style == "avg":
        #     self.pooling_style = nn.AvgPool1d(x.shape[1])
        # x = self.pooling_style(x.transpose(1, 2)).squeeze()

        y_pred = self.classify(x)
        if y is not None:
            # whether to use the CRF layer:
            if self.use_crf:
                mask = y.gt(-1)
                return - self.crf_layer(y_pred, y, mask, reduction="mean")
            else:
                # (number, class_num), (number)
                return self.loss(y_pred.view(-1, y_pred.shape[-1]), y.view(-1))
        else:
            if self.use_crf:
                return self.crf_layer.decode(y_pred)
            else:
                return y_pred

# Optimizer selection
def choose_optimizer(config, model):
    optimizer = config["optimizer"]
    learning_rate = config["lr"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)


# CNN encoder
class CNN(nn.Module):
    def __init__(self, config):
        super(CNN, self).__init__()
        hidden_size = config["hidden_size"]
        kernel_size = config["kernel_size"]
        pad = int((kernel_size - 1) / 2)
        self.cnn = nn.Conv1d(hidden_size, hidden_size, kernel_size, bias=False, padding=pad)

    def forward(self, x):  # x : (batch_size, max_len, embeding_size)
        return self.cnn(x.transpose(1, 2)).transpose(1, 2)


# Gated CNN encoder
class GatedCNN(nn.Module):
    def __init__(self, config):
        super(GatedCNN, self).__init__()
        self.cnn = CNN(config)
        self.gate = CNN(config)

    # compared with a plain CNN, a parallel convolution passed through a sigmoid acts as a gate that is multiplied element-wise with the output
    def forward(self, x):
        a = self.cnn(x)
        b = self.gate(x)
        b = torch.sigmoid(b)
        return torch.mul(a, b)


# BERT + LSTM encoder
class BertLSTM(nn.Module):
    def __init__(self, config):
        super(BertLSTM, self).__init__()
        self.bert = BertModel.from_pretrained(config["bert_model_path"], return_dict=False)
        self.rnn = nn.LSTM(self.bert.config.hidden_size, self.bert.config.hidden_size, batch_first=True)

    def forward(self, x):
        x = self.bert(x)[0]
        x, _ = self.rnn(x)
        return x

# if __name__ == "__main__":
#     from config import Config
#
#     Config["output_size"] = 2
#     Config["vocab_size"] = 20
#     Config["max_length"] = 5
#     Config["model_type"] = "bert"
#     Config["use_bert"] = True
#     # model = BertModel.from_pretrained(Config["bert_model_path"], return_dict=False)
#     x = torch.LongTensor([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]])
#     # sequence_output, pooler_output = model(x)
#     # print(x[1], type(x[2]), len(x[2]))
#
#     model = TorchModel(Config)
#     label = torch.LongTensor([0,1])
#     print(model(x, label))
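In the CRF branch above, padded label positions are -1, so y.gt(-1) yields a mask of real characters that the CRF layer uses to skip padding. A small stand-alone illustration (the tensors are made up):

import torch

y = torch.tensor([[2, 6, 8, -1, -1],
                  [0, 4, 4,  4,  8]])   # -1 marks padded positions
mask = y.gt(-1)                         # True for real characters, False for padding
print(mask)
# tensor([[ True,  True,  True, False, False],
#         [ True,  True,  True,  True,  True]])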


Code 2

Task: adding punctuation back into unpunctuated text.
config

"""
配置参数信息
"""
Config = {
    "model_path": "./output/",
    "model_name": "model.pt",
    "schema_path": r"D:\NLP\video\第九周\week9 序列标注问题\加标点\data\schema.json",
    "train_data_path": r"D:\NLP\video\第九周\week9 序列标注问题\加标点\data\train_corpus",
    "valid_data_path": r"D:\NLP\video\第九周\week9 序列标注问题\加标点\data\valid_corpus",
    "vocab_path": r"D:\NLP\video\第七周\data\vocab.txt",
    "model_type": "lstm",
    # 数据标注中计算loss
    "use_crf": False,
    # 文本向量大小
    "char_dim": 20,
    # 文本长度
    "max_len": 50,
    # 词向量大小
    "hidden_size": 64,
    # 训练 轮数
    "epoch_size": 15,
    # 批量大小
    "batch_size": 25,
    # 训练集大小
    "simple_size": 300,
    # 学习率
    "lr": 0.001,
    # dropout
    "dropout": 0.5,
    # 优化器
    "optimizer": "adam",
    # 卷积核
    "kernel_size": 3,
    # 最大池 or 平均池
    "pooling_style": "max",
    # 模型层数
    "num_layers": 2,
    "bert_model_path": r"E:\Anlp\week6语言模型和预训练\bert-base-chinese",
    # 输出层大小
    "output_size": 4,
    # 随机数种子
    "seed": 987
}
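Here output_size is 4: label 0 means "no punctuation follows this character", and the remaining three labels are the target punctuation marks. The schema.json for this task presumably maps the punctuation characters to those labels, roughly as sketched below; this is an assumption for illustration, not the actual file:

{
  ",": 1,
  "。": 2,
  "?": 3
}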



evaluate

"""
模型效果测试
"""
import re
from collections import defaultdict

import numpy as np
import torch
from loader import load_data_batch


class Evaluator:
    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        # load the validation set
        self.dataset = load_data_batch(config["valid_data_path"], config, shuffle=False)
        self.stats_dict = {"correct": 0, "wrong": 0}  # used to store test results
        self.schema = self.dataset.dataset.schema
        self.index_to_label = dict((y, x) for x, y in self.schema.items())

    def eval(self, epoch):
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        self.stats_dict = dict(zip(self.schema.keys(), [defaultdict(int) for i in range(len(self.schema))]))
        self.model.eval()
        for index, batch_data in enumerate(self.dataset):
            # take the sentences corresponding to this batch
            sentence = self.dataset.dataset.sentence_list[
                       index * self.config["batch_size"]: (index + 1) * self.config["batch_size"]]
            input_id, labels = batch_data
            with torch.no_grad():
                pred_results = self.model(input_id)
            self.write_stats(labels, pred_results, sentence)
        # report the accumulated statistics once after all batches
        self.show_stats()
        return

    # Accumulate per-label accuracy statistics
    def write_stats(self, true_labels, pred_results, sentences):
        assert len(true_labels) == len(pred_results) == len(sentences), print(len(true_labels), len(pred_results), len(sentences))
        if not self.config["use_crf"]:
            # take the argmax label index
            pred_results = torch.argmax(pred_results, dim=-1)
        for true_label, pred_result, sentence in zip(true_labels, pred_results, sentences):
            if not self.config["use_crf"]:
                # truncate to the sentence length
                pred_result = pred_result.cpu().detach().tolist()[:len(sentence)]
            # truncate to the sentence length
            true_label = true_label.cpu().detach().tolist()[:len(sentence)]
            for pred, index in zip(pred_result, true_label):
                if index == -1:
                    continue
                key = self.index_to_label[index]
                self.stats_dict[key]["correct"] += 1 if pred == index else 0
                self.stats_dict[key]["total"] += 1
        return

    # Report per-label accuracy
    def show_stats(self):
        total = []
        for key in self.schema:
            acc = self.stats_dict[key]["correct"] / (1e-5 + self.stats_dict[key]["total"])
            self.logger.info("符号%s预测准确率:%f" % (key, acc))
            total.append(acc)
        self.logger.info("平均acc:%f" % np.mean(total))
        self.logger.info("--------------------")
        return


loader

"""
数据加载
"""
import os
import numpy as np
import json
import re
import os
import torch
import torch.utils.data as Data
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer


# Load the character vocabulary
def load_vocab(path):
    vocab = {}
    with open(path, 'r', encoding='utf-8') as f:
        for index, line in enumerate(f):
            word = line.strip()
            # index 0 is reserved for padding, so indices start at 1
            vocab[word] = index + 1
        vocab['unk'] = len(vocab) + 1
    return vocab


class DataGenerator:
    def __init__(self, data_path, config):
        self.data_path = data_path
        self.config = config
        self.schema = self.load_schema(config["schema_path"])
        self.max_len = config["max_len"]
        if self.config["model_type"] == "bert":
            self.tokenizer = BertTokenizer.from_pretrained(config["bert_model_path"])
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        # list of the original sentences
        self.sentence_list = []
        self.load_data()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

    def load_schema(self, path):
        with open(path, encoding="utf8") as f:
            return json.load(f)

    def load_data(self):
        self.data = []
        with open(self.data_path, 'r', encoding='utf-8') as f:
            for line in f:
                if len(line) > self.max_len:
                    for i in range(len(line) // self.max_len):
                        input_id, label = self.process_sentence(line[i * self.max_len:(i + 1) * self.max_len])
                        self.data.append([torch.LongTensor(input_id), torch.LongTensor(label)])

                else:
                    input_id, label = self.process_sentence(line)
                    self.data.append([torch.LongTensor(input_id), torch.LongTensor(label)])
        return

    # Process one sentence: output the text with the target punctuation removed, plus labels (each character's label is the target punctuation that follows it, or 0 if none)
    def process_sentence(self, sentence):
        sentence_without_target = []
        labels = []
        # iterate over sentence[:-1] because next_char looks one position ahead
        for index, char in enumerate(sentence[:-1]):
            if char in self.schema:
                continue
            # not a target punctuation mark, so keep the character
            sentence_without_target.append(char)
            next_char = sentence[index + 1]
            if next_char in self.schema:
                labels.append(self.schema[next_char])
            else:
                labels.append(0)
        # vectorize
        input_id = self.sentence_to_index(sentence_without_target)
        labels = self.padding(labels, -1)
        # keep the punctuation-stripped sentence for evaluation
        self.sentence_list.append(' '.join(sentence_without_target))
        return input_id, labels

    # Text preprocessing: convert a sentence to an index vector
    def sentence_to_index(self, text):
        input_ids = []
        vocab = self.vocab

        for char in text:
            input_ids.append(vocab.get(char, vocab['unk']))
        # pad or truncate
        input_ids = self.padding(input_ids)
        return input_ids

    # Pad or truncate to max_len
    def padding(self, input_ids, padding_dot=0):
        length = self.config["max_len"]
        padded_input_ids = input_ids
        if len(input_ids) >= length:
            return input_ids[:length]
        else:
            padded_input_ids += [padding_dot] * (length - len(input_ids))
            return padded_input_ids


# Wrap the data with torch's DataLoader
def load_data_batch(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    # DataLoader wraps dg; besides the data, dg also carries extra information used later (e.g. sentence_list)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl


if __name__ == '__main__':
    from config import Config

    dg = DataGenerator(Config["train_data_path"], Config)
    print(len(dg))
    print(dg[0])
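A worked example of what process_sentence produces, assuming the hypothetical punctuation schema above ("," -> 1, "。" -> 2), before padding:

# input sentence:       今天天气很好,适合出门。
# characters kept:      今 天 天 气 很 好 适 合 出 门
# labels (next char):   0  0  0  0  0  1  0  0  0  2
# i.e. "好" is followed by "," and "门" is followed by "。";
# the labels are then padded with -1 up to max_len.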


main

import torch
import os
import random
import numpy as np
import logging
from config import Config
from model import TorchModel, choose_optimizer
from loader import load_data_batch
from evaluate import Evaluator

# [DEBUG, INFO, WARNING, ERROR, CRITICAL]


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

"""
模型训练主程序
"""
# 通过设置随机种子来复现上一次的结果(避免随机性)
seed = Config["seed"]
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)


def main(config):
    # directory for saving the model
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])
    # load the training data
    dataset = load_data_batch(config["train_data_path"], config)
    # build the model
    model = TorchModel(config)
    # move the model to GPU if available
    if torch.cuda.is_available():
        logger.info("gpu可以使用,迁移模型至gpu")
        model.cuda()
    # choose the optimizer
    optim = choose_optimizer(config, model)
    # evaluator for measuring model quality
    evaluator = Evaluator(config, model, logger)
    for epoch in range(config["epoch_size"]):
        epoch += 1
        logger.info("epoch %d begin" % epoch)
        epoch_loss = []
        # switch to training mode
        model.train()
        for index, batch_data in enumerate(dataset):
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            # x, y = dataiter
            # clear previous gradients
            optim.zero_grad()
            x, y = batch_data     # adjust here if the inputs change (e.g. multiple inputs or outputs)
            # compute the loss
            loss = model(x, y)
            # backpropagate
            loss.backward()
            # optimizer updates the parameters
            optim.step()
            # record the loss
            epoch_loss.append(loss.item())
        logger.info("epoch average loss: %f" % np.mean(epoch_loss))
        # evaluate the model
        acc = evaluator.eval(epoch)
    # the model can be saved using model_type, model_path and the epoch number
    # model_path = os.path.join(config["model_path"], "epoch_%d_%s.pth" % (epoch, config["model_type"]))
    # torch.save(model.state_dict(), model_path)  # save the model weights
    return


if __name__ == "__main__":
    main(Config)

    # for model in ["cnn"]:
    #     Config["model_type"] = model
    #     print("最后一轮准确率:", main(Config), "当前配置:", Config["model_type"])

    # compare all models
    # intermediate logging can be turned off to avoid excessive output
    # grid search over hyperparameters
    # for model in ["gated_cnn"]:
    #     Config["model_type"] = model
    #     for lr in [1e-3, 1e-4]:
    #         Config["learning_rate"] = lr
    #         for hidden_size in [128]:
    #             Config["hidden_size"] = hidden_size
    #             for batch_size in [64, 128]:
    #                 Config["batch_size"] = batch_size
    #                 for pooling_style in ["avg"]:
    #                     Config["pooling_style"] = pooling_style
    # the output can be redirected to a file for easier inspection
    #                     print("最后一轮准确率:", main(Config), "当前配置:", Config)
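predict.py below loads a saved state dict from "model_output/epoch_10.pth", so the commented-out save in main() has to be enabled in some form. A minimal sketch of a matching save at the end of the training loop (the directory and filename pattern are assumptions chosen to line up with what predict.py expects; model and epoch are the variables already defined in main):

import os
import torch

os.makedirs("model_output", exist_ok=True)
torch.save(model.state_dict(), os.path.join("model_output", "epoch_%d.pth" % epoch))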




model

import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from transformers import BertModel
from torchcrf import CRF

"""
建立网络模型结构
"""


class TorchModel(nn.Module):
    def __init__(self, config):
        super(TorchModel, self).__init__()
        hidden_size = config["hidden_size"]
        vocab_size = config["vocab_size"] + 1
        output_size = config["output_size"]
        self.model_type = config["model_type"]
        num_layers = config["num_layers"]
        # self.use_bert = config["use_bert"]
        self.use_crf = config["use_crf"]

        self.emb = nn.Embedding(vocab_size + 1, hidden_size, padding_idx=0)
        if self.model_type == 'rnn':
            self.encoder = nn.RNN(input_size=hidden_size, hidden_size=hidden_size, num_layers=num_layers,
                                  batch_first=True)
        elif self.model_type == 'lstm':
            # bidirectional LSTM: the output size is hidden_size * 2 (num_layers should be set to 2)
            self.encoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, bidirectional=True, batch_first=True)
            hidden_size = hidden_size * 2

        elif self.model_type == 'bert':
            self.encoder = BertModel.from_pretrained(config["bert_model_path"])
            # use the pretrained model's hidden_size
            hidden_size = self.encoder.config.hidden_size
        elif self.model_type == 'cnn':
            self.encoder = CNN(config)
        elif self.model_type == "gated_cnn":
            self.encoder = GatedCNN(config)
        elif self.model_type == "bert_lstm":
            self.encoder = BertLSTM(config)
            # use the pretrained model's hidden_size (BertLSTM wraps BERT, so read it from encoder.bert)
            hidden_size = self.encoder.bert.config.hidden_size
        self.classify = nn.Linear(hidden_size, output_size)
        self.pooling_style = config["pooling_style"]
        self.crf_layer = CRF(output_size, batch_first=True)
        # use torch.nn.CrossEntropyLoss so that the -1 padding labels can be ignored
        self.loss = torch.nn.CrossEntropyLoss(ignore_index=-1)  # cross-entropy loss

    def forward(self, x, y=None):
        if self.model_type == 'bert':
            # input x: [batch_size, seq_len]
            # BERT returns (sequence_output, pooler_output)
            # sequence_output: batch_size, max_len, hidden_size
            # pooler_output:   batch_size, hidden_size
            x = self.encoder(x)[0]
        else:
            x = self.emb(x)
            x = self.encoder(x)
        # some encoders (e.g. LSTM) return a tuple; keep only the output tensor
        if isinstance(x, tuple):
            x = x[0]

        # # pooling layer (not needed for per-character labeling)
        # if not self.use_crf:
        #     if self.pooling_style == "max":
        #         # shape[1] is the sequence length
        #         self.pooling_style = nn.MaxPool1d(x.shape[1])
        #     elif self.pooling_style == "avg":
        #         self.pooling_style = nn.AvgPool1d(x.shape[1])
        #     x = self.pooling_style(x.transpose(1, 2)).squeeze()

        y_pred = self.classify(x)
        if y is not None:
            # whether to use the CRF layer:
            if self.use_crf:
                mask = y.gt(-1)
                return - self.crf_layer(y_pred, y, mask, reduction="mean")
            else:
                # (number, class_num), (number)
                # y_pred has shape (batch_size, max_len, class_num)
                # y has shape (batch_size, max_len)
                # so for the loss they are flattened to y_pred[batch_size*max_len, class_num] and y[batch_size*max_len]
                return self.loss(y_pred.view(-1, y_pred.shape[-1]), y.view(-1))
        else:
            if self.use_crf:
                return self.crf_layer.decode(y_pred)
            else:
                return y_pred

# Optimizer selection
def choose_optimizer(config, model):
    optimizer = config["optimizer"]
    learning_rate = config["lr"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)


# CNN encoder
class CNN(nn.Module):
    def __init__(self, config):
        super(CNN, self).__init__()
        hidden_size = config["hidden_size"]
        kernel_size = config["kernel_size"]
        pad = int((kernel_size - 1) / 2)
        self.cnn = nn.Conv1d(hidden_size, hidden_size, kernel_size, bias=False, padding=pad)

    def forward(self, x):  # x : (batch_size, max_len, embeding_size)
        return self.cnn(x.transpose(1, 2)).transpose(1, 2)


# Gated CNN encoder
class GatedCNN(nn.Module):
    def __init__(self, config):
        super(GatedCNN, self).__init__()
        self.cnn = CNN(config)
        self.gate = CNN(config)

    # compared with a plain CNN, a parallel convolution passed through a sigmoid acts as a gate that is multiplied element-wise with the output
    def forward(self, x):
        a = self.cnn(x)
        b = self.gate(x)
        b = torch.sigmoid(b)
        return torch.mul(a, b)


# BERT + LSTM encoder
class BertLSTM(nn.Module):
    def __init__(self, config):
        super(BertLSTM, self).__init__()
        self.bert = BertModel.from_pretrained(config["bert_model_path"], return_dict=False)
        self.rnn = nn.LSTM(self.bert.config.hidden_size, self.bert.config.hidden_size, batch_first=True)

    def forward(self, x):
        x = self.bert(x)[0]
        x, _ = self.rnn(x)
        return x

# if __name__ == "__main__":
#     from config import Config
#
#     Config["output_size"] = 2
#     Config["vocab_size"] = 20
#     Config["max_length"] = 5
#     Config["model_type"] = "bert"
#     Config["use_bert"] = True
#     # model = BertModel.from_pretrained(Config["bert_model_path"], return_dict=False)
#     x = torch.LongTensor([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]])
#     # sequence_output, pooler_output = model(x)
#     # print(x[1], type(x[2]), len(x[2]))
#
#     model = TorchModel(Config)
#     label = torch.LongTensor([0,1])
#     print(model(x, label))
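A small illustration of the non-CRF loss call above: logits are flattened to (batch_size * max_len, class_num) and labels to (batch_size * max_len), and positions labeled -1 contribute nothing thanks to ignore_index (shapes and values are illustrative):

import torch
import torch.nn as nn

batch_size, max_len, class_num = 2, 5, 4
y_pred = torch.randn(batch_size, max_len, class_num)
y = torch.tensor([[0, 1, 0, -1, -1],
                  [2, 0, 0,  0,  3]])            # -1 = padding
loss_fn = nn.CrossEntropyLoss(ignore_index=-1)
loss = loss_fn(y_pred.view(-1, class_num), y.view(-1))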


predict:

# -*- coding: utf-8 -*-
import torch
import re
import json
import numpy as np
from collections import defaultdict
from config import Config
from model import TorchModel
"""
模型效果测试
"""

class SentenceLabel:
    def __init__(self, config, model_path):
        self.config = config
        self.schema = self.load_schema(config["schema_path"])
        self.index_to_sign = dict((y, x) for x, y in self.schema.items())
        self.vocab = self.load_vocab(config["vocab_path"])
        self.model = TorchModel(config)
        self.model.load_state_dict(torch.load(model_path))
        self.model.eval()
        print("模型加载完毕!")

    def load_schema(self, path):
        with open(path, encoding="utf8") as f:
            schema = json.load(f)
            self.config["class_num"] = len(schema)
        return schema

    # Load the character or word vocabulary
    def load_vocab(self, vocab_path):
        token_dict = {}
        with open(vocab_path, encoding="utf8") as f:
            for index, line in enumerate(f):
                token = line.strip()
                token_dict[token] = index + 1  # index 0 is reserved for padding, so indices start at 1
        self.config["vocab_size"] = len(token_dict)
        return token_dict

    def predict(self, sentence):
        input_id = []
        for char in sentence:
            input_id.append(self.vocab.get(char, self.vocab["<UNK>"]))
        with torch.no_grad():
            res = self.model(torch.LongTensor([input_id]))[0]
            res = torch.argmax(res, dim=-1)
        labeled_sentence = ""
        for char, label_index in zip(sentence, res):
            labeled_sentence += char + self.index_to_sign[int(label_index)]
        return labeled_sentence

if __name__ == "__main__":
    sl = SentenceLabel(Config, "model_output/epoch_10.pth")

    sentence = "客厅的颜色比较稳重但不沉重相反很好的表现了欧式的感觉给人高雅的味道"
    res = sl.predict(sentence)
    print(res)

    sentence = "双子座的健康运势也呈上升的趋势但下半月有所回落"
    res = sl.predict(sentence)
    print(res)
