并发机制的底层实现
本文内容基于 JDK1.8。
concurrent 包的实现
由于 Java 的 CAS 同时具有 volatile 读和 volatile 写的内存语义,因此 Java 线程之间的通信现在有了下面四种方式:
- A 线程写 volatile 变量,随后 B 线程读这个 volatile 变量。
- A 线程写 volatile 变量,随后 B 线程用 CAS 更新这个 volatile 变量。
- A 线程用 CAS 更新一个 volatile 变量,随后 B 线程用 CAS 更新这个 volatile 变量。
- A 线程用 CAS 更新一个 volatile 变量,随后 B 线程读这个 volatile 变量。
同时,volatile 变量的读/写和 CAS 可以实现线程之间的通信。把这些特性整合在一起,就形成了整个 concurrent 包得以实现的基石。如果我们仔细分析 concurrent 包的源代码实现,会发现一个通用化的实现模式:
首先,声明共享变量为 volatile;
然后,使用 CAS 的原子条件更新来实现线程之间的同步;
同时,配合以 volatile 的读/写和 CAS 所具有的 volatile 读和写的内存语义来实现线程之间的通信。
AQS,非阻塞数据结构和原子变量类(Java.util.concurrent.atomic 包中的类),这些 concurrent 包中的基础类都是使用这种模式来实现的,而 concurrent 包中的高层类又是依赖于这些基础类来实现的。从整体来看,concurrent 包的实现示意图如下:
synchronized
synchronized 的要点
关键字 synchronized 可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块。
synchronized 有 3 种应用方式:
- 同步实例方法
- 同步静态方法
- 同步代码块
同步实例方法
:x: 错误示例 - 未同步的示例
@NotThreadSafe
public class SynchronizedDemo01 implements Runnable {
static int i = 0;
public void increase() {
i++;
}
@Override
public void run() {
for (int j = 0; j < 100000; j++) {
increase();
}
}
public static void main(String[] args) throws InterruptedException {
SynchronizedDemo01 instance = new SynchronizedDemo01();
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
// 输出结果: 小于 200000 的随机数字
Java 实例方法同步是同步在拥有该方法的对象上。这样,每个实例其方法同步都同步在不同的对象上,即该方法所属的实例。只有一个线程能够在实例方法同步块中运行。如果有多个实例存在,那么一个线程一次可以在一个实例同步块中执行操作。一个实例一个线程。
@ThreadSafe
public class SynchronizedDemo02 implements Runnable {
static int i = 0;
public synchronized void increase() {
i++;
}
@Override
public void run() {
for (int j = 0; j < 100000; j++) {
increase();
}
}
public static void main(String[] args) throws InterruptedException {
SynchronizedDemo02 instance = new SynchronizedDemo02();
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
// 输出结果:
// 2000000
同步静态方法
静态方法的同步是指同步在该方法所在的类对象上。因为在 JVM 中一个类只能对应一个类对象,所以同时只允许一个线程执行同一个类中的静态同步方法。
对于不同类中的静态同步方法,一个线程可以执行每个类中的静态同步方法而无需等待。不管类中的那个静态同步方法被调用,一个类只能由一个线程同时执行。
@ThreadSafe
public class SynchronizedDemo03 implements Runnable {
static int i = 0;
public static synchronized void increase() {
i++;
}
@Override
public void run() {
for (int j = 0; j < 100000; j++) {
increase();
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new SynchronizedDemo03());
Thread t2 = new Thread(new SynchronizedDemo03());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
// 输出结果:
// 200000
同步代码块
有时你不需要同步整个方法,而是同步方法中的一部分。Java 可以对方法的一部分进行同步。
注意 Java 同步块构造器用括号将对象括起来。在上例中,使用了 this
,即为调用 add 方法的实例本身。在同步构造器中用括号括起来的对象叫做监视器对象。上述代码使用监视器对象同步,同步实例方法使用调用方法本身的实例作为监视器对象。
一次只有一个线程能够在同步于同一个监视器对象的 Java 方法内执行。
@ThreadSafe
public class SynchronizedDemo04 implements Runnable {
static int i = 0;
static SynchronizedDemo04 instance = new SynchronizedDemo04();
@Override
public void run() {
synchronized (instance) {
for (int j = 0; j < 100000; j++) {
i++;
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
// 输出结果:
// 200000
synchronized 的原理
synchronized 实现同步的基础是:Java 中的每一个对象都可以作为锁。
- 对于普通同步方法,锁是当前实例对象。
- 对于静态同步方法,锁是当前类的 Class 对象。
- 对于同步方法块,锁是 Synchonized 括号里配置的对象。
:point_right: 参考阅读:Java 并发编程:synchronized :point_right: 参考阅读:深入理解 Java 并发之 synchronized 实现原理
volatile
volatile 的要点
volatile 是轻量级的 synchronized,它在多处理器开发中保证了共享变量的“可见性”。
可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。
一旦一个共享变量(类的成员变量、类的静态成员变量)被 volatile 修饰之后,那么就具备了两层语义:
- 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
- 禁止进行指令重排序。
如果一个字段被声明成 volatile,Java 线程内存模型确保所有线程看到这个变量的值是一致的。
volatile 的原理
观察加入 volatile 关键字和没有加入 volatile 关键字时所生成的汇编代码发现,加入 volatile 关键字时,会多出一个 lock 前缀指令。
lock 前缀指令实际上相当于一个内存屏障(也成内存栅栏),内存屏障会提供 3 个功能:
- 它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;
- 它会强制将对缓存的修改操作立即写入主存;
- 如果是写操作,它会导致其他 CPU 中对应的缓存行无效。
volatile 的应用场景
如果 volatile 变量修饰符使用恰当的话,它比 synchronized 的使用和执行成本更低,因为它不会引起线程上下文的切换和调度。
但是,volatile 无法替代 synchronized ,因为 volatile 无法保证操作的原子性。通常来说,使用 volatile 必须具备以下 2 个条件:
- 对变量的写操作不依赖于当前值
- 该变量没有包含在具有其他变量的不变式中
应用场景:
状态标记量
volatile boolean flag = false;
while(!flag) {
doSomething();
}
public void setFlag() {
flag = true;
}
double check
class Singleton {
private volatile static Singleton instance = null;
private Singleton() {}
public static Singleton getInstance() {
if(instance==null) {
synchronized (Singleton.class) {
if(instance==null)
instance = new Singleton();
}
}
return instance;
}
}
:point_right: 参考阅读:Java 并发编程:volatile 关键字解析
CAS
简介
CAS(Compare and Swap),字面意思为比较并交换。CAS 有 3 个操作数,内存值 V,旧的预期值 A,要修改的新值 B。当且仅当预期值 A 和内存值 V 相同时,将内存值 V 修改为 B,否则什么都不做。
操作
我们常常做这样的操作
if(a==b) {
a++;
}
试想一下如果在做 a++之前 a 的值被改变了怎么办?a++还执行吗?出现该问题的原因是在多线程环境下,a 的值处于一种不定的状态。采用锁可以解决此类问题,但 CAS 也可以解决,而且可以不加锁。
int expect = a;
if(a.compareAndSet(expect,a+1)) {
doSomeThing1();
} else {
doSomeThing2();
}
这样如果 a 的值被改变了 a++就不会被执行。按照上面的写法,a!=expect 之后,a++就不会被执行,如果我们还是想执行 a++操作怎么办,没关系,可以采用 while 循环
while(true) {
int expect = a;
if (a.compareAndSet(expect, a + 1)) {
doSomeThing1();
return;
} else {
doSomeThing2();
}
}
采用上面的写法,在没有锁的情况下实现了 a++操作,这实际上是一种非阻塞算法。
应用
非阻塞算法 (nonblocking algorithms)
一个线程的失败或者挂起不应该影响其他线程的失败或挂起的算法。
现代的 CPU 提供了特殊的指令,可以自动更新共享数据,而且能够检测到其他线程的干扰,而 compareAndSet() 就用这些代替了锁定。
拿出 AtomicInteger 来研究在没有锁的情况下是如何做到数据正确性的。
private volatile int value;
首先毫无疑问,在没有锁的机制下可能需要借助 volatile 原语,保证线程间的数据是可见的(共享的)。
这样在获取变量的值的时候才能直接读取。
public final int get() {
return value;
}
然后来看看++i 是怎么做到的。
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
在这里采用了 CAS 操作,每次从内存中读取数据然后将此数据和+1 后的结果进行 CAS 操作,如果成功就返回结果,否则重试直到成功为止。
而 compareAndSet 利用 JNI 来完成 CPU 指令的操作。
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
整体的过程就是这样子的,利用 CPU 的 CAS 指令,同时借助 JNI 来完成 Java 的非阻塞算法。其它原子操作都是利用类似的特性完成的。
其中 unsafe.compareAndSwapInt(this, valueOffset, expect, update)类似:
if (this == expect) {
this = update
return true;
} else {
return false;
}
那么问题就来了,成功过程中需要 2 个步骤:比较 this == expect,替换 this = update,compareAndSwapInt 如何这两个步骤的原子性呢? 参考 CAS 的原理
原理
Java 代码如何确保处理器执行 CAS 操作?
CAS 通过调用 JNI(JNI:Java Native Interface 为 Java 本地调用,允许 Java 调用其他语言。)的代码实现的。JVM 将 CAS 操作编译为底层提供的最有效方法。在支持 CAS 的处理器上,JVM 将它们编译为相应的机器指令;在不支持 CAS 的处理器上,JVM 将使用自旋锁。
特点
优点
一般情况下,比锁性能更高。因为 CAS 是一种非阻塞算法,所以其避免了线程被阻塞时的等待时间。
缺点
ABA 问题
因为 CAS 需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA 问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么 A-B-A 就会变成 1A-2B-3A。
从 Java1.5 开始 JDK 的 atomic 包里提供了一个类 AtomicStampedReference 来解决 ABA 问题。这个类的 compareAndSet 方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
循环时间长开销大
自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的 pause 指令那么效率会有一定的提升,pause 指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。
比较花费 CPU 资源,即使没有任何用也会做一些无用功。
只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量 i = 2,j=a,合并一下 ij=2a,然后用 CAS 来操作 ij。从 Java1.5 开始 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作。
总结
可以用 CAS 在无锁的情况下实现原子操作,但要明确应用场合,非常简单的操作且又不想引入锁可以考虑使用 CAS 操作,当想要非阻塞地完成某一操作也可以考虑 CAS。不推荐在复杂操作中引入 CAS,会使程序可读性变差,且难以测试,同时会出现 ABA 问题。