[Learning Design from Source Code] Ant Financial SOFARegistry: Configuration Information

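0x00 Abstract

SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.

This series focuses on design and architecture. Across multiple articles and from multiple angles, it works backward from the source code to summarize the implementation mechanisms and architectural ideas behind DataServer and SOFARegistry, so that readers can learn how Alibaba designs such systems.

This is the seventeenth article. It covers how SOFARegistry handles configuration information in its network operations.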

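0x01 Business Scope

1.1 Role of Configuration

Some system-level settings have to be changed from a console. Meta Server therefore exposes interfaces to the console; when it receives a console request, it interacts with Data Server and Session Server. For example, Meta Server provides the following interfaces: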

@Bean
@ConditionalOnMissingBean
public StopPushDataResource stopPushDataResource() {
    return new StopPushDataResource();
}

@Bean
public BlacklistDataResource blacklistDataResource() {
    return new BlacklistDataResource();
}

@Bean
public RenewSwitchResource renewSwitchResource() {
    return new RenewSwitchResource();
}

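These interfaces are exposed externally over HTTP because that is the conventional choice for administrative operations. Communication between the servers themselves still uses the Bolt protocol.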

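1.2 What to Learn

The question to work through here: on the DataServer side, how is configuration handling separated out into its own module?

0x02 Data Structures

2.1 Directory Structure

In DataServer, the configuration-related directories are shown below. They contain handlers, services, tasks, and provideData.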

│   ├── metaserver
│   │   ├── DefaultMetaServiceImpl.java
│   │   ├── IMetaServerService.java
│   │   ├── MetaServerConnectionFactory.java
│   │   ├── handler
│   │   │   ├── NotifyProvideDataChangeHandler.java
│   │   │   ├── ServerChangeHandler.java
│   │   │   └── StatusConfirmHandler.java
│   │   ├── provideData
│   │   │   ├── ProvideDataProcessor.java
│   │   │   ├── ProvideDataProcessorManager.java
│   │   │   └── processor
│   │   │       └── DatumExpireProvideDataProcessor.java
│   │   └── task

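2.2 Data Structure Definitions

The configuration-related data structures are as follows.

ProvideData is the externally facing exchange object. Besides the payload, it carries a version number and the service identifier dataInfoId.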

public class ProvideData implements Serializable {
    private ServerDataBox provideData;
    private String        dataInfoId;
    private Long          version;
}

ServerDataBox carries the actual business payload. It is defined as follows:

public class ServerDataBox implements Serializable {
    /** Null for locally instantiated, otherwise for internalized */
    private byte[]            bytes;
    /** Only available if bytes != null */
    private int               serialization;
    /** Actual object, lazy deserialized */
    private Object            object;
}  
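
The comments on ServerDataBox describe a lazily deserialized payload: the bytes arrive over the wire, and the object is materialized only when someone asks for it. The following is a minimal sketch of that idea, not the real class. LazyBox, ofBytes, ofObject, encode and readSwitch are hypothetical names, and plain Java serialization is an assumption made only to keep the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for ServerDataBox: keeps bytes and decodes them on first access. */
final class LazyBox {
    private final byte[] bytes; // null when the box was created locally from an object
    private Object object;      // decoded lazily from bytes

    private LazyBox(byte[] bytes, Object object) {
        this.bytes = bytes;
        this.object = object;
    }

    static LazyBox ofBytes(byte[] bytes) { return new LazyBox(bytes, null); }

    static LazyBox ofObject(Object object) { return new LazyBox(null, object); }

    /** Encodes a value with Java serialization (an assumption of this sketch). */
    static byte[] encode(Serializable value) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("cannot encode payload", e);
        }
    }

    /** Decodes the bytes the first time the object is requested, then caches the result. */
    synchronized Object getObject() {
        if (object == null && bytes != null) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                object = in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("cannot decode payload", e);
            }
        }
        return object;
    }

    /** Reads a switch stored as the string "true" or "false". */
    static boolean readSwitch(LazyBox box) {
        return Boolean.parseBoolean((String) box.getObject());
    }
}
```

Deferring the decode means a server that only forwards the box never pays the deserialization cost.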

ServerDataBox is currently used in only one place in Data Server, where the payload is a boolean (parsed from a string), that is, a switch setting.

public void changeDataProcess(ProvideData provideData) {
    boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
        .getObject());
    datumLeaseManager.setRenewEnable(enableDataDatumExpire);
}

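0x03 Flow Inside Meta Server

To connect the whole flow, we first need to cover the Meta Server internals related to metaServerService.fetchData(dataInfoId).

For the sake of decoupling, Meta Server splits some business functions into four layers. The basic logic is: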

Http Resource ———>  TaskListener ———>   Task  ————>   Service   

The flow chart comes first; the sections that follow walk through it step by step:

                +------------------------+
                |                        |   2    +-------------------------+
                | BlacklistDataResource  +------>-+PersistenceDataDBService |
                |                        | update +-------------------------+                                                      7
+-------+   1   |                        |                                                                       +---------------------------------------+
| Admin | +---> | +--------------------+ |                                                                       |    Data Server                        |
+-------+       | |fireDataChangeNotify| |                                                                       |                                       |
                | +--------------------+ |                                                     6                 | +-----------------------------------+ |
                +------------------------+                                                                       | |    metaClientHandlers             | |
                    |                                     +---------------------+    dataNodeExchanger.request   | | +-------------------------------+ | |
                    |        3                            | DataNodeServiceImpl | +----------------------------->+ | | notifyProvideDataChangeHandler| | |
                    |                                     +----------+----------+     NotifyProvideDataChange    | | +-------------------------------+ | |
                    |   NotifyProvideDataChange                      ^                                           | |                                   | |
                    |                                                |                                           | +-----------------------------------+ |
                    |                                             5  | notifyProvideDataChange                   +---------------------------------------+
                    v                                                |
          +---------+-----------------------------------+            |
          |       DefaultTaskListenerManager            |            |
          |                                             |       +----+----------------------------+
          | +-----------------------------------------+ | 4     |                                 |
          | | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
          | |                                         | |       |                                 |
          | | receiveStatusConfirmNotifyTaskListener  | |       +---------------------------------+
          | |                                         | |
          | | dataNodeChangePushTaskListener          | |
          | |                                         | |
          | | sessionNodeChangePushTaskListener       | |
          | +-----------------------------------------+ |
          +---------------------------------------------+

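3.1 Responding to Admin Requests

As mentioned, Meta Server exposes some control interfaces to Admin over HTTP. We take BlacklistDataResource as the example.

As the code shows, blacklistPush first stores the data through persistenceDataDBService, and then fireDataChangeNotify indirectly sends a NotifyProvideDataChange.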

@Path("blacklist")
public class BlacklistDataResource {

    @RaftReference
    private DBService           persistenceDataDBService;

    @Autowired
    private TaskListenerManager taskListenerManager;

    /**
     * update blacklist
     * e.g. curl -d '{"FORBIDDEN_PUB":{"IP_FULL":["1.1.1.1","10.15.233.150"]},"FORBIDDEN_SUB_BY_PREFIX":{"IP_FULL":["1.1.1.1"]}}' -H "Content-Type: application/json" -X POST http://localhost:9615/blacklist/update
     */
    @POST
    @Path("update")
    @Produces(MediaType.APPLICATION_JSON)
    public Result blacklistPush(String config) {
        PersistenceData persistenceData = createDataInfo();
        persistenceData.setData(config);
        boolean ret = persistenceDataDBService.update(ValueConstants.BLACK_LIST_DATA_ID,
                persistenceData);

        fireDataChangeNotify(persistenceData.getVersion(), ValueConstants.BLACK_LIST_DATA_ID,
            DataOperator.UPDATE);

        Result result = new Result();
        result.setSuccess(true);
        return result;
    }

    private PersistenceData createDataInfo() {
        DataInfo dataInfo = DataInfo.valueOf(ValueConstants.BLACK_LIST_DATA_ID);
        PersistenceData persistenceData = new PersistenceData();
        persistenceData.setDataId(dataInfo.getDataId());
        persistenceData.setGroup(dataInfo.getDataType());
        persistenceData.setInstanceId(dataInfo.getInstanceId());
        persistenceData.setVersion(System.currentTimeMillis());
        return persistenceData;
    }

    private void fireDataChangeNotify(Long version, String dataInfoId, DataOperator dataOperator) {

        NotifyProvideDataChange notifyProvideDataChange = new NotifyProvideDataChange(dataInfoId,
            version, dataOperator);

        TaskEvent taskEvent = new TaskEvent(notifyProvideDataChange,
            TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK);
        taskListenerManager.sendTaskEvent(taskEvent);
    }

}
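
One detail worth noticing in the resource above is that the notification carries only the dataInfoId and the version, not the payload. Receivers are expected to come back and fetch the data. The sketch below illustrates that "persist, then notify by key" pattern under simplified assumptions. ConfigPublisher, ChangeNotice and Versioned are hypothetical names, and an in-memory map stands in for the Raft-backed store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical sketch: persist a versioned value, then notify listeners by key only. */
final class ConfigPublisher {
    /** The notice deliberately omits the payload; receivers fetch it afterwards. */
    static final class ChangeNotice {
        final String dataInfoId;
        final long version;
        ChangeNotice(String dataInfoId, long version) {
            this.dataInfoId = dataInfoId;
            this.version = version;
        }
    }

    static final class Versioned {
        final String data;
        final long version;
        Versioned(String data, long version) {
            this.data = data;
            this.version = version;
        }
    }

    private final Map<String, Versioned> store = new ConcurrentHashMap<>();
    private final List<Consumer<ChangeNotice>> listeners = new ArrayList<>();

    void onChange(Consumer<ChangeNotice> listener) {
        listeners.add(listener);
    }

    /** Persist first (step 2 in the diagram), then notify (step 3). */
    void update(String dataInfoId, String data, long version) {
        store.put(dataInfoId, new Versioned(data, version));
        ChangeNotice notice = new ChangeNotice(dataInfoId, version);
        for (Consumer<ChangeNotice> listener : listeners) {
            listener.accept(notice);
        }
    }

    /** What a receiver calls after it has been notified. */
    Versioned fetch(String dataInfoId) {
        return store.get(dataInfoId);
    }
}
```

Because the write happens before the notice is sent, a receiver that fetches in response to the notice sees at least the version it was told about.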

This corresponds to the following part of the diagram above:

                +------------------------+
                |                        |   2    +-------------------------+
                | BlacklistDataResource  +------>-+PersistenceDataDBService |
                |                        | update +-------------------------+
+-------+   1   |                        |
| Admin | +---> | +--------------------+ |
+-------+       | |fireDataChangeNotify| |
                | +--------------------+ |
                +------------------------+

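3.2 DBService

DBService is also based on Raft (note the @RaftReference annotation), which indicates that the MetaServer cluster maintains consistency internally on its own.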

@RaftReference
private DBService           persistenceDataDBService;

A condensed definition of the PersistenceDataDBService class is as follows:

@RaftService
public class PersistenceDataDBService extends AbstractSnapshotProcess implements DBService {

    private ConcurrentHashMap<String, Object> serviceMap        = new ConcurrentHashMap<>();

    @Override
    public boolean put(String key, Object value) {
        Object ret = serviceMap.put(key, value);
        return true;
    }

    @Override
    public DBResponse get(String key) {
        Object ret = serviceMap.get(key);
        return ret != null ? DBResponse.ok(ret).build() : DBResponse.notfound().build();
    }

    @Override
    public boolean update(String key, Object value) {
        Object ret = serviceMap.put(key, value);
        return true;
    }
      
    @Override
    public Set<String> getSnapshotFileNames() {
        if (!snapShotFileNames.isEmpty()) {
            return snapShotFileNames;
        }
        snapShotFileNames.add(this.getClass().getSimpleName());
        return snapShotFileNames;
    }      
}

As the code shows, storage is primarily a ConcurrentHashMap, while the Raft mechanism uses the file system for snapshot backups.
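
To make "an in-memory map plus file snapshots" concrete, here is a minimal sketch of saving such a map to a file and loading it back. It is an illustration only. SnapshotMap, save and load are hypothetical names, and Java serialization is an assumption of the sketch, which may differ from the on-disk format PersistenceDataDBService actually uses.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: a map-backed store that can snapshot itself to a file. */
final class SnapshotMap {
    private ConcurrentHashMap<String, Object> serviceMap = new ConcurrentHashMap<>();

    void put(String key, Object value) {
        serviceMap.put(key, value);
    }

    Object get(String key) {
        return serviceMap.get(key);
    }

    /** Writes the whole map to the snapshot file; values must be Serializable. */
    void save(Path file) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(serviceMap);
        }
    }

    /** Replaces the in-memory map with the content of the snapshot file. */
    @SuppressWarnings("unchecked")
    void load(Path file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            serviceMap = (ConcurrentHashMap<String, Object>) in.readObject();
        }
    }
}
```

A node that restarts or joins late can load the latest snapshot and then replay only the log entries written after it, which is the usual reason a Raft state machine supports snapshots.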

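3.3 Bean

As mentioned, for decoupling, Meta Server wraps message processing and forwarding in TaskListeners, and TaskListenerManager is logically responsible for running them. Taking the ProvideData-related functionality as the example, the corresponding beans are: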

@Configuration
public static class MetaServerTaskConfiguration {
  
    ......

    @Bean
    public TaskListener persistenceDataChangeNotifyTaskListener(TaskListenerManager taskListenerManager) {
        TaskListener taskListener = new PersistenceDataChangeNotifyTaskListener(
            sessionNodeSingleTaskProcessor());
        taskListenerManager.addTaskListener(taskListener);
        return taskListener;
    }

    @Bean
    public TaskListenerManager taskListenerManager() {
        return new DefaultTaskListenerManager();
    }
}

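3.4 Listener

The listener execution engine is shown below. Listeners are registered under the TaskType they support; when an event arrives, the manager looks up the listeners for that event's TaskType and invokes each of them.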

public class DefaultTaskListenerManager implements TaskListenerManager {

    private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();

    @Override
    public Multimap<TaskType, TaskListener> getTaskListeners() {
        return taskListeners;
    }

    @Override
    public void addTaskListener(TaskListener taskListener) {
        taskListeners.put(taskListener.support(), taskListener);
    }

    @Override
    public void sendTaskEvent(TaskEvent taskEvent) {
        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
        for (TaskListener taskListener : taskListeners) {
            taskListener.handleEvent(taskEvent);
        }
    }
}
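
The same registry can be expressed with JDK collections only, which may make the lookup easier to see. This is a sketch under simplified assumptions rather than the project's code. ListenerRegistry and its nested types are hypothetical, the two enum constants are illustrative, and an EnumMap of lists stands in for Guava's Multimap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: listeners grouped by the task type they support. */
final class ListenerRegistry {
    enum TaskType { PERSISTENCE_DATA_CHANGE_NOTIFY_TASK, DATA_NODE_CHANGE_PUSH_TASK }

    interface Listener {
        TaskType support();
        void handleEvent(Object event);
    }

    private final Map<TaskType, List<Listener>> listeners = new EnumMap<>(TaskType.class);

    /** Files the listener under the type it declares. */
    void addTaskListener(Listener listener) {
        listeners.computeIfAbsent(listener.support(), t -> new ArrayList<>()).add(listener);
    }

    /** Invokes every listener registered for the type and returns how many ran. */
    int sendTaskEvent(TaskType type, Object event) {
        List<Listener> matched = listeners.getOrDefault(type, Collections.emptyList());
        for (Listener listener : matched) {
            listener.handleEvent(event);
        }
        return matched.size();
    }
}
```

The sender never names a listener, so adding a new reaction to an event only requires registering another listener for the same type.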

The corresponding business listener is as follows:

public class PersistenceDataChangeNotifyTaskListener implements TaskListener {

    @Autowired
    private MetaServerConfig                       metaServerConfig;

    private TaskDispatcher<String, MetaServerTask> singleTaskDispatcher;

    public PersistenceDataChangeNotifyTaskListener(TaskProcessor sessionNodeSingleTaskProcessor) {
        singleTaskDispatcher = TaskDispatchers.createDefaultSingleTaskDispatcher(
            TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK.getName(), sessionNodeSingleTaskProcessor);
    }

    @Override
    public TaskType support() {
        return TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {
        MetaServerTask persistenceDataChangeNotifyTask = new PersistenceDataChangeNotifyTask(
            metaServerConfig);
        persistenceDataChangeNotifyTask.setTaskEvent(event);
        singleTaskDispatcher.dispatch(persistenceDataChangeNotifyTask.getTaskId(),
            persistenceDataChangeNotifyTask, persistenceDataChangeNotifyTask.getExpiryTime());
    }
}

This corresponds to:

                +------------------------+
                |                        |   2    +-------------------------+
                | BlacklistDataResource  +------>-+PersistenceDataDBService |
                |                        | update +-------------------------+
+-------+   1   |                        |
| Admin | +---> | +--------------------+ |
+-------+       | |fireDataChangeNotify| |
                | +--------------------+ |
                +------------------------+
                    |
                    |        3
                    |
                    |   NotifyProvideDataChange
                    |
                    |
                    v
          +---------+-----------------------------------+
          |       DefaultTaskListenerManager            |
          |                                             |
          | +-----------------------------------------+ |
          | | persistenceDataChangeNotifyTaskListener | |
          | |                                         | |
          | | receiveStatusConfirmNotifyTaskListener  | |
          | |                                         | |
          | | dataNodeChangePushTaskListener          | |
          | |                                         | |
          | | sessionNodeChangePushTaskListener       | |
          | +-----------------------------------------+ |
          +---------------------------------------------+

3.5 Task

The listener hands the event off to a Task.

The Task is shown below. It calls different services depending on the NodeType:

public class PersistenceDataChangeNotifyTask extends AbstractMetaServerTask {

    private final SessionNodeService sessionNodeService;

    private final DataNodeService    dataNodeService;

    final private MetaServerConfig   metaServerConfig;

    private NotifyProvideDataChange  notifyProvideDataChange;

    @Override
    public void execute() {
        Set<NodeType> nodeTypes = notifyProvideDataChange.getNodeTypes();
        if (nodeTypes.contains(NodeType.DATA)) {
            dataNodeService.notifyProvideDataChange(notifyProvideDataChange);
        }
        if (nodeTypes.contains(NodeType.SESSION)) {
            sessionNodeService.notifyProvideDataChange(notifyProvideDataChange);
        }
    }

    @Override
    public void setTaskEvent(TaskEvent taskEvent) {
        Object obj = taskEvent.getEventObj();
        if (obj instanceof NotifyProvideDataChange) {
            this.notifyProvideDataChange = (NotifyProvideDataChange) obj;
        } 
    }
}
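
The two if statements in execute amount to routing one notice to every service whose node type is targeted. A small sketch of that routing in a table-driven form follows. NodeTypeRouter is a hypothetical class, the notice is reduced to a String, and the enum constants are illustrative.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch: deliver a notice to the services of the targeted node types. */
final class NodeTypeRouter {
    enum NodeType { DATA, SESSION, META }

    private final Map<NodeType, Consumer<String>> services = new EnumMap<>(NodeType.class);

    void register(NodeType type, Consumer<String> service) {
        services.put(type, service);
    }

    /** Returns the node types that actually received the notice. */
    List<NodeType> execute(Set<NodeType> targets, String notice) {
        List<NodeType> delivered = new ArrayList<>();
        for (NodeType type : targets) {
            Consumer<String> service = services.get(type);
            if (service != null) { // types without a registered service are skipped
                service.accept(notice);
                delivered.add(type);
            }
        }
        return delivered;
    }
}
```

With only two node types the explicit if statements in the original are arguably clearer; the table form starts to pay off when more receivers are added.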

This corresponds to:

                +------------------------+
                |                        |   2    +-------------------------+
                | BlacklistDataResource  +------>-+PersistenceDataDBService |
                |                        | update +-------------------------+
+-------+   1   |                        |
| Admin | +---> | +--------------------+ |
+-------+       | |fireDataChangeNotify| |
                | +--------------------+ |
                +------------------------+
                    |
                    |        3
                    |
                    |   NotifyProvideDataChange
                    |
                    |
                    v
+-------------------+-------------------------+
|       DefaultTaskListenerManager            |
|                                             |       +---------------------------------+
| +-----------------------------------------+ | 4     |                                 |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| |                                         | |       |                                 |
| | receiveStatusConfirmNotifyTaskListener  | |       +---------------------------------+
| |                                         | |
| | dataNodeChangePushTaskListener          | |
| |                                         | |
| | sessionNodeChangePushTaskListener       | |
| +-----------------------------------------+ |
+---------------------------------------------+

3.6 Service

The task calls a service to do the actual work. The service is shown below; it pushes the notification to DataServer or SessionServer.

public class DataNodeServiceImpl implements DataNodeService {

    @Autowired
    private NodeExchanger         dataNodeExchanger;

    @Autowired
    private StoreService          dataStoreService;

    @Autowired
    private AbstractServerHandler dataConnectionHandler;

    @Override
    public NodeType getNodeType() {
        return NodeType.DATA;
    }

    @Override
    public void notifyProvideDataChange(NotifyProvideDataChange notifyProvideDataChange) {

        NodeConnectManager nodeConnectManager = getNodeConnectManager();
        Collection<InetSocketAddress> connections = nodeConnectManager.getConnections(null);

        // add register confirm
        StoreService storeService = ServiceFactory.getStoreService(NodeType.DATA);
        Map<String, DataNode> dataNodes = storeService.getNodes();

        for (InetSocketAddress connection : connections) {

            if (!dataNodes.keySet().contains(connection.getAddress().getHostAddress())) {
                continue;
            }

            try {
                Request<NotifyProvideDataChange> request = new Request<NotifyProvideDataChange>() {

                    @Override
                    public NotifyProvideDataChange getRequestBody() {
                        return notifyProvideDataChange;
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(connection);
                    }
                };

                dataNodeExchanger.request(request);
                } catch (Exception e) {
                    // error handling is elided in this condensed listing
                }
        }
    }
}
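
The loop in notifyProvideDataChange only sends to connections whose IP belongs to a registered data node. That filter can be isolated as in the sketch below. ConnectionFilter and registeredOnly are hypothetical names; the null check is an addition of this sketch, because InetSocketAddress.getAddress() returns null for unresolved addresses.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: keep only the connections that belong to registered nodes. */
final class ConnectionFilter {
    static List<InetSocketAddress> registeredOnly(Collection<InetSocketAddress> connections,
                                                  Set<String> registeredIps) {
        List<InetSocketAddress> targets = new ArrayList<>();
        for (InetSocketAddress connection : connections) {
            // Unresolved addresses have no InetAddress, so guard before dereferencing.
            if (connection.getAddress() == null) {
                continue;
            }
            if (registeredIps.contains(connection.getAddress().getHostAddress())) {
                targets.add(connection);
            }
        }
        return targets;
    }
}
```

Filtering against the registered node list keeps the notification away from peers that have connected but have not yet completed registration.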

This corresponds to:

                +------------------------+
                |                        |   2    +-------------------------+
                | BlacklistDataResource  +------>-+PersistenceDataDBService |
                |                        | update +-------------------------+
+-------+   1   |                        |
| Admin | +---> | +--------------------+ |
+-------+       | |fireDataChangeNotify| |
                | +--------------------+ |
                +------------------------+
          |                                     +---------------------+
          |        3                            | DataNodeServiceImpl |
          |                                     +----------+----------+
          |   NotifyProvideDataChange                      ^
          |                                                |
          |                                             5  | notifyProvideDataChange
          v                                                |
+---------+-----------------------------------+            |
|       DefaultTaskListenerManager            |            |
|                                             |       +----+----------------------------+
| +-----------------------------------------+ | 4     |                                 |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| |                                         | |       |                                 |
| | receiveStatusConfirmNotifyTaskListener  | |       +---------------------------------+
| |                                         | |
| | dataNodeChangePushTaskListener          | |
| |                                         | |
| | sessionNodeChangePushTaskListener       | |
| +-----------------------------------------+ |
+---------------------------------------------+

After the message is sent, the picture is:

                +------------------------+
                |                        |   2    +-------------------------+
                | BlacklistDataResource  +------>-+PersistenceDataDBService |
                |                        | update +-------------------------+
+-------+   1   |                        |
| Admin | +---> | +--------------------+ |
+-------+       | |fireDataChangeNotify| |
                | +--------------------+ |
                +------------------------+
                    |       3
                    |   NotifyProvideDataChange
                    v
+-------------------+-------------------------+
|       DefaultTaskListenerManager            |
|                                             |       +---------------------------------+
| +-----------------------------------------+ | 4     |                                 |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| | receiveStatusConfirmNotifyTaskListener  | |       |                                 |
| | dataNodeChangePushTaskListener          | |       +----+----------------------------+
| | sessionNodeChangePushTaskListener       | |            |
| +-----------------------------------------+ |            |
+---------------------------------------------+         5  | notifyProvideDataChange
                                                           |
               +-------------------------------------------+
               |
               v
 +-------------+-------+
 | DataNodeServiceImpl |                         +---------------------------------------+
 +-------------+-------+                         |    Data Server                      7 |
               |                       6         |                                       |
               |     dataNodeExchanger.request   | +-----------------------------------+ |
               +->------------------------------>+ |    metaClientHandlers             | |
                      NotifyProvideDataChange    | | +-------------------------------+ | |
                                                 | | | notifyProvideDataChangeHandler| | |
                                                 | | +-------------------------------+ | |
                                                 | +-----------------------------------+ |
                                                 +---------------------------------------+

We now know that in Meta Server, the DataNodeServiceImpl.notifyProvideDataChange function notifies Data Server that there is a NotifyProvideDataChange message.

0x04 Call Path in Data Server

Execution now moves to DataServer. Some groundwork comes first.

4.1 Bean

The metaClientHandlers bean holds the response handlers for MetaNodeExchanger, and notifyProvideDataChangeHandler is one of them.

@Bean(name = "metaClientHandlers")
public Collection<AbstractClientHandler> metaClientHandlers() {
    Collection<AbstractClientHandler> list = new ArrayList<>();
    list.add(serverChangeHandler());
    list.add(statusConfirmHandler());
    list.add(notifyProvideDataChangeHandler());
    return list;
}

4.2 Network Interaction

When DefaultMetaServiceImpl.getMetaServerMap calls metaNodeExchanger.connect, MetaNodeExchanger installs these metaClientHandlers. This ties notifyProvideDataChangeHandler to MetaServer over Bolt.

public class DefaultMetaServiceImpl implements IMetaServerService {
    @Override
    public Map<String, Set<String>> getMetaServerMap() {
        // Condensed excerpt: only the connect call is shown; `list` and the rest of the method are elided.
        Connection connection = null;
        connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator()
            .next(), dataServerConfig.getMetaServerPort()))).getConnection();
        ......
    }
}

MetaNodeExchanger is defined as follows. It centralizes all of DataServer's interaction with MetaServer.

public class MetaNodeExchanger implements NodeExchanger {
    @Autowired
    private Exchange                          boltExchange;

    @Autowired
    private IMetaServerService                metaServerService;

    @Resource(name = "metaClientHandlers")
    private Collection<AbstractClientHandler> metaClientHandlers;

    public Channel connect(URL url) {
        Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
        if (client == null) {
            synchronized (this) {
                client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
                if (client == null) {
                    client = boltExchange.connect(Exchange.META_SERVER_TYPE, url,
                        metaClientHandlers.toArray(new ChannelHandler[metaClientHandlers.size()]));
                }
            }
        }
        //try to connect data
        Channel channel = client.getChannel(url);
        if (channel == null) {
            synchronized (this) {
                channel = client.getChannel(url);
                if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }
        return channel;
    }
}
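
The connect method uses the check, lock, check-again idiom twice so that only one thread creates the client and the channel. For comparison, here is a sketch of the same "create at most once per URL" guarantee built on ConcurrentHashMap.computeIfAbsent. It is an alternative formulation, not what MetaNodeExchanger does. ChannelCache is a hypothetical class and the connector function stands in for the real Bolt connect call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch: one lazily created channel per URL. */
final class ChannelCache<C> {
    private final ConcurrentMap<String, C> channels = new ConcurrentHashMap<>();
    private final Function<String, C> connector;
    private final AtomicInteger connects = new AtomicInteger();

    ChannelCache(Function<String, C> connector) {
        this.connector = connector;
    }

    /** Returns the cached channel, connecting only if the URL has none yet. */
    C connect(String url) {
        return channels.computeIfAbsent(url, u -> {
            connects.incrementAndGet(); // counts real connection attempts
            return connector.apply(u);
        });
    }

    int connectCount() {
        return connects.get();
    }
}
```

One trade-off to keep in mind: computeIfAbsent runs the connector while holding an internal lock on part of the map, so a slow connect can briefly block other updates that hash to the same bin.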

4.3 Handler Definition

In its interest method, NotifyProvideDataChangeHandler declares that it handles messages of type NotifyProvideDataChange. When MetaServer sends a NotifyProvideDataChange, the handler calls metaServerService.fetchData(dataInfoId) to obtain the ProvideData and then processes it.

public class NotifyProvideDataChangeHandler extends AbstractClientHandler {

    @Autowired
    private IMetaServerService   metaServerService;

    @Autowired
    private ProvideDataProcessor provideDataProcessorManager;


    @Override
    public Object doHandle(Channel channel, Object request) {
        NotifyProvideDataChange notifyProvideDataChange = (NotifyProvideDataChange) request;
        String dataInfoId = notifyProvideDataChange.getDataInfoId();
        if (notifyProvideDataChange.getDataOperator() != DataOperator.REMOVE) {
            ProvideData provideData = metaServerService.fetchData(dataInfoId);
            provideDataProcessorManager.changeDataProcess(provideData);
        }
        return null;
    }
  
    @Override
    public Class interest() {
        return NotifyProvideDataChange.class;
    }  
}
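
The control flow of doHandle can be summarized as: ignore removals, fetch the payload by id, then apply it. The sketch below isolates that flow. NotifyThenFetch is a hypothetical class, the payload is reduced to a String, and the enum constants are illustrative. The null check is an addition of this sketch, motivated by the fact that fetchData (shown in 4.5) can return null.

```java
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch of the notify-then-fetch flow on the receiving side. */
final class NotifyThenFetch {
    enum DataOperator { ADD, UPDATE, REMOVE }

    /** Returns true when a payload was fetched and handed to the processor. */
    static boolean handle(String dataInfoId, DataOperator operator,
                          Function<String, String> fetcher, Consumer<String> processor) {
        if (operator == DataOperator.REMOVE) {
            return false; // removals are not fetched
        }
        String payload = fetcher.apply(dataInfoId);
        if (payload == null) {
            return false; // the fetch failed or found nothing, so there is nothing to apply
        }
        processor.accept(payload);
        return true;
    }
}
```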

4.4 Invoking the Handler

To recap, in Meta Server the DataNodeServiceImpl.notifyProvideDataChange function sends the NotifyProvideDataChange message to Data Server.

NotifyProvideDataChangeHandler then responds to it.

4.5 Fetching ProvideData

NotifyProvideDataChangeHandler contains the following line:

ProvideData provideData = metaServerService.fetchData(dataInfoId);

This calls fetchData in DefaultMetaServiceImpl, which goes to Meta Server to fetch the ProvideData.

@Override
public ProvideData fetchData(String dataInfoId) {

    Map<String, Connection> connectionMap = metaServerConnectionFactory
        .getConnections(dataServerConfig.getLocalDataCenter());
    String leader = getLeader().getIp();
    if (connectionMap.containsKey(leader)) {
        Connection connection = connectionMap.get(leader);
        if (connection.isFine()) {
            try {
                Request<FetchProvideDataRequest> request = new Request<FetchProvideDataRequest>() {

                    @Override
                    public FetchProvideDataRequest getRequestBody() {
                        return new FetchProvideDataRequest(dataInfoId);
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(connection.getRemoteIP(), connection.getRemotePort());
                    }
                };

                Response response = metaNodeExchanger.request(request);
                Object result = response.getResult();
                if (result instanceof ProvideData) {
                    return (ProvideData) result;
                } 
            } catch (Exception e) {
                // error handling is elided in this condensed listing
            }
        }
    }
    String newip = refreshLeader().getIp();
    return null;
}
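
In the listing above, a failed attempt refreshes the leader and then returns null, leaving any retry to the caller. As an illustration of what such a retry could look like, here is a bounded "ask the leader, refresh on failure, try again" loop. It is a sketch under assumptions, not the behavior of DefaultMetaServiceImpl. LeaderFetch and its parameters are hypothetical.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch: fetch from the current leader, refreshing the leader on failure. */
final class LeaderFetch {
    static <T> T fetch(Supplier<String> currentLeader, Supplier<String> refreshLeader,
                       Function<String, T> requestTo, int maxAttempts) {
        String leader = currentLeader.get();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                T result = requestTo.apply(leader);
                if (result != null) {
                    return result;
                }
            } catch (RuntimeException e) {
                // The request failed; fall through and ask who the leader is now.
            }
            leader = refreshLeader.get();
        }
        return null; // callers must still handle "no data"
    }
}
```

Bounding the attempts matters: during a leader election every candidate may fail for a while, and an unbounded loop would stall the handler thread.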

The picture now looks like this:

+---------------------------+     +--------------------------------------------------+         +---------------------------------+
|  DefaultMetaServiceImpl   |     | MetaNodeExchanger                                |         |      Meta Server                |
|                           |     |                                                  |  1      | +-----------------------------+ |
|    getMetaServerMap +---------->-+boltExchange.connect(metaClientHandlers.toArray) +-------> | |   DataNodeServiceImpl       | |
|                           |     +-----------------------------------+--------------+         | |                             | |
+---------------------------+                                         ^                        | |                             | |
                                                                      |                        | |  notifyProvideDataChange    | |
                                                                      |                        | |              +              | |
+------------------------------------------+                          |                        | |              |              | |
|     metaClientHandlers                   +--------------------------+                        | +-----------------------------+ |
|                                          |                                                   +---------------------------------+
| +------------------------------------+   |                                                                    |       ^
| |  serverChangeHandler               |   |                                                                    |       |
| |                                    |   |                                                                    |       |
| |  statusConfirmHandler              |   |           NotifyProvideDataChange                                  |       |
| |                                    |   |                                                                    |       |
| | +--------------------------------+ |   |                                     2                              |     3 |
| | |notifyProvideDataChangeHandler<-------------------------------------<--------------------------------------+       |
| | |                                | |   |                                                                            |
| | |                                | |   |                                                                            |
| | |                                | |   |                                                                            |
| | |  ProvideData provideData = +--------------------------------------------------------------------------------------+
| | |                                | |   |         get ProvideData from Meta Server          FetchProvideDataRequest
| | |  metaServerService.fetchData   | |   |
| | |                                | |   |
| | |                                | |   |
| | |                                | |   |
| | | changeDataProcess(provideData) | |   |
| | |                                | |   |
| | +--------------------------------+ |   |
| +------------------------------------+   |
+------------------------------------------+

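0x05 Back to MetaServer

Execution returns to MetaServer, which has received the FetchProvideDataRequest.

5.1 Responding to Data Server Requests

FetchProvideDataRequestHandler is the handler for this request. Its logic is fairly simple: look up the data in DBService by DataInfoId and return it to the caller.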

public class FetchProvideDataRequestHandler extends AbstractServerHandler<FetchProvideDataRequest> {

    @RaftReference
    private DBService           persistenceDataDBService;

    @Override
    public Object reply(Channel channel, FetchProvideDataRequest fetchProvideDataRequest) {
        // Condensed: the surrounding try/catch and any handling of other statuses are elided.
        DBResponse ret = persistenceDataDBService.get(fetchProvideDataRequest.getDataInfoId());
        if (ret.getOperationStatus() == OperationStatus.SUCCESS) {
            // Found: wrap the stored data and its version.
            PersistenceData data = (PersistenceData) ret.getEntity();
            ProvideData provideData = new ProvideData(new ServerDataBox(data.getData()),
                fetchProvideDataRequest.getDataInfoId(), data.getVersion());
            return provideData;
        } else if (ret.getOperationStatus() == OperationStatus.NOTFOUND) {
            // Not found: still reply, but with a null payload and a null version.
            ProvideData provideData = new ProvideData(null,
                fetchProvideDataRequest.getDataInfoId(), null);
            return provideData;
        }
        ......
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    public Class interest() {
        return FetchProvideDataRequest.class;
    }
}
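
Note the shape of the not-found reply: the handler still returns an object, but its payload and version are null. A caller therefore cannot treat a non-null reply as proof that data exists. The sketch below shows that mapping with simplified stand-in types. FetchReply, Stored and Reply are hypothetical names and the map stands in for the DBService.

```java
import java.util.Map;

/** Hypothetical sketch: map a store lookup to a reply that is never null itself. */
final class FetchReply {
    static final class Stored {
        final String data;
        final long version;
        Stored(String data, long version) {
            this.data = data;
            this.version = version;
        }
    }

    static final class Reply {
        final String dataInfoId;
        final String data;   // null when the key was not found
        final Long version;  // null when the key was not found
        Reply(String dataInfoId, String data, Long version) {
            this.dataInfoId = dataInfoId;
            this.data = data;
            this.version = version;
        }

        boolean hasData() {
            return data != null;
        }
    }

    static Reply reply(Map<String, Stored> db, String dataInfoId) {
        Stored stored = db.get(dataInfoId);
        if (stored == null) {
            return new Reply(dataInfoId, null, null);
        }
        return new Reply(dataInfoId, stored.data, stored.version);
    }
}
```

A receiver that dereferences the payload without checking it would fail on the not-found reply, so a guard such as hasData() is worth having on the consuming side.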

Clearly, the key piece here is DBService.

From MetaServer's point of view, the flow is:

+----------------------------------------------+
|                 Data Server                  |
|                                              |
| +---------------------------------------+    |
| |    NotifyProvideDataChangeHandler     |    |
| |                                       |    |
| |                                       |    |
| |metaServerService.fetchData(dataInfoId)|    |
| +---------------------------------------+    |
+----------------------------------------------+
                         |   ^
                         |   |
                    1    |   |
 FetchProvideDataRequest |   | ProvideData
                         |   |
                         |   |  4
 +-----------------------------------------+
 |    Meta Server        |   |             |
 |                       |   |             |
 |  +--------------------v---+-------+     |
 |  | FetchProvideDataRequestHandler |     |
 |  +--------------+---+-------------+     |
 |              2  |   ^                   |
 |                 |   | DBResponse        |
 | get(DataInfoId) |   |  3                |
 |                 v   |                   |
 |       +---------+---+------------+      |
 |       | PersistenceDataDBService |      |
 |       +--------------------------+      |
 +-----------------------------------------+

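5.2 Corresponding handling in the Session Server

The Session Server also issues FetchProvideDataRequest. SessionServerBootstrap contains the following functions, each of which fetches configuration information (fetchBlackList delegates to blacklistManager.load()).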

private void fetchStopPushSwitch(URL leaderUrl) {
    FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(
        ValueConstants.STOP_PUSH_DATA_SWITCH_DATA_ID);
    Object ret = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
    if (ret instanceof ProvideData) {
        ProvideData provideData = (ProvideData) ret;
        provideDataProcessorManager.fetchDataProcess(provideData);
    } 
}

private void fetchEnableDataRenewSnapshot(URL leaderUrl) {
    FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(
        ValueConstants.ENABLE_DATA_RENEW_SNAPSHOT);
    Object data = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
    if (data instanceof ProvideData) {
        ProvideData provideData = (ProvideData) data;
        provideDataProcessorManager.fetchDataProcess(provideData);
    }
}

private void fetchBlackList() {
    blacklistManager.load();
}
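The first two functions share one shape: build a request for a dataInfoId, send it, and forward the answer only if it has the expected type. A minimal sketch of that shape follows. The Request and Config types and the fetchAndProcess helper are hypothetical names introduced here for illustration; SessionServerBootstrap does not contain such a helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustration only: Request, Config and fetchAndProcess are hypothetical.
class FetchHelperSketch {

    /** Stand-in for FetchProvideDataRequest. */
    static final class Request {
        final String dataInfoId;

        Request(String dataInfoId) {
            this.dataInfoId = dataInfoId;
        }
    }

    /** Stand-in for ProvideData. */
    static final class Config {
        final String dataInfoId;
        final String value;

        Config(String dataInfoId, String value) {
            this.dataInfoId = dataInfoId;
            this.value = value;
        }
    }

    /** Sends one request and forwards the answer only if it is a Config. */
    static boolean fetchAndProcess(String dataInfoId,
                                   Function<Request, Object> sender,
                                   Consumer<Config> processor) {
        Object ret = sender.apply(new Request(dataInfoId));
        if (ret instanceof Config) {
            processor.accept((Config) ret);
            return true;
        }
        return false; // unexpected answer type: nothing is applied
    }

    public static void main(String[] args) {
        List<String> applied = new ArrayList<>();
        // Fake transport: knows a single id and answers anything else with a plain string.
        Function<Request, Object> sender = request -> {
            if ("known.id".equals(request.dataInfoId)) {
                return new Config(request.dataInfoId, "true");
            }
            return "error";
        };
        boolean first = fetchAndProcess("known.id", sender, c -> applied.add(c.dataInfoId + "=" + c.value));
        boolean second = fetchAndProcess("other.id", sender, c -> applied.add(c.dataInfoId + "=" + c.value));
        System.out.println(first + " " + second + " " + applied);
    }
}
```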

0x06 DataServer

6.1 Handling ProvideData

In NotifyProvideDataChangeHandler, ProvideData is obtained by calling fetchData. Inside fetchData, the response to the request is handled as follows:

            Response response = metaNodeExchanger.request(request);
            Object result = response.getResult();
            if (result instanceof ProvideData) {
                return (ProvideData) result;
            } 

As a diagram:

+---------------------------+     +--------------------------------------------------+         +---------------------------------+
|  DefaultMetaServiceImpl   |     | MetaNodeExchanger                                |         |      Meta Server                |
|                           |     |                                                  |  1      | +-----------------------------+ |
|    getMetaServerMap +---------->-+boltExchange.connect(metaClientHandlers.toArray) +-------> | |   DataNodeServiceImpl       | |
|                           |     +-----------------------------------+--------------+         | |                             | |
+---------------------------+                                         ^                        | |                             | |
                                                                      |                        | |  notifyProvideDataChange    | |
                                                                      |                        | |              +              | |
+------------------------------------------+                          |                        | |              |              | |
|     metaClientHandlers                   +--------------------------+                        | +-----------------------------+ |
|                                          |                                                   +---------------------------------+
| +------------------------------------+   |                                                                    |       ^    |
| |  serverChangeHandler               |   |                                                                    |       |    |
| |                                    |   |                                                                    |       |    |
| |  statusConfirmHandler              |   |           NotifyProvideDataChange                                  |       |    |
| |                                    |   |                                                                    |       |    |
| | +--------------------------------+ |   |                                     2                              |     3 |    | 4
| | |notifyProvideDataChangeHandler<-------------------------------------<--------------------------------------+       |    |
| | |                                | |   |                                                                            |    |
| | |                                | |   |                     get ProvideData from Meta Server                       |    |
| | |                                | |   |                                                                            |    |
| | |  ProvideData provideData = +--------------------------------------------------------------------------------------+    |
| | |                                | |   |                                                                                 |
| | |  metaServerService.fetchData <-----------------------------------------------------------------------------------------+
| | |                                | |   |                          ProvideData
| | |                                | |   |
| | |                                | |   |
| | | changeDataProcess(provideData) | |   |
| | |                                | |   |
| | +--------------------------------+ |   |
| +------------------------------------+   |
+------------------------------------------+


Processing then continues with:

provideDataProcessorManager.changeDataProcess(provideData);

This brings us to how the processing engine handles the data.

6.1.1 Bean

Here the processing engine ProvideDataProcessorManager is created, and one handler, DatumExpireProvideDataProcessor, is registered with it.

@Configuration
public static class DataProvideDataConfiguration {

    @Bean
    public ProvideDataProcessor provideDataProcessorManager() {
        return new ProvideDataProcessorManager();
    }

    @Bean
    public ProvideDataProcessor datumExpireProvideDataProcessor(ProvideDataProcessor provideDataProcessorManager) {
        ProvideDataProcessor datumExpireProvideDataProcessor = new DatumExpireProvideDataProcessor();
        ((ProvideDataProcessorManager) provideDataProcessorManager)
            .addProvideDataProcessor(datumExpireProvideDataProcessor);
        return datumExpireProvideDataProcessor;
    }

}

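6.1.2 The processing engine ProvideDataProcessorManager

The pattern is a familiar one: the engine, ProvideDataProcessorManager, itself implements ProvideDataProcessor, but its support method returns false, so the engine never executes itself while it iterates over the processors.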

public class ProvideDataProcessorManager implements ProvideDataProcessor {

    private Collection<ProvideDataProcessor> provideDataProcessors = new ArrayList<>();

    public void addProvideDataProcessor(ProvideDataProcessor provideDataProcessor) {
        provideDataProcessors.add(provideDataProcessor);
    }

    @Override
    public void changeDataProcess(ProvideData provideData) {
        for (ProvideDataProcessor provideDataProcessor : provideDataProcessors) {
            if (provideDataProcessor.support(provideData)) {
                provideDataProcessor.changeDataProcess(provideData);
            }
        }
    }

    @Override
    public boolean support(ProvideData provideData) {
        return false;
    }
}
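The effect of returning false from support can be seen in a stripped-down sketch. The Processor interface, the Manager, and the RecordingProcessor below are simplified, hypothetical stand-ins (they work on plain strings rather than on ProvideData). The manager is deliberately registered with itself to show that the false answer prevents it from dispatching to itself.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: simplified, hypothetical counterparts of the classes above.
class DispatchSketch {

    interface Processor {
        boolean support(String dataInfoId);

        void changeDataProcess(String dataInfoId, String value);
    }

    /** The engine: a Processor that fans out to the registered processors. */
    static final class Manager implements Processor {
        private final List<Processor> processors = new ArrayList<>();

        void add(Processor processor) {
            processors.add(processor);
        }

        @Override
        public boolean support(String dataInfoId) {
            return false; // the engine never claims an id for itself
        }

        @Override
        public void changeDataProcess(String dataInfoId, String value) {
            for (Processor processor : processors) {
                if (processor.support(dataInfoId)) {
                    processor.changeDataProcess(dataInfoId, value);
                }
            }
        }
    }

    /** Handles exactly one id and records every value it receives. */
    static final class RecordingProcessor implements Processor {
        private final String acceptedId;
        final List<String> seen = new ArrayList<>();

        RecordingProcessor(String acceptedId) {
            this.acceptedId = acceptedId;
        }

        @Override
        public boolean support(String dataInfoId) {
            return acceptedId.equals(dataInfoId);
        }

        @Override
        public void changeDataProcess(String dataInfoId, String value) {
            seen.add(value);
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager();
        RecordingProcessor expire = new RecordingProcessor("demo.expire");
        manager.add(manager); // harmless: support() returns false, so no recursion
        manager.add(expire);
        manager.changeDataProcess("demo.expire", "true");
        manager.changeDataProcess("demo.other", "false");
        System.out.println(expire.seen);
    }
}
```

One reason to let the engine share the interface is that callers can hold a single ProvideDataProcessor reference without knowing whether it is one processor or a whole chain.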

6.1.3 The handler

The DatumLeaseManager used here corresponds to the AfterWorkingProcess discussed earlier.

The handler calls DatumLeaseManager to apply the configuration value.

public class DatumExpireProvideDataProcessor implements ProvideDataProcessor {

    @Autowired
    private DatumLeaseManager   datumLeaseManager;

    @Override
    public void changeDataProcess(ProvideData provideData) {
        if (checkInvalid(provideData)) {
            return;
        }
        boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
            .getObject());
        datumLeaseManager.setRenewEnable(enableDataDatumExpire);
    }

    private boolean checkInvalid(ProvideData provideData) {
        boolean invalid = provideData == null || provideData.getProvideData() == null
                          || provideData.getProvideData().getObject() == null;
        return invalid;
    }

    @Override
    public boolean support(ProvideData provideData) {
        return ValueConstants.ENABLE_DATA_DATUM_EXPIRE.equals(provideData.getDataInfoId());
    }
}
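How a string payload turns into a boolean switch is worth a small sketch. LeaseManagerStub and apply are hypothetical names, and the initial value of the switch is an assumption made for the demo. Unlike the handler above, which casts the payload to String directly, this variant ignores non-String payloads instead of throwing. It also shows a property of Boolean.parseBoolean that is easy to overlook: any string other than "true" (case-insensitive) silently yields false.

```java
// Illustration only: LeaseManagerStub and apply are hypothetical.
class SwitchSketch {

    /** Stand-in for the component that owns the switch. */
    static final class LeaseManagerStub {
        private volatile boolean renewEnable = true; // assumed default for the demo

        void setRenewEnable(boolean renewEnable) {
            this.renewEnable = renewEnable;
        }

        boolean isRenewEnable() {
            return renewEnable;
        }
    }

    /** Applies the raw value; returns false when the value was ignored. */
    static boolean apply(Object rawValue, LeaseManagerStub manager) {
        if (!(rawValue instanceof String)) {
            return false; // also covers null
        }
        manager.setRenewEnable(Boolean.parseBoolean((String) rawValue));
        return true;
    }

    public static void main(String[] args) {
        LeaseManagerStub manager = new LeaseManagerStub();
        System.out.println(apply("TRUE", manager) + " " + manager.isRenewEnable());
        System.out.println(apply("off", manager) + " " + manager.isRenewEnable());
        System.out.println(apply(null, manager) + " " + manager.isRenewEnable());
    }
}
```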

The final picture is as follows:

+---------------------------+     +--------------------------------------------------+         +---------------------------------+
|  DefaultMetaServiceImpl   |     | MetaNodeExchanger                                |         |      Meta Server                |
|                           |     |                                                  |  1      | +-----------------------------+ |
|    getMetaServerMap +---------->--boltExchange.connect(metaClientHandlers.toArray) +-------> | |   DataNodeServiceImpl       | |
|                           |     +-----------------------------------+--------------+         | |                             | |
+---------------------------+                                         ^                        | |                             | |
                                                                      |                        | |  notifyProvideDataChange    | |
                                                                      |                        | |              +              | |
+------------------------------------------+                          |                        | |              |              | |
|     metaClientHandlers                   +--------------------------+                        | +-----------------------------+ |
|                                          |                                                   +---------------------------------+
| +------------------------------------+   |                                                                    |       ^    |
| |  serverChangeHandler               |   |                                                                    |       |    |
| |                                    |   |                                                                    |       |    |
| |  statusConfirmHandler              |   |           NotifyProvideDataChange                                  |       |    |
| |                                    |   |                                                                    |       |    |
| | +--------------------------------+ |   |                                     2                              |     3 |    | 4
| | |notifyProvideDataChangeHandler<-------------------------------------<--------------------------------------+       |    |
| | |                                | |   |                                                                            |    |
| | |                                | |   |                     get ProvideData from Meta Server                       |    |
| | |                                | |   |                                                                            |    |
| | |  ProvideData provideData = +--------------------------------------------------------------------------------------+    |
| | |                                | |   |                                                                                 |
| | |  metaServerService.fetchData <-----------------------------------------------------------------------------------------+
| | |                                | |   |                          ProvideData
| | |                                | |   |
| | |                                | |   |     5           +---------------------------------------------+
| | | changeDataProcess(provideData)+--------------+         |   ProvideDataProcessor                      |
| | |                                | |   |       |         |                                             |
| | +--------------------------------+ |   |       +-------> | changeDataProcess(ProvideData provideData)  |
| +------------------------------------+   |                 |                                             |
+------------------------------------------+                 +---------------------------------------------+


