10 服务认证:被异构系统侵入调用了,怎么办?
你好,我是何辉。今天我们探索Dubbo框架的第九道特色风味,服务认证。
通过集成 Java 语言编写的 Dubbo 框架来提供服务,你已经非常熟悉了,作为 Dubbo 多语言生态最火热的项目,用 Go 语言开发的 dubbo-go 框架,想必你也有所耳闻,然而,就是这样一款非常实用且轻量级的优秀框架,却引发了一些产线事件。
事情是这样的,公司最近要做一个关于提升效能的一体化网站,我们的后端服务全是 Dubbo 提供者,但是负责效能开发的同事只会使用 Go 或 Python 来编写代码,于是经过再三考虑,效能同事最后使用 dubbo-go 来过渡对接后端的 Dubbo 服务。就像这样:
然而,dubbo-go 服务上线后不久,某个时刻,支付系统的银行账号查询接口的QPS异常突增,引起了相关领导的关注。
一番排查后,我们发现银行账号查询接口的来源IP格式比较怪异,找网工帮忙分析了一下,怪异的IP是一个异构系统 dubbo-go 服务发出来的请求(至于一体化网站为什么需要查询该接口就是后话了)。
目前暴露了一个比较严重的问题,被异构系统访问的接口缺乏一种认证机制,尤其是安全性比较敏感的业务接口,随随便便就被异构系统通过非正常途径调通了,有不少安全隐患。因此很有必要添加一种服务与服务之间的认证机制。
对于这个添加服务认证的需求,你会如何处理呢?
认证什么?
服务认证是一个很空泛的概念,很多人会疑惑服务认证,到底是在认证什么呢?
我们联想生活中认证含义,你可以从网络搜索到各式各样的解释,有人说,认证是由认证机构进行的一种 合格评定 活动,也有人说,认证是一种 信用保证 形式,还有人说,认证是指由认证机构证明产品、服务、管理体系 符合 相关技术 规范 的强制性要求或者 标准 的合格评定活动。
众说纷纭,但都没有错,不同的领域对于“认证”概念自然有着不同的内涵。我们简单分析一下这三种说法,关键在于“合格”、“信用保证”、“标准”,本质上有一种谁和谁作比较的概念。
既然有比较,就得弄清楚具体要比较的内容,而内容真假好坏的鉴定也需要一套规则。那我们就可以得出所谓的认证,就是用规则来鉴定内容。
对于服务之间的认证而言, 比较的内容是什么?规则又是什么呢?
比较的内容其实好说,无非就是客户端发送给服务端的数据。那我们要利用规则对数据进行怎样的鉴定呢?鉴定数据的真假?还是数据是否被篡改过?还是其他的什么呢?
参考一般的业务需求,目前我们也没有想到更多的鉴别规则了,那就先按照鉴定真假和鉴定篡改两个规则分析。
1. 鉴定真假
具体如何鉴别真假呢?我们联想业界已有的方案,比如对接微信支付,在向微信发送请求数据的时候,需要填写了 appId 和 secret 两个重要的字段,然后微信支付后台会比对这两个字段的值是否存在。
那我们是不是也可以仿照微信支付请求,也在数据里安插几个重要字段来鉴别呢?
应该可行, 在客户端发送数据的时候我们添加一个 TOKEN 字段,然后,服务端收到数据先验证 TOKEN 字段值是否存在,若存在则认为是合法可信任的请求,否则就可以抛出异常中断请求了。
这个 TOKEN 在服务端怎么验证是否存在呢?
肯定难不倒你,一般两种处理方式,要么服务端内部就有这个 TOKEN 直接验证,要么服务端去第三方媒介间接验证,总之处理请求的是服务端,至于服务端是内部验证还是依赖第三方媒介验证,那都是服务端的事情。顺着思路画出了这样的调用链路图:
处理方式一是服务端收到 TOKEN 凭证后与自身内存的值做比对验证,方式二是收到 TOKEN 后调用第三方认证中心进行比对验证。区别就在于,前者无需调用远程服务可以直接验证,而后者需要调用远程才能进行验证。
不过,我们要面临分支选择了,该使用哪种方式呢?
- 方式一无需调用远程,虽然节省了远程调用的开销,加快了处理验证的时效,但是这个 TOKEN 是位于服务端内部的,那就意味着 TOKEN 和服务端的方法有着一定的强绑定关系,不够灵活。
- 方式二远程调用,第三方认证中心可以根据不同的客户端分配不同的 TOKEN 值,并为 TOKEN 设置过期时间,灵活性和可控性变强了,但同时也牺牲了一定的远程调用时间开销。
所以,想通过单独认证中心进行统一约束和管理,且可以容忍远程调用的少许耗时,可以考虑方式二。如果只是想简单处理,或不能容忍性能的少许耗时,可以考虑方式一。
这里为了方便演示,我们就把方式一落实到代码,你知道该怎么做了吧?
还是先梳理改造的思路:
- 客户端在发送请求数据时,需要额外添加一个 TOKEN 字段,参考“ 隐式传递”,我们可以把 TOKEN 放在 Invocation 的 attachments 里面。
- 服务端在处理请求的认证逻辑时,为了不侵入业务逻辑,可以在过滤器里面处理;
- 过滤器需要优先处理 TOKEN 字段值是否存在,如果存在则继续后面的业务逻辑处理,否则就直接抛出异常。
接下来,编写代码,每一行我都详细写了注释,你可以对照着看:
///////////////////////////////////////////////////
// 提供方:自定义TOKEN校验过滤器,主要对 TOKEN 进行验证比对
///////////////////////////////////////////////////
@Activate(group = PROVIDER)
public class ProviderTokenFilter implements Filter {
/** <h2>TOKEN 字段名</h2> **/
public static final String TOKEN = "TOKEN";
/** <h2>方法级别层面获取配置的 auth.enable 参数名</h2> **/
public static final String KEY_AUTH_ENABLE = "auth.enable";
/** <h2>方法级别层面获取配置的 auth.token 参数名</h2> **/
public static final String KEY_AUTH_TOKEN = "auth.token";
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 从方法层面获取 auth.enable 参数值
String authEnable = invoker.getUrl().getMethodParameter
(invocation.getMethodName(), KEY_AUTH_ENABLE);
// 如果不需要开启 TOKEN 认证的话,则继续后面过滤器的调用
if (!Boolean.TRUE.toString().equals(authEnable)) {
return invoker.invoke(invocation);
}
// 能来到这里,说明需要进行 TOKEN 认证
Map<String, Object> attachments = invocation.getObjectAttachments();
String recvToken = attachments != null ? (String) attachments.get(TOKEN) : null;
// 既然需要认证,如果收到的 TOKEN 为空,则直接抛异常
if (StringUtils.isBlank(recvToken)) {
throw new RuntimeException(
"Recv token is null or empty, path: " +
String.join(".", invoker.getInterface().getName(), invocation.getMethodName()));
}
// 从方法层面获取 auth.token 参数值
String authToken = invoker.getUrl().getMethodParameter
(invocation.getMethodName(), KEY_AUTH_TOKEN);
// 既然需要认证,如果收到的 TOKEN 值和提供方配置的 TOKEN 值不一致的话,也直接抛异常
if(!recvToken.equals(authToken)){
throw new RuntimeException(
"Recv token is invalid, path: " +
String.join(".", invoker.getInterface().getName(), invocation.getMethodName()));
}
// 还能来到这,说明认证通过,继续后面过滤器的调用
return invoker.invoke(invocation);
}
}
///////////////////////////////////////////////////
// 提供方:支付账号查询方法的实现逻辑
// 关注 auth.token、auth.enable 两个新增的参数
///////////////////////////////////////////////////
@DubboService(methods = {@Method(
name = "queryPayAccount",
parameters = {
"auth.token", "123456789",
"auth.enable", "true"
})}
)
@Component
public class PayAccountFacadeImpl implements PayAccountFacade {
@Override
public String queryPayAccount(String userId) {
String result = String.format(now() + ": Hello %s, 已查询该用户的【银行账号信息】", userId);
System.out.println(result);
return result;
}
private static String now() {
return new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss.SSS").format(new Date());
}
}
///////////////////////////////////////////////////
// 消费方:自定义TOKEN校验过滤器,主要将 TOKEN 传给提供方
///////////////////////////////////////////////////
@Activate(group = CONSUMER)
public class ConsumerTokenFilter implements Filter {
/** <h2>方法级别层面获取配置的 auth.token 参数名</h2> **/
public static final String KEY_AUTH_TOKEN = "auth.token";
/** <h2>TOKEN 字段名</h2> **/
public static final String TOKEN = "TOKEN";
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 从方法层面获取 auth.token 参数值
String authToken = invoker.getUrl().getMethodParameter
(invocation.getMethodName(), KEY_AUTH_TOKEN);
// authToken 不为空的话则设置到请求对象中
if (StringUtils.isNotBlank(authToken)) {
invocation.getObjectAttachments().put(TOKEN, authToken);
}
// 继续后面过滤器的调用
return invoker.invoke(invocation);
}
}
///////////////////////////////////////////////////
// 消费方:触发调用支付账号查询接口的类
// 关注 auth.token 这个新增的参数
///////////////////////////////////////////////////
@Component
public class InvokeAuthFacade {
// 引用下游支付账号查询的接口
@DubboReference(timeout = 10000, methods = {@Method(
name = "queryPayAccount",
parameters = {
"auth.token", "123456789"
})})
private PayAccountFacade payAccountFacade;
// 该方法主要用来触发调用下游支付账号查询方法
public void invokeAuth(){
String respMsg = payAccountFacade.queryPayAccount("Geek");
System.out.println(respMsg);
}
}
代码写后,我们验证一下,想办法触发调用消费方的 invokeAuth 方法,打印结果长这样:
调用没有抛出异常,说明认证通过,而且正常拿到提供方的结果了。
接下来把消费方 queryPayAccount 方法的 auth.token 的值修改一下看是否报错,这里我就随便改成 123 了,再尝试调用一下 invokeAuth 方法,打印如下:
Caused by: org.apache.dubbo.remoting.RemotingException: java.lang.RuntimeException: Recv token is invalid, path: com.hmilyylimh.cloud.facade.pay.PayAccountFacade.queryPayAccount
java.lang.RuntimeException: Recv token is invalid, path: com.hmilyylimh.cloud.facade.pay.PayAccountFacade.queryPayAccount
at com.hmilyylimh.cloud.auth.config.ProviderTokenFilter.invoke(ProviderTokenFilter.java:60)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:321)
at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:99)
把 auth.token 的值乱改一通后,发现接口调用不通了,因为打印的异常日志中已经明确提示 TOKEN 是无效的,说明 TOKEN 的正常流程和异常流程都是符合预期的。
改造完成!我们简单回看一下代码的主要改动点。
代码的实现主要有消费方和提供方两部分,提供方的代码主要有 4 点改动:
- 在提供方定义一个 ProviderTokenFilter 过滤器类,然后实现 invoke 方法。
- 在 invoke 方法中,从 invocation 的 attachments 中获取 TOKEN 值,再从上下文中获取方法层面配置的 TOKEN 值,最后直接比较两个 TOKEN 值是否一样,一样就正常往下执行,不一样则抛出异常直接中断调用流程。
- 在银行账号查询接口所在服务的 @DubboService 注解中,为该方法添加一个 TOKEN 参数(auth.token)以及对应的值(123456789)。
- 将 ProviderTokenFilter 的类路径添加到 META-INF 文件夹下面的 org.apache.dubbo.rpc.Filter 文件中。
消费方的代码也有 4 点改动:
- 在消费方定义一个 ConsumerTokenFilter 过滤器类,然后也实现 invoke 方法。
- 在调用银行账号查询接口所在服务的 @DubboReference 注解中,为该方法添加一个 TOKEN 参数(auth.token = 123456789)以及声明该方法需要开始TOKEN 认证(auth.enable = true)。
- 在 invoke 方法中,从上下文中获取方法层面配置的 TOKEN 值,然后放进 invocation 的 attachments 中,跟随远程调用去往提供方。
- 将 ConsumerTokenFilter 的类路径添加到 META-INF 文件夹下面的 org.apache.dubbo.rpc.Filter 文件中。
按照不发起远程调用的形式把代码实现后,是不是也并没有你想象中的那么难,关键就是找到突破口,写代码只是顺带的事情。在这个基础之上,再来扩展改造为调用远程的方式进行认证,想必难不倒你了,记得课后挑战一下。
2.鉴定篡改
我们接着看第二个认证工作,鉴定篡改,这个比较好理解,就是证明别人发送过来的内容没有在传输过程中被偷偷改动。
对加密算法有一定了解的你想必也马上想到了,证明数据是否被改动,可以考虑加密或加签。 加签 是为了校验数据在传输过程中是否被修改,而 加密 其实就是把明文变成密文,保护数据在传输过程中信息不泄密。
那为了提升安全敏感度,我既想保护数据的隐私,又想保护数据不被篡改,那是不是可以将加密和加签都派上用场呢?
这个当然可以,为了安全因素的考量,有时候必须得在性能损耗上做出一定让步。但是也不能让步的太出格。像公司内部的系统就会舍弃加密的处理,毕竟安全度等级越高,加密出来的密文体积就会越大,在传输过程中会大大增加带宽的消耗。
所以 有时候舍弃加密处理是一种折中考量,采用加签的方式基本上就足够了。
这里我们就以一套常用的 RSA 加签方式进行演示。第一步当然还是梳理代码修改思路:
- 客户端新增一个 ConsumerAddSignFilter 加签过滤器,同样服务端也得增加一个 ProviderVerifySignFilter 验签过滤器。
- 客户端将加签的结果放在 Invocation 的 attachments 里面。
- 服务端获取加签数据时,进行验签处理,若验签通过则放行,否则直接抛出异常。
大致思路梳理完后,我们开始改造,同样地,可以参考我写的详细注释看:
///////////////////////////////////////////////////
// 提供方:自定义验签过滤器,主要对 SIGN 进行验签
///////////////////////////////////////////////////
@Activate(group = PROVIDER)
public class ProviderVerifySignFilter implements Filter {
/** <h2>SING 字段名</h2> **/
public static final String SING = "SING";
/** <h2>方法级别层面获取配置的 auth.ras.enable 参数名</h2> **/
public static final String KEY_AUTH_RSA_ENABLE = "auth.rsa.enable";
/** <h2>方法级别层面获取配置的 auth.rsa.public.secret 参数名</h2> **/
public static final String KEY_AUTH_RSA_PUBLIC_SECRET = "auth.rsa.public.secret";
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 从方法层面获取 auth.ras.enable 参数值
String authRsaEnable = invoker.getUrl().getMethodParameter
(invocation.getMethodName(), KEY_AUTH_RSA_ENABLE);
// 如果不需要验签的话,则继续后面过滤器的调用
if (!Boolean.TRUE.toString().equals(authRsaEnable)) {
return invoker.invoke(invocation);
}
// 能来到这里,说明需要进行验签
Map<String, Object> attachments = invocation.getObjectAttachments();
String recvSign = attachments != null ? (String) attachments.get(SING) : null;
// 既然需要认证,如果收到的加签值为空的话,则直接抛异常
if (StringUtils.isBlank(recvSign)) {
throw new RuntimeException(
"Recv sign is null or empty, path: " +
String.join(".", invoker.getInterface().getName(), invocation.getMethodName()));
}
// 从方法层面获取 auth.rsa.public.secret 参数值
String rsaPublicSecretOpsKey = invoker.getUrl().getMethodParameter
(invocation.getMethodName(), KEY_AUTH_RSA_PUBLIC_SECRET);
// 从 OPS 配置中心里面获取到 rsaPublicSecretOpsKey 对应的密钥值
String publicKey = OpsUtils.get(rsaPublicSecretOpsKey);
// 加签处理
boolean passed = SignUtils.verifySign(invocation.getArguments(), publicKey, recvSign);
// sign 不为空的话则设置到请求对象中
if (!passed) {
throw new RuntimeException(
"Recv sign is invalid, path: " +
String.join(".", invoker.getInterface().getName(), invocation.getMethodName()));
}
// 继续后面过滤器的调用
return invoker.invoke(invocation);
}
}
///////////////////////////////////////////////////
// 提供方:支付账号查询方法的实现逻辑
// 关注 auth.rsa.public.secret、auth.rsa.enable 两个新增的参数
///////////////////////////////////////////////////
@DubboService(methods = {@Method(
name = "queryPayAccount",
parameters = {
"auth.rsa.public.secret", "queryPayAccoun_publicSecret",
"auth.rsa.enable", "true"
})}
)
@Component
public class PayAccountFacadeImpl implements PayAccountFacade {
@Override
public String queryPayAccount(String userId) {
String result = String.format(now() + ": Hello %s, 已查询该用户的【银行账号信息】", userId);
System.out.println(result);
return result;
}
private static String now() {
return new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss.SSS").format(new Date());
}
}
///////////////////////////////////////////////////
// 消费方:自定义加签过滤器,主要将 SIGN 传给提供方
///////////////////////////////////////////////////
@Activate(group = CONSUMER)
public class ConsumerAddSignFilter implements Filter {
/** <h2>SING 字段名</h2> **/
public static final String SING = "SING";
/** <h2>方法级别层面获取配置的 auth.rsa.private.secret 参数名</h2> **/
public static final String KEY_AUTH_RSA_PRIVATE_SECRET = "auth.rsa.private.secret";
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 从方法层面获取 auth.token 参数值
String aesSecretOpsKey = invoker.getUrl().getMethodParameter
(invocation.getMethodName(), KEY_AUTH_RSA_PRIVATE_SECRET);
// 从 OPS 配置中心里面获取到 aesSecretOpsKey 对应的密钥值
String privateKey = OpsUtils.get(aesSecretOpsKey);
// 加签处理
String sign = SignUtils.addSign(invocation.getArguments(), privateKey);
// sign 不为空的话则设置到请求对象中
if (StringUtils.isNotBlank(sign)) {
invocation.getObjectAttachments().put(SING, sign);
}
// 继续后面过滤器的调用
return invoker.invoke(invocation);
}
}
///////////////////////////////////////////////////
// 消费方:触发调用支付账号查询接口的类
// 关注 auth.rsa.private.secret 这个新增的参数
///////////////////////////////////////////////////
@Component
public class InvokeAuthFacade {
// 引用下游支付账号查询的接口
@DubboReference(timeout = 10000, methods = {@Method(
name = "queryPayAccount",
parameters = {
"auth.rsa.private.secret", "queryPayAccoun_privateSecret"
})})
private PayAccountFacade payAccountFacade;
// 该方法主要用来触发调用下游支付账号查询方法
public void invokeAuth(){
String respMsg = payAccountFacade.queryPayAccount("Geek");
System.out.println(respMsg);
}
}
代码的实现还是有消费方和提供方两部分。提供方的代码主要有4点改动:
- 在提供方定义一个 ProviderVerifySignFilter 验签过滤器类,然后实现 invoke 方法。
- 在 invoke 方法中,从 invocation 的 attachments 中获取 SIGN 加签值,再从上下文中获取方法层面配置的RSA公钥OPS配置值(auth.rsa.public.secret),最后进行验签,不一样则抛出异常直接中断调用流程。
- 在银行账号查询接口所在服务的 @DubboService 注解中,为该方法添加一个 RSA公钥的OPS配置(auth.rsa.public.secret)以及声明该方法需要开启RSA验签(auth.rsa.enable = true)。
- 将 ProviderVerifySignFilter 的类路径添加到 META-INF 文件夹下面的 org.apache.dubbo.rpc.Filter 文件中。
消费方的代码主要有4点改动:
- 在消费方定义一个 ConsumerAddSignFilter 加签过滤器类,然后也实现 invoke 方法。
- 在调用银行账号查询接口所在服务的 @DubboReference 注解中,为该方法添加一个RSA私钥的OPS配置(auth.rsa.private.secret)。
- 在 invoke 方法中,将 invocation 中 arguments 进行加签操作,然后放进 invocation 的 attachments 中,跟随远程调用去往提供方。
- 将 ConsumerAddSignFilter 的类路径添加到 META-INF 文件夹下面的 org.apache.dubbo.rpc.Filter 文件中。
到这里,鉴定真假、鉴定篡改这两种方式如何实现我们就学完了,细心的你想必应该也发现了,其实鉴定真假和鉴定篡改也是可以一起使用的,无非就是看安全度需要提升到什么粒度而已。
服务认证的应用
以后看到异构系统,相信你应该知道怎么轻量级地进行服务认证了。一般来说,凡是存在交互的地方,就会存在信任与不信任,那在实际开发的过程中,还有哪些应用场景需要进行服务认证呢?从一张系统拓扑图中,我们来详细分析一下:
从图中可以总结有三种调用场景。
第一,系统内应用之间的调用,这种基本上是一个系统群或者系统域,里面有很多应用,应用之间的调用可以考虑服务认证。
第二,系统间不同应用的调用,这种有点像不同部门或不同系统域之间的应用调用,有点跨组织的概念,往往会考虑服务认证。
第三,系统内外不同系统的调用,这种有点像内部系统与外部公网之外的系统进行调用,一旦谈到公网,各种安全因素一定少不了,因此就更有必要考虑认证了。
总结
今天,从一个异构系统调用支付系统引发了QPS暴增的问题开始,我们意识到内部系统与异构系统之间缺乏一种安全的认证机制,分析了服务如何进行认证:
- 鉴定真假,通过安插一些特殊字段(比如 TOKEN 字段)放在请求数据中,然后服务端识别这些特殊字段值进行一定的比对验证。
- 鉴定篡改,通过对数据进行加签、加密,将加签结果、加密结果放在请求数据中,然后服务端展开验签、解密操作,以确保数据的原始完整性。
这里也总结下自定义认证过滤器的通用三部曲:
- 首先,自定义过滤器继承 org.apache.dubbo.rpc.Filter 接口,并实现 invoke 方法,同时在过滤器的 @Activate 注解中声明是提供方或消费方,还是两者都是。
- 其次,可以在 @Method 的 parameters 字段中为方法自定义一些属性,以辅助过滤器的通用逻辑处理。
- 最后,将自定义过滤器的类路径一定要记得添加到 META-INF 文件夹下面的 org.apache.dubbo.rpc.Filter 文件中。
服务认证的应用场景有三类,系统内应用之间的调用、系统间不同应用的调用、系统内外不同系统的调用。
思考题
留个作业给你,Dubbo 框架也有类似服务认证这样的支撑能力,你可以研究下 TokenFilter、ConsumerSignFilter、ProviderAuthFilter ,看看如何应用。
欢迎在留言区分享你的思考和学习心得。我们下节课再见。
09思考题参考
上一期留了两个作业:
- 如何在 CustomCacheFilter 基础之上继续实现消费方的限流解决方案呢?
- 如何简单地应用 ActiveLimitFilter、ExecuteLimitFilter 分两个过滤器来控制流量?
问题一,其实在 CustomCacheFilter 底层,根本不区分到底是消费方还是提供方,底层只是识别了服务名和方法名就完成了一切了。因此只需要做两步:
将 CustomCacheFilter 的 @Activate 类注解修改下,既支持提供方,也支持消费方即可:
@Activate(group = {PROVIDER, CONSUMER})
public class CustomLimitFilter implements Filter {
// 省略其他代码
}
当消费方发起调用的时候,在 @DubboReference 注解中同样添加 qps.enable、qps.value、qps.type 这三个参数即可:
@DubboReference(timeout = 10000, methods = {@Method(
name = "queryRoleList",
parameters = {
"qps.enable", "true",
"qps.value", "3"
})})
private RoleQueryFacade roleQueryFacade;
问题二就更简单了,有了编写 CustomCacheFilter 的经验,再来看 ActiveLimitFilter、ExecuteLimitFilter 这两个类的实现逻辑,简直是秒懂。
打开 ActiveLimitFilter 看看:
@Activate(group = CONSUMER, value = ACTIVES_KEY)
public class ActiveLimitFilter implements Filter, Filter.Listener {
private static final String ACTIVE_LIMIT_FILTER_START_TIME = "active_limit_filter_start_time";
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
// 参数一:设置 actives 属性
int max = invoker.getUrl().getMethodParameter
(methodName, "actives", 0);
final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (!RpcStatus.beginCount(url, methodName, max)) {
// 参数二:设置 timeout 属性
long timeout = invoker.getUrl().getMethodParameter
(invocation.getMethodName(), "timeout", 0);
// 省略了其他代码逻辑
// 大致含义是,在有限的超时时间范围内会继续等待
// 直到超时时间到了,还未拿到计数资源的话,那么就抛出异常
}
invocation.put(ACTIVE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());
return invoker.invoke(invocation);
}
// 省略了其他代码逻辑
}
阅读 ActiveLimitFilter 代码后,我们可以得出 3 个规则:
- @Activate 中指定的是 CONSUMER,说明只在消费方生效。
- 在方法级别层面,可以设置 actives、timeout 两个参数来进行消费方限流。
- 特别之处在于 timeout 参数,如果拿不到计数限流资源,仍未超时,会继续等待片刻,直到有计数资源释放后再次进行尝试获取计数资源,只要超时时间一到,还没有获取到计数资源的话,则抛出异常。
接下来再打开 ExecuteLimitFilter 看看:
@Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter, Filter.Listener {
private static final String EXECUTE_LIMIT_FILTER_START_TIME = "execute_limit_filter_start_time";
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
// 参数一:设置 executes 属性
int max = url.getMethodParameter(methodName, "executes", 0);
// 若获取不到计数资源的话,则抛出异常
if (!RpcStatus.beginCount(url, methodName, max)) {
throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
"Failed to invoke method " + invocation.getMethodName() + " in provider " +
url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
"\" /> limited.");
}
invocation.put(EXECUTE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());
try {
// 能来到这里,说明已获取到计数资源
return invoker.invoke(invocation);
} catch (Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
}
}
// 省略了其他代码逻辑
}
阅读 ExecuteLimitFilter 代码后,同样可以以下 2 个规则:
- 规则一,@Activate 中指定的是 PROVIDER,说明只在提供方生效。
- 规则二,在方法级别层面,可以设置 executes 两个参数来进行提供方限流。
阅读 ActiveLimitFilter、ExecuteLimitFilter 两个源码类,细心的你可能会发现,消费方比提供方多了一个时间等待的操作,采用了等待的操作进行最大力度的获取计数资源,来充分利用 timeout 超时属性提高接口调用的成功概率。