并发机制的底层实现

本文内容基于 JDK1.8。

concurrent 包的实现

由于 Java 的 CAS 同时具有 volatile 读和 volatile 写的内存语义,因此 Java 线程之间的通信现在有了下面四种方式:

  1. A 线程写 volatile 变量,随后 B 线程读这个 volatile 变量。
  2. A 线程写 volatile 变量,随后 B 线程用 CAS 更新这个 volatile 变量。
  3. A 线程用 CAS 更新一个 volatile 变量,随后 B 线程用 CAS 更新这个 volatile 变量。
  4. A 线程用 CAS 更新一个 volatile 变量,随后 B 线程读这个 volatile 变量。

同时,volatile 变量的读/写和 CAS 可以实现线程之间的通信。把这些特性整合在一起,就形成了整个 concurrent 包得以实现的基石。如果我们仔细分析 concurrent 包的源代码实现,会发现一个通用化的实现模式:

首先,声明共享变量为 volatile;

然后,使用 CAS 的原子条件更新来实现线程之间的同步;

同时,配合以 volatile 的读/写和 CAS 所具有的 volatile 读和写的内存语义来实现线程之间的通信。

AQS,非阻塞数据结构和原子变量类(Java.util.concurrent.atomic 包中的类),这些 concurrent 包中的基础类都是使用这种模式来实现的,而 concurrent 包中的高层类又是依赖于这些基础类来实现的。从整体来看,concurrent 包的实现示意图如下:

synchronized

synchronized 的要点

关键字 synchronized 可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块。

synchronized 有 3 种应用方式:

  1. 同步实例方法
  2. 同步静态方法
  3. 同步代码块

同步实例方法

:x: 错误示例 - 未同步的示例

@NotThreadSafe
public class SynchronizedDemo01 implements Runnable {
    static int i = 0;

    public void increase() {
        i++;
    }

    @Override
    public void run() {
        for (int j = 0; j < 100000; j++) {
            increase();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedDemo01 instance = new SynchronizedDemo01();
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}
// 输出结果: 小于 200000 的随机数字

Java 实例方法同步是同步在拥有该方法的对象上。这样,每个实例其方法同步都同步在不同的对象上,即该方法所属的实例。只有一个线程能够在实例方法同步块中运行。如果有多个实例存在,那么一个线程一次可以在一个实例同步块中执行操作。一个实例一个线程。

@ThreadSafe
public class SynchronizedDemo02 implements Runnable {
    static int i = 0;

    public synchronized void increase() {
        i++;
    }

    @Override
    public void run() {
        for (int j = 0; j < 100000; j++) {
            increase();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedDemo02 instance = new SynchronizedDemo02();
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}
// 输出结果:
// 2000000

同步静态方法

静态方法的同步是指同步在该方法所在的类对象上。因为在 JVM 中一个类只能对应一个类对象,所以同时只允许一个线程执行同一个类中的静态同步方法。

对于不同类中的静态同步方法,一个线程可以执行每个类中的静态同步方法而无需等待。不管类中的那个静态同步方法被调用,一个类只能由一个线程同时执行。

@ThreadSafe
public class SynchronizedDemo03 implements Runnable {
    static int i = 0;

    public static synchronized void increase() {
        i++;
    }

    @Override
    public void run() {
        for (int j = 0; j < 100000; j++) {
            increase();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new SynchronizedDemo03());
        Thread t2 = new Thread(new SynchronizedDemo03());
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}
// 输出结果:
// 200000

同步代码块

有时你不需要同步整个方法,而是同步方法中的一部分。Java 可以对方法的一部分进行同步。

注意 Java 同步块构造器用括号将对象括起来。在上例中,使用了 this,即为调用 add 方法的实例本身。在同步构造器中用括号括起来的对象叫做监视器对象。上述代码使用监视器对象同步,同步实例方法使用调用方法本身的实例作为监视器对象。

一次只有一个线程能够在同步于同一个监视器对象的 Java 方法内执行。

@ThreadSafe
public class SynchronizedDemo04 implements Runnable {
    static int i = 0;
    static SynchronizedDemo04 instance = new SynchronizedDemo04();

    @Override
    public void run() {
        synchronized (instance) {
            for (int j = 0; j < 100000; j++) {
                i++;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}
// 输出结果:
// 200000

synchronized 的原理

synchronized 实现同步的基础是:Java 中的每一个对象都可以作为锁。

  • 对于普通同步方法,锁是当前实例对象。
  • 对于静态同步方法,锁是当前类的 Class 对象。
  • 对于同步方法块,锁是 Synchonized 括号里配置的对象。

:point_right: 参考阅读:Java 并发编程:synchronized :point_right: 参考阅读:深入理解 Java 并发之 synchronized 实现原理

volatile

volatile 的要点

volatile 是轻量级的 synchronized,它在多处理器开发中保证了共享变量的“可见性”。

可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。

一旦一个共享变量(类的成员变量、类的静态成员变量)被 volatile 修饰之后,那么就具备了两层语义:

  1. 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
  2. 禁止进行指令重排序。

如果一个字段被声明成 volatile,Java 线程内存模型确保所有线程看到这个变量的值是一致的。

volatile 的原理

观察加入 volatile 关键字和没有加入 volatile 关键字时所生成的汇编代码发现,加入 volatile 关键字时,会多出一个 lock 前缀指令。

lock 前缀指令实际上相当于一个内存屏障(也成内存栅栏),内存屏障会提供 3 个功能:

  • 它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;
  • 它会强制将对缓存的修改操作立即写入主存;
  • 如果是写操作,它会导致其他 CPU 中对应的缓存行无效。

volatile 的应用场景

如果 volatile 变量修饰符使用恰当的话,它比 synchronized 的使用和执行成本更低,因为它不会引起线程上下文的切换和调度。

但是,volatile 无法替代 synchronized ,因为 volatile 无法保证操作的原子性。通常来说,使用 volatile 必须具备以下 2 个条件:

  1. 对变量的写操作不依赖于当前值
  2. 该变量没有包含在具有其他变量的不变式中

应用场景:

状态标记量

volatile boolean flag = false;

while(!flag) {
    doSomething();
}

public void setFlag() {
    flag = true;
}

double check

class Singleton {
    private volatile static Singleton instance = null;

    private Singleton() {}

    public static Singleton getInstance() {
        if(instance==null) {
            synchronized (Singleton.class) {
                if(instance==null)
                    instance = new Singleton();
            }
        }
        return instance;
    }
}

:point_right: 参考阅读:Java 并发编程:volatile 关键字解析

CAS

简介

CAS(Compare and Swap),字面意思为比较并交换。CAS 有 3 个操作数,内存值 V,旧的预期值 A,要修改的新值 B。当且仅当预期值 A 和内存值 V 相同时,将内存值 V 修改为 B,否则什么都不做。

操作

我们常常做这样的操作

if(a==b) {
    a++;
}

试想一下如果在做 a++之前 a 的值被改变了怎么办?a++还执行吗?出现该问题的原因是在多线程环境下,a 的值处于一种不定的状态。采用锁可以解决此类问题,但 CAS 也可以解决,而且可以不加锁。

int expect = a;
if(a.compareAndSet(expect,a+1)) {
    doSomeThing1();
} else {
    doSomeThing2();
}

这样如果 a 的值被改变了 a++就不会被执行。按照上面的写法,a!=expect 之后,a++就不会被执行,如果我们还是想执行 a++操作怎么办,没关系,可以采用 while 循环

while(true) {
    int expect = a;
    if (a.compareAndSet(expect, a + 1)) {
        doSomeThing1();
        return;
    } else {
        doSomeThing2();
    }
}

采用上面的写法,在没有锁的情况下实现了 a++操作,这实际上是一种非阻塞算法。

应用

非阻塞算法 (nonblocking algorithms)

一个线程的失败或者挂起不应该影响其他线程的失败或挂起的算法。

现代的 CPU 提供了特殊的指令,可以自动更新共享数据,而且能够检测到其他线程的干扰,而 compareAndSet() 就用这些代替了锁定。

拿出 AtomicInteger 来研究在没有锁的情况下是如何做到数据正确性的。

private volatile int value;

首先毫无疑问,在没有锁的机制下可能需要借助 volatile 原语,保证线程间的数据是可见的(共享的)。

这样在获取变量的值的时候才能直接读取。

public final int get() {
    return value;
}

然后来看看++i 是怎么做到的。

public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
            if (compareAndSet(current, next))
                return next;
    }
}

在这里采用了 CAS 操作,每次从内存中读取数据然后将此数据和+1 后的结果进行 CAS 操作,如果成功就返回结果,否则重试直到成功为止。

而 compareAndSet 利用 JNI 来完成 CPU 指令的操作。

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

整体的过程就是这样子的,利用 CPU 的 CAS 指令,同时借助 JNI 来完成 Java 的非阻塞算法。其它原子操作都是利用类似的特性完成的。

其中 unsafe.compareAndSwapInt(this, valueOffset, expect, update)类似:

if (this == expect) {
    this = update
    return true;
} else {
    return false;
}

那么问题就来了,成功过程中需要 2 个步骤:比较 this == expect,替换 this = update,compareAndSwapInt 如何这两个步骤的原子性呢? 参考 CAS 的原理

原理

Java 代码如何确保处理器执行 CAS 操作?

CAS 通过调用 JNI(JNI:Java Native Interface 为 Java 本地调用,允许 Java 调用其他语言。)的代码实现的。JVM 将 CAS 操作编译为底层提供的最有效方法。在支持 CAS 的处理器上,JVM 将它们编译为相应的机器指令;在不支持 CAS 的处理器上,JVM 将使用自旋锁。

特点

优点

一般情况下,比锁性能更高。因为 CAS 是一种非阻塞算法,所以其避免了线程被阻塞时的等待时间。

缺点

ABA 问题

因为 CAS 需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA 问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么 A-B-A 就会变成 1A-2B-3A。

从 Java1.5 开始 JDK 的 atomic 包里提供了一个类 AtomicStampedReference 来解决 ABA 问题。这个类的 compareAndSet 方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

循环时间长开销大

自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的 pause 指令那么效率会有一定的提升,pause 指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。

比较花费 CPU 资源,即使没有任何用也会做一些无用功。

只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量 i = 2,j=a,合并一下 ij=2a,然后用 CAS 来操作 ij。从 Java1.5 开始 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作。

总结

可以用 CAS 在无锁的情况下实现原子操作,但要明确应用场合,非常简单的操作且又不想引入锁可以考虑使用 CAS 操作,当想要非阻塞地完成某一操作也可以考虑 CAS。不推荐在复杂操作中引入 CAS,会使程序可读性变差,且难以测试,同时会出现 ABA 问题。

资料

results matching ""

    No results matching ""