21 调用流程:消费方的调用流程体系,你知道多少?
你好,我是何辉。今天我们深入研究Dubbo源码的第十篇,调用流程。
在消费方这样的代码你一定见过很多了。
在 Spring Bean 中定义 Dubbo 接口为成员变量时,用 @DubboReference 注解修饰 DemoFacade。
你有没有好奇过,我们现在看到的这个 demoFacade 变量,在内存运行时,值类型还属于 DemoFacade 这个类型么?如果不是,那拿着 demoFacade 变量去调用里面的方法时,在消费方到底会经历怎样的调用流程呢?
这个问题,我以前也探索过,不过因为源码太复杂,最后都徒劳无功。直到有一天,遇到了一位资深大佬,指点了我看源码的 12 字方针: 不钻细节:只看流程;不看过程:只看结论;再看细节:再看过程。参考他的经验,我经过一番简单的调试后,就轻松梳理出了调用流程的大体框架。
今天我就带你按照这12字方针的思路,感受一下高手是如何研究源码流程的。每个环节,我会用图片总结,加强你对流程的形象化理解。今天的内容稍微多一些,做好准备,我们马上开始。
sayHello 调试
先来看下我们的消费方调用代码。
///////////////////////////////////////////////////
// 消费方的一个 Spring Bean 类
// 1、里面定义了下游 Dubbo 接口的成员变量,并且还用 @DubboReference 修饰了一下。
// 2、还定义了一个 invokeDemo 方法被外部调用,但其重点是该方法可以调用下游的 Dubbo 接口
///////////////////////////////////////////////////
@Component
public class InvokeDemoService {
// 定义调用下游接口的成员变量,
// 并且用注解修饰
@DubboReference
private DemoFacade demoFacade;
// invokeDemo 是被外部触发调用的,不过这不是重点,
// 重点的是该 invokeDemo 逻辑中能调用下游的接口,
// 这里调用的是下游 DemoFacade 接口的 sayHello 方法
public String invokeDemo(){
return demoFacade.sayHello("Geek");
}
}
代码在这个类中定义了一个 invokeDemo 的方法,然后,在方法体中对下游 DemoFacade 接口的 sayHello 方法发起了远程调用。
是不是发现消费方真的非常简单且普通。那怎么展开探索呢?我们以 Debug 的方式调试消费方,在 sayHello 方法打个断点,通过断点调试,钻进调用流程的各个环节,同时参考12字方针,深入研究调用流程的底层逻辑。
1. JDK代理
我们在调用 sayHello 方法的这行打上一个断点,先运行提供方,再 Debug 运行消费方,很快断点就到来了。
还记得前面提出的一个疑问么,demoFacade 在内存运行时,值类型是什么?
从图中,我们可以清楚地看到,demoFacade 的类型,是一个随机生成的代理类名,不再属于 DemoFacade 这个类型了,而且结合 $Proxy 类名、代理类中的 h 成员变量属于 JdkDynamicAopProxy 类型,综合判断,这是采用 JDK 代理动态,生成了一个继承 Proxy 的代理类。
如果你再展开 h 属性看看它里面的成员变量。
有 2 个比较重要的字段,targetSource 和 interfaces,我们挨个看下。
先来看看 targetSource 变量(ReferenceBean$DubboReferenceLazyInitTargetSource)。
看到它的类名,有没有似曾相识的感觉,类名上包含了 DubboReference 关键字,想必跟订阅流程中的引用有关。而且类名的构成,由 $ 符号隔开了,可以说明 DubboReferenceLazyInitTargetSource 是 ReferenceBean 的一个内部类。
进去瞄一眼,看看猜想对不对。
///////////////////////////////////////////////////
// org.apache.dubbo.config.spring.ReferenceBean.DubboReferenceLazyInitTargetSource
// 懒加载初始化的一种操作
///////////////////////////////////////////////////
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
@Override
protected Object createObject() throws Exception {
// 创建对象,其实就是创建代理对象,拿到远程引用
return getCallProxy();
}
@Override
public synchronized Class<?> getTargetClass() {
return getInterfaceClass();
}
}
↓
///////////////////////////////////////////////////
// org.apache.dubbo.config.spring.ReferenceBean#getCallProxy
// 直接通过 referenceConfig 的 get 方法获取接口引用的代理对象
///////////////////////////////////////////////////
private Object getCallProxy() throws Exception {
if (referenceConfig == null) {
throw new IllegalStateException("ReferenceBean is not ready yet, please make sure to call reference interface method after dubbo is started.");
}
//get reference proxy
return referenceConfig.get();
}
进去之后才发现,原来内部类是一层壳, 核心创建对象的逻辑还是 ReferenceConfig 的 get 方法。
我们再来看 interfacaces 变量。
这里除了有我们调用下游的接口 DemoFacade,还有一个回声测试接口(EchoService)和一个销毁引用的接口(Destroyable)。
这俩接口我们没见过,却被 Dubbo 在创建代理的过程中追加进来了,可以说明一点, 我们可以拿着 demoFacade 变量强转为 EchoService,或者强转为 Destroyable,强转后就可以调用对应接口的方法,实现了一个代理类拥有多态功能的效果。
这个多态功能的支持,得归功于 JDK 的 Proxy 类,它在创建代理对象的 newProxyInstance 方法中,支持入参传入一个接口数组,Dubbo 框架在创建代理时,不但传入了 DemoFacade 接口,还传入了 EchoService 和 Destroyable 接口,生成的代理对象就有了另外两种接口的行为能力。
///////////////////////////////////////////////////
// java.lang.reflect.Proxy#newProxyInstance
// JDK 的 Proxy 类中,创建代理对象的方法
///////////////////////////////////////////////////
@CallerSensitive
public static Object newProxyInstance(ClassLoader loader,
Class<?>[] interfaces,
InvocationHandler h)
throws IllegalArgumentException
{ 省略部分实现代码... }
好,简单整理一下,刚开始断点探索,我们就接触到了“高深”的 JDK 代理对象,解开了开头的一个疑惑, demoFacade 在运行时并非我们所见的 DemoFacade 类型,而是由 JDK 动态代理生成的一个代理对象类型。
碰巧发现,在生成的代理对象中,targetSource 成员变量创建的底层核心逻辑还是 ReferenceConfig 的 get 方法,不得不说 ReferenceConfig 是消费方引用下游接口逻辑中非常重要的一个类。
同时还认识了 EchoService 和 Destroyable 两个接口,让我们使用 demoFacade 时不仅可以调用 sayHello 方法,还可以强转为这两个接口调用不同的方法,使得一个 demoFacade 变量拥有三能能力,这就是代理增强的魅力所在。
2. InvokerInvocationHandler
接下来,我们继续 Debug 进入 sayHello 方法中,发现了一个名字有点眼熟的 InvokerInvocationHandler 类,虽然不知道具体能做啥,但是发现它实现了 JDK 代理中的 InvocationHandler 接口,所以,我们可以认为,这个 InvokerInvocationHandler 类是 Dubbo 框架接收 JDK 代理触发调用的入口。
来看看 InvokerInvocationHandler 的 invoke 方法,验证一下。
///////////////////////////////////////////////////
// org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke
// JDK 代理被触发调用后紧接着就开始进入 Dubbo 框架的调用,
// 因此跟踪消费方调用的入口,一般直接搜索这个 InvokerInvocationHandler 即可,
// 再说一点,这个 InvokerInvocationHandler 继承了 InvocationHandler 接口。
///////////////////////////////////////////////////
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 如果方法所在的类是 Object 类型的话,则不做任何处理
// 毕竟 Object 类型是 JDK 源码中的类,也不是 Dubbo 框架处理的重点
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
// 获取方法名,方法参数类型
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 如果参数类型个数是 0 个的话,则表示是无参数方法
// 既然是无参数方法的话,对于一些特殊的方法需要尽可能的在入口处解决
// 因此对于 Object 中的 toString、hashCode 方法,Destroyable 接口实现类的 $destroy 方法
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
}
// 如果方法参数个数为 1 个,并且还是 equals 方法,则也不是 Dubbo 框架处理的重点
else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
// 还能来到这里,说明这才是 Dubbo 框架需要介入处理的方法
// 前面那么多提前返回的逻辑,在前几次阅读时,根本不需要细看,可以直接跳过不看
RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method, invoker.getInterface().getName(), protocolServiceKey, args);
if (serviceModel instanceof ConsumerModel) {
rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
}
// 然后转手就把逻辑全部收口到一个 InvocationUtil 类中,
// 从命名也看得出,就是一个调用的工具类
return InvocationUtil.invoke(invoker, rpcInvocation);
}
按12字方针,从上往下大致浏览一遍。
- 从代码表面的流程看,一些 toString、$destroy、hashCode 等方法做了特殊的逻辑提前返回了,最终调用 InvocationUtil 的 invoke 方法进行了收口处理。
- 从方法的入参和返回值看,有方法对象、入参数据、代理对象,满足了反射调用的基本要素,调用后返回的数据,自然就是我们需要的结果。
- 再看方法实现体的一些细节,发现最终并没有使用 proxy 代理对象,而是使用了 invoker + rpcInvocation 传入 InvocationUtil 工具类,完成了逻辑收口。
再进入 InvocationUtil,忽略与入参无关的逻辑,最终发现把 rpcInvocation 对象传入了 invoker 的 invoke 方法中,继续走后续调用逻辑。
3. MigrationInvoker
Debug 一直往后走,来到了一个 MigrationInvoker 的调用类,从类的名字上看是“迁移”的意思,有点 Dubbo2 与 Dubbo3 新老兼容迁移的味道。
那来看看 MigrationInvoker 的 invoke 方法代码逻辑。
///////////////////////////////////////////////////
// org.apache.dubbo.registry.client.migration.MigrationInvoker#invoke
// 迁移兼容调用器
///////////////////////////////////////////////////
@Override
public Result invoke(Invocation invocation) throws RpcException {
if (currentAvailableInvoker != null) {
if (step == APPLICATION_FIRST) {
// call ratio calculation based on random value
if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
return invoker.invoke(invocation);
}
}
return currentAvailableInvoker.invoke(invocation);
}
switch (step) {
case APPLICATION_FIRST:
if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
currentAvailableInvoker = serviceDiscoveryInvoker;
} else if (checkInvokerAvailable(invoker)) {
currentAvailableInvoker = invoker;
} else {
currentAvailableInvoker = serviceDiscoveryInvoker;
}
break;
case FORCE_APPLICATION:
currentAvailableInvoker = serviceDiscoveryInvoker;
break;
case FORCE_INTERFACE:
default:
currentAvailableInvoker = invoker;
}
return currentAvailableInvoker.invoke(invocation);
}
继续按 12 字方针分析。
- 从代码流程看,从上到下都在针对 step 变量值,进行 if…else 判断走不走分支处理,说明这里针对 step 进行了逻辑分发处理。
- 从方法的入参和返回值来看,入参 invocation 是一个富含所有请求信息的接口,返参也是一个抽象的 Result 接口,意思很直白,根据参数拿到结果,只是入参和返参是接口,体现了抽象的概念。
- 再看看方法实现体的一些细节,通过针对 APPLICATION_FIRST、FORCE_APPLICATION、FORCE_INTERFACE 来进行分发处理,这不就是消费方设置 dubbo.application.service-discovery.migration 属性,进行新老订阅方案兼容的值么?根据不同的属性值,选择对应可用的 invoker 进行继续后续调用。
4. MockClusterInvoker
然后走进”step == APPLICATION_FIRST“的分支逻辑,进入 currentAvailableInvoker 的 invoke 方法,来到了 MockClusterInvoker 这个类,看名字是“模拟集群调用”的意思。
本意是干啥的呢,我们继续看它的 invoke 方法逻辑。
///////////////////////////////////////////////////
// org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker#invoke
// 调用异常时进行使用mock逻辑来处理数据的返回
///////////////////////////////////////////////////
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result;
// 从远程引用的url中看看有没有 mock 属性
String value = getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim();
// mock 属性值为空的话,相当于没有 mock 逻辑,则直接继续后续逻辑调用
if (ConfigUtils.isEmpty(value)) {
//no mock
result = this.invoker.invoke(invocation);
}
// 如果 mock 属性值是以 force 开头的话
else if (value.startsWith("force")) {
// 那么就直接执行 mock 调用逻辑,
// 用事先准备好的模拟逻辑或者模拟数据返回
//force:direct mock
result = doMockInvoke(invocation, null);
}
// 还能来到这说明只是想在调用失败的时候尝试一下 mock 逻辑
else {
//fail-mock
try {
// 先正常执行业务逻辑调用
result = this.invoker.invoke(invocation);
//fix:#4585
// 当业务逻辑执行有异常时,并且这个异常类属于RpcException或RpcException子类时,
// 还有异常的原因如果是 Dubbo 框架层面的业务异常时,则不做任何处理。
// 如果不是业务异常的话,则会继续尝试执行 mock 业务逻辑
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
// 如果异常是 Dubbo 系统层面所认为的业务异常时,就不错任何处理
if(rpcException.isBiz()){
throw rpcException;
}else {
// 能来到这里说明是不是业务异常的话,那就执行模拟逻辑
result = doMockInvoke(invocation, rpcException);
}
}
} catch (RpcException e) {
// 业务异常直接往上拋
if (e.isBiz()) {
throw e;
}
// 不是 Dubbo 层面所和认为的异常信息时代,
// 直接
result = doMockInvoke(invocation, e);
}
}
return result;
}
照例按12字方针分析。
- 从代码流程看,整体上就三个 if 逻辑分支,no mock、force mock 和 fail mock,分别进行了不同逻辑的调用。
- 从方法的入参和返回值看,入参和返参和 MigrationInvoker 一样,都是重写了 invoke 方法,只是内部逻辑不一样,不同实现类做各自的事情。
- 再看看方法实现体的一些细节,mock 属性值(除了空值),当以 force 开头时,会直接执行 Mock 业务逻辑,其他情况还是先尝试正常后续调用,如果出现了非业务异常时,就尝试执行 Mock 业务逻辑。
5. 过滤器链
这里,看到 Cluster 这个关键字,想必你也想到了它是一个 SPI 接口,那在“发布流程”中我们也学过,远程导出和远程引用的时候,会用过滤器链把 invoker 层层包装起来。
那么我们就接着断点下去,发现确实进入 FutureFilter、MonitorFilter 等过滤器,这也证明了过滤器链包裹消费方 invoker 的存在。
6. FailoverClusterInvoker
断点一层层走完了所有的过滤器,接着又来到了 FailoverClusterInvoker 这个类,从名字上一看就是在“ 温故知新”中接触的故障转移策略,是不是有点好奇故障到底是怎么转移的?
我们不妨继续断点,进入它的 doInvoke 方法看看。
///////////////////////////////////////////////////
// org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke
// 故障转移策略的核心逻辑实现类
///////////////////////////////////////////////////
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
// 获取此次调用的方法名
String methodName = RpcUtils.getMethodName(invocation);
// 通过方法名计算获取重试次数
int len = calculateInvokeTimes(methodName);
// retry loop.
// 循环计算得到的 len 次数
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
// 从第2次循环开始,会有一段特殊的逻辑处理
if (i > 0) {
// 检测 invoker 是否被销毁了
checkWhetherDestroyed();
// 重新拿到调用接口的所有提供者列表集合,
// 粗俗理解,就是提供该接口服务的每个提供方节点就是一个 invoker 对象
copyInvokers = list(invocation);
// check again
// 再次检查所有拿到的 invokes 的一些可用状态
checkInvokers(copyInvokers, invocation);
}
// 选择其中一个,即采用了负载均衡策略从众多 invokers 集合中挑选出一个合适可用的
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
// 设置 RpcContext 上下文
RpcContext.getServiceContext().setInvokers((List) invoked);
boolean success = false;
try {
// 得到最终的 invoker 后也就明确了需要调用哪个提供方节点了
// 反正继续走后续调用流程就是了
Result result = invokeWithContext(invoker, invocation);
// 如果没有抛出异常的话,则认为正常拿到的返回数据
// 那么设置调用成功标识,然后直接返回 result 结果
success = true;
return result;
} catch (RpcException e) {
// 如果是 Dubbo 框架层面认为的业务异常,那么就直接抛出异常
if (e.isBiz()) { // biz exception.
throw e;
}
// 其他异常的话,则不继续抛出异常,那么就意味着还可以有机会再次循环调用
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
// 如果没有正常返回拿到结果的话,那么把调用异常的提供方地址信息记录起来
if (!success) {
providers.add(invoker.getUrl().getAddress());
}
}
}
// 如果 len 次循环仍然还没有正常拿到调用结果的话,
// 那么也不再继续尝试调用了,直接索性把一些需要开发人员关注的一些信息写到异常描述信息中,通过异常方式拋出去
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
代码流程分析也比较简单。
- 从代码流程看,主要就是一个大的 for 循环,循环体中进行了 select 操作,拿到了一个合适的 invoker,发起后续逻辑调用。
- 从方法的入参和返回值看,入参是 invocation、invokers、loadbalance 三个参数,猜测应该就是利用 loadbalance 负载均衡器,从 invokers 集合中,选择一个 invoker来发送 invocation 数据,发送完成后得到了返参的 Result 结果。
- 再看看方法实现体的一些细节,通过计算 retries 属性值得到重试次数并循环,每次循环都是利用负载均衡器选择一个进行调用,如果出现非业务异常,继续循环调用,直到所有次数循环完,还是没能拿到结果的话就会抛出 RpcException 异常了。
7. DubboInvoker
现在你应该知道“故障转移策略”的深层含义了吧,就是根据预先设置好的重试次数,当调用发生非业务异常时,按照负载均衡策略继续选择一个 invoker,再次发起调用。
了解完故障转移策略,我们继续 Debug,结果来到了 DubboInvoker 这个类。
///////////////////////////////////////////////////
// org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
// 按照dubbo协议发起调用实现类
///////////////////////////////////////////////////
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
// 获取此次调用的方法名
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
// 获取发送数据的客户端
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 看看是单程发送不需要等待响应,还是发送完了后需要等待响应
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 获取超时时间,这个之前在“配置加载顺序”接触过这个方法
int timeout = calculateTimeout(invocation, methodName);
invocation.setAttachment(TIMEOUT_KEY, timeout);
// 单程发送不需要等待响应
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// 发送完了之后需要等待响应
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// 操作 currentClient 发送了一个 request 请求,
// 然后接收了一个 CompletableFuture 对象,说明这里存在异步操作
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
- 从代码流程看,拿到了一个交换数据的客户端类,然后走两个发送数据的分支,一条分支逻辑单程调用不需要响应,一条有响应。
- 从方法的入参和返回值来看,没有什么特别,根据请求数据想办法发起调用,拿到 Result 结果。
但是当前的类名是处理 Dubbo 协议的调用?有点好奇,我们看看这个 doInvoke 有哪些实现类,发现了有 InjvmInvoker、GrpcInvoker 等实现类,也证实了当前的 DubboInvoker 是专门处理 Dubbo 协议调用的类。
- 再看看方法实现体的一些细节,计算了超时时间值,单程发送,不需要响应好说,直接调用交互数据客户端类的 send 方法就行,需要有响应的分支逻辑还定义了线程池,调用的是 request 方法进行发送。
不过,两条分支最终都返回了一个异步结果对象,可以看出当前的 DubboInvoker 旨在屏蔽协议之间的差异,让上层调用方不感知协议间的不同。
8. ReferenceCountExchangeClient
因为我们没有单独设置调用不需要响应,就继续断点进入 currentClient 的 request 方法,看看到底是怎么发送的,来到了 ReferenceCountExchangeClient 类。
///////////////////////////////////////////////////
// org.apache.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient#request(java.lang.Object, int, java.util.concurrent.ExecutorService)
// 这里将构建好的 request 对象发送出去,然后拿到了一个 CompletableFuture 异步化的对象
///////////////////////////////////////////////////
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
// client为:HeaderExchangeClient
return client.request(request, timeout, executor);
}
- 从代码流程来看,这个类中的 request 方法也很简单,把入参数据直接全部交给了当前类的成员变量 client 处理。
- 从方法的入参和返回值看,入参是请求对象、超时时间、线程池对象,返参是 CompletableFuture 对象。从线程池和 Future 对象,充分说明了这个 request 方法进行了同步转异步的操作。
- 再看看方法实现体的一些细节,从断点展示的变量值细节看,只是发现了 client 的类型属于 HeaderExchangeClient,也就意味着 HeaderExchangeClient 完成了同步转异步的逻辑操作。
9. NettyClient
仍然没有看到数据是怎么发送的,我们继续深入 HeaderExchangeClient 的 request 方法中,Debug 几步后发现,进入了抽象类 AbstractPeer 的 send 方法。
///////////////////////////////////////////////////
// org.apache.dubbo.remoting.transport.AbstractPeer#send
// this:NettyClient
// NettyClient 负责和提供方服务建立连接、发送数据等操作
///////////////////////////////////////////////////
@Override
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
}
逻辑也挺简单。
- 从代码流程来看,没有过多的逻辑包装,直接把入参交给了另外一个重载的 send 方法。
- 从方法的入参和返回值来看,入参是请求对象,没有返参,并且方法上声明了 RemotingException 异常,说明这里离数据的发送应该很近了。至于没有返回值,主要是前面由同步转了异步,没有返回值也正常。
- 再看看方法实现体的一些细节,抽象类看不出什么,但是从堆栈变量的引用值可以发现,当前的抽象类是 NettyClient 的父类。
那我们从 NettyClient 的 send 方法细看,结果发现了最终调用了 NioSocketChannel 的 writeAndFlush 方法,这个不就是 Netty 网络通信框架的 API 么?
到这里,我们在代码层面解释了数据原来是通过 Netty 框架的 NioSocketChannel 发送出去的。
10. NettyCodecAdapter
一旦进入到 Netty 框架,再通过断点一步步跟踪数据就有点难了,因为 Netty 框架内部处处都是“线程+队列”的异步操作方式,这里我们走个捷径,进入 NettyClient 类,找找初始化 NettyClient 的相关 Netty 层面的配置。
你会找到一个 NettyCodecAdapter 类,对数据进行编解码,直接在类中的 encode 方法打个断点,等断点过来。
///////////////////////////////////////////////////
// org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalEncoder#encode
// 将对象按照一定的序列化方式发送出去
///////////////////////////////////////////////////
private class InternalEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
Channel ch = ctx.channel();
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
codec.encode(channel, buffer, msg);
}
}
可以看到,编码的方法简单干脆。
- 从代码流程看,调用了一个 codec 编码器的变量,对入参编码处理。
- 从方法的入参和返回值看,msg 是请求数据对象,out 变量是 ByteBuf 缓冲区,应该是将 msg 编码之后的数据流。
- 再看看方法实现体的一些细节,没什么特别之处,无非就是走后续编解码器的编码方法,编码完成后,再通过 Netty 框架把数据流发送出去。
调用流程的其他案例
经过漫长的十步断点,我们把消费方的调用流程大致串了起来,虽说过程有点曲折,但最终的艰辛是值得的,对于你理解框架调用的来龙去脉很重要,而且未来你会体会到,当调用环节发生了异常,你只要认真回忆一下异常在哪个环节,然后结合代码简单分析一下,很快就能分析出原因来。
消费方的调用流程就是这么一步步调试分析出来的,那还有哪些典型的流程,可以按这个思路一步步调试分析出调用流程呢?我就列举三个比较典型的,你课后可以练练手。
第一,Tomcat 容器接收请求流程,通过在 HttpServlet 的 service 方法打个断点,从调用堆栈的最下面开始,可以分析出 Tomcat 的整体架构。
第二,SpringMvc 处理请求的流程,通过在 Controller 的任意方法打个断点,就可以逐步分析出一个 SpringMvc 处理请求的大致流程框架。
第三,Spring 的 getBean 方法,通过这个方法的层层深入,可以分析出一个庞大的 Spring 对象生成体系,也能挖掘出非常多 Spring 各种操控对象的扩展机制。
总结
今天,从一个被 @DubboReference 标识的常规成员变量开始,我们抛出了两个问题,变量的值类型在运行时是否是见到的类型,在消费方发起调用时会经过怎样的调用流程体系。
带着这两个问题,我们用最普通的 Debug 调试方式,借鉴 12 字方针深入跟踪调用流程的源码,边分析、边绘图,步步为营,最终梳理出了消费方调用流程的大体框架。 你还可以和“源码框架”中学过的分层模块联系到一起复习。
总结一下跟踪源码的 12 字方针。
- “不钻细节:只看流程。”代码虽然多,但我们不必研究每个细节,要先捡方法中的重要分支流程,一些提前返回的边边角角的流程一概忽略不看。
- “不看过程:只看结论。”每个方法的代码逻辑可长可短,我们可以重点研究这个方法需要什么入参,又能给出什么返参,以此推测方法在干什么,到底要完成一件什么样的事情,搞清楚并得出结论。
- “再看细节:再看过程。”当你按照前两点认真调试后,大概的调用流程体系就清楚了。在此基础之上,再来细看被遗忘的边角料代码,有助于你进一步丰富调用流程图,体会源码细节中那些缜密的思维逻辑。
思考题
留个作业,消费方的普通调用会经历哪些流程你已经十分熟悉了,可以研究消费方进行泛化调用时会经历哪些流程,以及泛化调用的底层是怎么实现的?动手练习一下。
期待看到你的回答,如果觉得今天的内容对你有帮助,也欢迎分享给身边的朋友一起讨论。我们下一讲见。
20 思考题参考
上一期留了个作业,研究下消费方接收 zookeeper 服务端事件变更的地方在哪里。
想要解答这个问题,我们得从添加监听的起源看起,只有知道添加监听干了些什么事情,预留了什么回调,我们才能进一步找到接收的地方。
我们直接进入 doSubscribe 方法中看看,到底添加了什么监听?监听对应的实现类是哪个?
///////////////////////////////////////////////////
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
// 订阅的核心逻辑,读取 zk 目录下的数据,然后通知刷新内存中的数据
///////////////////////////////////////////////////
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
checkDestroyed();
// 因为这里用 * 号匹配,我们在真实的线上环境也不可能将服务接口配置为 * 号
// 因此这里的 * 号逻辑暂且跳过,直接看后面的具体接口的逻辑
if ("*".equals(url.getServiceInterface())) {
// 省略其他部分代码...
}
// 能来到这里,说明 ServiceInterface 不是 * 号
// url.getServiceInterface() = com.hmilyylimh.cloud.facade.demo.DemoFacade
else {
CountDownLatch latch = new CountDownLatch(1);
try {
List<URL> urls = new ArrayList<>();
// toCategoriesPath(url) 得出来的集合有以下几种:
// 1、/dubbo/com.hmilyylimh.cloud.facade.demo.DemoFacade/providers
// 2、/dubbo/com.hmilyylimh.cloud.facade.demo.DemoFacade/configurators
// 3、/dubbo/com.hmilyylimh.cloud.facade.demo.DemoFacade/routers
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
if (zkListener instanceof RegistryChildListenerImpl) {
((RegistryChildListenerImpl) zkListener).setLatch(latch);
}
// 向 zk 创建持久化目录,一种容错方式,担心目录被谁意外的干掉了
zkClient.create(path, false);
// !!!!!!!!!!!!!!!!
// 这段逻辑很重要了,添加对 path 目录的监听,
// 添加监听完成后,还能拿到 path 路径下所有的信息
// 那就意味着监听一旦添加完成,那么就能立马获取到该 DemoFacade 接口到底有多少个提供方节点
List<String> children = zkClient.addChildListener(path, zkListener);
// 将返回的信息全部添加到 urls 集合中
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 从 zk 拿到了所有的信息后,然后调用 notify 方法
// url.get(0) = dubbo://192.168.100.183:28200/com.hmilyylimh.cloud.facade.demo.DemoFacade?anyhost=true&application=dubbo-20-subscribe-consumer&background=false&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.hmilyylimh.cloud.facade.demo.DemoFacade&methods=sayHello,say®ister-mode=interface&release=3.0.7&side=provider
// url.get(1) = empty://192.168.100.183/com.hmilyylimh.cloud.facade.demo.DemoFacade?application=dubbo-20-subscribe-consumer&background=false&category=configurators&dubbo=2.0.2&interface=com.hmilyylimh.cloud.facade.demo.DemoFacade&methods=sayHello,say&pid=11560&qos.enable=false&release=3.0.7&side=consumer&sticky=false×tamp=1670846788876
// url.get(2) = empty://192.168.100.183/com.hmilyylimh.cloud.facade.demo.DemoFacade?application=dubbo-20-subscribe-consumer&background=false&category=routers&dubbo=2.0.2&interface=com.hmilyylimh.cloud.facade.demo.DemoFacade&methods=sayHello,say&pid=11560&qos.enable=false&release=3.0.7&side=consumer&sticky=false×tamp=1670846788876
notify(url, listener, urls);
} finally {
// tells the listener to run only after the sync notification of main thread finishes.
latch.countDown();
}
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
再次回到了这段代码中,这次我们是有针对性地寻找监听器类了,眼尖的你,相信很快就看到了 RegistryChildListenerImpl 这个类,从名字上看,是子节点的监听实现类,既然能在这段实现全流程逻辑,那毫无疑问得进入 RegistryChildListenerImpl 这个类去看看。
///////////////////////////////////////////////////
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry.RegistryChildListenerImpl
// 子节点目录监听变更类
///////////////////////////////////////////////////
private class RegistryChildListenerImpl implements ChildListener {
private RegistryNotifier notifier;
private long lastExecuteTime;
private volatile CountDownLatch latch;
public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener, CountDownLatch latch) {
this.latch = latch;
notifier = new RegistryNotifier(getUrl(), ZookeeperRegistry.this.getDelay()) {
@Override
public void notify(Object rawAddresses) {
// 获取延时时间
long delayTime = getDelayTime();
// 若延时时间小于或等于 0 的话,就直接执行 RegistryNotifier 内部类的 doNotify 方法
if (delayTime <= 0) {
this.doNotify(rawAddresses);
} else {
// 否则就计算【当前时间】减去【上一次执行时间】得到一个差值
// 若 delayTime 大于这个差值的话,那就继续睡眠下
// 否则又立马执行 RegistryNotifier 内部类的 doNotify 方法
long interval = delayTime - (System.currentTimeMillis() - lastExecuteTime);
if (interval > 0) {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
// ignore
}
}
lastExecuteTime = System.currentTimeMillis();
this.doNotify(rawAddresses);
}
}
@Override
protected void doNotify(Object rawAddresses) {
ZookeeperRegistry.this.notify(consumerUrl, listener, ZookeeperRegistry.this.toUrlsWithEmpty(consumerUrl, path, (List<String>) rawAddresses));
}
};
}
// 设置计数器概念的同步工具类
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
// 感知到节点有变更,该方法是别其他地方回调的
@Override
public void childChanged(String path, List<String> children) {
// path = /dubbo/com.hmilyylimh.cloud.facade.demo.DemoFacade/providers
// children.get(0) = dubbo://192.168.100.183:28200/com.hmilyylimh.cloud.facade.demo.DemoFacade?anyhost=true&application=dubbo-20-subscribe-consumer&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.hmilyylimh.cloud.facade.demo.DemoFacade&methods=sayHello,say&pid=8552®ister-mode=interface&release=3.0.7&side=provider×tamp=1670859852464
// children.get(1) = dubbo://192.168.100.183:28200/com.hmilyylimh.cloud.facade.demo.DemoFacade?anyhost=true&application=dubbo-20-subscribe-consumer&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.hmilyylimh.cloud.facade.demo.DemoFacade&methods=sayHello,say&pid=10568®ister-mode=interface&release=3.0.7&side=provider×tamp=1670859874187
// children.get(2) = dubbo://192.168.100.183:28200/com.hmilyylimh.cloud.facade.demo.DemoFacade?anyhost=true&application=dubbo-20-subscribe-consumer&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.hmilyylimh.cloud.facade.demo.DemoFacade&methods=sayHello,say&pid=12988®ister-mode=interface&release=3.0.7&side=provider×tamp=1670859893317
try {
latch.await();
} catch (InterruptedException e) {
logger.warn("Zookeeper children listener thread was interrupted unexpectedly, may cause race condition with the main thread.");
}
// 节点变更通知,直接调用的是上面 RegistryNotifier 内部类中的 notify 方法
notifier.notify(children);
}
}
通过这段代码我们看到了若 childChanged 方法发生了回调,那么最终会调用到 ZookeeperRegistry 的 notify 方法。虽然我们找到了 Dubbo 针对 zookeeper 目录监听回调的地方,但是到底是谁调用了这个 childChanged 方法呢?
这个时候,又得使出我们的 Debug 绝招了,你可以按步骤操作。
- 步骤一,启动提供方。
- 步骤二,启动消费方。
- 步骤三,在 RegistryChildListenerImpl 类中的 childChanged 方法打个断点。
- 步骤四,关闭提供方。
- 步骤五,启动提供方。
就能看到断点的到来了。
通过对断点调用栈的分析,真相也就清晰了,原来 dubbo 框架引用了 curator 插件来与 zookeeper 进行交互,curator 插件会实时感知 zookeeper 服务端发送的事件变更,然后 curator 再通过一定的回调处理,就调用到了 RegistryChildListenerImpl 类中的 childChanged 方法。