JDK8新增高效原子累加器LongAdder源码分析

很久以前写过CAS应用之JUC下的原子类,但是LongAdder这个类没有去看,只是给了一个其他博客的参考链接。今天就自己来分析一下。

AtomicLong的问题和LongAdder的基本使用

与LongAdder功能类似的就是AtomicLong了,但我们知道,AtomicLong中只有一个原子变量value,多线程必须通过CAS自旋去尝试修改这个变量,同一时间只能有一个线程修改成功,会产生效率的问题。

而LongAdder,则是为了解决AtomicLong的效率问题,它的思路是,既然多个线程同时去抢一个原子变量会有效率问题,那么可以将这一个原子变量分为多个原子变量,让多个线程去争抢,就能提高线程CAS成功的概率。

先看看LongAdder的用法:

public static void main(String[] args) {
    LongAdder adder = new LongAdder();
    System.out.println(adder.sum());
    Runnable runnable = () -> {
        long origin = adder.sum();
        //加1
        adder.increment();
        System.out.println("加1之前:"+ origin+ ",加1之后:"+adder.sum());
    };

    for(int i = 0;i<10;i++){
        new Thread(runnable).start();
    }
}

在我电脑上的运行结果是:

0
加1之前:0,加1之后:1
加1之前:1,加1之后:2
加1之前:2,加1之后:3
加1之前:4,加1之后:5
加1之前:0,加1之后:7
加1之前:3,加1之后:4
加1之前:5,加1之后:7
加1之前:7,加1之后:8
加1之前:8,加1之后:9
加1之前:9,加1之后:10

可见,加之前和加之后获取的值并不一定相差1,但却很严谨地进行了原子累加操作,最后结果也是10。

除了通过increment()累加1外,LongAdder还可以通过add()方法增加其他的值。

源码分析

JDK8新增高效原子累加器LongAdder源码分析

LongAdder继承自Striped64,核心逻辑还是在Striped64中,体现着JDK尽可能抽取公用复用部分的思想。

LongAdder的构造器方法没有任何东西,类中也看不到任何代表存储值的字段:

public LongAdder() {
}

那么就来看看sum()方法获取的值是从哪来的:

public long sum() {
    Cell[] as = cells; Cell a;
    // 获取base值
    long sum = base;
    // 如果Cell数组不为null
    if (as != null) {
        // 遍历Cell数组,并加上每个Cell代表的值
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

从该方法看出,LongAdder存储的值分布于base和Cell数组中,为它们值的总和。

那么base和Cell数组在哪里呢?通过定位,发现在Striped64类中:

//Cell内部类
@sun.misc.Contended static final class Cell {
    volatile long value;
    //可通过构造函数赋值
    Cell(long x) { value = x; }
    //提供CAS操作
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    // value字段的偏移地址
    private static final long valueOffset;
    // 获取value字段的偏移地址
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

// CPU的个数
static final int NCPU = Runtime.getRuntime().availableProcessors();

// Cell数组
transient volatile Cell[] cells;

transient volatile long base;
// 用于保护cells数组更改的CAS变量
transient volatile int cellsBusy;

private static final sun.misc.Unsafe UNSAFE;
// base、cellsbusy字段和Thread threadLocalRandomProbe字段的偏移量
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> sk = Striped64.class;
        BASE = UNSAFE.objectFieldOffset
            (sk.getDeclaredField("base"));
        CELLSBUSY = UNSAFE.objectFieldOffset
            (sk.getDeclaredField("cellsBusy"));
        Class<?> tk = Thread.class;
        PROBE = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomProbe"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

threadLocalRandomProbe这个字段我们在Random在多线程下的问题以及ThreadLocalRandom类分析里见过,没想到这里也用上了。

看完了这些字段后可知,base和cells数组都没有经过初始化,即base默认为0,cells默认为null,这也符合初始化LongAdder值为0的结果。

那么,来看一下LongAdder的increment()方法:

public void increment() {
    add(1L);
}

就是调用了add()函数加1:

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        // 如果Cell数组为null,或长度为0
        // 或要争抢的cell为null
        // 或CAS要争抢的cell失败
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            // 交由longAccumulate处理
            longAccumulate(x, null, uncontended);
    }
}

可见这段代码的逻辑仍是优先对base变量进行CAS操作,当cells数组为空的时候,首先尝试CAS一次base变量,仅当争抢base变量没成功才继续往下走:

final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

其次,以下任意条件都能触发longAccumulate方法:
+ Cell数组为null
+ Cell数组长度为0
+ 要争抢的cell为null
+ CAS要争抢的cell失败

而线程是怎么确定要争抢哪个cell元素呢?通过代码中
a = as[getProbe() & m]可以看出,是通过线程中的threadLocalRandomProbeCell数组的长度-1做位与运算确定的。并且Striped64要求Cell的数组长度为2的整数幂,也就是Cell数组的长度-1每一位都是1,位与之后的结果会尽量均匀。

那我们现在假设Cell数组还没有初始化,即Cell数组还是null,会触发longAccumulate方法:

final void longAccumulate(long x, LongBinaryOperator fn,
                            boolean wasUncontended) {
    int h;
    // 如果probe为0,代表ThreadLocalRandom对象还没有获取,原因上一篇中看过了
    if ((h = getProbe()) == 0) {
        // 获取ThreadLocalRandom对象,同时初始化种子、探针等字段
        ThreadLocalRandom.current(); 
        // 获取探针
        h = getProbe();
        wasUncontended = true;
    }
    // 是否发生碰撞
    boolean collide = false;            
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // Cell数组是否初始化且长度不为0
        if ((as = cells) != null && (n = as.length) > 0) {
            // 如果线程对应的Cell为null
            if ((a = as[(n - 1) & h]) == null) {
                // 如果cellsBusy没有被锁上
                if (cellsBusy == 0) { 
                    // 先获取一个Cell对象
                    Cell r = new Cell(x); 
                    // 尝试锁住cellsBusy
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {  
                            // 将对应的cell赋值为创建的Cell对象
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;  
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       
                wasUncontended = true;      
            // 线程对应的Cell不为null,尝试CAS更改
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                            fn.applyAsLong(v, x))))
                break;
            // 如果CAS对应的cell失败
            // 若Cell数组的长度大于CPU个数,或者cells被更改了,都不进行扩容
            else if (n >= NCPU || cells != as)
                collide = false; 
            // 否则进行扩容       
            else if (!collide)
                collide = true;
            // 尝试cas锁上cellsBusy
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    //进行扩容
                    if (cells == as) {
                        //扩容一倍      
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;
            }
            //更改一下probe探针
            h = advanceProbe(h);
        }
        // 如果cell数组没有初始化,锁上cellsBusy进行初始化
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                   
                //cells没有被更改        
                if (cells == as) {
                    // 初始化一个长度为2的Cell数组
                    Cell[] rs = new Cell[2];
                    // 给线程对应的Cell初始化值
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // 如果CAS锁cellsBusy变量没有成功,再次尝试CAS base变量,如果成功就不用搞了,失败继续循环
        else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
            break;                         
    }
}

可见,Cell数组中的元素是懒惰初始化的,并且数组初始容量为2,如果发生碰撞还会对数组进行扩容,不过最终不会超过CPU的个数。

过程中会反复尝试CAS cellsBusy锁变量,如果失败,还会再次去尝试CAS base变量,如果成功就不需要更改Cell数组了。

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/jdk8%e6%96%b0%e5%a2%9e%e9%ab%98%e6%95%88%e5%8e%9f%e5%ad%90%e7%b4%af%e5%8a%a0%e5%99%a8longadder%e6%ba%90%e7%a0%81%e5%88%86%e6%9e%90/

发表评论

电子邮件地址不会被公开。