26 线程池扩展:如何选择Dubbo线程池?
你好,我是何辉。今天我们继续学习Dubbo拓展的第四篇,线程池扩展。
提到线程池,我们在前面“ 异步化实践”中通过“线程池耗尽”这个现象已经接触到了,Dubbo 采用默认的线程池,也就是 200 个核心线程,来提供服务,其实我们已经用得非常舒服了,业务正常运转没什么阻碍。
不过你知道吗,Dubbo 框架里面其实有 4 种线程池,那其他的线程池存在的意义是什么,我们在使用时该怎么选择呢?
带着这个问题,今天我们来点轻松的,带你掌握 Dubbo 4 种线程池的用法。
线程池原理
在具体分析每一种线程池之前,我们还是回忆一下创建线程池的核心代码参数,看一看添加任务到线程池的大致工作原理是什么样的。
///////////////////////////////////////////////////
// java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue<java.lang.Runnable>, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)
// 线程池最核心的构造方法
///////////////////////////////////////////////////
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 核心线程数量小于 0,则抛出参数非法异常
if (corePoolSize < 0 ||
// 最大线程数量小于等于 0,则抛出参数非法异常
maximumPoolSize <= 0 ||
// 最大线程数量小于核心线程数量,则抛出参数非法异常
maximumPoolSize < corePoolSize ||
// 非核心线程空闲时的存活时间小于0,则抛出参数非法异常
keepAliveTime < 0)
throw new IllegalArgumentException();
// 没有配置任务队列、没有配置创建线程的工厂、没有配置拒绝策略,一律抛出参数非法异常
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 剩下的就是一些入参的赋值逻辑了
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
这么一个偌大的构造方法,含有 7 个参数,有点多。不过没关系,我们还是用图来辅助梳理逻辑。
假设主线程一直在不停地调用线程池的 execute 方法添加任务。
① 当处理任务的核心线程数量小于 corePoolSize,那就优先把任务分派给核心线程干活。
② 当处理任务的核心线程数量恰好等于 corePoolSize,那就继续把任务添加至 BlockingQueue 阻塞队列中等待。
③ 若任务也把阻塞队列堆满了,就继续尝试把任务分配给非核心线程去处理。
④ 若核心线程数量 + 非核心线程数量恰好为最大线程数量(maximumPoolSize),这个时候,如果再添加任务,线程池就吃不消了,所以会采取一定的拒绝策略来应对。
拒绝策略主要有 4 种。
- 策略一:拒绝添加新任务并抛出异常(AbortPolicy), 同时也是默认的拒绝策略。
- 策略二:让调用方线程执行新任务(CallerRunsPolicy)。
- 策略三:抛弃新任务(DiscardPolicy)。
- 策略四:丢弃最早入队的任务然后添加新任务(DiscardOldestPolicy)。
Dubbo 线程池
了解了线程池的原理,我们分析 Dubbo 的四种线程池就比较轻松了,因为 Dubbo 线程池的创建方式,就是基于我们刚刚梳理的拥有 7 个参数的线程池构造方法,我们一个一个看,分别是:FixedThreadPool、LimitedThreadPool、CachedThreadPool、EagerThreadPool。
1. FixedThreadPool
第一个,固定线程数量的线程池,老规矩,我们直接看这个线程池在源码层面是怎么体现的。
///////////////////////////////////////////////////
// org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool
// 固定线程数量的线程池
///////////////////////////////////////////////////
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 获取一些参数变量
String name = url.getParameter("threadname", (String) url.getAttribute("threadname", "Dubbo"));
int threads = url.getParameter("threads", 200);
int queues = url.getParameter("queues", 0);
// 调用创建线程池的构造方法
return new ThreadPoolExecutor(
// 核心线程数量:threads
threads,
// 最大线程数量:threads
threads,
// 非核心线程空闲时的存活时间等于0
0,
// 非核心线程空闲时的存活时间等于0,单位:毫秒
TimeUnit.MILLISECONDS,
// 存放任务的阻塞队列
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
// 创建线程的工厂
new NamedInternalThreadFactory(name, true),
// 带有导出线程堆栈的拒绝策略,内部继承了 AbortPolicy 抛异常策略
new AbortPolicyWithReport(name, url)
);
}
}
从这段简短的创建线程池的代码中,我们能得出 3 点结论。
- 第一, 核心线程数量与最大线程数量是一致的。这说明只有核心线程的存在,没有非核心线程的存在,而且在没有人为干预设置 threads 属性的情况下,默认核心线程数是 200,这也是 Dubbo 默认线程池数量是 200 个的由来。
- 第二,采用的阻塞队列,是根据队列长度 queues 属性值来确定。长度等于 0 使用同步队列(SynchronousQueue),长度小于 0 使用无界阻塞队列,长度大于 0 使用有界队列。
- 第三,拒绝策略使用的是默认的抛异常策略。不过,这个拒绝策略是经过框架特殊包装处理的,发现拒绝添加任务时,这个策略有导出线程堆栈的能力,特别适合开发人员分析线程池满时的一些实时状况。
从第一点结论我们不难发现,核心线程与最大线程数量等价,也说明了线程数量是固定不变的,所以这个线程池,我们就叫做 固定线程数量的线程池(FixedThreadPool)。
固定数量的默认值是 200,也就是说在某时刻最大能并行处理 200 个任务,假设每个任务耗时 1 秒,相当于这台机器的单机 QPS = 200,假设每个任务耗时 5 秒,那这台机器的单机 QPS = 40。
这样计算出来的 QPS 你可能感触不太强烈,不过你可以认真观察下,自己负责的应用服务器单机 QPS 情况,对比一下数值。你会发现,原来 Dubbo 设置默认 200 个线程也不是没有道理的,这是经过大量验证后的折中取值,能满足大多数业务场景的诉求。所以,如果我们没有特别的诉求,也别去瞎折腾设置什么线程池数量,直接使用 Dubbo 默认的线程池数量就好了。
2. LimitedThreadPool
不过你也许会说:我这系统又没多大的量,200 个线程有点浪费,量大的时候 QPS 也不过十几,量低的时候 QPS 几乎为零,而且也不好预估量低的时候设置多少合适。
对于这样的疑惑,需要 Dubbo 的第二个线程池出马。看代码。
///////////////////////////////////////////////////
// org.apache.dubbo.common.threadpool.support.limited.LimitedThreadPool
// 有限制数量的线程池
///////////////////////////////////////////////////
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 获取一些参数变量
String name = url.getParameter("threadname", (String) url.getAttribute("threadname", "Dubbo"));
int cores = url.getParameter("corethreads", 0);
int threads = url.getParameter("threads", 200);
int queues = url.getParameter("queues", 0);
// 调用创建线程池的构造方法
return new ThreadPoolExecutor(
// 核心线程数量:cores
cores,
// 最大线程数量:threads
threads,
// 非核心线程空闲时的永久存活
Long.MAX_VALUE,
// 非核心线程空闲时的存活时间,单位:毫秒
TimeUnit.MILLISECONDS,
// 存放任务的阻塞队列
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
// 创建线程的工厂
new NamedInternalThreadFactory(name, true),
// 带有导出线程堆栈的拒绝策略,内部继承了 AbortPolicy 抛异常策略
new AbortPolicyWithReport(name, url)
);
}
}
看过固定数量的线程池后,我们再看这个有限制数量的线程池源码,就非常得心应手,可以得出 2 点结论。
- 第一,核心线程数是由一个单独的 corethreads 属性来赋值的,默认值为 0。最大线程数是由 threads 属性来赋值的,默认值为 200。 这说明默认情况下没有核心线程,非核心线程数量最大也不能超过 200。
- 第二,非核心线程的 keepAliveTime 存活时间为 Long 类型的最大值,也就是说永不过期,一旦非核心线程创建出来了,只要不出现什么意外,就会一直存活,有任务就处理任务,没任务就躺在那里休息。
从核心线程与最大线程数量分开设置的情况来看, 这个线程池的数量分配,算是配合业务有弹性的分配,如果业务量并不大,分配了多少就使用多少,如果业务量稍微再增长一点,那就继续分配非核心线程继续干活,按照默认情况分配 200 个就封顶了。
所以对于那些量小,不知道最初设置多少核心线程合适的,又不想浪费创建过多的线程的时候,我们可以考虑使用这款有限制数量的线程池(LimitedThreadPool)。
3. CachedThreadPool
不过LimitedThreadPool用着用着,你可能会觉得非核心线程一直闲着不回收,会占用内存,又琢磨着怎么让线程在有大量任务来时大量创建线程应对,当任务处理完后,非核心线程都闲置时默默地销毁掉。
这里 Dubbo 也有一款缓存线程池,我们来看它的源码实现。
///////////////////////////////////////////////////
// org.apache.dubbo.common.threadpool.support.cached.CachedThreadPool
// 缓存一定数量的线程池
///////////////////////////////////////////////////
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 获取一些参数变量
String name = url.getParameter("threadname", (String) url.getAttribute("threadname", "Dubbo"));
int cores = url.getParameter("corethreads", 0);
int threads = url.getParameter("threads", Integer.MAX_VALUE);
int queues = url.getParameter("queues", 0);
int alive = url.getParameter("alive", 60 * 1000);
// 调用创建线程池的构造方法
return new ThreadPoolExecutor(
// 核心线程数量:cores
cores,
// 最大线程数量:threads
threads,
// 非核心线程空闲时的存活时间
alive,
// 非核心线程空闲时的存活时间,单位:毫秒
TimeUnit.MILLISECONDS,
// 存放任务的阻塞队列
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
// 创建线程的工厂
new NamedInternalThreadFactory(name, true),
// 带有导出线程堆栈的拒绝策略,内部继承了 AbortPolicy 抛异常策略
new AbortPolicyWithReport(name, url)
);
}
}
对比有数量限制的线程池(LimitedThreadPool),CachedThreadPool 最大的变化就在于创建线程池对象的时候,支持 alive 属性来赋值非核心线程的空闲时的存活时间,默认存活时间是 1 分钟,也就是说, 一旦非核心线程自带了销毁功能,也就变成了缓存线程池了。
所以,当任务突增,你期望可以开辟非核心线程来执行,等到任务量降下去,你又不希望非核心线程占用空间,期望非核心线程自动销毁的话,可以考虑这个缓存线程池。
4. EagerThreadPool
不过系统总是千奇百怪,每个系统遇到的情况都不一样,你可能会对目前的线程池挑出一些毛病,比如在线程池工作原理图中,分支①发现核心线程都在处理任务的时候,对于新添加的任务你不期望走分支②逻辑,而是期望走分支③逻辑。这该怎么办呢?
别着急,你的需求Dubbo 框架也考虑过,我们看 Dubbo 的第四个线程池。
///////////////////////////////////////////////////
// org.apache.dubbo.common.threadpool.support.eager.EagerThreadPool
// 渴望数量的线程池
///////////////////////////////////////////////////
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 获取一些参数变量
String name = url.getParameter("threadname", (String) url.getAttribute("threadname", "Dubbo"));
int cores = url.getParameter("corethreads", 0);
int threads = url.getParameter("threads", Integer.MAX_VALUE);
int queues = url.getParameter("queues", 0);
int alive = url.getParameter("alive", 60 * 1000);
// 初始化队列和线程池
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(
// 核心线程数量:cores
cores,
// 最大线程数量:threads
threads,
// 非核心线程空闲时的存活时间
alive,
// 非核心线程空闲时的存活时间,单位:毫秒
TimeUnit.MILLISECONDS,
// 存放任务的阻塞队列
taskQueue,
// 创建线程的工厂
new NamedInternalThreadFactory(name, true),
// 带有导出线程堆栈的拒绝策略,内部继承了 AbortPolicy 抛异常策略
new AbortPolicyWithReport(name, url));
// 将队列和线程池建立联系
taskQueue.setExecutor(executor);
return executor;
}
}
↓
///////////////////////////////////////////////////
// org.apache.dubbo.common.threadpool.support.eager.TaskQueue#offer
// 尝试添加任务至队列
///////////////////////////////////////////////////
@Override
public boolean offer(Runnable runnable) {
// 参数必要性检查,若线程池对象为 null 则抛出异常
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
// 获取线程池中工作线程 worker 的数量
int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
// 若线程池中活跃的数量小于 worker 的数量,
// 说明有些 worker 是闲置状态,没有活干
// 因此把任务添加到队列后,线程就有机会被分派到任务继续干活了
if (executor.getActiveCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// return false to let executor create new worker.
// 还能来到这里,说明目前所有的 worker 都在处于工作状态
// 那么继续看 worker 的数量和最大线程数量想比,若偏小的话
// 那么就返回 false 表示需要继续创建 worker 来干活
// 至于为什么返回 false 就能创建 worker 来继续干活,请看下面的 execute 方法
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// currentPoolThreadSize >= max
// 还能来到这里,说明已经达到了最大线程数量了,
// 那该放队列就放队列,队列放不下的的话,又没有非核心线程了,那就走拒绝策略了
return super.offer(runnable);
}
↓
///////////////////////////////////////////////////
// java.util.concurrent.ThreadPoolExecutor#execute
// 线程池添加任务的方法
// 解释:currentPoolThreadSize < executor.getMaximumPoolSize() 这行代码
// 为什么返回 false 就能创建 worker 来继续干活
// 原理:在 workQueue.offer(command) 返回 false 后继续走下面的
// else if (!addWorker(command, false)) 尝试添加 worker 工作线程,
// 添加成功了,那就执行任务,添加不成功了,说明已达到了最大线程数量,走拒绝策略
///////////////////////////////////////////////////
public void execute(Runnable command) {
// 若任务 command 对象为 null 的话,是不合法的,直接抛出 NPE 异常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 若工作线程的数量小于核心线程的数量的话
if (workerCountOf(c) < corePoolSize) {
// 则添加核心线程,addWorker(command, true) 中的 true 表示创建核心线程
// 添加成功就结束该 execute 方法流程了
if (addWorker(command, true))
return;
c = ctl.get();
}
// 若还能来到这里,说明工作线程数量已经达到了核心线程的数量了
// 再来的任务就只能尝试添加至任务阻塞队列了
// 调用队列的 offer 方法尝试看看能否添加至任务队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 若添加成功了,但是呢,线程池不处于运行状态,还得将任务移除,
// 然后执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 还能来到这里,说明线程池处于运行状态,但是尝试添加至队列 offer 失败了
// 那么就再次尝试调用 addWorker(command, false) 来创建非核心线程来执行任务
// 尝试添加失败的话,再走拒绝策略
else if (!addWorker(command, false))
reject(command);
}
渴望数量的线程池(EagerThreadPool)源码,非常有亮点,可以说一定程度上打破了线程池固有的流程机制。
第一,核心线程数是由一个单独的 corethreads 属性来赋值的,默认值为 0。最大线程数是由 threads 属性来赋值的,默认值为 200,非核心线程的存活时间是由 alive 属性来赋值的,基本上都交给了用户自由设置指定。
第二,任务阻塞队列,既不是 SynchronousQueue,也不是 LinkedBlockingQueue,而是重新设计了一款新的阻塞队列 TaskQueue 放到了线程池中。
新的阻塞队列 TaskQueue,亮点在于 重写了 LinkedBlockingQueue 的 offer 方法,只要活跃的工作线程数量小于最大线程数量,就优先创建工作线程来处理任务。这个 offer 方法的优秀设计,主要源于对线程池 execute 方法的深度研究,利用任务入队失败的方式,来促使线程池尝试创建新的工作线程,来快速处理新增的任务。
我也用一张图总结了EagerThreadPool的设计精髓。
在分支①中,核心线程都在工作时,新增的任务,会直接开辟非核心线程继续工作,如果非核心线程也容纳不下了,后面的流程就和正常的线程池工作原理一样了。
所以当你原本设定了一些核心线程提供服务,但是突如其来的任务,需要优先紧急处理,而不想放到队列里面等待,就可以考虑用这款渴望线程数量的线程池(EagerThreadPool)。
线程池监控
到这里,相信你对 Dubbo 的四种线程池提供的能力有一定了解了,接下来,我们就用一个监控案例来实践一下。
Dubbo 线程池什么时候耗尽,可能是无法预测的,但是我们可以监控,提前预防,比如当活跃线程数量与最大线程满足多少百分比时,记录一个打点(至于在打点平台如何设置告警策略,或者如何做后续应对,就是后话了)。
那如何提供一个打点的入口呢?
你也许会说,这个监控诉求还不简单,在“ SPI 机制”中就学过,通过 ExtensionLoader 中的 getExtension 方法,拿到指定扩展点名称的实现类,然后就可以拿到线程池对象了,有了线程池对象,再来做任何读取操作都是简单的。
用 Dubbo 框架中四种线程池任意一种行为方式都是一样的,这里我们就以默认的线程池为例。代码很简单。
// 将 ThreadPool.class 传入到 ExtensionLoader 对象中
ThreadPool threadPool = ExtensionLoader
.getExtensionLoader(ThreadPool.class)
// 然后获取指定扩展点名称的实现类,这里指定了拿默认的固定数量的线程池
.getExtension("fixed");
// 然后调用实现类的 getExecutor 方法拿到对应的线程池对象
Executor fixedExecutor = threadPool.getExecutor(...)
可是写着写着,有点不对劲了,我们拿到 fixed 对应的实现类后,再调用 getExecutor 方法,不是又创建了一个线程池对象?这可不太行,我们的目的是要针对 Dubbo 运行正在使用的线程池进行监控,现在不但没帮上忙,还又创建了一个新的线程池,得另找办法。
不能自己手动调用 getExecutor 方法,源码我们也不能随便更改,那有什么方式,可以在源码被调用的时候,顺便记录一下这个被创建的线程池呢?
还记得吗,在上一讲“ 注册扩展”中,我们仿照 ZookeeperRegistryFactory 的编写思路,能继承使用的,就继承使用,不能继承,可以简单粗暴地 copy 一份。
想到这里,思路就打开了,既然不能修改 FixedThreadPool 的逻辑,那我们 直接新建一个 MonitorFixedThreadPool 来继承 FixedThreadPool 应该不是什么难事。再拿到线程池对象就比较简单了,在新创建的类中重写 getExecutor 方法,把父类创建的线程池缓存起来。有了线程池对象,就再取出线程池的活跃线程与最大线程数量。
可是,这里注意细节,调用方不会时时刻刻都调用 getExecutor 方法,而我们要检测线程池的数量变化,就得专门有个后台线程定时检测一下,可是从哪里找这样具有定时效果的线程池呢?
我教你一个小技巧,我们使用 Collection 接口时,Collections 这个类中富含操作 Collection 的各种工具类;使用 List 接口时,Lists 类中也富含各种操作 List 的一些工具类;使用 Map 接口也是,Maps 这个类中也富含各种操作 Map 的一些工具类。所以, 当你使用 Executor 接口时,可以尝试在类的尾巴上添加一个字母“s”,看看有没有对应的工具类。
按照小技巧的方式,我们通过 Executors 寻找一下,果然 Executors 这个类存在于 JDK 中,里面也能找到关于定时轮询的 API。
现在,怎么扩展线程池解决了,怎么拿到线程池对象也解决了,定时轮询的 API 也找到了,万事俱备,只欠最后的轮询检测代码了。
///////////////////////////////////////////////////
// 自定义监控固定数量的线程池
///////////////////////////////////////////////////
@Slf4j
public class MonitorFixedThreadPool extends FixedThreadPool implements Runnable {
private static final Set<ThreadPoolExecutor> EXECUTOR_SET = new HashSet<>();
/** <h2>高水位线阈值</h2> **/
private static final double HIGH_WATER_MARK = 0.85;
// 默认的构造方法,借用该构造方法创建一个带有轮询机制的单线程池
public MonitorFixedThreadPool() {
Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(
// 当前的 MonitorFixedThreadPool 对象自己
this,
// 启动后 0 秒执行一次
0,
// 每间隔 30 秒轮询检测一次
30,
// 单位:秒
TimeUnit.SECONDS
);
}
// 重写了父类的 FixedThreadPool 的 getExecutor 方法
// 然后择机将返回值 executor 存储起来了
@Override
public Executor getExecutor(URL url) {
// 通过 super 直接调用父类的方法,拿到结果
Executor executor = super.getExecutor(url);
// 针对结果进行缓存处理
if (executor instanceof ThreadPoolExecutor) {
EXECUTOR_SET.add((ThreadPoolExecutor) executor);
}
return executor;
}
@Override
public void run() {
// 每隔 30 秒,这个 run 方法被触发执行一次
for (ThreadPoolExecutor executor : EXECUTOR_SET) {
// 循环检测每隔线程池是否超越高水位线
doCheck(executor);
}
}
// 检测方法
private void doCheck(ThreadPoolExecutor executor) {
final int activeCount = executor.getActiveCount();
int maximumPoolSize = executor.getMaximumPoolSize();
double percent = activeCount / (maximumPoolSize * 1.0);
// 判断计算出来的值,是否大于高水位线
if (percent > HIGH_WATER_MARK) {
log.info("溢出高水位线:activeCount={}, maximumPoolSize={}, percent={}",
activeCount, maximumPoolSize, percent);
// 记录打点,将该信息同步值 Cat 监控平台
CatUtils.logEvent("线程池溢出高水位线",
executor.getClass().getName(),
"1", buildCatLogDetails(executor));
}
}
}
///////////////////////////////////////////////////
// 资源目录文件
// 路径为:/META-INF/dubbo/org.apache.dubbo.common.threadpool.ThreadPool
///////////////////////////////////////////////////
monitorfixed=com.hmilyylimh.cloud.threadpool.config.MonitorFixedThreadPool
///////////////////////////////////////////////////
// 修改 Java 代码配置类指定使用该监控线程池
// 或
// dubbo.provider.threadpool=monitorfixed
///////////////////////////////////////////////////
@Bean
public ProtocolConfig protocolConfig(){
ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", 28260);
protocolConfig.setThreadpool("monitorfixed");
return protocolConfig;
}
思路清晰后,写代码并不是很难,关注 4 个重要的点。
- 在重写 getExecutor 的方法逻辑中,通过调用父类的 getExecutor 方法拿到线程池对象,并把线程池对象缓存起来。
- 巧妙利用构造方法,一次性生成带有轮询机制的单线程池对象。
- 在轮询检测中,计算出来的百分比,如果大于预设的高水位线,就记录打点通知 Cat 监控平台。
- 代码编写完后,记得 SPI 文件的配置,以及指定使用新的监控线程池。
总结
今天我们回忆了线程池添加任务的底层原理,可以总结成四部曲。
- 第一步,新添加的任务,优先分配给核心线程处理。
- 第二步,若核心线程数量满,把任务添加至阻塞队列中。
- 第三步,若阻塞队列也溢出,把任务分配给非核心线程处理。
- 第四步,若非核心线程数量也满,走预先设置好的拒绝策略。
基于线程池创建的核心原理,我们分析了 Dubbo 四种线程池的特点和使用场景,FixedThreadPool、LimitedThreadPool、CachedThreadPool、EagerThreadPool。不过线程池各有各的好处,也各有弊端。
- FixedThreadPool,创建固定大小的线程池。默认情况下使用的是同步队列,你可以理解成没有缓冲等待队列,所以极端情况下遇到大量任务时,可能存在阻塞的情况。
- LimitedThreadPool,有限定数量的线程池,你也可以理解为可伸缩线程池,但是随着任务的增加,线程池中的线程数只增不减,在流量低峰期就造成了一定的内存浪费,线程池无法得到充分的利用。
- CachedThreadPool,具有一定缓存能力的线程池。因为最大线程数量的默认值是 Integer 的最大值,如果遇到大量任务,会创建大量的线程,对系统的 CPU 会造成一定的冲击压力,线程越多,反而让系统的效率低下。
- EagerThreadPool,渴望更多线程来干活的线程池。应对流量高峰期非常好,可以迅速增加线程提供服务,但同时也打破了线程池原本的机制流程,有一种欺骗线程池的感觉,不过这种设计非常巧妙,也算是一种善意的欺骗吧。
思考题
留个作业给你,针对 Dubbo 的线程池进行扩展你已经会了,那针对线程池本身,任务扔到线程池后,在真正执行之前和之后,可以进行扩展么?
期待在留言区看到你的思考,参与讨论。如果觉得今天的内容对你有帮助,也欢迎分享给身边的朋友一起讨论,可能就帮TA解决了一个困惑。我们下一讲见。
25 思考题参考
上一期留了个作业,在消费方自定义一个集群扩展器,通过从接收请求的上下文对象中拿到 routeNo 属性值,进行分组过滤,然后使用过滤后的 invoker 进行后续的调用流程。
在之前的分析中,我们已经知道了 FailoverClusterInvoker 持有了多个 invoker 对象,而且这个 FailoverClusterInvoker 逻辑很多,我们就自定义一个类 RouteNoClusterInvoker 来继承 FailoverClusterInvoker 类,然后重写 list 方法,通过子类的自定义逻辑,来改变负载均衡的集合范围。
大体上逻辑也比较简单,看实现后的代码。
///////////////////////////////////////////////////
// 名称:路由编码集群扩展实现类
// 功能:类比于 FailoverCluster 故障自动转移集群扩展器
///////////////////////////////////////////////////
public class RouteNoCluster extends AbstractCluster {
public final static String NAME = "routeNo";
@Override
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new RouteNoClusterInvoker<>(directory);
}
}
///////////////////////////////////////////////////
// 名称:路由编码集群扩展实现类的 invoker 核心处理对象
// 功能:将 invokers 按照 routeNo 进行分组得到一个 map,再从 map 中取出对应集合即可
///////////////////////////////////////////////////
public class RouteNoClusterInvoker<T> extends FailoverClusterInvoker<T> {
public RouteNoClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = super.list(invocation);
// TODO 把 invokers 按照 routeNo 进行分组,得到 Map<String, List<Invoker<T>>> 数据结构
// String routeNo = invoker.getUrl().getParameter("routeNo") 这样获取分发ID值
Map<String, List<Invoker<T>>> map = groupByRouteNo(invokers);
// 根据本地线程获取的 routeNo 的值,从 map 里面获取到对应选择的 invokers 列表
String routeNo = RouteNoThreadLocalUtils.get();
List<Invoker<T>> selectedInvokers = map.get(routeNo);
// 将选择出来的 selectedInvokers 返回,交给后续代码流程进行负载均衡处理等等
return Collections.unmodifiableList(selectedInvokers);
}
}
///////////////////////////////////////////////////
// 消费方资源目录文件
// 路径为:/META-INF/dubbo/org.apache.dubbo.rpc.cluster.Cluster
///////////////////////////////////////////////////
routeNo=com.hmilyylimh.cloud.registry.cluster.RouteNoCluster
///////////////////////////////////////////////////
// dubbo.properties,直接指定 dubbo.provider.cluster 的值
///////////////////////////////////////////////////
dubbo.provider.cluster=routeNo
///////////////////////////////////////////////////
// 名称:路由编码容器过滤器
// 功能:接收到前端请求后,将前端指定的路由编码设置到本地线程中,以方便后续逻辑使用
///////////////////////////////////////////////////
public class RouteNoContainerFilter implements Filter {
@Override
public void init(FilterConfig var1) throws ServletException {
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException,
ServletException {
String routeNo = ((HttpServletRequest) request).getHeader("routeNo");
// TODO 想办法把 routeNo 的值存储到 ThreadLocal 中,以便该线程的其他地方也可以获取到
RouteNoThreadLocalUtils.set(routeNo);
filterChain.doFilter(request, response);
}
}
///////////////////////////////////////////////////
// 名称:路由编码消费方过滤器
// 功能:将本地线程的路由编码值取出来,然后发给提供方,这样提供方将来收到请求后继续路由选择具体的 invoker 对象进行调用
///////////////////////////////////////////////////
@Activate(group = CONSUMER, order = Integer.MIN_VALUE + 2000)
public class RouteNoConsumerFilter implements Filter, Filter.Listener {
public static final String TRACE_ID = "TRACE-ID";
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 从本地线程对象中取出跟路由编码值
String routeNo = RouteNoThreadLocalUtils.get();
// 然后将路由编码值设置到请求对象中
invocation.getObjectAttachments().put("routeNo", routeNo);
RpcContext.getClientAttachment().setObjectAttachment("routeNo", routeNo);
// 继续后面过滤器的调用
return invoker.invoke(invocation);
}
}
实现后的代码主要关注 3 点。
第一,研究 FailoverCluster 的实现逻辑,仿照它自定义了一个 RouteNoCluster 而已。
第二,由于 FailoverCluster 与 FailoverClusterInvoker 是配套使用的,恰好 FailoverClusterInvoker 里面的逻辑一堆,所以,可以采用继承的方式,自定义一个 RouteNoClusterInvoker 类去继承 FailoverClusterInvoker 类,并重写了 list 方法改变负载均衡的范围。
第三,当前系统一定得想办法把前端发来的 routeNo 存储起来,以便在发起调用时,选择具体的invoker 对象,并且把 routeNo 传给提供方,让将来提供方调用别的系统时能派上用场。