博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
dubbo源码解析(三十九)集群——merger
阅读量:6634 次
发布时间:2019-06-25

本文共 7599 字,大约阅读时间需要 25 分钟。

集群——merger

目标:介绍dubbo中集群的分组聚合,介绍dubbo-cluster下merger包的源码。

前言

按组合并返回结果 ,比如菜单服务,接口一样,但有多种实现,用group区分,现在消费方需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。这个时候就要用到分组聚合。

源码分析

(一)MergeableCluster

public class MergeableCluster implements Cluster {    public static final String NAME = "mergeable";    @Override    public 
Invoker
join(Directory
directory) throws RpcException { // 创建MergeableClusterInvoker return new MergeableClusterInvoker
(directory); }}

该类实现了Cluster接口,是分组集合的集群实现。

(二)MergeableClusterInvoker

该类是分组聚合的实现类,其中最关机的就是invoke方法。

@Override@SuppressWarnings("rawtypes")public Result invoke(final Invocation invocation) throws RpcException {    // 获得invoker集合    List
> invokers = directory.list(invocation); /** * 获得是否merger */ String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY); // 如果没有设置需要聚合,则只调用一个invoker的 if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group // 只要有一个可用就返回 for (final Invoker
invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } return invokers.iterator().next().invoke(invocation); } // 返回类型 Class
returnType; try { // 获得返回类型 returnType = getInterface().getMethod( invocation.getMethodName(), invocation.getParameterTypes()).getReturnType(); } catch (NoSuchMethodException e) { returnType = null; } // 结果集合 Map
> results = new HashMap
>(); // 循环invokers for (final Invoker
invoker : invokers) { // 获得每次调用的future Future
future = executor.submit(new Callable
() { @Override public Result call() throws Exception { // 回调,把返回结果放入future return invoker.invoke(new RpcInvocation(invocation, invoker)); } }); // 加入集合 results.put(invoker.getUrl().getServiceKey(), future); } Object result = null; List
resultList = new ArrayList
(results.size()); // 获得超时时间 int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 遍历每一个结果 for (Map.Entry
> entry : results.entrySet()) { Future
future = entry.getValue(); try { // 获得调用返回的结果 Result r = future.get(timeout, TimeUnit.MILLISECONDS); if (r.hasException()) { log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + " failed: " + r.getException().getMessage(), r.getException()); } else { // 加入集合 resultList.add(r); } } catch (Exception e) { throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e); } } // 如果为空,则返回空的结果 if (resultList.isEmpty()) { return new RpcResult((Object) null); } else if (resultList.size() == 1) { // 如果只有一个结果,则返回该结果 return resultList.iterator().next(); } // 如果返回类型是void,也就是没有返回值,那么返回空结果 if (returnType == void.class) { return new RpcResult((Object) null); } // 根据方法来合并,将调用返回结果的指定方法进行合并 if (merger.startsWith(".")) { merger = merger.substring(1); Method method; try { // 获得方法 method = returnType.getMethod(merger, returnType); } catch (NoSuchMethodException e) { throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + returnType.getClass().getName() + " ]"); } // 有 Method ,进行合并 if (!Modifier.isPublic(method.getModifiers())) { method.setAccessible(true); } // 从集合中移除 result = resultList.remove(0).getValue(); try { // 方法返回类型匹配,合并时,修改 result if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) { for (Result r : resultList) { result = method.invoke(result, r.getValue()); } } else { // 方法返回类型不匹配,合并时,不修改 result for (Result r : resultList) { method.invoke(result, r.getValue()); } } } catch (Exception e) { throw new RpcException("Can not merge result: " + e.getMessage(), e); } } else { // 基于 Merger Merger resultMerger; // 如果是默认的方式 if (ConfigUtils.isDefault(merger)) { // 获得该类型的合并方式 resultMerger = MergerFactory.getMerger(returnType); } else { // 如果不是默认的,则配置中指定获得Merger的实现类 resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger); } if (resultMerger != null) { List
rets = new ArrayList(resultList.size()); // 遍历返回结果 for (Result r : resultList) { // 加入到rets rets.add(r.getValue()); } // 合并 result = resultMerger.merge( rets.toArray((Object[]) Array.newInstance(returnType, 0))); } else { throw new RpcException("There is no merger to merge result."); } } // 返回结果 return new RpcResult(result);}

前面部分在讲获得调用的结果,后面部分是对结果的合并,合并有两种方式,根据配置不同可用分为基于方法的合并和基于merger的合并。

(三)MergerFactory

Merger 工厂类,获得指定类型的Merger 对象。

public class MergerFactory {    /**     * Merger 对象缓存     */    private static final ConcurrentMap
, Merger
> mergerCache = new ConcurrentHashMap
, Merger
>(); /** * 获得指定类型的Merger对象 * @param returnType * @param
* @return */ public static
Merger
getMerger(Class
returnType) { Merger result; // 如果类型是集合 if (returnType.isArray()) { // 获得类型 Class type = returnType.getComponentType(); // 从缓存中获得该类型的Merger对象 result = mergerCache.get(type); // 如果为空,则 if (result == null) { // 初始化所有的 Merger 扩展对象,到 mergerCache 缓存中。 loadMergers(); // 从集合中取出对应的Merger对象 result = mergerCache.get(type); } // 如果结果为空,则直接返回ArrayMerger的单例 if (result == null && !type.isPrimitive()) { result = ArrayMerger.INSTANCE; } } else { // 否则直接从mergerCache中取出 result = mergerCache.get(returnType); // 如果为空 if (result == null) { // 初始化所有的 Merger 扩展对象,到 mergerCache 缓存中。 loadMergers(); // 从集合中取出 result = mergerCache.get(returnType); } } return result; } /** * 初始化所有的 Merger 扩展对象,到 mergerCache 缓存中。 */ static void loadMergers() { // 获得Merger所有的扩展对象名 Set
names = ExtensionLoader.getExtensionLoader(Merger.class) .getSupportedExtensions(); // 遍历 for (String name : names) { // 加载每一个扩展实现,然后放入缓存。 Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name); mergerCache.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()), m); } }}

逻辑比较简单。

(四)ArrayMerger

因为不同的类型有不同的Merger实现,我们可以来看看这个图片:

merger

可以看到有好多好多,我就讲解其中的一种,偷懒一下,其他的麻烦有兴趣的去看看源码了。

public class ArrayMerger implements Merger
{ /** * 单例 */ public static final ArrayMerger INSTANCE = new ArrayMerger(); @Override public Object[] merge(Object[]... others) { // 如果长度为0 则直接返回 if (others.length == 0) { return null; } // 总长 int totalLen = 0; // 遍历所有需要合并的对象 for (int i = 0; i < others.length; i++) { Object item = others[i]; // 如果为数组 if (item != null && item.getClass().isArray()) { // 累加数组长度 totalLen += Array.getLength(item); } else { throw new IllegalArgumentException((i + 1) + "th argument is not an array"); } } if (totalLen == 0) { return null; } // 获得数组类型 Class
type = others[0].getClass().getComponentType(); // 创建长度 Object result = Array.newInstance(type, totalLen); int index = 0; // 遍历需要合并的对象 for (Object array : others) { // 遍历每个数组中的数据 for (int i = 0; i < Array.getLength(array); i++) { // 加入到最终结果中 Array.set(result, index++, Array.get(array, i)); } } return (Object[]) result; }}

是不是很简单,就是循环合并就可以了。

后记

该部分相关的源码解析地址:

该文章讲解了集群中关于分组聚合实现的部分。接下来我将开始对集群模块关于路由部分进行讲解。

转载地址:http://uwdvo.baihongyu.com/

你可能感兴趣的文章
一段代码,SQL注入猜解数据库用户密码
查看>>
wcf 基础教程 契约 Contract 控制xml输出 数据契约DataContract序列化前身 XmlSerializer xml序列化...
查看>>
概率中国一种没有语料字典的分词方法
查看>>
类型缩放Google map 地图类型
查看>>
Hash Table
查看>>
Renderer.materials
查看>>
UITableView 顶部能够放大的图片
查看>>
PHP概率算法(适用于抽奖、随机广告)
查看>>
C# 程序性能提升篇-1、装箱和拆箱,枚举的ToString浅析
查看>>
EXP-00056遇到Oracle错误1455问题解决办法
查看>>
20款时尚的 WordPress 企业模板【免费主题下载】
查看>>
SQLSERVER 里经常看到的CACHE STORES是神马东东?
查看>>
java中如何生成可执行的jar文件
查看>>
java中synchronized使用方法
查看>>
vim常用命令
查看>>
整型变量
查看>>
微信公众平台开发(92) 多客服(转)
查看>>
自制Unity小游戏TankHero-2D(1)制作主角坦克
查看>>
DevExpress控件水印文字提示 z
查看>>
【LeetCode OJ】Same Tree
查看>>