我是靠谱客的博主 孝顺耳机,这篇文章主要介绍基于线程池的并发编程取消运行中的任务,现在分享给大家,希望可以做个参考。

        我们都知道,java.util.concurrent包给我们提供了并发编程的相关工具类,有线程池,队列,CountDownLatch,Semaphore等等工具类,使我们编写并发程序非常方便。

        我今天要讲的是如何在将任务提交到线程池后取消正在运行的任务?

有线程池编程经验的朋友可能第一反应就是利用Future.cancel(boolean)方法。这个方法确实可以取消任务,但这个方法有一个缺陷。当任务还在队列中没有得到执行的时候,也会被取消掉。这种情况有时候可能不满足用户需求,例如:我的需求是当任务正在运行,并且运行时间超过5秒钟,则取消任务。这种需求Future.cancel(boolean)就不能满足了。接下来我给大家分享一下我的实现方式,如果有不对的地方还请大家指正,谢谢!

        为了验证实践方法,我写了两个类,分别为工作任务类:Task.java实现了Runnable接口,此类模拟了一个耗时任务。另一个:Main.java,此类模拟提交任务,并对超时任务取消操作。具体代码如下(代码中的注释已经很全面):

Task.java:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/** * <p> * Create Time: 2018年5月25日 * </p> * @version 1.0 */ package cn.concurrent1; import java.lang.Thread.State; import java.util.UUID; /** * 任务类,实现了Runnable接口 * <p> * Create Time: 2018年5月25日 * </p> * * @version 1.0 */ public class Task implements Runnable { private String taskName;// 任务名称 private volatile long start = 0L; // 任务开始时间 private State state; // 线程状态 private Thread taskInThread; // 当前任务所处的线程 public Task(String taskName) {     this.taskName = taskName; } @Override public void run() { // 将当前任务与当前线程相关连,为的是后续设置中断状态做准备 this.setTaskInThread(Thread.currentThread()); // 记录任务执行开始时间 start = System.currentTimeMillis(); long num = 0; try { String uuid = ""; // 模拟任务耗时开始 // uuid = UUID.randomUUID().toString(); while (!Thread.currentThread().isInterrupted()) { if (uuid.startsWith("0")) { break; } } long now = System.currentTimeMillis(); num = now - start; System.out.println(taskName + "=>开始 start==" + start + " 模拟耗时" + num / 1000); // 模拟任务耗时结束 // // 任务执行结束,在存储任务容器中删除该任务 if (Main.map.get(this.getTaskName()) != null) { Main.map.remove(this.getTaskName()); if (!Thread.currentThread().isInterrupted()) { System.out.println(taskName + "==>正常结束,模拟清除 耗时:" + (num / 1000) + " 实际耗时==" + ((System.currentTimeMillis() - start) / 1000) + " start=" + start + " end=" + now); } else { System.out.println(taskName + "==>中断结束,模拟清除 耗时:" + (num / 1000) + " 实际耗时==" + ((System.currentTimeMillis() - start) / 1000) + " start=" + start + " end=" + now); } } } catch (Exception e) { long end = System.currentTimeMillis(); System.out.println(taskName + "===>被中断了 模拟耗时:" + (num / 1000) + " start:" + start + " end:" + end + " 实际耗时==" + ((System.currentTimeMillis() - start) / 1000)); // 任务中断,在存储任务容器中删除该任务 if (Main.map.get(this.getTaskName()) != null) { Main.map.remove(this.getTaskName()); } } } // 对外提供设置任务中断的方法 public void setInterrupte(Thread thread) { try { thread.interrupt(); } catch (Exception e) { e.printStackTrace(); } } // Getter Setter方法 public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public State getThreadStatus() { this.state = Thread.currentThread().getState(); return this.state; } public long getStart() { return start; } public Thread getTaskInThread() { return taskInThread; } public void setTaskInThread(Thread taskInThread) { this.taskInThread = taskInThread; } }

Main.java

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/** * <p> * Create Time: 2018年5月25日 * </p> * @version 1.0 */ package cn.concurrent1; import java.lang.Thread.State; import java.util.Iterator; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * <p> * Create Time: 2018年5月25日 * </p> * * @version 1.0 */ public class Main { // 处理器个数 public final static int PROCESS_NUM = Runtime.getRuntime().availableProcessors(); // 根据处理器个数定义线程个数 public final static int THREAD_NUM = Math.max(PROCESS_NUM, 4) * 5; // 任务队列大小 public final static int DISPENSE_MAX_WAITTING_THREAD_NUM = Short.MAX_VALUE >> 1;// 16383 // 最大任务执行时间,时间上限 public final static int MAXIMUM_TASK_EXECUTION_TIME = 5; // 线程池,拒绝策略为丢弃旧的任务 private static ThreadPoolExecutor es = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(DISPENSE_MAX_WAITTING_THREAD_NUM), new NamedThreadFactory("测试任务"), new ThreadPoolExecutor.DiscardOldestPolicy()); // 定时任务,定时删除任务执行时间大于5秒钟的任务 private static ScheduledExecutorService threadCostTimeEs = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("定时任务"), new ThreadPoolExecutor.AbortPolicy()); // 存储任务容器 public static ConcurrentHashMap<String, Task> map = new ConcurrentHashMap<>(); // mian方法 public static void main(String[] args) { // 模拟提交100个任务 for (int i = 0; i < 100; i++) { Task task = new Task("Task-" + ((i + 1) < 10 ? "0" + (i + 1) : (i + 1))); map.put(task.getTaskName(), task); es.submit(task); } // 定时任务定时处理逻辑 threadCostTimeEs.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 监控线程池 System.out.println("获取HTML线程池任务总数:" + es.getQueue().size() + "," + "活动线程数量:" + es.getActiveCount() + "," + "总任务数量:" + es.getTaskCount()); Set<Entry<String, Task>> set = map.entrySet(); Iterator<Entry<String, Task>> itr = set.iterator(); while (itr.hasNext()) { try { Entry<String, Task> entry = itr.next(); Task t = entry.getValue(); long start = t.getStart(); if (start != 0) { long currentTime = System.currentTimeMillis(); long sub = currentTime - start; sub = sub / 1000; // 队列中包含该任务,并且任务执行时间大于最大任务执行时间(目前5秒) if (sub > MAXIMUM_TASK_EXECUTION_TIME && !es.getQueue().contains(t)) { System.out.println(t.getTaskName() + "====>开始中断 耗时====" + sub + " start==" + start + " currentTime=" + currentTime); State state = t.getThreadStatus(); // 任务为Runnable状态的,设置中断状态 if (state == State.RUNNABLE) { t.setInterrupte(t.getTaskInThread()); } } } } catch (Exception e) { e.printStackTrace(); } } } }, 1, 1, TimeUnit.SECONDS); // while (es.getActiveCount() == 0) { // es.shutdown(); // threadCostTimeEs.shutdown(); // } } }

 

NameThreadFactory 线程命名工具类:

 

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package cn.concurrent1; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; public class NameThreadFactory implements ThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); private final String threadName; public NamedThreadFactory(String threadName) { this.threadName = threadName; } public Thread newThread(Runnable r) { Thread t = new Thread(r, threadName + threadNumber.getAndIncrement()); return t; } }

 

最后

以上就是孝顺耳机最近收集整理的关于基于线程池的并发编程取消运行中的任务的全部内容,更多相关基于线程池内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(61)

评论列表共有 0 条评论

立即
投稿
返回
顶部