我是一段不羁的公告!
记得给艿艿这 3 个项目加油,添加一个 STAR 噢。
https://github.com/YunaiV/SpringBoot-Labs
https://github.com/YunaiV/onemall
https://github.com/YunaiV/ruoyi-vue-pro

精尽 Dubbo 源码解析 —— 集群容错(五)之 Merger 实现

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文接 《精尽 Dubbo 源码解析 —— 集群容错(四)之 LoadBalance 实现》 一文,分享 dubbo-cluster 模块, merger 包,各种 Merger 实现类

Merger 相关类,如下图:

Merger 相关类

我们可以看到,目前一共有两部分

  • Merger 以及其实现类。
  • MergerCluster 以及其 MergerClusterInvoker

老艿艿:本文对应 《Dubbo 用户指南 —— 分组聚合》 文档。

2. Merger

com.alibaba.dubbo.rpc.cluster.Merger ,Merger 接口,提供接口方法,将对象数组合并成一个对象。代码如下:

@SPI
public interface Merger<T> {

/**
* 合并 T 数组,返回合并后的 T 对象
*
* @param items T 数组
* @return T 对象
*/
T merge(T... items);

}
  • @SPI 注解,Dubbo SPI 拓展点,无默认值。

2.1 Merger 实现类

Merger 内置十二个实现类,从代码上看基本类似。我们以 MapMerger 和 ShortArrayMerger 作为例子。

2.1.1 MapMerger

com.alibaba.dubbo.rpc.cluster.merger.MapMerger ,实现 Merger 接口,Map Merger 实现类。代码如下:

public class MapMerger implements Merger<Map<?, ?>> {

@Override
public Map<?, ?> merge(Map<?, ?>... items) {
if (items.length == 0) {
return null;
}
// 创建结果 Map
Map<Object, Object> result = new HashMap<Object, Object>();
// 合并多个 Map
for (Map<?, ?> item : items) {
if (item != null) {
result.putAll(item);
}
}
return result;
}

}

2.1.2 ShortArrayMerger

com.alibaba.dubbo.rpc.cluster.merger.ShortArrayMerger ,实现 Merger 接口,Short 数组 Merger 实现类。代码如下:

public class ShortArrayMerger implements Merger<short[]> {

@Override
public short[] merge(short[]... items) {
// 计算合并后的数组大小
int total = 0;
for (short[] array : items) {
total += array.length;
}
// 创建结果数组
short[] result = new short[total];
// 合并多个数组
int index = 0;
for (short[] array : items) {
for (short item : array) {
result[index++] = item;
}
}
return result;
}

}

2.2 MergerFactory

com.alibaba.dubbo.rpc.cluster.merger.MergerFactory ,Merger 工厂类,提供 #getMerger(Class<T> returnType) 方法,获得指定类对应的 Merger 对象。代码如下:

public class MergerFactory {

/**
* Merger 对象缓存
*/
private static final ConcurrentMap<Class<?>, Merger<?>> mergerCache = new ConcurrentHashMap<Class<?>, Merger<?>>();

public static <T> Merger<T> getMerger(Class<T> returnType) {
Merger result;
// 数组类型
if (returnType.isArray()) {
Class type = returnType.getComponentType();
// 从缓存中获得 Merger 对象
result = mergerCache.get(type);
if (result == null) {
loadMergers();
result = mergerCache.get(type);
}
// 获取不到,使用 ArrayMerger
if (result == null && !type.isPrimitive()) {
result = ArrayMerger.INSTANCE;
}
// 普通类型
} else {
// 从缓存中获得 Merger 对象
result = mergerCache.get(returnType);
if (result == null) {
loadMergers();
result = mergerCache.get(returnType);
}
}
return result;
}

/**
* 初始化所有的 Merger 拓展对象,到 mergerCache 缓存中。
*/
static void loadMergers() {
Set<String> names = ExtensionLoader.getExtensionLoader(Merger.class).getSupportedExtensions();
for (String name : names) {
Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name);
mergerCache.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()), m);
}
}

}

3. MergeableCluster

com.alibaba.dubbo.rpc.cluster.support.MergeableCluster ,实现 Cluster 接口,分组聚合 Cluster 实现类。代码如下:

public class MergeableCluster implements Cluster {

public static final String NAME = "mergeable";

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MergeableClusterInvoker<T>(directory);
}

}
  • 对应 Invoker 实现类为 MergeableClusterInvoker 。

Merger 的使用,需要设置 Cluster 的实现类为 MergeableCluster 。但是呢,它的配置方式,和其他 Cluster 实现类不同。

3.1 MergeableClusterInvoker

com.alibaba.dubbo.rpc.cluster.support.MergeableClusterInvoker ,实现 Invoker 接口,MergeableCluster Invoker 实现类。代码如下:

/**
* Directory$Adaptive 对象
*/
private final Directory<T> directory;
/**
* ExecutorService 对象,并且为 CachedThreadPool 。
*/
private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true));

1: @Override
2: public Result invoke(final Invocation invocation) throws RpcException {
3: // 获得 Invoker 集合
4: List<Invoker<T>> invokers = directory.list(invocation);
5: // 获得 Merger 拓展名
6: String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
7: // 若果未配置拓展,直接调用首个可用的 Invoker 对象
8: if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
9: for (final Invoker<T> invoker : invokers) {
10: if (invoker.isAvailable()) {
11: return invoker.invoke(invocation);
12: }
13: }
14: return invokers.iterator().next().invoke(invocation);
15: }
16:
17: // 通过反射,获得返回类型
18: Class<?> returnType;
19: try {
20: returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
21: } catch (NoSuchMethodException e) {
22: returnType = null;
23: }
24:
25: // 提交线程池,并行执行,发起 RPC 调用,并添加到 results 中
26: Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
27: for (final Invoker<T> invoker : invokers) {
28: Future<Result> future = executor.submit(new Callable<Result>() {
29: public Result call() {
30: // RPC 调用
31: return invoker.invoke(new RpcInvocation(invocation, invoker));
32: }
33: });
34: results.put(invoker.getUrl().getServiceKey(), future);
35: }
36:
37: // 阻塞等待执行执行结果,并添加到 resultList 中
38: List<Result> resultList = new ArrayList<Result>(results.size());
39: int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
40: for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
41: Future<Result> future = entry.getValue();
42: try {
43: Result r = future.get(timeout, TimeUnit.MILLISECONDS);
44: if (r.hasException()) { // 异常 Result ,打印错误日志,忽略
45: log.error(new StringBuilder(32).append("Invoke ").append(getGroupDescFromServiceKey(entry.getKey())).append(" failed: ").append(r.getException().getMessage()).toString(), r.getException());
46: } else { // 正常 Result ,添加到 resultList 中
47: resultList.add(r);
48: }
49: } catch (Exception e) { // 异常,抛出 RpcException 异常
50: throw new RpcException(new StringBuilder(32).append("Failed to invoke service ").append(entry.getKey()).append(": ").append(e.getMessage()).toString(), e);
51: }
52: }
53:
54: // 结果大小为空,返回空的 RpcResult
55: if (resultList.isEmpty()) {
56: return new RpcResult((Object) null);
57: // 结果大小为 1 ,返回首个 RpcResult
58: } else if (resultList.size() == 1) {
59: return resultList.iterator().next();
60: }
61: // 返回类型为 void ,返回空的 RpcResult
62: if (returnType == void.class) {
63: return new RpcResult((Object) null);
64: }
65:
66: Object result;
67: // 【第 1 种】基于合并方法
68: if (merger.startsWith(".")) {
69: // 获得合并方法 Method
70: merger = merger.substring(1);
71: Method method;
72: try {
73: method = returnType.getMethod(merger, returnType);
74: } catch (NoSuchMethodException e) {
75: throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());
76: }
77: // 有 Method ,进行合并
78: if (method != null) {
79: if (!Modifier.isPublic(method.getModifiers())) {
80: method.setAccessible(true);
81: }
82: result = resultList.remove(0).getValue();
83: try {
84: // 方法返回类型匹配,合并时,修改 result
85: if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) {
86: for (Result r : resultList) {
87: result = method.invoke(result, r.getValue());
88: }
89: // 方法返回类型不匹配,合并时,不修改 result
90: } else {
91: for (Result r : resultList) {
92: method.invoke(result, r.getValue());
93: }
94: }
95: } catch (Exception e) {
96: throw new RpcException(new StringBuilder(32).append("Can not merge result: ").append(e.getMessage()).toString(), e);
97: }
98: // 无 Method ,抛出 RpcException 异常
99: } else {
100: throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());
101: }
102: // 【第 2 种】基于 Merger
103: } else {
104: Merger resultMerger;
105: // 【第 2.1 种】根据返回值类型自动匹配 Merger
106: if (ConfigUtils.isDefault(merger)) {
107: resultMerger = MergerFactory.getMerger(returnType);
108: // 【第 2.2 种】指定 Merger
109: } else {
110: resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
111: }
112: // 有 Merger ,进行合并
113: if (resultMerger != null) {
114: List<Object> rets = new ArrayList<Object>(resultList.size());
115: for (Result r : resultList) {
116: rets.add(r.getValue());
117: }
118: result = resultMerger.merge(rets.toArray((Object[]) Array.newInstance(returnType, 0)));
119: // 无 Merger ,抛出 RpcException 异常
120: } else {
121: throw new RpcException("There is no merger to merge result.");
122: }
123: }
124: // 返回 RpcResult 结果
125: return new RpcResult(result);
126: }
  • 🙂 看似比较长,实际很易懂。
  • 第 4 行:调用 Directory#list(invocation) 方法,获得服务 Invoker 集合
  • 第 6 行:调用 URL#getMethodParameter(methodName, "merger") 方法,获得 Merger 拓展名,方法级
  • 第 7 至 15 行:若未配置 Merger 拓展名,优先调用首个可用的 Invoker 对象,其次调用首个 Invoker 对象。
  • 第 17 至 23 行:通过反射,获得调用方法的返回类型
  • 第 25 至 35 行:提交线程池,并行执行,发起 RPC 调用,并添加 Future 到 results 中。
  • 第 37 至 52 行:阻塞等待执行结果,并添加到 resultList 中。注意,分成正常 Result、异常 Result(忽略)、Exception 三种情况。
  • 第 54 至 56 行:结果大小为,返回的 RpcResult 。
  • 第 57 至 60 行:结果大小为 1 ,返回首个 RpcResult 。
  • 第 61 至 64 行:返回类型为 void ,返回的 RpcResult 。
  • ========== 【第 1 种】基于 Method 合并==========
  • 第 68 行:若 merger"." 开头,指定合并方法,将调用返回结果的指定方法进行合并,合并方法的参数类型必须是返回结果类型本身
  • 第 69 至 76 行:调用 Class#getMethod(String name, Class<?>... parameterTypes) 方法,获得合并方法 Method 。这个方法,意味着“合并方法的参数类型必须是返回结果类型本身”!!!具体原因,见 《dubbo源码-集群容错之MergeableCluster》 ,搜索 "在条件分支if ( merger.startsWith(".") ) {}"
  • 第 77 至 97 行: Method ,循环调用 Method#invoke(Object obj, Object... args) 方法,进行合并。
  • 第 98 至 101 行: Method ,抛出 RpcException 异常。
  • ========== 【第 2 种】基于 Merger 合并 ==========
  • 【第 2.1 种】第 105 至 107 行:当 merger"default""true" 时,调用 MergerFactory#getMerger(Class<T> returnType) 方法,根据返回值类型自动匹配 Merger 。
  • 【第 2.2 种】第 108 至 111 行:调用 ExtensionLoader#getExtension(merger) 方法啊,获得指定 Merger 。
  • 第 112 至 118 行: Merger ,循环调用 Merger#merge(T... items) 方法,进行合并。
  • 第 119 至 122 行: Method ,抛出 RpcException 异常。

666. 彩蛋

知识星球

啰嗦了,啰嗦了。

总访客数 && 总访问量