首頁技術(shù)文章正文

Java培訓(xùn):dubbo源碼解析-服務(wù)暴露與發(fā)現(xiàn)

更新時(shí)間:2022-09-19 來源:黑馬程序員 瀏覽量:

  概述

  dubbo是一個(gè)簡單易用的RPC框架,通過簡單的提供者,消費(fèi)者配置就能完成無感的網(wǎng)絡(luò)調(diào)用。那么在dubbo中是如何將提供者的服務(wù)暴露出去,消費(fèi)者又是如何獲取到提供者相關(guān)信息的呢?這就是本章我們要討論的內(nèi)容。

  dubbo與spring的整合

  在了解dubbo的服務(wù)注冊和服務(wù)發(fā)現(xiàn)之前,我們首先需要掌握一個(gè)知識(shí)點(diǎn):Spring中自定義Schema。

  Spring自定義Schema

  Dubbo 現(xiàn)在的設(shè)計(jì)是完全無侵入,也就是使用者只依賴于配置契約。在 Dubbo 中,可以使用 XML 配置相關(guān)信息,也可以用來引入服務(wù)或者導(dǎo)出服務(wù)。配置完成,啟動(dòng)工程,Spring 會(huì)讀取配置文件,生成注入相關(guān)Bean。那 Dubbo 如何實(shí)現(xiàn)自定義 XML 被 Spring 加載讀取呢?

  從 Spring 2.0 開始,Spring 開始提供了一種基于 XML Schema 格式擴(kuò)展機(jī)制,用于定義和配置 bean。

  入門案例

  學(xué)習(xí)和使用Spring XML Schema 擴(kuò)展機(jī)制并不難,需要下面幾個(gè)步驟:

  1. 創(chuàng)建配置屬性的JavaBean對象

  2. 創(chuàng)建一個(gè) XML Schema 文件,描述自定義的合法構(gòu)建模塊,也就是xsd文件。

  3. 自定義處理器類,并實(shí)現(xiàn)`NamespaceHandler`接口。

  4. 自定義解析器,實(shí)現(xiàn)`BeanDefinitionParser`接口(最關(guān)鍵的部分)。

  5. 編寫Spring.handlers和Spring.schemas文件配置所有部件

  定義JavaBean對象,在spring中此對象會(huì)根據(jù)配置自動(dòng)創(chuàng)建

public class User {
    private String id;  
    private String name;  
    private Integer age;
    //省略getter setter方法
}

  在META-INF下定義`user.xsd`文件,使用xsd用于描述標(biāo)簽的規(guī)則

<?xml version="1.0" encoding="UTF-8"?>  
<xsd:schema  
    xmlns="http://m.409rqu1.cn/schema/user"
    xmlns:xsd="http://www.w3.org/2001/XMLSchema"  
    xmlns:beans="http://www.springframework.org/schema/beans"  
    targetNamespace="http://m.409rqu1.cn/schema/user"
    elementFormDefault="qualified"  
    attributeFormDefault="unqualified">  
    <xsd:import namespace="http://www.springframework.org/schema/beans" />  
    <xsd:element name="user">
        <xsd:complexType>  
            <xsd:complexContent>  
                <xsd:extension base="beans:identifiedType">  
                    <xsd:attribute name="name" type="xsd:string" />  
                    <xsd:attribute name="age" type="xsd:int" />  
                </xsd:extension>  
            </xsd:complexContent>  
        </xsd:complexType>  
    </xsd:element>  
</xsd:schema>

  Spring讀取xml文件時(shí),會(huì)根據(jù)標(biāo)簽的命名空間找到其對應(yīng)的NamespaceHandler,我們在NamespaceHandler內(nèi)會(huì)注冊標(biāo)簽對應(yīng)的解析器BeanDefinitionParser。

package com.itheima.schema;

import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

public class UserNamespaceHandler extends NamespaceHandlerSupport {
    public void init() {
        registerBeanDefinitionParser("user", new UserBeanDefinitionParser());
    }
}

  BeanDefinitionParser是標(biāo)簽對應(yīng)的解析器,Spring讀取到對應(yīng)標(biāo)簽時(shí)會(huì)使用該類進(jìn)行解析;

public class UserBeanDefinitionParser extends
        AbstractSingleBeanDefinitionParser {
   
    protected Class getBeanClass(Element element) {
        return User.class;
    }

    protected void doParse(Element element, BeanDefinitionBuilder bean) {
        String name = element.getAttribute("name");
        String age = element.getAttribute("age");
        String id = element.getAttribute("id");
        if (StringUtils.hasText(id)) {
            bean.addPropertyValue("id", id);
        }
        if (StringUtils.hasText(name)) {
            bean.addPropertyValue("name", name);
        }
        if (StringUtils.hasText(age)) {
            bean.addPropertyValue("age", Integer.valueOf(age));
        }
    }
}

  定義spring.handlers文件,內(nèi)部保存命名空間與NamespaceHandler類的對應(yīng)關(guān)系;必須放在classpath下的META-INF文件夾中。

  ```proprties

  http\://m.409rqu1.cn/schema/user=com.itheima.schema.UserNamespaceHandler

  ```

  定義spring.schemas文件,內(nèi)部保存命名空間對應(yīng)的xsd文件位置;必須放在classpath下的META-INF文件夾中。

http\://m.409rqu1.cn/schema/user.xsd=META-INF/user.xsd

  代碼準(zhǔn)備好了之后,就可以在spring工程中進(jìn)行使用和測試,定義spring配置文件,導(dǎo)入對應(yīng)約束。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:context="http://www.springframework.org/schema/context" 
xmlns:util="http://www.springframework.org/schema/util" 
xmlns:task="http://www.springframework.org/schema/task" 
xmlns:aop="http://www.springframework.org/schema/aop" 
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:itheima="http://m.409rqu1.cn/schema/user"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://m.409rqu1.cn/schema/user http://m.409rqu1.cn/schema/user.xsd">

    <itheima:user id="user" name="zhangsan" age="12"></itheima:user>

</beans>

  編寫測試類,通過spring容器獲取對象user

public class SchemaDemo {
    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("/spring/applicationContext.xml");
        User user = (User)ctx.getBean("user");
        System.out.println(user);
    }
}

  dubbo中的相關(guān)對象

  Dubbo是運(yùn)行在spring容器中,dubbo的配置文件也是通過spring的配置文件applicationContext.xml來加載,所以dubbo的自定義配置標(biāo)簽實(shí)現(xiàn),其實(shí)同樣依賴spring的xml schema機(jī)制

1663555701881_1.jpg

  可以看出Dubbo所有的組件都是由`DubboBeanDefinitionParser`解析,并通過registerBeanDefinitionParser方法來注冊到spring中最后解析對應(yīng)的對象。這些對象中我們重點(diǎn)關(guān)注的有以下兩個(gè):

  ServiceBean:服務(wù)提供者暴露服務(wù)的核心對象

  ReferenceBean:服務(wù)消費(fèi)者發(fā)現(xiàn)服務(wù)的核心對象

  RegistryConfig:定義注冊中心的核心配置對象

  服務(wù)暴露

  前面主要探討了 Dubbo 中 schema 、 XML 的相關(guān)原理 , 這些內(nèi)容對理解框架整體至關(guān)重要 , 在此基礎(chǔ)上我們繼續(xù)探討服務(wù)是如何依靠前面的配置進(jìn)行服務(wù)暴露。

  名詞解釋

  在 Dubbo 的核心領(lǐng)域模型中:

  Invoker 是實(shí)體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉(zhuǎn)換成它,它代表一個(gè)可執(zhí)行體,可向它發(fā)起 invoke 調(diào)用,它有可能是一個(gè)本地的實(shí)現(xiàn),也可能是一個(gè)遠(yuǎn)程的實(shí)現(xiàn),也可能一個(gè)集群實(shí)現(xiàn)。在服務(wù)提供方,Invoker用于調(diào)用服務(wù)提供類。在服務(wù)消費(fèi)方,Invoker用于執(zhí)行遠(yuǎn)程調(diào)用。

  - Protocol 是服務(wù)域,它是 Invoker 暴露和引用的主功能入口,它負(fù)責(zé) Invoker 的生命周期管理。

  - export:暴露遠(yuǎn)程服務(wù)

  - refer:引用遠(yuǎn)程服務(wù)

  - proxyFactory:獲取一個(gè)接口的代理類

  - getInvoker:針對server端,將服務(wù)對象,如DemoServiceImpl包裝成一個(gè)Invoker對象

  - getProxy:針對client端,創(chuàng)建接口的代理對象,例如DemoService的接口。

  - Invocation 是會(huì)話域,它持有調(diào)用過程中的變量,比如方法名,參數(shù)等

  整體流程

  在詳細(xì)探討服務(wù)暴露細(xì)節(jié)之前 , 我們先看一下整體duubo的服務(wù)暴露原理

1663555743810_2.jpg

  在整體上看,Dubbo 框架做服務(wù)暴露分為兩大部分 , 第一步將持有的服務(wù)實(shí)例通過代理轉(zhuǎn)換成 Invoker, 第二步會(huì)把 Invoker 通過具體的協(xié)議 ( 比如 Dubbo ) 轉(zhuǎn)換成 Exporter, 框架做了這層抽象也大大方便了功能擴(kuò)展 。

  服務(wù)提供方暴露服務(wù)的藍(lán)色初始化鏈,時(shí)序圖如下:

1663555767803_3.jpg

  源碼分析

  (1) 導(dǎo)出入口

  服務(wù)導(dǎo)出的入口方法是 ServiceBean 的 onApplicationEvent。onApplicationEvent 是一個(gè)事件響應(yīng)方法,該方法會(huì)在收到 Spring 上下文刷新事件后執(zhí)行服務(wù)導(dǎo)出操作。方法代碼如下:

public void onApplicationEvent(ContextRefreshedEvent event) {
    // 是否有延遲導(dǎo)出 && 是否已導(dǎo)出 && 是不是已被取消導(dǎo)出
    if (isDelay() && !isExported() && !isUnexported()) {
        // 導(dǎo)出服務(wù)
        export();
    }
}

  onApplicationEvent 方法在經(jīng)過一些判斷后,會(huì)決定是否調(diào)用 export 方法導(dǎo)出服務(wù)。在export 根據(jù)配置執(zhí)行相應(yīng)的動(dòng)作。最終進(jìn)入到doExportUrls導(dǎo)出服務(wù)方法。

private void doExportUrls() {
    // 加載注冊中心鏈接
    List<URL> registryURLs = loadRegistries(true);
    // 遍歷 protocols,并在每個(gè)協(xié)議下導(dǎo)出服務(wù)
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

  關(guān)于多協(xié)議多注冊中心導(dǎo)出服務(wù)首先是根據(jù)配置,以及其他一些信息組裝 URL。前面說過,URL 是 Dubbo 配置的載體,通過 URL 可讓 Dubbo 的各種配置在各個(gè)模塊之間傳遞。

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    // 如果協(xié)議名為空,或空串,則將協(xié)議名變量設(shè)置為 dubbo
    if (name == null || name.length() == 0) {
        name = "dubbo";
    }

    Map<String, String> map = new HashMap<String, String>();
   
    //略

    // 獲取上下文路徑
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }

    // 獲取 host 和 port
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    // 組裝 URL
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
   
    // 省略無關(guān)代碼
}

  上面的代碼首先是將一些信息,比如版本、時(shí)間戳、方法名以及各種配置對象的字段信息放入到 map 中,最后將 map 和主機(jī)名等數(shù)據(jù)傳給 URL 構(gòu)造方法創(chuàng)建 URL 對象。前置工作做完,接下來就可以進(jìn)行服務(wù)導(dǎo)出了。服務(wù)導(dǎo)出分為導(dǎo)出到本地 (JVM),和導(dǎo)出到遠(yuǎn)程。在深入分析服務(wù)導(dǎo)出的源碼前,我們先來從宏觀層面上看一下服務(wù)導(dǎo)出邏輯。如下:

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
   
    // 省略無關(guān)代碼
    String scope = url.getParameter(Constants.SCOPE_KEY);
    // 如果 scope = none,則什么都不做
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        // scope != remote,導(dǎo)出到本地
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // scope != local,導(dǎo)出到遠(yuǎn)程
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            if (registryURLs != null && !registryURLs.isEmpty()) {
                for (URL registryURL : registryURLs) {
                    //省略無關(guān)代碼
                   
                    // 為服務(wù)提供類(ref)生成 Invoker
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    // 導(dǎo)出服務(wù),并生成 Exporter
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
               
            // 不存在注冊中心,僅導(dǎo)出服務(wù)
            } else {
                //略
            }
        }
    }
    this.urls.add(url);
}

  上面代碼根據(jù) url 中的 scope 參數(shù)決定服務(wù)導(dǎo)出方式,分別如下:

  - scope = none,不導(dǎo)出服務(wù)

  - scope != remote,導(dǎo)出到本地

  - scope != local,導(dǎo)出到遠(yuǎn)程

  不管是導(dǎo)出到本地,還是遠(yuǎn)程。進(jìn)行服務(wù)導(dǎo)出之前,均需要先創(chuàng)建 Invoker,這是一個(gè)很重要的步驟。因此下面先來分析 Invoker 的創(chuàng)建過程。Invoker 是由 ProxyFactory 創(chuàng)建而來,Dubbo 默認(rèn)的 ProxyFactory 實(shí)現(xiàn)類是 JavassistProxyFactory。下面我們到 JavassistProxyFactory 代碼中,探索 Invoker 的創(chuàng)建過程。如下:

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // 為目標(biāo)類創(chuàng)建 Wrapper
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    // 創(chuàng)建匿名 Invoker 類對象,并實(shí)現(xiàn) doInvoke 方法。
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            // 調(diào)用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會(huì)調(diào)用目標(biāo)方法
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

  如上,JavassistProxyFactory 創(chuàng)建了一個(gè)繼承自 AbstractProxyInvoker 類的匿名對象,并覆寫了抽象方法 doInvoke。

  (2) 導(dǎo)出服務(wù)到本地

  Invoke創(chuàng)建成功之后,接下來我們來看本地導(dǎo)出

private void exportLocal(URL url) {
    // 如果 URL 的協(xié)議頭等于 injvm,說明已經(jīng)導(dǎo)出到本地了,無需再次導(dǎo)出
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        URL local = URL.valueOf(url.toFullString())
            .setProtocol(Constants.LOCAL_PROTOCOL)    // 設(shè)置協(xié)議頭為 injvm
            .setHost(LOCALHOST)
            .setPort(0);
        ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
        // 創(chuàng)建 Invoker,并導(dǎo)出服務(wù),這里的 protocol 會(huì)在運(yùn)行時(shí)調(diào)用 InjvmProtocol 的 export 方法
        Exporter<?> exporter = protocol.export(
            proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
    }
}

  exportLocal 方法比較簡單,首先根據(jù) URL 協(xié)議頭決定是否導(dǎo)出服務(wù)。若需導(dǎo)出,則創(chuàng)建一個(gè)新的 URL 并將協(xié)議頭、主機(jī)名以及端口設(shè)置成新的值。然后創(chuàng)建 Invoker,并調(diào)用 InjvmProtocol 的 export 方法導(dǎo)出服務(wù)。下面我們來看一下 InjvmProtocol 的 export 方法都做了哪些事情。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // 創(chuàng)建 InjvmExporter
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}

  如上,InjvmProtocol 的 export 方法僅創(chuàng)建了一個(gè) InjvmExporter,無其他邏輯。到此導(dǎo)出服務(wù)到本地就分析完了。

  (3) 導(dǎo)出服務(wù)到遠(yuǎn)程

  接下來,我們繼續(xù)分析導(dǎo)出服務(wù)到遠(yuǎn)程的過程。導(dǎo)出服務(wù)到遠(yuǎn)程包含了服務(wù)導(dǎo)出與服務(wù)注冊兩個(gè)過程。先來分析服務(wù)導(dǎo)出邏輯。我們把目光移動(dòng)到 RegistryProtocol 的 export 方法上。

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // 導(dǎo)出服務(wù)
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    // 獲取注冊中心 URL
    URL registryUrl = getRegistryUrl(originInvoker);

    // 根據(jù) URL 加載 Registry 實(shí)現(xiàn)類,比如 ZookeeperRegistry
    final Registry registry = getRegistry(originInvoker);
   
    // 獲取已注冊的服務(wù)提供者 URL,比如:
    final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

    // 獲取 register 參數(shù)
    boolean register = registeredProviderUrl.getParameter("register", true);

    // 向服務(wù)提供者與消費(fèi)者注冊表中注冊服務(wù)提供者
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

    // 根據(jù) register 的值決定是否注冊服務(wù)
    if (register) {
        // 向注冊中心注冊服務(wù)
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // 獲取訂閱 URL,比如:
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    // 創(chuàng)建監(jiān)聽器
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // 向注冊中心進(jìn)行訂閱 override 數(shù)據(jù)
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    // 創(chuàng)建并返回 DestroyableExporter
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

  上面代碼看起來比較復(fù)雜,主要做如下一些操作:

  1. 調(diào)用 doLocalExport 導(dǎo)出服務(wù)

  2. 向注冊中心注冊服務(wù)

  3. 向注冊中心進(jìn)行訂閱 override 數(shù)據(jù)

  4. 創(chuàng)建并返回 DestroyableExporter

  下面先來分析 doLocalExport 方法的邏輯,如下:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    String key = getCacheKey(originInvoker);
    // 訪問緩存
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                // 創(chuàng)建 Invoker 為委托類對象
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                // 調(diào)用 protocol 的 export 方法導(dǎo)出服務(wù)
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
           
                // 寫緩存
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}

  接下來,我們把重點(diǎn)放在 Protocol 的 export 方法上。假設(shè)運(yùn)行時(shí)協(xié)議為 dubbo,此處的 protocol 變量會(huì)在運(yùn)行時(shí)加載 DubboProtocol,并調(diào)用 DubboProtocol 的 export 方法。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // 獲取服務(wù)標(biāo)識(shí),理解成服務(wù)坐標(biāo)也行。由服務(wù)組名,服務(wù)名,服務(wù)版本號(hào)以及端口組成。比如:
    // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
    String key = serviceKey(url);
    // 創(chuàng)建 DubboExporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // 將 <key, exporter> 鍵值對放入緩存中
    exporterMap.put(key, exporter);

    //省略無關(guān)代碼

    // 啟動(dòng)服務(wù)器
    openServer(url);
    // 優(yōu)化序列化
    optimizeSerialization(url);
    return exporter;
}

  (4) 開啟Netty服務(wù)

  如上,我們重點(diǎn)關(guān)注 DubboExporter 的創(chuàng)建以及 openServer 方法,其他邏輯看不懂也沒關(guān)系,不影響理解服務(wù)導(dǎo)出過程。下面分析 openServer 方法。

private void openServer(URL url) {
    // 獲取 host:port,并將其作為服務(wù)器實(shí)例的 key,用于標(biāo)識(shí)當(dāng)前的服務(wù)器實(shí)例
    String key = url.getAddress();
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        // 訪問緩存
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            // 創(chuàng)建服務(wù)器實(shí)例
            serverMap.put(key, createServer(url));
        } else {
            // 服務(wù)器已創(chuàng)建,則根據(jù) url 中的配置重置服務(wù)器
            server.reset(url);
        }
    }
}

  接下來分析服務(wù)器實(shí)例的創(chuàng)建過程。如下:

private ExchangeServer createServer(URL url) {
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,
    // 添加心跳檢測配置到 url 中
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    // 獲取 server 參數(shù),默認(rèn)為 netty
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    // 通過 SPI 檢測是否存在 server 參數(shù)所代表的 Transporter 拓展,不存在則拋出異常
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    // 添加編碼解碼器參數(shù)
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        // 創(chuàng)建 ExchangeServer
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server...");
    }
                                   
    // 獲取 client 參數(shù),可指定 netty,mina
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        // 獲取所有的 Transporter 實(shí)現(xiàn)類名稱集合,比如 supportedTypes = [netty, mina]
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        // 檢測當(dāng)前 Dubbo 所支持的 Transporter 實(shí)現(xiàn)類名稱列表中,
        // 是否包含 client 所表示的 Transporter,若不包含,則拋出異常
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type...");
        }
    }
    return server;
}

  如上,createServer 包含三個(gè)核心的邏輯。第一是檢測是否存在 server 參數(shù)所代表的 Transporter 拓展,不存在則拋出異常。第二是創(chuàng)建服務(wù)器實(shí)例。第三是檢測是否支持 client 參數(shù)所表示的 Transporter 拓展,不存在也是拋出異常。兩次檢測操作所對應(yīng)的代碼比較直白了,無需多說。但創(chuàng)建服務(wù)器的操作目前還不是很清晰,我們繼續(xù)往下看。

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 獲取 Exchanger,默認(rèn)為 HeaderExchanger。
    // 緊接著調(diào)用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實(shí)例
    return getExchanger(url).bind(url, handler);
}

  上面代碼比較簡單,就不多說了。下面看一下 HeaderExchanger 的 bind 方法。

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    // 創(chuàng)建 HeaderExchangeServer 實(shí)例,該方法包含了多個(gè)邏輯,分別如下:
    //   1. new HeaderExchangeHandler(handler)
    //   2. new DecodeHandler(new HeaderExchangeHandler(handler))
    //   3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

  HeaderExchanger 的 bind 方法包含的邏輯比較多,但目前我們僅需關(guān)心 Transporters 的 bind 方法邏輯即可。該方法的代碼如下:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handlers 元素?cái)?shù)量大于1,則創(chuàng)建 ChannelHandler 分發(fā)器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // 獲取自適應(yīng) Transporter 實(shí)例,并調(diào)用實(shí)例方法
    return getTransporter().bind(url, handler);
}

  如上,getTransporter() 方法獲取的 Transporter 是在運(yùn)行時(shí)動(dòng)態(tài)創(chuàng)建的,類名為 TransporterAdaptive,也就是自適應(yīng)拓展類。TransporterAdaptive 會(huì)在運(yùn)行時(shí)根據(jù)傳入的 URL 參數(shù)決定加載什么類型的 Transporter,默認(rèn)為 NettyTransporter。調(diào)用`NettyTransporter.bind(URL, ChannelHandler)`方法。創(chuàng)建一個(gè)`NettyServer`實(shí)例。調(diào)用`NettyServer.doOPen()`方法,服務(wù)器被開啟,服務(wù)也被暴露出來了。

  (5) 服務(wù)注冊

  本節(jié)內(nèi)容以 Zookeeper 注冊中心作為分析目標(biāo),其他類型注冊中心大家可自行分析。下面從服務(wù)注冊的入口方法開始分析,我們把目光再次移到 RegistryProtocol 的 export 方法上。如下:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
   
    // ${導(dǎo)出服務(wù)}
   
    // 省略其他代碼
   
    boolean register = registeredProviderUrl.getParameter("register", true);
    if (register) {
        // 注冊服務(wù)
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
   
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // 訂閱 override 數(shù)據(jù)
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    // 省略部分代碼
}

  RegistryProtocol 的 export 方法包含了服務(wù)導(dǎo)出,注冊,以及數(shù)據(jù)訂閱等邏輯。其中服務(wù)導(dǎo)出邏輯上一節(jié)已經(jīng)分析過了,本節(jié)將分析服務(wù)注冊邏輯,相關(guān)代碼如下:

public void register(URL registryUrl, URL registedProviderUrl) {
    // 獲取 Registry
    Registry registry = registryFactory.getRegistry(registryUrl);
    // 注冊服務(wù)
    registry.register(registedProviderUrl);
}

  register 方法包含兩步操作,第一步是獲取注冊中心實(shí)例,第二步是向注冊中心注冊服務(wù)。接下來分兩節(jié)內(nèi)容對這兩步操作進(jìn)行分析。

  這里以 Zookeeper 注冊中心為例進(jìn)行分析。下面先來看一下 getRegistry 方法的源碼,這個(gè)方法由 AbstractRegistryFactory 實(shí)現(xiàn)。如下:

public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceString();
    LOCK.lock();
    try {
        // 訪問緩存
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
       
        // 緩存未命中,創(chuàng)建 Registry 實(shí)例
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry...");
        }
       
        // 寫入緩存
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        LOCK.unlock();
    }
}

protected abstract Registry createRegistry(URL url);

  如上,getRegistry 方法先訪問緩存,緩存未命中則調(diào)用 createRegistry 創(chuàng)建 Registry。在此方法中就是通過`new ZookeeperRegistry(url, zookeeperTransporter)`實(shí)例化一個(gè)注冊中心

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
   
    // 獲取組名,默認(rèn)為 dubbo
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        // group = "/" + group
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    // 創(chuàng)建 Zookeeper 客戶端,默認(rèn)為 CuratorZookeeperTransporter
    zkClient = zookeeperTransporter.connect(url);
    // 添加狀態(tài)監(jiān)聽器
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

  在上面的代碼代碼中,我們重點(diǎn)關(guān)注 ZookeeperTransporter 的 connect 方法調(diào)用,這個(gè)方法用于創(chuàng)建 Zookeeper 客戶端。創(chuàng)建好 Zookeeper 客戶端,意味著注冊中心的創(chuàng)建過程就結(jié)束了。接下來,再來分析一下 Zookeeper 客戶端的創(chuàng)建過程。

public ZookeeperClient connect(URL url) {
    // 創(chuàng)建 CuratorZookeeperClient
    return new CuratorZookeeperClient(url);
}

  繼續(xù)向下看。

public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {

    private final CuratorFramework client;
   
    public CuratorZookeeperClient(URL url) {
        super(url);
        try {
            // 創(chuàng)建 CuratorFramework 構(gòu)造器
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(1, 1000))
                    .connectionTimeoutMs(5000);
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            }
            // 構(gòu)建 CuratorFramework 實(shí)例
            client = builder.build();
            //省略無關(guān)代碼
           
            // 啟動(dòng)客戶端
            client.start();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

  CuratorZookeeperClient 構(gòu)造方法主要用于創(chuàng)建和啟動(dòng) CuratorFramework 實(shí)例。至此Zookeeper客戶端就已經(jīng)啟動(dòng)了

  下面我們將 Dubbo 的 demo 跑起來,然后通過 Zookeeper 可視化客戶端 [ZooInspector](https://github.com/apache/zookeeper/tree/b79af153d0f98a4f3f3516910ed47234d7b3d74e/src/contrib/zooinspector) 查看節(jié)點(diǎn)數(shù)據(jù)。如下:

  ![img](http://dubbo.apache.org/docs/zh-cn/source_code_guide/sources/images/service-registry.png)

  從上圖中可以看到DemoService 這個(gè)服務(wù)對應(yīng)的配置信息最終被注冊到了zookeeper節(jié)點(diǎn)下。搞懂了服務(wù)注冊的本質(zhì),那么接下來我們就可以去閱讀服務(wù)注冊的代碼了。

protected void doRegister(URL url) {
    try {
        // 通過 Zookeeper 客戶端創(chuàng)建節(jié)點(diǎn),節(jié)點(diǎn)路徑由 toUrlPath 方法生成,路徑格式如下:
        //   /${group}/${serviceInterface}/providers/${url}
        // 比如
        //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register...");
    }
}

  如上,ZookeeperRegistry 在 doRegister 中調(diào)用了 Zookeeper 客戶端創(chuàng)建服務(wù)節(jié)點(diǎn)。節(jié)點(diǎn)路徑由 toUrlPath 方法生成,該方法邏輯不難理解,就不分析了。接下來分析 create 方法,如下:

public void create(String path, boolean ephemeral) {
    if (!ephemeral) {
        // 如果要?jiǎng)?chuàng)建的節(jié)點(diǎn)類型非臨時(shí)節(jié)點(diǎn),那么這里要檢測節(jié)點(diǎn)是否存在
        if (checkExists(path)) {
            return;
        }
    }
    int i = path.lastIndexOf('/');
    if (i > 0) {
        // 遞歸創(chuàng)建上一級路徑
        create(path.substring(0, i), false);
    }
   
    // 根據(jù) ephemeral 的值創(chuàng)建臨時(shí)或持久節(jié)點(diǎn)
    if (ephemeral) {
        createEphemeral(path);
    } else {
        createPersistent(path);
    }
}

  好了,到此關(guān)于服務(wù)注冊的過程就分析完了。整個(gè)過程可簡單總結(jié)為:先創(chuàng)建注冊中心實(shí)例,之后再通過注冊中心實(shí)例注冊服務(wù)。

  總結(jié)

  1. 在有注冊中心,需要注冊提供者地址的情況下,ServiceConfig 解析出的 URL 格式為:`registry:// registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/{服務(wù)名}/{版本號(hào)}")`

  2. 基于 Dubbo SPI 的自適應(yīng)機(jī)制,通過 URL `registry://` 協(xié)議頭識(shí)別,就調(diào)用 RegistryProtocol#export() 方法

  1. 將具體的服務(wù)類名,比如 `DubboServiceRegistryImpl`,通過 ProxyFactory 包裝成 Invoker 實(shí)例

  2. 調(diào)用 doLocalExport 方法,使用 DubboProtocol 將 Invoker 轉(zhuǎn)化為 Exporter 實(shí)例,并打開 Netty 服務(wù)端監(jiān)聽客戶請求

  3. 創(chuàng)建 Registry 實(shí)例,連接 Zookeeper,并在服務(wù)節(jié)點(diǎn)下寫入提供者的 URL 地址,注冊服務(wù)

  4. 向注冊中心訂閱 override 數(shù)據(jù),并返回一個(gè) Exporter 實(shí)例

  3. 根據(jù) URL 格式中的 `"dubbo://service-host/{服務(wù)名}/{版本號(hào)}"`中協(xié)議頭 `dubbo://` 識(shí)別,調(diào)用 DubboProtocol#export() 方法,開發(fā)服務(wù)端口

  4. RegistryProtocol#export() 返回的 Exporter 實(shí)例存放到 ServiceConfig 的 `List exporters` 中

  服務(wù)發(fā)現(xiàn)

  在學(xué)習(xí)了服務(wù)暴露原理之后 , 接下來重點(diǎn)探討服務(wù)是如何消費(fèi)的 。 這里主要講解如何通過注冊中心進(jìn)行服務(wù)發(fā)現(xiàn)進(jìn)行遠(yuǎn)程服務(wù)調(diào)用等細(xì)節(jié) 。

  服務(wù)發(fā)現(xiàn)流程

  在詳細(xì)探討服務(wù)暴露細(xì)節(jié)之前 , 我們先看一下整體duubo的服務(wù)消費(fèi)原理

1663556438235_4.jpg

  在整體上看 , Dubbo 框架做服務(wù)消費(fèi)也分為兩大部分 , 第一步通過持有遠(yuǎn)程服務(wù)實(shí)例生成Invoker, 這個(gè) Invoker 在客戶端是核心的遠(yuǎn)程代理對象 。 第二步會(huì)把 Invoker 通過動(dòng)態(tài)代理轉(zhuǎn)換成實(shí)現(xiàn)用戶接口的動(dòng)態(tài)代理引用 。

  服務(wù)消費(fèi)方引用服務(wù)的藍(lán)色初始化鏈,時(shí)序圖如下:

1663556451271_5.jpg

  源碼分析

  (1) 引用入口

  服務(wù)引用的入口方法為 ReferenceBean 的 getObject 方法,該方法定義在 Spring 的 FactoryBean 接口中,ReferenceBean 實(shí)現(xiàn)了這個(gè)方法。

public Object getObject() throws Exception {
    return get();
}

public synchronized T get() {
    // 檢測 ref 是否為空,為空則通過 init 方法創(chuàng)建
    if (ref == null) {
        // init 方法主要用于處理配置,以及調(diào)用 createProxy 生成代理類
        init();
    }
    return ref;
}

  Dubbo 提供了豐富的配置,用于調(diào)整和優(yōu)化框架行為,性能等。Dubbo 在引用或?qū)С龇?wù)時(shí),首先會(huì)對這些配置進(jìn)行檢查和處理,以保證配置的正確性。

private void init() {
    // 創(chuàng)建代理類
    ref = createProxy(map);

  此方法代碼很長,主要完成的配置加載,檢查,以及創(chuàng)建引用的代理對象。這里要從 createProxy 開始看起。從字面意思上來看,createProxy 似乎只是用于創(chuàng)建代理對象的。但實(shí)際上并非如此,該方法還會(huì)調(diào)用其他方法構(gòu)建以及合并 Invoker 實(shí)例。具體細(xì)節(jié)如下。

private T createProxy(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    ...........
    isDvmRefer = InjvmProtocol . getlnjvmProtocol( ) . islnjvmRefer(tmpUrl)
    // 本地引用略
    if (isJvmRefer) {

    } else {
        // 點(diǎn)對點(diǎn)調(diào)用略
        if (url != null && url.length() > 0) {

           
        } else {
            // 加載注冊中心 url
            List<URL> us = loadRegistries(false);
            if (us != null && !us.isEmpty()) {
                for (URL u : us) {
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    // 添加 refer 參數(shù)到 url 中,并將 url 添加到 urls 中
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }
        }

        // 單個(gè)注冊中心或服務(wù)提供者(服務(wù)直連,下同)
        if (urls.size() == 1) {
            // 調(diào)用 RegistryProtocol 的 refer 構(gòu)建 Invoker 實(shí)例
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        // 多個(gè)注冊中心或多個(gè)服務(wù)提供者,或者兩者混合
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;

            // 獲取所有的 Invoker
            for (URL url : urls) {
                // 通過 refprotocol 調(diào)用 refer 構(gòu)建 Invoker,refprotocol 會(huì)在運(yùn)行時(shí)
                // 根據(jù) url 協(xié)議頭加載指定的 Protocol 實(shí)例,并調(diào)用實(shí)例的 refer 方法
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url;
                }
            }
            if (registryURL != null) {
                // 如果注冊中心鏈接不為空,則將使用 AvailableCluster
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                // 創(chuàng)建 StaticDirectory 實(shí)例,并由 Cluster 對多個(gè) Invoker 進(jìn)行合并
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else {
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    //省略無關(guān)代碼

    // 生成代理類
    return (T) proxyFactory.getProxy(invoker);
}

  上面代碼很多,不過邏輯比較清晰。

  1、如果是本地調(diào)用,直接jvm 協(xié)議從內(nèi)存中獲取實(shí)例

  2、如果只有一個(gè)注冊中心,直接通過 Protocol 自適應(yīng)拓展類構(gòu)建 Invoker 實(shí)例接口

  3、如果有多個(gè)注冊中心,此時(shí)先根據(jù) url 構(gòu)建 Invoker。然后再通過 Cluster 合并多個(gè) Invoker,最后調(diào)用 ProxyFactory 生成代理類。

  (2) 創(chuàng)建客戶端

  在服務(wù)消費(fèi)方,Invoker 用于執(zhí)行遠(yuǎn)程調(diào)用。Invoker 是由 Protocol 實(shí)現(xiàn)類構(gòu)建而來。Protocol 實(shí)現(xiàn)類有很多,這里分析DubboProtocol。

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // 創(chuàng)建 DubboInvoker
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

  上面方法看起來比較簡單,創(chuàng)建一個(gè)DubboInvoker。通過構(gòu)造方法傳入遠(yuǎn)程調(diào)用的client對象。默認(rèn)情況下,Dubbo 使用 NettyClient 進(jìn)行通信。接下來,我們簡單看一下 getClients 方法的邏輯。

private ExchangeClient[] getClients(URL url) {
    // 是否共享連接
    boolean service_share_connect = false;
    // 獲取連接數(shù),默認(rèn)為0,表示未配置
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // 如果未配置 connections,則共享連接
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            // 獲取共享客戶端
            clients[i] = getSharedClient(url);
        } else {
            // 初始化新的客戶端
            clients[i] = initClient(url);
        }
    }
    return clients;
}

  這里根據(jù) connections 數(shù)量決定是獲取共享客戶端還是創(chuàng)建新的客戶端實(shí)例,getSharedClient 方法中也會(huì)調(diào)用 initClient 方法,因此下面我們一起看一下這個(gè)方法。

private ExchangeClient initClient(URL url) {

    // 獲取客戶端類型,默認(rèn)為 netty
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
   
    //省略無關(guān)代碼
    ExchangeClient client;
    try {
        // 獲取 lazy 配置,并根據(jù)配置值決定創(chuàng)建的客戶端類型
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            // 創(chuàng)建懶加載 ExchangeClient 實(shí)例
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // 創(chuàng)建普通 ExchangeClient 實(shí)例
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service...");
    }
    return client;
}

  initClient 方法首先獲取用戶配置的客戶端類型,默認(rèn)為 netty。下面我們分析一下 Exchangers 的 connect 方法。

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // 獲取 Exchanger 實(shí)例,默認(rèn)為 HeaderExchangeClient
    return getExchanger(url).connect(url, handler);
}

  如上,getExchanger 會(huì)通過 SPI 加載 HeaderExchangeClient 實(shí)例,這個(gè)方法比較簡單,大家自己看一下吧。接下來分析 HeaderExchangeClient 的實(shí)現(xiàn)。

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // 這里包含了多個(gè)調(diào)用,分別如下:
    // 1. 創(chuàng)建 HeaderExchangeHandler 對象
    // 2. 創(chuàng)建 DecodeHandler 對象
    // 3. 通過 Transporters 構(gòu)建 Client 實(shí)例
    // 4. 創(chuàng)建 HeaderExchangeClient 對象
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

  這里的調(diào)用比較多,我們這里重點(diǎn)看一下 Transporters 的 connect 方法。如下:

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handler 數(shù)量大于1,則創(chuàng)建一個(gè) ChannelHandler 分發(fā)器
        handler = new ChannelHandlerDispatcher(handlers);
    }
   
    // 獲取 Transporter 自適應(yīng)拓展類,并調(diào)用 connect 方法生成 Client 實(shí)例
    return getTransporter().connect(url, handler);
}

  如上,getTransporter 方法返回的是自適應(yīng)拓展類,該類會(huì)在運(yùn)行時(shí)根據(jù)客戶端類型加載指定的 Transporter 實(shí)現(xiàn)類。若用戶未配置客戶端類型,則默認(rèn)加載 NettyTransporter,并調(diào)用該類的 connect 方法。如下:

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    // 創(chuàng)建 NettyClient 對象
    return new NettyClient(url, listener);
}

  (3) 注冊

  這里就已經(jīng)創(chuàng)建好了NettyClient對象。關(guān)于 DubboProtocol 的 refer 方法就分析完了。接下來,繼續(xù)分析 RegistryProtocol 的 refer 方法邏輯。

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 取 registry 參數(shù)值,并將其設(shè)置為協(xié)議頭
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // 獲取注冊中心實(shí)例
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // 將 url 查詢字符串轉(zhuǎn)為 Map
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    // 獲取 group 配置
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                || "*".equals(group)) {
            // 通過 SPI 加載 MergeableCluster 實(shí)例,并調(diào)用 doRefer 繼續(xù)執(zhí)行服務(wù)引用邏輯
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
   
    // 調(diào)用 doRefer 繼續(xù)執(zhí)行服務(wù)引用邏輯
    return doRefer(cluster, registry, type, url);
}

  上面代碼首先為 url 設(shè)置協(xié)議頭,然后根據(jù) url 參數(shù)加載注冊中心實(shí)例。然后獲取 group 配置,根據(jù) group 配置決定 doRefer 第一個(gè)參數(shù)的類型。這里的重點(diǎn)是 doRefer 方法,如下:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 創(chuàng)建 RegistryDirectory 實(shí)例
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 設(shè)置注冊中心和協(xié)議
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // 生成服務(wù)消費(fèi)者鏈接
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);

    // 注冊服務(wù)消費(fèi)者,在 consumers 目錄下新節(jié)點(diǎn)
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }

    // 訂閱 providers、configurators、routers 等節(jié)點(diǎn)數(shù)據(jù)
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));

    // 一個(gè)注冊中心可能有多個(gè)服務(wù)提供者,因此這里需要將多個(gè)服務(wù)提供者合并為一個(gè)
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

  如上,doRefer 方法創(chuàng)建一個(gè) RegistryDirectory 實(shí)例,然后生成服務(wù)者消費(fèi)者鏈接,并向注冊中心進(jìn)行注冊。注冊完畢后,緊接著訂閱 providers、configurators、routers 等節(jié)點(diǎn)下的數(shù)據(jù)。完成訂閱后,RegistryDirectory 會(huì)收到這幾個(gè)節(jié)點(diǎn)下的子節(jié)點(diǎn)信息。由于一個(gè)服務(wù)可能部署在多臺(tái)服務(wù)器上,這樣就會(huì)在 providers 產(chǎn)生多個(gè)節(jié)點(diǎn),這個(gè)時(shí)候就需要 Cluster 將多個(gè)服務(wù)節(jié)點(diǎn)合并為一個(gè),并生成一個(gè) Invoker。

  (4)創(chuàng)建代理對象

  Invoker 創(chuàng)建完畢后,接下來要做的事情是為服務(wù)接口生成代理對象。有了代理對象,即可進(jìn)行遠(yuǎn)程調(diào)用。代理對象生成的入口方法為 ProxyFactory 的 getProxy,接下來進(jìn)行分析。

public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    // 調(diào)用重載方法
    return getProxy(invoker, false);
}

public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
    Class<?>[] interfaces = null;
    // 獲取接口列表
    String config = invoker.getUrl().getParameter("interfaces");
    if (config != null && config.length() > 0) {
        // 切分接口列表
        String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
        if (types != null && types.length > 0) {
            interfaces = new Class<?>[types.length + 2];
            // 設(shè)置服務(wù)接口類和 EchoService.class 到 interfaces 中
            interfaces[0] = invoker.getInterface();
            interfaces[1] = EchoService.class;
            for (int i = 0; i < types.length; i++) {
                // 加載接口類
                interfaces[i + 1] = ReflectUtils.forName(types[i]);
            }
        }
    }
    if (interfaces == null) {
        interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
    }

    // 為 http 和 hessian 協(xié)議提供泛化調(diào)用支持,參考 pull request #1827
    if (!invoker.getInterface().equals(GenericService.class) && generic) {
        int len = interfaces.length;
        Class<?>[] temp = interfaces;
        // 創(chuàng)建新的 interfaces 數(shù)組
        interfaces = new Class<?>[len + 1];
        System.arraycopy(temp, 0, interfaces, 0, len);
        // 設(shè)置 GenericService.class 到數(shù)組中
        interfaces[len] = GenericService.class;
    }

    // 調(diào)用重載方法
    return getProxy(invoker, interfaces);
}

public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);

  如上,上面大段代碼都是用來獲取 interfaces 數(shù)組的,我們繼續(xù)往下看。getProxy(Invoker, Class[]) 這個(gè)方法是一個(gè)抽象方法,下面我們到 JavassistProxyFactory 類中看一下該方法的實(shí)現(xiàn)代碼。

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    // 生成 Proxy 子類(Proxy 是抽象類)。并調(diào)用 Proxy 子類的 newInstance 方法創(chuàng)建 Proxy 實(shí)例
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

  上面代碼并不多,首先是通過 Proxy 的 getProxy 方法獲取 Proxy 子類,然后創(chuàng)建 InvokerInvocationHandler 對象,并將該對象傳給 newInstance 生成 Proxy 實(shí)例。InvokerInvocationHandler 實(shí)現(xiàn) JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調(diào)用。下面以 org.apache.dubbo.demo.DemoService 這個(gè)接口為例,來看一下該接口代理類代碼大致是怎樣的(忽略 EchoService 接口)。

package org.apache.dubbo.common.bytecode;

public class proxy0 implements org.apache.dubbo.demo.DemoService {

    public static java.lang.reflect.Method[] methods;

    private java.lang.reflect.InvocationHandler handler;

    public proxy0() {
    }

    public proxy0(java.lang.reflect.InvocationHandler arg0) {
        handler = $1;
    }

    public java.lang.String sayHello(java.lang.String arg0) {
        Object[] args = new Object[1];
        args[0] = ($w) $1;
        Object ret = handler.invoke(this, methods[0], args);
        return (java.lang.String) ret;
    }
}

  好了,到這里代理類生成邏輯就分析完了。整個(gè)過程比較復(fù)雜,大家需要耐心看一下。

  總結(jié)

  1. 從注冊中心發(fā)現(xiàn)引用服務(wù):在有注冊中心,通過注冊中心發(fā)現(xiàn)提供者地址的情況下,ReferenceConfig 解析出的 URL 格式為:`registry://registry-host:/org.apache.registry.RegistryService?refer=URL.encode("conumer-host/com.foo.FooService?version=1.0.0")`。

  2. 通過 URL 的registry://協(xié)議頭識(shí)別,就會(huì)調(diào)用RegistryProtocol#refer()方法

  (1). 查詢提供者 URL,如 `dubbo://service-host/com.foo.FooService?version=1.0.0` ,來獲取注冊中心

  (2). 創(chuàng)建一個(gè) RegistryDirectory 實(shí)例并設(shè)置注冊中心和協(xié)議

  (3). 生成 conusmer 連接,在 consumer 目錄下創(chuàng)建節(jié)點(diǎn),向注冊中心注冊

  (4). 注冊完畢后,訂閱 providers,configurators,routers 等節(jié)點(diǎn)的數(shù)據(jù)

  (5). 通過 URL 的 `dubbo://` 協(xié)議頭識(shí)別,調(diào)用 `DubboProtocol#refer()` 方法,創(chuàng)建一個(gè) ExchangeClient 客戶端并返回 DubboInvoker 實(shí)例

  3. 由于一個(gè)服務(wù)可能會(huì)部署在多臺(tái)服務(wù)器上,這樣就會(huì)在 providers 產(chǎn)生多個(gè)節(jié)點(diǎn),這樣也就會(huì)得到多個(gè) DubboInvoker 實(shí)例,就需要 RegistryProtocol 調(diào)用 Cluster 將多個(gè)服務(wù)提供者節(jié)點(diǎn)偽裝成一個(gè)節(jié)點(diǎn),并返回一個(gè) Invoker

  4. Invoker 創(chuàng)建完畢后,調(diào)用 ProxyFactory 為服務(wù)接口生成代理對象,返回提供者引用

分享到:
在線咨詢 我要報(bào)名
和我們在線交談!