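SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture. Across multiple articles and from several angles, it works backwards from the code to summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so that readers can learn how Alibaba approaches design.
This is the sixteenth article. It covers how SOFARegistry handles configuration information in its network operations.
Some system-level services have to be configured from a console, so Meta Server exposes interfaces to the console. When Meta Server receives a console request, it interacts with Data Server and Session Server. For example, Meta Server provides the following interfaces: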
@Bean
@ConditionalOnMissingBean
public StopPushDataResource stopPushDataResource() {
return new StopPushDataResource();
}
@Bean
public BlacklistDataResource blacklistDataResource() {
return new BlacklistDataResource();
}
@Bean
public RenewSwitchResource renewSwitchResource() {
return new RenewSwitchResource();
}
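These interfaces are exposed over HTTP because that is the standard way to serve an admin console. Communication between servers still uses the Bolt protocol.
The question to work through here is how the DataServer side separates configuration information into its own module.
In DataServer, the configuration-related directories are shown below. They contain handlers, services, tasks, and provideData.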
│ ├── metaserver
│ │ ├── DefaultMetaServiceImpl.java
│ │ ├── IMetaServerService.java
│ │ ├── MetaServerConnectionFactory.java
│ │ ├── handler
│ │ │ ├── NotifyProvideDataChangeHandler.java
│ │ │ ├── ServerChangeHandler.java
│ │ │ └── StatusConfirmHandler.java
│ │ ├── provideData
│ │ │ ├── ProvideDataProcessor.java
│ │ │ ├── ProvideDataProcessorManager.java
│ │ │ └── processor
│ │ │ └── DatumExpireProvideDataProcessor.java
│ │ └── task
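The configuration-related data structures are as follows.
ProvideData is the externally facing interaction type. Besides the payload, it carries a version number and the service identifier dataInfoId.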
public class ProvideData implements Serializable {
private ServerDataBox provideData;
private String dataInfoId;
private Long version;
}
ServerDataBox holds the actual business payload. It is defined as follows:
public class ServerDataBox implements Serializable {
/** Null for locally instantiated, otherwise for internalized */
private byte[] bytes;
/** Only available if bytes != null */
private int serialization;
/** Actual object, lazy deserialized */
private Object object;
}
ServerDataBox is currently used in only one place in Data Server, where it carries a boolean value, that is, an on/off switch:
public void changeDataProcess(ProvideData provideData) {
boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
.getObject());
datumLeaseManager.setRenewEnable(enableDataDatumExpire);
}
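To make the switch handling concrete, here is a minimal sketch of the same idea outside SOFARegistry. The class names `SwitchConfigSketch` and `Config`, the `RENEW_ENABLED` flag and the id string are all hypothetical stand-ins; only the use of `Boolean.parseBoolean` on a String payload mirrors the excerpt above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: none of these names come from SOFARegistry.
class SwitchConfigSketch {

    /** Simplified stand-in for ProvideData: id, version, and an untyped payload. */
    static final class Config {
        final String dataInfoId;
        final Long version;
        final Object payload;

        Config(String dataInfoId, Long version, Object payload) {
            this.dataInfoId = dataInfoId;
            this.version = version;
            this.payload = payload;
        }
    }

    /** Stand-in for the renew flag that the real code sets on DatumLeaseManager. */
    static final AtomicBoolean RENEW_ENABLED = new AtomicBoolean(true);

    /** Applies the switch; returns false when the payload is missing or not a String. */
    static boolean apply(Config config) {
        if (config == null || !(config.payload instanceof String)) {
            return false;
        }
        // parseBoolean yields true only for "true" (case-insensitive); anything else is false.
        RENEW_ENABLED.set(Boolean.parseBoolean((String) config.payload));
        return true;
    }

    public static void main(String[] args) {
        apply(new Config("hypothetical.switch.id", 1L, "false"));
        System.out.println("renew enabled: " + RENEW_ENABLED.get());
    }
}
```

One consequence of `Boolean.parseBoolean` worth keeping in mind: a misspelled value such as "ture" does not raise an error, it simply evaluates to false and turns the switch off.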
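To follow the flow end to end, we first need to look at the part of Meta Server that is involved in metaServerService.fetchData(dataInfoId).
For decoupling, Meta Server splits some business functions into four layers. The basic chain is: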
Http Resource ———> TaskListener ———> Task ————> Service
The overall flow comes first; the sections below walk through it step by step.
1. Admin -> BlacklistDataResource
2. BlacklistDataResource -> PersistenceDataDBService (update)
3. BlacklistDataResource.fireDataChangeNotify -> DefaultTaskListenerManager (NotifyProvideDataChange). The manager holds persistenceDataChangeNotifyTaskListener, receiveStatusConfirmNotifyTaskListener, dataNodeChangePushTaskListener and sessionNodeChangePushTaskListener.
4. persistenceDataChangeNotifyTaskListener -> PersistenceDataChangeNotifyTask
5. PersistenceDataChangeNotifyTask -> DataNodeServiceImpl (notifyProvideDataChange)
6. DataNodeServiceImpl -> Data Server through dataNodeExchanger.request (NotifyProvideDataChange)
7. Data Server: metaClientHandlers -> notifyProvideDataChangeHandler
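As mentioned, Meta Server exposes some control interfaces to Admin over HTTP. We take BlacklistDataResource as the example.
The blacklistPush function first stores the data through persistenceDataDBService, then calls fireDataChangeNotify, which indirectly sends a NotifyProvideDataChange.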
@Path("blacklist")
public class BlacklistDataResource {

    @RaftReference
    private DBService persistenceDataDBService;

    @Autowired
    private TaskListenerManager taskListenerManager;

    /**
     * update blacklist
     * e.g. curl -d '{"FORBIDDEN_PUB":{"IP_FULL":["1.1.1.1","10.15.233.150"]},"FORBIDDEN_SUB_BY_PREFIX":{"IP_FULL":["1.1.1.1"]}}' -H "Content-Type: application/json" -X POST http://localhost:9615/blacklist/update
     */
    @POST
    @Path("update")
    @Produces(MediaType.APPLICATION_JSON)
    public Result blacklistPush(String config) {
        PersistenceData persistenceData = createDataInfo();
        persistenceData.setData(config);

        boolean ret = persistenceDataDBService.update(ValueConstants.BLACK_LIST_DATA_ID,
            persistenceData);

        fireDataChangeNotify(persistenceData.getVersion(), ValueConstants.BLACK_LIST_DATA_ID,
            DataOperator.UPDATE);

        Result result = new Result();
        result.setSuccess(true);
        return result;
    }

    private PersistenceData createDataInfo() {
        DataInfo dataInfo = DataInfo.valueOf(ValueConstants.BLACK_LIST_DATA_ID);
        PersistenceData persistenceData = new PersistenceData();
        persistenceData.setDataId(dataInfo.getDataId());
        persistenceData.setGroup(dataInfo.getDataType());
        persistenceData.setInstanceId(dataInfo.getInstanceId());
        persistenceData.setVersion(System.currentTimeMillis());
        return persistenceData;
    }

    private void fireDataChangeNotify(Long version, String dataInfoId, DataOperator dataOperator) {
        NotifyProvideDataChange notifyProvideDataChange = new NotifyProvideDataChange(dataInfoId,
            version, dataOperator);
        TaskEvent taskEvent = new TaskEvent(notifyProvideDataChange,
            TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK);
        taskListenerManager.sendTaskEvent(taskEvent);
    }
}
This corresponds to the following part of the flow above:
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+
+-------+ 1 | |
| Admin | +---> | +--------------------+ |
+-------+ | |fireDataChangeNotify| |
| +--------------------+ |
+------------------------+
Note that DBService is also Raft-based, which shows that the MetaServer cluster maintains consistency internally.
@RaftReference
private DBService persistenceDataDBService;
A trimmed-down definition of PersistenceDataDBService:
@RaftService
public class PersistenceDataDBService extends AbstractSnapshotProcess implements DBService {

    private ConcurrentHashMap<String, Object> serviceMap = new ConcurrentHashMap<>();

    @Override
    public boolean put(String key, Object value) {
        Object ret = serviceMap.put(key, value);
        return true;
    }

    @Override
    public DBResponse get(String key) {
        Object ret = serviceMap.get(key);
        return ret != null ? DBResponse.ok(ret).build() : DBResponse.notfound().build();
    }

    @Override
    public boolean update(String key, Object value) {
        Object ret = serviceMap.put(key, value);
        return true;
    }

    @Override
    public Set<String> getSnapshotFileNames() {
        if (!snapShotFileNames.isEmpty()) {
            return snapShotFileNames;
        }
        snapShotFileNames.add(this.getClass().getSimpleName());
        return snapShotFileNames;
    }
}
Storage is mainly a ConcurrentHashMap, and the Raft mechanism uses the file system for snapshot backups.
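The storage side of this class can be reduced to a few lines. The sketch below is an assumption-laden simplification: `InMemoryStoreSketch` is a hypothetical name, `Optional` replaces the `DBResponse` ok/notfound pair, and the Raft replication and snapshot parts are left out entirely.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a map-backed key/value store; no Raft, no snapshots.
class InMemoryStoreSketch {
    private final ConcurrentMap<String, Object> serviceMap = new ConcurrentHashMap<>();

    /** Inserts or overwrites the value for the key, like put/update in the excerpt. */
    boolean update(String key, Object value) {
        serviceMap.put(key, value);
        return true;
    }

    /** Empty Optional plays the role of the "not found" response. */
    Optional<Object> get(String key) {
        return Optional.ofNullable(serviceMap.get(key));
    }

    public static void main(String[] args) {
        InMemoryStoreSketch store = new InMemoryStoreSketch();
        store.update("hypothetical.blacklist.id", "{}");
        System.out.println(store.get("hypothetical.blacklist.id").isPresent());
        System.out.println(store.get("missing").isPresent());
    }
}
```

Because `ConcurrentHashMap` rejects null keys and null values with a `NullPointerException`, a "found" entry can never hold null, which is why a null result from `get` can safely be read as "not found".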
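As noted earlier, for decoupling Meta Server wraps message handling and forwarding in TaskListeners, and TaskListenerManager is logically responsible for running them in one place. Taking the ProvideData-related functionality as the example, the corresponding beans are: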
@Configuration
public static class MetaServerTaskConfiguration {

    ......

    @Bean
    public TaskListener persistenceDataChangeNotifyTaskListener(TaskListenerManager taskListenerManager) {
        TaskListener taskListener = new PersistenceDataChangeNotifyTaskListener(
            sessionNodeSingleTaskProcessor());
        taskListenerManager.addTaskListener(taskListener);
        return taskListener;
    }

    @Bean
    public TaskListenerManager taskListenerManager() {
        return new DefaultTaskListenerManager();
    }
}
The listener execution engine is shown below. It looks up the listeners registered for the event's TaskType and calls handleEvent on each of them.
public class DefaultTaskListenerManager implements TaskListenerManager {

    private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();

    @Override
    public Multimap<TaskType, TaskListener> getTaskListeners() {
        return taskListeners;
    }

    @Override
    public void addTaskListener(TaskListener taskListener) {
        taskListeners.put(taskListener.support(), taskListener);
    }

    @Override
    public void sendTaskEvent(TaskEvent taskEvent) {
        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
        for (TaskListener taskListener : taskListeners) {
            taskListener.handleEvent(taskEvent);
        }
    }
}
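The dispatch-by-type idea can be tried out without Guava. The following is a sketch only: `ListenerDispatchSketch`, its nested types, the `OTHER_TASK` constant and the `recording` helper are hypothetical, and a plain `EnumMap` of lists stands in for the `Multimap` used in the real class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of type-keyed listener dispatch; names are illustrative.
class ListenerDispatchSketch {
    enum TaskType { PERSISTENCE_DATA_CHANGE_NOTIFY_TASK, OTHER_TASK }

    interface TaskListener {
        TaskType support();
        void handleEvent(String event);
    }

    static final class Manager {
        private final Map<TaskType, List<TaskListener>> listeners = new EnumMap<>(TaskType.class);

        /** Registers the listener under the single task type it declares. */
        void addTaskListener(TaskListener listener) {
            listeners.computeIfAbsent(listener.support(), k -> new ArrayList<>()).add(listener);
        }

        /** Calls every listener registered for the type; returns how many were called. */
        int sendTaskEvent(TaskType type, String event) {
            List<TaskListener> matched = listeners.getOrDefault(type, Collections.emptyList());
            for (TaskListener listener : matched) {
                listener.handleEvent(event);
            }
            return matched.size();
        }
    }

    /** Test helper: a listener that records what it received. */
    static TaskListener recording(final TaskType type, final List<String> sink) {
        return new TaskListener() {
            @Override public TaskType support() { return type; }
            @Override public void handleEvent(String event) { sink.add(type + ":" + event); }
        };
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        Manager manager = new Manager();
        manager.addTaskListener(recording(TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK, sink));
        manager.addTaskListener(recording(TaskType.OTHER_TASK, sink));
        manager.sendTaskEvent(TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK, "blacklist");
        System.out.println(sink);
    }
}
```

Keying listeners by type means the sender never needs to know which listeners exist, which is the decoupling the four-layer split is after.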
The corresponding business listener is:
public class PersistenceDataChangeNotifyTaskListener implements TaskListener {

    @Autowired
    private MetaServerConfig metaServerConfig;

    private TaskDispatcher<String, MetaServerTask> singleTaskDispatcher;

    public PersistenceDataChangeNotifyTaskListener(TaskProcessor sessionNodeSingleTaskProcessor) {
        singleTaskDispatcher = TaskDispatchers.createDefaultSingleTaskDispatcher(
            TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK.getName(), sessionNodeSingleTaskProcessor);
    }

    @Override
    public TaskType support() {
        return TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {
        MetaServerTask persistenceDataChangeNotifyTask = new PersistenceDataChangeNotifyTask(
            metaServerConfig);
        persistenceDataChangeNotifyTask.setTaskEvent(event);
        singleTaskDispatcher.dispatch(persistenceDataChangeNotifyTask.getTaskId(),
            persistenceDataChangeNotifyTask, persistenceDataChangeNotifyTask.getExpiryTime());
    }
}
This corresponds to steps 1 to 3 of the flow:
1. Admin -> BlacklistDataResource
2. BlacklistDataResource -> PersistenceDataDBService (update)
3. BlacklistDataResource.fireDataChangeNotify -> DefaultTaskListenerManager (NotifyProvideDataChange). The manager holds persistenceDataChangeNotifyTaskListener, receiveStatusConfirmNotifyTaskListener, dataNodeChangePushTaskListener and sessionNodeChangePushTaskListener.
The listener hands off to a Task.
The Task is processed as follows. It calls different services depending on the NodeType:
public class PersistenceDataChangeNotifyTask extends AbstractMetaServerTask {

    private final SessionNodeService sessionNodeService;
    private final DataNodeService dataNodeService;
    final private MetaServerConfig metaServerConfig;
    private NotifyProvideDataChange notifyProvideDataChange;

    @Override
    public void execute() {
        Set<NodeType> nodeTypes = notifyProvideDataChange.getNodeTypes();
        if (nodeTypes.contains(NodeType.DATA)) {
            dataNodeService.notifyProvideDataChange(notifyProvideDataChange);
        }
        if (nodeTypes.contains(NodeType.SESSION)) {
            sessionNodeService.notifyProvideDataChange(notifyProvideDataChange);
        }
    }

    @Override
    public void setTaskEvent(TaskEvent taskEvent) {
        Object obj = taskEvent.getEventObj();
        if (obj instanceof NotifyProvideDataChange) {
            this.notifyProvideDataChange = (NotifyProvideDataChange) obj;
        }
    }
}
This corresponds to steps 1 to 4 of the flow:
1. Admin -> BlacklistDataResource
2. BlacklistDataResource -> PersistenceDataDBService (update)
3. BlacklistDataResource.fireDataChangeNotify -> DefaultTaskListenerManager (NotifyProvideDataChange)
4. persistenceDataChangeNotifyTaskListener -> PersistenceDataChangeNotifyTask
The Task calls a service to do the actual work. The service is shown below; it pushes the notification to DataServer or SessionServer.
public class DataNodeServiceImpl implements DataNodeService {

    @Autowired
    private NodeExchanger dataNodeExchanger;

    @Autowired
    private StoreService dataStoreService;

    @Autowired
    private AbstractServerHandler dataConnectionHandler;

    @Override
    public NodeType getNodeType() {
        return NodeType.DATA;
    }

    @Override
    public void notifyProvideDataChange(NotifyProvideDataChange notifyProvideDataChange) {
        NodeConnectManager nodeConnectManager = getNodeConnectManager();
        Collection<InetSocketAddress> connections = nodeConnectManager.getConnections(null);

        // add register confirm
        StoreService storeService = ServiceFactory.getStoreService(NodeType.DATA);
        Map<String, DataNode> dataNodes = storeService.getNodes();

        for (InetSocketAddress connection : connections) {
            // skip connections that do not belong to a registered data node
            if (!dataNodes.keySet().contains(connection.getAddress().getHostAddress())) {
                continue;
            }
            try {
                Request<NotifyProvideDataChange> request = new Request<NotifyProvideDataChange>() {
                    @Override
                    public NotifyProvideDataChange getRequestBody() {
                        return notifyProvideDataChange;
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(connection);
                    }
                };
                dataNodeExchanger.request(request);
            }
            // the catch block is omitted in this trimmed excerpt
        }
    }
}
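The loop above only notifies connections whose IP belongs to a registered data node. A small sketch of that filter, using only the JDK, might look like this. `ConnectionFilterSketch` and `registeredOnly` are hypothetical names, a `Set<String>` of IPs stands in for the key set of the `Map<String, DataNode>`, and the null check for unresolved addresses is an addition not present in the excerpt.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: keep only connections whose IP is a registered node.
class ConnectionFilterSketch {

    static List<InetSocketAddress> registeredOnly(Collection<InetSocketAddress> connections,
                                                  Set<String> registeredIps) {
        List<InetSocketAddress> targets = new ArrayList<>();
        for (InetSocketAddress connection : connections) {
            // getAddress() returns null for an unresolved socket address, so guard it.
            if (connection.getAddress() != null
                && registeredIps.contains(connection.getAddress().getHostAddress())) {
                targets.add(connection);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<InetSocketAddress> connections = Arrays.asList(
            new InetSocketAddress("10.0.0.1", 9620),
            new InetSocketAddress("10.0.0.2", 9620));
        Set<String> registered = Collections.singleton("10.0.0.1");
        System.out.println(registeredOnly(connections, registered));
    }
}
```

Filtering against the registered node list means a peer that has connected but not yet completed registration is not sent configuration notifications.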
This corresponds to steps 1 to 5 of the flow:
1. Admin -> BlacklistDataResource
2. BlacklistDataResource -> PersistenceDataDBService (update)
3. BlacklistDataResource.fireDataChangeNotify -> DefaultTaskListenerManager (NotifyProvideDataChange)
4. persistenceDataChangeNotifyTaskListener -> PersistenceDataChangeNotifyTask
5. PersistenceDataChangeNotifyTask -> DataNodeServiceImpl (notifyProvideDataChange)
Once the request has been sent, the full flow is:
1. Admin -> BlacklistDataResource
2. BlacklistDataResource -> PersistenceDataDBService (update)
3. BlacklistDataResource.fireDataChangeNotify -> DefaultTaskListenerManager (NotifyProvideDataChange)
4. persistenceDataChangeNotifyTaskListener -> PersistenceDataChangeNotifyTask
5. PersistenceDataChangeNotifyTask -> DataNodeServiceImpl (notifyProvideDataChange)
6. DataNodeServiceImpl -> Data Server through dataNodeExchanger.request (NotifyProvideDataChange)
7. Data Server: metaClientHandlers -> notifyProvideDataChangeHandler
We now know that in Meta Server, DataNodeServiceImpl.notifyProvideDataChange notifies Data Server that there is a NotifyProvideDataChange message.
Execution now moves to DataServer. Some groundwork is needed first.
The metaClientHandlers bean holds the response handlers of MetaNodeExchanger, and notifyProvideDataChangeHandler is one of them.
@Bean(name = "metaClientHandlers")
public Collection<AbstractClientHandler> metaClientHandlers() {
Collection<AbstractClientHandler> list = new ArrayList<>();
list.add(serverChangeHandler());
list.add(statusConfirmHandler());
list.add(notifyProvideDataChangeHandler());
return list;
}
When DefaultMetaServiceImpl.getMetaServerMap calls metaNodeExchanger.connect, MetaNodeExchanger installs these metaClientHandlers. This ties notifyProvideDataChangeHandler to MetaServer over Bolt.
public class DefaultMetaServiceImpl implements IMetaServerService {
@Override
public Map<String, Set<String>> getMetaServerMap() {
Connection connection = null;
connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator()
.next(), dataServerConfig.getMetaServerPort()))).getConnection();
}
}
MetaNodeExchanger is defined below. It centralizes DataServer's interaction with MetaServer.
public class MetaNodeExchanger implements NodeExchanger {

    @Autowired
    private Exchange boltExchange;

    @Autowired
    private IMetaServerService metaServerService;

    @Resource(name = "metaClientHandlers")
    private Collection<AbstractClientHandler> metaClientHandlers;

    public Channel connect(URL url) {
        Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
        if (client == null) {
            synchronized (this) {
                client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
                if (client == null) {
                    client = boltExchange.connect(Exchange.META_SERVER_TYPE, url,
                        metaClientHandlers.toArray(new ChannelHandler[metaClientHandlers.size()]));
                }
            }
        }
        //try to connect data
        Channel channel = client.getChannel(url);
        if (channel == null) {
            synchronized (this) {
                channel = client.getChannel(url);
                if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }
        return channel;
    }
}
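The connect method uses a check, lock, check-again pattern twice so that concurrent callers create the client and the channel only once. A minimal sketch of that pattern follows. Everything in it is hypothetical: `LazyConnectSketch`, the string "channels" and the `dials` counter are stand-ins, and the sketch assumes the lookup is backed by a thread-safe map, which the excerpt does not show.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of lazy, create-once connection lookup.
class LazyConnectSketch {
    private final ConcurrentMap<String, String> channels = new ConcurrentHashMap<>();

    /** Counts how many times a "connection" was actually created. */
    final AtomicInteger dials = new AtomicInteger();

    String connect(String url) {
        String channel = channels.get(url);          // first check, no lock
        if (channel == null) {
            synchronized (this) {
                channel = channels.get(url);         // second check, under the lock
                if (channel == null) {
                    channel = "channel-" + dials.incrementAndGet() + "@" + url;
                    channels.put(url, channel);
                }
            }
        }
        return channel;
    }

    public static void main(String[] args) {
        LazyConnectSketch exchanger = new LazyConnectSketch();
        String first = exchanger.connect("hypothetical-meta:9611");
        String second = exchanger.connect("hypothetical-meta:9611");
        System.out.println(first.equals(second) + ", dials=" + exchanger.dials.get());
    }
}
```

With a plain `ConcurrentMap`, `computeIfAbsent` would express the same create-once intent more compactly; the explicit form is kept here because it matches the shape of the excerpt.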
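In its interest function, NotifyProvideDataChangeHandler declares that it handles messages of type NotifyProvideDataChange. When MetaServer sends such a notification and the operator is not REMOVE, the handler calls metaServerService.fetchData(dataInfoId) to obtain the ProvideData and passes it on for further processing.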
public class NotifyProvideDataChangeHandler extends AbstractClientHandler {

    @Autowired
    private IMetaServerService metaServerService;

    @Autowired
    private ProvideDataProcessor provideDataProcessorManager;

    @Override
    public Object doHandle(Channel channel, Object request) {
        NotifyProvideDataChange notifyProvideDataChange = (NotifyProvideDataChange) request;
        String dataInfoId = notifyProvideDataChange.getDataInfoId();
        if (notifyProvideDataChange.getDataOperator() != DataOperator.REMOVE) {
            ProvideData provideData = metaServerService.fetchData(dataInfoId);
            provideDataProcessorManager.changeDataProcess(provideData);
        }
        return null;
    }

    @Override
    public Class interest() {
        return NotifyProvideDataChange.class;
    }
}
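Note that the notification carries only an id, a version and an operator; the data itself is pulled afterwards. The sketch below illustrates this notify-then-fetch shape under simplifying assumptions: `NotifyThenFetchSketch` and its nested types are hypothetical, a `Function<String, String>` stands in for `metaServerService.fetchData`, and a local map stands in for the processor chain.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: a lightweight notification triggers a separate fetch.
class NotifyThenFetchSketch {
    enum DataOperator { ADD, UPDATE, REMOVE }

    /** The notification names what changed but does not carry the value. */
    static final class Notification {
        final String dataInfoId;
        final long version;
        final DataOperator operator;

        Notification(String dataInfoId, long version, DataOperator operator) {
            this.dataInfoId = dataInfoId;
            this.version = version;
            this.operator = operator;
        }
    }

    static final class Handler {
        private final Function<String, String> fetcher;   // stand-in for the remote fetch
        private final Map<String, String> applied = new HashMap<>();

        Handler(Function<String, String> fetcher) {
            this.fetcher = fetcher;
        }

        void onNotify(Notification notification) {
            if (notification.operator == DataOperator.REMOVE) {
                return; // like the excerpt, removals trigger no fetch
            }
            String value = fetcher.apply(notification.dataInfoId);
            if (value != null) {
                applied.put(notification.dataInfoId, value);
            }
        }

        String applied(String dataInfoId) {
            return applied.get(dataInfoId);
        }
    }

    public static void main(String[] args) {
        Map<String, String> remote = new HashMap<>();
        remote.put("hypothetical.switch.id", "true");
        Handler handler = new Handler(remote::get);
        handler.onNotify(new Notification("hypothetical.switch.id", 1L, DataOperator.UPDATE));
        System.out.println(handler.applied("hypothetical.switch.id"));
    }
}
```

Keeping the payload out of the notification keeps the push message small, and the receiver always reads the value that is current at fetch time rather than one captured when the notification was created.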
As described above, once DataNodeServiceImpl.notifyProvideDataChange in Meta Server has notified Data Server of a NotifyProvideDataChange message, NotifyProvideDataChangeHandler responds.
The handler contains this call:
ProvideData provideData = metaServerService.fetchData(dataInfoId);
This invokes fetchData in DefaultMetaServiceImpl, which goes to Meta Server to fetch the ProvideData.
@Override
public ProvideData fetchData(String dataInfoId) {
    Map<String, Connection> connectionMap = metaServerConnectionFactory
        .getConnections(dataServerConfig.getLocalDataCenter());
    String leader = getLeader().getIp();
    if (connectionMap.containsKey(leader)) {
        Connection connection = connectionMap.get(leader);
        if (connection.isFine()) {
            try {
                Request<FetchProvideDataRequest> request = new Request<FetchProvideDataRequest>() {
                    @Override
                    public FetchProvideDataRequest getRequestBody() {
                        return new FetchProvideDataRequest(dataInfoId);
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(connection.getRemoteIP(), connection.getRemotePort());
                    }
                };
                Response response = metaNodeExchanger.request(request);
                Object result = response.getResult();
                if (result instanceof ProvideData) {
                    return (ProvideData) result;
                }
            }
            // the catch block is omitted in this trimmed excerpt
        }
    }
    // no usable leader connection: refresh the leader and give up for this call
    String newip = refreshLeader().getIp();
    return null;
}
The picture is now:
1. DefaultMetaServiceImpl.getMetaServerMap -> MetaNodeExchanger, which calls boltExchange.connect(metaClientHandlers.toArray) -> Meta Server. metaClientHandlers contains serverChangeHandler, statusConfirmHandler and notifyProvideDataChangeHandler.
2, 3. Meta Server (DataNodeServiceImpl.notifyProvideDataChange) -> MetaNodeExchanger -> notifyProvideDataChangeHandler (NotifyProvideDataChange)
Inside notifyProvideDataChangeHandler, ProvideData provideData = metaServerService.fetchData sends a FetchProvideDataRequest to Meta Server to get the ProvideData, and changeDataProcess(provideData) follows.
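Execution returns to MetaServer, which has received the FetchProvideDataRequest.
FetchProvideDataRequestHandler is the responder. Its logic is fairly simple: it looks up the data in DBService by DataInfoId and returns it to the caller.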
public class FetchProvideDataRequestHandler extends AbstractServerHandler<FetchProvideDataRequest> {

    @RaftReference
    private DBService persistenceDataDBService;

    @Override
    public Object reply(Channel channel, FetchProvideDataRequest fetchProvideDataRequest) {
        // error handling and the fall-through return are omitted in this trimmed excerpt
        DBResponse ret = persistenceDataDBService.get(fetchProvideDataRequest.getDataInfoId());
        if (ret.getOperationStatus() == OperationStatus.SUCCESS) {
            PersistenceData data = (PersistenceData) ret.getEntity();
            ProvideData provideData = new ProvideData(new ServerDataBox(data.getData()),
                fetchProvideDataRequest.getDataInfoId(), data.getVersion());
            return provideData;
        } else if (ret.getOperationStatus() == OperationStatus.NOTFOUND) {
            ProvideData provideData = new ProvideData(null,
                fetchProvideDataRequest.getDataInfoId(), null);
            return provideData;
        }
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    public Class interest() {
        return FetchProvideDataRequest.class;
    }
}
The key component here is therefore DBService.
From MetaServer's point of view, the flow is:
1. Data Server (NotifyProvideDataChangeHandler, metaServerService.fetchData(dataInfoId)) -> Meta Server FetchProvideDataRequestHandler (FetchProvideDataRequest)
2. FetchProvideDataRequestHandler -> PersistenceDataDBService: get(DataInfoId)
3. PersistenceDataDBService -> FetchProvideDataRequestHandler: DBResponse
4. FetchProvideDataRequestHandler -> Data Server: ProvideData
Session Server also issues FetchProvideDataRequest. SessionServerBootstrap contains the following functions, which send requests to fetch configuration information.
private void fetchStopPushSwitch(URL leaderUrl) {
    FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(
        ValueConstants.STOP_PUSH_DATA_SWITCH_DATA_ID);
    Object ret = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
    if (ret instanceof ProvideData) {
        ProvideData provideData = (ProvideData) ret;
        provideDataProcessorManager.fetchDataProcess(provideData);
    }
}

private void fetchEnableDataRenewSnapshot(URL leaderUrl) {
    FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(
        ValueConstants.ENABLE_DATA_RENEW_SNAPSHOT);
    Object data = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
    if (data instanceof ProvideData) {
        ProvideData provideData = (ProvideData) data;
        provideDataProcessorManager.fetchDataProcess(provideData);
    }
}

private void fetchBlackList() {
    blacklistManager.load();
}
Back on the NotifyProvideDataChangeHandler path, the ProvideData is extracted inside fetchData, where the response to the request is handled:
Response response = metaNodeExchanger.request(request);
Object result = response.getResult();
if (result instanceof ProvideData) {
return (ProvideData) result;
}
That gives:
1. DefaultMetaServiceImpl.getMetaServerMap -> MetaNodeExchanger, which calls boltExchange.connect(metaClientHandlers.toArray) -> Meta Server. metaClientHandlers contains serverChangeHandler, statusConfirmHandler and notifyProvideDataChangeHandler.
2, 3. Meta Server (DataNodeServiceImpl.notifyProvideDataChange) -> MetaNodeExchanger -> notifyProvideDataChangeHandler (NotifyProvideDataChange)
Inside notifyProvideDataChangeHandler, metaServerService.fetchData asks Meta Server for the ProvideData.
4. Meta Server -> metaServerService.fetchData: ProvideData is returned, after which changeDataProcess(provideData) is called.
Processing then continues with:
provideDataProcessorManager.changeDataProcess(provideData);
This brings us to how the processing engine works.
The configuration below creates the processing engine, ProvideDataProcessorManager, and adds one handler to it, DatumExpireProvideDataProcessor.
@Configuration
public static class DataProvideDataConfiguration {

    @Bean
    public ProvideDataProcessor provideDataProcessorManager() {
        return new ProvideDataProcessorManager();
    }

    @Bean
    public ProvideDataProcessor datumExpireProvideDataProcessor(ProvideDataProcessor provideDataProcessorManager) {
        ProvideDataProcessor datumExpireProvideDataProcessor = new DatumExpireProvideDataProcessor();
        ((ProvideDataProcessorManager) provideDataProcessorManager)
            .addProvideDataProcessor(datumExpireProvideDataProcessor);
        return datumExpireProvideDataProcessor;
    }
}
The pattern is a familiar one. The engine, ProvideDataProcessorManager, itself implements ProvideDataProcessor, but its support method returns false, so the engine never invokes itself when it iterates over the processors.
public class ProvideDataProcessorManager implements ProvideDataProcessor {

    private Collection<ProvideDataProcessor> provideDataProcessors = new ArrayList<>();

    public void addProvideDataProcessor(ProvideDataProcessor provideDataProcessor) {
        provideDataProcessors.add(provideDataProcessor);
    }

    @Override
    public void changeDataProcess(ProvideData provideData) {
        for (ProvideDataProcessor provideDataProcessor : provideDataProcessors) {
            if (provideDataProcessor.support(provideData)) {
                provideDataProcessor.changeDataProcess(provideData);
            }
        }
    }

    @Override
    public boolean support(ProvideData provideData) {
        return false;
    }
}
The DatumLeaseManager here corresponds to the AfterWorkingProcess discussed earlier.
The handler calls DatumLeaseManager to apply the configuration data.
public class DatumExpireProvideDataProcessor implements ProvideDataProcessor {

    @Autowired
    private DatumLeaseManager datumLeaseManager;

    @Override
    public void changeDataProcess(ProvideData provideData) {
        if (checkInvalid(provideData)) {
            return;
        }
        boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
            .getObject());
        datumLeaseManager.setRenewEnable(enableDataDatumExpire);
    }

    private boolean checkInvalid(ProvideData provideData) {
        boolean invalid = provideData == null || provideData.getProvideData() == null
                          || provideData.getProvideData().getObject() == null;
        return invalid;
    }

    @Override
    public boolean support(ProvideData provideData) {
        return ValueConstants.ENABLE_DATA_DATUM_EXPIRE.equals(provideData.getDataInfoId());
    }
}
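The manager and processor pair is a composite in which the composite opts out of its own dispatch. The sketch below shows why returning false from support matters: even if the manager is registered with itself, it never recurses. All names (`CompositeProcessorSketch`, `Manager`, `SwitchProcessor`, the id string) are hypothetical, and plain strings replace `ProvideData`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a composite processor that never dispatches to itself.
class CompositeProcessorSketch {

    interface Processor {
        boolean support(String dataInfoId);
        void changeDataProcess(String dataInfoId, String value);
    }

    /** The engine: same interface as the leaves, but it claims to support nothing. */
    static final class Manager implements Processor {
        private final List<Processor> delegates = new ArrayList<>();

        void add(Processor processor) {
            delegates.add(processor);
        }

        @Override
        public boolean support(String dataInfoId) {
            return false; // so the dispatch loop below skips the manager itself
        }

        @Override
        public void changeDataProcess(String dataInfoId, String value) {
            for (Processor processor : delegates) {
                if (processor.support(dataInfoId)) {
                    processor.changeDataProcess(dataInfoId, value);
                }
            }
        }
    }

    /** A leaf that handles exactly one configuration id. */
    static final class SwitchProcessor implements Processor {
        static final String ID = "hypothetical.enable.datum.expire";
        boolean enabled = true;

        @Override
        public boolean support(String dataInfoId) {
            return ID.equals(dataInfoId);
        }

        @Override
        public void changeDataProcess(String dataInfoId, String value) {
            enabled = Boolean.parseBoolean(value);
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager();
        SwitchProcessor leaf = new SwitchProcessor();
        manager.add(manager); // harmless: the manager supports nothing
        manager.add(leaf);
        manager.changeDataProcess(SwitchProcessor.ID, "false");
        System.out.println("enabled: " + leaf.enabled);
    }
}
```

This is relevant in a container that may inject every bean of the interface type into the manager: the manager can end up in its own list without causing infinite recursion.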
The final picture is:
1. DefaultMetaServiceImpl.getMetaServerMap -> MetaNodeExchanger, which calls boltExchange.connect(metaClientHandlers.toArray) -> Meta Server. metaClientHandlers contains serverChangeHandler, statusConfirmHandler and notifyProvideDataChangeHandler.
2, 3. Meta Server (DataNodeServiceImpl.notifyProvideDataChange) -> MetaNodeExchanger -> notifyProvideDataChangeHandler (NotifyProvideDataChange)
Inside notifyProvideDataChangeHandler, metaServerService.fetchData asks Meta Server for the ProvideData.
4. Meta Server -> metaServerService.fetchData: ProvideData is returned.
5. changeDataProcess(provideData) -> ProvideDataProcessor.changeDataProcess(ProvideData provideData)
Author's WeChat official account: 罗西的思考