[从源码学设计]蚂蚁金服SOFARegistry之服务上线

[从源码学设计]蚂蚁金服SOFARegistry之服务上线

目录

0x00 摘要

SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。

本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。

本文为第十三篇,介绍从SessionServer角度看的服务上线。

本文以介绍业务为主,顺便整理逻辑,设计和模式。因为注册过程牵扯模块太多,所以本文仅仅专注在注册过程中Session Server的部分。

0x01 业务领域

1.1 应用场景

服务的上下线过程是指服务通过代码调用执行常规注册(Publisher#register) 和下线(Publisher#unregister)操作,不考虑因为服务宕机等意外情况导致的下线场景。

1.1.1 服务发布

一个典型的 “RPC 调用的服务寻址” 应用场景,服务的提供方通过如下两个步骤完成服务发布:

  1. 注册,将自己以 Publisher 的角色注册到 SOFARegistry;
  2. 发布,将需要发布的数据 (通常是IP 地址、端口、调用方式等) 发布到 SOFARegistry;

与此相对应的,服务的调用方通过如下步骤实现服务调用:

  1. 注册,将自己以 Subscriber 的角色注册到 SOFARegistry;
  2. 订阅,收到 SOFARegistry 推送的服务数据;

1.1.2 SessionServer的必要性

在SOFARegistry中,所有 Client 在注册和订阅数据时,根据 dataInfoId 做一致性 Hash,计算出应该访问哪一台 DataServer,然后与该 DataServer 建立长连接。

由于每个 Client 通常都会注册和订阅比较多的 dataInfoId 数据,因此我们可以预见每个 Client 均会与好几台 DataServer 建立连接。这个架构存在的问题是:“每台 DataServer 承载的连接数会随 Client 数量的增长而增长,每台 Client 极端的情况下需要与每台 DataServer 都建连,因此通过 DataServer 的扩容并不能线性的分摊 Client 连接数”。

所以,为数据分片层(DataServer)专门设计一个连接代理层是非常重要的,所以 SOFARegistry 就有了 SessionServer 这一层。随着 Client 数量的增长,可以通过扩容 SessionServer 就解决了单机的连接数瓶颈问题。

1.2 问题点

因为SessionServer是一个中间层,所以看起来好像比较简单,表面上看,就是接受,转发。

但是实际上,在大型系统中,应该如何在逻辑上,物理上实现模块分割,解耦都是非常有必要的。

1.3 阿里方案

我们主要看看阿里方案的注册部分。

1.3.1 注册过程

[从源码学设计]蚂蚁金服SOFARegistry之服务上线

服务的上下线过程,是指服务通过代码调用做正常的注册(publisher.register) 和 下线(publisher.unregister),不考虑因为服务宕机等意外情况导致的下线。如上图,大概呈现了“一次服务注册过程”的服务数据在内部流转过程。

  1. Client 调用 publisher.register 向 SessionServer 注册服务。
  2. SessionServer 收到服务数据 (PublisherRegister) 后,将其写入内存 (SessionServer 会存储 Client 的数据到内存,用于后续可以跟 DataServer 做定期检查),再根据 dataInfoId 的一致性 Hash 寻找对应的 DataServer,将 PublisherRegister 发给 DataServer。
  3. DataServer 接收到 PublisherRegister 数据,首先也是将数据写入内存 ,DataServer 会以 dataInfoId 的维度汇总所有 PublisherRegister。同时,DataServer 将该 dataInfoId 的变更事件通知给所有 SessionServer,变更事件的内容是 dataInfoId 和版本号信息 version。
  4. 同时,异步地,DataServer 以 dataInfoId 维度增量地同步数据给其他副本。因为 DataServer 在一致性 Hash 分片的基础上,对每个分片保存了多个副本(默认是3个副本)。
  5. SessionServer 接收到变更事件通知后,对比 SessionServer 内存中存储的 dataInfoId 的 version,若发现比 DataServer 发过来的小,则主动向 DataServer 获取 dataInfoId 的完整数据,即包含了所有该 dataInfoId 具体的 PublisherRegister 列表。
  6. 最后,SessionServer 将数据推送给相应的 Client,Client 就接收到这一次服务注册之后的最新的服务列表数据。

因为篇幅所限,本文讨论的是前两点

1.3.2 图示

下图展示了 Publisher 注册的代码流转过程

这个过程也是采用了 Handler - Task & Strategy - Listener 的方式来处理,任务在代码内部的处理流程和订阅过程基本一致。

[从源码学设计]蚂蚁金服SOFARegistry之服务上线

0x02 Client SDK

PublisherRegistration 是Client的接口,发布数据的关键代码如下:

// 构造发布者注册表
PublisherRegistration registration = new PublisherRegistration("com.alipay.test.demo.service:1.0@DEFAULT");
registration.setGroup("TEST_GROUP");
registration.setAppName("TEST_APP");

// 将注册表注册进客户端并发布数据
Publisher publisher = registryClient.register(registration, "10.10.1.1:12200?xx=yy");

// 如需覆盖上次发布的数据可以使用发布者模型重新发布数据
publisher.republish("10.10.1.1:12200?xx=zz");

发布数据的关键是构造 PublisherRegistration,该类包含三个属性:

属性名 属性类型 描述
dataId String 数据ID,发布订阅时需要使用相同值,数据唯一标识由 dataId + group + instanceId 组成。
group String 数据分组,发布订阅时需要使用相同值,数据唯一标识由 dataId + group + instanceId 组成,默认值 DEFAULT_GROUP。
appName String 应用 appName。

0x03 Session server

流程来到了Session server。

3.1 Bean

首先,可以通过Beans来入手。

@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(publisherHandler());
    list.add(subscriberHandler());
    list.add(watcherHandler());
    list.add(clientNodeConnectionHandler());
    list.add(cancelAddressRequestHandler());
    list.add(syncConfigHandler());
    return list;
}

serverHandlers 是Bolt Server 的响应函数组合。

@Bean
@ConditionalOnMissingBean(name = "sessionRegistry")
public Registry sessionRegistry() {
    return new SessionRegistry();
}

从Bean角度看,目前的逻辑是如图所示,这里有了一次解耦Strategy:

Beans


+-----------------------------------+
| Bolt Server(in openSessionServer) |        +---------------------------------+
|                                   |    +-> | DefaultPublisherHandlerStrategy |
|    +----------------------+       |    |   +---------+-----------------------+
|    |    serverHandlers    |       |    |             |
|    |                      |       |    |             |
|    | +------------------+ |       |    |             |
|    | | PublisherHandle+----------------+             v
|    | |                  | |       |          +-------+-------+
|    | | watcherHandler   | |       |          |SessionRegistry|
|    | |                  | |       |          +---------------+
|    | |     ......       | |       |
|    | +------------------+ |       |
|    +----------------------+       |
+-----------------------------------+

服务发布者和Session Server一般都应该处于一个Data Center之中,这就是阿里等实践的单体概念.

3.2 入口

PublisherHandler 是 Session Server对Client的接口,是Bolt Server 的响应函数。

public class PublisherHandler extends AbstractServerHandler {
    @Autowired
    private ExecutorManager          executorManager;

    @Autowired
    private PublisherHandlerStrategy publisherHandlerStrategy;

    @Override
    public Object reply(Channel channel, Object message) throws RemotingException {

        RegisterResponse result = new RegisterResponse();
        PublisherRegister publisherRegister = (PublisherRegister) message;
        publisherHandlerStrategy.handlePublisherRegister(channel, publisherRegister, result);
        return result;
    }

逻辑如下图所示:

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |
                              |    |                      |       |
+--------+  PublisherRegister |    | +------------------+ |       |
| Client +---------------------------> PublisherHandler | |       |
+--------+          1         |    | |                  | |       |
               +              |    | |     ......       | |       |
               |              |    | +------------------+ |       |
               |              |    +----------------------+       |
               |              +-----------------------------------+
               |

3.3 策略

整体上,这里是采用 Handler - Task & Strategy - Listener 的方式来处理。

什么是策略模式(Strategy Pattern)

在软件开发过程中常常遇到这样的情况,实现某一个功能有很多种算法或实现策略,我们可以根据环境或者条件的不同选择不同的算法或者策略来完成该功能。如果将这些算法或者策略抽象出来,提供一个统一的接口,不同的算法或者策略有不同的实现类,这样在程序客户端就可以通过注入不同的实现对象来实现算法或者策略的动态替换,这种模式的可扩展性和可维护性也更高,这就是策略模式。

策略模式的定义(Strategy Pattern)

  • 策略模式: 定义了算法族,分别封装起来,让它们之间可以相互替换,此模式让算法的变化独立与使用算法的客户。

  • 简单理解: 定义了一系列算法。每个算法封装起来。各个算法之间可以互相替换。且算法的变化不会影响到使用算法的客户。属于行为型模式。

在策略模式(Strategy Pattern)中,一个类的行为或其算法可以在运行时更改。这种类型的设计模式属于行为型模式

在策略模式中,我们创建表示各种策略的对象和一个行为随着策略对象改变而改变的 context 对象。策略对象改变 context 对象的执行算法。

3.3.1 目录结构

从目录结构看,有很多Strategy的定义和实现,应该蚂蚁内部希望根据不同情况制定不同的策略,其中有些是目前留出的接口

com/alipay/sofa/registry/server/session/strategy

.
├── DataChangeRequestHandlerStrategy.java
├── PublisherHandlerStrategy.java
├── ReceivedConfigDataPushTaskStrategy.java
├── ReceivedDataMultiPushTaskStrategy.java
├── SessionRegistryStrategy.java
├── SubscriberHandlerStrategy.java
├── SubscriberMultiFetchTaskStrategy.java
├── SubscriberRegisterFetchTaskStrategy.java
├── SyncConfigHandlerStrategy.java
├── TaskMergeProcessorStrategy.java
├── WatcherHandlerStrategy.java
└── impl
    ├── DefaultDataChangeRequestHandlerStrategy.java
    ├── DefaultPublisherHandlerStrategy.java
    ├── DefaultPushTaskMergeProcessor.java
    ├── DefaultReceivedConfigDataPushTaskStrategy.java
    ├── DefaultReceivedDataMultiPushTaskStrategy.java
    ├── DefaultSessionRegistryStrategy.java
    ├── DefaultSubscriberHandlerStrategy.java
    ├── DefaultSubscriberMultiFetchTaskStrategy.java
    ├── DefaultSubscriberRegisterFetchTaskStrategy.java
    ├── DefaultSyncConfigHandlerStrategy.java
    └── DefaultWatcherHandlerStrategy.java 

3.3.2 DefaultPublisherHandlerStrategy

从目前代码看,只是设置,分类,转发。即设置Publisher的缺省信息,并且根据 event type 不同执行register或者unRegister。

public class DefaultPublisherHandlerStrategy implements PublisherHandlerStrategy {
    @Autowired
    private Registry            sessionRegistry;

    @Override
    public void handlePublisherRegister(Channel channel, PublisherRegister publisherRegister, RegisterResponse registerResponse) {
        try {
            String ip = channel.getRemoteAddress().getAddress().getHostAddress();
            int port = channel.getRemoteAddress().getPort();
            publisherRegister.setIp(ip);
            publisherRegister.setPort(port);

            if (StringUtils.isBlank(publisherRegister.getZone())) {
                publisherRegister.setZone(ValueConstants.DEFAULT_ZONE);
            }

            if (StringUtils.isBlank(publisherRegister.getInstanceId())) {
                publisherRegister.setInstanceId(DEFAULT_INSTANCE_ID);
            }

            Publisher publisher = PublisherConverter.convert(publisherRegister);
            publisher.setProcessId(ip + ":" + port);
            publisher.setSourceAddress(new URL(channel.getRemoteAddress()));
            if (EventTypeConstants.REGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.register(publisher);
            } else if (EventTypeConstants.UNREGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.unRegister(publisher);
            }
            registerResponse.setSuccess(true);
            registerResponse.setVersion(publisher.getVersion());
            registerResponse.setRegistId(publisherRegister.getRegistId());
            registerResponse.setMessage("Publisher register success!");
        } 
    }
}

逻辑如下图所示

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |                          |                               |
               +              |    | |  watcherHandler  | |       |                          +-------------------------------+
               |              |    | |                  | |       |
               |              |    | |     ......       | |       |
               |              |    | +------------------+ |       |
               |              |    +----------------------+       |
                              +-----------------------------------+

手机如图
[从源码学设计]蚂蚁金服SOFARegistry之服务上线

3.4 核心逻辑组件

前面代码中,策略会调用到 sessionRegistry.register(publisher),即注册功能。

从SessionRegistry的内部成员变量就能够看出来,这是 Session Server 核心逻辑所在,相对于业务引擎。

主要提供了如下功能:

  • register(StoreData data) :注册新publisher或者subscriber data

  • cancel(List connectIds) :取消publisher或者subscriber data

  • remove(List connectIds) :移除publisher或者subscriber data

  • unRegister(StoreData data) :注销publisher或者subscriber data

  • .....

具体成员变量如下:

public class SessionRegistry implements Registry {

    /**
     * store subscribers
     */
    @Autowired
    private Interests                 sessionInterests;

    /**
     * store watchers
     */
    @Autowired
    private Watchers                  sessionWatchers;

    /**
     * store publishers
     */
    @Autowired
    private DataStore                 sessionDataStore;

    /**
     * transfer data to DataNode
     */
    @Autowired
    private DataNodeService           dataNodeService;

    /**
     * trigger task com.alipay.sofa.registry.server.meta.listener process
     */
    @Autowired
    private TaskListenerManager       taskListenerManager;

    /**
     * calculate data node url
     */
    @Autowired
    private NodeManager               dataNodeManager;

    @Autowired
    private SessionServerConfig       sessionServerConfig;

    @Autowired
    private Exchange                  boltExchange;

    @Autowired
    private SessionRegistryStrategy   sessionRegistryStrategy;

    @Autowired
    private WrapperInterceptorManager wrapperInterceptorManager;

    @Autowired
    private DataIdMatchStrategy       dataIdMatchStrategy;

    @Autowired
    private RenewService              renewService;

    @Autowired
    private WriteDataAcceptor         writeDataAcceptor;

    private volatile boolean          enableDataRenewSnapshot = true;
}

register函数生成一个WriteDataRequest,然后调用了 writeDataAcceptor.accept 完成处理。

@Override
public void register(StoreData storeData) {

    WrapperInvocation<StoreData, Boolean> wrapperInvocation = new WrapperInvocation(
            new Wrapper<StoreData, Boolean>() {
                @Override
                public Boolean call() {

                    switch (storeData.getDataType()) {
                        case PUBLISHER:
                            Publisher publisher = (Publisher) storeData;

                            sessionDataStore.add(publisher);

                            // All write operations to DataServer (pub/unPub/clientoff/renew/snapshot)
                            // are handed over to WriteDataAcceptor
                            writeDataAcceptor.accept(new WriteDataRequest() {
                                @Override
                                public Object getRequestBody() {
                                    return publisher;
                                }

                                @Override
                                public WriteDataRequestType getRequestType() {
                                    return WriteDataRequestType.PUBLISHER;
                                }

                                @Override
                                public String getConnectId() {
                                    return publisher.getSourceAddress().getAddressString();
                                }

                                @Override
                                public String getDataServerIP() {
                                    Node dataNode = dataNodeManager.getNode(publisher.getDataInfoId());
                                    return dataNode.getNodeUrl().getIpAddress();
                                }
                            });

                            sessionRegistryStrategy.afterPublisherRegister(publisher);
                            break;
                        case SUBSCRIBER:
                            Subscriber subscriber = (Subscriber) storeData;

                            sessionInterests.add(subscriber);

                            sessionRegistryStrategy.afterSubscriberRegister(subscriber);
                            break;
                        case WATCHER:
                            Watcher watcher = (Watcher) storeData;

                            sessionWatchers.add(watcher);

                            sessionRegistryStrategy.afterWatcherRegister(watcher);
                            break;
                        default:
                            break;
                    }
                    return null;
                }

                @Override
                public Supplier<StoreData> getParameterSupplier() {
                    return () -> storeData;
                }

            }, wrapperInterceptorManager);

    try {
        wrapperInvocation.proceed();
    } catch (Exception e) {
        throw new RuntimeException("Proceed register error!", e);
    }

}

目前逻辑如下图所示:

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                                    3  | register
               |              |    +----------------------+       |                                       |
                              +-----------------------------------+                                       |
                                                                                                          v
                                                                                      +-------------------+-------------------+
                                                                                      |           SessionRegistry             |
                                                                                      |                                       |
                                                                                      |                                       |
                                                                                      |  storeData.getDataType() == PUBLISHER |
                                                                                      +---------------------------------------+

手机如下:
[从源码学设计]蚂蚁金服SOFARegistry之服务上线

3.4.1 SessionRegistryStrategy

这里又出现一个策略,目前也只有一个实现,应该也是想要未来做成替换,目前功能只是简单的留下了接口为空。

我们可以看出阿里处处想解耦的思路

public class DefaultSessionRegistryStrategy implements SessionRegistryStrategy {
    @Override
    public void afterPublisherRegister(Publisher publisher) {

    }
}

3.4.2 存储模块

前文在注册过程中有:

sessionDataStore.add(publisher);

这里就是Session的 数据存储模块,也是系统的核心存储了在本Session注册的所有Publisher

public class SessionDataStore implements DataStore {
    /**
     * publisher store
     */
    private Map<String/*dataInfoId*/, Map<String/*registerId*/, Publisher>> registry      = new ConcurrentHashMap<>();

    /*** index */
    private Map<String/*connectId*/, Map<String/*registerId*/, Publisher>>  connectIndex  = new ConcurrentHashMap<>();
}

这里记录了两种存储方式,分别是按照 dataInfoId 和 connectId 来存储。

存储时候,会从版本号和时间戳两个维度来比较

@Override
public void add(Publisher publisher) {
    Publisher.internPublisher(publisher);

    write.lock();
    try {
        Map<String, Publisher> publishers = registry.get(publisher.getDataInfoId());

        if (publishers == null) {
            ConcurrentHashMap<String, Publisher> newmap = new ConcurrentHashMap<>();
            publishers = registry.putIfAbsent(publisher.getDataInfoId(), newmap);
            if (publishers == null) {
                publishers = newmap;
            }
        }

        Publisher existingPublisher = publishers.get(publisher.getRegisterId());

        if (existingPublisher != null) {

            if (existingPublisher.getVersion() != null) {
                long oldVersion = existingPublisher.getVersion();
                Long newVersion = publisher.getVersion();
                if (newVersion == null) {
                    return;
                } else if (oldVersion > newVersion) {
                    return;
                } else if (oldVersion == newVersion) {
                    Long newTime = publisher.getRegisterTimestamp();
                    long oldTime = existingPublisher.getRegisterTimestamp();
                    if (newTime == null) {
                        return;
                    }
                    if (oldTime > newTime) {
                        return;
                    }
                }
            }
        }
        publishers.put(publisher.getRegisterId(), publisher);
        addToConnectIndex(publisher);

    } finally {
        write.unlock();
    }
}

3.5 Acceptor模块

在SessionServer本身存储完成之后,接下来就是通知Data Server了。

下面工作的目的是:把收到的注册信息转发给 DataServer,但是为了接耦,这里做了比较复杂的工作。

3.5.1 总体Acceptor

WriteDataAcceptorImpl 负责处理具体Publisher的写入。首先需要把写入请求统一起来

使用 private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap(); 来统一存储所有的写入请求

这里根据不同的Connection来处理不同连接的写入请求

具体如下:

public class WriteDataAcceptorImpl implements WriteDataAcceptor {

    @Autowired
    private TaskListenerManager             taskListenerManager;

    @Autowired
    private SessionServerConfig             sessionServerConfig;

    @Autowired
    private RenewService                    renewService;

    /**
     * acceptor for all write data request
     * key:connectId
     * value:writeRequest processor
     *
     */
    private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap();

    public void accept(WriteDataRequest request) {
        String connectId = request.getConnectId();
        WriteDataProcessor writeDataProcessor = writeDataProcessors.computeIfAbsent(connectId,
                key -> new WriteDataProcessor(connectId, taskListenerManager, sessionServerConfig, renewService));

        writeDataProcessor.process(request);
    }
  
    public void remove(String connectId) {
        writeDataProcessors.remove(connectId);
    }
}

目前逻辑如下图所示

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                              register | 3
               |              |    +----------------------+       |                                       |
               |              +-----------------------------------+                                       |
               |                                                                                          v
               | +-----------------------------------------------------+                    +-------------+-------------------------+
               | |           WriteDataAcceptorImpl                     |  WriteDataRequest  |           SessionRegistry             |
               | |                                                     | <------------------+                                       |
               | |                                                     |                    |                                       |
               | | Map<String, WriteDataProcessor> writeDataProcessors |                    |  storeData.getDataType() == PUBLISHER |
               | |                                                     |                    +---------------------------------------+
               + +-----------------------------------------------------+

手机如图
[从源码学设计]蚂蚁金服SOFARegistry之服务上线

3.5.2 具体处理

前面已经把所有请求统一起来,现在就需要针对每一个连接的写入继续处理

这里关键是如下数据结构,就是每一个连接的写入请求 放到了queue中。使用 Queue 的目的是为了内部做缓存,统一发给 DataServer。

ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue

针对每个请求不同,做不同处理。

对于我们的例子,处理如下:

case PUBLISHER: {
		doPublishAsync(request);
}

而最终是向taskListenerManager发送给请求TaskType.PUBLISH_DATA_TASK,该请求将被PublishDataTaskListener调用publishDataTask来处理。

这里有一个listener解耦,我们接下来讲解。

private void doPublishAsync(WriteDataRequest request) {
    sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
}

private void sendEvent(Object eventObj, TaskType taskType) {
		TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
		taskListenerManager.sendTaskEvent(taskEvent);
}

具体代码如下:

public class WriteDataProcessor {
    private final TaskListenerManager               taskListenerManager;

    private final SessionServerConfig               sessionServerConfig;

    private final RenewService                      renewService;

    private final String                            connectId;

    private Map<String, AtomicLong>                 lastUpdateTimestampMap = new ConcurrentHashMap<>();

    private AtomicBoolean                           writeDataLock          = new AtomicBoolean(
                                                                               false);

    private ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue          = new ConcurrentLinkedQueue();

    private AtomicInteger                           acceptorQueueSize      = new AtomicInteger(0);

    public void process(WriteDataRequest request) {
        // record the last update time by pub/unpub
        if (isWriteRequest(request)) {
            refreshUpdateTime(request.getDataServerIP());
        }

        if (request.getRequestType() == WriteDataRequestType.DATUM_SNAPSHOT) {
            // snapshot has high priority, so handle directly
            doHandle(request);
        } else {
            // If locked, insert the queue;
            // otherwise, try emptying the queue (to avoid residue) before processing the request.
            if (writeDataLock.get()) {
                addQueue(request);
            } else {
                flushQueue();
                doHandle(request);
            }
        }

    }

    private void doHandle(WriteDataRequest request) {
        switch (request.getRequestType()) {
            case PUBLISHER: {
                doPublishAsync(request);
            }
                break;
            case UN_PUBLISHER: {
                doUnPublishAsync(request);
            }
                break;
            case CLIENT_OFF: {
                doClientOffAsync(request);
            }
                break;
            case RENEW_DATUM: {
                if (renewAndSnapshotInSilence(request.getDataServerIP())) {
                    return;
                }
                doRenewAsync(request);
            }
                break;
            case DATUM_SNAPSHOT: {
                if (renewAndSnapshotInSilenceAndRefreshUpdateTime(request.getDataServerIP())) {
                    return;
                }
                halt();
                try {
                    doSnapshotAsync(request);
                } finally {
                    resume();
                }
            }
                break;
    }
      
    private void doPublishAsync(WriteDataRequest request) {
        sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
    }
      
    private void sendEvent(Object eventObj, TaskType taskType) {
        TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
        taskListenerManager.sendTaskEvent(taskEvent);
    }
}

如下图所示

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                              register | 3
               |              |    +----------------------+       |                                       |
               |              +-----------------------------------+                                       |
               |                                                                                          v
               | +---------------------------------------------------------+                    +---------+-----------------------------+
               | |           WriteDataAcceptorImpl                         |  WriteDataRequest  |           SessionRegistry             |
               | |                                                         | <------------------+                                       |
               | |                                                         |       4            |   sessionDataStore.add(publisher)     |
               | | Map<connectId , WriteDataProcessor> writeDataProcessors |                    |                                       |
               | |                                                         |                    |  storeData.getDataType() == PUBLISHER |
               | +----------------------+----------------------------------+                    |                                       |
               |                process | 5                                                     +---------------------------------------+
               |                        v
               |    +-------------------+---------------------+                     +--------------------------+
               |    |          WriteDataProcessor             |                     |  PublishDataTaskListener |
               |    |                                         |  PUBLISH_DATA_TASK  |                          |
               |    | ConcurrentLinkedQueue<WriteDataRequest> +-------------------> |      PublishDataTask     |
               |    |                                         |      6              +--------------------------+
               +    +-----------------------------------------+

手机如图 :
[从源码学设计]蚂蚁金服SOFARegistry之服务上线

3.6 Listener 解耦

前面在逻辑上都是一体化的,在这里,进行了一次解耦。

3.6.1 解耦引擎

DefaultTaskListenerManager 是解耦的机制,可以看到,其中添加了listener,当用户调用sendTaskEvent时候,将遍历所有的listeners,调用对应的listener。

public class DefaultTaskListenerManager implements TaskListenerManager {

    private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();

    @Override
    public Multimap<TaskType, TaskListener> getTaskListeners() {
        return taskListeners;
    }

    @Override
    public void addTaskListener(TaskListener taskListener) {
        taskListeners.put(taskListener.support(), taskListener);
    }

    @Override
    public void sendTaskEvent(TaskEvent taskEvent) {
        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
        for (TaskListener taskListener : taskListeners) {
            taskListener.handleEvent(taskEvent);
        }
    }
}

3.6.2 Listener

PublishDataTaskListener是对应的处理函数,在其support函数中,声明了支持PUBLISH_DATA_TASK。这样就完成了解耦。

public class PublishDataTaskListener implements TaskListener {

    @Autowired
    private DataNodeService dataNodeService;

    @Autowired
    private TaskProcessor   dataNodeSingleTaskProcessor;

    @Autowired
    private ExecutorManager executorManager;

    @Override
    public TaskType support() {
        return TaskType.PUBLISH_DATA_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {

        SessionTask publishDataTask = new PublishDataTask(dataNodeService);

        publishDataTask.setTaskEvent(event);

        executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));
    }
}

3.7 Task调度

上面找到了Listener,Listener中通过如下代码启动了执行业务的task来处理。但是这背后的机制需要探究。

executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));

3.7.1 ExecutorManager

ExecutorManager 之中,对于线程池做了统一的启动,关闭。publishDataExecutor就是其中之一。

ExecutorManager相关代码摘取如下:

public class ExecutorManager {

    private final ScheduledThreadPoolExecutor scheduler;

    private final ThreadPoolExecutor          publishDataExecutor;

    private static final String               PUBLISH_DATA_EXECUTOR                      = "PublishDataExecutor";

    public ExecutorManager(SessionServerConfig sessionServerConfig) {
      
        publishDataExecutor = reportExecutors.computeIfAbsent(PUBLISH_DATA_EXECUTOR,
                k -> new SessionThreadPoolExecutor(PUBLISH_DATA_EXECUTOR,
                        sessionServerConfig.getPublishDataExecutorMinPoolSize(),
                        sessionServerConfig.getPublishDataExecutorMaxPoolSize(),
                        sessionServerConfig.getPublishDataExecutorKeepAliveTime(), TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(sessionServerConfig.getPublishDataExecutorQueueSize()),
                        new NamedThreadFactory("PublishData-executor", true)));
    }
  
		public ThreadPoolExecutor getPublishDataExecutor() {
        return publishDataExecutor;
    }
}

其中ExecutorManager的bean如下:

@Bean
public ExecutorManager executorManager(SessionServerConfig sessionServerConfig) {
    return new ExecutorManager(sessionServerConfig);
}

3.7.2 Processor

Processor是任务定义,内部封装了task。

public class DataNodeSingleTaskProcessor implements TaskProcessor<SessionTask> {

    @Override
    public ProcessingResult process(SessionTask task) {
        try {
            task.execute();
            return ProcessingResult.Success;
        } catch (Throwable throwable) {
            if (task instanceof Retryable) {
                Retryable retryAbleTask = (Retryable) task;
                if (retryAbleTask.checkRetryTimes()) {
                    return ProcessingResult.TransientError;
                }
            }
            return ProcessingResult.PermanentError;
        }
    }

    @Override
    public ProcessingResult process(List<SessionTask> tasks) {
        return null;
    }
}

3.7.3 业务Task

PublishDataTask的execute 之中 ,调用dataNodeService.register(publisher)进行注册。

public class PublishDataTask extends AbstractSessionTask {

    private final DataNodeService dataNodeService;

    private Publisher             publisher;

    public PublishDataTask(DataNodeService dataNodeService) {
        this.dataNodeService = dataNodeService;
    }

    @Override
    public void execute() {
        dataNodeService.register(publisher);
    }

    @Override
    public void setTaskEvent(TaskEvent taskEvent) {
        //taskId create from event
        if (taskEvent.getTaskId() != null) {
            setTaskId(taskEvent.getTaskId());
        }

        Object obj = taskEvent.getEventObj();
        if (obj instanceof Publisher) {
            this.publisher = (Publisher) obj;
        } 
    }
}

具体如下

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
                       |
     PUBLISH_DATA_TASK |
                       |
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |
                       |
                       v
              +--------+--------+
              | PublishDataTask |
              +-----------------+

3.8 转发服务信息

经过listener解耦之后,PublishDataTask就调用了dataNodeService.register(publisher),于是接下来就是转发服务信息给Data Server

就是根据 dataInfoId 的一致性 Hash 寻找对应的 DataServer,将 PublisherRegister 发给 DataServer

此处就是调用DataNodeServiceImpl的register函数来把请求转发给Data Server。

public class DataNodeServiceImpl implements DataNodeService {
    @Autowired
    private NodeExchanger         dataNodeExchanger;

    @Autowired
    private NodeManager           dataNodeManager;

    @Autowired
    private SessionServerConfig   sessionServerConfig;

    private AsyncHashedWheelTimer asyncHashedWheelTimer;
}

可以看到,建立了PublishDataRequest,然后通过Bolt Client,发送给Data Server。

@Override
public void register(final Publisher publisher) {
    String bizName = "PublishData";
    Request<PublishDataRequest> request = buildPublishDataRequest(publisher);
    try {
        sendRequest(bizName, request);
    } catch (RequestException e) {
        doRetryAsync(bizName, request, e, sessionServerConfig.getPublishDataTaskRetryTimes(),
            sessionServerConfig.getPublishDataTaskRetryFirstDelay(),
            sessionServerConfig.getPublishDataTaskRetryIncrementDelay());
    }
}

private CommonResponse sendRequest(String bizName, Request request) throws RequestException {
        Response response = dataNodeExchanger.request(request);
        Object result = response.getResult();
        CommonResponse commonResponse = (CommonResponse) result;
        return commonResponse;
}

如下:

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
     PUBLISH_DATA_TASK |
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |
                       v
              +--------+--------+
              | PublishDataTask |
              +--------+--------+
              register |
                       |
            +----------v----------+
            | DataNodeServiceImpl |
            +----------+----------+
    PublishDataRequest |
                       v
            +----------+----------+  Client.sendSync   +------------+
            |  DataNodeExchanger  +------------------> | Data Server|
            +---------------------+ PublishDataRequest +------------+

如何知道发给哪一个Data Sever?DataNodeExchanger 中有:

@Override
public Response request(Request request) throws RequestException {

    Response response;
    URL url = request.getRequestUrl();
    try {
        Client sessionClient = getClient(url);

        final Object result = sessionClient
                .sendSync(url, request.getRequestBody(), request.getTimeout() != null ? request.getTimeout() : sessionServerConfig.getDataNodeExchangeTimeOut());

        response = () -> result;
    } 

    return response;
}

于是去DataNodeServiceImpl寻找

private Request<PublishDataRequest> buildPublishDataRequest(Publisher publisher) {
    return new Request<PublishDataRequest>() {
        private AtomicInteger retryTimes = new AtomicInteger();

        @Override
        public PublishDataRequest getRequestBody() {
            PublishDataRequest publishDataRequest = new PublishDataRequest();
            publishDataRequest.setPublisher(publisher);
            publishDataRequest.setSessionServerProcessId(SessionProcessIdGenerator
                .getSessionProcessId());
            return publishDataRequest;
        }

        @Override
        public URL getRequestUrl() {
            return getUrl(publisher.getDataInfoId());
        }

        @Override
        public AtomicInteger getRetryTimes() {
            return retryTimes;
        }
    };
}

private URL getUrl(String dataInfoId) {
        Node dataNode = dataNodeManager.getNode(dataInfoId);
        //meta push data node has not port
        String dataIp = dataNode.getNodeUrl().getIpAddress();
        return new URL(dataIp, sessionServerConfig.getDataServerPort());
}

在 DataNodeManager中有:

@Override
public DataNode getNode(String dataInfoId) {
    DataNode dataNode = consistentHash.getNodeFor(dataInfoId);
    return dataNode;
}

可见是通过dataInfoId计算出hash,然后 从DataNodeManager之中获取对应的DataNode,得到其url

于是,上图拓展为:

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
     PUBLISH_DATA_TASK |  1
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |  2
                       v
              +--------+--------+        4     +---------------+
              | PublishDataTask |     +------> |DataNodeManager|
              +--------+--------+     |        +---------------+
              register |  3           |  consistentHash|
                       |              |                | 5
            +----------v----------+---+                v
            | DataNodeServiceImpl |       6      +-----+----+
            +----------+----------+ <------------+ DataNode |
    PublishDataRequest | 7              url      +----------+
                       v
            +----------+----------+
            |  DataNodeExchanger  |
            +----------+----------+
                       |
       Client.sendSync | PublishDataRequest
                       |
                       v 8
                 +-----+------+
                 | Data Server|
                 +------------+

0x04 总结

回顾下“一次服务注册过程”的服务数据在内部流转过程。

  1. Client 调用 publisher.register 向 SessionServer 注册服务。
  2. SessionServer 收到服务数据 (PublisherRegister) 后,将其写入内存 (SessionServer 会存储 Client 的数据到内存,用于后续可以跟 DataServer 做定期检查),再根据 dataInfoId 的一致性 Hash 寻找对应的 DataServer,将 PublisherRegister 发给 DataServer。
  3. DataServer 接收到 PublisherRegister 数据,首先也是将数据写入内存 ,DataServer 会以 dataInfoId 的维度汇总所有 PublisherRegister。同时,DataServer 将该 dataInfoId 的变更事件通知给所有 SessionServer,变更事件的内容是 dataInfoId 和版本号信息 version。
  4. 同时,异步地,DataServer 以 dataInfoId 维度增量地同步数据给其他副本。因为 DataServer 在一致性 Hash 分片的基础上,对每个分片保存了多个副本(默认是3个副本)。
  5. SessionServer 接收到变更事件通知后,对比 SessionServer 内存中存储的 dataInfoId 的 version,若发现比 DataServer 发过来的小,则主动向 DataServer 获取 dataInfoId 的完整数据,即包含了所有该 dataInfoId 具体的 PublisherRegister 列表。
  6. 最后,SessionServer 将数据推送给相应的 Client,Client 就接收到这一次服务注册之后的最新的服务列表数据。

因为篇幅所限,本文讨论的是前两点,后续会有文章介绍另外几点

0xFF 参考

蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容

蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路

服务注册中心 Session 存储策略 | SOFARegistry 解析

海量数据下的注册中心 - SOFARegistry 架构介绍

服务注册中心数据分片和同步方案详解 | SOFARegistry 解析

蚂蚁金服开源通信框架SOFABolt解析之连接管理剖析

蚂蚁金服开源通信框架SOFABolt解析之超时控制机制及心跳机制

蚂蚁金服开源通信框架 SOFABolt 协议框架解析

蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析

蚂蚁通信框架实践

sofa-bolt 远程调用

sofa-bolt学习

SOFABolt 设计总结 - 优雅简洁的设计之道

SofaBolt源码分析-服务启动到消息处理

SOFABolt 源码分析

SOFABolt 源码分析9 - UserProcessor 自定义处理器的设计

SOFARegistry 介绍

SOFABolt 源码分析13 - Connection 事件处理机制的设计

发表评论

评论已关闭。

相关文章