这篇文章上次修改于 195 天前,可能其部分内容已经发生变化,如有疑问可询问作者。
概述
在大型项目中,在代码中直接创建线程是不允许的,如果需要使用多线性则必须通过线程池了创建,因而了解线程池的使用规范以及底层实现是非常有必要的。
使用
预想的用法
但说到线程池,我们可能首先会类比连接池这类池化资源,会以为线程池是通过 acquire()
来获取资源,通过 release()
来释放资源,就像下边这样:
//采用一般意义上池化资源的设计方法
class ThreadPool{
// 获取空闲线程
Thread acquire() {
}
// 释放线程
void release(Thread t){
}
}
//期望的使用
ThreadPool pool;
Thread T1=pool.acquire();
//传入Runnable对象
T1.execute(()->{
//具体业务逻辑
......
});
也就是说,我们在使用的时候直接从 pool 中获取一个线程实例 T1,然后直接向 T1 里边传业务逻辑就行了。但实际在使用的时候发现 Thread 类中根本没有类似于 execute(Runnable task)
这样的公共方法,因而线程池是没有办法按照池化的思想来设计的。
实际的用法
业界当前在设计线程池时,普遍采用的都是生产者-消费者模式,类似下面这种形式:
//简化的线程池,仅用来说明工作原理
class MyThreadPool{
//利用阻塞队列实现生产者-消费者模式
BlockingQueue<Runnable> workQueue;
//保存内部工作线程
List<WorkerThread> threads
= new ArrayList<>();
// 构造方法
MyThreadPool(int poolSize,
BlockingQueue<Runnable> workQueue){
this.workQueue = workQueue;
// 创建工作线程
for(int idx=0; idx<poolSize; idx++){
WorkerThread work = new WorkerThread();
work.start();
threads.add(work);
}
}
// 提交任务
void execute(Runnable command){
workQueue.put(command);
}
// 工作线程负责消费任务,并执行任务
class WorkerThread extends Thread{
public void run() {
//循环取任务并执行
while(true){
Runnable task = workQueue.take();
task.run();
}
}
}
}
/** 下面是使用示例 **/
// 创建有界阻塞队列
BlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(2);
// 创建线程池
MyThreadPool pool = new MyThreadPool(
10, workQueue);
// 提交任务
pool.execute(()->{
System.out.println("hello");
});
在 MyThreadPool 的内部,维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务。也就是说在使用线程池时,我们编写的代码充当的角色类似于生产者,只要往线程池中放任务即可,线程池自己会获取任务并执行。
底层实现
当然实际线程池的功能比我们前面写的要强大的多,实现也有多种。但无论哪种实现实际上都是在 Executor框架
之下的,什么是 Executor 框架呢?
Executor 框架
实际上它是线程执行规范的抽象,它有三个模块组成:
1. 对任务的抽象(Runnable/Callable)
所要执行的任务都要实现 Runnable
或者 Callable
接口,重写其内部对应的方法,这样才能被线程池接受执行。
其中 Runnable
接口自 Java1.0 以来一直存在,但 Callable
接口仅在 Java1.5 中引入,目的是为了能够返回任务执行之后的结果或者异常的抛出。
具体到代码层面上:
@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();
}
//---------------------------------------//
@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}
我们可以看到实现 Ruable
接口的类需要重写 run()
方法,该方法不返回任何结果;而实现 Callable
接口的类则中重写的方法是 call()
,该方法会返回一个泛型 V,通过这个返回值我们就可以获取到任务执行的结果。
当然两者也是进行相互转化的,通过工具类 Executors
类可以实现 Runnable 对象和 Callable 对象直接的转换。(Executors.callable(Runnable task
)或 Executors.callable(Runnable task,Object resule)
)。
2. 对任务执行的抽象(Executor)
Executor
接口非常简单内部只有一个 void execute(Runnable command)
也就是说所有要接受任务(Runnable)的线程池都必须实现该接口,那 Callable 接口怎么办呢?
我们先看下边这张类关系图:
475 x 421
从类的继承关系图上可以看到,Executor 下边还有一个子接口 ExecutorService
,所有的线程池实现类都实现了该接口,我们看这个接口的结构图:
559 x 284
可以看到 ExecutorService
接口在实现了 Executor接口
的前提下还定义了许多新的方法,其中 submit(Callable)
就是用来提交 Callable
类型任务的。
3. 异步计算结果的抽象(Future)
此处为什么要强调异步呢?主要原因在于当你提交一个任务到线程池中之后,任务的执行是异步的,不会同步返回给主线程。Future 内部也比较简单,定义了常用对结果操作的方法:
323 x 147
get()
可以阻塞主线程,并获取任务的执行结果get(long,TimeUnit)
方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完cancel(boolean)
可以取消任务的执行
Executor 的执行流程
通过前面 Executor框架
的说明,我们可以看一下下面这张流程示意图:
561 x 380
- 主线程可以创建 Ruaable 类型或者 Callable 类型的任务
- 然后通过
submit()
方法或者execut()
方法可以将任务放到 ExecutorService 中执行 - 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现 Future 接口的对象
- 最后可以通过
Future.get()
方法来阻塞主线程等待任务执行完成获取任务的返回结果。(如果通过submit(Runnable)
方法获取到的 Future 对象,其get()
结果为null
没有评论
博主关闭了评论...