线程的复用问题
在开始看线程池的源码之前,先来看这么一个问题:
一个Thread对象可以重复地调用start()
方法吗?
试试就知道了:
@Test
public void test(){
Thread thread = new Thread(()->{
System.out.println("线程启动");
});
thread.start();
thread.start();
}
输出是:
线程启动
java.lang.IllegalThreadStateException
at java.lang.Thread.start(Thread.java:708)
at com.rhett.thread.TestThreadPool.test(TestThreadPool.java:22)
...
可见线程只能启动一次,并且原因就是以前介绍过的Java线程的状态:
线程在run()方法执行完之后就会进入TERMINATED
状态,Java会负责系统调用对线程进行销毁,自然也不可以再次启动。
那么我们常常说Java中的线程池可以实现线程复用,是怎么实现的呢?
一种很直观的想法是,既然线程启动会检查Thread中的Runnable类型的target是否为空,如果不为空则执行其run方法,那么只要每次线程执行完之后替换一下target对象就可以实现线程复用了。但是这种假设已经被我们验证是错误的了:线程根本不能重复调用start
方法,无法做到多次启动。
那么另一种想法就是我们在Java线程池详解中手动实现的线程池的方法,就是通过阻塞队列实现的生产者消费者模型,只要队列未满,生产者就不断产生新线程,并添加到阻塞队列中,消费者只需要从阻塞队列中取线程对象就可以了。
这种方法实现线程池,看似没有问题,但并没有实现真正的线程复用,当线程用的快的时候,还是必须不断生成新线程,以满足需要。
下面就来看看JDK中线程池是怎么实现线程复用的:
ThreadPoolExecutor源码分析
其实ThreadPoolExecutor的一些属性在Java线程池详解中已经看得挺多的了,不过那篇确实借鉴别人的比较多,也没有深入了解线程复用的原理。
这里还是可以先把几个核心的内容看一下:
// ctl:前三位为线程池的状态,后面为线程池中线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的最大容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 对应五种状态,是ctl的前三位
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取线程池的状态(通过获取ctl的前三位)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程池的线程数量(通过获取ctl的低位部分)
private static int workerCountOf(int c) { return c & CAPACITY; }
// 任务队列
private final BlockingQueue<Runnable> workQueue;
// 工作线程的集合(核心容器)
private final HashSet<Worker> workers = new HashSet<Worker>();
// 线程工厂,可以给线程起名
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 救急(非核心)线程空闲状态能存在的最长时间
private volatile long keepAliveTime;
// 最大常驻(核心)线程个数
private volatile int corePoolSize;
// 线程池最多同时拥有的线程个数
private volatile int maximumPoolSize;
// 默认的拒绝策略:让调用者抛出 RejectedExecutionException 异常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
有一些字段暂时看不懂也没有关系,看着看着就懂了,首先就从使用线程池的常用方法execute
方法进入源码分析吧:
// 传入一个任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取ctl
int c = ctl.get();
// 看看当前工作线程个数是否小于 corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 添加一个核心Worker
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果工作线程个数不小于 corePoolSize,或添加Worker失败
// 将任务添加到任务队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 添加一个非核心的Worker
else if (!addWorker(command, false))
// 如果添加非核心的Worker还失败了,则执行拒绝策略
reject(command);
}
可以看到,大致是一个这样的流程:
提交任务:
1. 若worker数量小于corePoolSize
,创建一个核心worker,完成任务
2. 若如果工作线程个数不小于 corePoolSize,或添加Worker失败,将任务添加到任务队列中
3. 若以上条件都不满足,但worker数小于maximumPoolSize
,创建一个非核心worker来完成任务。
4. 若以上条件都不满足,执行拒绝策略
那么就先看一下addWorker
这个关键方法:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池要关闭的情况,不添加
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程池中worker数量
int wc = workerCountOf(c);
// 如果大于最大数量,添加失败
if (wc >= CAPACITY ||
// 如果要创建核心worker,不能大于等于corePoolSize
// 如果要创建非核心worker,不能大于等于maximumPoolSize
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 通过CAS去修改ctl字段中的worker个数
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 将任务传给Worker,构造一个实例对象
w = new Worker(firstTask);
// ...
}
看到这里,想必你也很好奇Worker到底是什么?它是干什么的?
那么就进一段小插曲,先看下Worker
这个内部类,方便后面的理解:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
// worker在哪个线程中run
final Thread thread;
// worker初次构造时要执行的任务
Runnable firstTask;
// 执行过的任务数量
volatile long completedTasks;
// 构造方法,传入一个firstTask
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// Worker实现了Runnable接口,这里实现了run方法
public void run() {
runWorker(this);
}
// 是被一个线程独占的吗
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 尝试让一个线程独占
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放独占锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//用于中断线程的方法
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
必须要注意到的是Worker
实现了Runnable
接口,并继承了AQS,用于一个线程的独占。
其中的构造方法有一句this.thread = getThreadFactory().newThread(this);
,是通过ThreadFactory
来生成一个Thread
对象,那么假设我们构造线程池的时候没有传入ThreadFactory
对象,它的默认ThreadFactory
对象是什么呢?关于这个可以在简要版的构造函数内看到:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
可见默认的ThreadFactory
对象是Executors.defaultThreadFactory()
,也就是Executors
内部类DefaultThreadFactory
:
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
那么DefaultThreadFactory
的实现如下:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
可见这里的线程名默认是pool-线程池号-thread-线程号
,而线程池号和线程号都是通过一个AtomicInteger
记录的,线程池号每构造一个DefaultThreadFactory
对象递增1,线程号则是DefaultThreadFactory
对象每次生成一个线程则递增1。
我们还注意到,在上面调用newThread
创建线程的时候,把this
传进去了,this
就是当前的Worker
对象,也就是说,创建出来的线程对象中的Runnable类型的target字段,其实就是一个Worker
对象。
那么大致性的了解了Worker
结构之后,我们回到addWorker
方法,继续往下看:
private boolean addWorker(Runnable firstTask, boolean core) {
//...
try {
// 将任务传给Worker,构造一个实例对象
w = new Worker(firstTask);
// 获取创建的线程
final Thread t = w.thread;
if (t != null) {
// 获取锁
final ReentrantLock mainLock = this.mainLock;
// 上锁,因为要修改workers集合
mainLock.lock();
try {
//检查线程池状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程已经启动,报错
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加worker到workers集合中
workers.add(w);
int s = workers.size();
// 更新最大出现过的线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 解锁
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程启动不成功
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
看完了这个方法,大致性的概括就是:
生成一个Thread和Worker对象,将Thread对象赋值到Worker中的内部字段thread,将Worker对象作为Thread启动要执行的任务。
特别注意,这样创建一个的Thread,在调用start启动后,因为内部的Runnable对象是对应的Worker对象,会去执行Worker对象中的run方法:
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取一开始要执行的任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//要么执行firstTask,要么从队列中阻塞获取任务)
while (task != null || (task = getTask()) != null) {
w.lock();
//线程池在运行
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 打断线程
wt.interrupt();
try {
//留给子类实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//留给子类实现
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 如果退出循环有几种可能性,一种是限制了线程池保留闲暇状态Worker的时长并超过了一定时间
// 另一种是线程被打断
completedAbruptly = false;
} finally {
// 准备退出
processWorkerExit(w, completedAbruptly);
}
}
而getTask
的实现是:
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 是否允许闲暇的核心线程超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果允许超时,则是限制时间的获取,否则是无时间限制的阻塞获取
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
从该方法可以看出,核心线程也可以设置超时,只要将allowCoreThreadTimeOut
设置为true。
那么退出的流程就是:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//移除该worker
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 如果设置了CoreThread闲暇状态的超时,就不会创建worker
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
// 添加一个Worker
addWorker(null, false);
}
}
线程池的源码就看到这里,如果要概括一下,大概就是:
- 线程池实现线程复用的原理是,并不将用户提交的任务封装为一个Thread对象,而是让创建出来的线程循环地从阻塞队列中获取任务,在线程中执行该任务。
- 核心线程是在线程池中的“常驻线程”,而非核心线程只是在允许的范围内创建出来“救急的”。通常允许核心线程无时间限制地存在并从阻塞队列中获取任务,但是也可以设置核心线程的超时,比如下面这个例子:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
Runnable runnable = ()->{
System.out.println(Thread.currentThread().hashCode());
};
for(int i = 0;i<3;i++){
threadPoolExecutor.execute(runnable);
Thread.sleep(1000);
}
}
在这个例子中,我是用ThreadPoolExecutor
的构造方法模拟了一个FixedThreadPool
,但并不获取一个ExecutorService
接口,而是直接获取它的实现类,以便在后面进行一些高级设置。
这里将线程池设置了固定大小为2,提交的任务就是输出一下当前线程的哈希值,用于分辨是不是同一个线程处理的。 在循环中,每隔1秒提交了一个任务,最终输出如下:
1868701981
1351769778
1868701981
可见,第一和第三个任务是由同一个线程处理的,这是因为核心线程会一直阻塞等待队列中的任务。
但是我们只要将上面的源程序加上一行代码:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
// 设置允许核心线程超时
threadPoolExecutor.allowCoreThreadTimeOut(true);
Runnable runnable = ()->{
System.out.println(Thread.currentThread().hashCode());
};
for(int i = 0;i<3;i++){
threadPoolExecutor.execute(runnable);
Thread.sleep(1000);
}
}
再次运行程序,结果如下:
1868701981
1351769778
1434575218
却发现每个任务都是由不同的线程处理的,这是因为在构造方法中设置了超时时间,并且允许了核心线程超时,核心线程在这个超时时间之内没有从队列中取到任务,便会销毁。直到下一个任务到来,产生新的Worker。
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/threadpoolexecutor%e6%ba%90%e7%a0%81%e5%88%86%e6%9e%90-%e7%ba%bf%e7%a8%8b%e6%b1%a0%e5%a6%82%e4%bd%95%e5%ae%9e%e7%8e%b0%e7%ba%bf%e7%a8%8b%e5%a4%8d%e7%94%a8%ef%bc%9f/