SOFA 源码分析 —— 服务发布过程

作者:jcmp      发布时间:2021-05-05      浏览量:0
SOFA 包含了 RPC 框架,底层通信

SOFA 包含了 RPC 框架,底层通信框架是 bolt ,基于 Netty 4,今天将通过 SOFA—RPC 源码中的例子,看看他是如何发布一个服务的。

示例代码

下面的代码在 com.alipay.sofa.rpc.quickstart.QuickStartServer 类下。

ServerConfig serverConfig = new ServerConfig() .setProtocol("bolt") // 设置一个协议,默认bolt .setPort(9696) // 设置一个端口,默认12200 .setDaemon(false); // 非守护线程ProviderConfig providerConfig = new ProviderConfig() .setInterfaceId(HelloService.class.getName()) // 指定接口 .setRef(new HelloServiceImpl()) // 指定实现 .setServer(serverConfig); // 指定服务端providerConfig.export(); // 发布服务。

首先,创建一个 ServerConfig ,包含了端口,协议等基础信息,当然,这些都是手动设定的,在该类加载的时候,会自动加载很多配置文件中的服务器默认配置。比如 RpcConfigs 类,RpcRuntimeContext 上下文等。

然后呢,创建一个 ProviderConfig,也是个 config,不过多继承了一个 AbstractInterfaceConfig 抽象类,该类是接口级别的配置,而 ServerConfig 是 服务器级别的配置。虽然都继承了 AbstractIdConfig。

ProviderConfig 包含了接口名称,接口指定实现类,还有服务器的配置。

最后,ProviderConfig 调用 export 发布服务。

展示给我的 API 很简单,但内部是如何实现的呢?

在看源码之前,我们思考一下:如果我们自己来实现,怎么弄?

RPC 框架简单一点来说,就是使用动态代理和 Socket。

SOFA 使用 Netty 来做网络通信框架,我们之前也写过一个简单的 Netty RPC,主要是通过 handler 的 channelRead 方法来实现。

SOFA 是这么操作的吗?

一起来看看。

# 源码分析

上面的示例代码其实就是 3 个步骤,创建 ServerConfig,创建 ProviderConfig,调用 export 方法。

先看第一步,还是有点意思的。

虽然是空构造方法,但 ServerConfig 的属性都是自动初始化的,而他的父类 AbstractIdConfig 更有意思了,父类有 1 个地方值得注意:

static { RpcRuntimeContext.now();}

熟悉类加载的同学都知道,这是为了主动加载 RpcRuntimeContext ,看名字是 RPC 运行时上下文,所谓上下文,大约就是我们人类聊天中的 "老地方" 的意思。

这个上下文会在静态块中加载 Module(基于扩展点实现),注册 JVM 关闭钩子(类似 Tomcat)。还有很多配置信息。

然后呢?创建 ProviderConfig 对象。这个类比上面的那个类多继承了一个 AbstractInterfaceConfig,接口级别的配置。比如有些方法我不想发布啊,比如权重啊,比如超时啊,比如具体的实现类啊等等,当然还需要一个 ServerConfig 的属性(注册到 Server 中啊喂)。

最后就是发布了。export 方法。

ProviderCofing 拥有一个 export 方法,但并不是直接就在这里发布的,因为他是一个 config,不适合在config 里面做这些事情,违背单一职责。

SOFA 使用了一个 Bootstrap 类来进行操作。和大部分服务器类似,这里就是启动服务器的地方。因为这个类会多线程使用,比如并发的发布服务。而不是一个一个慢慢的发布服务。所以他不是单例的,而是和 Config 一起使用的,并缓存在 map 中。

ProviderBootstrap 目前有 3 个实现:Rest,Bolt,Dubbo。Bolt 是他的默认实现。

export 方法默认有个实现(Dubbo 的话就要重写了)。主要逻辑是执行 doExport 方法,其中包括延迟加载逻辑。

而 doExport 方法中,就是 SOFA 发布服务的逻辑所在了。

楼主将方法的异常处理逻辑去除,整体如下:

private void doExport() { if (exported) { return; } String key = providerConfig.buildKey(); String appName = providerConfig.getAppName(); // 检查参数 checkParameters(); // 注意同一interface,同一uniqleId,不同server情况 AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器 if (cnt == null) { // 没有发布过 cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0)); } int c = cnt.incrementAndGet(); int maxProxyCount = providerConfig.getRepeatedExportLimit(); if (maxProxyCount > 0) { // 超过最大数量,直接抛出异常 } // 构造请求调用器 providerProxyInvoker = new ProviderProxyInvoker(providerConfig); // 初始化注册中心 if (providerConfig.isRegister()) { List registryConfigs = providerConfig.getRegistry(); if (CommonUtils.isNotEmpty(registryConfigs)) { for (RegistryConfig registryConfig : registryConfigs) { RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry } } } // 将处理器注册到server List serverConfigs = providerConfig.getServer(); for (ServerConfig serverConfig : serverConfigs) { Server server = serverConfig.buildIfAbsent(); // 注册序列化接口 server.registerProcessor(providerConfig, providerProxyInvoker); if (serverConfig.isAutoStart()) { server.start(); } } // 注册到注册中心 providerConfig.setConfigListener(new ProviderAttributeListener()); register(); // 记录一些缓存数据 RpcRuntimeContext.cacheProviderConfig(this); exported = true; }

主要逻辑如下:

一起来详细看看。

Invoker 怎么构造的?很简单,最主要的就是过滤器。关于过滤器,我们之前已经写过一篇文章了。不再赘述。

关键看看 Server 是如何构造的。

关键代码 serverConfig.buildIfAbsent() ,类似 HashMap 的 putIfAbsent。如果不存在就创建。

Server 接口目前有 2 个实现,bolt 和 rest。当然,Server 也是基于扩展的,所以,不用怕,可以随便增加实现。

关键代码在 ServerFactory 的 getServer 中,其中会获取扩展点的 Server,然后,执行 Server 的 init 方法,我们看看默认 bolt 的 init 方法。

@Override public void init(ServerConfig serverConfig) { this.serverConfig = serverConfig; // 启动线程池 bizThreadPool = initThreadPool(serverConfig); boltServerProcessor = new BoltServerProcessor(this); }

保存了 serverConfig 的引用,启动了一个业务线程池,创建了一个 BoltServerProcessor 对象。

第一:这个线程池会在 Bolt 的 RpcHandler 中被使用,也就是说,复杂业务都是在这个线程池执行,不会影响 Netty 的 IO 线程。

第二:BoltServerProcessor 非常重要,他的构造方法包括了当前的 BoltServer,所以他俩是互相依赖的。关键点来了:

BoltServerProcessor 实现了 UserProcessor 接口,而 Bolt 的 RpcHandler 持有一个 Map> ,所以,当 RpcHandler 被执行 channelRead 方法的时候,一定会根据接口名称找到对应的 UserProcessor,并执行他的 handlerRequest 方法。

那么,RpcHandler 是什么时候创建并放置到 RpcHandler 中的呢?

具体是这样的:在 server.start() 执行的时候,该方法会初始化 Netty 的 Server,在 SOFA 中,叫 RpcServer,将 BoltServerProcessor 放置到名叫 userProcessors 的 Map 中。 然后 ,当 RpcServer 启动的时候,也就是 start 方法,会执行一个 init 方法,该方法内部就是设置 Netty 各种属性的地方,包括 Hander,其中有 2 行代码对我们很重要:

final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);pipeline.addLast("handler", rpcHandler);

创建了一个 RpcHandler,并添加到 pipeline 中,这个 Handler 的构造参数就是包含所有 BoltServerProcessor 的 Map。

所以,总的流程就是:

而大概的 UML 图就是下面这样的:

红色部分是 RPC 的核心,包含 Solt 的 Server,实现 UserProcessor 接口的 BoltServerProcessor,业务线程池,存储所有接口实现的 Map。

绿色部分是 Bolt 的接口和类,只要实现了 UserProcessor 接口,就能将具体实现替换,也既是处理具体数据的逻辑。

最后,看看关键类 BoltServerProcessor ,他是融合 RPC 和 Bolt 的胶水类。

该类会注册一个序列化器替代 Bolt 默认的。handleRequest 方法是这个类的核心方法。有很多逻辑,主要看这里:

// 查找服务Invoker invoker = boltServer.findInvoker(serviceName);// 真正调用response = doInvoke(serviceName, invoker, request);/** * 找到服务端Invoker * * @param serviceName 服务名 * @return Invoker对象 */public Invoker findInvoker(String serviceName) { return invokerMap.get(serviceName);}

根据服务名称,从 Map 中找到服务,然后调用 invoker 的 invoker 方法。

再看看 Netty 到 BoltServerProcessor 的 handlerRequest 的调用链,使用 IDEA 的 Hierarchy 功能,查看该方法,最后停留在 ProcessTast 中,一个 Runnable.

根据经验,这个类肯定是被放到线程池了。什么时候放的呢?看看他的构造方法的 Hierarchy。

从图中可以看到 ,Bolt 的 RpcHandler 的 channelRead 最终会调用 ProcessTask 的 构造方法。

那么 BoltServer 的用户线程池什么时候使用呢?还是使用 IDEA 的 Hierarchy 功能。

其实也是在这个过程中,当用户没有设置线程池,则使用系统线程池。

总结

好了,关于 SOFA 的服务发布和服务的接收过程,就介绍完了,可以说,整个框架还是非常轻量级的。基本操作就是:内部通过在 Netty的 Handler 中保存一个存储服务实现的 Map 完成远程调用。

其实和我们之前用 Netty 写的小 demo 类似。