创建线程是一个重量级操作,因为需要调用操作系统内核的API,所以最好不要频繁的创建和销毁线程,为了能够复用创建的线程,常用的办法的就是创建线程池。
Executor java.util.concurren包中提供了若干接口和类来实现线程池,最常用的有Executor,ExecutorService,ThreadPoolExecutor。
Executor接口很简单定义如下:
1 2 3 public interface Executor { void execute (Runnable command) ; }
这个接口的目的在于将任务与执行机制解耦,使得用户不需要手动创建线程,只要交给Executor就行了。
ExecutorService ExecutorService接口则扩展了Executor接口,增加了若干实用的方法,最常用的两个方法:
1 2 3 4 void shutdown () ;<T> Future<T> submit (Callable<T> task) ;
AbstractExecutorService抽象类是ExecutorService的实现,实现了若干模板方法。
最重要的类莫过于ThreadPoolExecutor,它是最最常用的ExecutorService实现类,下面重点说说。
ThreadPoolExecutor ThreadPoolExecutor在构造时可以指定的参数最多有7个,另外还有3个使用一些默认参数的简化版本。
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize 是保留的核心线程数,即使线程处于空闲也不会被回收,除非设置了allowCoreThreadTimeOut属性。
maximumPoolSize 最大线程数。当workQueue满了,会给新提交的任务创建新线程,这种情况下线程数会超过corePoolSize,但整个线程池的线程数必须有个上限,就是maximumPoolSize了。
keepAliveTime 回收线程前,允许保留空闲线程的时长。
workQueue 存储提交的任务的队列
threadFactory 创建线程的工厂类(ThreadFactory这个接口就定义了一个方法Thread newThread(Runnable r);
)
handler handler用于没有可用线程(线程数达到最大值,没有空闲线程)且workQueue队列满了的时候。
ThreadPoolExecutor 已经提供了以下 4 种策略。 CallerRunsPolicy:提交任务的线程自己去执行该任务。 AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。 DiscardPolicy:直接丢弃任务,没有任何异常抛出。 DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
ThreadPoolExecutory的构造函数一共有四种,使得用户可以省略threadFactory和handler中的一个或两个。
需要注意的情况 当maximumPoolSize>corePoolSize时,如果workQueue满了,新提交的任务会被新线程马上执行,而之前提交的在队列中等待的队列则继续等待。 也就是说后提交的任务可能先执行了。 当新线程执行完新提交的这个任务后,会转去执行队列中的数据,这时消费任务队列的线程数可能会大于corePoolSize,消费速度加快了。 下面做个实验。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 package io.github.liam8.conimport java.util.concurrent.{ArrayBlockingQueue , Callable , Future , RejectedExecutionException , ThreadPoolExecutor , TimeUnit }object ExecutorDemo { def main (args: Array [String ]): Unit = { val executor = new ThreadPoolExecutor ( 1 , 2 , 10 , TimeUnit .SECONDS , new ArrayBlockingQueue [Runnable ](1 ) ) val task1 = new Runnable { override def run (): Unit = { println("task1 running" ) Thread .sleep(3000 ) println("task1 complete" ) } } val task2 = new Runnable { override def run (): Unit = { println("task2 running" ) Thread .sleep(3000 ) println("task2 complete" ) } } val task3 = new Callable [String ] { override def call (): String = { println("task3 running" ) Thread .sleep(3000 ) println("task3 complete" ) "xxx" } } val task4 = new Runnable { override def run (): Unit = { println("task4 running" ) Thread .sleep(3000 ) println("task4 complete" ) } } var task2Result: Future [String ] = null var taskCount = 1 try { executor.execute(task1) println("task1 submitted" ) taskCount += 1 executor.execute(task2) println("task2 submitted" ) taskCount += 1 task2Result = executor.submit(task3) println("task3 submitted" ) taskCount += 1 executor.execute(task4) println("task4 submitted" ) } catch { case e: RejectedExecutionException => println(s"task $taskCount be rejected" ) } val th = new Thread { var threadNum = 0 override def run (): Unit = while (true ) { if (executor.getPoolSize != threadNum) { threadNum = executor.getPoolSize println("pool size:" + threadNum) } Thread .sleep(100 ) } } th.setDaemon(true ) th.start() if (task2Result != null ) { println(task2Result.get(7 , TimeUnit .SECONDS )) } Thread .sleep(5000 ) executor.shutdown() } }
output
1 2 3 4 5 6 7 8 9 10 11 12 13 task1 running task1 submitted task2 submitted task3 submitted task3 running //task3在task2之前运行了! task 4 be rejected // 线程数达到最大值,任务队列也满了,task4被拒绝(默认的handler) pool size:2 task1 complete task3 complete xxx task2 running // 空闲的线程开始消费队列 task2 complete pool size:0
Executors Executors是JUC包中的一个静态工厂类,其中除了newFixedThreadPool,newSingleThreadExecutor方法,其他方法都不推荐使用,因为其他方法创建的线程池使用的是无界队列,可能会占用过多内存,甚至OOM,所以建议使用有界队列。
ExecutionContext Scala另外提供了ExecutionContext和Future来简化线程池的使用,Future可以接受一个ExecutionContext类型的隐式参数,将传入的函数提交到ExecutionContext的线程池中运行。 下面举个栗子,不做深入探讨。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package io.github.liam8.conimport java.util.concurrent.Executors import scala.concurrent.{Await , ExecutionContext , ExecutionContextExecutorService , Future }import scala.concurrent.duration._object ExecutionContextDemo { def main (args: Array [String ]): Unit = { val pool = Executors .newFixedThreadPool(2 ) implicit val ec: ExecutionContextExecutorService = ExecutionContext .fromExecutorService(pool) val f = Future { val t = Thread .currentThread().getName println(s"$t : future is coming" ) 123 } val re = f.map(r => { val t = Thread .currentThread().getName println(s"$t : mapping" ) r * r }) re.onSuccess { case x: Int => println(x) } Await .result(f, 3. seconds) ec.shutdown() } }
output
1 2 3 pool-1-thread-1: future is coming pool-1-thread-2: mapping 15129
参考文献 Executor与线程池:如何创建正确的线程池?
Futures Made Easy with Scala
本文代码 Github仓库
转载请注明原文地址:https://liam-blog.ml/2019/09/22/Scala-Concurrency-Executor/