Java并发成神之路-精通JUC并发工具十八般武艺——原子类(四)

什么是原子类,有什么作用?

  • 不可分割
  • 一个操作是不可中断的,即便是多线程的情况下也可以保证
  • java.util.concurrent.atomic
  • 原子类的作用和锁类似,是为了保证并发情况下线程安全。不过原子类相比于锁,有一定的优势:
    • 粒度更细:原子变量可以把竞争范围缩小到变量级别,这是我们可以获得的最细粒度的情况了,通常锁的粒度都要大于原子变量的粒度
    • 效率更高:通常,使用原子类的效率会比使用锁的效率更高,除了高度竞争的情况

6类原子类纵览

类型描述 原子类
Atomic*基本类型原子类 AtomicInteger
AtomicLong
AtomicBoolean
Atomic*array数组类型原子类 AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
Atomic*Reference引用类型原子类 AtomicReference
AtomicStampedReference
AtomicMarkableReference
Atomic*FieldUpdater升级类型原子类 AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater
Adder累加器 LongAdder
DoubleAdder
Accumulator累加器 LongAccumulator
DoubleAccumulator

Atomic*基本类型原子类,以AtomicInteger为例

AtomicInteger常用方法

  • public final int get()//获取当前的值
  • public final int getAndSet()//获取当前的值,并设置新值
  • public final int getAndIncrement()//获取当前的值,并自增
  • public final int getAndDecrement()//获取当前值,并自减
  • public final int getAndAdd(int delta)//获取当前值,并加上预期的值
  • boolean compareAndSet(int expect,int update)//如果输入的数组等于预期值,则以原子方式将该值设置为输入值(先判断当前值是否是expect,如果是则修改为update的值,不是返回false)

Atomic*Array数组类型原子类

  • 里面的每个元素都是原子类,都是线程安全的

Atomic*Reference引用类型原子类

  • AtomicReference:AtomicReference类的作用,和AtomicInteger并没有本质区别,AtomicInteger可以让一个整数保证原子性,而AtomicReference可以让一个对象保证原子性,当然AtomicReference的功能明显比AtomicInteger强,因为一个对象里面可以包含很多属性。
  • 用法和AtomicInteger类似

    代码如下所示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    public class SpinLock {

    private AtomicReference<Thread> sign = new AtomicReference<>();

    public void lock() {
    Thread thread = Thread.currentThread();
    while (!sign.compareAndSet(null, thread)) {
    }
    }

    public void unlock() {
    Thread thread = Thread.currentThread();
    sign.compareAndSet(thread, null);
    }

    public static void main(String[] args) {
    SpinLock spinLock = new SpinLock();
    Runnable runnable = new Runnable() {
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName() + "开始尝试获取自旋锁");
    spinLock.lock();
    System.out.println(Thread.currentThread().getName() + "获得自旋锁");
    try {
    Thread.sleep(300);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    spinLock.unlock();
    System.out.println(Thread.currentThread().getName()+"释放了自旋锁");
    }
    }
    };
    new Thread(runnable).start();
    new Thread(runnable).start();
    }
    }

    把普通变量升级为原子类:用AtomicIntegerFieldUpdater升级原有变量

  • AtomicIntegerFieldUpdater对普通变量进行升级
  • 使用场景:偶尔需要一个原子get-set操作(大多数情况的时候不会有并发情况,只有在某一时间段,或者某一个操作的时候才会有)

    代码如下所示,在使用的时候使用AtomicIntegerFieldUpdater处理即可

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class AtomicIntegerFieldUpdaterDemo implements Runnable {

    static Candidate tom;
    static Candidate peter;

    public static AtomicIntegerFieldUpdater<Candidate> sourceUpdater =
    AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

    @Override
    public void run() {
    for (int i = 0; i < 10000; i++) {
    peter.score++;
    sourceUpdater.getAndIncrement(tom);
    }
    }

    public static class Candidate {
    volatile int score;
    }

    public static void main(String[] args) throws InterruptedException {
    tom = new Candidate();
    peter = new Candidate();
    AtomicIntegerFieldUpdaterDemo a = new AtomicIntegerFieldUpdaterDemo();
    Thread t1 = new Thread(a);
    Thread t2 = new Thread(a);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println("普通变量:"+peter.score);
    System.out.println("升级后的记过:"+tom.score);
    }
    }

    AtomicIntegerFieldUpdater的注意点

  • 可见范围(如果变量是不可见的就不能操作)
  • 不支持static

Adder累加器

  • 是Java8中引入的,相对是比较新的一个类
  • 高并发下LongAdder比AtomicLong效率高,不过本质是空间换时间
  • 竞争激烈的时候,LongAdder把不同的线程对应到不同的Cell上进行修改,降低了冲突的概率,是多段锁的概念,提高了并发性

    使用AtomicLongLongAdder进行自增操作,如下代码所示
    使用20个线程执行10000个任务,每个任务里面是自增一万次,总计一亿次
    LongAdder的耗时要远远低于AtomicLong
    AtomicLong每次执行加1之后,都需要把结果刷到主内存中,以保证其他线程可见
    LongAdder则不需要,只是每个线程单独计数,之后在进行汇总

AtomicLong

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class AtomicLongDemo {

public static void main(String[] args) throws InterruptedException {
AtomicLong counter = new AtomicLong(0);
ExecutorService executorService = Executors.newFixedThreadPool(20);
long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
executorService.submit(new Task(counter));
}
executorService.shutdown();
while (!executorService.isTerminated()){}
long end = System.currentTimeMillis();
System.out.println(counter.get());
System.out.println("AtomicLong耗时:"+(end-start));
}

private static class Task implements Runnable {
private AtomicLong count;

public Task(AtomicLong count) {
this.count = count;
}

@Override
public void run() {
for (int i = 0; i < 10000; i++) {
count.getAndIncrement();
}
}
}
}

结果:

1
2
100000000
AtomicLong耗时:1773

LongAdder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class LongAdderDemo {

public static void main(String[] args) throws InterruptedException {
LongAdder counter = new LongAdder();
ExecutorService executorService = Executors.newFixedThreadPool(20);
long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
executorService.submit(new Task(counter));
}
executorService.shutdown();
while (!executorService.isTerminated()){}
long end = System.currentTimeMillis();
System.out.println(counter.sum());
System.out.println("LongAdder 耗时:"+(end-start));
}

private static class Task implements Runnable {
private LongAdder count;

public Task(LongAdder count) {
this.count = count;
}

@Override
public void run() {
for (int i = 0; i < 10000; i++) {
count.increment();
}
}
}
}
1
2
100000000
LongAdder 耗时:172

LongAdder带来的改进和原理

  • 在内部,这个LongAdder的实现原理和刚才的AtomicLong是有不同的,刚才的AtomicLong的实现原理是每一次加法都需要做同步,所以在高并发的时候会导致冲突比较多,也就降低了效率
  • 而此时的LongAdder,每个线程会有自己的一个计数器,仅用来在自己线程内计数,这样一来就不会有其他线程的计数器干扰
  • 第一线程的计数器数值为1的时候,可能线程2的计数器的值已经是3了,他们之间并不存在竞争关系,所以在加和的过程中,根本不需要同步机制,也不需要刚才的flush和refresh,这里也没有一个公共的counter来给所有线程统一计数
  • LongAdder引入分段累加的概念,内部有一个base变量和一个Cell[]数组共同参与计数:
    • base变量:竞争不激烈,直接累加到该变量上
    • Cell[]数组:竞争激烈,各个线程分散累加到自己的槽Cell[i]
  • sum()方法源码

    如下图所示,方法注释中也有讲解,由于是将Cell[]数组和base中的所有值相加,当循环Cell[]数组的时候,如果里面的值又有变动,最终获得结果就可能不精确,循环过的对象在发生改变,无法将值再累计到结果上
    如果调用方法的时候Cell[]数组不会再发生改变,得到的值就是精确结果

LongAdder类sum方法源码

对比AtomicLongLongAdder

  • 在低争用下,AtomicLongLongAdder这两个类具有相似的特征,但是在竞争激烈的情况下,LongAdder的预期吞吐量要高的多,但要消耗更多的空间
  • LongAdder适合的场景是统计求和和计数的场景,而且LongAdder基本只提供了add方法,而AtomicLong还具有cas方法

Accumulator累加器

  • AccumulatorAdder非常相似,Accumulator就是一个更通用版本的Adder

    具体代码实现如下所示,可以实现1-9的叠加,构造函数中的表达式里面的算法可以根据需要进行更该

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public class LongAccumulatorDemo {

    public static void main(String[] args) {
    LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);
    ExecutorService executorService = Executors.newFixedThreadPool(8);
    IntStream.range(1,10).forEach(i->executorService.submit(()->accumulator.accumulate(i)));
    System.out.println(accumulator.get());
    }
    }

    使用场景

  • 适用于需要大量计算,并且可以并行计算的场景
  • 不保证计算的顺序(如上代码,使用了线程池,不能保证那个线程先执行,也许是1+3+4+2…)