我是靠谱客的博主 柔弱电脑,最近开发中收集的这篇文章主要介绍通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务无界队列导致maximumPoolSize无效自定义阻塞队列重写LinkedBlockingDeque的方法触发创建非核心线程测试代码,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Java项目常使用ThreadPoolExecutor创建线程池,核心参数包含corePoolSize,maximumPoolSize,workQueue。我们希望构建的线程池能满足如下条件:

  1. 线程数量可控。需要设置一个最大线程数量maximumPoolSize,防止线程无限制创建,耗尽系统资源。
  2. 放到线程池中的任务不会被拒绝丢弃(任务被丢弃,将导致严重的业务BUG)。所以一般定义一个无界阻塞队列(不指定大小,容量最大值是Integer.MAX_VALUE),用于缓存待执行的任务。

无界队列导致maximumPoolSize无效

ThreadPoolExecutor的execute() 方法逻辑如下:新增一个任务先判断核心线程是否有空闲,核心线程有空闲则交给核心线程执行;没有空闲的核心线程则将任务放入阻塞队列。

1. 任务执行三步骤

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
		
		// 1. 有空闲核心线程,则交给核心线程执行
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
		
		// 2. 将任务加到队列,等待空闲线程消费
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        
        // 3. 尝试创建非核心线程执行任务
        else if (!addWorker(command, false))
            reject(command);
    }

2. 无界队列workQueue.offer(command)永远返回true

以LinkedBlockingDeque来看一下其offer的执行过程:

  1. ThreadPoolExecutor.execute()调用LinkedBlockingDeque.offer()。
  2. LinkedBlockingDeque.offer()调用LinkedBlockingDeque.offerLast()。
  3. LinkedBlockingDeque.offerLast()调用LinkedBlockingDeque.linkLast()。
    private boolean linkLast(Node<E> node) {
        // 超出队列容量时,添加失败。返回false。
        if (count >= capacity)
            return false;
        Node<E> l = last;
        node.prev = l;
        last = node;
        if (first == null)
            first = node;
        else
            l.next = node;
        ++count;
        notEmpty.signal();
        return true;
    }

对于无界队列需要满足count >= Integer.MAX_VALUE才会返回false,触发创建非核心线程。这个条件基本上无法达到,所以线程池表现出来的就是只有核心线程在工作。

自定义阻塞队列重写LinkedBlockingDeque的方法触发创建非核心线程

考虑:从调用链来看,我们可以重写linkLast方法,修改if (count >= capacity)判断条件。但linkLast()是私有方法,子类无法重写;更上层的offerLast()是public方法,但依赖内部类Node。再往上看offer(), offer()直接调用了offerLast()方法,没有依赖LinkedBlockingDequene内部定义的私有模型,符合改写条件。

将自定义的ThreadTaskLinkedBlockingQueue作为线程池工具的私有内部类定义,对外隐藏实现细节。

package elon.threadpool.util;

import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 线程池工具类。
 *
 * @author elon
 * @since 2021/11/6
 */
public class ElonThreadPoolUtils {
    private static final Logger LOGGER = LogManager.getLogger(ElonThreadPoolUtils.class);

    private static int corePoolSize = 10;

    private static int maximumPoolSize = 100;

    private static ThreadPoolExecutor poolExecutor = null;

    private static ThreadTaskLinkedBlockingDeque<Runnable> queue = new ThreadTaskLinkedBlockingDeque<>();

    public static void initThreadPool(int corePoolSize, int maximumPoolSize){
        ElonThreadPoolUtils.corePoolSize = corePoolSize;
        ElonThreadPoolUtils.maximumPoolSize = maximumPoolSize;

        poolExecutor = new ThreadPoolExecutor(ElonThreadPoolUtils.corePoolSize, ElonThreadPoolUtils.maximumPoolSize, 10,
                TimeUnit.SECONDS, queue);

        LOGGER.info("[ElonThreadPoolUtils]Init thread pool success. corePoolSize:{}|maximumPoolSize:{}", corePoolSize,
                maximumPoolSize);
    }

    synchronized public static void executeTask(Runnable task){
        int activeThreadNum = poolExecutor.getActiveCount();
        LOGGER.info("[ElonThreadPoolUtils]Number of active threads:{}", activeThreadNum);
        LOGGER.info("[ElonThreadPoolUtils]The number of tasks waiting to be processed in the queue:{}", queue.size());

        poolExecutor.execute(task);
    }

    /**
     * 自定义线程任务阻塞队列. 在活跃线程数小于最大支持线程数的情况下,新任务不放到队列从而激发线程池创建新线程及时处理.
     * 解决使用LinkedBlockingDeque无限队列,线程池只有核心线程在处理。maximumPoolSize未启作用的问题。
     *
     * @author elon
     * @since 2021/11/6
     */
    @Setter
    private static class ThreadTaskLinkedBlockingDeque<E> extends LinkedBlockingDeque<E> {
        @Override
        public boolean offer(E e) {
            int activeThreadNum = poolExecutor.getActiveCount();
            if (activeThreadNum < maximumPoolSize) {
                return false;
            }

            return offerLast(e);
        }
    }
}

关键逻辑是改写了offer()的实现,在调用offerLast()前增加了判断:如果线程池活跃线程数量小于最大线程数,新增任务直接返回false;不放到队列中。从而触发线程池创建非核心线程执行任务。

如果活跃线程数等于最大线程数,任务将放到队列中等待空闲线程来消费。

测试代码

  1. 定义测试用的线程任务类
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * 线程任务
 *
 * @author elon
 * @since 2021/11/6
 */
public class ThreadTask implements Runnable {
    private static final Logger LOGGER = LogManager.getLogger(ThreadTask.class);

    private final String threadName;

    public ThreadTask(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public void run() {
        LOGGER.info("threadName:{}", threadName);
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. 创建线程池,执行1000个任务
import elon.threadpool.service.ThreadTask;
import elon.threadpool.util.ElonThreadPoolUtils;

/**
 * 应用启动类
 *
 * @author elon
 * @since 2021/11/6
 */
public class StartupThreadPool {
    public static void main(String[] args) {
        ElonThreadPoolUtils.initThreadPool(10, 100);
        for (int i = 1; i <= 1000; ++i) {
            ElonThreadPoolUtils.executeTask(new ThreadTask(String.valueOf(i)));
        }
    }
}

从控制台打印的日志可以看到,有100个线程在并发执行。

源码地址:https://github.com/ylforever/elon-threadpool

最后

以上就是柔弱电脑为你收集整理的通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务无界队列导致maximumPoolSize无效自定义阻塞队列重写LinkedBlockingDeque的方法触发创建非核心线程测试代码的全部内容,希望文章能够帮你解决通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务无界队列导致maximumPoolSize无效自定义阻塞队列重写LinkedBlockingDeque的方法触发创建非核心线程测试代码所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(54)

评论列表共有 0 条评论

立即
投稿
返回
顶部