更新時間:2022-09-16 來源:黑馬程序員 瀏覽量:
在之前的內(nèi)容中,我們講解了消費者端服務(wù)發(fā)現(xiàn)與提供者端服務(wù)暴露的相關(guān)內(nèi)容,同時也知道消費者端通過內(nèi)置的負(fù)載均衡算法獲取合適的調(diào)用invoker進行遠(yuǎn)程調(diào)用。那么,本章節(jié)重點關(guān)注的就是遠(yuǎn)程調(diào)用過程即網(wǎng)絡(luò)通信。
網(wǎng)絡(luò)通信位于Remoting模塊:
- Remoting 實現(xiàn)是 Dubbo 協(xié)議的實現(xiàn),如果你選擇 RMI 協(xié)議,整個 Remoting 都不會用上;
- Remoting 內(nèi)部再劃為 `Transport 傳輸層` 和 `Exchange 信息交換層`;
- Transport 層只負(fù)責(zé)單向消息傳輸,是對 Mina, Netty, Grizzly 的抽象,它也可以擴展 UDP 傳輸;
- Exchange 層是在傳輸層之上封裝了 Request-Response 語義;
網(wǎng)絡(luò)通信的問題:
客戶端與服務(wù)端連通性問題
粘包拆包問題
異步多線程數(shù)據(jù)一致問題
通信協(xié)議
dubbo內(nèi)置,dubbo協(xié)議 ,rmi協(xié)議,hessian協(xié)議,http協(xié)議,webservice協(xié)議,thrift協(xié)議,rest協(xié)議,grpc協(xié)議,memcached協(xié)議,redis協(xié)議等10種通訊協(xié)議。各個協(xié)議特點如下
dubbo協(xié)議
Dubbo 缺省協(xié)議采用單一長連接和 NIO 異步通訊,適合于小數(shù)據(jù)量大并發(fā)的服務(wù)調(diào)用,以及服務(wù)消費者機器數(shù)遠(yuǎn)大于服務(wù)提供者機器數(shù)的情況。
缺省協(xié)議,使用基于 mina `1.1.7` 和 hessian `3.2.1` 的 tbremoting 交互。
- 連接個數(shù):單連接
- 連接方式:長連接
- 傳輸協(xié)議:TCP
- 傳輸方式:NIO 異步傳輸
- 序列化:Hessian 二進制序列化
- 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包較小(建議小于100K),消費者比提供者個數(shù)多,單一消費者無法壓滿提供者,盡量不要用 dubbo 協(xié)議傳輸大文件或超大字符串。
- 適用場景:常規(guī)遠(yuǎn)程服務(wù)方法調(diào)用
rmi協(xié)議
RMI 協(xié)議采用 JDK 標(biāo)準(zhǔn)的 `java.rmi.*` 實現(xiàn),采用阻塞式短連接和 JDK 標(biāo)準(zhǔn)序列化方式。
- 連接個數(shù):多連接
- 連接方式:短連接
- 傳輸協(xié)議:TCP
- 傳輸方式:同步傳輸
- 序列化:Java 標(biāo)準(zhǔn)二進制序列化
- 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包大小混合,消費者與提供者個數(shù)差不多,可傳文件。
- 適用場景:常規(guī)遠(yuǎn)程服務(wù)方法調(diào)用,與原生RMI服務(wù)互操作
hessian協(xié)議
Hessian 協(xié)議用于集成 Hessian 的服務(wù),Hessian 底層采用 Http 通訊,采用 Servlet 暴露服務(wù),Dubbo 缺省內(nèi)嵌 Jetty 作為服務(wù)器實現(xiàn)。
Dubbo 的 Hessian 協(xié)議可以和原生 Hessian 服務(wù)互操作,即:
- 提供者用 Dubbo 的 Hessian 協(xié)議暴露服務(wù),消費者直接用標(biāo)準(zhǔn) Hessian 接口調(diào)用
- 或者提供方用標(biāo)準(zhǔn) Hessian 暴露服務(wù),消費方用 Dubbo 的 Hessian 協(xié)議調(diào)用。
- 連接個數(shù):多連接
- 連接方式:短連接
- 傳輸協(xié)議:HTTP
- 傳輸方式:同步傳輸
- 序列化:Hessian二進制序列化
- 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包較大,提供者比消費者個數(shù)多,提供者壓力較大,可傳文件。
- 適用場景:頁面?zhèn)鬏敚募鬏?,或與原生hessian服務(wù)互操作
http協(xié)議
基于 HTTP 表單的遠(yuǎn)程調(diào)用協(xié)議,采用 Spring 的 HttpInvoker 實現(xiàn)
- 連接個數(shù):多連接
- 連接方式:短連接
- 傳輸協(xié)議:HTTP
- 傳輸方式:同步傳輸
- 序列化:表單序列化
- 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包大小混合,提供者比消費者個數(shù)多,可用瀏覽器查看,可用表單或URL傳入?yún)?shù),暫不支持傳文件。
- 適用場景:需同時給應(yīng)用程序和瀏覽器 JS 使用的服務(wù)。
webservice協(xié)議
基于 WebService 的遠(yuǎn)程調(diào)用協(xié)議,基于 Apache CXF 實現(xiàn)](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/webservice.html#fn2)。
可以和原生 WebService 服務(wù)互操作,即:
- 提供者用 Dubbo 的 WebService 協(xié)議暴露服務(wù),消費者直接用標(biāo)準(zhǔn) WebService 接口調(diào)用,
- 或者提供方用標(biāo)準(zhǔn) WebService 暴露服務(wù),消費方用 Dubbo 的 WebService 協(xié)議調(diào)用。
- 連接個數(shù):多連接
- 連接方式:短連接
- 傳輸協(xié)議:HTTP
- 傳輸方式:同步傳輸
- 序列化:SOAP 文本序列化(http + xml)
- 適用場景:系統(tǒng)集成,跨語言調(diào)用
thrift協(xié)議
當(dāng)前 dubbo 支持 [[1\]](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/thrift.html#fn1)的 thrift 協(xié)議是對 thrift 原生協(xié)議 [[2\]](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/thrift.html#fn2) 的擴展,在原生協(xié)議的基礎(chǔ)上添加了一些額外的頭信息,比如 service name,magic number 等。
rest協(xié)議
基于標(biāo)準(zhǔn)的Java REST API——JAX-RS 2.0(Java API for RESTful Web Services的簡寫)實現(xiàn)的REST調(diào)用支持
grpc協(xié)議
Dubbo 自 2.7.5 版本開始支持 gRPC 協(xié)議,對于計劃使用 HTTP/2 通信,或者想利用 gRPC 帶來的 Stream、反壓、Reactive 編程等能力的開發(fā)者來說, 都可以考慮啟用 gRPC 協(xié)議。
- 為期望使用 gRPC 協(xié)議的用戶帶來服務(wù)治理能力,方便接入 Dubbo 體系
- 用戶可以使用 Dubbo 風(fēng)格的,基于接口的編程風(fēng)格來定義和使用遠(yuǎn)程服務(wù)
memcached協(xié)議
基于 memcached實現(xiàn)的 RPC 協(xié)議
redis協(xié)議
基于 Redis 實現(xiàn)的 RPC 協(xié)議
序列化
序列化就是將對象轉(zhuǎn)成字節(jié)流,用于網(wǎng)絡(luò)傳輸,以及將字節(jié)流轉(zhuǎn)為對象,用于在收到字節(jié)流數(shù)據(jù)后還原成對象。序列化的優(yōu)勢有很多,例如安全性更好、可跨平臺等。我們知道dubbo基于netty進行網(wǎng)絡(luò)通訊,在`NettyClient.doOpen()`方法中可以看到Netty的相關(guān)類
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } });
然后去看NettyCodecAdapter 類最后進入ExchangeCodec類的encodeRequest方法,如下:
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // header. byte[] header = new byte[HEADER_LENGTH];
是的,就是Serialization接口,默認(rèn)是Hessian2Serialization序列化接口。
Dubbo序列化支持java、compactedjava、nativejava、fastjson、dubbo、fst、hessian2、kryo,protostuff其中默認(rèn)hessian2。其中java、compactedjava、nativejava屬于原生java的序列化。
- dubbo序列化:阿里尚未開發(fā)成熟的高效java序列化實現(xiàn),阿里不建議在生產(chǎn)環(huán)境使用它。
- **hessian2序列化:hessian是一種跨語言的高效二進制序列化方式。但這里實際不是原生的hessian2序列化,而是阿里修改過的,它是dubbo RPC默認(rèn)啟用的序列化方式。**
- json序列化:目前有兩種實現(xiàn),一種是采用的阿里的fastjson庫,另一種是采用dubbo中自己實現(xiàn)的簡單json庫,但其實現(xiàn)都不是特別成熟,而且json這種文本序列化性能一般不如上面兩種二進制序列化。
- java序列化:主要是采用JDK自帶的Java序列化實現(xiàn),性能很不理想。
最近幾年,各種新的高效序列化方式層出不窮,不斷刷新序列化性能的上限,最典型的包括:
- 專門針對Java語言的:Kryo,F(xiàn)ST等等
- 跨語言的:Protostuff,ProtoBuf,Thrift,Avro,MsgPack等等
這些序列化方式的性能多數(shù)都顯著優(yōu)于 hessian2 (甚至包括尚未成熟的dubbo序列化)。所以我們可以為 dubbo 引入 Kryo 和 FST 這兩種高效 Java 來優(yōu)化 dubbo 的序列化。
使用Kryo和FST非常簡單,只需要在dubbo RPC的XML配置中添加一個屬性即可:
<dubbo:protocol name="dubbo" serialization="kryo"/>
網(wǎng)絡(luò)通信
dubbo中數(shù)據(jù)格式
解決socket中數(shù)據(jù)粘包拆包問題,一般有三種方式
* 定長協(xié)議(數(shù)據(jù)包長度一致)
* 定長的協(xié)議是指協(xié)議內(nèi)容的長度是固定的,比如協(xié)議byte長度是50,當(dāng)從網(wǎng)絡(luò)上讀取50個byte后,就進行decode解碼操作。定長協(xié)議在讀取或者寫入時,效率比較高,因為數(shù)據(jù)緩存的大小基本都確定了,就好比數(shù)組一樣,缺陷就是適應(yīng)性不足,以RPC場景為例,很難估計出定長的長度是多少。
* 特殊結(jié)束符(數(shù)據(jù)尾:通過特殊的字符標(biāo)識#)
* 相比定長協(xié)議,如果能夠定義一個特殊字符作為每個協(xié)議單元結(jié)束的標(biāo)示,就能夠以變長的方式進行通信,從而在數(shù)據(jù)傳輸和高效之間取得平衡,比如用特殊字符`\n`。特殊結(jié)束符方式的問題是過于簡單的思考了協(xié)議傳輸?shù)倪^程,對于一個協(xié)議單元必須要全部讀入才能夠進行處理,除此之外必須要防止用戶傳輸?shù)臄?shù)據(jù)不能同結(jié)束符相同,否則就會出現(xiàn)紊亂。
* 變長協(xié)議(協(xié)議頭+payload模式)
* 這種一般是自定義協(xié)議,會以定長加不定長的部分組成,其中定長的部分需要描述不定長的內(nèi)容長度。
* dubbo就是使用這種形式的數(shù)據(jù)傳輸格式
Dubbo 框架定義了私有的RPC協(xié)議,其中請求和響應(yīng)協(xié)議的具體內(nèi)容我們使用表格來展示。
Dubbo 數(shù)據(jù)包分為消息頭和消息體,消息頭用于存儲一些元信息,比如魔數(shù)(Magic),數(shù)據(jù)包類型(Request/Response),消息體長度(Data Length)等。消息體中用于存儲具體的調(diào)用消息,比如方法名稱,參數(shù)列表等。下面簡單列舉一下消息頭的內(nèi)容。
| 偏移量(Bit) | 字段 | 取值 |
| ----------- | ------------ | ------------------------------------------------------------ |
| 0 ~ 7 | 魔數(shù)高位 | 0xda00 |
| 8 ~ 15 | 魔數(shù)低位 | 0xbb |
| 16 | 數(shù)據(jù)包類型 | 0 - Response, 1 - Request |
| 17 | 調(diào)用方式 | 僅在第16位被設(shè)為1的情況下有效,0 - 單向調(diào)用,1 - 雙向調(diào)用 |
| 18 | 事件標(biāo)識 | 0 - 當(dāng)前數(shù)據(jù)包是請求或響應(yīng)包,1 - 當(dāng)前數(shù)據(jù)包是心跳包 |
| 19 ~ 23 | 序列化器編號 | 2 - Hessian2Serialization
3 - JavaSerialization
4 - CompactedJavaSerialization
6 - FastJsonSerialization
7 - NativeJavaSerialization
8 - KryoSerialization
9 - FstSerialization |
| 24 ~ 31 | 狀態(tài) | 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE ...... |
| 32 ~ 95 | 請求編號 | 共8字節(jié),運行時生成 |
| 96 ~ 127 | 消息體長度 | 運行時計算
消費方發(fā)送請求
(1)發(fā)送請求
為了便于大家閱讀代碼,這里以 DemoService 為例,將 sayHello 方法的整個調(diào)用路徑貼出來。
proxy0#sayHello(String) —> InvokerInvocationHandler#invoke(Object, Method, Object[]) —> MockClusterInvoker#invoke(Invocation) —> AbstractClusterInvoker#invoke(Invocation) —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance) —> Filter#invoke(Invoker, Invocation) // 包含多個 Filter 調(diào)用 —> ListenerInvokerWrapper#invoke(Invocation) —> AbstractInvoker#invoke(Invocation) —> DubboInvoker#doInvoke(Invocation) —> ReferenceCountExchangeClient#request(Object, int) —> HeaderExchangeClient#request(Object, int) —> HeaderExchangeChannel#request(Object, int) —> AbstractPeer#send(Object) —> AbstractClient#send(Object, boolean) —> NettyChannel#send(Object, boolean) —> NioClientSocketChannel#write(Object)
dubbo消費方,自動生成代碼對象如下
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService { private InvocationHandler handler; public String sayHello(String string) { // 將參數(shù)存儲到 Object 數(shù)組中 Object[] arrobject = new Object[]{string}; // 調(diào)用 InvocationHandler 實現(xiàn)類的 invoke 方法得到調(diào)用結(jié)果 Object object = this.handler.invoke(this, methods[0], arrobject); // 返回調(diào)用結(jié)果 return (String)object; } }
InvokerInvocationHandler 中的 invoker 成員變量類型為 MockClusterInvoker,MockClusterInvoker 內(nèi)部封裝了服務(wù)降級邏輯。下面簡單看一下:
public Result invoke(Invocation invocation) throws RpcException { Result result = null; // 獲取 mock 配置值 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")) { // 無 mock 邏輯,直接調(diào)用其他 Invoker 對象的 invoke 方法, // 比如 FailoverClusterInvoker result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { // force:xxx 直接執(zhí)行 mock 邏輯,不發(fā)起遠(yuǎn)程調(diào)用 result = doMockInvoke(invocation, null); } else { // fail:xxx 表示消費方對調(diào)用服務(wù)失敗后,再執(zhí)行 mock 邏輯,不拋出異常 try { result = this.invoker.invoke(invocation); } catch (RpcException e) { // 調(diào)用失敗,執(zhí)行 mock 邏輯 result = doMockInvoke(invocation, e); } } return result; }
考慮到前文已經(jīng)詳細(xì)分析過 FailoverClusterInvoker,因此本節(jié)略過 FailoverClusterInvoker,直接分析 DubboInvoker。
public abstract class AbstractInvoker<T> implements Invoker<T> { public Result invoke(Invocation inv) throws RpcException { if (destroyed.get()) { throw new RpcException("Rpc invoker for service ..."); } RpcInvocation invocation = (RpcInvocation) inv; // 設(shè)置 Invoker invocation.setInvoker(this); if (attachment != null && attachment.size() > 0) { // 設(shè)置 attachment invocation.addAttachmentsIfAbsent(attachment); } Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { // 添加 contextAttachments 到 RpcInvocation#attachment 變量中 invocation.addAttachments(contextAttachments); } if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { // 設(shè)置異步信息到 RpcInvocation#attachment 中 invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { // 抽象方法,由子類實現(xiàn) return doInvoke(invocation); } catch (InvocationTargetException e) { // ... } catch (RpcException e) { // ... } catch (Throwable e) { return new RpcResult(e); } } protected abstract Result doInvoke(Invocation invocation) throws Throwable; // 省略其他方法 }
上面的代碼來自 AbstractInvoker 類,其中大部分代碼用于添加信息到 RpcInvocation#attachment 變量中,添加完畢后,調(diào)用 doInvoke 執(zhí)行后續(xù)的調(diào)用。doInvoke 是一個抽象方法,需要由子類實現(xiàn),下面到 DubboInvoker 中看一下。
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); //將目標(biāo)方法以及版本好作為參數(shù)放入到Invocation中 inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); //獲得客戶端連接 ExchangeClient currentClient; //初始化invoker的時候,構(gòu)建的一個遠(yuǎn)程通信連接 if (clients.length == 1) { //默認(rèn) currentClient = clients[0]; } else { //通過取模獲得其中一個連接 currentClient = clients[index.getAndIncrement() % clients.length]; } try { //表示當(dāng)前的方法是否存在返回值 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); //isOneway 為 true,表示“單向”通信 if (isOneway) {//異步無返回值 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { //存在返回值 //是否采用異步 AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); responseFuture.whenComplete((obj, t) -> { if (t != null) { asyncRpcResult.completeExceptionally(t); } else { asyncRpcResult.complete((AppResponse) obj); } }); RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult)); return asyncRpcResult; } } //省略無關(guān)代碼 }
最終進入到HeaderExchangeChannel#request方法,拼裝Request并將請求發(fā)送出去
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed tosend request " + request + ", cause: The channel " + this + " is closed!"); } // 創(chuàng)建請求對象 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout); try { //NettyClient channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
(2)請求編碼
在netty啟動時,我們設(shè)置了編解碼器,其中通過ExchangeCodec完成編解碼工作如下:
public class ExchangeCodec extends TelnetCodec { // 消息頭長度 protected static final int HEADER_LENGTH = 16; // 魔數(shù)內(nèi)容 protected static final short MAGIC = (short) 0xdabb; protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0]; protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1]; protected static final byte FLAG_REQUEST = (byte) 0x80; protected static final byte FLAG_TWOWAY = (byte) 0x40; protected static final byte FLAG_EVENT = (byte) 0x20; protected static final int SERIALIZATION_MASK = 0x1f; private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class); public Short getMagicCode() { return MAGIC; } @Override public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { if (msg instanceof Request) { // 對 Request 對象進行編碼 encodeRequest(channel, buffer, (Request) msg); } else if (msg instanceof Response) { // 對 Response 對象進行編碼,后面分析 encodeResponse(channel, buffer, (Response) msg); } else { super.encode(channel, buffer, msg); } } protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // 創(chuàng)建消息頭字節(jié)數(shù)組,長度為 16 byte[] header = new byte[HEADER_LENGTH]; // 設(shè)置魔數(shù) Bytes.short2bytes(MAGIC, header); // 設(shè)置數(shù)據(jù)包類型(Request/Response)和序列化器編號 header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); // 設(shè)置通信方式(單向/雙向) if (req.isTwoWay()) { header[2] |= FLAG_TWOWAY; } // 設(shè)置事件標(biāo)識 if (req.isEvent()) { header[2] |= FLAG_EVENT; } // 設(shè)置請求編號,8個字節(jié),從第4個字節(jié)開始設(shè)置 Bytes.long2bytes(req.getId(), header, 4); // 獲取 buffer 當(dāng)前的寫位置 int savedWriteIndex = buffer.writerIndex(); // 更新 writerIndex,為消息頭預(yù)留 16 個字節(jié)的空間 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); // 創(chuàng)建序列化器,比如 Hessian2ObjectOutput ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (req.isEvent()) { // 對事件數(shù)據(jù)進行序列化操作 encodeEventData(channel, out, req.getData()); } else { // 對請求數(shù)據(jù)進行序列化操作 encodeRequestData(channel, out, req.getData(), req.getVersion()); } out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); // 獲取寫入的字節(jié)數(shù),也就是消息體長度 int len = bos.writtenBytes(); checkPayload(channel, len); // 將消息體長度寫入到消息頭中 Bytes.int2bytes(len, header, 12); // 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準(zhǔn)備 buffer.writerIndex(savedWriteIndex); // 從 savedWriteIndex 下標(biāo)處寫入消息頭 buffer.writeBytes(header); // 設(shè)置新的 writerIndex,writerIndex = 原寫下標(biāo) + 消息頭長度 + 消息體長度 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); } // 省略其他方法 }
以上就是請求對象的編碼過程,該過程首先會通過位運算將消息頭寫入到 header 數(shù)組中。然后對 Request 對象的 data 字段執(zhí)行序列化操作,序列化后的數(shù)據(jù)最終會存儲到 ChannelBuffer 中。序列化操作執(zhí)行完后,可得到數(shù)據(jù)序列化后的長度 len,緊接著將 len 寫入到 header 指定位置處。最后再將消息頭字節(jié)數(shù)組 header 寫入到 ChannelBuffer 中,整個編碼過程就結(jié)束了。本節(jié)的最后,我們再來看一下 Request 對象的 data 字段序列化過程,也就是 encodeRequestData 方法的邏輯,如下:
public class DubboCodec extends ExchangeCodec implements Codec2 { protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { RpcInvocation inv = (RpcInvocation) data; // 依次序列化 dubbo version、path、version out.writeUTF(version); out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); // 序列化調(diào)用方法名 out.writeUTF(inv.getMethodName()); // 將參數(shù)類型轉(zhuǎn)換為字符串,并進行序列化 out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); Object[] args = inv.getArguments(); if (args != null) for (int i = 0; i < args.length; i++) { // 對運行時參數(shù)進行序列化 out.writeObject(encodeInvocationArgument(channel, inv, i)); } // 序列化 attachments out.writeObject(inv.getAttachments()); } }
至此,關(guān)于服務(wù)消費方發(fā)送請求的過程就分析完了,接下來我們來看一下服務(wù)提供方是如何接收請求的。
提供方接收請求
(1) 請求解碼
這里直接分析請求數(shù)據(jù)的解碼邏輯,忽略中間過程,如下:
public class ExchangeCodec extends TelnetCodec { @Override public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { int readable = buffer.readableBytes(); // 創(chuàng)建消息頭字節(jié)數(shù)組 byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; // 讀取消息頭數(shù)據(jù) buffer.readBytes(header); // 調(diào)用重載方法進行后續(xù)解碼工作 return decode(channel, buffer, readable, header); } @Override protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { // 檢查魔數(shù)是否相等 if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) { int length = header.length; if (header.length < readable) { header = Bytes.copyOf(header, readable); buffer.readBytes(header, length, readable - length); } for (int i = 1; i < header.length - 1; i++) { if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { buffer.readerIndex(buffer.readerIndex() - header.length + i); header = Bytes.copyOf(header, i); break; } } // 通過 telnet 命令行發(fā)送的數(shù)據(jù)包不包含消息頭,所以這里 // 調(diào)用 TelnetCodec 的 decode 方法對數(shù)據(jù)包進行解碼 return super.decode(channel, buffer, readable, header); } // 檢測可讀數(shù)據(jù)量是否少于消息頭長度,若小于則立即返回 DecodeResult.NEED_MORE_INPUT if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } // 從消息頭中獲取消息體長度 int len = Bytes.bytes2int(header, 12); // 檢測消息體長度是否超出限制,超出則拋出異常 checkPayload(channel, len); int tt = len + HEADER_LENGTH; // 檢測可讀的字節(jié)數(shù)是否小于實際的字節(jié)數(shù) if (readable < tt) { return DecodeResult.NEED_MORE_INPUT; } ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { // 繼續(xù)進行解碼工作 return decodeBody(channel, is, header); } finally { if (is.available() > 0) { try { StreamUtils.skipUnusedStream(is); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } } }
上面方法通過檢測消息頭中的魔數(shù)是否與規(guī)定的魔數(shù)相等,提前攔截掉非常規(guī)數(shù)據(jù)包,比如通過 telnet 命令行發(fā)出的數(shù)據(jù)包。接著再對消息體長度,以及可讀字節(jié)數(shù)進行檢測。最后調(diào)用 decodeBody 方法進行后續(xù)的解碼工作,ExchangeCodec 中實現(xiàn)了 decodeBody 方法,但因其子類 DubboCodec 覆寫了該方法,所以在運行時 DubboCodec 中的 decodeBody 方法會被調(diào)用。下面我們來看一下該方法的代碼。
public class DubboCodec extends ExchangeCodec implements Codec2 { @Override protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { // 獲取消息頭中的第三個字節(jié),并通過邏輯與運算得到序列化器編號 byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // 獲取調(diào)用編號 long id = Bytes.bytes2long(header, 4); // 通過邏輯與運算得到調(diào)用類型,0 - Response,1 - Request if ((flag & FLAG_REQUEST) == 0) { // 對響應(yīng)結(jié)果進行解碼,得到 Response 對象。這個非本節(jié)內(nèi)容,后面再分析 // ... } else { // 創(chuàng)建 Request 對象 Request req = new Request(id); req.setVersion(Version.getProtocolVersion()); // 通過邏輯與運算得到通信方式,并設(shè)置到 Request 對象中 req.setTwoWay((flag & FLAG_TWOWAY) != 0); // 通過位運算檢測數(shù)據(jù)包是否為事件類型 if ((flag & FLAG_EVENT) != 0) { // 設(shè)置心跳事件到 Request 對象中 req.setEvent(Request.HEARTBEAT_EVENT); } try { Object data; if (req.isHeartbeat()) { // 對心跳包進行解碼,該方法已被標(biāo)注為廢棄 data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (req.isEvent()) { // 對事件數(shù)據(jù)進行解碼 data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else { DecodeableRpcInvocation inv; // 根據(jù) url 參數(shù)判斷是否在 IO 線程上對消息體進行解碼 if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { inv = new DecodeableRpcInvocation(channel, req, is, proto); // 在當(dāng)前線程,也就是 IO 線程上進行后續(xù)的解碼工作。此工作完成后,可將 // 調(diào)用方法名、attachment、以及調(diào)用參數(shù)解析出來 inv.decode(); } else { // 僅創(chuàng)建 DecodeableRpcInvocation 對象,但不在當(dāng)前線程上執(zhí)行解碼邏輯 inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv; } // 設(shè)置 data 到 Request 對象中 req.setData(data); } catch (Throwable t) { // 若解碼過程中出現(xiàn)異常,則將 broken 字段設(shè)為 true, // 并將異常對象設(shè)置到 Reqeust 對象中 req.setBroken(true); req.setData(t); } return req; } } }
如上,decodeBody 對部分字段進行了解碼,并將解碼得到的字段封裝到 Request 中。隨后會調(diào)用 DecodeableRpcInvocation 的 decode 方法進行后續(xù)的解碼工作。此工作完成后,可將調(diào)用方法名、attachment、以及調(diào)用參數(shù)解析出來。
(2)調(diào)用服務(wù)
解碼器將數(shù)據(jù)包解析成 Request 對象后,NettyHandler 的 messageReceived 方法緊接著會收到這個對象,并將這個對象繼續(xù)向下傳遞。整個調(diào)用棧如下:
NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent) —> AbstractPeer#received(Channel, Object) —> MultiMessageHandler#received(Channel, Object) —> HeartbeatHandler#received(Channel, Object) —> AllChannelHandler#received(Channel, Object) —> ExecutorService#execute(Runnable) // 由線程池執(zhí)行后續(xù)的調(diào)用邏輯
這里我們直接分析調(diào)用棧中的分析第一個和最后一個調(diào)用方法邏輯。如下:
考慮到篇幅,以及很多中間調(diào)用的邏輯并非十分重要,所以這里就不對調(diào)用棧中的每個方法都進行分析了。這里我們直接分析最后一個調(diào)用方法邏輯。如下:
public class ChannelEventRunnable implements Runnable { private final ChannelHandler handler; private final Channel channel; private final ChannelState state; private final Throwable exception; private final Object message; @Override public void run() { // 檢測通道狀態(tài),對于請求或響應(yīng)消息,此時 state = RECEIVED if (state == ChannelState.RECEIVED) { try { // 將 channel 和 message 傳給 ChannelHandler 對象,進行后續(xù)的調(diào)用 handler.received(channel, message); } catch (Exception e) { logger.warn("... operation error, channel is ... message is ..."); } } // 其他消息類型通過 switch 進行處理 else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("... operation error, channel is ..."); } break; case DISCONNECTED: // ... case SENT: // ... case CAUGHT: // ... default: logger.warn("unknown state: " + state + ", message is " + message); } } } }
如上,請求和響應(yīng)消息出現(xiàn)頻率明顯比其他類型消息高,所以這里對該類型的消息進行了針對性判斷。ChannelEventRunnable 僅是一個中轉(zhuǎn)站,它的 run 方法中并不包含具體的調(diào)用邏輯,僅用于將參數(shù)傳給其他 ChannelHandler 對象進行處理,該對象類型為 DecodeHandler。
public class DecodeHandler extends AbstractChannelHandlerDelegate { public DecodeHandler(ChannelHandler handler) { super(handler); } @Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { // 對 Decodeable 接口實現(xiàn)類對象進行解碼 decode(message); } if (message instanceof Request) { // 對 Request 的 data 字段進行解碼 decode(((Request) message).getData()); } if (message instanceof Response) { // 對 Request 的 result 字段進行解碼 decode(((Response) message).getResult()); } // 執(zhí)行后續(xù)邏輯 handler.received(channel, message); } private void decode(Object message) { // Decodeable 接口目前有兩個實現(xiàn)類, // 分別為 DecodeableRpcInvocation 和 DecodeableRpcResult if (message != null && message instanceof Decodeable) { try { // 執(zhí)行解碼邏輯 ((Decodeable) message).decode(); } catch (Throwable e) { if (log.isWarnEnabled()) { log.warn("Call Decodeable.decode failed: " + e.getMessage(), e); } } } } }
DecodeHandler 主要是包含了一些解碼邏輯,完全解碼后的 Request 對象會繼續(xù)向后傳遞
public class DubboProtocol extends AbstractProtocol { public static final String NAME = "dubbo"; private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; // 獲取 Invoker 實例 Invoker<?> invoker = getInvoker(channel, inv); if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { // 回調(diào)相關(guān),忽略 } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); // 通過 Invoker 調(diào)用具體的服務(wù) return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: ..."); } // 忽略其他方法 } Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { // 忽略回調(diào)和本地存根相關(guān)邏輯 // ... int port = channel.getLocalAddress().getPort(); // 計算 service key,格式為 groupName/serviceName:serviceVersion:port。比如: // dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880 String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); // 從 exporterMap 查找與 serviceKey 相對應(yīng)的 DubboExporter 對象, // 服務(wù)導(dǎo)出過程中會將 <serviceKey, DubboExporter> 映射關(guān)系存儲到 exporterMap 集合中 DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null) throw new RemotingException(channel, "Not found exported service ..."); // 獲取 Invoker 對象,并返回 return exporter.getInvoker(); } // 忽略其他方法 }
在之前課程中介紹過,服務(wù)全部暴露完成之后保存到exporterMap中。這里就是通過serviceKey獲取exporter之后獲取Invoker,并通過 Invoker 的 invoke 方法調(diào)用服務(wù)邏輯
public abstract class AbstractProxyInvoker<T> implements Invoker<T> { @Override public Result invoke(Invocation invocation) throws RpcException { try { // 調(diào)用 doInvoke 執(zhí)行后續(xù)的調(diào)用,并將調(diào)用結(jié)果封裝到 RpcResult 中,并 return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments())); } catch (InvocationTargetException e) { return new RpcResult(e.getTargetException()); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method ..."); } } protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable; }
如上,doInvoke 是一個抽象方法,這個需要由具體的 Invoker 實例實現(xiàn)。Invoker 實例是在運行時通過 JavassistProxyFactory 創(chuàng)建的,創(chuàng)建邏輯如下:
public class JavassistProxyFactory extends AbstractProxyFactory { // 省略其他方法 @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 創(chuàng)建匿名類對象 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 調(diào)用 invokeMethod 方法進行后續(xù)的調(diào)用 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
Wrapper 是一個抽象類,其中 invokeMethod 是一個抽象方法。Dubbo 會在運行時通過 Javassist 框架為 Wrapper 生成實現(xiàn)類,并實現(xiàn) invokeMethod 方法,該方法最終會根據(jù)調(diào)用信息調(diào)用具體的服務(wù)。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。
/** Wrapper0 是在運行時生成的,大家可使用 Arthas 進行反編譯 */ public class Wrapper0 extends Wrapper implements ClassGenerator.DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0; // 省略其他方法 public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException { DemoService demoService; try { // 類型轉(zhuǎn)換 demoService = (DemoService)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } try { // 根據(jù)方法名調(diào)用指定的方法 if ("sayHello".equals(string) && arrclass.length == 1) { return demoService.sayHello((String)arrobject[0]); } } catch (Throwable throwable) { throw new InvocationTargetException(throwable); } throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString()); } }
到這里,整個服務(wù)調(diào)用過程就分析完了。最后把調(diào)用過程貼出來,如下:
ChannelEventRunnable#run() —> DecodeHandler#received(Channel, Object) —> HeaderExchangeHandler#received(Channel, Object) —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request) —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object) —> Filter#invoke(Invoker, Invocation) —> AbstractProxyInvoker#invoke(Invocation) —> Wrapper0#invokeMethod(Object, String, Class[], Object[]) —> DemoServiceImpl#sayHello(String)
提供方返回調(diào)用結(jié)果
服務(wù)提供方調(diào)用指定服務(wù)后,會將調(diào)用結(jié)果封裝到 Response 對象中,并將該對象返回給服務(wù)消費方。服務(wù)提供方也是通過 NettyChannel 的 send 方法將 Response 對象返回,這里就不在重復(fù)分析了。本節(jié)我們僅需關(guān)注 Response 對象的編碼過程即可。
public class ExchangeCodec extends TelnetCodec { public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { if (msg instanceof Request) { encodeRequest(channel, buffer, (Request) msg); } else if (msg instanceof Response) { // 對響應(yīng)對象進行編碼 encodeResponse(channel, buffer, (Response) msg); } else { super.encode(channel, buffer, msg); } } protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException { int savedWriteIndex = buffer.writerIndex(); try { Serialization serialization = getSerialization(channel); // 創(chuàng)建消息頭字節(jié)數(shù)組 byte[] header = new byte[HEADER_LENGTH]; // 設(shè)置魔數(shù) Bytes.short2bytes(MAGIC, header); // 設(shè)置序列化器編號 header[2] = serialization.getContentTypeId(); if (res.isHeartbeat()) header[2] |= FLAG_EVENT; // 獲取響應(yīng)狀態(tài) byte status = res.getStatus(); // 設(shè)置響應(yīng)狀態(tài) header[3] = status; // 設(shè)置請求編號 Bytes.long2bytes(res.getId(), header, 4); // 更新 writerIndex,為消息頭預(yù)留 16 個字節(jié)的空間 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (status == Response.OK) { if (res.isHeartbeat()) { // 對心跳響應(yīng)結(jié)果進行序列化,已廢棄 encodeHeartbeatData(channel, out, res.getResult()); } else { // 對調(diào)用結(jié)果進行序列化 encodeResponseData(channel, out, res.getResult(), res.getVersion()); } } else { // 對錯誤信息進行序列化 out.writeUTF(res.getErrorMessage()) }; out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); // 獲取寫入的字節(jié)數(shù),也就是消息體長度 int len = bos.writtenBytes(); checkPayload(channel, len); // 將消息體長度寫入到消息頭中 Bytes.int2bytes(len, header, 12); // 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準(zhǔn)備 buffer.writerIndex(savedWriteIndex); // 從 savedWriteIndex 下標(biāo)處寫入消息頭 buffer.writeBytes(header); // 設(shè)置新的 writerIndex,writerIndex = 原寫下標(biāo) + 消息頭長度 + 消息體長度 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); } catch (Throwable t) { // 異常處理邏輯不是很難理解,但是代碼略多,這里忽略了 } } } public class DubboCodec extends ExchangeCodec implements Codec2 { protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { Result result = (Result) data; // 檢測當(dāng)前協(xié)議版本是否支持帶有 attachment 集合的 Response 對象 boolean attach = Version.isSupportResponseAttachment(version); Throwable th = result.getException(); // 異常信息為空 if (th == null) { Object ret = result.getValue(); // 調(diào)用結(jié)果為空 if (ret == null) { // 序列化響應(yīng)類型 out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE); } // 調(diào)用結(jié)果非空 else { // 序列化響應(yīng)類型 out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE); // 序列化調(diào)用結(jié)果 out.writeObject(ret); } } // 異常信息非空 else { // 序列化響應(yīng)類型 out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION); // 序列化異常對象 out.writeObject(th); } if (attach) { // 記錄 Dubbo 協(xié)議版本 result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); // 序列化 attachments 集合 out.writeObject(result.getAttachments()); } } }
以上就是 Response 對象編碼的過程,和前面分析的 Request 對象編碼過程很相似。如果大家能看 Request 對象的編碼邏輯,那么這里的 Response 對象的編碼邏輯也不難理解,就不多說了。接下來我們再來分析雙向通信的最后一環(huán) —— 服務(wù)消費方接收調(diào)用結(jié)果。
消費方接收調(diào)用結(jié)果
服務(wù)消費方在收到響應(yīng)數(shù)據(jù)后,首先要做的事情是對響應(yīng)數(shù)據(jù)進行解碼,得到 Response 對象。然后再將該對象傳遞給下一個入站處理器,這個入站處理器就是 NettyHandler。接下來 NettyHandler 會將這個對象繼續(xù)向下傳遞,最后 AllChannelHandler 的 received 方法會收到這個對象,并將這個對象派發(fā)到線程池中。這個過程和服務(wù)提供方接收請求的過程是一樣的,因此這里就不重復(fù)分析了。
(1)響應(yīng)數(shù)據(jù)解碼
響應(yīng)數(shù)據(jù)解碼邏輯主要的邏輯封裝在 DubboCodec 中,我們直接分析這個類的代碼。如下:
public class DubboCodec extends ExchangeCodec implements Codec2 { @Override protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // 獲取請求編號 long id = Bytes.bytes2long(header, 4); // 檢測消息類型,若下面的條件成立,表明消息類型為 Response if ((flag & FLAG_REQUEST) == 0) { // 創(chuàng)建 Response 對象 Response res = new Response(id); // 檢測事件標(biāo)志位 if ((flag & FLAG_EVENT) != 0) { // 設(shè)置心跳事件 res.setEvent(Response.HEARTBEAT_EVENT); } // 獲取響應(yīng)狀態(tài) byte status = header[3]; // 設(shè)置響應(yīng)狀態(tài) res.setStatus(status); // 如果響應(yīng)狀態(tài)為 OK,表明調(diào)用過程正常 if (status == Response.OK) { try { Object data; if (res.isHeartbeat()) { // 反序列化心跳數(shù)據(jù),已廢棄 data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (res.isEvent()) { // 反序列化事件數(shù)據(jù) data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else { DecodeableRpcResult result; // 根據(jù) url 參數(shù)決定是否在 IO 線程上執(zhí)行解碼邏輯 if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { // 創(chuàng)建 DecodeableRpcResult 對象 result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto); // 進行后續(xù)的解碼工作 result.decode(); } else { // 創(chuàng)建 DecodeableRpcResult 對象 result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); } data = result; } // 設(shè)置 DecodeableRpcResult 對象到 Response 對象中 res.setResult(data); } catch (Throwable t) { // 解碼過程中出現(xiàn)了錯誤,此時設(shè)置 CLIENT_ERROR 狀態(tài)碼到 Response 對象中 res.setStatus(Response.CLIENT_ERROR); res.setErrorMessage(StringUtils.toString(t)); } } // 響應(yīng)狀態(tài)非 OK,表明調(diào)用過程出現(xiàn)了異常 else { // 反序列化異常信息,并設(shè)置到 Response 對象中 res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); } return res; } else { // 對請求數(shù)據(jù)進行解碼,前面已分析過,此處忽略 } } }
以上就是響應(yīng)數(shù)據(jù)的解碼過程,上面邏輯看起來是不是似曾相識。對的,我們在前面章節(jié)分析過 DubboCodec 的 decodeBody 方法中關(guān)于請求數(shù)據(jù)的解碼過程,該過程和響應(yīng)數(shù)據(jù)的解碼過程很相似。下面,我們繼續(xù)分析調(diào)用結(jié)果的反序列化過程
public class DecodeableRpcResult extends AppResponse implements Codec, Decodeable { private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class); private Channel channel; private byte serializationType; private InputStream inputStream; private Response response; private Invocation invocation; private volatile boolean hasDecoded; public DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id) { Assert.notNull(channel, "channel == null"); Assert.notNull(response, "response == null"); Assert.notNull(is, "inputStream == null"); this.channel = channel; this.response = response; this.inputStream = is; this.invocation = invocation; this.serializationType = id; } @Override public void encode(Channel channel, OutputStream output, Object message) throws IOException { throw new UnsupportedOperationException(); } @Override public Object decode(Channel channel, InputStream input) throws IOException { ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); // 反序列化響應(yīng)類型 byte flag = in.readByte(); switch (flag) { case DubboCodec.RESPONSE_NULL_VALUE: break; case DubboCodec.RESPONSE_VALUE: handleValue(in); break; case DubboCodec.RESPONSE_WITH_EXCEPTION: handleException(in); break; // 返回值為空,且攜帶了 attachments 集合 case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS: handleAttachment(in); break; //返回值不為空,且攜帶了 attachments 集合 case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS: handleValue(in); handleAttachment(in); break; // 異常對象不為空,且攜帶了 attachments 集合 case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS: handleException(in); handleAttachment(in); break; default: throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', but received: " + flag); } if (in instanceof Cleanable) { ((Cleanable) in).cleanup(); } return this; }
正常調(diào)用下,線程會進入 RESPONSE_VALUE_WITH_ATTACHMENTS 分支中。然后線程會從 invocation 變量(大家探索一下 invocation 變量的由來)中獲取返回值類型,接著對調(diào)用結(jié)果進行反序列化,并將序列化后的結(jié)果存儲起來。最后對 attachments 集合進行反序列化,并存到指定字段中。
異步轉(zhuǎn)同步
Dubbo發(fā)送數(shù)據(jù)至服務(wù)方后,在通信層面是異步的,通信線程并不會等待結(jié)果數(shù)據(jù)返回。而我們在使用Dubbo進行RPC調(diào)用缺省就是同步的,這其中就涉及到了異步轉(zhuǎn)同步的操作。
而在2.7.x版本中,這種自實現(xiàn)的異步轉(zhuǎn)同步操作進行了修改。新的`DefaultFuture`繼承了`CompletableFuture`,新的`doReceived(Response res)`方法如下:
private void doReceived(Response res) { if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } }
通過`CompletableFuture#complete`方法來設(shè)置異步的返回結(jié)果,且刪除舊的`get()`方法,使用`CompletableFuture#get()`方法:
public T get() throws InterruptedException, ExecutionException { Object r; return reportGet((r = result) == null ? waitingGet(true) : r); }
使用`CompletableFuture`完成了異步轉(zhuǎn)同步的操作。
異步多線程數(shù)據(jù)一致
這里簡單說明一下。一般情況下,服務(wù)消費方會并發(fā)調(diào)用多個服務(wù),每個用戶線程發(fā)送請求后,會調(diào)用 get 方法進行等待。 一段時間后,服務(wù)消費方的線程池會收到多個響應(yīng)對象。這個時候要考慮一個問題,如何將每個響應(yīng)對象傳遞給相應(yīng)的 Future 對象,不出錯。答案是通過調(diào)用**編號**。Future 被創(chuàng)建時,會要求傳入一個 Request 對象。此時 DefaultFuture 可從 Request 對象中獲取調(diào)用編號,并將 <調(diào)用編號, DefaultFuture 對象> 映射關(guān)系存入到靜態(tài) Map 中,即 FUTURES。線程池中的線程在收到 Response 對象后,會根據(jù) Response 對象中的調(diào)用編號到 FUTURES 集合中取出相應(yīng)的 DefaultFuture 對象,然后再將 Response 對象設(shè)置到 DefaultFuture 對象中。這樣用戶線程即可從 DefaultFuture 對象中獲取調(diào)用結(jié)果了。整個過程大致如下圖:
private DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); // put into waiting map. FUTURES.put(id, this); CHANNELS.put(id, channel); }
心跳檢查
Dubbo采用雙向心跳的方式檢測Client端與Server端的連通性。
我們再來看看 Dubbo 是如何設(shè)計應(yīng)用層心跳的。Dubbo 的心跳是雙向心跳,客戶端會給服務(wù)端發(fā)送心跳,反之,服務(wù)端也會向客戶端發(fā)送心跳。
創(chuàng)建定時器
public class HeaderExchangeClient implements ExchangeClient { private final Client client; private final ExchangeChannel channel; private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer( new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL); private HeartbeatTimerTask heartBeatTimerTask; private ReconnectTimerTask reconnectTimerTask; public HeaderExchangeClient(Client client, boolean startTimer) { Assert.notNull(client, "Client can't be null"); this.client = client; this.channel = new HeaderExchangeChannel(client); if (startTimer) { URL url = client.getUrl(); //開啟心跳失敗之后處理重連,斷連的邏輯定時任務(wù) startReconnectTask(url); //開啟發(fā)送心跳請求定時任務(wù) startHeartBeatTask(url); } }
Dubbo 在 `HeaderExchangeClient `初始化時開啟了兩個定時任務(wù)
`startReconnectTask` 主要用于定時發(fā)送心跳請求
`startHeartBeatTask` 主要用于心跳失敗之后處理重連,斷連的邏輯
發(fā)送心跳請求
詳細(xì)解析下心跳檢測定時任務(wù)的邏輯 `HeartbeatTimerTask#doTask`:
protected void doTask(Channel channel) { Long lastRead = lastRead(channel); Long lastWrite = lastWrite(channel); if ((lastRead != null && now() - lastRead > heartbeat) || (lastWrite != null && now() - lastWrite > heartbeat)) { Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setEvent(Request.HEARTBEAT_EVENT); channel.send(req); } }
前面已經(jīng)介紹過,**Dubbo 采取的是雙向心跳設(shè)計**,即服務(wù)端會向客戶端發(fā)送心跳,客戶端也會向服務(wù)端發(fā)送心跳,接收的一方更新 lastRead 字段,發(fā)送的一方更新 lastWrite 字段,超過心跳間隙的時間,便發(fā)送心跳請求給對端。這里的 lastRead/lastWrite 同樣會被同一個通道上的普通調(diào)用更新,通過更新這兩個字段,實現(xiàn)了只在連接空閑時才會真正發(fā)送空閑報文的機制,符合我們一開始科普的做法。
處理重連和斷連
繼續(xù)研究下重連和斷連定時器都實現(xiàn)了什么 `ReconnectTimerTask#doTask`。
protected void doTask(Channel channel) { Long lastRead = lastRead(channel); Long now = now(); if (!channel.isConnected()) { ((Client) channel).reconnect(); // check pong at client } else if (lastRead != null && now - lastRead > idleTimeout) { ((Client) channel).reconnect(); } }
第二個定時器則負(fù)責(zé)根據(jù)客戶端、服務(wù)端類型來對連接做不同的處理,當(dāng)超過設(shè)置的心跳總時間之后,客戶端選擇的是重新連接,服務(wù)端則是選擇直接斷開連接。這樣的考慮是合理的,客戶端調(diào)用是強依賴可用連接的,而服務(wù)端可以等待客戶端重新建立連接。
Dubbo 對于建立的每一個連接,同時在客戶端和服務(wù)端開啟了 2 個定時器,一個用于定時發(fā)送心跳,一個用于定時重連、斷連,執(zhí)行的頻率均為各自檢測周期的 1/3。定時發(fā)送心跳的任務(wù)負(fù)責(zé)在連接空閑時,向?qū)Χ税l(fā)送心跳包。定時重連、斷連的任務(wù)負(fù)責(zé)檢測 lastRead 是否在超時周期內(nèi)仍未被更新,如果判定為超時,客戶端處理的邏輯是重連,服務(wù)端則采取斷連的措施。