赞
踩
(3)编写文件Continuous-RvNN-main/classifier/models/encoders/FOCN_LSTM.py,定义了一个名为 FOCN_LSTM 的 PyTorch 编码器模型。它基于连续递归神经网络(Continuous RvNN)的思想,通过软邻居概率与 LSTM 式组合单元对序列进行迭代归约编码,并利用熵惩罚项和速度惩罚项来约束归约过程的终止行为。这是一个比较复杂的模型,用于序列编码等任务,具体的用途和效果可能需要根据具体的应用场景和数据进行调整和评估。文件FOCN_LSTM.py的具体实现流程如下所示。
- def __init__(self, config):
- super(FOCN_LSTM, self).__init__()
-
- self.config = config
- self.hidden_size = config["hidden_size"]
- self.cell_hidden_size = config["cell_hidden_size"]
- self.window_size = config["window_size"]
- self.stop_threshold = config["stop_threshold"]
- # self.switch_threshold = config["switch_threshold"]
- self.entropy_gamma = config["entropy_gamma"]
- self.structure_gamma = 0.01 # config["structure_gamma"]
- self.speed_gamma = config["speed_gamma"]
- self.in_dropout = config["in_dropout"]
- self.hidden_dropout = config["hidden_dropout"]
- self.recurrent_momentum = config["recurrent_momentum"]
- self.small_d = config["small_d"]
-
- self.START = nn.Parameter(T.randn(self.hidden_size))
- self.END = nn.Parameter(T.randn(self.hidden_size))
-
- if self.recurrent_momentum:
- self.past_transition_features = nn.Parameter(T.randn(self.small_d))
- self.past_non_transition_features = nn.Parameter(T.randn(self.small_d))
- self.conv_layer = Linear(self.window_size * self.hidden_size + self.small_d, self.hidden_size)
- else:
- self.conv_layer = Linear(self.window_size * self.hidden_size, self.hidden_size)
-
- self.scorer = Linear(self.hidden_size, 1)
-
- self.wcell0 = Linear(self.hidden_size, 2 * self.hidden_size,
- true_fan_in=self.hidden_size,
- true_fan_out=self.hidden_size)
- self.wcell1 = Linear(2 * self.hidden_size, 5 * self.hidden_size,
- true_fan_in=self.hidden_size,
- true_fan_out=self.hidden_size)
- # self.LN = nn.LayerNorm(self.hidden_size)
-
- self.eps = 1e-8
-
- # %%
- def sum_normalize(self, logits, dim=-1):
- return logits / T.sum(logits + self.eps, keepdim=True, dim=dim)
- def augment_sequence(self, sequence, input_mask):
- N, S, D = sequence.size()
- assert input_mask.size() == (N, S, 1)
-
- """
- AUGMENT SEQUENCE WITH START AND END TOKENS
- """
- # ADD START TOKEN
- START = self.START.view(1, 1, D).repeat(N, 1, 1)
- sequence = T.cat([START, sequence], dim=1)
- assert sequence.size() == (N, S + 1, D)
- input_mask = T.cat([T.ones(N, 1, 1).float().to(input_mask.device), input_mask], dim=1)
- assert input_mask.size() == (N, S + 1, 1)
-
- # ADD END TOKEN
- input_mask_no_end = T.cat([input_mask.clone(), T.zeros(N, 1, 1).float().to(input_mask.device)], dim=1)
- input_mask_yes_end = T.cat([T.ones(N, 1, 1).float().to(input_mask.device), input_mask.clone()], dim=1)
- END_mask = input_mask_yes_end - input_mask_no_end
- assert END_mask.size() == (N, S + 2, 1)
-
- END = self.END.view(1, 1, D).repeat(N, S + 2, 1)
- sequence = T.cat([sequence, T.zeros(N, 1, D).float().to(sequence.device)], dim=1)
- sequence = END_mask * END + (1 - END_mask) * sequence
-
- input_mask = input_mask_yes_end
- input_mask_no_start = T.cat([T.zeros(N, 1, 1).float().to(input_mask.device),
- input_mask[:, 1:, :]], dim=1)
-
- return sequence, input_mask, END_mask, input_mask_no_start, input_mask_no_end
    def compute_neighbor_probs(self, active_probs, input_mask):
        """For every position, compute a soft pointer to its nearest *active*
        neighbor on the left and on the right.

        Args:
            active_probs: (N, S, 1) probability that each position is still
                active (not yet reduced away).
            input_mask: (N, S, 1) 1.0 for real positions, 0.0 for padding.

        Returns:
            left_neighbor_probs, right_neighbor_probs: each (N, S, S); row i
            is a distribution over positions j giving the probability that j
            is i's closest active left/right neighbor.
        """
        N, S, _ = input_mask.size()
        assert input_mask.size() == (N, S, 1)
        input_mask = input_mask.permute(0, 2, 1).contiguous()
        assert input_mask.size() == (N, 1, S)

        assert active_probs.size() == (N, S, 1)
        active_probs = active_probs.permute(0, 2, 1).contiguous()
        assert active_probs.size() == (N, 1, S)

        # The left-neighbor case reuses the right-neighbor computation on a
        # flipped copy: channel 0 below is the flipped (left) problem,
        # channel 1 the original (right) one.
        input_mask_flipped = T.flip(input_mask.clone(), dims=[2])
        active_probs_flipped = T.flip(active_probs.clone(), dims=[2])

        input_mask = T.stack([input_mask_flipped, input_mask], dim=1)
        active_probs = T.stack([active_probs_flipped, active_probs], dim=1)

        assert input_mask.size() == (N, 2, 1, S)
        assert active_probs.size() == (N, 2, 1, S)

        # Broadcast active probabilities into every row; keeping only the
        # strict upper triangle restricts row i to candidates to its right.
        active_probs_matrix = active_probs.repeat(1, 1, S, 1) * input_mask
        assert active_probs_matrix.size() == (N, 2, S, S)
        right_probs_matrix = T.triu(active_probs_matrix, diagonal=1)  # mask self and left

        # Running sum of active mass to the right of i; once it crosses 1.0
        # the nearest-neighbor probability budget is exhausted.
        right_probs_matrix_cumsum = T.cumsum(right_probs_matrix, dim=-1)
        assert right_probs_matrix_cumsum.size() == (N, 2, S, S)
        remainders = 1.0 - right_probs_matrix_cumsum

        # Budget left over just *before* each candidate (shift right by one;
        # the first candidate always sees the full budget of 1.0).
        remainders_from_left = T.cat([T.ones(N, 2, S, 1).float().to(remainders.device), remainders[:, :, :, 0:-1]],
                                     dim=-1)
        assert remainders_from_left.size() == (N, 2, S, S)

        # Clamp negative leftover budget to zero.
        remainders_from_left = T.max(T.zeros(N, 2, S, 1).float().to(remainders.device), remainders_from_left)
        assert remainders_from_left.size() == (N, 2, S, S)

        # A candidate gets its full active probability while the budget
        # lasts, and only the remaining budget once the cumsum exceeds 1.0.
        right_neighbor_probs = T.where(right_probs_matrix_cumsum > 1.0,
                                       remainders_from_left,
                                       right_probs_matrix)

        right_neighbor_probs = right_neighbor_probs * input_mask

        # Channel 0 was computed on the flipped sequence; flip it back to
        # obtain the left-neighbor distributions.
        left_neighbor_probs = right_neighbor_probs[:, 0, :, :]
        left_neighbor_probs = T.flip(left_neighbor_probs, dims=[1, 2])
        right_neighbor_probs = right_neighbor_probs[:, 1, :, :]

        return left_neighbor_probs, right_neighbor_probs
- def make_window(self, sequence, left_child_probs, right_child_probs):
-
- N, S, D = sequence.size()
-
- left_children_list = []
- right_children_list = []
- left_children_k = sequence.clone()
- right_children_k = sequence.clone()
-
- for k in range(self.window_size // 2):
- left_children_k = T.matmul(left_child_probs, left_children_k)
- left_children_list = [left_children_k.clone()] + left_children_list
-
- right_children_k = T.matmul(right_child_probs, right_children_k)
- right_children_list = right_children_list + [right_children_k.clone()]
-
- windowed_sequence = left_children_list + [sequence] + right_children_list
- windowed_sequence = T.stack(windowed_sequence, dim=-2)
-
- assert windowed_sequence.size() == (N, S, self.window_size, D)
-
- return windowed_sequence
- # %%
- def initial_transform(self, sequence):
-
- N, S, D = sequence.size()
-
- contents = self.wcell0(sequence)
- contents = contents.view(N, S, 2, D)
- o = T.sigmoid(contents[:, :, 0, :])
- cell = T.tanh(contents[:, :, 1, :])
- transition = o * T.tanh(cell)
-
- return transition, cell
- def score_fn(self, windowed_sequence, transition_feats):
- N, S, W, D = windowed_sequence.size()
- windowed_sequence = windowed_sequence.view(N, S, W * D)
-
- if self.recurrent_momentum:
- windowed_sequence = T.cat([windowed_sequence, transition_feats], dim=-1)
-
- scores = self.scorer(gelu(self.conv_layer(windowed_sequence)))
-
- transition_scores = scores[:, :, 0].unsqueeze(-1)
- # reduce_probs = T.sigmoid(scores[:,:,1].unsqueeze(-1))
- no_op_scores = T.zeros_like(transition_scores).float().to(transition_scores.device)
- scores = T.cat([transition_scores, no_op_scores], dim=-1)
- scores = scores / self.temperature
- max_score = T.max(scores)
- exp_scores = T.exp(scores - max_score)
-
- return exp_scores
- def composer(self, child1, child2, cell_child1, cell_child2):
- N, S, D = child1.size()
-
- concated = T.cat([child1, child2], dim=-1)
- assert concated.size() == (N, S, 2 * D)
-
- contents = F.dropout(self.wcell1(concated), p=self.hidden_dropout, training=self.training)
- contents = contents.view(N, S, 5, D)
- gates = T.sigmoid(contents[:, :, 0:4, :])
- u = T.tanh(contents[:, :, 4, :])
- f1 = gates[..., 0, :]
- f2 = gates[..., 1, :]
- i = gates[..., 2, :]
- o = gates[..., 3, :]
-
- cell = f1 * cell_child1 + f2 * cell_child2 + i * u
- transition = o * T.tanh(cell)
-
- return transition, cell
- def compute_entropy_penalty(self, active_probs, last_token_mask):
- N, S = active_probs.size()
- active_prob_dist = self.sum_normalize(active_probs, dim=-1)
- nll_loss = - T.log(T.sum(last_token_mask * active_prob_dist, dim=1) + self.eps)
- nll_loss = nll_loss.view(N)
- return nll_loss
- def compute_speed_penalty(self, steps, input_mask):
- steps = T.max(steps, dim=1)[0]
- speed_penalty = steps.squeeze(-1) / (T.sum(input_mask.squeeze(-1), dim=1) - 2.0)
- return speed_penalty
    def encoder_block(self, sequence, input_mask):
        """Core continuous-RvNN recursion over one batch.

        The sequence is augmented with learned START/END tokens and passed
        through the leaf transform; then, step by step, every eligible
        position computes a probability of composing with its soft left
        neighbor, and hidden/cell states are updated as convex mixtures of
        the composed and untouched values.  Batch items halt once (almost)
        no position remains active or the step budget runs out.

        Args:
            sequence: (N, S, D) token embeddings.
            input_mask: (N, S, 1) validity mask.

        Returns:
            sequence: processed states with the START/END columns removed.
            global_state: (N, D) state at the last real token.
            penalty: (N,) entropy_gamma * entropy + speed_gamma * speed.
        """
        sequence, input_mask, END_mask, \
        input_mask_no_start, input_mask_no_end = self.augment_sequence(sequence, input_mask)

        N, S, D = sequence.size()

        """
        Initial Preparations
        """
        # Probability each position has not yet been reduced away.
        active_probs = T.ones(N, S, 1).float().to(sequence.device) * input_mask
        # Per-position count of recursion steps taken while active.
        steps = T.zeros(N, S, 1).float().to(sequence.device)
        zeros_sequence = T.zeros(N, 1, 1).float().to(sequence.device)
        # Marks the last *real* token (END mask shifted left by one).
        last_token_mask = T.cat([END_mask[:, 1:, :], zeros_sequence], dim=1)
        # Positions allowed to initiate a transition: neither START, END,
        # the last real token, nor padding.
        START_END_LAST_PAD_mask = input_mask_no_start * input_mask_no_end * (1.0 - last_token_mask)
        self.START_END_LAST_PAD_mask = START_END_LAST_PAD_mask
        halt_ones = T.ones(N).float().to(sequence.device)
        halt_zeros = T.zeros(N).float().to(sequence.device)
        # 1.0 while a batch item has not terminated "properly" (all mass on
        # the last token); gates the entropy penalty at the end.
        improperly_terminated_mask = halt_ones.clone()
        # 1.0 for batch items still being processed on the current step.
        update_mask = T.ones(N).float().to(sequence.device)
        left_transition_probs = T.zeros(N, S, 1).float().to(sequence.device)

        """
        Initial Transform
        """
        sequence, cell_sequence = self.initial_transform(sequence)
        sequence = sequence * input_mask
        cell_sequence = cell_sequence * input_mask
        """
        Start Recursion
        """
        t = 0
        while t < (S - 2):
            # Snapshots so halted batch items can be rolled back below.
            original_active_probs = active_probs.clone()
            original_sequence = sequence.clone()
            residual_sequence = sequence.clone()
            residual_cell_sequence = cell_sequence.clone()
            original_steps = steps.clone()
            original_cell_sequence = cell_sequence.clone()

            left_neighbor_probs, right_neighbor_probs \
                = self.compute_neighbor_probs(active_probs=active_probs.clone(),
                                              input_mask=input_mask.clone())

            windowed_sequence = self.make_window(sequence=sequence,
                                                 left_child_probs=left_neighbor_probs,
                                                 right_child_probs=right_neighbor_probs)

            if self.recurrent_momentum:
                # Feature encoding whether each position's left neighbor
                # transitioned on the previous step.
                transition_feats = left_transition_probs * self.past_transition_features.view(1, 1, -1) \
                                   + (1 - left_transition_probs) * self.past_non_transition_features.view(1, 1, -1)
            else:
                transition_feats = None

            exp_scores = self.score_fn(windowed_sequence, transition_feats)
            exp_transition_scores = exp_scores[:, :, 0].unsqueeze(-1)
            exp_no_op_scores = exp_scores[:, :, 1].unsqueeze(-1)

            # Ineligible positions can never transition.
            exp_transition_scores = exp_transition_scores * START_END_LAST_PAD_mask

            if self.config["no_modulation"] is True:
                exp_scores = T.cat([exp_transition_scores,
                                    exp_no_op_scores], dim=-1)
            else:
                # Modulation: neighbors' transition scores also compete in
                # the local normalization below.
                exp_left_transition_scores = T.matmul(left_neighbor_probs, exp_transition_scores)
                exp_right_transition_scores = T.matmul(right_neighbor_probs, exp_transition_scores)

                exp_scores = T.cat([exp_transition_scores,
                                    exp_no_op_scores,
                                    exp_left_transition_scores,
                                    exp_right_transition_scores], dim=-1)

            normalized_scores = self.sum_normalize(exp_scores, dim=-1)
            transition_probs = normalized_scores[:, :, 0].unsqueeze(-1)
            transition_probs = transition_probs * START_END_LAST_PAD_mask

            # A transition at position i composes i into its right neighbor,
            # so each position is updated with the transition probability of
            # its (soft) left neighbor.
            left_transition_probs = T.matmul(left_neighbor_probs, transition_probs)
            left_transition_probs = left_transition_probs * input_mask_no_start * input_mask_no_end
            left_sequence = windowed_sequence[:, :, self.window_size // 2 - 1, 0:self.hidden_size]
            left_cell_sequence = T.matmul(left_neighbor_probs, cell_sequence)

            transition_sequence, transition_cell_sequence = self.composer(child1=left_sequence,
                                                                          child2=sequence,
                                                                          cell_child1=left_cell_sequence,
                                                                          cell_child2=cell_sequence)
            transition_sequence = transition_sequence * input_mask
            transition_cell_sequence = transition_cell_sequence * input_mask

            # Soft update: convex mixture of composed and untouched states.
            tp = left_transition_probs
            sequence = tp * transition_sequence + (1 - tp) * residual_sequence
            sequence = sequence * input_mask
            cell_sequence = tp * transition_cell_sequence + (1 - tp) * residual_cell_sequence
            cell_sequence = cell_sequence * input_mask
            steps = steps + active_probs

            # Positions that transitioned lose part of their active mass.
            bounded_probs = transition_probs
            active_probs = active_probs * (1.0 - bounded_probs) * input_mask

            # Roll halted batch items back to their pre-step snapshots.
            active_probs = T.where(update_mask.view(N, 1, 1).expand(N, S, 1) == 1.0,
                                   active_probs,
                                   original_active_probs)

            steps = T.where(update_mask.view(N, 1, 1).expand(N, S, 1) == 1.0,
                            steps,
                            original_steps)

            sequence = T.where(update_mask.view(N, 1, 1).expand(N, S, D) == 1.0,
                               sequence,
                               original_sequence)

            cell_sequence = T.where(update_mask.view(N, 1, 1).expand(N, S, D) == 1.0,
                                    cell_sequence,
                                    original_cell_sequence)

            t += 1
            # Threshold the soft active probabilities to decide halting.
            discrete_active_status = T.where(active_probs > self.stop_threshold,
                                             T.ones_like(active_probs).to(active_probs.device),
                                             T.zeros_like(active_probs).to(active_probs.device))

            # Count of discretely-active positions excluding two boundary
            # positions (the -2.0).
            halt_condition_component = T.sum(discrete_active_status.squeeze(-1), dim=1) - 2.0
            update_mask = T.where((halt_condition_component <= 1) | (T.sum(input_mask.squeeze(-1), dim=-1) - 2.0 < t),
                                  halt_zeros,
                                  halt_ones)

            # Proper termination: exactly one active position left, and it is
            # the last real token.
            proper_termination_condition = T.sum(discrete_active_status * last_token_mask, dim=1).squeeze(-1)
            improperly_terminated_mask_ = T.where((halt_condition_component == 1) & (proper_termination_condition == 1),
                                                  halt_zeros,
                                                  halt_ones)

            improperly_terminated_mask = improperly_terminated_mask * improperly_terminated_mask_

            # Stop early once every batch item has halted.
            if T.sum(update_mask) == 0.0:
                break

        steps = steps * START_END_LAST_PAD_mask
        sequence = sequence * (1 - END_mask)
        active_probs = active_probs * (1 - END_mask)
        sequence = sequence[:, 1:-1, :]  # remove START and END
        active_probs = active_probs[:, 1:-1, :]  # remove START and END

        # After slicing, END_mask[:, 2:, :] lines up with the last real token.
        last_token_mask = END_mask[:, 2:, :]
        global_state = T.sum(sequence * last_token_mask, dim=1)

        assert active_probs.size(1) == sequence.size(1)

        entropy_penalty = self.compute_entropy_penalty(active_probs.squeeze(-1),
                                                       last_token_mask.squeeze(-1))

        speed_penalty = self.compute_speed_penalty(steps, input_mask)

        # Only penalize entropy for items that never terminated properly.
        entropy_penalty = entropy_penalty * improperly_terminated_mask
        penalty = self.entropy_gamma * entropy_penalty + self.speed_gamma * speed_penalty

        return sequence, global_state, penalty
- def forward(self, sequence, input_mask, **kwargs):
-
- if "temperature" in kwargs:
- self.temperature = kwargs["temperature"]
- else:
- self.temperature = 1.0
-
- self.temperature = 1.0 if self.temperature is None else self.temperature
-
- input_mask = input_mask.unsqueeze(-1)
- sequence = sequence * input_mask
-
- sequence, global_state, penalty = self.encoder_block(sequence, input_mask)
- sequence = sequence * input_mask
- return {"sequence": sequence, "penalty": penalty, "global_state": global_state}
(4)编写文件Continuous-RvNN-main/classifier/hypertrain.py,功能是使用 Hyperopt 库进行超参数搜索,在给定的搜索空间内,通过超参数搜索来寻找模型的最佳配置,以提高模型性能。超参数是机器学习模型的配置参数,它们不是通过训练得到的,而需要手动调整以获得最佳性能。文件hypertrain.py的具体实现代码如下所示。
def blockPrint():
    """Silence print() output by pointing stdout at the null device.

    Used around nested training runs; pair with enablePrint() to restore.
    """
    devnull = open(os.devnull, 'w')
    sys.stdout = devnull
-
# Restore
def enablePrint():
    """Point stdout back at the interpreter's original stream, undoing
    blockPrint(). (The devnull handle opened there is not closed here.)"""
    sys.stdout = sys.__stdout__
-
# Parse CLI arguments and load the model/dataset-specific search space plus
# the post-processing hook applied to each sampled config.
parser = get_args()
args = parser.parse_args()
search_space, config_processor = load_hyperconfig(args)

print(search_space)

# Wrap every option list in hp.choice so hyperopt samples discrete values.
hp_search_space = {}
for key, val in search_space.items():
    hp_search_space[key] = hp.choice(key, val)
space_keys = [k for k in search_space]

# Output locations: best-config summary (txt) and resumable checkpoint (pkl),
# both keyed by model + dataset.
hyperopt_config_path = Path("hypertune/tuned_configs/{}_{}.txt".format(args.model, args.dataset))
hyperopt_checkpoint_path = Path("hypertune/checkpoints/{}_{}.pkl".format(args.model, args.dataset))
Path('hypertune/checkpoints/').mkdir(parents=True, exist_ok=True)
Path('hypertune/tuned_configs/').mkdir(parents=True, exist_ok=True)

if args.hypercheckpoint:
    # Resume a previous search: restore the hyperopt Trials object, the
    # dedup cache, and the count of genuinely-new trials.
    with open(hyperopt_checkpoint_path, "rb") as fp:
        data = pickle.load(fp)
    trials = data["trials"]
    tried_configs = data["tried_configs"]
    true_total_trials = data["true_total_trials"]
    print("\n\nCheckpoint Loaded\n\n")
else:
    trials = Trials()
    tried_configs = {}
    true_total_trials = 0
-
-
def generate_args_hash(args):
    """Build a deterministic string key for a hyperparameter assignment.

    Fixes two defects of the old bare concatenation: values could run
    together ({"a": 1, "b": 23} and {"a": 12, "b": 3} both produced "123"),
    and the key depended on dict insertion order.  Pairs are now rendered as
    "key=value", sorted by key, and joined with "|".

    NOTE: keys differ from those stored in old tried_configs checkpoints, so
    a resumed run will re-evaluate a config at most once before re-caching.

    Args:
        args: mapping of hyperparameter name -> sampled value.

    Returns:
        A collision-resistant, order-independent string key.
    """
    return "|".join("{}={}".format(key, args[key]) for key in sorted(args))
-
-
# Duplicate-suggestion bookkeeping: random search may re-propose an
# already-tried config; run_wrapper flags those via failure_flag and the
# driver loop counts them in successive_failures.
successive_failures = 0
max_successive_failures = 10  # NOTE(review): unused below — the loop hard-codes 100000; confirm intent
failure_flag = False
-
-
def run_wrapper(space):
    """Hyperopt objective: train once with the sampled `space` and return
    its loss (negated dev score, since fmin minimizes).

    Deduplicates via a string key of the sampled config: repeats return the
    cached loss and set failure_flag so the driver loop does not count them
    as real trials.
    """
    global args
    global tried_configs
    global failure_flag
    config = load_config(args)
    config["epochs"] = args.epochs
    hash = generate_args_hash(space)  # NOTE(review): local shadows the builtin `hash`

    if hash not in tried_configs:
        print("Exploring: {}".format(space))
        # Overlay the sampled hyperparameters onto the base config.
        for key in space:
            config[key] = space[key]
        config = config_processor(config)

        # Silence the verbose training run, then restore stdout.
        blockPrint()
        _, best_metric, _ = run(args, config)
        enablePrint()

        dev_score = compose_dev_metric(best_metric, args, config)
        # Cache the (negated) score under this config's key.
        tried_configs[hash] = -dev_score
        print("loss: {}".format(tried_configs[hash]))
        failure_flag = False
        return {'loss': -dev_score, 'status': STATUS_OK}
    else:
        #print("loss: {} (Skipped Trial)".format(tried_configs[hash]))
        failure_flag = True
        return {'loss': tried_configs[hash], 'status': STATUS_OK}
-
-
# Cap trials at the size of the finite grid so the loop can terminate.
max_trials = min(args.max_trials, np.prod([len(choices) for key, choices in search_space.items()]))
save_intervals = 1  # run fmin one new evaluation at a time, checkpointing in between
i = len(trials.trials)  # NOTE(review): unused below — confirm before removing
successive_failures = 0

while True:
    # Resume fmin from the accumulated Trials; each pass performs
    # save_intervals additional random-search evaluations.
    best = fmin(run_wrapper,
                space=hp_search_space,
                algo=hyperopt.rand.suggest,
                trials=trials,
                max_evals=len(trials.trials) + save_intervals)

    # fmin returns indices into each hp.choice; map back to actual values.
    found_config = {}
    for key in best:
        found_config[key] = search_space[key][best[key]]

    if not failure_flag:
        # A genuinely new configuration was evaluated: report and persist.
        true_total_trials += 1
        print("Best Config so far: ", found_config)
        print("Total Trials: {} out of {}".format(true_total_trials, max_trials))
        print("\n\n")
        successive_failures = 0
        display_string = ""
        for key, value in found_config.items():
            display_string += "{}: {}\n".format(key, value)
        with open(hyperopt_config_path, "w") as fp:
            fp.write(display_string)

        # Checkpoint so --hypercheckpoint can resume this exact state.
        with open(hyperopt_checkpoint_path, "wb") as fp:
            pickle.dump({"trials": trials,
                         "tried_configs": tried_configs,
                         "true_total_trials": true_total_trials}, fp)
    else:
        # Duplicate suggestion ("failure"): nothing new was evaluated.
        successive_failures += 1
        if successive_failures % 1000 == 0:
            print("Successive failures: ", successive_failures)

    if true_total_trials >= max_trials:
        break

    # Near grid exhaustion random search keeps re-suggesting duplicates;
    # give up after too many in a row.
    if successive_failures > 100000:
        print("\n\nDiscontinuing due to too many successive failures.\n\n")
        break
与上述主题相关的扩展阅读链接如下所示:
(4-1)文本分类与情感分析算法:朴素贝叶斯分类器-CSDN博客
(4-2)文本分类与情感分析算法:支持向量机(SVM)-CSDN博客
(4-3)文本分类与情感分析算法:随机森林(Random Forest)-CSDN博客
(4-4)文本分类与情感分析算法:卷积神经网络(CNN)-CSDN博客
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。