我是一段不羁的公告!
记得给艿艿这 3 个项目加油,添加一个 STAR 噢。
https://github.com/YunaiV/SpringBoot-Labs
https://github.com/YunaiV/onemall
https://github.com/YunaiV/ruoyi-vue-pro

精尽 Dubbo 源码解析 —— 集群容错(一)之抽象 API

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

从本文开始,我们来分享 Dubbo 的集群容错功能的实现。

《精尽 Dubbo 源码分析 —— 项目结构一览》「3.4 dubbo-cluster」 中,我们对 Dubbo 的 dubbo-cluster 项目,做了整体的代码结构做了介绍。如果已经没什么印象的胖友,请先回过头找回失散的记忆。

Dubbo 对集群容错功能,实现了很好的 package 拆分,因此我们按照如下顺序:

  1. 抽象 API
  • Cluster 实现
  • Directory 实现
  • LoadBalance 实现
  • Merger 实现
  • Router 实现
  • Configurator 实现

一个主题,对应一篇文章。那么,本文当然是分享抽象 API。考虑到干巴巴的看抽象 API 会很容易一脸懵逼,所以我们会使用 FailoverCluster 贯穿本文。

2. 整体流程

集群容错

  • 🙂 只看红线。
  • 左边 invoke :通过 Cluster 暴露 Invoker 对象,从而实现统一透明的调用过程。
  • 右边 list :通过 Directory 中,获取可调用的 Invoker 集合。
  • 右边 route :通过 Router ,过滤符合路由规则的 Invoker 集合。
  • 右边 select :通过 LoadBalance ,根据负载均衡机制选择一个符合的 Invoker 对象。
  • 右边 invoke :调用该 Invoker 对象。

3. Cluster

com.alibaba.dubbo.rpc.cluster.Cluster ,集群接口。代码如下:

@SPI(FailoverCluster.NAME)
public interface Cluster {

/**
* Merge the directory invokers to a virtual invoker.
*
* 基于 Directory ,创建 Invoker 对象,实现统一、透明的 Invoker 调用过程
*
* @param directory Directory 对象
* @param <T> 泛型
* @return cluster invoker
* @throws RpcException
*/
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;

}
  • @SPI(FailoverCluster.NAME) 注解,Dubbo SPI 拓展点,默认为 "failover" ,即失败重试,也就是会贯穿本文的 FailoverCluster 类。
  • @Adaptive 注解,基于 Dubbo SPI Adaptive 机制,加载对应的 Cluster 实现,使用 URL.cluster 属性。
  • #join(Directory<T>) 接口方法,基于 Directory ,创建 Invoker 对象,实现统一、透明的 Invoker 调用过程。

3.1 join 方法

在 RegistryProtocol 的 #doRefer(Cluster, Registry, type, url) 方法中,会调用 Cluster#join(directory) 方法,创建 Invoker 对象。代码如下:

private Cluster cluster; // <1>

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 对象,并设置注册中心 <2>
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);

// ... 省略无关代码

// 创建 Invoker 对象 <3>
Invoker invoker = cluster.join(directory);
// 向本地注册表,注册消费者
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
  • <1>cluster 属性,Cluster$Adaptive 对象
  • <2> :创建 RegistryDirectory 对象。通过它,可以注册到一个注册中心的所有服务提供者,即上文提到的【右边 list】。
  • <3> :调用 Cluster#join(directory) 方法,创建 Invoker 对象。因为 cluster 是 Dubbo SPI Adaptive 类,所以可以自动获取到对应的 Cluster 实现类。

3.2 子类

Cluster 子类

我们可以看到,每个 Cluster 实现类,对应一个专属于其的 Invoker 实现类。本文分享的 FailoverCluster 的对应的 Invoker 为 FailoverClusterInvoker 。在看具体的代码之前,先一起来看看集群容错的调用( invoke )过程

4. 调用顺序图

如下是服务消费者的调用顺序图:

顺序图

  • 在 InvokerInvocationHandler 的 【4】#invoke(invocation)插入调用集群容错 Invoker#invoke(invocation)调用 ProtocolFilterWrapper$Invoker#invoke(invocation)
  • 调用栈如下图:调用栈
    • MockClusterInvoker ,胖友先无视,后续有详细文章,进行分享。

5. FailoverCluster

com.alibaba.dubbo.rpc.cluster.support.FailoverCluster ,实现 Cluster 接口,失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过 retries="2" 来设置重试次数(不含第一次)。代码如下:

public class FailoverCluster implements Cluster {

public final static String NAME = "failover";

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}

}
  • 对应 Invoker 为 FailoverClusterInvoker 。

6. AbstractClusterInvoker

因为,FailoverClusterInvoker 继承 AbstractClusterInvoker ,所以我们来分享它。

com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker ,实现 Invoker 接口,Cluster Invoker 抽象类:

  • 实现例如选择一个符合 Invoker 对象等等公用方法
  • 定义 #doInvoke(Invocation, List<Invoker<T>>, LoadBalance) 抽象方法,实现子 Cluster 的 Invoker 实现类的服务调用的差异逻辑,代码如下:

    protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException;

6.1 构造方法

/**
* Directory 对象
*/
protected final Directory<T> directory;
/**
* 集群时是否排除非可用( available )的 Invoker ,默认为 true
*/
protected final boolean availablecheck;
/**
* 是否已经销毁
*/
private AtomicBoolean destroyed = new AtomicBoolean(false);
/**
* 粘滞连接 Invoker
*
* http://dubbo.apache.org/zh-cn/docs/user/demos/stickiness.html
* 粘滞连接用于有状态服务,尽可能让客户端总是向同一提供者发起调用,除非该提供者挂了,再连另一台。
* 粘滞连接将自动开启延迟连接,以减少长连接数。
*/
private volatile Invoker<T> stickyInvoker = null;

public AbstractClusterInvoker(Directory<T> directory) {
this(directory, directory.getUrl());
}

public AbstractClusterInvoker(Directory<T> directory, URL url) {
// 初始化 directory
if (directory == null) {
throw new IllegalArgumentException("service directory == null");
}
this.directory = directory;
// sticky: invoker.isAvailable() should always be checked before using when availablecheck is true.
// 初始化 availablecheck
this.availablecheck = url.getParameter(Constants.CLUSTER_AVAILABLE_CHECK_KEY, Constants.DEFAULT_CLUSTER_AVAILABLE_CHECK);
}
  • directory 字段,Directory 对象。通过它,可以获得所有服务提供者的 Invoker 对象。
  • availablecheck 字段,集群时是否排除非可用( available )的 Invoker ,默认为 "true" ,通过 "cluster.availablecheck" 配置项设置。
  • destroyed 字段,是否已经销毁。若已经销毁,则不允许在调用。
  • stickyInvoker 字段,粘滞连接 Invoker ,参见 《Dubbo 用户指南 —— 粘滞连接
    文档。

    粘滞连接用于有状态服务,尽可能让客户端总是向同一提供者发起调用,除非该提供者挂了,再连另一台。

6.2 list

#list(Invocation) 方法,获得所有服务提供者 Invoker 集合。代码如下:

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
return directory.list(invocation);
}

6.3 select

#select(LoadBalance, Invocation, invokers, selected) 方法,从候选的 Invoker 集合,选择一个最终调用的 Invoker 对象。代码如下:

/**
* 使用 loadbalance 选择 invoker.
*
* @param loadbalance Loadbalance 对象,提供负责均衡策略
* @param invocation Invocation 对象
* @param invokers 候选的 Invoker 集合
* @param selected 已选过的 Invoker 集合. 注意:输入保证不重复
* @return 最终的 Invoker 对象
* @throws RpcException 当发生 RpcException 时
*/
1: protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
2: if (invokers == null || invokers.isEmpty()) {
3: return null;
4: }
5: // 获得 sticky 配置项,方法级
6: String methodName = invocation == null ? "" : invocation.getMethodName();
7: boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
8: {
9: // ignore overloaded method
10: // 若 stickyInvoker 不存在于 invokers 中,说明不在候选中,需要置空,重新选择
11: if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
12: stickyInvoker = null;
13: }
14: // ignore cucurrent problem
15: // 若开启粘滞连接的特性,且 stickyInvoker 不存在于 selected 中,则返回 stickyInvoker 这个 Invoker 对象
16: if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
17: // 若开启排除非可用的 Invoker 的特性,则校验 stickyInvoker 是否可用。若可用,则进行返回
18: if (availablecheck && stickyInvoker.isAvailable()) {
19: return stickyInvoker;
20: }
21: }
22: }
23:
24: // 执行选择
25: Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
26:
27: // 若开启粘滞连接的特性,记录最终选择的 Invoker 到 stickyInvoker
28: if (sticky) {
29: stickyInvoker = invoker;
30: }
31: return invoker;
32: }
  • 该方法主要处理粘滞连接的特性,具体使用 Loadbalance 选择 Invoker 对象的逻辑,在 #doselect(loadbalance, invocation, invokers, selected) 方法中。
  • 第 5 至 22 行:获得粘滞连接 stickyInvoker 对象。
    • 第 6 至 7 行:获得方法级的 sticky 配置项。
    • 第 9 至 13 行:若 stickyInvoker 不存在于 invokers 中,说明不在候选中,需要置空,重新选择。
    • 第 14 至 21 行:获得粘滞连接 stickyInvoker 对象。如要满足如下条件
      • 第 16 行:1)开启粘滞连接的特性;2)stickyInvoker 不存在于 selected 中。
      • 第 18 行:若开启排除非可用的 Invoker 的特性,则校验 stickyInvoker 是否可用。
  • 第 25 行:调用 #doselect(loadbalance, invocation, invokers, selected) 方法,执行选择一个 Invoker 对象。
  • 第 27 至 30 行:若开启粘滞连接的特性,记录最终选择的 Invoker 对象,到 stickyInvoker 中。

6.3.1 doselect

#doselect(loadbalance, invocation, invokers, selected) 方法,从候选的 Invoker 集合,选择一个最终调用的 Invoker 对象。代码如下:

 1: private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
2: if (invokers == null || invokers.isEmpty()) {
3: return null;
4: }
5: // 【第一种】如果只有一个 Invoker ,直接选择
6: if (invokers.size() == 1) {
7: return invokers.get(0);
8: }
9: // 【第二种】如果只有两个 Invoker ,退化成轮循
10: // If we only have two invokers, use round-robin instead.
11: if (invokers.size() == 2 && selected != null && !selected.isEmpty()) {
12: return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
13: }
14:
15: // 【第三种】使用 Loadbalance ,选择一个 Invoker 对象。
16: Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
17:
18: // If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
19: // 如果 selected中包含(优先判断) 或者 不可用&&availablecheck=true 则重试.
20: if ((selected != null && selected.contains(invoker))
21: || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
22: try {
23: //【第四种】重选一个 Invoker 对象
24: Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
25: if (rinvoker != null) {
26: invoker = rinvoker;
27: } else {
28: // Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
29: // 【第五种】看下第一次选的位置,如果不是最后,选+1位置.
30: int index = invokers.indexOf(invoker);
31: try {
32: // Avoid collision
33: // 最后在避免碰撞
34: invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
35: } catch (Exception e) {
36: logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
37: }
38: }
39: } catch (Throwable t) {
40: logger.error("clustor relselect fail reason is :" + t.getMessage() + " if can not slove ,you can set cluster.availablecheck=false in url", t);
41: }
42: }
43: return invoker;
44: }
  • 五种选择最终调用的 Invoker 对象的方式。
  • 【第一种】第 5 至 8 行:如果只有一个候选的 Invoker 对象,直接选择返回。😈 因为没的选择了。
  • 【第二种】第 9 至 13 行:如果只有两个候选的 Invoker 集合,退化为轮询。此处存在一个 BUG :

    转载自我飞哥《dubbo 源码 - 负载均衡》

    这里退化成轮询的实现有问题,对应源码return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);如果retries=4,即最多调用5次,且两个可选invoke分别为:

    10.0.0.1:20884,10.0.0.1:20886;

    那么5次选择的invoke为:

    • 10.0.0.1:20884
    • 10.0.0.1:20886
    • 10.0.0.1:20886
    • 10.0.0.1:20886
    • 10.0.0.1:20886,

    即除了第1次外后面的选择都是选择第二个invoker;

    因次需要把selected.get(0)修改为:selected.get(selected.size()-1);

    即每次拿前一次选择的invoker与 invokers.get(0)比较,如果相同,则选则另一个invoker;否则就选 invokers.get(0);

  • 【第三种】第 16 至 21 行:调用 Loadbalance#select(invokers, url, invocation) 方法,使用 Loadbalance ,选择一个 Invoker 对象。具体的代码实现,见 Loadbalance 的文章。

    • 这种方式的返回,选择的 Invoker 对象,需要满足两个条件:1)不存在于 selected 中。2)Invoker 是可用的,若开启排除非可用的 Invoker 的特性。
  • 【第四种】调用 #reselect(loadbalance, invocation, invokers, selected, availablecheck) 方法,重新选择一个 Invoker 对象。😈 因为此时 invokers 中,无法找到一个满足条件的 Invoker 对象。详细解析,见 「6.3.2 reselect」
  • 【第五种】顺序从候选的 invokers 集合中,选择一个 Invoker 对象,不考虑是否可用,又或者已经选择过,类似【第一种】【第二种】的方式。😈总之,保证能获取到一个 Invoker 对象。

6.3.2 reselect

#reselect(loadbalance, invocation, invokers, selected, availablecheck) 方法,重新选择一个 Invoker 对象。代码如下:

 1: private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {
2: // Allocating one in advance, this list is certain to be used.
3: // 预先分配一个,这个列表是一定会用到的.
4: List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
5:
6: // First, try picking a invoker not in `selected`.
7: // 先从非select中选
8: if (availablecheck) { // invoker.isAvailable() should be checked
9: // 获得非选择过,并且可用的 Invoker 集合
10: for (Invoker<T> invoker : invokers) {
11: if (invoker.isAvailable()) { // 并且可用
12: if (selected == null || !selected.contains(invoker)) {
13: reselectInvokers.add(invoker);
14: }
15: }
16: }
17: // 使用 Loadbalance ,选择一个 Invoker 对象。
18: if (!reselectInvokers.isEmpty()) {
19: return loadbalance.select(reselectInvokers, getUrl(), invocation);
20: }
21: } else { // do not check invoker.isAvailable()
22: // 获得非选择过的 Invoker 集合
23: for (Invoker<T> invoker : invokers) {
24: if (selected == null || !selected.contains(invoker)) {
25: reselectInvokers.add(invoker);
26: }
27: }
28: // 使用 Loadbalance ,选择一个 Invoker 对象。
29: if (!reselectInvokers.isEmpty()) {
30: return loadbalance.select(reselectInvokers, getUrl(), invocation);
31: }
32: }
33: // Just pick an available invoker using loadbalance policy
34: // 最后从select中选可用的.
35: {
36: // 获得选择过的,并且可用的 Invoker 集合
37: if (selected != null) {
38: for (Invoker<T> invoker : selected) {
39: if ((invoker.isAvailable()) // available first
40: && !reselectInvokers.contains(invoker)) {
41: reselectInvokers.add(invoker);
42: }
43: }
44: }
45: // 使用 Loadbalance ,选择一个 Invoker 对象。
46: if (!reselectInvokers.isEmpty()) {
47: return loadbalance.select(reselectInvokers, getUrl(), invocation);
48: }
49: }
50: return null;
51: }
  • 第 4 行:预先创建一个重选 Invoker 集合,我们会发现很奇怪的一段 invokers.size() - 1 代码。这是为什么呢?笔者的理解是,出现重选 #reselect(...) 的原因,说明 #doselect(...) 的【第三种】选择的 Invoker 对象,在 selected 中,因此需要去掉一个
  • 一共有两类三种的选择方式:
    • 【第一种】第 10 至 16 行:获得非选择过( invokers ), 并且必须可用的 Invoker 集合。
    • 【第二种】第 22 至 27 行:获得非选择过( invokers ), 并且不考虑可用的 Invoker 集合。
    • 【第三种】第 36 至 44 行:获得选择过( selected ),并且必须可用的 Invoker 集合。
  • 第 19 行 || 第 30 行 || 第 47 行:调用 Loadbalance#select(invokers, url, invocation) 方法,使用 Loadbalance ,选择一个 Invoker 对象。

6.4 invoke

#invoke(invocation) 方法,调用服务提供者的逻辑。代码如下:

 1: @Override
2: public Result invoke(final Invocation invocation) throws RpcException {
3: // 校验是否销毁
4: checkWhetherDestroyed();
5:
6: // 获得所有服务提供者 Invoker 集合
7: List<Invoker<T>> invokers = list(invocation);
8:
9: // 获得 LoadBalance 对象
10: LoadBalance loadbalance;
11: if (invokers != null && !invokers.isEmpty()) {
12: loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
13: .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
14: } else {
15: loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
16: }
17:
18: // 设置调用编号,若是异步调用
19: RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
20:
21: // 执行调用
22: return doInvoke(invocation, invokers, loadbalance);
23: }
  • 第 4 行:调用 #checkWhetherDestroyed() 方法,校验是否已经销毁。代码如下:

    protected void checkWhetherDestroyed() {
    if (destroyed.get()) {
    throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost()
    + " use dubbo version " + Version.getVersion()
    + " is now destroyed! Can not invoke any more.");
    }
    }
  • 第 7 行:调用 #list(invocation) 方法,基于 Directory ,获得所有服务提供者 Invoker 集合。

  • 第 9 至 16 行:获得 Loadbalance 对象。
  • 第 19 行:调用 RpcUtils#attachInvocationIdIfAsync(url, invocation) 方法,设置调用编号,若是异步调用。
  • 第 22 行:调用 #doInvoke(invocation, invokers, loadbalance) 抽象方法,执行调用。🙂 子 Cluster 的 Invoker 实现类的服务调用的差异逻辑。

6.5 其它实现方法

6.5.1 getInterface

@Override
public Class<T> getInterface() {
return directory.getInterface();
}

6.5.2 getUrl

@Override
public URL getUrl() {
return directory.getUrl();
}

6.5.3 isAvailable

@Override
public boolean isAvailable() {
// 如有粘滞连接 Invoker ,基于它判断。
Invoker<T> invoker = stickyInvoker; // 指向,避免并发
if (invoker != null) {
return invoker.isAvailable();
}
// 基于 Directory 判断
return directory.isAvailable();
}

6.5.4 checkInvokers

protected void checkInvokers(List<Invoker<T>> invokers, Invocation invocation) {
if (invokers == null || invokers.isEmpty()) {
throw new RpcException("Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". No provider available for the service " + directory.getUrl().getServiceKey()
+ " from registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion()
+ ". Please check if the providers have been started and registered.");
}
}

6.5.5 destroy

@Override
public void destroy() {
if (destroyed.compareAndSet(false, true)) {
directory.destroy();
}
}

7. FailoverClusterInvoker

com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker ,实现 AbstractClusterInvoker 抽象类,FailoverCluster Invoker 实现类。

失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过 retries="2" 来设置重试次数(不含第一次)。

在看具体的 #doInvoke(Invocation, List<Invoker<T>>, LoadBalance) 的实现代码之前,我们先来瞅瞅调用顺序图

调用顺序图

  • 实际逻辑很简单:循环,查找一个 Invoker 对象,进行调用,直到成功

#doInvoke(Invocation, List<Invoker<T>>, LoadBalance) 方法,代码如下:

 1: @Override
2: public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
3: List<Invoker<T>> copyinvokers = invokers;
4: // 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常
5: checkInvokers(copyinvokers, invocation);
6: // 得到最大可调用次数:最大可重试次数+1,默认最大可重试次数Constants.DEFAULT_RETRIES=2
7: int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
8: if (len <= 0) {
9: len = 1;
10: }
11: // 保存最后一次调用的异常
12: RpcException le = null;
13: // 保存已经调用过的Invoker
14: List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
15: Set<String> providers = new HashSet<String>(len);
16: // failover机制核心实现:如果出现调用失败,那么重试其他服务器
17: for (int i = 0; i < len; i++) {
18: // 重试时,进行重新选择,避免重试时invoker列表已发生变化.
19: // 注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
20: if (i > 0) {
21: checkWhetherDestroyed();
22: // 根据Invocation调用信息从Directory中获取所有可用Invoker
23: copyinvokers = list(invocation);
24: // check again
25: // 重新检查一下
26: checkInvokers(copyinvokers, invocation);
27: }
28: // 根据负载均衡机制从copyinvokers中选择一个Invoker
29: Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
30: // 保存每次调用的Invoker
31: invoked.add(invoker);
32: // 设置已经调用的 Invoker 集合,到 Context 中
33: RpcContext.getContext().setInvokers((List) invoked);
34: try {
35: // RPC 调用得到 Result
36: Result result = invoker.invoke(invocation);
37: // 重试过程中,将最后一次调用的异常信息以 warn 级别日志输出
38: if (le != null && logger.isWarnEnabled()) {
39: logger.warn("Although retry the method " + invocation.getMethodName()
40: + " in the service " + getInterface().getName()
41: + " was successful by the provider " + invoker.getUrl().getAddress()
42: + ", but there have been failed providers " + providers
43: + " (" + providers.size() + "/" + copyinvokers.size()
44: + ") from the registry " + directory.getUrl().getAddress()
45: + " on the consumer " + NetUtils.getLocalHost()
46: + " using the dubbo version " + Version.getVersion() + ". Last error is: "
47: + le.getMessage(), le);
48: }
49: return result;
50: } catch (RpcException e) {
51: // 如果是业务性质的异常,不再重试,直接抛出
52: if (e.isBiz()) { // biz exception.
53: throw e;
54: }
55: // 其他性质的异常统一封装成RpcException
56: le = e;
57: } catch (Throwable e) {
58: le = new RpcException(e.getMessage(), e);
59: } finally {
60: providers.add(invoker.getUrl().getAddress());
61: }
62: }
63: // 最大可调用次数用完还得到Result的话,抛出RpcException异常:重试了N次还是失败,并输出最后一次异常信息
64: throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
65: + invocation.getMethodName() + " in the service " + getInterface().getName()
66: + ". Tried " + len + " times of the providers " + providers
67: + " (" + providers.size() + "/" + copyinvokers.size()
68: + ") from the registry " + directory.getUrl().getAddress()
69: + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
70: + Version.getVersion() + ". Last error is: "
71: + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
72: }
  • 第 3 行:copyinvokers 变量,候选的 Invoker 集合。
  • 第 5 行:调用 #checkInvokers(copyinvokers, invocation) 方法,校验候选的 Invoker 集合非空。如果为空,抛出 RpcException 异常。
  • 第 6 至 10 行:获得最大可调用次数:最大可重试次数 +1 。默认最大可重试次数Constants.DEFAULT_RETRIES = 2
  • 第 12 行:le 变量,保存最后一次调用的异常
  • 第 14 行:invoked 变量,保存已经调用的 Invoker 集合。
  • 第 15 行:providers 变量,保存已经调用的网络地址集合。
  • 第 16 至 62 行:failover 机制核心实现:如果出现调用失败,那么重试其他服务器
    • 第 20 至 27 行:重试时( i > 0 ), 进行重新选择,避免重试时,候选 Invoker 集合,已发生变化。
    • 【重要】第 29 行:调用 #select(loadbalance, invocation, copyinvokers, invoked) 方法,根据 Loadbalance 负载均衡机制,从 copyinvokers 中,选择一个被调用的 Invoker 对象。
    • 第 31 行:保存每次调用的 Invoker 对象,到 invoked 中。
    • 第 33 行:保存已经调用的 Invoker 集合,到 Context 中。
    • 【重要】第 36 行:调用 Invoker#invoke(invocation) 方法,发起 RPC 调用
    • 第 37 至 48 行:若 le 非空,说明此时是重试调用成功,将最后一次调用的异常信息以 warn 级别日志输出,方便未来追溯。
    • ========== 异常相关 ===========
    • 第 55 至 54 行:如果是业务性质的异常,不再重试,直接抛出。
    • 第 56 行:保存异常到 le
    • 第 58 行:非 RpcException 异常,封装成 RpcException 异常。
    • 第 59 至 61 行:保存每次调用的网络地址,到 providers 中。
  • 第 63 至 71 行:超过最大调用次数,抛出 RpcException 异常。该异常中,带有最后一次调用异常的信息。

666. 彩蛋

知识星球

刚开始看集群容错,真的是一脸懵逼。

感谢我飞哥。

总访客数 && 总访问量