Java项目常使用ThreadPoolExecutor创建线程池,核心参数包含corePoolSize,maximumPoolSize,workQueue。我们希望构建的线程池能满足如下条件:
- 线程数量可控。需要设置一个最大线程数量maximumPoolSize,防止线程无限制创建,耗尽系统资源。
- 放到线程池中的任务不会被拒绝丢弃(任务被丢弃,将导致严重的业务BUG)。所以一般定义一个无界阻塞队列(不指定大小,容量最大值是Integer.MAX_VALUE),用于缓存待执行的任务。
无界队列导致maximumPoolSize无效
ThreadPoolExecutor的execute() 方法逻辑如下:新增一个任务先判断核心线程是否有空闲,核心线程有空闲则交给核心线程执行;没有空闲的核心线程则将任务放入阻塞队列。
1. 任务执行三步骤
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 1. 有空闲核心线程,则交给核心线程执行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2. 将任务加到队列,等待空闲线程消费 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3. 尝试创建非核心线程执行任务 else if (!addWorker(command, false)) reject(command); }
2. 无界队列workQueue.offer(command)永远返回true
以LinkedBlockingDeque来看一下其offer的执行过程:
- ThreadPoolExecutor.execute()调用LinkedBlockingDeque.offer()。
- LinkedBlockingDeque.offer()调用LinkedBlockingDeque.offerLast()。
- LinkedBlockingDeque.offerLast()调用LinkedBlockingDeque.linkLast()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private boolean linkLast(Node<E> node) { // 超出队列容量时,添加失败。返回false。 if (count >= capacity) return false; Node<E> l = last; node.prev = l; last = node; if (first == null) first = node; else l.next = node; ++count; notEmpty.signal(); return true; }
对于无界队列需要满足count >= Integer.MAX_VALUE才会返回false,触发创建非核心线程。这个条件基本上无法达到,所以线程池表现出来的就是只有核心线程在工作。
自定义阻塞队列重写LinkedBlockingDeque的方法触发创建非核心线程
考虑:从调用链来看,我们可以重写linkLast方法,修改if (count >= capacity)判断条件。但linkLast()是私有方法,子类无法重写;更上层的offerLast()是public方法,但依赖内部类Node。再往上看offer(), offer()直接调用了offerLast()方法,没有依赖LinkedBlockingDequene内部定义的私有模型,符合改写条件。
将自定义的ThreadTaskLinkedBlockingQueue作为线程池工具的私有内部类定义,对外隐藏实现细节。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68package elon.threadpool.util; import lombok.Setter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 线程池工具类。 * * @author elon * @since 2021/11/6 */ public class ElonThreadPoolUtils { private static final Logger LOGGER = LogManager.getLogger(ElonThreadPoolUtils.class); private static int corePoolSize = 10; private static int maximumPoolSize = 100; private static ThreadPoolExecutor poolExecutor = null; private static ThreadTaskLinkedBlockingDeque<Runnable> queue = new ThreadTaskLinkedBlockingDeque<>(); public static void initThreadPool(int corePoolSize, int maximumPoolSize){ ElonThreadPoolUtils.corePoolSize = corePoolSize; ElonThreadPoolUtils.maximumPoolSize = maximumPoolSize; poolExecutor = new ThreadPoolExecutor(ElonThreadPoolUtils.corePoolSize, ElonThreadPoolUtils.maximumPoolSize, 10, TimeUnit.SECONDS, queue); LOGGER.info("[ElonThreadPoolUtils]Init thread pool success. corePoolSize:{}|maximumPoolSize:{}", corePoolSize, maximumPoolSize); } synchronized public static void executeTask(Runnable task){ int activeThreadNum = poolExecutor.getActiveCount(); LOGGER.info("[ElonThreadPoolUtils]Number of active threads:{}", activeThreadNum); LOGGER.info("[ElonThreadPoolUtils]The number of tasks waiting to be processed in the queue:{}", queue.size()); poolExecutor.execute(task); } /** * 自定义线程任务阻塞队列. 在活跃线程数小于最大支持线程数的情况下,新任务不放到队列从而激发线程池创建新线程及时处理. * 解决使用LinkedBlockingDeque无限队列,线程池只有核心线程在处理。maximumPoolSize未启作用的问题。 * * @author elon * @since 2021/11/6 */ @Setter private static class ThreadTaskLinkedBlockingDeque<E> extends LinkedBlockingDeque<E> { @Override public boolean offer(E e) { int activeThreadNum = poolExecutor.getActiveCount(); if (activeThreadNum < maximumPoolSize) { return false; } return offerLast(e); } } }
关键逻辑是改写了offer()的实现,在调用offerLast()前增加了判断:如果线程池活跃线程数量小于最大线程数,新增任务直接返回false;不放到队列中。从而触发线程池创建非核心线程执行任务。
如果活跃线程数等于最大线程数,任务将放到队列中等待空闲线程来消费。
测试代码
- 定义测试用的线程任务类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; /** * 线程任务 * * @author elon * @since 2021/11/6 */ public class ThreadTask implements Runnable { private static final Logger LOGGER = LogManager.getLogger(ThreadTask.class); private final String threadName; public ThreadTask(String threadName) { this.threadName = threadName; } @Override public void run() { LOGGER.info("threadName:{}", threadName); try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
- 创建线程池,执行1000个任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import elon.threadpool.service.ThreadTask; import elon.threadpool.util.ElonThreadPoolUtils; /** * 应用启动类 * * @author elon * @since 2021/11/6 */ public class StartupThreadPool { public static void main(String[] args) { ElonThreadPoolUtils.initThreadPool(10, 100); for (int i = 1; i <= 1000; ++i) { ElonThreadPoolUtils.executeTask(new ThreadTask(String.valueOf(i))); } } }
从控制台打印的日志可以看到,有100个线程在并发执行。
源码地址:https://github.com/ylforever/elon-threadpool
最后
以上就是柔弱电脑最近收集整理的关于通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务无界队列导致maximumPoolSize无效自定义阻塞队列重写LinkedBlockingDeque的方法触发创建非核心线程测试代码的全部内容,更多相关通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务无界队列导致maximumPoolSize无效自定义阻塞队列重写LinkedBlockingDeque内容请搜索靠谱客的其他文章。
发表评论 取消回复