聊聊并发(一)——初始JUC
阅读原文时间:2021年09月30日阅读:1

1、介绍

  JDK 5.0 提供了java.util.concurrent包,在此包中增加了并发编程中很常用的使用工具类,用于定义类似于线程的自定义子系统,包括线程池、异步IO和轻量级任务框架。提供可调的、灵活的线程池。还提供了设计用于多线程上下文的Collection实现等。

2、内存可见性

  内存可见性是指当某个线程正在使用对象状态而另一线程在同时修改该状态,需要确保当一个线程修改了对象状态后,其他线程能够看到发生的状态变化。
  可见性错误是指当读操作与写操作在不同的线程中执行时,我们无法确保执行读操作的线程能实时的看到其他线程写入之后的值,有时甚至是根本不可能的事情。
  我们可以通过同步来保证对象被安全的发布。除此之外我们也可以使用一种更加轻量级的volatile变量。
  内存可见性问题:当多个线程同时操作共享数据时,对共享数据的操作彼此是不可见的。
  代码示例:内存可见性问题

1 public class VolatileDemo implements Runnable {
2
3 private boolean flag = false;
4
5 @Override
6 public void run() {
7
8 try {
9 Thread.sleep(200);
10 } catch (InterruptedException e) {
11 }
12
13 flag = true;
14 System.out.println("flag = " + isFlag());
15 }
16
17 public boolean isFlag() {
18 return flag;
19 }
20
21 public void setFlag(boolean flag) {
22 this.flag = flag;
23 }
24
25 }

1 // 测试类
2 public class Main {
3 public static void main(String[] args) {
4 VolatileDemo demo = new VolatileDemo();
5 new Thread(demo).start();
6
7 while (true) {
8 if (demo.isFlag()) {
9 System.out.println("this is main");
10 break;
11 }
12 }
13 }
14 }
15
16 // 结果1
17 flag = true
18 this is main
19 // 程序结束
20
21 // 结果2
22 flag = true
23 // 程序死循环

  问题:结果1不难理解,当线程执行完毕后,主线程才开始执行while。为什么结果2是死循环呢?
  原因:JVM为每一个执行任务的线程,它都会分配一个独立的工作内存用于提高效率。每次都会从主存中读取变量的副本到各自的工作内存中,修改后,再写回主存中。
  那么,不难理解结果2:主线程从主存读取flag = false,因为用的while循环,while属于底层的东西,执行速度非常快,没有再读主存的机会,一直读取的是自己的工作内存(flag = false)。而当线程1读到flag并修改为true,回写到主存时,主线程并不知道,所以死循环。

  解决:知道问题原因了,如何解决呢?
  代码示例:方式一、加锁

1 // 方式一
2 public class Main {
3 public static void main(String[] args) {
4 VolatileDemo demo = new VolatileDemo();
5 new Thread(demo).start();
6
7 while (true) {
8 synchronized (demo) {
9 if (demo.isFlag()) {
10 System.out.println("this is main");
11 break;
12 }
13 }
14 }
15 }
16 }

  分析:synchronize加锁可以解决。加了锁,就可以让while循环每次都从主存中去读取数据,这样就能读取到true了。但是加锁效率极低。每次只能有一个线程访问,当一个线程持有锁时,其他线程就会阻塞,效率就非常低了。不想加锁,又要解决内存可见性问题,那么就可以使用volatile关键字。
  代码示例:方式二、用volatile修饰

1 private volatile boolean flag = false;

3、volatile关键字

  Java提供了一种稍弱的同步机制——volatile关键字,当多个线程访问共享数据时,可以保证内存可见性,即内存中的数据可见。用这个关键字修饰共享数据,就会及时的把线程工作内存中的数据刷新到主存中去,也可以理解为,就是直接操作主存中的数据。
  可以将volatile看做一个轻量级的锁,相较于synchronized是一种轻量级的同步策略。与锁(synchronize)的区别:
  volatile不具备互斥性。即一个线程访问共享数据,另一个线程依然可以访问。所有的访问都在主存中完成,保证内存可见性。
  synchronized具备互斥性。即一个线程抢到锁,另一个线程进不来,必须等待。
  volatile不能保证变量的原子性。

1、原子性

  所谓原子性就是一组操作不可再细分。要么全都做,要么全都不做。前面提到volatile不能保证变量的原子性,具体表现如下:
  代码示例:原子性问题

1 public class AtomicDemo implements Runnable {
2
3 // 此时是加了volatile语义的
4 private volatile int i = 0;
5
6 @Override
7 public void run() {
8
9 try {
10 Thread.sleep(1000);
11 } catch (InterruptedException e) {
12 }
13
14 System.out.println(getI());
15 }
16
17 public int getI() {
18 return i++;
19 }
20
21 }

1 // 测试类
2 public class Main {
3 public static void main(String[] args) {
4 AtomicDemo atomicDemo = new AtomicDemo();
5
6 // 开启 10 个线程对共享数据进行自增后打印。
7 for (int i = 0; i < 10; i++) {
8 new Thread(atomicDemo).start();
9 }
10 }
11 }
12
13 // 可能的一种结果
14 0
15 5
16 4
17 3
18 2
19 1
20 0
21 6
22 6
23 7

  问题:期望结果应该每个线程对 i 自增一次,最终 i 的值为10。实际结果如上(有重复数据)。
  原因:i++操作不是一个原子性操作,实际分为读改写三步,如下:

  int temp = i; // 从主存中读
  i = i + 1; // cpu 对 i 进行+1运算
  i = temp; // 写回主存

  而volatile不能保证变量的原子性。volatile,只是相当于所有线程都是在主存中操作数据而已,并不具备互斥性。比如两个线程同时读取主存中的0,然后又同时自增,同时写入主存,结果还是会出现重复数据。volatile的不具备互斥性也导致了它不具备原子性。
  解决:知道问题原因了,如何解决呢?
  代码示例:方式一、加锁

1 public synchronized int getI() {
2 return i++;
3 }

  代码示例:方式二、原子变量

1 private AtomicInteger i = new AtomicInteger();
2 public int getI() {
3 return i.getAndIncrement();
4 }

2、原子变量

  JDK 1.5 以后java.util.concurrent.atomic包下提供了常用的原子变量。这些原子变量具备以下特点:volatile的内存可见性;CAS算法保证数据的原子性。
  类的小工具包,支持在单个变量上解除锁的线程安全编程。事实上,此包中的类可将volatile值、字段和数组元素的概念扩展到那些也提供原子条件更新操作的类。
  类AtomicBoolean、AtomicInteger、AtomicLong和AtomicReference的实例各自提供对相应类型单个变量的访问和更新。每个类也为该类型提供适当的实用工具方法。
  AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray类进一步扩展了原子操作,对这些类型的数组提供了支持。这些类在为其数组元素提供volatile访问语义方面也引人注目,这对于普通数组来说是不受支持的。
  核心方法:boolean compareAndSet(int expectedValue, int updateValue)
  java.util.concurrent.atomic包下提供了一些原子操作的常用类:

  AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference
  AtomicIntegerArray、AtomicLongArray
  AtomicMarkableReference
  AtomicReferenceArray
  AtomicStampedReference

3、CAS算法

  CAS(Compare and Swap)是一种硬件对并发的支持,针对多处理器操作而设计的处理器中的一种特殊指令,用于管理对共享数据的并发访问,是硬件对于并发操作共享数据的支持。
  CAS是一种无锁的非阻塞算法的实现。不存在上下文切换的问题。
  CAS包含了3个操作数:内存值V,比较值A,更新值B。当且仅当V == A时,V = B,否则不执行任何操作。
  CAS算法:当多个线程并发的对主存中的数据进行修改的时候。有且只有一个线程会成功,其他的都会失败。(同时操作,只是会失败而已,并不会被锁之类的)。
  CAS比普通同步锁效率高,原因:CAS算法当这一次不成功的时候,它下一次不会阻塞,也就是它不会放弃CPU的执行权,它可以立即再次尝试,再去更新。
  代码示例:模拟CAS算法

1 // 模拟CAS算法
2 public class CompareAndSwap {
3 private int value;
4
5 // 获取内存值
6 public synchronized int get() {
7 return value;
8 }
9
10 // 比较
11 public synchronized int compareAndSwap(int expectedValue, int newValue) {
12 int oldValue = value;
13
14 if (oldValue == expectedValue) {
15 this.value = newValue;
16 }
17
18 return oldValue;
19 }
20
21 // 设置
22 public synchronized boolean compareAndSet(int expectedValue, int newValue) {
23 return expectedValue == compareAndSwap(expectedValue, newValue);
24 }
25 }

1 // 测试类
2 public class Main {
3 public static void main(String[] args) {
4 final CompareAndSwap cas = new CompareAndSwap();
5
6 for (int i = 0; i < 10; i++) { 7 new Thread(() -> {
8
9 int expectedValue = cas.get();
10 boolean b = cas.compareAndSet(expectedValue, (int) (Math.random() * 101));
11
12 System.out.println(b);
13
14 }).start();
15 }
16
17 }
18 }

1、concurrentHashMap

  JDK 1.5之后,在java.util.concurrent包中提供了多种并发容器类来改进同步容器类的性能。其中最主要的就是ConcurrentHashMap,采用"锁分段"机制。
  HashMap是线程不安全的;Hashtable 加了锁,是线程安全的,因此它效率低。Hashtable 加锁就是将整个hash表锁起来,当有多个线程访问时,同一时间只能有一个线程访问,并行变成串行,因此效率低。
  ConcurrentHashMap是一个线程安全的hash表。对于多线程的操作,介于 HashMap 与 Hashtable 之间。内部采用"锁分段"机制替代 Hashtable 的独占锁,进而提高性能。

  每个段都是一个独立的锁。JDK 1.8 以后concurrentHashMap的锁分段被取消了。采用的是CAS算法。
  此包还提供了设计用于多线程上下文中的 Collection 实现:

  ConcurrentHashMap
  ConcurrentSkipListMap
  ConcurrentSkipListSet
  CopyOnWriteArrayList
  CopyOnWriteArraySet

  当期望多线程访问一个给定 collection 时,ConcurrentHashMap 通常优于同步的 HashMap,ConcurrentSkipListMap 通常优于同步的 TreeMap。当期望的读数和遍历远远大于列表的更新数时,CopyOnWriteArrayList 优于同步的 ArrayList。

2、CopyOnWriteArrayList

  代码示例:CopyOnWriteArrayList

1 // 不写注释也能看懂的代码
2 public class CopyOnWriteArrayListDemo implements Runnable {
3
4 private static final List list = Collections.synchronizedList(new ArrayList<>());
5 //private static final CopyOnWriteArrayList list = new CopyOnWriteArrayList<>();
6
7 static {
8 list.add("AA");
9 list.add("BB");
10 list.add("CC");
11 }
12
13 @Override
14 public void run() {
15 Iterator it = list.iterator();
16 while (it.hasNext()) {
17 System.out.println(it.next());
18
19 list.add("AA");
20 }
21 }
22 }

1 // 测试类
2 public class Main {
3 public static void main(String[] args) {
4 CopyOnWriteArrayListDemo ht = new CopyOnWriteArrayListDemo();
5
6 new Thread(ht).start();
7 }
8 }
9
10 // 结果(有并发修改异常)
11 AA
12 Exception in thread "Thread-0" java.util.ConcurrentModificationException

  如果用CopyOnWriteArrayList,则不会有异常。
  CopyOnWriteArrayList:写入并复制,添加操作多时,效率低,因为每次添加时都会进行复制,开销非常的大。并发迭代操作多时可以选择。

1、介绍

  java.util.concurrent包中提供了多种并发容器类来改进同步容器的性能。CountDownLatch是一个同步辅助类,在完成某些运算时,只有其他所有线程的运算全部完成,当前运算才继续执行,这就叫闭锁。

2、案例

  代码示例:计算10个线程打印偶数的时间

1 // 不写注释也能看懂的代码
2 public class CountDownLatchDemo implements Runnable {
3
4 private final CountDownLatch latch;
5
6 public CountDownLatchDemo(CountDownLatch latch) {
7 this.latch = latch;
8 }
9
10 @Override
11 public void run() {
12 try {
13 for (int i = 0; i < 50000; i++) {
14 if (i % 2 == 0) {
15 System.out.println(i);
16 }
17 }
18 } finally {
19 // 完成一个线程,计数 -1
20 latch.countDown();
21 }
22 }
23
24 }

1 // 测试类
2 public class Main {
3
4 public static void main(String[] args) {
5 final CountDownLatch latch = new CountDownLatch(10);
6 CountDownLatchDemo ld = new CountDownLatchDemo(latch);
7
8 long start = System.currentTimeMillis();
9
10 for (int i = 0; i < 10; i++) {
11 new Thread(ld).start();
12 }
13
14 try {
15 // 等待10个线程都执行完
16 latch.await();
17 } catch (InterruptedException e) {
18 }
19
20 long end = System.currentTimeMillis();
21
22 System.out.println("耗费时间为:" + (end - start));
23 }
24
25 }

  Callable和Runable的区别是,Callable带泛型,其call方法有返回值。使用的时候,需要用FutureTask来接收返回值。而且它也要等到线程执行完调用get方法才会执行,也可以用于闭锁操作。
  代码示例:

1 // 不写注释也能看懂的代码
2 public class CallableDemo implements Callable {
3
4 @Override
5 public Integer call() throws Exception {
6 int sum = 0;
7
8 for (int i = 0; i <= 100; i++) {
9 sum += i;
10 }
11
12 return sum;
13 }
14 }

1 // 测试类
2 public class Main {
3 public static void main(String[] args) {
4 CallableDemo demo = new CallableDemo();
5
6 //执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。
7 FutureTask result = new FutureTask<>(demo);
8 new Thread(result).start();
9
10 try {
11 // get()方法是阻塞的
12 Integer sum = result.get();
13 System.out.println(sum);
14
15 System.out.println("--------表明 get()方法是阻塞的-------------");
16 } catch (InterruptedException | ExecutionException e) {
17 e.printStackTrace();
18 }
19 }
20 }

  在JDK1.5之前,解决多线程安全问题用sychronized隐式锁:同步代码块;同步方法。
  在JDK1.5之后,出现了更加灵活的方式Lock显式锁:同步锁。
  Lock需要通过lock()方法上锁,通过unlock()方法释放锁。为了保证锁能释放,所有unlock方法一般放在finally中去执行。
  代码示例:卖票问题

1 // 不写注释也能看懂的代码
2 public class LockDemo implements Runnable {
3
4 private int tick = 100;
5
6 private final Lock lock = new ReentrantLock();
7
8 @Override
9 public void run() {
10 while (true) {
11 //上锁
12 lock.lock();
13
14 try {
15 if (tick > 0) {
16
17 try {
18 Thread.sleep(200);
19 } catch (InterruptedException e) {
20 }
21
22 System.out.println(Thread.currentThread().getName() + " 完成售票,余票为:" + --tick);
23 } else {
24 break;
25 }
26 } finally {
27 //释放锁
28 lock.unlock();
29 }
30 }
31 }
32 }

1 // 测试类
2 public class Main {
3 public static void main(String[] args) {
4 LockDemo ticket = new LockDemo();
5
6 // 开了3个窗口卖票
7 new Thread(ticket, "1号窗口").start();
8 new Thread(ticket, "2号窗口").start();
9 new Thread(ticket, "3号窗口").start();
10 }
11 }
12
13 // 可能的结果.这里只出最后10张票的结果值
14 2号窗口 完成售票,余票为:9
15 2号窗口 完成售票,余票为:8
16 3号窗口 完成售票,余票为:7
17 3号窗口 完成售票,余票为:6
18 3号窗口 完成售票,余票为:5
19 3号窗口 完成售票,余票为:4
20 3号窗口 完成售票,余票为:3
21 3号窗口 完成售票,余票为:2
22 1号窗口 完成售票,余票为:1
23 2号窗口 完成售票,余票为:0

  多个线程并发读数据,是不会出现问题。但是,多个线程并发写数据,到底是写入哪个线程的数据呢?所以,写写/读写需要互斥,读读不需要互斥。这个时候可以用读写锁来提高效率。
  ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。
  读锁,可以多个线程并发的持有。
  写锁,是独占的。
  源码示例:读写锁

1 public interface ReadWriteLock {
2 // 返回一个读锁
3 Lock readLock();
4
5 // 返回一个写锁
6 Lock writeLock();
7 }

  代码示例:

1 public class ReadWriteLockDemo {
2
3 private int number = 0;
4
5 private final ReadWriteLock lock = new ReentrantReadWriteLock();
6
7 // 读.可以多个线程并发读
8 public void read() {
9 // 上读锁
10 lock.readLock().lock();
11
12 try {
13 System.out.println(Thread.currentThread().getName() + " : " + number);
14 } finally {
15 // 释放读锁
16 lock.readLock().unlock();
17 }
18 }
19
20 // 写.一次只能有一个线程操作
21 public void write(int number) {
22 // 上写锁
23 lock.writeLock().lock();
24
25 try {
26 System.out.println(Thread.currentThread().getName());
27 this.number = number;
28 } finally {
29 // 释放写锁
30 lock.writeLock().unlock();
31 }
32 }
33 }

1 // 测试类
2 public class Main {
3 public static void main(String[] args) {
4 ReadWriteLockDemo rw = new ReadWriteLockDemo();
5
6 // 开启 1 个线程写
7 new Thread(new Runnable() {
8 @Override
9 public void run() {
10 rw.write((int) (Math.random() * 101));
11 }
12 }, "Write:").start();
13
14 // 开启 100 个线程读
15 for (int i = 0; i < 100; i++) {
16 new Thread(new Runnable() {
17
18 @Override
19 public void run() {
20 rw.read();
21 }
22 }).start();
23 }
24 }
25 }