我是一段不羁的公告!
记得给艿艿这 3 个项目加油,添加一个 STAR 噢。
https://github.com/YunaiV/SpringBoot-Labs
https://github.com/YunaiV/onemall
https://github.com/YunaiV/ruoyi-vue-pro

精尽 Dubbo 源码分析 —— NIO 服务器(一)之抽象 API

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

从本小节开始,我们来分享 Dubbo 自己实现的 NIO 服务器,使用在 dubbo://thrift:// 协议上。

在 NIO 框架的选型上,强大的 Java 社区里有 mina、netty、grizzly 等,甚至 netty 提供了 3.x 和 4.x 的版本。那么该咋办呢?

Dubbo 开发团队的选择是:

  • API 层:
    • dubbo-remoting-api
  • 实现层:
    • dubbo-remoting-netty3
    • dubbo-remoting-netty4
    • dubbo-remoting-mina
    • dubbo-remoting-grizzly
    • dubbo-remoting-p2p

再配合上 Dubbo SPI 的机制,使用者可以自定义使用哪一种具体的实现。美滋滋。

2. 一览

还是老样子,笔者习惯性对代码量进行下统计,dubbo-remoting代码量如下图:

代码量

WTF !!! dubbo-remoting-api 的代码量竟然近万行

{__/}
( • - •)
/つ淡定 @胖友

我们来首先看一张图:

FROM 《Dubbo 开发指南 —— 框架设计》

整体设计

红框部分,Protocol => Exchange => Transport => Serialize 的调用顺序。

  • exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer。
  • transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
  • serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool

在笔者初看 dubbo-remoting-api 的代码时,对 exchangetransport 的理解是比较模糊的。简单来说,exchangetransport 之上,构造了 Request,Response 模型,一个请求对应一个响应。这样的方式,才符合我们实际业务开发的需要。当然,即使是 Request,Response 也分成同步和异步返回,重要的是,能够一一映射

胖友如果有兴趣,可以看看:

看完以上知识,我们在回过头看 dubbo-remoting-api 的项目结构就清晰了:

dubbo-remoting-api

  • 最外层:通用接口。
  • buffer 包:缓冲区。
  • exchange 包:信息交换层
  • transporter 包:网络传输层
  • telnet 包:Telnet 命令

如上的每一层/包,我们都会独立一篇文章,进行分享。
当然,dubbo-remoting-api 模块,只负责 API 层抽象和部分实现,最终能够真正通信,需要 dubbo-remoting-netty 等等模块来实现。对应的每一个实现模块,我们也是独立一篇文章。

🙂 是不是很清晰,美滋滋?

3. 最外层:通用接口

胖友先看看 《Netty4.0学习笔记系列之一:Server与Client的通讯》 教程文章。在该文章中,我们可以看到使用 Netty 在四个类:

  • 1、HelloServer :server类,启动Netty server
  • 2、HelloServerInHandler:server的handler,接收客户端消息,并向客户端发送消息
  • 3、HelloClient:client类,建立于Netty server的连接
  • 4、HelloClientIntHandler:client的handler,接收server端的消息,并向服务端发送消息

恰好,和 dubbo-remoting-api 模块,定义的 Server、Client、ChannelHandler 接口对应。我们以 dubbo-remoting-netty4 模块的,举例子。整理如下:

上文 dubbo-remoting-api dubbo-remoting-netty4
HelloServer Server 接口 NettyServer 实现类
HelloServerInHandler:server ChannelHandler 接口 NettyServerHandler 实现类
HelloClient Client 接口 NettyClient 实现类
HelloServerInHandler ChannelHandler 接口 NettyClientHandler 实现类

因为教程文章,以教程 Demo 为准,实际会有更多需要抽象的,例如:Codec 协议编解码,Dispatcher 消息等分发。胖友再来看看 dubbo:// 的处理流程:

FROM 《Dubbo 用户指南 —— dubbo://》
dubbo://

  • Transporter: mina, netty, grizzy
  • Serialization: dubbo, hessian2, java, json
  • Dispatcher: all, direct, message, execution, connection
  • ThreadPool: fixed, cached

如果这个图读不懂,没关系,下面我们来看看每个接口。后面,胖友可以回过头来理解这个图。

本文涉及的类图如下

类图

4. Endpoint

com.alibaba.dubbo.remoting.Endpoint端点接口。方法如下:

  • 属性相关

    URL getUrl();

    InetSocketAddress getLocalAddress();

    ChannelHandler getChannelHandler();
  • 发送消息

    void send(Object message) throws RemotingException;
    void send(Object message, boolean sent) throws RemotingException;
  • 关系相关

    void close();
    void close(int timeout);

    void startClose();

    boolean isClosed();

Endpoint ,从中文上解释来说是,“端点”。从字面上来看,不太容易理解。在 dubbo-remoting-api 中,一个 Client 或 Server ,都是一个 Endpoint 。🙂 不同系统的,Endpoint 代表的会略有差距,例如 SpringMVC 中,一个请求 Restful URL 也可以是一个 Endpoint ,胖友可以 Google 查询,理解更多。

4.1 Channel

com.alibaba.dubbo.remoting.Channel ,继承 Endpoint 接口,通道接口。方法如下:

  • 连接相关

    InetSocketAddress getRemoteAddress();

    boolean isConnected();
  • 属性相关

    boolean hasAttribute(String key);
    Object getAttribute(String key);
    void setAttribute(String key, Object value);
    void removeAttribute(String key);

和 Netty Channel 一致,通讯的载体。在后面的文章,我们会看到在 dubbo-remoting-netty4 项目中,NettyChannel 是 Dubbo Channel 的实现,内部有真正的 Netty Channel 属性,用于通讯

4.2 Client

com.alibaba.dubbo.remoting.Client ,实现 Endpoint 和 Channel 和 Resetable 接口,客户端接口。方法如下:

// 重连
void reconnect() throws RemotingException;

4.3 Server

com.alibaba.dubbo.remoting.Server ,继承 Endpoint 和 Resetable 接口,服务器接口。方法如下:

// 是否绑定本地端口,提供服务。即,是否启动成功,可连接,接收消息等。
boolean isBound();

// 获得连接上服务器的通道(客户端)们
Collection<Channel> getChannels();
Channel getChannel(InetSocketAddress remoteAddress);

4.3.1 Resetable

com.alibaba.dubbo.common.Resetable可重置接口。方法如下:

void reset(URL url);

Server 实现 Resetable 接口,在实现 #reset(url) 方法,用于根据新传入的 url 属性,重置自己内部的一些属性,例如 AbstractServer#reset(url) 方法。

5. ChannelHandler

com.alibaba.dubbo.remoting.ChannelHandler通道处理器接口。方法如下:

void connected(Channel channel) throws RemotingException;
void disconnected(Channel channel) throws RemotingException;

void sent(Channel channel, Object message) throws RemotingException;
void received(Channel channel, Object message) throws RemotingException;

void caught(Channel channel, Throwable exception) throws RemotingException;

和 Netty ChannelHandler 一致,负责 Channel 中的逻辑处理。在后面的文章,我们会看到在 dubbo-remoting-netty4 项目中,NettyServerHandler 是 Netty ChannelHandler 的实现,内部调用 Netty ChannelHandler 的方法,进行逻辑处理。

6. Transporter

com.alibaba.dubbo.remoting.Transporter网络传输接口。方法如下:

@SPI("netty")
public interface Transporter {

/**
* Bind a server.
*
* 绑定一个服务器
*
* @param url server url
* @param handler 通道处理器
* @return server 服务器
* @throws RemotingException 当绑定发生异常时
* @see com.alibaba.dubbo.remoting.Transporters#bind(URL, Receiver, ChannelHandler)
*/
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
Server bind(URL url, ChannelHandler handler) throws RemotingException;

/**
* Connect to a server.
*
* 连接一个服务器,即创建一个客户端
*
* @param url server url 服务器地址
* @param handler 通道处理器
* @return client 客户端
* @throws RemotingException 当连接发生异常时
* @see com.alibaba.dubbo.remoting.Transporters#connect(URL, Receiver, ChannelListener)
*/
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;

}
  • @SPI("netty") 注解,Dubbo SPI 拓展点,默认为 "netty" 。注意,此处的 netty 对应的是 netty3 ,因为 Dubbo 项目在开发时,netty4 并未发布。配置方式见 《Dubbo 用户指南 —— Netty4》 文档。
  • @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) 注解,基于 Dubbo SPI Adaptive 机制,加载对应的 Server 实现,使用 URL.serverURL.transporter 属性。
  • @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) 注解,基于 Dubbo SPI Adaptive 机制,加载对应的 Client 实现,使用 URL.clientURL.transporter 属性。

6.1 Transporters

com.alibaba.dubbo.remoting.Transporters ,Transporter 门面类。

友情提示:Facade 设计模式,参见 《 设计模式(九)外观模式Facade(结构型)》 文章。

#bind(String url, ChannelHandler... handler) 静态方法,绑定一个服务器。代码如下:

 1: public static Server bind(String url, ChannelHandler... handler) throws RemotingException {
2: return bind(URL.valueOf(url), handler);
3: }
4:
5: public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
6: if (url == null) {
7: throw new IllegalArgumentException("url == null");
8: }
9: if (handlers == null || handlers.length == 0) {
10: throw new IllegalArgumentException("handlers == null");
11: }
12: // 创建 handler
13: ChannelHandler handler;
14: if (handlers.length == 1) {
15: handler = handlers[0];
16: } else {
17: handler = new ChannelHandlerDispatcher(handlers);
18: }
19: // 创建 Server 对象
20: return getTransporter().bind(url, handler);
21: }
  • Transporter#bind(url, handler) 方法,对应。
  • 第 12 至 18 行:创建 handler 。若 handlers 是多个,使用 ChannelHandlerDispatcher 进行封装。在 ChannelHandlerDispatcher 中,会循环调用 handlers ,对应的方法。
  • 第 20 行:调用 #getTransporter() 方法,基于 Dubbo SPI 机制,获得 Transporter$Adaptive 对象。代码如下:

    public static Transporter getTransporter() {
    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
  • 第 20 行:调用 Transporter#bind(url, handler) 方法,在 Transporter$Adaptive 对象中,会根据 url 参数,获得对应的 Transporter 实现对象(例如, NettyTransporter),从而创建对应的 Server 对象(例如, NettyServer)。

另外,还有一个 #connect(url, handler) 静态方法,连接一个服务器,即创建一个客户端。🙂 和上面方法类似,胖友自己看咯。

7. Codec2

com.alibaba.dubbo.remoting.Codec2编解码器接口。代码如下:

/**
* 编码
*
* @param channel 通道
* @param buffer Buffer
* @param message 消息
* @throws IOException 当编码发生异常时
*/
@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

/**
* 解码
*
* @param channel 通道
* @param buffer Buffer
* @return 消息
* @throws IOException 当解码发生异常时
*/
@Adaptive({Constants.CODEC_KEY})
Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
  • @SPI("netty") 注解,Dubbo SPI 拓展点
  • @Adaptive({Constants.CODEC_KEY}) 注解,基于 Dubbo SPI Adaptive 机制,加载对应的 Codec2 实现,使用 URL.codec 属性。

另外,解码过程中,需要解决 TCP 拆包、粘包的场景,因此解码结果如下:

// Codec2.java
enum DecodeResult {
/**
* 需要更多输入
*/
NEED_MORE_INPUT,
/**
* 忽略一些输入
*/
SKIP_SOME_INPUT
}

7.1 Codec

com.alibaba.dubbo.remoting.Codec 老的编解码器接口,被 Codec2 取代。

通过 CodecAdapter ,将 Codec 适配成 Codec2 。

7.2 Decodeable

com.alibaba.dubbo.remoting.Decodeable可解码的接口。方法如下:

// 解码
void decode() throws Exception;

8. Dispatcher

com.alibaba.dubbo.remoting.Dispatcher调度器接口。方法如下:

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

@Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
// The last two parameters are reserved for compatibility with the old configuration
ChannelHandler dispatch(ChannelHandler handler, URL url);

}
  • @SPI(AllDispatcher.NAME) 注解,Dubbo SPI 拓展点,默认为 "all"
  • @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) 注解,基于 Dubbo SPI Adaptive 机制,加载对应的 ChanelHander 实现,使用 URL.dispatcher 属性。
  • 为什么传入的 handler 参数,创建返回的还是 ChannelHandler 对象呢?感兴趣的胖友,可以提前看下 AllChannelHandler《Dubbo 用户指南 —— 线程模型》 。🙂 文章,后面见。

9. RemotingException

com.alibaba.dubbo.remoting.RemotingException ,实现 Exception 类,dubbo-remoting-api基础异常。代码如下:

public class RemotingException extends Exception {

/**
* 本地地址
*/
private InetSocketAddress localAddress;
/**
* 远程地址
*/
private InetSocketAddress remoteAddress;

// ... 省略方法
}

9.1 ExecutionException

com.alibaba.dubbo.remoting.ExecutionException ,实现 RemotingException 类,执行异常。代码如下:

public class ExecutionException extends RemotingException {

/**
* 请求
*/
private final Object request;

// ... 省略方法
}

9.2 TimeoutException

com.alibaba.dubbo.remoting.TimeoutException ,实现 RemotingException 类,超时异常。代码如下:

public class TimeoutException extends RemotingException {

/**
* 客户端
*/
public static final int CLIENT_SIDE = 0;
/**
* 服务端
*/
public static final int SERVER_SIDE = 1;

/**
* 阶段
*/
private final int phase;

// ... 省略方法
}

666. 彩蛋

知识星球

dubbo-remoting-api 模块,涉及到大量的封装,如果不太理解的胖友,建议多多调试。

毕业那年,也尝试过想,封装一个通用的 API 层,屏蔽 Netty、Mina 差异的细节,写的比较戳。现在看到 Dubbo 实现的方式,Get 新姿势了。

总访客数 && 总访问量