JDK8新增高效原子累加器LongAdder源码分析

很久以前写过CAS应用之JUC下的原子类,但是LongAdder这个类没有去看,只是给了一个其他博客的参考链接。今天就自己来分析一下。

AtomicLong的问题和LongAdder的基本使用

与LongAdder功能类似的就是AtomicLong了,但我们知道,AtomicLong中只有一个原子变量value,多线程必须通过CAS自旋去尝试修改这个变量,同一时间只能有一个线程修改成功,会产生效率的问题。

而LongAdder,则是为了解决AtomicLong的效率问题,它的思路是,既然多个线程同时去抢一个原子变量会有效率问题,那么可以将这一个原子变量分为多个原子变量,让多个线程去争抢,就能提高线程CAS成功的概率。

先看看LongAdder的用法:

public static void main(String[] args) {
    LongAdder adder = new LongAdder();
    System.out.println(adder.sum());
    Runnable runnable = () -> {
        long origin = adder.sum();
        //加1
        adder.increment();
        System.out.println("加1之前:"+ origin+ ",加1之后:"+adder.sum());
    };

    for(int i = 0;i<10;i++){
        new Thread(runnable).start();
    }
}

在我电脑上的运行结果是:

0
加1之前:0,加1之后:1
加1之前:1,加1之后:2
加1之前:2,加1之后:3
加1之前:4,加1之后:5
加1之前:0,加1之后:7
加1之前:3,加1之后:4
加1之前:5,加1之后:7
加1之前:7,加1之后:8
加1之前:8,加1之后:9
加1之前:9,加1之后:10

可见,加之前和加之后获取的值并不一定相差1,但却很严谨地进行了原子累加操作,最后结果也是10。

除了通过increment()累加1外,LongAdder还可以通过add()方法增加其他的值。

源码分析

JDK8新增高效原子累加器LongAdder源码分析

LongAdder继承自Striped64,核心逻辑还是在Striped64中,体现着JDK尽可能抽取公用复用部分的思想。

LongAdder的构造器方法没有任何东西,类中也看不到任何代表存储值的字段:

public LongAdder() {
}

那么就来看看sum()方法获取的值是从哪来的:

public long sum() {
    Cell[] as = cells; Cell a;
    // 获取base值
    long sum = base;
    // 如果Cell数组不为null
    if (as != null) {
        // 遍历Cell数组,并加上每个Cell代表的值
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

从该方法看出,LongAdder存储的值分布于base和Cell数组中,为它们值的总和。

那么base和Cell数组在哪里呢?通过定位,发现在Striped64类中:

//Cell内部类
@sun.misc.Contended static final class Cell {
    volatile long value;
    //可通过构造函数赋值
    Cell(long x) { value = x; }
    //提供CAS操作
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    // value字段的偏移地址
    private static final long valueOffset;
    // 获取value字段的偏移地址
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

// CPU的个数
static final int NCPU = Runtime.getRuntime().availableProcessors();

// Cell数组
transient volatile Cell[] cells;

transient volatile long base;
// 用于保护cells数组更改的CAS变量
transient volatile int cellsBusy;

private static final sun.misc.Unsafe UNSAFE;
// base、cellsbusy字段和Thread threadLocalRandomProbe字段的偏移量
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> sk = Striped64.class;
        BASE = UNSAFE.objectFieldOffset
            (sk.getDeclaredField("base"));
        CELLSBUSY = UNSAFE.objectFieldOffset
            (sk.getDeclaredField("cellsBusy"));
        Class<?> tk = Thread.class;
        PROBE = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomProbe"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

threadLocalRandomProbe这个字段我们在Random在多线程下的问题以及ThreadLocalRandom类分析里见过,没想到这里也用上了。

看完了这些字段后可知,base和cells数组都没有经过初始化,即base默认为0,cells默认为null,这也符合初始化LongAdder值为0的结果。

那么,来看一下LongAdder的increment()方法:

public void increment() {
    add(1L);
}

就是调用了add()函数加1:

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        // 如果Cell数组为null,或长度为0
        // 或要争抢的cell为null
        // 或CAS要争抢的cell失败
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            // 交由longAccumulate处理
            longAccumulate(x, null, uncontended);
    }
}

可见这段代码的逻辑仍是优先对base变量进行CAS操作,当cells数组为空的时候,首先尝试CAS一次base变量,仅当争抢base变量没成功才继续往下走:

final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

其次,以下任意条件都能触发longAccumulate方法:
+ Cell数组为null
+ Cell数组长度为0
+ 要争抢的cell为null
+ CAS要争抢的cell失败

而线程是怎么确定要争抢哪个cell元素呢?通过代码中
a = as[getProbe() & m]可以看出,是通过线程中的threadLocalRandomProbeCell数组的长度-1做位与运算确定的。并且Striped64要求Cell的数组长度为2的整数幂,也就是Cell数组的长度-1每一位都是1,位与之后的结果会尽量均匀。

那我们现在假设Cell数组还没有初始化,即Cell数组还是null,会触发longAccumulate方法:

final void longAccumulate(long x, LongBinaryOperator fn,
                            boolean wasUncontended) {
    int h;
    // 如果probe为0,代表ThreadLocalRandom对象还没有获取,原因上一篇中看过了
    if ((h = getProbe()) == 0) {
        // 获取ThreadLocalRandom对象,同时初始化种子、探针等字段
        ThreadLocalRandom.current(); 
        // 获取探针
        h = getProbe();
        wasUncontended = true;
    }
    // 是否发生碰撞
    boolean collide = false;            
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // Cell数组是否初始化且长度不为0
        if ((as = cells) != null && (n = as.length) > 0) {
            // 如果线程对应的Cell为null
            if ((a = as[(n - 1) & h]) == null) {
                // 如果cellsBusy没有被锁上
                if (cellsBusy == 0) { 
                    // 先获取一个Cell对象
                    Cell r = new Cell(x); 
                    // 尝试锁住cellsBusy
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {  
                            // 将对应的cell赋值为创建的Cell对象
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;  
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       
                wasUncontended = true;      
            // 线程对应的Cell不为null,尝试CAS更改
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                            fn.applyAsLong(v, x))))
                break;
            // 如果CAS对应的cell失败
            // 若Cell数组的长度大于CPU个数,或者cells被更改了,都不进行扩容
            else if (n >= NCPU || cells != as)
                collide = false; 
            // 否则进行扩容       
            else if (!collide)
                collide = true;
            // 尝试cas锁上cellsBusy
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    //进行扩容
                    if (cells == as) {
                        //扩容一倍      
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;
            }
            //更改一下probe探针
            h = advanceProbe(h);
        }
        // 如果cell数组没有初始化,锁上cellsBusy进行初始化
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                   
                //cells没有被更改        
                if (cells == as) {
                    // 初始化一个长度为2的Cell数组
                    Cell[] rs = new Cell[2];
                    // 给线程对应的Cell初始化值
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // 如果CAS锁cellsBusy变量没有成功,再次尝试CAS base变量,如果成功就不用搞了,失败继续循环
        else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
            break;                         
    }
}

可见,Cell数组中的元素是懒惰初始化的,并且数组初始容量为2,如果发生碰撞还会对数组进行扩容,不过最终不会超过CPU的个数。

过程中会反复尝试CAS cellsBusy锁变量,如果失败,还会再次去尝试CAS base变量,如果成功就不需要更改Cell数组了。

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/jdk8%e6%96%b0%e5%a2%9e%e9%ab%98%e6%95%88%e5%8e%9f%e5%ad%90%e7%b4%af%e5%8a%a0%e5%99%a8longadder%e6%ba%90%e7%a0%81%e5%88%86%e6%9e%90/

(0)
彭晨涛彭晨涛管理者
上一篇 2020年5月19日
下一篇 2020年5月20日

相关推荐

  • ArrayList源码分析

    总结 总结放前面防止太长不看: ArrayList内部是用数组实现的。 如果使用无参构造函数建立ArrayList,在添加第一个元素的时候会分配10个元素的空间。 ArrayLis…

    2019年11月22日
    0160
  • Java中SPI机制介绍和源码分析

    本文参考资源: 高级开发必须理解的Java中SPI机制 - 简书 什么是SPI SPI全称为Service Provider Interface,是一种服务发现机制。SPI 的本质…

    Java 2020年3月19日
    0710
  • 深入理解java虚拟机第三版读书笔记01

    做笔记之前的感言 谈到《深入理解java虚拟机》,在业内可太有名了,是国内的一位大神写的一本关于java虚拟机的畅销书,基本上对java稍有深入的程序员都听说过这本书。不过遗憾的是…

    2020年1月4日
    0300
  • Java线程池详解

    线程池就是享元模式和生产者消费者模式的应用 动手实现线程池 步骤1:自定义拒绝策略接口 @FunctionalInterface // 拒绝策略 interface RejectP…

    2020年2月3日
    0310
  • 为什么说Java只有值传递?

    先说一下。。以后可能不会怎么写Java相关的博客了,因为找到了字节跳动的实习工作,用Go/Python开发后端,所以这几天在抓紧时间学Go,在学Go的时候,了解到Go语言只有值传递…

    Java 2020年6月26日
    03110
  • 遗留线程安全类Vector和HashTable简要源码分析

    总结 总结放前面防止太长不看 Vector Vector就是使用synchronized限制线程安全的一个List实现。 Vector是基于数组实现的,默认初始容量是10,在构造的…

    Java 2020年2月15日
    0710
  • NIO零拷贝与其系统函数调用

    本文参考资源: Java NIO学习笔记四(零拷贝详解)Java拿笔小星的博客-CSDN博客 关于Buffer和Channel的注意事项和细节 ByteBuffer不止可以存取by…

    2020年3月14日
    0510
  • HashMap源码分析

    HashMap是java中非常常见的一个数据结构,在这篇文章里,我依然以Map中的操作为导向来循序渐进研究HashMap中的源码,阅读这篇文章需要的前置知识有: 弱平衡的二叉查找树…

    Java 2020年2月12日
    0230
  • 深入理解java虚拟机第三版读书笔记04

    以下是第三章 垃圾收集器与内存分配策略的内容 概述 程序计数器、虚拟机栈、本地方法栈是线程独有的,栈帧更是随方法结束而消亡,不需要垃圾回收。而堆和方法区则需要经过垃圾回收的设计 对…

    2020年1月8日
    0290
  • 深入理解java虚拟机第三版读书笔记13

    以下是第十三章 Java内存模型与线程的内容 线程安全 当多个线程同时访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步,或者在调用方进行任…

    2020年1月30日
    0130

发表回复

登录后才能评论