• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

十三Dubbo负载均衡七router的配置以和ConditionRouter源码

武飞扬头像
焦虑的二狗
帮助1

路由使用:

学新通
学新通
配置后consumer在获取到集群服务的invoker后 会针对 port为20881的进行过滤

介绍 及 源码

dubbo的路由规则,主要包括 Router、RouterFactory 两个核心接口和RouterChain一个核心类;其中RouterChain是入口,RouterChain会通过RouterFactory生成Router放入缓存list,然后遍历执行route;而RouterChain主要用于Directory.list方法,根据Invocation会话列出当前符合条件(路由、负载均衡、可用)的Invoker;dubbo提供几种内置Router,主要分为两类:静态Router(ConditionRouter、ScriptRouter)和动态Router(实现ConfigurationListener接口,比如TagRouter、AppRouter、ServiceRouter),主要区别在于支持路由规则动态更新。下面从RouterChain开始,逐个分析。
RouterChain内部逻辑非常简单,核心关注私有构造方法以及route方法:

private RouterChain(URL url) {
    //加载RouterFactory SPI扩展
    List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
            .getActivateExtension(url, (String[]) null);
    // 通过RouterFactory生成对应Router
    List<Router> routers = extensionFactories.stream()
            .map(factory -> factory.getRouter(url))
            .collect(Collectors.toList());
    // 初始化内置Router
    initWithRouters(routers);
}

public void initWithRouters(List<Router> builtinRouters) {
   this.builtinRouters = builtinRouters;
   this.routers = new CopyOnWriteArrayList<>(builtinRouters);
   // 排序规则,先根据priority排然后是url.fullString
   this.sort();
}

// route,遍历调用所有routers的route,返回过滤后的Invoker
public List<Invoker<T>> route(URL url, Invocation invocation) {
        List<Invoker<T>> finalInvokers = invokers;
        for (Router router : routers) {
            finalInvokers = router.route(finalInvokers, url, invocation);
        }
        return finalInvokers;
}
学新通

RouterFactory支持SPI,dubbo内置实现 AppRouterFactory、ServiceRouterFactory、TagRouterFactory、CacheableRouterFactory、ConditionRouterFactory、ScriptRouterFactory等,分别对应AppRouter、ServiceRouter、TagRouter、ConditionRouter、ScripRouter;RouterFactory逻辑非常简单,就不上代码了。
了解完RouterChain和RouterFactory,下面来关注各种Router。先来看ConditionRouter(非常重要,动态Router依赖ConditionRouter实现),顾名思义,条件路由,内置路由规则格式正则以及两个条件map(whenCondition、thenCondition,这两个条件map设计的非常有意思)。先从构造方法开始:

public ConditionRouter(URL url) {
    this.url = url;
    // 从url获取制定参数
    this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
    this.force = url.getParameter(Constants.FORCE_KEY, false);
    this.enabled = url.getParameter(Constants.ENABLED_KEY, true);
    // 根据url配置的路由规则,执行核心初始化逻辑
    init(url.getParameterAndDecoded(Constants.RULE_KEY));
}

public void init(String rule) {
        try {
            if (rule == null || rule.trim().length() == 0) {
                throw new IllegalArgumentException("Illegal route rule!");
            }
            // 这里可以猜想下rule的格式
            rule = rule.replace("consumer.", "").replace("provider.", "");
            int i = rule.indexOf("=>");

			/**
			* eg:  method = sayHello,addNum => port != 20880
			* 这种情况下,whenRule为consumer -> method = sayHello,addNum,thenRule为provider -> port != 20880
			* 表示 当consumer执行到method为sayHello时,则判断 provider的port是否不等于20880
			**/

            // 截取whenRule,用于解析whenCondition
            String whenRule = i < 0 ? null : rule.substring(0, i).trim();
            // 同样的,截取thenRule,用于解析thenCondition
            String thenRule = i < 0 ? rule.trim() : rule.substring(i   2).trim();
            // 解析whenContion和thenCondition
            Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);
            Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);
            /**
            *	eg:
            *	when -> key = method, value = MatchPair{matches = [sayHello,addNum] }
            *	then -> key = port, value = MatchPair{mismatches = [20880]}
            **/
            this.whenCondition = when;
            this.thenCondition = then;
        } catch (ParseException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
}

//顺便看一下MatchPair的设计
protected static final class MatchPair {
    // 匹配结果集
    final Set<String> matches = new HashSet<String>();
    // 不匹配结果集
    final Set<String> mismatches = new HashSet<String>();
        // 判断是否匹配
    private boolean isMatch(String value, URL param) {
        if (!matches.isEmpty() && mismatches.isEmpty()) {
            for (String match : matches) {
               if (UrlUtils.isMatchGlobPattern(match, value, param)) {
                     return true;
               }
            }
                return false;
        }

        if (!mismatches.isEmpty() && matches.isEmpty()) {
            for (String mismatch : mismatches) {
                if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
                    return false;
                }
            }
            return true;
        }

        if (!matches.isEmpty() && !mismatches.isEmpty()) {
            // 即在matches又在mismatches,认为是mismatches
            //when both mismatches and matches contain the same value, then using mismatches first
            for (String mismatch : mismatches) {
                 if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
                       return false;
                 }
            }
            for (String match : matches) {
                if (UrlUtils.isMatchGlobPattern(match, value, param)) {
                     return true;
                }
            }
            return false;
        }
            return false;
    }
}

//然后再来看规则解析,rule格式类似 
// host = 2.2.2.2,1.1.1.1,3.3.3.3" 或者 "host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3"
// 以后者为例,解析结果是 key = host,value = matchpair [matches:{"2.2.2.2","1.1.1.1","3.3.3.3"},misMatches:{"4.4.4.4"]
private static Map<String, MatchPair> parseRule(String rule) throws ParseException {
        Map<String, MatchPair> condition = new HashMap<String, MatchPair>();
        if (StringUtils.isBlank(rule)) {
            return condition;
        }
        // Key-Value pair, stores both match and mismatch conditions
        MatchPair pair = null;
        // Multiple values
        Set<String> values = null;
            //正则解析rule
        final Matcher matcher = ROUTE_PATTERN.matcher(rule);
        while (matcher.find()) { // Try to match one by one
            // separator = "" 
            String separator = matcher.group(1);
            // content = "host"
            String content = matcher.group(2);
            // Start part of the condition expression.
            if (StringUtils.isEmpty(separator)) {
                pair = new MatchPair();
                condition.put(content, pair);
            }
            // The KV part of the condition expression
            else if ("&".equals(separator)) {
                if (condition.get(content) == null) {
                    pair = new MatchPair();
                    condition.put(content, pair);
                } else {
                    pair = condition.get(content);
                }
            }
            // The Value in the KV part.
            else if ("=".equals(separator)) {
                if (pair == null) {
                    //抛异常,省略
                }

                values = pair.matches;
                values.add(content);
            }
            // The Value in the KV part.
            else if ("!=".equals(separator)) {
                if (pair == null) {
                  // 抛异常,省略
                }
                values = pair.mismatches;
                values.add(content);
            }
            // The Value in the KV part, if Value have more than one items.多值用,分割
            else if (",".equals(separator)) { // Should be separated by ','
                if (values == null || values.isEmpty()) {
                   // 抛异常,省略
                }
                values.add(content);
            } else {
                 // 抛异常,省略
            }
        }
        return condition;

学新通

上面就是ConditionRouter的初始化过程,route方法的逻辑比较简单,先做条件校验,不满足则直接返回原invokers;满足则将匹配then条件的放入结果集返回,重点关注mathCondition方法。

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
        throws RpcException {
    // 不可用,则直接返回原invokers
    if (!enabled) {
        return invokers;
    }

    if (CollectionUtils.isEmpty(invokers)) {
        return invokers;
    }
    try {
        // 不满足when条件,直接返回所有invokers,不做条件过滤
        if (!matchWhen(url, invocation)) {
            return invokers;
        }
        List<Invoker<T>> result = new ArrayList<Invoker<T>>();
        if (thenCondition == null) {
            logger.warn("The current consumer in the service blacklist. consumer: "   NetUtils.getLocalHost()   ", service: "   url.getServiceKey());
            return result;
        }
        // 匹配then规则的放入结果集
        for (Invoker<T> invoker : invokers) {
            if (matchThen(invoker.getUrl(), url)) {
                result.add(invoker);
            }
        }
        if (!result.isEmpty()) {
            return result;
        } else if (force) {
            logger.warn("The route result is empty and force execute. consumer: "   NetUtils.getLocalHost()   ", service: "   url.getServiceKey()   ", router: "   url.getParameterAndDecoded(Constants.RULE_KEY));
            return result;
        }
    } catch (Throwable t) {
        logger.error("Failed to execute condition router rule: "   getUrl()   ", invokers: "   invokers   ", cause: "   t.getMessage(), t);
    }
    return invokers;
}

private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param, Invocation invocation) {
        Map<String, String> sample = url.toMap();
        boolean result = false;
        for (Map.Entry<String, MatchPair> matchPair : condition.entrySet()) {
            String key = matchPair.getKey();
            String sampleValue;
            //get real invoked method name from invocation
            // 匹配的维度包括method、address、host,以及URL中的自定义参数
            if (invocation != null && (Constants.METHOD_KEY.equals(key) || Constants.METHODS_KEY.equals(key))) {
                sampleValue = invocation.getMethodName();
            } else if (Constants.ADDRESS_KEY.equals(key)) {
                sampleValue = url.getAddress();
            } else if (Constants.HOST_KEY.equals(key)) {
                sampleValue = url.getHost();
            } else {
                sampleValue = sample.get(key);
                if (sampleValue == null) {
                    sampleValue = sample.get(Constants.DEFAULT_KEY_PREFIX   key);
                }
            }
            // 匹配成功 true,否则false;
            if (sampleValue != null) {
                if (!matchPair.getValue().isMatch(sampleValue, param)) {
                    return false;
                } else {
                    result = true;
                }
            } else {
                //not pass the condition
                if (!matchPair.getValue().matches.isEmpty()) {
                    return false;
                } else {
                    result = true;
                }
            }
        }
        return result;
    }
学新通

了解完静态Router,下面来看依赖ConditionRouter的其他动态Router(TagRouter、AppRouter、ServiceRouter);其中AppRouter与ServiceRouter一样(唯一区别在于ruleKey,分别取自URL的application和serviceKey),继承自ListenableRouter,核心逻辑也在ListenableRouter,所以直接看ListenableRouter。LisenableRouter内置ConditionRule和ConditionRouter列表,核心的route方法也是借助ConditionRouter实现,下面是初始化逻辑的核心代码

// 构造方法,核心关注init
public ListenableRouter(DynamicConfiguration configuration, URL url, String ruleKey) {
    super(configuration, url);
    this.force = false;
    this.init(ruleKey);
}

private synchronized void init(String ruleKey) {
   if (StringUtils.isEmpty(ruleKey)) {
       return;
   }
   // routerKey = ${applicationName}.condition-router
   String routerKey = ruleKey   RULE_SUFFIX;
   // 监听配置变更
   configuration.addListener(routerKey, this);
   String rule = configuration.getConfig(routerKey);
   if (rule != null) {
       // 处理规则变更事件,ConfigurationListener接口process方法的实现
       this.process(new ConfigChangeEvent(routerKey, rule));
   }
}

// 路由规则动态变更事件处理,初始化会调用
public synchronized void process(ConfigChangeEvent event) {
        if (logger.isInfoEnabled()) {
            logger.info("Notification of condition rule, change type is: "   event.getChangeType()  
                    ", raw rule is:\n "   event.getValue());
        }
                // 删除事件,则置空routerRule和conditonRouters
        if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
            routerRule = null;
            conditionRouters = Collections.emptyList();
        } else {
            try {
                // 路由规则解析(涉及yml解析),根据规则生成conditonRouter,逻辑比较简单,就不单独列了
                routerRule = ConditionRuleParser.parse(event.getValue());
                generateConditions(routerRule);
            } catch (Exception e) {
             //异常处理,忽略
            }
        }
}
学新通

route逻辑也比较简单

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
    if (CollectionUtils.isEmpty(invokers) || conditionRouters.size() == 0) {
        return invokers;
    }
    // We will check enabled status inside each router.
    // 结果需要满足所有router规则
    for (Router router : conditionRouters) {
        invokers = router.route(invokers, url, invocation);
    }
    return invokers;
}

最后来看TagRouter,TagRouter内置TagRouterRule,核心route逻辑完全自己实现;另外TagRouter也是唯一个一个地个实现Router接口notify方法的router。初始化过程比较简单,直接来看核心逻辑route和notify。

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
    if (CollectionUtils.isEmpty(invokers)) {
        return invokers;
    }

    // tagRouterRule 无效,则使用静态tag进行路由
    if (tagRouterRule == null || !tagRouterRule.isValid() || !tagRouterRule.isEnabled()) {
        return filterUsingStaticTag(invokers, url, invocation);
    }

    List<Invoker<T>> result = invokers;
    // 优先取invocation中的tag
    String tag = StringUtils.isEmpty(invocation.getAttachment(TAG_KEY)) ? url.getParameter(TAG_KEY) :
            invocation.getAttachment(TAG_KEY);

    // if we are requesting for a Provider with a specific tag
    // 有指定tag,则优先使用tagRouterRule中的address过滤;
    // 过滤结果非空且要求强制使用tagRule,则直接返回;否则根据tag再次过滤
    if (StringUtils.isNotEmpty(tag)) {
        List<String> addresses = tagRouterRule.getTagnameToAddresses().get(tag);
        // filter by dynamic tag group first
        // 根据地址过滤,addresses.contains(invoker.getUrl.getAddress)
        if (CollectionUtils.isNotEmpty(addresses)) {
            result = filterInvoker(invokers, invoker -> addressMatches(invoker.getUrl(), addresses));
            // if result is not null OR it's null but force=true, return result directly
            if (CollectionUtils.isNotEmpty(result) || tagRouterRule.isForce()) {
                return result;
            }
        } else {
            // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
            // dynamic tag group but force=false. check static tag
            // 否则根据tag进行过滤
            result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(TAG_KEY)));
        }
        // If there's no tagged providers that can match the current tagged request. force.tag is set by default to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
        if (CollectionUtils.isNotEmpty(result) || isForceUseTag(invocation)) {
            return result;
        }
        // FAILOVER: return all Providers without any tags.
        // 失效转移,此时会取没有match到address的invoker
        else {
            List<Invoker<T>> tmp = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(),
                    tagRouterRule.getAddresses()));
            return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(TAG_KEY)));
        }
    } else {
        // List<String> addresses = tagRouterRule.filter(providerApp);
        // return all addresses in dynamic tag group.
        // 未指定tag,优先找address不匹配的invoker,若地址全部在tag,则直接返回空
        List<String> addresses = tagRouterRule.getAddresses();
        if (CollectionUtils.isNotEmpty(addresses)) {
            result = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), addresses));
            // 1. all addresses are in dynamic tag group, return empty list.
            if (CollectionUtils.isEmpty(result)) {
                return result;
            }
            // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the static tag group.
        }
        return filterInvoker(result, invoker -> {
            String localTag = invoker.getUrl().getParameter(TAG_KEY);
            return StringUtils.isEmpty(localTag) || !tagRouterRule.getTagNames().contains(localTag);
        });
    }
}

// 使用静态tag进行route路由过滤
private <T> List<Invoker<T>> filterUsingStaticTag(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        List<Invoker<T>> result = invokers;
        // Dynamic param,从invocation取tag
        String tag = StringUtils.isEmpty(invocation.getAttachment(TAG_KEY)) ? url.getParameter(TAG_KEY) :
                invocation.getAttachment(TAG_KEY);
        // Tag request
        if (!StringUtils.isEmpty(tag)) {
            // 配了tag,优先通过tag过滤
            result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY)));
            // 未匹配到,则判断是否强制使用tag,不强制则取tag值为空的invoker的
            if (CollectionUtils.isEmpty(result) && !isForceUseTag(invocation)) {
                result = filterInvoker(invokers, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY)));
            }
        } else {
            // 取tag值为空的invoker
            result = filterInvoker(invokers, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY)));
        }
        return result;
}
学新通

动态tag与静态tag的区别在于动态tag使用tagRouterRule,通过监听动态配置,tagRouterRule会发生变更,代码如下:

public synchronized void process(ConfigChangeEvent event) {
    if (logger.isDebugEnabled()) {
        logger.debug("Notification of tag rule, change type is: "   event.getChangeType()   ", raw rule is:\n "  
                event.getValue());
    }

    try {
        if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
            this.tagRouterRule = null;
        } else {
            this.tagRouterRule = TagRuleParser.parse(event.getValue());
        }
    } catch (Exception e) {
       // 异常处理,忽略
    }
}
学新通

最后来看notify,逻辑也非常简单,根据invoker的url获取providerApplication;若与当前application不一致,则会重新加载tagRouterRule

public <T> void notify(List<Invoker<T>> invokers) {
    if (CollectionUtils.isEmpty(invokers)) {
        return;
    }

    Invoker<T> invoker = invokers.get(0);
    URL url = invoker.getUrl();
    // remote.application
    String providerApplication = url.getParameter(Constants.REMOTE_APPLICATION_KEY);

    if (StringUtils.isEmpty(providerApplication)) {
        logger.error("TagRouter must getConfig from or subscribe to a specific application, but the application "  
                "in this TagRouter is not specified.");
        return;
    }

    //涉及configuration内部lisenter
    synchronized (this) {
        if (!providerApplication.equals(application)) {
            if (!StringUtils.isEmpty(application)) {
                configuration.removeListener(application   RULE_SUFFIX, this);
            }
            String key = providerApplication   RULE_SUFFIX;
            configuration.addListener(key, this);
            application = providerApplication;
            String rawRule = configuration.getConfig(key);
            if (rawRule != null) {
                this.process(new ConfigChangeEvent(key, rawRule));
            }
        }
    }
}
学新通

这篇好文章是转载于:编程之路

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 编程之路
  • 本文地址: /boutique/detail/tanhggbggc
系列文章
更多 icon
同类精品
更多 icon
继续加载