ThreadPoolExecutor源码分析-线程池如何实现线程复用?

线程的复用问题

在开始看线程池的源码之前,先来看这么一个问题:

一个Thread对象可以重复地调用start()方法吗?

试试就知道了:

@Test
public void test(){
    Thread thread = new Thread(()->{
        System.out.println("线程启动");
    });
    thread.start();
    thread.start();
}

输出是:

线程启动

java.lang.IllegalThreadStateException
    at java.lang.Thread.start(Thread.java:708)
    at com.rhett.thread.TestThreadPool.test(TestThreadPool.java:22)
    ...

可见线程只能启动一次,并且原因就是以前介绍过的Java线程的状态:

ThreadPoolExecutor源码分析-线程池如何实现线程复用?

线程在run()方法执行完之后就会进入TERMINATED状态,Java会负责系统调用对线程进行销毁,自然也不可以再次启动。

那么我们常常说Java中的线程池可以实现线程复用,是怎么实现的呢?

一种很直观的想法是,既然线程启动会检查Thread中的Runnable类型的target是否为空,如果不为空则执行其run方法,那么只要每次线程执行完之后替换一下target对象就可以实现线程复用了。但是这种假设已经被我们验证是错误的了:线程根本不能重复调用start方法,无法做到多次启动。

那么另一种想法就是我们在Java线程池详解中手动实现的线程池的方法,就是通过阻塞队列实现的生产者消费者模型,只要队列未满,生产者就不断产生新线程,并添加到阻塞队列中,消费者只需要从阻塞队列中取线程对象就可以了。

这种方法实现线程池,看似没有问题,但并没有实现真正的线程复用,当线程用的快的时候,还是必须不断生成新线程,以满足需要。

下面就来看看JDK中线程池是怎么实现线程复用的:

ThreadPoolExecutor源码分析

其实ThreadPoolExecutor的一些属性在Java线程池详解中已经看得挺多的了,不过那篇确实借鉴别人的比较多,也没有深入了解线程复用的原理。

这里还是可以先把几个核心的内容看一下:

// ctl:前三位为线程池的状态,后面为线程池中线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的最大容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 对应五种状态,是ctl的前三位
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 获取线程池的状态(通过获取ctl的前三位)
private static int runStateOf(int c)     { return c & ~CAPACITY; }

// 获取线程池的线程数量(通过获取ctl的低位部分)
private static int workerCountOf(int c)  { return c & CAPACITY; }

// 任务队列  
private final BlockingQueue<Runnable> workQueue;

// 工作线程的集合(核心容器)
private final HashSet<Worker> workers = new HashSet<Worker>();

// 线程工厂,可以给线程起名
private volatile ThreadFactory threadFactory;

// 拒绝策略
private volatile RejectedExecutionHandler handler;

// 救急(非核心)线程空闲状态能存在的最长时间
private volatile long keepAliveTime;

// 最大常驻(核心)线程个数
private volatile int corePoolSize;

// 线程池最多同时拥有的线程个数
private volatile int maximumPoolSize;

// 默认的拒绝策略:让调用者抛出 RejectedExecutionException 异常
private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

有一些字段暂时看不懂也没有关系,看着看着就懂了,首先就从使用线程池的常用方法execute方法进入源码分析吧:

// 传入一个任务
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 获取ctl
    int c = ctl.get();
    // 看看当前工作线程个数是否小于 corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 添加一个核心Worker
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 如果工作线程个数不小于 corePoolSize,或添加Worker失败
    // 将任务添加到任务队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 添加一个非核心的Worker
    else if (!addWorker(command, false))
        // 如果添加非核心的Worker还失败了,则执行拒绝策略
        reject(command);
}

可以看到,大致是一个这样的流程:

提交任务:
1. 若worker数量小于corePoolSize,创建一个核心worker,完成任务
2. 若如果工作线程个数不小于 corePoolSize,或添加Worker失败,将任务添加到任务队列中
3. 若以上条件都不满足,但worker数小于maximumPoolSize,创建一个非核心worker来完成任务。
4. 若以上条件都不满足,执行拒绝策略

那么就先看一下addWorker这个关键方法:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池要关闭的情况,不添加
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取线程池中worker数量
            int wc = workerCountOf(c);
            // 如果大于最大数量,添加失败
            if (wc >= CAPACITY ||
            // 如果要创建核心worker,不能大于等于corePoolSize
            // 如果要创建非核心worker,不能大于等于maximumPoolSize
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 通过CAS去修改ctl字段中的worker个数
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 将任务传给Worker,构造一个实例对象
        w = new Worker(firstTask);
        // ...
}

看到这里,想必你也很好奇Worker到底是什么?它是干什么的?

那么就进一段小插曲,先看下Worker这个内部类,方便后面的理解:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

    // worker在哪个线程中run 
    final Thread thread;
    // worker初次构造时要执行的任务
    Runnable firstTask;
    // 执行过的任务数量
    volatile long completedTasks;

    // 构造方法,传入一个firstTask
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // Worker实现了Runnable接口,这里实现了run方法
    public void run() {
        runWorker(this);
    }

    // 是被一个线程独占的吗
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // 尝试让一个线程独占
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    // 尝试释放独占锁
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    //用于中断线程的方法
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

必须要注意到的是Worker实现了Runnable接口,并继承了AQS,用于一个线程的独占。

其中的构造方法有一句this.thread = getThreadFactory().newThread(this);,是通过ThreadFactory来生成一个Thread对象,那么假设我们构造线程池的时候没有传入ThreadFactory对象,它的默认ThreadFactory对象是什么呢?关于这个可以在简要版的构造函数内看到:

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

可见默认的ThreadFactory对象是Executors.defaultThreadFactory(),也就是Executors内部类DefaultThreadFactory

public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}

那么DefaultThreadFactory的实现如下:

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                                Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                        poolNumber.getAndIncrement() +
                        "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                                namePrefix + threadNumber.getAndIncrement(),
                                0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

可见这里的线程名默认是pool-线程池号-thread-线程号,而线程池号和线程号都是通过一个AtomicInteger记录的,线程池号每构造一个DefaultThreadFactory对象递增1,线程号则是DefaultThreadFactory对象每次生成一个线程则递增1。

我们还注意到,在上面调用newThread创建线程的时候,把this传进去了,this就是当前的Worker对象,也就是说,创建出来的线程对象中的Runnable类型的target字段,其实就是一个Worker对象。

那么大致性的了解了Worker结构之后,我们回到addWorker方法,继续往下看:

private boolean addWorker(Runnable firstTask, boolean core) {
    //...

    try {
        // 将任务传给Worker,构造一个实例对象
        w = new Worker(firstTask);
        // 获取创建的线程
        final Thread t = w.thread;
        if (t != null) {
            // 获取锁
            final ReentrantLock mainLock = this.mainLock;
            // 上锁,因为要修改workers集合
            mainLock.lock();
            try {
                //检查线程池状态
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 如果线程已经启动,报错
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // 添加worker到workers集合中
                    workers.add(w);
                    int s = workers.size();
                    // 更新最大出现过的线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                // 解锁
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
    // 如果线程启动不成功
    if (! workerStarted)
        addWorkerFailed(w);
    }
    return workerStarted;
}

看完了这个方法,大致性的概括就是:

生成一个Thread和Worker对象,将Thread对象赋值到Worker中的内部字段thread,将Worker对象作为Thread启动要执行的任务。

特别注意,这样创建一个的Thread,在调用start启动后,因为内部的Runnable对象是对应的Worker对象,会去执行Worker对象中的run方法:

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    // 获取当前线程
    Thread wt = Thread.currentThread();
    // 获取一开始要执行的任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        //要么执行firstTask,要么从队列中阻塞获取任务)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //线程池在运行
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // 打断线程
                wt.interrupt();
            try {
                //留给子类实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //留给子类实现
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        // 如果退出循环有几种可能性,一种是限制了线程池保留闲暇状态Worker的时长并超过了一定时间
        // 另一种是线程被打断
        completedAbruptly = false;
    } finally {
        // 准备退出
        processWorkerExit(w, completedAbruptly);
    }
}

getTask的实现是:

private Runnable getTask() {
    boolean timedOut = false; 

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 是否允许闲暇的核心线程超时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果允许超时,则是限制时间的获取,否则是无时间限制的阻塞获取
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

从该方法可以看出,核心线程也可以设置超时,只要将allowCoreThreadTimeOut设置为true。

那么退出的流程就是:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //移除该worker
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // 如果设置了CoreThread闲暇状态的超时,就不会创建worker
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; 
        }
        // 添加一个Worker
        addWorker(null, false);
    }
}

线程池的源码就看到这里,如果要概括一下,大概就是:

  1. 线程池实现线程复用的原理是,并不将用户提交的任务封装为一个Thread对象,而是让创建出来的线程循环地从阻塞队列中获取任务,在线程中执行该任务。
  2. 核心线程是在线程池中的“常驻线程”,而非核心线程只是在允许的范围内创建出来“救急的”。通常允许核心线程无时间限制地存在并从阻塞队列中获取任务,但是也可以设置核心线程的超时,比如下面这个例子:
public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    Runnable runnable = ()->{
        System.out.println(Thread.currentThread().hashCode());
    };

    for(int i = 0;i<3;i++){
        threadPoolExecutor.execute(runnable);
        Thread.sleep(1000);
    }
}

在这个例子中,我是用ThreadPoolExecutor的构造方法模拟了一个FixedThreadPool,但并不获取一个ExecutorService接口,而是直接获取它的实现类,以便在后面进行一些高级设置。

这里将线程池设置了固定大小为2,提交的任务就是输出一下当前线程的哈希值,用于分辨是不是同一个线程处理的。 在循环中,每隔1秒提交了一个任务,最终输出如下:

1868701981
1351769778
1868701981

可见,第一和第三个任务是由同一个线程处理的,这是因为核心线程会一直阻塞等待队列中的任务。

但是我们只要将上面的源程序加上一行代码:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    // 设置允许核心线程超时
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    Runnable runnable = ()->{
        System.out.println(Thread.currentThread().hashCode());
    };

    for(int i = 0;i<3;i++){
        threadPoolExecutor.execute(runnable);
        Thread.sleep(1000);
    }
}

再次运行程序,结果如下:

1868701981
1351769778
1434575218

却发现每个任务都是由不同的线程处理的,这是因为在构造方法中设置了超时时间,并且允许了核心线程超时,核心线程在这个超时时间之内没有从队列中取到任务,便会销毁。直到下一个任务到来,产生新的Worker。

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/threadpoolexecutor%e6%ba%90%e7%a0%81%e5%88%86%e6%9e%90-%e7%ba%bf%e7%a8%8b%e6%b1%a0%e5%a6%82%e4%bd%95%e5%ae%9e%e7%8e%b0%e7%ba%bf%e7%a8%8b%e5%a4%8d%e7%94%a8%ef%bc%9f/

(1)
彭晨涛彭晨涛管理者
上一篇 2020年5月21日
下一篇 2020年5月22日

相关推荐

发表回复

登录后才能评论