概述
最近全投了一个难度很大的项目,使用的还是不太熟悉的C语言,目前终于基本开发完了,近期没那么忙了,这期间之前的一个项目在线程池部分出现了问题,根本原因还是对线程池的理解不够深引起的,因此今天分析下线程池的使用和源码。
上一个项目创建线程池使用的是:
Executors.newScheduledThreadPool()
即:ScheduledThreadPoolExecutor,这个类中schedule的实现如下:
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
将提交的task包装成ScheduleFutureTask,再通过delayedExecute执行:
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
首先将任务放入等待队列(workQueue),之后再次检查线程池的状态来判断是否需要取消任务。如果线程池被关闭了,canRunInCurrentRunState返回false,那么从任务队列中取出任务。
canRunInCurrentRunState会根据任务是否是周期性来返回不同的状态值:
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
ScheduledThreadPoolExecutor的任务队列是DelayedWorkQueue实现的,这个类的代码有450行,这里不全部贴出,只给出部分的代码。
任务的存储形式是一个数组,实际上是一个以task的time为索引的二叉树:
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
当添加任务时,会调用offer方法:
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 当数组达到最大容量的时候,进行动态的扩容
if (i >= queue.length)
grow();
size = i + 1;
// 如果数组是空的,直接将task放入索引为0的位置即可
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 如果数组中原本有值,需要调整这个二叉树
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
简单逻辑在注释中已做说明,siftUp函数的实现如下,本质上是一个堆排序:
/**
* Sifts element added at bottom up to its heap-ordered spot.
* Call only when holding lock.
*/
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
首先通过减一后向右移一位的方法来获取当前索引的parent结点。
接下来通过while循环寻找合入的task位置,比较的值就是task的time值:
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
同理当线程池取出任务时,也会进行堆的调整:
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return finishPoll(first);
} finally {
lock.unlock();
}
}
/**
* Performs common bookkeeping for poll and take: Replaces
* first element with last and sifts it down. Call only when
* holding lock.
* @param f the task to remove and return
*/
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
/**
* Sifts element added at top down to its heap-ordered spot.
* Call only when holding lock.
*/
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
// 通过同样的移位操作选择出left child结点
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
// 如果left结点的time值大于right结点, 将标记变量指向right结点的RunnableScheduledFuture
c = queue[child = right];
// 如果当前标记结点的time相等或小于最末位的结点,break
if (key.compareTo(c) <= 0)
break;
// 否则将当前的选取出的结点上移到parent结点的位置
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
接下来围绕这个任务队列分析一个线程池的运行原理:
首先介绍下线程池中很重要的一个变量:ctl
这个变量的高3位用了表示线程池的状态,分别是:RUNNING、SHUTDOWN、、STOP、TIDYING和TERMINATED。
* RUNNING: 接受新任务并处理排队的任务
* SHUTDOWN: 不接受新任务,但处理排队的任务
* STOP: 不接受新任务,不处理排队的任务,并中断正在进行的任务
* TIDYING: 所有任务已终止,workerCount为零,线程过渡到TIDYING状态将运行终止的()挂钩方法
* TERMINATED: terminate()已完成
而剩下的29位用来表示线程的数量。
在线程池完成初始化操作之后,会执行runWorker从任务队列中不停的取出任务来执行:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 如果一直没有获取到任务,说明该线程是需要被回收的
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 首先判断线程池的状态,如果处于被关闭的状态且任务队列为空,或者状态为STOP、TIDYING、TERMINATED时,返回空任务,并将线程数量减1
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 当前线程数大于corePoolSize或者allowCoreThreadTimeOut为true时,timed为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 如果工作线程大于1 ,任务队列为空时出现如下场景,线程池的数量会减1
* 1.当前线程数大于最大线程数
* 2.等待task超时
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 如果timed为true,需要通过poll(keepAliveTime, timeUnit)来获取,因为这时需要对闲置线程进行超时回收
// 如果timed为false,直接从任务队列中取出数据即可
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 标记等待超时,该结果会直接影响下一次循环的逻辑
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
compareAndDecrementWorkerCount的实现如下:
/**
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
其中poll是等待固定时间的取任务函数:
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
可以看到如果任务队列为空的话,会等待指定的时间后再次尝试获取,如果依旧为空的话,则不会再等待,直接返回null,之后就会进入线程的回收逻辑。
take函数的实现如下:
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
/*
* 获取任务队列中的首任务,
* 由于任务队列是根据等待时间排序的,
* 首任务一定是所需要的等待时间最少的任务
*/
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
// getDelay会返回所剩的等待时间
long delay = first.getDelay(NANOSECONDS);
// 如果任务已到达等待时间直接返回该任务即可
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
/*
* 如果还需要继续等待再执行该任务,
* available实例锁会阻塞delay指定的时间,
* delay时间后重新执行for循环来获取任务
* 这期间如果有新的任务进入队列
* available.signal会被调用,
* 此时会优先执行不需要等待、或者是等待时间短的任务
*/
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
最后
以上就是拼搏宝马为你收集整理的Android ScheduleExecutorService源码解析的全部内容,希望文章能够帮你解决Android ScheduleExecutorService源码解析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复