概述
Java项目常使用ThreadPoolExecutor创建线程池,核心参数包含corePoolSize,maximumPoolSize,workQueue。我们希望构建的线程池能满足如下条件:
- 线程数量可控。需要设置一个最大线程数量maximumPoolSize,防止线程无限制创建,耗尽系统资源。
- 放到线程池中的任务不会被拒绝丢弃(任务被丢弃,将导致严重的业务BUG)。所以一般定义一个无界阻塞队列(不指定大小,容量最大值是Integer.MAX_VALUE),用于缓存待执行的任务。
无界队列导致maximumPoolSize无效
ThreadPoolExecutor的execute() 方法逻辑如下:新增一个任务先判断核心线程是否有空闲,核心线程有空闲则交给核心线程执行;没有空闲的核心线程则将任务放入阻塞队列。
1. 任务执行三步骤
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 有空闲核心线程,则交给核心线程执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 将任务加到队列,等待空闲线程消费
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 尝试创建非核心线程执行任务
else if (!addWorker(command, false))
reject(command);
}
2. 无界队列workQueue.offer(command)永远返回true
以LinkedBlockingDeque来看一下其offer的执行过程:
- ThreadPoolExecutor.execute()调用LinkedBlockingDeque.offer()。
- LinkedBlockingDeque.offer()调用LinkedBlockingDeque.offerLast()。
- LinkedBlockingDeque.offerLast()调用LinkedBlockingDeque.linkLast()。
private boolean linkLast(Node<E> node) {
// 超出队列容量时,添加失败。返回false。
if (count >= capacity)
return false;
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
notEmpty.signal();
return true;
}
对于无界队列需要满足count >= Integer.MAX_VALUE才会返回false,触发创建非核心线程。这个条件基本上无法达到,所以线程池表现出来的就是只有核心线程在工作。
自定义阻塞队列重写LinkedBlockingDeque的方法触发创建非核心线程
考虑:从调用链来看,我们可以重写linkLast方法,修改if (count >= capacity)判断条件。但linkLast()是私有方法,子类无法重写;更上层的offerLast()是public方法,但依赖内部类Node。再往上看offer(), offer()直接调用了offerLast()方法,没有依赖LinkedBlockingDequene内部定义的私有模型,符合改写条件。
将自定义的ThreadTaskLinkedBlockingQueue作为线程池工具的私有内部类定义,对外隐藏实现细节。
package elon.threadpool.util;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线程池工具类。
*
* @author elon
* @since 2021/11/6
*/
public class ElonThreadPoolUtils {
private static final Logger LOGGER = LogManager.getLogger(ElonThreadPoolUtils.class);
private static int corePoolSize = 10;
private static int maximumPoolSize = 100;
private static ThreadPoolExecutor poolExecutor = null;
private static ThreadTaskLinkedBlockingDeque<Runnable> queue = new ThreadTaskLinkedBlockingDeque<>();
public static void initThreadPool(int corePoolSize, int maximumPoolSize){
ElonThreadPoolUtils.corePoolSize = corePoolSize;
ElonThreadPoolUtils.maximumPoolSize = maximumPoolSize;
poolExecutor = new ThreadPoolExecutor(ElonThreadPoolUtils.corePoolSize, ElonThreadPoolUtils.maximumPoolSize, 10,
TimeUnit.SECONDS, queue);
LOGGER.info("[ElonThreadPoolUtils]Init thread pool success. corePoolSize:{}|maximumPoolSize:{}", corePoolSize,
maximumPoolSize);
}
synchronized public static void executeTask(Runnable task){
int activeThreadNum = poolExecutor.getActiveCount();
LOGGER.info("[ElonThreadPoolUtils]Number of active threads:{}", activeThreadNum);
LOGGER.info("[ElonThreadPoolUtils]The number of tasks waiting to be processed in the queue:{}", queue.size());
poolExecutor.execute(task);
}
/**
* 自定义线程任务阻塞队列. 在活跃线程数小于最大支持线程数的情况下,新任务不放到队列从而激发线程池创建新线程及时处理.
* 解决使用LinkedBlockingDeque无限队列,线程池只有核心线程在处理。maximumPoolSize未启作用的问题。
*
* @author elon
* @since 2021/11/6
*/
@Setter
private static class ThreadTaskLinkedBlockingDeque<E> extends LinkedBlockingDeque<E> {
@Override
public boolean offer(E e) {
int activeThreadNum = poolExecutor.getActiveCount();
if (activeThreadNum < maximumPoolSize) {
return false;
}
return offerLast(e);
}
}
}
关键逻辑是改写了offer()的实现,在调用offerLast()前增加了判断:如果线程池活跃线程数量小于最大线程数,新增任务直接返回false;不放到队列中。从而触发线程池创建非核心线程执行任务。
如果活跃线程数等于最大线程数,任务将放到队列中等待空闲线程来消费。
测试代码
- 定义测试用的线程任务类
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* 线程任务
*
* @author elon
* @since 2021/11/6
*/
public class ThreadTask implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(ThreadTask.class);
private final String threadName;
public ThreadTask(String threadName) {
this.threadName = threadName;
}
@Override
public void run() {
LOGGER.info("threadName:{}", threadName);
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 创建线程池,执行1000个任务
import elon.threadpool.service.ThreadTask;
import elon.threadpool.util.ElonThreadPoolUtils;
/**
* 应用启动类
*
* @author elon
* @since 2021/11/6
*/
public class StartupThreadPool {
public static void main(String[] args) {
ElonThreadPoolUtils.initThreadPool(10, 100);
for (int i = 1; i <= 1000; ++i) {
ElonThreadPoolUtils.executeTask(new ThreadTask(String.valueOf(i)));
}
}
}
从控制台打印的日志可以看到,有100个线程在并发执行。
源码地址:https://github.com/ylforever/elon-threadpool
最后
以上就是柔弱电脑为你收集整理的通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务无界队列导致maximumPoolSize无效自定义阻塞队列重写LinkedBlockingDeque的方法触发创建非核心线程测试代码的全部内容,希望文章能够帮你解决通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务无界队列导致maximumPoolSize无效自定义阻塞队列重写LinkedBlockingDeque的方法触发创建非核心线程测试代码所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复