The cluster module encapsulates the work that is performed at the cluster level: cluster health, cluster-level metadata management, allocating shards to nodes, node management, and so on. Executing a cluster task may produce a new cluster state; when it does, the master node broadcasts the new state to the other nodes.
The cluster state is encapsulated in ClusterState and supports incremental (diff-based) synchronization.
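As a rough orientation, a ClusterState bundles the pieces the rest of this article reads from it. A minimal sketch of inspecting one (version(), nodes(), metadata() and routingTable() all appear in the Elasticsearch code quoted later; indices() on Metadata is assumed from the Metadata API):

import org.elasticsearch.cluster.ClusterState;

// Illustration only: summarize the parts of a cluster state that the
// master service and the publish path care about.
public final class ClusterStateDescriber {
    public static String describe(ClusterState state) {
        return "version=" + state.version()
            + ", master=" + state.nodes().getMasterNode()
            + ", indices=" + state.metadata().indices().size()
            + ", routingTableVersion=" + state.routingTable().version();
    }
}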
Cluster tasks are submitted from many places, for example when an index is created, mappings are updated, a node joins or leaves the cluster, or, as in the example used later in this article, when the gateway recovers the persisted state.
The entry point for submitting a cluster task is ClusterService#submitStateUpdateTask. The first parameter is the source of the event and the second is the task to execute:
public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener>
        void submitStateUpdateTask(String source, T updateTask) {
    submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
}

public <T> void submitStateUpdateTask(String source, T task,
                                      ClusterStateTaskConfig config,
                                      ClusterStateTaskExecutor<T> executor,
                                      ClusterStateTaskListener listener) {
    submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
}

public <T> void submitStateUpdateTasks(final String source,
                                       final Map<T, ClusterStateTaskListener> tasks,
                                       final ClusterStateTaskConfig config,
                                       final ClusterStateTaskExecutor<T> executor) {
    masterService.submitStateUpdateTasks(source, tasks, config, executor);
}
The most representative task type is ClusterStateUpdateTask, which implements all three interfaces: ClusterStateTaskConfig, ClusterStateTaskExecutor and ClusterStateTaskListener:
public abstract class ClusterStateUpdateTask
implements ClusterStateTaskConfig, ClusterStateTaskExecutor<ClusterStateUpdateTask>, ClusterStateTaskListener {
ClusterStateTaskConfig carries the task's configuration: its timeout and its priority:
TimeValue timeout();
Priority priority();
ClusterStateTaskExecutor defines the work to perform; its central method is execute:
ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;
When a task runs it is handed the current cluster state. If the task produces a new cluster state it returns that new state; otherwise it returns the original state unchanged.
ClusterStateTaskListener provides the callbacks invoked after the task has been submitted:
/**
 * A callback called when execute fails.
 */
void onFailure(String source, Exception e);

/**
 * called when the task was rejected because the local node is no longer master.
 * Used only for tasks submitted to {@link MasterService}.
 */
default void onNoLongerMaster(String source) {
    onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));
}

/**
 * Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} have been processed
 * properly by all listeners.
 */
default void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
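To make the three roles concrete, here is a minimal sketch of submitting a custom ClusterStateUpdateTask. It assumes a clusterService and logger are in scope; nothingToChange and the metadata change are made-up placeholders, only the interface methods come from the code above:

// Hypothetical task, for illustration only.
clusterService.submitStateUpdateTask("demo-update", new ClusterStateUpdateTask(Priority.NORMAL) {
    @Override
    public ClusterState execute(ClusterState currentState) {
        // Returning currentState unchanged means no new state is published.
        if (nothingToChange(currentState)) {          // hypothetical helper
            return currentState;
        }
        // Otherwise build and return a modified state.
        return ClusterState.builder(currentState)
            .metadata(Metadata.builder(currentState.metadata()))   // hypothetical change
            .build();
    }

    @Override
    public void onFailure(String source, Exception e) {
        logger.warn("cluster task [" + source + "] failed", e);
    }

    @Override
    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
        logger.info("cluster task [" + source + "] applied, new version " + newState.version());
    }
});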
MasterService manages and runs cluster tasks. Only the elected master node submits cluster tasks to its internal queue and runs them:
public <T> void submitStateUpdateTasks(final String source,
                                       final Map<T, ClusterStateTaskListener> tasks,
                                       final ClusterStateTaskConfig config,
                                       final ClusterStateTaskExecutor<T> executor) {
    if (!lifecycle.started()) {
        return;
    }
    final ThreadContext threadContext = threadPool.getThreadContext();
    final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
    try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
        threadContext.markAsSystemContext();
        // wrap the tasks
        List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
            .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(),
                safe(e.getValue(), supplier), executor))
            .collect(Collectors.toList());
        // submit the tasks
        taskBatcher.submitTasks(safeTasks, config.timeout());
    } catch (EsRejectedExecutionException e) {
        // ignore cases where we are shutting down..., there is really nothing interesting
        // to be done here...
        if (!lifecycle.stoppedOrClosed()) {
            throw e;
        }
    }
}
public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
    if (tasks.isEmpty()) {
        return;
    }
    final BatchedTask firstTask = tasks.get(0);
    assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :
        "tasks submitted in a batch should share the same batching key: " + tasks;
    // convert to an identity map to check for dups based on task identity
    final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(
        BatchedTask::getTask,
        Function.identity(),
        (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
        IdentityHashMap::new));

    synchronized (tasksPerBatchingKey) {
        // add the tasks under their batchingKey, creating the pending set for that key if needed
        LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,
            k -> new LinkedHashSet<>(tasks.size()));
        // check whether a task with the same identity is already queued for this batchingKey
        for (BatchedTask existing : existingTasks) {
            // check that there won't be two tasks with the same identity for the same batching key
            BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
            if (duplicateTask != null) {
                throw new IllegalStateException("task [" + duplicateTask.describeTasks(
                    Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued");
            }
        }
        existingTasks.addAll(tasks);
    }
    // hand the batch to the executor
    if (timeout != null) {
        threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
    } else {
        threadExecutor.execute(firstTask);
    }
}
This is where batching and deduplication happen. Tasks that share the same ClusterStateTaskExecutor instance (the batching key) are grouped together, and the executor runs only once for the whole group; every queued task in the group is then handed the result of that single run. Individual tasks are distinguished by the identity of the task object itself, so "deduplication" does not mean discarding tasks; it means executing the batch once and assigning the same run's results to all the tasks in it.
The batching key matches in two situations: either several tasks sharing the executor are submitted in one call, or tasks with the same executor were submitted earlier and are still queued when new ones arrive. In both cases the tasks are appended to the same pending list and processed in a single batch.
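A stripped-down sketch of the batching data structure may help; names and types are simplified, and the duplicate check below uses equals rather than the identity map TaskBatcher uses, so treat it as an illustration of the idea only:

import java.util.*;

// Illustration: group tasks by a shared "batching key" (the executor instance)
// and drain each group in one go, the way TaskBatcher does above.
final class MiniBatcher {
    private final Map<Object, LinkedHashSet<Object>> tasksPerBatchingKey = new HashMap<>();

    synchronized void submit(Object batchingKey, Object task) {
        LinkedHashSet<Object> pending =
            tasksPerBatchingKey.computeIfAbsent(batchingKey, k -> new LinkedHashSet<>());
        if (!pending.add(task)) {                 // simplified duplicate check
            throw new IllegalStateException("task is already queued");
        }
    }

    // Called once per batching key: the whole group is executed as one batch.
    synchronized List<Object> drain(Object batchingKey) {
        LinkedHashSet<Object> pending = tasksPerBatchingKey.remove(batchingKey);
        return pending == null ? List.of() : new ArrayList<>(pending);
    }
}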
Each task is wrapped in an UpdateTask:
class UpdateTask extends BatchedTask {
final ClusterStateTaskListener listener;
UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
ClusterStateTaskExecutor<?> executor) {
super(priority, source, executor, task);
this.listener = listener;
}
@Override
public String describeTasks(List<? extends BatchedTask> tasks) {
return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(
tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
}
}
When the batch is picked up by the thread pool, its run method is invoked:
@Override
public void run() {
    // run the tasks that have not been processed yet
    runIfNotProcessed(this);
}

void runIfNotProcessed(BatchedTask updateTask) {
    // tasks with the same batching key are only executed once
    if (updateTask.processed.get() == false) {
        final List<BatchedTask> toExecute = new ArrayList<>();
        final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
        synchronized (tasksPerBatchingKey) {
            // take the pending task list for this batching key
            LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
            if (pending != null) {
                for (BatchedTask task : pending) {
                    if (task.processed.getAndSet(true) == false) {
                        logger.trace("will process {}", task);
                        // build the list of tasks to execute
                        toExecute.add(task);
                        processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
                    } else {
                        logger.trace("skipping {}, already processed", task);
                    }
                }
            }
        }
        if (toExecute.isEmpty() == false) {
            final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
                String tasks = updateTask.describeTasks(entry.getValue());
                return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
            }).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
            // execute the batch
            run(updateTask.batchingKey, toExecute, tasksSummary);
        }
    }
}
The logic that runs the tasks and publishes the resulting cluster state lives in MasterService:
@Override
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
//run the tasks and publish the resulting cluster state
runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
}
private void runTasks(TaskInputs taskInputs) {
    final String summary = taskInputs.summary;
    if (!lifecycle.started()) {
        logger.debug("processing [{}]: ignoring, master service not started", summary);
        return;
    }

    logger.debug("executing cluster state update for [{}]", summary);
    // the previous cluster state
    final ClusterState previousClusterState = state();

    // only run on the elected master
    if (!previousClusterState.nodes().isLocalNodeElectedMaster() && taskInputs.runOnlyWhenMaster()) {
        logger.debug("failing [{}]: local node is no longer master", summary);
        taskInputs.onNoLongerMaster();
        return;
    }

    final long computationStartTime = threadPool.relativeTimeInMillis();
    // execute the tasks and compute the new cluster state
    final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
    taskOutputs.notifyFailedTasks();
    final TimeValue computationTime = getTimeSince(computationStartTime);
    logExecutionTime(computationTime, "compute cluster state update", summary);

    if (taskOutputs.clusterStateUnchanged()) {
        final long notificationStartTime = threadPool.relativeTimeInMillis();
        taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
        final TimeValue executionTime = getTimeSince(notificationStartTime);
        logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
    } else { // the cluster state has changed
        final ClusterState newClusterState = taskOutputs.newClusterState;
        if (logger.isTraceEnabled()) {
            logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
        } else {
            logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
        }

        final long publicationStartTime = threadPool.relativeTimeInMillis();
        try {
            ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
            // new cluster state, notify all listeners
            final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
            if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                String nodesDeltaSummary = nodesDelta.shortSummary();
                if (nodesDeltaSummary.length() > 0) {
                    logger.info("{}, term: {}, version: {}, delta: {}",
                        summary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary);
                }
            }

            logger.debug("publishing cluster state version [{}]", newClusterState.version());
            // publish the new cluster state
            publish(clusterChangedEvent, taskOutputs, publicationStartTime);
        } catch (Exception e) {
            handleException(summary, publicationStartTime, newClusterState, e);
        }
    }
}
Before running the tasks, the method checks that the local node is still the elected master, because only the master may run cluster tasks. The tasks are then executed against the current cluster state to produce the new state.
calculateTaskOutputs runs the tasks, collects the per-task results, and builds the new cluster state:
private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) {
//run the submitted tasks and obtain the resulting cluster state
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState);
//bump the version numbers (cluster state, routing table, metadata) on the new state
ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
return new TaskOutputs(taskInputs, previousClusterState, newClusterState, getNonFailedTasks(taskInputs, clusterTasksResult),
clusterTasksResult.executionResults);
}
executeTasks collects the task list and calls the executor's execute method:
private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) {
    ClusterTasksResult<Object> clusterTasksResult;
    try {
        List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
        // run the tasks and return the resulting cluster state
        clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
        if (previousClusterState != clusterTasksResult.resultingState &&
            previousClusterState.nodes().isLocalNodeElectedMaster() &&
            (clusterTasksResult.resultingState.nodes().isLocalNodeElectedMaster() == false)) {
            throw new AssertionError("update task submitted to MasterService cannot remove master");
        }
    } catch (Exception e) {
        ......
        clusterTasksResult = ClusterTasksResult.builder()
            .failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e)
            .build(previousClusterState);
    }
    ......
    return clusterTasksResult;
}
Take gateway recovery of the cluster state as an example:
@Override
public final ClusterTasksResult<ClusterStateUpdateTask> execute(ClusterState currentState, List<ClusterStateUpdateTask> tasks)
        throws Exception {
    // run the state-changing task and wrap the resulting cluster state
    ClusterState result = execute(currentState);
    return ClusterTasksResult.<ClusterStateUpdateTask>builder().successes(tasks).build(result);
}

@Override
public void onSuccess(final ClusterState recoveredState) {
    logger.trace("successful state recovery, importing cluster state...");
    clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask() {
        @Override
        public ClusterState execute(final ClusterState currentState) {
            final ClusterState updatedState = ClusterStateUpdaters.mixCurrentStateAndRecoveredState(currentState, recoveredState);
            return super.execute(ClusterStateUpdaters.recoverClusterBlocks(updatedState));
        }
    });
}

@Override
public ClusterState execute(final ClusterState currentState) {
    if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
        logger.debug("cluster is already recovered");
        return currentState;
    }
    // state recovery is complete
    final ClusterState newState = Function.<ClusterState>identity()
        .andThen(ClusterStateUpdaters::updateRoutingTable)
        .andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock)
        .apply(currentState);
    // start allocating shards
    return allocationService.reroute(newState, "state recovered");
}
The task produces a new cluster state and kicks off shard allocation. patchVersions then builds the final state from the previous state and the task result:
private ClusterState patchVersions(ClusterState previousClusterState, ClusterTasksResult<?> executionResult) {
    // the cluster state produced by the tasks
    ClusterState newClusterState = executionResult.resultingState;

    if (previousClusterState != newClusterState) {
        // only the master controls the version numbers
        // increment the cluster state version
        Builder builder = incrementVersion(newClusterState);
        // the routing table changed, i.e. the shard-to-node assignment changed
        if (previousClusterState.routingTable() != newClusterState.routingTable()) {
            builder.routingTable(RoutingTable.builder(newClusterState.routingTable())
                .version(newClusterState.routingTable().version() + 1).build());
        }
        // the cluster metadata changed
        if (previousClusterState.metadata() != newClusterState.metadata()) {
            builder.metadata(Metadata.builder(newClusterState.metadata()).version(newClusterState.metadata().version() + 1));
        }

        newClusterState = builder.build();
    }

    return newClusterState;
}
Back in MasterService#runTasks, the new cluster state has now been computed. If it differs from the previous state, the publication phase begins and the new state is broadcast to all nodes:
// publish the cluster state
publish(clusterChangedEvent, taskOutputs, publicationStartTime);

protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {
    final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() {
        @Override
        protected boolean blockingAllowed() {
            return isMasterUpdateThread() || super.blockingAllowed();
        }
    };
    // publish the cluster state
    clusterStatePublisher.publish(clusterChangedEvent, fut, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()));

    // indefinitely wait for publication to complete
    try {
        FutureUtils.get(fut);
        onPublicationSuccess(clusterChangedEvent, taskOutputs);
    } catch (Exception e) {
        onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeMillis, e);
    }
}
@Override
public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener) {
    // the new cluster state
    ClusterState newState = clusterChangedEvent.state();
    assert newState.getNodes().isLocalNodeElectedMaster() :
        "Shouldn't publish state when not master " + clusterChangedEvent.source();

    try {
        // state got changed locally (maybe because another master published to us)
        if (clusterChangedEvent.previousState() != this.committedState.get()) {
            throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update");
        }

        pendingStatesQueue.addPending(newState);

        // publish the cluster state
        publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
    } catch (FailedToCommitClusterStateException t) {
        // cluster service logs a WARN message
        logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
            newState.version(), electMaster.minimumMasterNodes());

        synchronized (stateMutex) {
            pendingStatesQueue.failAllStatesAndClear(
                new ElasticsearchException("failed to publish cluster state"));

            rejoin("zen-disco-failed-to-publish");
        }

        publishListener.onFailure(t);
        return;
    }

    final DiscoveryNode localNode = newState.getNodes().getLocalNode();
    final AtomicBoolean processedOrFailed = new AtomicBoolean();
    pendingStatesQueue.markAsCommitted(newState.stateUUID(),
        new PendingClusterStatesQueue.StateProcessedListener() {
            @Override
            public void onNewClusterStateProcessed() {
                processedOrFailed.set(true);
                publishListener.onResponse(null);
                ackListener.onNodeAck(localNode, null);
            }

            @Override
            public void onNewClusterStateFailed(Exception e) {
                processedOrFailed.set(true);
                publishListener.onFailure(e);
                ackListener.onNodeAck(localNode, e);
                logger.warn(() -> new ParameterizedMessage(
                    "failed while applying cluster state locally [{}]", clusterChangedEvent.source()), e);
            }
        });

    synchronized (stateMutex) {
        if (clusterChangedEvent.previousState() != this.committedState.get()) {
            publishListener.onFailure(
                new FailedToCommitClusterStateException("local state was mutated while CS update was published to other nodes")
            );
            return;
        }

        // after the two-phase commit the state has been published to the cluster, but there is no guarantee
        // that every node applied it successfully; now process the committed state locally
        boolean sentToApplier = processNextCommittedClusterState("master " + newState.nodes().getMasterNode() +
            " committed version [" + newState.version() + "] source [" + clusterChangedEvent.source() + "]");
        if (sentToApplier == false && processedOrFailed.get() == false) {
            assert false : "cluster state published locally neither processed nor failed: " + newState;
            logger.warn("cluster state with version [{}] that is published locally has neither been processed nor failed",
                newState.version());
            publishListener.onFailure(new FailedToCommitClusterStateException("cluster state that is published locally has neither " +
                "been processed nor failed"));
        }
    }
}
PublishClusterStateAction#publish first prepares the list of target nodes, excluding the local node. It then builds either a full or an incremental (diff) cluster state, serializes and compresses it, and publishes it:
public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes,
                    final Discovery.AckListener ackListener) throws FailedToCommitClusterStateException {
    final DiscoveryNodes nodes;
    final SendingController sendingController;
    final Set<DiscoveryNode> nodesToPublishTo;
    final Map<Version, BytesReference> serializedStates;
    final Map<Version, BytesReference> serializedDiffs;
    final boolean sendFullVersion;
    try {
        // the target nodes to publish to
        nodes = clusterChangedEvent.state().nodes();
        nodesToPublishTo = new HashSet<>(nodes.getSize());
        DiscoveryNode localNode = nodes.getLocalNode();
        final int totalMasterNodes = nodes.getMasterNodes().size();
        for (final DiscoveryNode node : nodes) {
            if (node.equals(localNode) == false) {
                nodesToPublishTo.add(node);
            }
        }
        sendFullVersion = !discoverySettings.getPublishDiff() || clusterChangedEvent.previousState() == null;
        // full states, keyed by node version
        serializedStates = new HashMap<>();
        // diff (incremental) states, keyed by node version
        serializedDiffs = new HashMap<>();

        // we build these early as a best effort not to commit in the case of error.
        // sadly this is not water tight as it may that a failed diff based publishing to a node
        // will cause a full serialization based on an older version, which may fail after the
        // change has been committed.
        // build the serialized states and diffs
        buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(),
            nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs);

        // handler for the responses to the publish requests
        final BlockingClusterStatePublishResponseHandler publishResponseHandler =
            new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener);
        sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes,
            totalMasterNodes, publishResponseHandler);
    } catch (Exception e) {
        throw new FailedToCommitClusterStateException("unexpected error while preparing to publish", e);
    }

    try {
        // publish
        innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion,
            serializedStates, serializedDiffs);
    } catch (FailedToCommitClusterStateException t) {
        throw t;
    } catch (Exception e) {
        // try to fail committing, in cause it's still on going
        if (sendingController.markAsFailed("unexpected error", e)) {
            // signal the change should be rejected
            throw new FailedToCommitClusterStateException("unexpected error", e);
        } else {
            throw e;
        }
    }
}
Full states are cached in serializedStates and diffs in serializedDiffs, keyed by node version. Each cluster state carries its own unique, increasing version number, which allows the master to send only the incremental content between adjacent versions.
buildDiffAndSerializeStates constructs what needs to be sent: if the node did not exist in the previous cluster state, or diff publishing is disabled, a full state is built, otherwise a diff is built; the result is then serialized and compressed:
private void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState,
                                         Set<DiscoveryNode> nodesToPublishTo, boolean sendFullVersion,
                                         Map<Version, BytesReference> serializedStates,
                                         Map<Version, BytesReference> serializedDiffs) {
    Diff<ClusterState> diff = null;
    for (final DiscoveryNode node : nodesToPublishTo) {
        try {
            if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
                // will send a full reference
                if (serializedStates.containsKey(node.getVersion()) == false) {
                    serializedStates.put(node.getVersion(), serializeFullClusterState(clusterState, node.getVersion()));
                }
            } else {
                // will send a diff
                if (diff == null) {
                    diff = clusterState.diff(previousState);
                }
                if (serializedDiffs.containsKey(node.getVersion()) == false) {
                    serializedDiffs.put(node.getVersion(), serializeDiffClusterState(diff, node.getVersion()));
                }
            }
        } catch (IOException e) {
            throw new ElasticsearchException("failed to serialize cluster_state for publishing to node {}", e, node);
        }
    }
}
Elasticsearch publishes the state with a two-phase commit. Phase one is the push: the state data is sent to the nodes but not yet applied. If more than half of the master-eligible nodes acknowledge, phase two, the commit, sends the commit request. Two-phase commit cannot guarantee that a node that receives the commit applies the state correctly; it only guarantees that the commit request was sent, not whether applying the state on an individual node succeeded or failed.
private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
                          final SendingController sendingController, final Discovery.AckListener ackListener,
                          final boolean sendFullVersion, final Map<Version, BytesReference> serializedStates,
                          final Map<Version, BytesReference> serializedDiffs) {

    final ClusterState clusterState = clusterChangedEvent.state();
    final ClusterState previousState = clusterChangedEvent.previousState();
    // publish timeout
    final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
    // publish start time
    final long publishingStartInNanos = System.nanoTime();
    // iterate over the nodes and asynchronously send the full or diff state
    for (final DiscoveryNode node : nodesToPublishTo) {
        // try and serialize the cluster state once (or per version), so we don't serialize it
        // per node when we send it over the wire, compress it while we are at it...
        // we don't send full version if node didn't exist in the previous version of cluster state
        // send the full state
        if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
            sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
        } else {
            // send the diff state
            sendClusterStateDiff(clusterState, serializedDiffs, serializedStates, node, publishTimeout, sendingController);
        }
    }

    // wait for the commit: phase one is done once enough responses arrived or the timeout expired
    sendingController.waitForCommit(discoverySettings.getCommitTimeout());

    final long commitTime = System.nanoTime() - publishingStartInNanos;

    ackListener.onCommit(TimeValue.timeValueNanos(commitTime));

    try {
        long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - commitTime);
        final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();
        sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
        if (sendingController.getPublishingTimedOut()) {
            DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();
            // everyone may have just responded
            if (pendingNodes.length > 0) {
                logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})",
                    clusterState.version(), publishTimeout, pendingNodes);
            }
        }
        // The failure is logged under debug when a sending failed. we now log a summary.
        Set<DiscoveryNode> failedNodes = publishResponseHandler.getFailedNodes();
        if (failedNodes.isEmpty() == false) {
            logger.warn("publishing cluster state with version [{}] failed for the following nodes: [{}]",
                clusterChangedEvent.state().version(), failedNodes);
        }
    } catch (InterruptedException e) {
        // ignore & restore interrupt
        Thread.currentThread().interrupt();
    }
}
Whether full or diff data is being sent, both paths end up calling sendClusterStateToNode:
private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes,
                                    final DiscoveryNode node,
                                    final TimeValue publishTimeout,
                                    final SendingController sendingController,
                                    final boolean sendDiffs, final Map<Version, BytesReference> serializedStates) {
    try {
        // send via the underlying transport layer
        transportService.sendRequest(node, SEND_ACTION_NAME,
            new BytesTransportRequest(bytes, node.getVersion()),
            stateRequestOptions,
            new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                @Override
                public void handleResponse(TransportResponse.Empty response) {
                    // the publication has already timed out
                    if (sendingController.getPublishingTimedOut()) {
                        logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node,
                            clusterState.version(), publishTimeout);
                    }
                    // check whether enough acks have arrived, and if so start the commit
                    sendingController.onNodeSendAck(node);
                }

                @Override
                public void handleException(TransportException exp) {
                    if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
                        logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
                        sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
                    } else {
                        logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);
                        sendingController.onNodeSendFailed(node, exp);
                    }
                }
            });
    } catch (Exception e) {
        logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
        sendingController.onNodeSendFailed(node, e);
    }
}
The data is sent asynchronously through transportService.sendRequest. The RPC action is internal:discovery/zen/publish/send, and the handler registered on the receiving node is SendClusterStateRequestHandler:
// registration of the send handler
transportService.registerRequestHandler(SEND_ACTION_NAME, ThreadPool.Names.SAME, false, false,
    BytesTransportRequest::new, new SendClusterStateRequestHandler());

private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {

    @Override
    public void messageReceived(BytesTransportRequest request, final TransportChannel channel, Task task) throws Exception {
        // handle the incoming cluster state
        handleIncomingClusterStateRequest(request, channel);
    }
}

protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
    Compressor compressor = CompressorFactory.compressor(request.bytes());
    StreamInput in = request.bytes().streamInput();

    final ClusterState incomingState;
    synchronized (lastSeenClusterStateMutex) {
        try {
            if (compressor != null) {
                in = compressor.streamInput(in);
            }
            in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
            in.setVersion(request.version());
            // If true we received full cluster state - otherwise diffs
            // true: full state, false: diff
            if (in.readBoolean()) {
                incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
                fullClusterStateReceivedCount.incrementAndGet();
                logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
                    request.bytes().length());
            } else if (lastSeenClusterState != null) {
                Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode());
                incomingState = diff.apply(lastSeenClusterState);
                compatibleClusterStateDiffReceivedCount.incrementAndGet();
                logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
                    incomingState.version(), incomingState.stateUUID(), request.bytes().length());
            } else {
                logger.debug("received diff for but don't have any local cluster state - requesting full state");
                throw new IncompatibleClusterStateVersionException("have no local cluster state");
            }
        } catch (IncompatibleClusterStateVersionException e) {
            incompatibleClusterStateDiffReceivedCount.incrementAndGet();
            throw e;
        } catch (Exception e) {
            logger.warn("unexpected error while deserializing an incoming cluster state", e);
            throw e;
        } finally {
            IOUtils.close(in);
        }
        // notify the listener about the incoming state
        incomingClusterStateListener.onIncomingClusterState(incomingState);
        lastSeenClusterState = incomingState;
    }
    // reply with an empty response
    channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
The receiving node stores the incoming cluster state (without applying it yet) and returns an empty response.
Back on the master, the response callback checks whether enough acknowledgements have arrived:
public synchronized void onNodeSendAck(DiscoveryNode node) {
    if (committed) { // already committed
        assert sendAckedBeforeCommit.isEmpty();
        sendCommitToNode(node, clusterState, this);
    } else if (committedOrFailed()) {
        logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed",
            node, clusterState.version());
    } else {
        // we're still waiting
        sendAckedBeforeCommit.add(node);
        if (node.isMasterNode()) {
            checkForCommitOrFailIfNoPending(node);
        }
    }
}

// count the acks from master-eligible nodes; once enough have acked, start the commit
private synchronized void checkForCommitOrFailIfNoPending(DiscoveryNode masterNode) {
    logger.trace("master node {} acked cluster state version [{}]. processing ... (current pending [{}], needed [{}])",
        masterNode, clusterState.version(), pendingMasterNodes, neededMastersToCommit);
    neededMastersToCommit--;
    if (neededMastersToCommit == 0) {
        if (markAsCommitted()) {
            for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) {
                sendCommitToNode(nodeToCommit, clusterState, this);
            }
            sendAckedBeforeCommit.clear();
        }
    }
    decrementPendingMasterAcksAndChangeForFailure();
}
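The gating logic above boils down to "buffer acks until the required number of master-eligible nodes have acknowledged phase one, then commit to every buffered node". A stripped-down sketch of that idea; class and method names are invented for illustration, and the majority calculation is a simplification (Elasticsearch derives the required count from minimum_master_nodes):

import java.util.ArrayList;
import java.util.List;

// Illustration of the two-phase commit gate, not the SendingController class.
final class CommitGate {
    private final List<String> ackedBeforeCommit = new ArrayList<>();
    private int neededMasterAcks;          // simplified: a majority of master-eligible nodes
    private boolean committed = false;

    CommitGate(int totalMasterNodes) {
        this.neededMasterAcks = totalMasterNodes / 2 + 1;
    }

    synchronized void onNodeAck(String nodeId, boolean isMasterEligible) {
        if (committed) {
            sendCommit(nodeId);            // late ack: commit to this node immediately
            return;
        }
        ackedBeforeCommit.add(nodeId);
        if (isMasterEligible && --neededMasterAcks == 0) {
            committed = true;
            ackedBeforeCommit.forEach(this::sendCommit);
            ackedBeforeCommit.clear();
        }
    }

    private void sendCommit(String nodeId) {
        System.out.println("sending commit to " + nodeId); // placeholder for the transport call
    }
}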
Once enough acknowledgements have been received, the commit phase starts:
private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState,
                              final SendingController sendingController) {
    try {
        logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]",
            clusterState.stateUUID(), clusterState.version(), node);
        transportService.sendRequest(node, COMMIT_ACTION_NAME,
            new CommitClusterStateRequest(clusterState.stateUUID()),
            stateRequestOptions,
            new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                @Override
                public void handleResponse(TransportResponse.Empty response) {
                    if (sendingController.getPublishingTimedOut()) {
                        logger.debug("node {} responded to cluster state commit [{}]", node, clusterState.version());
                    }
                    sendingController.getPublishResponseHandler().onResponse(node);
                }

                @Override
                public void handleException(TransportException exp) {
                    logger.debug(() -> new ParameterizedMessage("failed to commit cluster state (uuid [{}], version [{}]) to {}",
                        clusterState.stateUUID(), clusterState.version(), node), exp);
                    sendingController.getPublishResponseHandler().onFailure(node, exp);
                }
            });
    } catch (Exception t) {
        logger.warn(() -> new ParameterizedMessage("error sending cluster state commit (uuid [{}], version [{}]) to {}",
            clusterState.stateUUID(), clusterState.version(), node), t);
        sendingController.getPublishResponseHandler().onFailure(node, t);
    }
}
The commit is again an RPC request sent through transportService; the internal action name is internal:discovery/zen/publish/commit.
The handler registered on the receiving node is CommitClusterStateRequestHandler:
//registration of the commit handler
transportService.registerRequestHandler(COMMIT_ACTION_NAME, ThreadPool.Names.SAME, false, false, CommitClusterStateRequest::new,
new CommitClusterStateRequestHandler());
//handler for the commit cluster state request
private class CommitClusterStateRequestHandler implements TransportRequestHandler<CommitClusterStateRequest> {
@Override
public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel, Task task) throws Exception {
handleCommitRequest(request, channel);
}
}
The node then applies the committed cluster state:
@Override
public void onClusterStateCommitted(String stateUUID, ActionListener<Void> processedListener) {
    // mark the pending state as committed
    final ClusterState state = pendingStatesQueue.markAsCommitted(stateUUID,
        new PendingClusterStatesQueue.StateProcessedListener() {
            @Override
            public void onNewClusterStateProcessed() {
                processedListener.onResponse(null);
            }

            @Override
            public void onNewClusterStateFailed(Exception e) {
                processedListener.onFailure(e);
            }
        });
    if (state != null) {
        synchronized (stateMutex) {
            // apply the new cluster state
            processNextCommittedClusterState("master " + state.nodes().getMasterNode() +
                " committed version [" + state.version() + "]");
        }
    }
}

// hand the new cluster state to the applier
clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])",
    this::clusterState,
    new ClusterApplyListener() {
        @Override
        public void onSuccess(String source) {
            try {
                pendingStatesQueue.markAsProcessed(newClusterState);
            } catch (Exception e) {
                onFailure(source, e);
            }
        }

        @Override
        public void onFailure(String source, Exception e) {
            logger.error(() -> new ParameterizedMessage("unexpected failure applying [{}]", reason), e);
            try {
                // TODO: use cluster state uuid instead of full cluster state so that we don't keep reference to CS around
                // for too long.
                pendingStatesQueue.markAsFailed(newClusterState, e);
            } catch (Exception inner) {
                inner.addSuppressed(e);
                logger.error(() -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
            }
        }
    });
This eventually reaches ClusterApplierService#runTask:
private void runTask(UpdateTask task) {
    if (!lifecycle.started()) {
        logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
        return;
    }

    logger.debug("processing [{}]: execute", task.source);
    // the previously applied cluster state
    final ClusterState previousClusterState = state.get();

    // start time of this task
    long startTimeMS = currentTimeInMillis();
    // simple stopwatch that allows timing of a number of tasks
    final StopWatch stopWatch = new StopWatch();
    final ClusterState newClusterState;
    try {
        try (Releasable ignored = stopWatch.timing("running task [" + task.source + ']')) {
            newClusterState = task.apply(previousClusterState);
        }
    } catch (Exception e) {
        TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
        logger.trace(() -> new ParameterizedMessage(
            "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
            executionTime, previousClusterState.version(), task.source, previousClusterState), e);
        warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
        task.listener.onFailure(task.source, e);
        return;
    }

    if (previousClusterState == newClusterState) {
        TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
        logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
        warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
        task.listener.onSuccess(task.source);
    } else {
        if (logger.isTraceEnabled()) {
            logger.debug("cluster state updated, version [{}], source [{}]\n{}",
                newClusterState.version(), task.source, newClusterState);
        } else {
            logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
        }
        try {
            // apply the state change
            applyChanges(task, previousClusterState, newClusterState, stopWatch);
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
            logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})",
                task.source, executionTime, newClusterState.version(), newClusterState.stateUUID());
            warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
            task.listener.onSuccess(task.source);
        } catch (Exception e) {
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
            assert applicationMayFail();
            task.listener.onFailure(task.source, e);
        }
    }
}
applyChanges iterates over all registered cluster state appliers and calls each one with the change event:
//invoke the cluster state appliers
callClusterStateAppliers(clusterChangedEvent, stopWatch);
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
clusterStateAppliers.forEach(applier -> {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
try (Releasable ignored = stopWatch.timing("running applier [" + applier + "]")) {
applier.applyClusterState(clusterChangedEvent);
}
});
}
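Internal services and plugins hook into this step by registering an applier. A minimal sketch, assuming a clusterService reference in scope and that addStateApplier is the registration method on ClusterService; the applier body is made up for illustration:

// Hypothetical applier: reacts to every applied cluster state.
clusterService.addStateApplier(event -> {
    if (event.metadataChanged()) {
        logger.debug("metadata changed in cluster state version {}", event.state().version());
    }
});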
It then iterates over all registered cluster state listeners and invokes their change callbacks:
//invoke the cluster state listeners
callClusterStateListeners(clusterChangedEvent, stopWatch);
//run the callbacks registered for cluster state changes
private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
Stream.concat(clusterStateListeners.stream(), timeoutClusterStateListeners.stream()).forEach(listener -> {
try {
logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
try (Releasable ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
listener.clusterChanged(clusterChangedEvent);
}
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
}
});
}
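A listener is registered in a similar way. A minimal sketch, assuming clusterService is in scope and addListener is the ClusterService registration method; the body is illustrative only (nodesDelta() and shortSummary() appear in the MasterService code quoted earlier):

// Hypothetical listener: observes, but does not modify, the applied state.
clusterService.addListener(event -> {
    if (event.nodesDelta().hasChanges()) {
        logger.info("nodes changed: {}", event.nodesDelta().shortSummary());
    }
});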
Back on the master, the handleResponse and handleException callbacks of the commit request perform the same bookkeeping: each one counts the node's response down on the latch the publisher is waiting on. If some nodes fail to apply the state, no repair logic is triggered at this point.
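The waiting itself is a plain countdown over the target nodes. A simplified sketch of the idea behind BlockingClusterStatePublishResponseHandler (names simplified, not the actual class):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustration: count down once per node, success or failure alike,
// and let the publisher wait for all nodes or a timeout.
final class SimplePublishResponseHandler {
    private final CountDownLatch latch;
    private final Set<String> failedNodes = ConcurrentHashMap.newKeySet();

    SimplePublishResponseHandler(int nodeCount) {
        this.latch = new CountDownLatch(nodeCount);
    }

    void onResponse(String nodeId) {
        latch.countDown();
    }

    void onFailure(String nodeId, Exception e) {
        failedNodes.add(nodeId);   // recorded for a log summary only; no repair is attempted
        latch.countDown();
    }

    boolean awaitAllNodes(long timeoutMillis) throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    Set<String> getFailedNodes() {
        return failedNodes;
    }
}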