2019独角兽企业重金招聘Python工程师标准>>>
线程池相关
源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803package java.util.concurrent; import java.util.AbstractQueue; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import static java.util.concurrent.TimeUnit.NANOSECONDS; public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { private volatile boolean continueExistingPeriodicTasksAfterShutdown; private volatile boolean executeExistingDelayedTasksAfterShutdown = true; private volatile boolean removeOnCancel = false; private static final AtomicLong sequencer = new AtomicLong(); private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { private final long sequenceNumber; private long time; private final long period; RunnableScheduledFuture<V> outerTask = this; int heapIndex; ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } public boolean isPeriodic() { return period != 0; } private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); } public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = super.cancel(mayInterruptIfRunning); if (cancelled && removeOnCancel && heapIndex >= 0) remove(this); return cancelled; } public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } } } static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { private static final int INITIAL_CAPACITY = 16; private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock(); private int size = 0; private Thread leader = null; private final Condition available = lock.newCondition(); private void setIndex(RunnableScheduledFuture<?> f, int idx) { if (f instanceof ScheduledFutureTask) ((ScheduledFutureTask) f).heapIndex = idx; } private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); } private void siftDown(int k, RunnableScheduledFuture<?> key) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; RunnableScheduledFuture<?> c = queue[child]; int right = child + 1; if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; if (key.compareTo(c) <= 0) break; queue[k] = c; setIndex(c, k); k = child; } queue[k] = key; setIndex(key, k); } private void grow() { int oldCapacity = queue.length; int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% if (newCapacity < 0) // overflow newCapacity = Integer.MAX_VALUE; queue = Arrays.copyOf(queue, newCapacity); } private int indexOf(Object x) { if (x != null) { if (x instanceof ScheduledFutureTask) { int i = ((ScheduledFutureTask) x).heapIndex; // Sanity check; x could conceivably be a // ScheduledFutureTask from some other pool. if (i >= 0 && i < size && queue[i] == x) return i; } else { for (int i = 0; i < size; i++) if (x.equals(queue[i])) return i; } } return -1; } public boolean contains(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { return indexOf(x) != -1; } finally { lock.unlock(); } } public boolean remove(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { int i = indexOf(x); if (i < 0) return false; setIndex(queue[i], -1); int s = --size; RunnableScheduledFuture<?> replacement = queue[s]; queue[s] = null; if (s != i) { siftDown(i, replacement); if (queue[i] == replacement) siftUp(i, replacement); } return true; } finally { lock.unlock(); } } public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return size; } finally { lock.unlock(); } } public boolean isEmpty() { return size() == 0; } public int remainingCapacity() { return Integer.MAX_VALUE; } public RunnableScheduledFuture<?> peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return queue[0]; } finally { lock.unlock(); } } public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; } public void put(Runnable e) { offer(e); } public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable e, long timeout, TimeUnit unit) { return offer(e); } private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { int s = --size; RunnableScheduledFuture<?> x = queue[s]; queue[s] = null; if (s != 0) siftDown(0, x); setIndex(f, -1); return f; } public RunnableScheduledFuture<?> poll() { final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture<?> first = queue[0]; if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return finishPoll(first); } finally { lock.unlock(); } } public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } } public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); if (nanos <= 0) return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } } public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { for (int i = 0; i < size; i++) { RunnableScheduledFuture<?> t = queue[i]; if (t != null) { queue[i] = null; setIndex(t, -1); } } size = 0; } finally { lock.unlock(); } } private RunnableScheduledFuture<?> peekExpired() { // assert lock.isHeldByCurrentThread(); RunnableScheduledFuture<?> first = queue[0]; return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : first; } public int drainTo(Collection<? super Runnable> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture<?> first; int n = 0; while ((first = peekExpired()) != null) { c.add(first); // In this order, in case add() throws. finishPoll(first); ++n; } return n; } finally { lock.unlock(); } } public int drainTo(Collection<? super Runnable> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture<?> first; int n = 0; while (n < maxElements && (first = peekExpired()) != null) { c.add(first); // In this order, in case add() throws. finishPoll(first); ++n; } return n; } finally { lock.unlock(); } } public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { return Arrays.copyOf(queue, size, Object[].class); } finally { lock.unlock(); } } @SuppressWarnings("unchecked") public <T> T[] toArray(T[] a) { final ReentrantLock lock = this.lock; lock.lock(); try { if (a.length < size) return (T[]) Arrays.copyOf(queue, size, a.getClass()); System.arraycopy(queue, 0, a, 0, size); if (a.length > size) a[size] = null; return a; } finally { lock.unlock(); } } public Iterator<Runnable> iterator() { return new Itr(Arrays.copyOf(queue, size)); } private class Itr implements Iterator<Runnable> { final RunnableScheduledFuture<?>[] array; int cursor = 0; // index of next element to return int lastRet = -1; // index of last element, or -1 if no such Itr(RunnableScheduledFuture<?>[] array) { this.array = array; } public boolean hasNext() { return cursor < array.length; } public Runnable next() { if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return array[cursor++]; } public void remove() { if (lastRet < 0) throw new IllegalStateException(); DelayedWorkQueue.this.remove(array[lastRet]); lastRet = -1; } } } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); } private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; } //创建并执行在给定延迟后启用的一次性操作 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } //创建并执行在给定延迟后启用的 ScheduledFuture public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } //创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在initialDelay后开始执行,然后每隔period执行一次 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } //创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } //使用所要求的零延迟执行命令 public void execute(Runnable command) { schedule(command, 0, NANOSECONDS); } //提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future public Future<?> submit(Runnable task) { return schedule(task, 0, NANOSECONDS); } //提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future public <T> Future<T> submit(Runnable task, T result) { return schedule(Executors.callable(task, result), 0, NANOSECONDS); } //提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, NANOSECONDS); } //设置有关在此执行程序已shutdown的情况下是否继续执行现有定期任务的策略 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { continueExistingPeriodicTasksAfterShutdown = value; if (!value && isShutdown()) onShutdown(); } //获取有关在此执行程序已shutdown的情况下、是否继续执行现有定期任务的策略 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { return continueExistingPeriodicTasksAfterShutdown; } //设置有关在此执行程序已shutdown的情况下是否继续执行现有延迟任务的策略 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { executeExistingDelayedTasksAfterShutdown = value; if (!value && isShutdown()) onShutdown(); } //获取有关在此执行程序已shutdown的情况下是否继续执行现有延迟任务的策略 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { return executeExistingDelayedTasksAfterShutdown; } //设置移除策略 public void setRemoveOnCancelPolicy(boolean value) { removeOnCancel = value; } //得到移除策略 public boolean getRemoveOnCancelPolicy() { return removeOnCancel; } //在以前已提交任务的执行中发起一个有序的关闭,但是不接受新任务 public void shutdown() { super.shutdown(); } //尝试停止所有正在执行的任务、暂停等待任务的处理,并返回等待执行的任务列表 public List<Runnable> shutdownNow() { return super.shutdownNow(); } //返回此执行程序使用的任务队列 public BlockingQueue<Runnable> getQueue() { return super.getQueue(); } final long now() { return System.nanoTime(); } boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); } private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } } void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } } @Override void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) { for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); } else { for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>) e; if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled if (q.remove(t)) t.cancel(false); } } } } tryTerminate(); } //修改或替换用于执行 runnable 的任务 protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task) { return task; } //修改或替换用于执行 callable 的任务 protected <V> RunnableScheduledFuture<V> decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task) { return task; } }
类 ScheduledThreadPoolExecutor
所有已实现的接口:
Executor, ExecutorService, ScheduledExecutorService
在给定的延迟后运行命令,或者定期执行命令。需要多个辅助线程时,或者要求 ThreadPoolExecutor
具有额外的灵活性或功能时,此类要优于 Timer
。
一旦启用已延迟的任务就执行它,但是有关何时启用,启用后何时执行则没有任何实时保证。按照提交的先进先出 (FIFO) 顺序来启用那些被安排在同一执行时间的任务。
虽然此类继承自 ThreadPoolExecutor
,但是几个继承的调整方法对此类并无作用。特别是,因为它作为一个使用 corePoolSize 线程和一个无界队列的固定大小的池,所以调整 maximumPoolSize 没有什么效果。
扩展注意事项:此类重写 AbstractExecutorService
的 submit 方法,以生成内部对象控制每个任务的延迟和调度。若要保留功能性,子类中任何进一步重写的这些方法都必须调用超类版本,超类版本有效地禁用附加任务的定制。
但是,此类提供替代受保护的扩展方法 decorateTask(为 Runnable 和 Callable 各提供一种版本),可定制用于通过 execute、submit、schedule、scheduleAtFixedRate 和 scheduleWithFixedDelay 进入的执行命令的具体任务类型。默认情况下,ScheduledThreadPoolExecutor 使用一个扩展 FutureTask
的任务类型。但是,可以使用下列形式的子类修改或替换该类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } protected <V> RunnableScheduledFuture<V> decorateTask( Runnable r, RunnableScheduledFuture<V> task) { return new CustomTask<V>(r, task); } protected <V> RunnableScheduledFuture<V> decorateTask( Callable<V> c, RunnableScheduledFuture<V> task) { return new CustomTask<V>(c, task); } // ... add constructors, etc. }
从类 java.util.concurrent.ThreadPoolExecutor 继承的嵌套类/接口
ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy
构造方法摘要
ScheduledThreadPoolExecutor(int corePoolSize) 使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。 |
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。 |
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) 使用给定的初始参数创建一个新 ScheduledThreadPoolExecutor。 |
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。 |
从类 java.util.concurrent.ThreadPoolExecutor 继承的方法
afterExecute, allowCoreThreadTimeOut, allowsCoreThreadTimeOut, awaitTermination, beforeExecute, finalize,getActiveCount,getCompletedTaskCount, getCorePoolSize, getKeepAliveTime, getLargestPoolSize, getMaximumPoolSize, getPoolSize, getRejectedExecutionHandler, getTaskCount, getThreadFactory,isShutdown, isTerminated, isTerminating, prestartAllCoreThreads, prestartCoreThread, purge, setCorePoolSize, setKeepAliveTime, setMaximumPoolSize, setRejectedExecutionHandler, setThreadFactory, terminated
从类 java.util.concurrent.AbstractExecutorService 继承的方法
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor
从类 java.lang.Object 继承的方法
clone, equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
从接口 java.util.concurrent.ExecutorService 继承的方法
awaitTermination, invokeAll, invokeAll, invokeAny, invokeAny, isShutdown, isTerminated
ScheduledThreadPoolExecutor
1public ScheduledThreadPoolExecutor(int corePoolSize)
使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。
参数:
corePoolSize
- 池中所保存的线程数(包括空闲线程)
抛出:
IllegalArgumentException
- 如果 corePoolSize < 0
ScheduledThreadPoolExecutor
1
2public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
使用给定的初始参数创建一个新 ScheduledThreadPoolExecutor。
参数:
corePoolSize
- 池中所保存的线程数(包括空闲线程)
threadFactory
- 执行程序创建新线程时使用的工厂
抛出:
IllegalArgumentException
- 如果 corePoolSize < 0
NullPointerException
- 如果 threadFactory 为 null
ScheduledThreadPoolExecutor
1
2public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)
使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。
参数:
corePoolSize
- 池中所保存的线程数(包括空闲线程)
handler
- 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序
抛出:
IllegalArgumentException
- 如果 corePoolSize < 0
NullPointerException
- 如果处理程序为 null
ScheduledThreadPoolExecutor
1
2
3public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)
使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。
参数:
corePoolSize
- 池中所保存的线程数(包括空闲线程)
threadFactory
- 执行程序创建新线程时使用的工厂
handler
- 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序
抛出:
IllegalArgumentException
- 如果 corePoolSize < 0
NullPointerException
- 如果 threadFactory 或处理程序为 null
remove
1public boolean remove(Runnable task)
从类 ThreadPoolExecutor
复制的描述
从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
此方法可用作取消方案的一部分。它可能无法移除在放置到内部队列之前已经转换为其他形式的任务。例如,使用 submit 输入的任务可能被转换为维护 Future 状态的形式。但是,在此情况下,ThreadPoolExecutor.purge()
方法可用于移除那些已被取消的 Future。
覆盖:
类 ThreadPoolExecutor
中的 remove
参数:
task
- 要移除的任务
返回:
如果已经移除任务,则返回 true
decorateTask
1
2protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task)
修改或替换用于执行 runnable 的任务。此方法可重写用于管理内部任务的具体类。默认实现只返回给定任务。
参数:
runnable
- 所提交的 Runnable
task
- 执行 runnable 所创建的任务
返回:
可以执行 runnable 的任务
decorateTask
1
2protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task)
修改或替换用于执行 callable 的任务。此方法可重写用于管理内部任务的具体类。默认实现返回给定任务。
参数:
callable
- 所提交的 Callable
task
- 执行 callable 所创建的任务
返回:
可以执行 callable 的任务
schedule
1
2
3public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
从接口 ScheduledExecutorService
复制的描述
创建并执行在给定延迟后启用的一次性操作。
指定者:
接口 ScheduledExecutorService
中的 schedule
参数:
command
- 要执行的任务
delay
- 从现在开始延迟执行的时间
unit
- 延迟参数的时间单位
返回:
表示挂起任务完成的 ScheduledFuture,并且其 get() 方法在完成后将返回 null
schedule
1
2
3public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
从接口 ScheduledExecutorService
复制的描述
创建并执行在给定延迟后启用的 ScheduledFuture。
指定者:
接口 ScheduledExecutorService
中的 schedule
参数:
callable
- 要执行的功能
delay
- 从现在开始延迟执行的时间
unit
- 延迟参数的时间单位
返回:
可用于提取结果或取消的 ScheduledFuture
scheduleAtFixedRate
1
2
3
4public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
从接口 ScheduledExecutorService
复制的描述
创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。如果任务的任何一个执行遇到异常,则后续执行都会被取消。否则,只能通过执行程序的取消或终止方法来终止该任务。如果此任务的任何一个执行要花费比其周期更长的时间,则将推迟后续执行,但不会同时执行。
指定者:
接口 ScheduledExecutorService
中的 scheduleAtFixedRate
参数:
command
- 要执行的任务
initialDelay
- 首次执行的延迟时间
period
- 连续执行之间的周期
unit
- initialDelay 和 period 参数的时间单位
返回:
表示挂起任务完成的 ScheduledFuture,并且其 get() 方法在取消后将抛出异常
scheduleWithFixedDelay
1
2
3
4public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
从接口 ScheduledExecutorService
复制的描述
创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。否则,只能通过执行程序的取消或终止方法来终止该任务。
指定者:
接口 ScheduledExecutorService
中的 scheduleWithFixedDelay
参数:
command
- 要执行的任务
initialDelay
- 首次执行的延迟时间
delay
- 一次执行终止和下一次执行开始之间的延迟
unit
- initialDelay 和 delay 参数的时间单位
返回:
表示挂起任务完成的 ScheduledFuture,并且其 get() 方法在取消后将抛出异常
execute
1public void execute(Runnable command)
使用所要求的零延迟执行命令。这在效果上等同于调用 schedule(command, 0, anyUnit)。注意,对由 shutdownNow 所返回的队列和列表的检查将访问零延迟的 ScheduledFuture
,而不是 command 本身。
指定者:
接口 Executor
中的 execute
覆盖:
类 ThreadPoolExecutor
中的 execute
参数:
command
- 要执行的任务。
抛出:
RejectedExecutionHandler 随意决定的 RejectedExecutionException,如果由于执行程序已关闭而无法接受要执行的任务 。
NullPointerException
- 如果 command 为 null。
submit
1public Future<?> submit(Runnable task)
从接口 ExecutorService
复制的描述
提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在 成功 完成时将会返回 null。
指定者:
接口 ExecutorService
中的 submit
覆盖:
类 AbstractExecutorService
中的 submit
参数:
task
- 要提交的任务
返回:
表示任务等待完成的 Future
submit
1public <T> Future<T> submit(Runnable task,T result)
从接口 ExecutorService
复制的描述
提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。
指定者:
接口 ExecutorService
中的 submit
覆盖:
类 AbstractExecutorService
中的 submit
参数:
task
- 要提交的任务
result
- 返回的结果
返回:
表示任务等待完成的 Future
submit
1public <T> Future<T> submit(Callable<T> task)
从接口 ExecutorService
复制的描述
提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。
如果想立即阻塞任务的等待,则可以使用 result = exec.submit(aCallable).get(); 形式的构造。
注:Executors
类包括了一组方法,可以转换某些其他常见的类似于闭包的对象,例如,将 PrivilegedAction
转换为 Callable
形式,这样就可以提交它们了。
指定者:
接口 ExecutorService
中的 submit
覆盖:
类 AbstractExecutorService
中的 submit
参数:
task
- 要提交的任务
返回:
表示任务等待完成的 Future
setContinueExistingPeriodicTasksAfterShutdownPolicy
1public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
设置有关在此执行程序已 shutdown 的情况下是否继续执行现有定期任务的策略。在这种情况下,仅在执行 shutdownNow 时,或者在执行程序已关闭、将策略设置为 false 后才终止这些任务。此值默认为 false。
参数:
value
- 如果为 true,则在关闭后继续执行;否则不执行。
另请参见:
getContinueExistingPeriodicTasksAfterShutdownPolicy()
getContinueExistingPeriodicTasksAfterShutdownPolicy
1public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy()
获取有关在此执行程序已 shutdown 的情况下、是否继续执行现有定期任务的策略。在这种情况下,仅在执行 shutdownNow 时,或者在执行程序已关闭时将策略设置为 false 后才终止这些任务。此值默认为 false。
返回:
如果关闭后继续执行,则返回 true。
另请参见:
setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean)
setExecuteExistingDelayedTasksAfterShutdownPolicy
1public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)
设置有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。在这种情况下,仅在执行 shutdownNow 时,或者在执行程序已关闭、将策略设置为 false 后才终止这些任务。此值默认为 true。
参数:
value
- 如果为 true,则在关闭后执行;否则不执行。
另请参见:
getExecuteExistingDelayedTasksAfterShutdownPolicy()
getExecuteExistingDelayedTasksAfterShutdownPolicy
1public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy()
获取有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。在这种情况下,仅在执行 shutdownNow 时,或者在执行程序已关闭时将策略设置为 false 后才终止这些任务。此值默认为 true。
返回:
如果关闭后执行,则返回 true
另请参见:
setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean)
shutdown
1public void shutdown()
在以前已提交任务的执行中发起一个有序的关闭,但是不接受新任务。如果已将 ExecuteExistingDelayedTasksAfterShutdownPolicy 设置为 false,则取消尚未超出其延迟的现有延迟任务。并且除非已将ContinueExistingPeriodicTasksAfterShutdownPolicy 设置为 true,否则将取消现有定期任务的后续执行。
指定者:
接口 ExecutorService
中的 shutdown
覆盖:
类 ThreadPoolExecutor
中的 shutdown
shutdownNow
1public List<Runnable> shutdownNow()
尝试停止所有正在执行的任务、暂停等待任务的处理,并返回等待执行的任务列表。
虽然尽最大努力,但并不保证可以停止处理正在执行的任务。此实现通过 Thread.interrupt()
取消任务,所以任何无法响应中断的任务都可能永远无法终止。
指定者:
接口 ExecutorService
中的 shutdownNow
覆盖:
类 ThreadPoolExecutor
中的 shutdownNow
返回:
从未开始执行的任务列表。此列表中的每个元素都是一个 ScheduledFuture
,包括用 execute 所提交的那些任务,出于安排的目的,这些任务用作零延迟 ScheduledFuture 的基础。
抛出:
SecurityException
- 如果安全管理器存在并且关闭此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有 RuntimePermission
("modifyThread")),或者安全管理器的 checkAccess 方法拒绝访问。
getQueue
1public BlockingQueue<Runnable> getQueue()
返回此执行程序使用的任务队列。此队列中的每个元素都是一个 ScheduledFuture
,包括用 execute 所提交的那些任务,出于安排的目的,这些任务用作零延迟 ScheduledFuture 的基础。 无法 保证对此队列进行迭代的迭代器会以任务执行的顺序遍历各任务。
覆盖:
类 ThreadPoolExecutor
中的 getQueue
返回:
任务队列。
实现原理
ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService接口。
1public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService
ScheduledThreadPoolExecutor的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行。
首先看核心方法execute:
1
2
3public void execute(Runnable command) { schedule(command, 0, TimeUnit.NANOSECONDS); }
execute方法调用了另外一个方法schedule,同时我们发现三个submit方法也是同样调用了schedule方法,因为有两种类型的任务:Callable和Runnable,因此schedule也有两个重载方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; }
两个方法逻辑基本一致,都是把任务包装成RunnableScheduledFuture对象,然后调用delayedExecute来实现延迟执行。
任务包装类继承自ThreadPoolExecutor的包装类RunnableFuture,同时实现ScheduledFuture接口使包装类具有了延迟执行和重复执行这些功能以匹配ScheduledThreadPoolExecutor。
因此首先来看ScheduledFutureTask,以下是ScheduledFutureTask专有的几个变量:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** 针对线程池所有任务的序列号 */ private final long sequenceNumber; /** 距离任务开始执行的时间,纳秒为单位 */ private long time; /** * 重复执行任务的间隔,即每隔多少时间执行一次任务 */ private final long period; /** 重复执行任务和排队时用这个类型的对象, */ RunnableScheduledFuture<V> outerTask = this; /** * 在延迟队列的索引,这样取消任务时使用索引会加快查找速度 */ int heapIndex;
核心方法run:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public void run() { boolean periodic = isPeriodic(); // 检测是否可以运行任务,这里涉及到另外两个变量:continueExistingPeriodicTasksAfterShutdown // 和executeExistingDelayedTasksAfterShutdown // 前者允许在shutdown之后继续执行重复执行的任务 // 后者允许在shutdown之后继续执行延时执行的任务, // 因此这里根据任务是否为periodic来决定采用哪个选项,然后 // 如果线程池正在运行,那么肯定可以执行 // 如果正在shutdown,那么要看选项的值是否为true来决定是否允许执行任务 // 如果不被允许的话,就会取消任务 if (!canRunInCurrentRunState(periodic)) cancel(false); // 如果可以执行任务,对于不用重复执行的任务,直接执行即可 else if (!periodic) ScheduledFutureTask.super.run(); // 对于需要重复执行的任务,则执行一次,然后reset // 更新一下下次执行的时间,调用reExecutePeriodic更新任务在执行队列的 // 位置(其实就是添加到队列的末尾) else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
因此可以得出关于重复执行的实现:任务执行一次,Reset状态,重新加入到任务队列。
回到delayedExecute,它可以保证任务在准确时间点执行,来看delayedExecute是如果实现延迟执行的:
1
2
3
4
5
6
7
8
9
10
11private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
把任务加入到任务队列中,那么这个延时执行的功能是如何实现的,秘密就在任务队列的实现。
1
2
3
4
5
6
7
8public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
ScheduledThreadPoolExecutor的任务队列不是普通的BlockingQueue,而是一个特殊的实现DelayedWorkQueue。
在ScheduledThreadPoolExecutor使用DelayedWorkQueue来存放要执行的任务,因为这些任务是带有延迟的,而每次执行都是取第一个任务执行,因此在DelayedWorkQueue中任务必然按照延迟时间从短到长来进行排序的。
DelayedWorkQueue使用堆来实现的。首先来看offer方法,基本就是一个添加元素到堆的逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture e = (RunnableScheduledFuture) x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; // 因为元素时存储在一个数组中,随着堆变大,当数组存储不够时,需要对数组扩容 if (i >= queue.length) grow(); size = i + 1; // 如果原来队列为空 if (i == 0) { queue[0] = e; // 这个i就是RunnableScheduledFuture用到的heapIndex setIndex(e, 0); } else { // 添加元素到堆中 siftUp(i, e); } // 如果队列原先为空,那么可能有线程在等待元素,这时候既然添加了元素,就需要通过Condition通知这些线程 if (queue[0] == e) { // 因为有元素新添加了,第一个等待的线程可以结束等待了,因此这里 // 删除第一个等待线程 leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
这里顺带看一下siftUp,熟悉堆的实现的朋友应该很容易看懂这是一个把元素添加已有堆中的算法。
1
2
3
4
5
6
7
8
9
10
11
12
13private void siftUp(int k, RunnableScheduledFuture key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
那么接着就看看poll:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public RunnableScheduledFuture poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 因为即使拿到任务,线程还是需要等待,而这个等待过程是由队列帮助完成的 // 因此poll方法只能返回已经到执行时间点的任务 RunnableScheduledFuture first = queue[0]; if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return finishPoll(first); } finally { lock.unlock(); } }
因为poll方法只能返回已经到了执行时间点的任务,所以对于我们理解队列如何实现延迟执行没有意义,因此重点看看take方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45public RunnableScheduledFuture take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { // 尝试获取第一个元素,如果队列为空就进入等待 RunnableScheduledFuture first = queue[0]; if (first == null) available.await(); else { // 获取任务执行的延迟时间 long delay = first.getDelay(TimeUnit.NANOSECONDS); // 如果任务不用等待,立刻返回该任务给线程 if (delay <= 0) // 从堆中拿走任务 return finishPoll(first); // 如果任务需要等待,而且前面有个线程已经等待执行任务(leader线程 // 已经拿到任务了,但是执行时间没有到,延迟时间肯定是最短的), // 那么执行take的线程肯定继续等待, else if (leader != null) available.await(); // 当前线程的延迟时间是最短的情况,那么更新leader线程 // 用Condition等待直到时间到点,被唤醒或者被中断 else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { // 重置leader线程以便进行下一次循环 if (leader == thisThread) leader = null; } } } } } finally { // 队列不为空发出signal很好理解,这里附带了没有leader线程 // 的条件是因为leader线程存在时表示leader线程正在等待执行时间点的 // 到来,如果此时发出signal会触发awaitNanos提前返回 if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
take方法的重点就是leader线程,因为存在延迟时间,即使拿到任务,线程还是需要等待的,leader线程就那个最先执行任务的线程。
因为线程拿到任务之后还是需要等待一段延迟执行的时间,所以对于超时等待的poll方法来说就有点意思了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50public RunnableScheduledFuture poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { RunnableScheduledFuture first = queue[0]; // 任务队列为空的情况 if (first == null) { // nanos小于等于0有两种可能: // 1. 参数值设定 // 2. 等待已经超时 if (nanos <= 0) return null; else // 等待一段时间,返回剩余的等待时间 nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return finishPoll(first); if (nanos <= 0) return null; // leader线程存在并且nanos大于delay的情况下, // 依然等待nanos这么长时间,不用担心会超过delay设定 // 的时间点,因为leader线程到时间之后会发出signal // 唤醒线程,而那个时候显然还没有到delay设定的时间点 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); // 剩余的超时时间 nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
通过分析以上代码基本上已经理清楚了DelayedWorkQueue实现延迟执行的原理:
- 1. 按照执行延迟从短到长的顺序把任务存储到堆;
- 2. 通过leader线程让拿到任务的线程等到规定的时间点再执行任务;
转载于:https://my.oschina.net/langwanghuangshifu/blog/2963472
最后
以上就是坚定溪流最近收集整理的关于java.util.concurrent.ScheduledThreadPoolExecutor 源码的全部内容,更多相关java.util.concurrent.ScheduledThreadPoolExecutor内容请搜索靠谱客的其他文章。
发表评论 取消回复