public class MergeableCluster implements Cluster { public static final String NAME = "mergeable"; @Override public Invoker join(Directory directory) throws RpcException { // 创建MergeableClusterInvoker return new MergeableClusterInvoker (directory); }}
该类实现了Cluster接口,是分组集合的集群实现。
(二)MergeableClusterInvoker
该类是分组聚合的实现类,其中最关机的就是invoke方法。
@Override@SuppressWarnings("rawtypes")public Result invoke(final Invocation invocation) throws RpcException { // 获得invoker集合 List > invokers = directory.list(invocation); /** * 获得是否merger */ String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY); // 如果没有设置需要聚合,则只调用一个invoker的 if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group // 只要有一个可用就返回 for (final Invoker invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } return invokers.iterator().next().invoke(invocation); } // 返回类型 Class returnType; try { // 获得返回类型 returnType = getInterface().getMethod( invocation.getMethodName(), invocation.getParameterTypes()).getReturnType(); } catch (NoSuchMethodException e) { returnType = null; } // 结果集合 Map > results = new HashMap >(); // 循环invokers for (final Invoker invoker : invokers) { // 获得每次调用的future Future future = executor.submit(new Callable () { @Override public Result call() throws Exception { // 回调,把返回结果放入future return invoker.invoke(new RpcInvocation(invocation, invoker)); } }); // 加入集合 results.put(invoker.getUrl().getServiceKey(), future); } Object result = null; List resultList = new ArrayList (results.size()); // 获得超时时间 int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 遍历每一个结果 for (Map.Entry > entry : results.entrySet()) { Future future = entry.getValue(); try { // 获得调用返回的结果 Result r = future.get(timeout, TimeUnit.MILLISECONDS); if (r.hasException()) { log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + " failed: " + r.getException().getMessage(), r.getException()); } else { // 加入集合 resultList.add(r); } } catch (Exception e) { throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e); } } // 如果为空,则返回空的结果 if (resultList.isEmpty()) { return new RpcResult((Object) null); } else if (resultList.size() == 1) { // 如果只有一个结果,则返回该结果 return resultList.iterator().next(); } // 如果返回类型是void,也就是没有返回值,那么返回空结果 if (returnType == void.class) { return new RpcResult((Object) null); } // 根据方法来合并,将调用返回结果的指定方法进行合并 if (merger.startsWith(".")) { merger = merger.substring(1); Method method; try { // 获得方法 method = returnType.getMethod(merger, returnType); } catch (NoSuchMethodException e) { throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + returnType.getClass().getName() + " ]"); } // 有 Method ,进行合并 if (!Modifier.isPublic(method.getModifiers())) { method.setAccessible(true); } // 从集合中移除 result = resultList.remove(0).getValue(); try { // 方法返回类型匹配,合并时,修改 result if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) { for (Result r : resultList) { result = method.invoke(result, r.getValue()); } } else { // 方法返回类型不匹配,合并时,不修改 result for (Result r : resultList) { method.invoke(result, r.getValue()); } } } catch (Exception e) { throw new RpcException("Can not merge result: " + e.getMessage(), e); } } else { // 基于 Merger Merger resultMerger; // 如果是默认的方式 if (ConfigUtils.isDefault(merger)) { // 获得该类型的合并方式 resultMerger = MergerFactory.getMerger(returnType); } else { // 如果不是默认的,则配置中指定获得Merger的实现类 resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger); } if (resultMerger != null) { List