4.dubbo源码-发布服务

作者:jcmp      发布时间:2021-05-12      浏览量:0
一、dubbo服务发布dubbo服务发布

一、dubbo服务发布

dubbo服务发布只需在spring.xml中如下配置即可:

二、export初始化

通过 2-dubbo结合spring 可知, 解析后封装到 ServiceBean 中; ServiceBean 定义如下,继承了dubbo定义的类ServiceConfig,实现了5个spring的接口,为了融入spring容器的启动过程中:

public class ServiceBean extends ServiceConfig implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware { ... ...}

ServiceBean实现了 ApplicationListener 接口,当spring容器触发了ContextRefreshedEvent事件时,就会调用 ServiceConfig 中的 export() 方法发布 申明的dubbo服务,且在dubbo的info级别日志中有相应的日志:

public void onApplicationEvent(ApplicationEvent event) { if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) { if (isDelay() && ! isExported() && ! isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } export(); } }}

ServiceConfig.export()

ServiceConfig 中 export() 方法部分源码如下,如果 中申明了 delay (例如 ),那么延迟调用 doExport() 发布这个服务,否则直接调用 doExport() 发布服务:

public synchronized void export() { ... ... if (delay != null && delay > 0) { Thread thread = new Thread(new Runnable() { public void run() { try { Thread.sleep(delay); } catch (Throwable e) { } doExport(); } }); thread.setDaemon(true); thread.setName("DelayExportServiceThread"); thread.start(); } else { doExport(); }}

ServiceConfig.doExport() 的作用:

ServiceConfig.doExportUrls()

通过调用loadRegistries(true)得到所有registry的url地址,例如在 dubbo.properties 中通过配置 dubbo.registry.address=zookeeper://127.0.0.1:2181 ;protocols就是将要发布服务的协议集合(dubbo服务可以同时暴露多种协议),可以在 dubbo.properties 中配置,以dubbo协议为例:

ServiceConfig.doExportUrls() 源码如下:

private void doExportUrls() { List registryURLs = loadRegistries(true); // 一般只配置dubbo协议,那么protocols就是: for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); }}

ServiceConfig.doExportUrlsFor1Protocol()。

先把所有相关属性封装到Map中,例如protocol=dubbo,host=10.0.0.1,port=20880,path=com.alibaba.dubbo.demo.TestService等,然后构造dubbo定义的统一数据模型URL:

得到的url如下所示(这个url非常重要,贯穿整个dubbo服务的发布和调用过程,可以在服务发布后在dubbo-monitor中看到):

ServiceConfig.doExportUrlsFor1Protocol() 中根据scope判断服务的发布范围:

实现源码如下:

//配置为none不暴露if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务) if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){ if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } // 如果注册url地址存在,例如申明了注册的zk地址 if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { // 注册的zk地址可能是集群,那么需要遍历这些地址一一进行注册 for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); // 如果申明了dubbo-monitor,那么再url地址上append类似monitor=monitor全地址 URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } Invoker invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // 默认都是dubbo协议,所以调用DubboProtol.export(Invoker) Exporter exporter = protocol.export(invoker); exporters.add(exporter); } } else { Invoker invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter exporter = protocol.export(invoker); exporters.add(exporter); } }}

三、Protocol.export()

com.alibaba.dubbo.rpc.Protocol中暴露服务接口申明:

RegistryProtocol.export()

源码如下:

public Exporter export(final Invoker originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper exporter = doLocalExport(originInvoker); //registry provider,根据发布的服务originInvoker得到Registry实例,由于一般都使用zookeeper为注册中心,所以这里得到的是ZookeeperRegistry; final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); // 所以这里调用ZookeeperRegistry.register(URL)把需要发布的服务注册到zookeeper中 registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 return new Exporter() { public Invoker getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } };}

核心调用 registry.register(registedProviderUrl)。

核心调用 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener) :

四、重试机制

注册服务失败后,会将url加入重试url集合中, failedRegistered.add(url); 重试任务在FailbackRegistry中实现:

public FailbackRegistry(URL url) { super(url); int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); // retryExecutor是一个单独的线程池Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); 默认重试周期是5s; this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { public void run() { // 检测并连接注册中心 try { retry(); } catch (Throwable t) { // 防御性容错 logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);}

五、监听机制

ChildListener zkListener = listeners.get(listener);if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged(String parentPath, List currentChilds) { ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener);}zkClient.create(path, false);// 在准备监听的path上添加ChildListenerList children = zkClient.addChildListener(path, zkListener);

for (String method : new HashSet(newMethodInvokerMap.keySet())) { List> methodInvokers = newMethodInvokerMap.get(method); Collections.sort(methodInvokers, InvokerComparator.getComparator()); newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));}return Collections.unmodifiableMap(newMethodInvokerMap);

其中 InvokerComparator 的定义如下,即直接根据url进行比较排序:

private static class InvokerComparator implements Comparator> { private static final InvokerComparator comparator = new InvokerComparator(); public static InvokerComparator getComparator() { return comparator; } private InvokerComparator() {} public int compare(Invoker o1, Invoker o2) { return o1.getUrl().toString().compareTo(o2.getUrl().toString()); }}

最后刷新本地缓存的方法和List关系: this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;

DubboProtocol.export()

dubbo协议发布服务会调用DubboProtocol.export(),

NettyServer.open()源码如下:

@Overriderotected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress());}

附dubbo官方给出的暴露服务时序图: https://dubbo.gitbooks.io/dubbo-dev-book/design.html。