很久以前写过CAS应用之JUC下的原子类,但是LongAdder这个类没有去看,只是给了一个其他博客的参考链接。今天就自己来分析一下。
AtomicLong的问题和LongAdder的基本使用
与LongAdder功能类似的就是AtomicLong
了,但我们知道,AtomicLong
中只有一个原子变量value,多线程必须通过CAS自旋去尝试修改这个变量,同一时间只能有一个线程修改成功,会产生效率的问题。
而LongAdder,则是为了解决AtomicLong
的效率问题,它的思路是,既然多个线程同时去抢一个原子变量会有效率问题,那么可以将这一个原子变量分为多个原子变量,让多个线程去争抢,就能提高线程CAS成功的概率。
先看看LongAdder的用法:
public static void main(String[] args) {
LongAdder adder = new LongAdder();
System.out.println(adder.sum());
Runnable runnable = () -> {
long origin = adder.sum();
//加1
adder.increment();
System.out.println("加1之前:"+ origin+ ",加1之后:"+adder.sum());
};
for(int i = 0;i<10;i++){
new Thread(runnable).start();
}
}
在我电脑上的运行结果是:
0
加1之前:0,加1之后:1
加1之前:1,加1之后:2
加1之前:2,加1之后:3
加1之前:4,加1之后:5
加1之前:0,加1之后:7
加1之前:3,加1之后:4
加1之前:5,加1之后:7
加1之前:7,加1之后:8
加1之前:8,加1之后:9
加1之前:9,加1之后:10
可见,加之前和加之后获取的值并不一定相差1,但却很严谨地进行了原子累加操作,最后结果也是10。
除了通过increment()
累加1外,LongAdder还可以通过add()
方法增加其他的值。
源码分析

LongAdder继承自Striped64
,核心逻辑还是在Striped64
中,体现着JDK尽可能抽取公用复用部分的思想。
LongAdder的构造器方法没有任何东西,类中也看不到任何代表存储值的字段:
public LongAdder() {
}
那么就来看看sum()
方法获取的值是从哪来的:
public long sum() {
Cell[] as = cells; Cell a;
// 获取base值
long sum = base;
// 如果Cell数组不为null
if (as != null) {
// 遍历Cell数组,并加上每个Cell代表的值
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
从该方法看出,LongAdder存储的值分布于base和Cell数组中,为它们值的总和。
那么base和Cell数组在哪里呢?通过定位,发现在Striped64
类中:
//Cell内部类
@sun.misc.Contended static final class Cell {
volatile long value;
//可通过构造函数赋值
Cell(long x) { value = x; }
//提供CAS操作
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE;
// value字段的偏移地址
private static final long valueOffset;
// 获取value字段的偏移地址
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
// CPU的个数
static final int NCPU = Runtime.getRuntime().availableProcessors();
// Cell数组
transient volatile Cell[] cells;
transient volatile long base;
// 用于保护cells数组更改的CAS变量
transient volatile int cellsBusy;
private static final sun.misc.Unsafe UNSAFE;
// base、cellsbusy字段和Thread threadLocalRandomProbe字段的偏移量
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> sk = Striped64.class;
BASE = UNSAFE.objectFieldOffset
(sk.getDeclaredField("base"));
CELLSBUSY = UNSAFE.objectFieldOffset
(sk.getDeclaredField("cellsBusy"));
Class<?> tk = Thread.class;
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
}
}
threadLocalRandomProbe
这个字段我们在Random在多线程下的问题以及ThreadLocalRandom类分析里见过,没想到这里也用上了。
看完了这些字段后可知,base和cells数组都没有经过初始化,即base默认为0,cells默认为null,这也符合初始化LongAdder值为0的结果。
那么,来看一下LongAdder的increment()
方法:
public void increment() {
add(1L);
}
就是调用了add()
函数加1:
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
// 如果Cell数组为null,或长度为0
// 或要争抢的cell为null
// 或CAS要争抢的cell失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 交由longAccumulate处理
longAccumulate(x, null, uncontended);
}
}
可见这段代码的逻辑仍是优先对base变量进行CAS操作,当cells数组为空的时候,首先尝试CAS一次base变量,仅当争抢base变量没成功才继续往下走:
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
其次,以下任意条件都能触发longAccumulate
方法:
+ Cell数组为null
+ Cell数组长度为0
+ 要争抢的cell为null
+ CAS要争抢的cell失败
而线程是怎么确定要争抢哪个cell元素呢?通过代码中a = as[getProbe() & m]
可以看出,是通过线程中的threadLocalRandomProbe
与Cell数组的长度-1
做位与运算确定的。并且Striped64要求Cell的数组长度为2的整数幂,也就是Cell数组的长度-1
每一位都是1,位与之后的结果会尽量均匀。
那我们现在假设Cell数组还没有初始化,即Cell数组还是null,会触发longAccumulate
方法:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 如果probe为0,代表ThreadLocalRandom对象还没有获取,原因上一篇中看过了
if ((h = getProbe()) == 0) {
// 获取ThreadLocalRandom对象,同时初始化种子、探针等字段
ThreadLocalRandom.current();
// 获取探针
h = getProbe();
wasUncontended = true;
}
// 是否发生碰撞
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
// Cell数组是否初始化且长度不为0
if ((as = cells) != null && (n = as.length) > 0) {
// 如果线程对应的Cell为null
if ((a = as[(n - 1) & h]) == null) {
// 如果cellsBusy没有被锁上
if (cellsBusy == 0) {
// 先获取一个Cell对象
Cell r = new Cell(x);
// 尝试锁住cellsBusy
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
// 将对应的cell赋值为创建的Cell对象
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue;
}
}
collide = false;
}
else if (!wasUncontended)
wasUncontended = true;
// 线程对应的Cell不为null,尝试CAS更改
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果CAS对应的cell失败
// 若Cell数组的长度大于CPU个数,或者cells被更改了,都不进行扩容
else if (n >= NCPU || cells != as)
collide = false;
// 否则进行扩容
else if (!collide)
collide = true;
// 尝试cas锁上cellsBusy
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//进行扩容
if (cells == as) {
//扩容一倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
//更改一下probe探针
h = advanceProbe(h);
}
// 如果cell数组没有初始化,锁上cellsBusy进行初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
//cells没有被更改
if (cells == as) {
// 初始化一个长度为2的Cell数组
Cell[] rs = new Cell[2];
// 给线程对应的Cell初始化值
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 如果CAS锁cellsBusy变量没有成功,再次尝试CAS base变量,如果成功就不用搞了,失败继续循环
else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
break;
}
}
可见,Cell数组中的元素是懒惰初始化的,并且数组初始容量为2,如果发生碰撞还会对数组进行扩容,不过最终不会超过CPU的个数。
过程中会反复尝试CAS cellsBusy锁变量,如果失败,还会再次去尝试CAS base变量,如果成功就不需要更改Cell数组了。
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/jdk8%e6%96%b0%e5%a2%9e%e9%ab%98%e6%95%88%e5%8e%9f%e5%ad%90%e7%b4%af%e5%8a%a0%e5%99%a8longadder%e6%ba%90%e7%a0%81%e5%88%86%e6%9e%90/