isShutdown():调用shutdown()或shutdownNow()方法后返回true。
isTerminated():调用shutdown()方法后,并且所有已提交的任务都执行完成后返回true;
isTerminated():调用shutdownNow()方法后,线程池成功停止后返回true;
如果线程池中的任务已正常执行完毕,但尚未调用shutdown()或shutdownNow(),则isShutdown()和isTerminated()都返回false。
public class ExecutorServiceTest {public static void main(String[] args) throws InterruptedException {
// isShutdown();
// isTerminated_1();
// executorRunnableError();executorRunnableTask();}
}
/**
 * Shows how {@code isShutdown()} changes across a call to {@code shutdown()}, and that a
 * task submitted after {@code shutdown()} is rejected.
 *
 * <p>The final {@code execute} call is expected to throw
 * {@code java.util.concurrent.RejectedExecutionException}; that is the point of the demo.
 */
private static void isShutdown() {
    final ExecutorService pool = Executors.newSingleThreadExecutor();

    // Keep the single worker busy so the pool is still "shutting down" further below.
    final Runnable fiveSecondNap = () -> {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    };
    pool.execute(fiveSecondNap);

    // isShutdown() returns true once this executor has been shut down.
    final boolean beforeShutdown = pool.isShutdown();
    System.out.println(beforeShutdown);

    pool.shutdown();
    final boolean afterShutdown = pool.isShutdown();
    System.out.println(afterShutdown);

    // Can another Runnable still be submitted after shutdown()? No - it is rejected with an exception.
    final Runnable lateTask = () -> System.out.println("i will executor after shutdown ...");
    pool.execute(lateTask);

    /*
     * Recorded output:
     *
     * false
     * true
     * Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.thread.excutor.ExecutorServiceTest$$Lambda$2/932583850@cac736f rejected from java.util.concurrent.ThreadPoolExecutor@5e265ba4[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
     *     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
     *     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
     *     at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
     *     at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
     *     at com.thread.excutor.ExecutorServiceTest.isShutdown(ExecutorServiceTest.java:32)
     *     at com.thread.excutor.ExecutorServiceTest.main(ExecutorServiceTest.java:9)
     */
}
/**
 * Prints the pool's state flags right after {@code shutdown()} while one task is still
 * sleeping: the pool is shut down and terminating, but not yet terminated.
 */
private static void isTerminated_1() {
    final ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);

    final Runnable threeSecondNap = () -> {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    };
    pool.execute(threeSecondNap);
    pool.shutdown();

    final boolean shutDown = pool.isShutdown();
    System.out.println("executorService.isShutdown() " + shutDown);

    /*
     * 1. isTerminated() cannot be used on its own to check whether all work in the pool
     *    has finished. It is only meaningful after shutdown() has been called; otherwise
     *    it never becomes true and code that waits on it will block forever.
     * 2. It reports whether all submitted tasks are complete: it returns true once
     *    shutdown() has been called and every submitted task has finished.
     */
    final boolean terminated = pool.isTerminated();
    System.out.println("1-executorService.isTerminated() " + terminated);

    final boolean terminating = pool.isTerminating();
    System.out.println("executorService.isTerminating() " + terminating);

    /*
     * Recorded output:
     *
     * executorService.isShutdown() true
     * executorService.isTerminated() false
     * executorService.isTerminating() true
     */
}
/**
 * Submits ten tasks that each fail with a division by zero, then waits for the pool to
 * terminate.
 *
 * <p>The worker threads come from {@link MyThreadFactory}, whose uncaught-exception handler
 * prints one line per failed thread instead of the default stack trace.
 */
private static void executorRunnableError() {
    final ThreadPoolExecutor pool =
            (ThreadPoolExecutor) Executors.newFixedThreadPool(10, new MyThreadFactory());

    // 1 / 0 throws before println is ever invoked; the failure is the point of the demo.
    final Runnable divideByZero = () -> System.out.println(1 / 0);
    for (int submitted = 0; submitted < 10; submitted++) {
        pool.execute(divideByZero);
    }

    pool.shutdown();
    try {
        pool.awaitTermination(10, TimeUnit.MINUTES);
    } catch (InterruptedException interrupted) {
        interrupted.printStackTrace();
        return;
    }
    System.out.println("============over============");
}

/**
 * Thread factory that names its threads {@code My-Thread-<n>} and installs an
 * uncaught-exception handler which reports the failing thread's name.
 */
private static class MyThreadFactory implements ThreadFactory {

    /** Sequence shared by every factory instance; supplies the numeric suffix of the thread name. */
    private static final AtomicInteger SEQ = new AtomicInteger();

    /**
     * Creates a named thread for the given task.
     *
     * @param r the task the new thread will run
     * @return the configured, not yet started thread
     */
    @Override
    public Thread newThread(@NotNull Runnable r) {
        final Thread worker = new Thread(r);
        worker.setName("My-Thread-" + SEQ.getAndIncrement());
        // Same shape as Thread.UncaughtExceptionHandler#uncaughtException(Thread t, Throwable e).
        worker.setUncaughtExceptionHandler(MyThreadFactory::reportFailure);
        return worker;
    }

    /** Prints which thread failed; the throwable itself is not printed. */
    private static void reportFailure(Thread failed, Throwable cause) {
        System.out.println("The thread " + failed.getName() + " executor failed.");
        System.out.println("=====================");
    }
}
/**
The thread My-Thread-6 executor failed.
The thread My-Thread-1 executor failed.
=====================
The thread My-Thread-14 executor failed.
=====================
The thread My-Thread-5 executor failed.
=====================
The thread My-Thread-4 executor failed.
=====================
The thread My-Thread-2 executor failed.
=====================
The thread My-Thread-0 executor failed.
=====================
The thread My-Thread-3 executor failed.
=====================
=====================
The thread My-Thread-7 executor failed.
=====================
The thread My-Thread-12 executor failed.
=====================
============over============
Process finished with exit code 0
*/
/*** | ---->* send ---> store db ---> 10 | ---->* | ---->*/private static void executorRunnableTask() throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10, new MyThreadFactory());IntStream.range(0, 10).boxed().forEach(i -> executorService.execute(() ->{MyTask myTask = new MyTask(i) {@Overridepublic void error(Throwable e) {System.out.println("The no : " + i + " failed, update status to ERROR.");}@Overridepublic void done() {System.out.println("The no : " + i + " successfully, update status to DONE.");}@Overridepublic void doExecutor() {if (i % 3 == 0) {int time = 1 / 0;}}@Overridepublic void doInit() {// do noting ...}};myTask.run();}));executorService.shutdown();executorService.awaitTermination(10, TimeUnit.MINUTES);System.out.println("=======================");/*** The no : 0 failed, update status to ERROR.* The no : 3 failed, update status to ERROR.* The no : 2 successfully, update status to DONE.* The no : 1 successfully, update status to DONE.* The no : 4 successfully, update status to DONE.* The no : 5 successfully, update status to DONE.* The no : 7 successfully, update status to DONE.* The no : 6 failed, update status to ERROR.* The no : 8 successfully, update status to DONE.* The no : 9 failed, update status to ERROR.* =======================*/}private abstract static class MyTask implements Runnable {public final Integer no;public MyTask(Integer no) {this.no = no;}@Overridepublic void run() {try {this.doInit();this.doExecutor();this.done();} catch (Throwable e) {this.error(e);}}public abstract void error(Throwable e);public abstract void done();public abstract void doExecutor();public abstract void doInit();}
任务拒绝模块是线程池的保护机制。线程池的容量是有限的:当任务缓存队列已满,并且线程池中的线程数已达到maximumPoolSize时,新提交的任务会被拒绝,并按照配置的拒绝策略(RejectedExecutionHandler)处理,以保护线程池。
package com.thread.excutor;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * Demonstrates the four built-in rejection policies of {@link ThreadPoolExecutor}.
 *
 * <p>Every demo uses the same deliberately tiny pool (1 core thread, at most 2 threads, a
 * queue of capacity 1), fills it with three sleeping tasks and then submits a fourth task
 * that the pool has to reject.
 *
 * <p>Each demo shuts its pool down in a {@code finally} block. Tasks that are already
 * running or queued still complete after {@code shutdown()}, so the printed output is
 * unchanged, but the JVM can now exit on its own. The recorded runs of the original code,
 * which never shut the pool down, all ended with
 * "Process finished with exit code 130 (interrupted by signal 2: SIGINT)".
 */
public class RejectedExecutionHandlerTest {

    /** Tasks needed to occupy both threads and the single queue slot. */
    private static final int SATURATING_TASKS = 3;

    /**
     * Runs the currently enabled demo.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // testAbortPolicy();
        // testDiscardPolicy();
        testDiscardOldestPolicy();
        // testCallerRunsPolicy();
    }

    /**
     * CallerRunsPolicy: the rejected task is run by the thread that submitted it.
     */
    private static void testCallerRunsPolicy() {
        ExecutorService executorService = newSaturablePool(new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            saturate(executorService, 3, true);
            pauseOneSecond();
            executorService.execute(() -> {
                System.out.println("xxxx == " + Thread.currentThread().getName());
            });
        } finally {
            executorService.shutdown();
        }
        /*
         * Recorded output:
         *
         * xxxx == main
         * yyyy Thread-1
         * yyyy Thread-0
         * yyyy Thread-1
         */
    }

    /**
     * DiscardOldestPolicy:
     * 1. Drops the task at the head of the queue, then resubmits the rejected task.
     * 2. When a task is rejected, the oldest queued task (the one enqueued first) is thrown
     *    away and the new task is added instead: rejectedExecution first polls the oldest
     *    task off the queue to free a slot and then calls execute again for the new task.
     */
    private static void testDiscardOldestPolicy() {
        ExecutorService executorService = newSaturablePool(new ThreadPoolExecutor.DiscardOldestPolicy());
        try {
            saturate(executorService, 3, true);
            pauseOneSecond();
            executorService.execute(() -> System.out.println("xxxx" + Thread.currentThread().getName()));
        } finally {
            executorService.shutdown();
        }
        /*
         * Recorded output:
         *
         * yyyy Thread-0
         * yyyy Thread-1
         * xxxxThread-0
         */
    }

    /**
     * DiscardPolicy: the new task is not processed; it is silently dropped.
     */
    private static void testDiscardPolicy() {
        ExecutorService executorService = newSaturablePool(new ThreadPoolExecutor.DiscardPolicy());
        try {
            saturate(executorService, 3, false);
            pauseOneSecond();
            executorService.execute(() -> System.out.println("xxxx"));
        } finally {
            executorService.shutdown();
        }
        /*
         * Recorded output:
         *
         * yyyy
         * yyyy
         * yyyy
         */
    }

    /**
     * AbortPolicy (the default): throws RejectedExecutionException at the submitter.
     * The task is dropped and an exception is raised; the maximum load the pool accepts is
     * maximumPoolSize + the capacity of the BlockingQueue.
     *
     * <p>The exception intentionally propagates out of this method; the {@code finally}
     * block only makes sure the pool is shut down first.
     */
    private static void testAbortPolicy() {
        ExecutorService executorService = newSaturablePool(new ThreadPoolExecutor.AbortPolicy());
        try {
            saturate(executorService, 10, false);
            pauseOneSecond();
            executorService.execute(() -> System.out.println("xxxx"));
        } finally {
            executorService.shutdown();
        }
        /*
         * Recorded output:
         *
         * Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.thread.excutor.RejectedExecutionHandlerTest$$Lambda$5/604107971@7637f22 rejected from java.util.concurrent.ThreadPoolExecutor@4926097b[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
         *     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
         *     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
         *     at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
         *     at com.thread.excutor.RejectedExecutionHandlerTest.testAbortPolicy(RejectedExecutionHandlerTest.java:36)
         *     at com.thread.excutor.RejectedExecutionHandlerTest.main(RejectedExecutionHandlerTest.java:11)
         * yyyy
         * yyyy
         * yyyy
         */
    }

    /**
     * Creates the pool shared by all demos: 1 core thread, 2 threads maximum, 30 s
     * keep-alive, a bounded queue of capacity 1 and plain {@code new Thread(runnable)} workers.
     *
     * @param handler the rejection policy under test
     * @return a new, running pool; the caller is responsible for shutting it down
     */
    private static ExecutorService newSaturablePool(java.util.concurrent.RejectedExecutionHandler handler) {
        return new ThreadPoolExecutor(
                1,
                2,
                30,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new,
                handler);
    }

    /**
     * Submits {@link #SATURATING_TASKS} tasks that sleep and then print {@code "yyyy"}.
     *
     * @param pool             pool to fill
     * @param sleepSeconds     how long each task sleeps before printing
     * @param appendThreadName whether the worker thread's name is appended to the output
     */
    private static void saturate(ExecutorService pool, long sleepSeconds, boolean appendThreadName) {
        IntStream.range(0, SATURATING_TASKS).forEach(ignored -> pool.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(sleepSeconds);
                System.out.println(appendThreadName ? "yyyy " + Thread.currentThread().getName() : "yyyy");
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Keep the interruption visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }));
    }

    /** Gives the pool one second to pick up the saturating tasks before the extra task is submitted. */
    private static void pauseOneSecond() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
allowCoreThreadTimeOut(true) 允许核心线程在空闲时间超过keepAliveTime后被回收,适合定时任务这类间歇性执行的场景。
package com.thread.excutor;import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * Demonstrates {@link ThreadPoolExecutor#getActiveCount()} and
 * {@link ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)} on a fixed pool of five threads.
 *
 * <p>The numbered demos build on each other: without core-thread time-out the process does
 * not exit by itself (1), enabling it with the fixed pool's zero keep-alive is rejected (2),
 * and with a non-zero keep-alive the process exits on its own (3, 4). For that reason demos
 * 1, 3 and 4 deliberately do NOT call {@code shutdown()}.
 */
public class TestAllowCoreThreadTimeOut {

    /**
     * Runs the currently enabled demo.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // getActiveCount();
        // testAllowCoreThreadTimeOut_1();
        // testAllowCoreThreadTimeOut_2();
        // testAllowCoreThreadTimeOut_3();
        testAllowCoreThreadTimeOut_4();
    }

    /**
     * Prints the active thread count before and shortly after one long task is submitted.
     *
     * <p>This demo is not about keeping the pool alive, so the pool is shut down at the end
     * to let the JVM exit once the task has finished.
     */
    private static void getActiveCount() {
        /*
         * public static ExecutorService newFixedThreadPool(int nThreads) {
         *     return new ThreadPoolExecutor(nThreads, nThreads,
         *                                   0L, TimeUnit.MILLISECONDS,
         *                                   new LinkedBlockingQueue());
         * }
         */
        ThreadPoolExecutor executorService = newFiveThreadPool();
        try {
            System.out.println("1 - " + executorService.getActiveCount());
            executorService.execute(() -> sleep(10, TimeUnit.SECONDS));
            // Give the worker a moment to actually start the task.
            sleep(20, TimeUnit.MILLISECONDS);
            System.out.println("2 - " + executorService.getActiveCount());
        } finally {
            executorService.shutdown();
        }
        /*
         * Recorded output:
         *
         * 1 - 0
         * 2 - 1
         */
    }

    /**
     * Baseline: core-thread time-out is left disabled, so the idle core threads are not
     * destroyed. The pool is intentionally not shut down.
     */
    private static void testAllowCoreThreadTimeOut_1() {
        ThreadPoolExecutor executorService = newFiveThreadPool();
        System.out.println("1===" + executorService.getActiveCount());
        submitThreeSecondSleepers(executorService);
        sleep(5, TimeUnit.SECONDS);
        System.out.println("2===" + executorService.getActiveCount());
        /*
         * Core threads are not destroyed.
         *
         * Recorded output:
         *
         * 1===0
         * 2===0
         *
         * Process finished with exit code 130 (interrupted by signal 2: SIGINT)
         */
    }

    /**
     * Enables core-thread time-out while the keep-alive time is still the fixed pool's
     * default of zero; {@code allowCoreThreadTimeOut(true)} throws IllegalArgumentException,
     * so the statements after it are never reached.
     */
    private static void testAllowCoreThreadTimeOut_2() {
        ThreadPoolExecutor executorService = newFiveThreadPool();
        executorService.allowCoreThreadTimeOut(true);
        System.out.println("1===" + executorService.getActiveCount());
        submitThreeSecondSleepers(executorService);
        sleep(5, TimeUnit.SECONDS);
        System.out.println("2===" + executorService.getActiveCount());
        /*
         * Recorded output:
         *
         * Exception in thread "main" java.lang.IllegalArgumentException: Core threads must have nonzero keep alive times
         *     at java.util.concurrent.ThreadPoolExecutor.allowCoreThreadTimeOut(ThreadPoolExecutor.java:1658)
         *     at com.thread.excutor.TestU.testAllowCoreThreadTimeOut_2(TestU.java:70)
         *     at com.thread.excutor.TestU.main(TestU.java:12)
         */
    }

    /**
     * Sets a 10 s keep-alive and then enables core-thread time-out. The pool is intentionally
     * not shut down: the recorded run shows the process exiting by itself.
     */
    private static void testAllowCoreThreadTimeOut_3() {
        ThreadPoolExecutor executorService = newFiveThreadPool();
        executorService.setKeepAliveTime(10, TimeUnit.SECONDS);
        executorService.allowCoreThreadTimeOut(true);
        System.out.println("1===" + executorService.getActiveCount());
        submitThreeSecondSleepers(executorService);
        sleep(5, TimeUnit.SECONDS);
        System.out.println("2===" + executorService.getActiveCount());
        /*
         * The program exits on its own.
         *
         * Recorded output:
         *
         * 1===0
         * 2===0
         *
         * Process finished with exit code 0
         */
    }

    /**
     * Same setup as demo 3, but reads the active count right after the five sleepers are
     * submitted and then submits a sixth task. The pool is intentionally not shut down.
     */
    private static void testAllowCoreThreadTimeOut_4() {
        ThreadPoolExecutor executorService = newFiveThreadPool();
        executorService.setKeepAliveTime(10, TimeUnit.SECONDS);
        executorService.allowCoreThreadTimeOut(true);
        System.out.println("1===" + executorService.getActiveCount());
        submitThreeSecondSleepers(executorService);
        System.out.println("2===" + executorService.getActiveCount());
        executorService.execute(() -> {
            System.out.println("xxxxxx");
            sleep(3, TimeUnit.SECONDS);
        });
        sleep(5, TimeUnit.SECONDS);
        System.out.println("3===" + executorService.getActiveCount());
        /*
         * Recorded output:
         *
         * 1===0
         * 2===5
         * xxxxxx
         * 3===1
         *
         * Process finished with exit code 0
         */
    }

    /**
     * Creates the five-thread fixed pool used by every demo.
     *
     * @return the pool, cast to {@link ThreadPoolExecutor} so that its tuning and monitoring
     *         methods are reachable (the cast matches the factory implementation quoted in
     *         {@link #getActiveCount()})
     */
    private static ThreadPoolExecutor newFiveThreadPool() {
        return (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
    }

    /** Submits five tasks that each sleep for three seconds, one per pool thread. */
    private static void submitThreeSecondSleepers(ThreadPoolExecutor pool) {
        IntStream.range(0, 5).forEach(ignored -> pool.execute(() -> sleep(3, TimeUnit.SECONDS)));
    }

    /**
     * Sleeps on the current thread. An interruption is printed and the interrupt flag is
     * restored so the owner of the thread can still observe it.
     *
     * @param duration how long to sleep
     * @param unit     unit of {@code duration}
     */
    private static void sleep(long duration, TimeUnit unit) {
        try {
            unit.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}