简介 java.util.concurrent(JUC)
本篇文章仅仅是对JUC包下的工具做一些简单的阐述,其每一项都可以更加的具体深入理解。
该包下是在并发编程中常用的实用程序类。这个包包括一些小型的标准化可扩展框架,以及一些提供有用功能的类。
Atomic(原子类)
该包下主要提供一些数据类型的原子类,如AtomicBoolean、AtomicInteger、AtomicIntegerArray、AtomicIntegerFieldUpdater、AtomicLong、AtomicLongArray、AtomicLongFieldUpdater、AtomicMarkableReference、AtomicReference、AtomicReferenceArray、AtomicReferenceFieldUpdater、AtomicStampedReference。这些原子类全部依托与底层的Unsafe类实现CAS的操作保证其原子性。
AtomicInteger
AtomicInteger的主要方法如下图所示
JUC包下的相关原子类都是基于Unsafe类来进行CAS实现,Unsafe通过JDNI调用本地接口来操作内存实现我们加/解锁、
Unsafe类 see: https://mikeygithub.github.io/2022/05/25/yuque/eff1dm/
使用案例
1.基于AtomicInteger实现线程交替打印AB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void main (String[] args) throws Exception { AtomicInteger atomicInteger = new AtomicInteger (0 ); new Thread ( () -> { while (atomicInteger.get() <= 1000 ) while ( atomicInteger.get() % 2 == 0 ) { System.out.println(Thread.currentThread().getName() + atomicInteger.intValue()); atomicInteger.getAndIncrement(); } } , "Thread A " ).start(); new Thread ( () -> { while (atomicInteger.get() <= 1000 ) while ( atomicInteger.get() % 2 != 0 ) { System.out.println(Thread.currentThread().getName() + atomicInteger.intValue()); atomicInteger.getAndIncrement(); } } , "Thread B " ).start(); Thread.sleep(3000 ); }
2.基于AtomicInteger实现多线程打印ABCD
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public static void main (String[] args) throws Exception { int count = 1 ; AtomicInteger aa = new AtomicInteger (0 ); AtomicInteger bb = new AtomicInteger (0 ); AtomicInteger cc = new AtomicInteger (0 ); AtomicInteger dd = new AtomicInteger (0 ); new Thread ( () -> { while (aa.get() < count ) while ( dd.get() == aa.get()) { System.out.println(Thread.currentThread().getName() + "A" ); aa.getAndIncrement(); } } , "Thread A " ).start(); new Thread ( () -> { while (bb.get() < count ) while ( bb.get() < aa.get()) { System.out.println(Thread.currentThread().getName() + "B" ); bb.getAndIncrement(); } } , "Thread B " ).start(); new Thread ( () -> { while (cc.get() < count ) while ( cc.get() < bb.get()) { System.out.println(Thread.currentThread().getName() + "C" ); cc.getAndIncrement(); } } , "Thread C " ).start(); new Thread ( () -> { while (dd.get() < count ) while ( dd.get() < cc.get()) { System.out.println(Thread.currentThread().getName() + "D" ); dd.getAndIncrement(); } } , "Thread D " ).start(); Thread.sleep(3000 ); }
LongAdder 基于性能考虑。AtomicLong的incrementAndGet()方法在高并发场景下,多个线程竞争修改共享资源value,会造成循环耗时过长(一直自旋CAS),进而导致性能问题,这个时候LongAdder应运而生。
1 public class LongAdder extends Striped64 implements Serializable
可以看到LongAdder继承的是Striped64,其核心操作和属性也是在Striped64中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 transient volatile Cell[] cells;transient volatile long base;transient volatile int cellsBusy;@sun .misc.Contendedstatic final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas (long cmp, long val) { return UNSAFE.compareAndSwapLong(this , valueOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value" )); } catch (Exception e) { throw new Error (e); } } }final boolean casBase (long cmp, long val) { return UNSAFE.compareAndSwapLong(this , BASE, cmp, val); }static final int getProbe () { return UNSAFE.getInt(Thread.currentThread(), PROBE); }final boolean casCellsBusy () { return UNSAFE.compareAndSwapInt(this , CELLSBUSY, 0 , 1 ); }final void longAccumulate (long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0 ) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true ; } boolean collide = false ; for (; ; ) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { Cell r = new Cell (x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false ; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created)break ; continue ; } } collide = false ; } else if (!wasUncontended) wasUncontended = true ; else if (a.cas(v = a.value, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; else if (n >= NCPU || cells != as) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell [n << 1 ]; for (int i = 0 ; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false ; try { if (cells == as) { Cell[] rs = new Cell [2 ]; rs[h & 1 ] = new Cell (x); cells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } else if (casBase(v = base, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; } }
1 2 3 4 5 public static ThreadLocalRandom current () { if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0 )localInit(); return instance; }
下面我们来看看LongAdder的相关方法,可以看到其increment和decrement都调用了add方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public void add (long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null , uncontended); } }public void increment () { add(1L ); }public void decrement () { add(-1L ); }
结论:
内部维护了多个Cell变量,每个Cell里面有一个初始值为0的long型变量,这样同时争取一个变量的线程就变少了,而是分散成对多个变量的竞争,减少了失败次数。如果竞争某个Cell变量失败,它不会一直在这个Cell变量上自旋CAS重试,而是尝试在其他的Cell变量上进行CAS尝试,这个改变增加了当前线程重试CAS成功的可能性。最后,在获取LongAdder当前值时,是把所有Cell变量的value值累加后再加上base返回的,即:它的 sum 是 base + 各个 Cell 中 value 的总和
LongAdder的原理就是降低对value更新的并发数,也就是将对单一value的变更压力分散到多个value值上,降低单个value的竞争。
LongAccumulator
它是把LongAdder的(v + x)操作换成一个LongBinaryOperator,即用户可以自定义累加操作的逻辑,其他地方都是一样的
1 2 3 public LongAccumulator (LongBinaryOperator accumulatorFunction, long identity) { this .function = accumulatorFunction; base = this .identity = identity; }
Striped64 分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
Executors(执行器)
此包中定义的Executor、ExecutorService、ScheduledExecutorService、ThreadFactory和可调用类的工厂和实用程序方法。此类支持以下类型的方法:
创建并返回使用常用配置设置设置设置的ExecutorService的方法。
创建并返回使用常用配置设置设置设置的ScheduledExecutorService。
创建并返回“包装的”ExecutorService,该服务通过使特定于实现的方法不可访问来禁用重新配置。
创建并返回将新创建的线程设置为已知状态的线程工厂。
这些方法从其他类似闭包的表单中创建并返回可调用的,因此可以在需要可调用的执行方法中使用。
Executor
该类为执行器的统一抽象接口,只规范了一个execute方法
1 2 3 4 5 6 7 8 9 10 11 12 13 public interface Executor { void execute (Runnable command) ; }
ExecutorService
executorService是统一任务或者线程执行服务规范定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public interface ExecutorService extends Executor { void shutdown () ; List<Runnable> shutdownNow () ; boolean isShutdown () ; boolean isTerminated () ; boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
ScheduledThreadPoolExecutor
一种线程池执行器,可以另外安排命令在给定延迟后运行,或定期执行。当需要多个工作线程时,或者当需要ThreadPoolExecutor(该类扩展)的额外灵活性或功能时,该类比Timer更可取。
使用案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class ScheduledThreadPoolExecutorExample { static SimpleDateFormat sdf = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" ); public static String getDate () { return sdf.format(new Date ()); } @SneakyThrows public static void main (String[] args) throws Exception{ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor (2 ,new ThreadPoolExecutor .AbortPolicy()); executor.schedule(()-> System.out.println(String.format("线程 %s 当前时间 %s" ,Thread.currentThread().getName(),getDate())),3 , TimeUnit.SECONDS); executor.scheduleWithFixedDelay(()-> System.out.println(String.format("线程%s 当前时间%s" ,Thread.currentThread().getName(),getDate())),3 ,3 , TimeUnit.SECONDS); Thread.sleep(30000 ); } }
ForkJoinPool ForkJoinPool旨在用于 **CPU 密集型 **工作负载。ForkJoinPool 中的默认线程数等于系统上的 CPU 数。如果任何线程由于在某个其他 ForkJoinTask 上调用 join() 而进入等待状态,则会启动一个新的补偿线程以利用系统的所有 CPU。ForkJoinPool 有一个公共池,可以通过调用ForkJoinPool.commonPool()静态方法获得。此设计的目的是在系统中仅使用一个 ForkJoinPool,线程数等于系统上的处理器数。如果所有 ForkJoinTask 都在进行计算密集型任务,则它可以利用系统的全部计算能力。
但在现实生活场景中,任务是 CPU 和 IO 密集型任务的混合。IO 密集型任务对于 ForkJoinPool 来说是一个糟糕的选择。您应该使用 Executor 服务来执行 IO 密集型任务。在 ExecutorService 中,您可以根据系统的 IO 容量而不是系统的 CPU 容量来设置线程数。
如果要从 ForkJoinTask 调用 IO 密集型操作,则应创建一个实现ForkJoinPool.ManagedBlocker接口的类,并在block()方法中执行 IO 密集型操作。您需要使用静态方法ForkJoinPool.managedBlock()调用您的ForkJoinPool.ManagedBlocker实现。此方法在调用 block() 方法之前创建补偿线程。block() 方法应该进行 IO 操作并将结果存储在某个实例变量中。调用ForkJoinPool.managedBlock()后你应该调用你的业务方法来获取 IO 操作的结果。通过这种方式,你可以将 CPU 密集型操作与 IO 密集型操作混合使用。
ForkJoinPool不同于其他类型的ExecutorService,主要是因为它采用了**工作窃取 **:池中的所有线程都试图查找并执行提交给池和/或其他活动任务创建的任务(如果不存在,则最终阻止等待工作)。当大多数任务产生其他子任务时(就像大多数ForkJoinTasks一样),以及当许多小任务从外部客户端提交到池时,这可以实现高效处理。尤其是在构造函数中将asyncMode设置为true时,ForkJoinPools可能也适合用于从未加入的事件样式任务。
静态commonPool()适用于大多数应用程序。公共池由未显式提交到指定池的任何ForkJoinTask使用。使用公共池通常会减少资源使用(其线程在不使用期间缓慢回收,并在后续使用时恢复)。
对于需要单独或自定义池的应用程序,可以使用给定的目标并行级别构造ForkJoinPool;默认情况下,等于可用处理器的数量。池尝试通过动态添加、挂起或恢复内部工作线程来维护足够的活动(或可用)线程,即使某些任务在等待加入其他任务时暂停。但是,面对阻塞的I/O或其他非托管同步,无法保证进行此类调整。嵌套的ForkJoinPool。ManagedBlocker接口支持扩展所适应的同步类型。
除了执行和生命周期控制方法外,此类还提供状态检查方法(例如getStealCount),用于帮助开发、调优和监视fork/join应用程序。此外,方法toString以一种方便的形式返回池状态的指示,以便进行非正式监视。
**Fork **:Fork步骤将任务拆分为更小的子任务,这些任务同时执行。
**Join **:在执行子任务之后,任务可以将所有结果加入到一个结果中。
如下图所示:
工作窃取(work-stealing) 工作窃取是指当某个线程的任务队列中没有可执行任务的时候,从其他线程的任务队列中窃取任务来执行,以充分利用工作线程的计算能力,减少线程由于获取不到任务而造成的空闲浪费。
参考: http://research.sun.com/scalable/pubs/index.html
在ForkJoinpool中,工作任务的队列都采用双端队列Deque容器。我们知道,在通常使用队列的过程中,我们都在队尾插入,而在队头消费以实现FIFO。而为了实现工作窃取。一般我们会改成工作线程在工作队列上LIFO,而窃取其他线程的任务的时候,从队列头部取获取。示意图如下:
工作线程worker1、worker2以及worker3都从taskQueue的尾部popping获取task,而任务也从尾部Pushing,当worker3队列中没有任务的时候,就会从其他线程的队列中取stealing,这样就使得worker3不会由于没有任务而空闲。
使用案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class NewTask extends RecursiveAction { private long Load = 0 ; public NewTask (long Load) { this .Load = Load; } protected void compute () { List<NewTask> subtasks = new ArrayList <NewTask>(); subtasks.addAll(createSubtasks()); for (RecursiveAction subtask : subtasks) { subtask.fork(); } } private List<NewTask> createSubtasks () { List<NewTask> subtasks = new ArrayList <NewTask>(); NewTask subtask1 = new NewTask (this .Load / 2 ); NewTask subtask2 = new NewTask (this .Load / 2 ); NewTask subtask3 = new NewTask (this .Load / 2 ); subtasks.add(subtask1); subtasks.add(subtask2); subtasks.add(subtask3); return subtasks; } public static void main (final String[] arguments) throws InterruptedException { int proc = Runtime.getRuntime().availableProcessors(); System.out.println("Number of available core in the processor is: " + proc); ForkJoinPool Pool = ForkJoinPool.commonPool(); System.out.println("Number of active thread before invoking: " + Pool.getActiveThreadCount()); NewTask t = new NewTask (400 ); Pool.invoke(t); System.out.println("Number of active thread after invoking: " + Pool.getActiveThreadCount()); System.out.println("Common Pool Size is: " + Pool.getPoolSize()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Slf4j public class ForkJoinPoolExample { static class Fibonacci extends RecursiveTask <Integer> { final int n; Fibonacci(int n) { this .n = n; } public Integer compute () { if (n <= 1 ) return n; Fibonacci f1 = new Fibonacci (n - 1 ); f1.fork(); Fibonacci f2 = new Fibonacci (n - 2 ); return f2.compute() + f1.join(); } } @SneakyThrows public static void main (String[] args) throws Exception { ForkJoinPool Pool = ForkJoinPool.commonPool(); Fibonacci t = new Fibonacci (10 ); Pool.invoke(t); Integer ret = t.get(); System.out.println("ret : " + ret); Thread.sleep(5000 ); } }
DelegatedExecutorService
仅公开ExecutorService实现 **的ExecutorService方法的包装类 **。Delegate(委托、委派)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute (Runnable command) { e.execute(command); } public void shutdown () { e.shutdown(); } public List<Runnable> shutdownNow () { return e.shutdownNow(); } public boolean isShutdown () { return e.isShutdown(); } public boolean isTerminated () { return e.isTerminated(); } public boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit (Callable<T> task) { return e.submit(task); } public <T> Future<T> submit (Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } }
使用案例
通过调用unconfigurableExecutorService方法可直接获取DelegatedExecutorService的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class DelegatedExecutorServiceExample { static ExecutorService executorService = Executors.unconfigurableExecutorService(new ThreadPoolExecutor (3 , 5 , 1 , TimeUnit.SECONDS, new LinkedBlockingDeque <>())); public static void main (String[] args) throws InterruptedException { for (int i = 0 ; i < 20 ; i++) { executorService.submit(() -> System.out.println(Thread.currentThread().getName()+ " " +DateUtils.getDate())); } Thread.sleep(2000 ); executorService.shutdown(); } }
DelegatedScheduledExecutorService
DelegatedScheduledExecutorService是Executors的一个内部类,仅供**ScheduledExecutorService **实现的ScheduledExecutorService方法的包装类。用于任务调度的线程池服务。
该类的源码较少,直接是对DelegatedExecutorService进行继承,实现ScheduledExecutorService接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 static class DelegatedScheduledExecutorService extends DelegatedExecutorService implements ScheduledExecutorService { private final ScheduledExecutorService e; DelegatedScheduledExecutorService(ScheduledExecutorService executor) { super (executor); e = executor; } public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { return e.schedule(command, delay, unit); } public <V> ScheduledFuture<V> schedule (Callable<V> callable, long delay, TimeUnit unit) { return e.schedule(callable, delay, unit); } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { return e.scheduleAtFixedRate(command, initialDelay, period, unit); } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { return e.scheduleWithFixedDelay(command, initialDelay, delay, unit); } }
使用案例
通过newSingleThreadScheduledExecutor方法可直接获取单线程的线程池时间调度执行器
1 2 3 public static ScheduledExecutorService newSingleThreadScheduledExecutor () { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor (1 )); }
1 2 3 4 5 6 7 8 9 10 public class DelegatedScheduledExecutorServiceExample { static ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); public static void main (String[] args) throws Exception { for (int i = 0 ; i < 5 ; i++) { scheduledExecutorService.schedule(() -> System.out.println(Thread.currentThread().getName()+ " " +DateUtils.getDate()),2 ,TimeUnit.SECONDS); } Thread.sleep(5000 ); scheduledExecutorService.shutdown(); } }
FinalizableDelegatedExecutorService
FinalizableDelegatedExecutorService的结构非常简单,继承DelegatedExecutorService类,重写了finalize在回收当前类的时候对线程池进行关闭操作
1 2 3 4 5 6 7 8 9 static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super (executor); } protected void finalize () { super .shutdown(); } }
使用案例
FinalizableDelegatedExecutorService的获取可以通过Executors.newSingleThreadExecutor方法获取
1 2 3 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS,new LinkedBlockingQueue <Runnable>())); }
1 2 3 4 5 6 7 8 9 10 11 12 public class FinalizableDelegatedExecutorServiceExample { static ExecutorService executorService = Executors.newSingleThreadExecutor(); public static void main (String[] args) throws Exception{ for (int i = 0 ; i < 5 ; i++) { executorService.submit(() -> System.out.println(Thread.currentThread().getName()+ " " + DateUtils.getDate())); } Thread.sleep(3000 ); executorService.shutdown(); } }
ThreadPoolExecutor
ThreadPoolExecutor类是线程池的根基类,上面提到的ScheduledExecutorService、五种ExecutorService都是基于ThreadPoolExecutor进行实现的。
详细分析参考:https://mikeygithub.github.io/2022/05/30/yuque/ymxl5b/ 在本篇中不再做详细概述。
使用案例
直接通过构造函数使用
1 2 3 4 5 6 7 8 new ThreadPoolExecutor (core, max, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>());
RejectedExecutionHandler
当我们任务过多超出阻塞队列的长度时,会根据我们配置的拒绝策略来决定新任务的去向,RejectedExecutionHandler定义了拒绝策略的统一标准,JDK提供有四个实现类CallerRunsPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy
AbortPolicy
AbortPolicy,缺省策略,处理程序在拒绝时抛出runtime RejectedExecutionException。
CallerRunsPolicy
CallerRunPolicy,调用自身执行的线程运行任务。这提供了一种简单的反馈控制机制,可以降低提交新任务的速度。
DiscardPolicy
DiscardPolicy,将简单地删除无法执行的任务。
DiscardOldestPolicy
DiscardOldestPolicy,如果未关闭执行器,将丢弃工作队列头部的任务,然后重试执行(可能再次失败,导致重复执行)