Java并发成神之路-精通JUC并发工具十八般武艺——并发容器(六)

并发容器概念

  • ConcurrentHashMap:线程安全的HashMap
  • CopyOnWriteArrayList:线程安全的List
  • BlockingQueue:这是一个接口,表示阻塞队列,非常适合用于作为数据共享的通道
  • ConcurrentLinkedQueue:高效的非阻塞队列,使用链表实现。可以看作一个线程安全的LinkedList
  • ConcurrentSkipListMap:是一个Map,使用跳表的数据结构进行快速查找

趣说集合类的历史——古老和过时的同步容器

  • Vector和Hashtable
    • 主要通过在方法上面添加synchronized关键字,来实现线程安全,效率很差
  • HashMap和ArrayList
    • HashMap和ArrayList是线程不安全的,不过可以通过Collections.synchronizedList(new ArrayList<E>())Collections.synchronizedMap(new HashMap<K,V>())使之变成线程安全的
    • 使用这两个方法包装之后,其实也是使用的synchronized,只不过锁的是同步代码块,效率并不会有所提升
  • ConcurrentHashMapCopyOnWriteArrayList
    • 取代同步的HashMap和同步的ArrayList

ConcurrentHashMap(重点、面试常考)

Map简介

  • HashMap:是根据key的hashCode进行存储的,键允许一个null值,value不做限制,线程非安全
  • Hashtable:功能和HashMap一致,线程安全,不过性能比较差
  • LinkedHashMap:保存记录的插入顺序
  • TreeMap:根据key进行排序,有默认排序规则,也可以自动逸排序规则

为什么HashMap是线程不安全的?

  • 同时put碰撞导致数据丢失
    • 如果两个线程同时存储,key的hashCode相同,会导致有一个的值丢失,只会存储一个
  • 同时put扩容导致数据丢失
    • 如果两个线程同时对hashMap进行了扩容,那么扩容只会的数组只有一个能被保存下来,导致另外一个的数据丢失
  • 死循环造成CPU 100%
    • 多线程的情况下会照成循环链表最终导致死循环
    • 原因分析: 点击查看

      死循环的实现代码如下所示:
      具体复现方式: 首先需要将JDk切换成1.7版本,然后按下图所示打上断点,并将断点设置为线程级别,当两个线程都走到第二各断点的时候,放过所有断点,就会复现循环链表的情况

HashMap死循环复现示意图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class HashMapEndlessloop {
private static HashMap<Integer,String> map = new HashMap<>(2,1.5f);

public static void main(String[] args) {
map.put(5,"C");
map.put(7,"B");
map.put(3,"A");
new Thread(new Runnable() {
@Override
public void run() {
map.put(15,"D");
System.out.println(map);
}
},"Thread 1").start();

new Thread(new Runnable() {
@Override
public void run() {
map.put(1,"E");
System.out.println(map);
}
},"Thread 2").start();
}
}

HashMap分析

  • 在1.7版本的JDK中,底层数据结构是数组加链表
  • 在1.8中,当链表的长度达到一定的阈值之后,就会转变成红黑树

    红黑树

  • 每个节点要么是红色,要么是黑色,但是根节点永远是黑色
  • 红色节点不能连续(也即是,红色节点的孩子和父亲都不能是红色)
  • 从任意节点到其子树中每个叶子节点的路径都包含了数量相同的黑色节点
  • 所有的叶子节点都是黑色的

HashMap关于并发的特点

  • 非线程安全
  • 迭代时不允许修改内容
  • 只读的并发是安全的
  • 如果一定要把HashMap用在并发环境,用Collection.synchronizedMap(new HashMap<K,V>())

JDK1.7的ConcurrentHashMap实现和分析

  • Java7中的ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类似,仍然是数组和链表组成的拉链法
  • 每个segment独立上ReentrantLock锁,每个segment之间互不影响,提高了并发效率
  • ConcurrentHashMap默认有16各segment,所以最多可以同时支持16个线程并发写(从中分别分布在不同segment上).这个默认值可以在初始化的时候设置为其他值,但是一旦初始化以后,是不可以扩容的

JDK1.8的ConcurrentHashMap实现和分析

putValue流程

  • 判断key value不为空
  • 计算Hash值
  • 根据对应位置节点的类型,来赋值,或者helpTransfer,或者增长链表,或者给红黑树增加节点
  • 检查满足阈值就”红黑树化”
  • 返回oldVal

get流程

  • 计算hash值
  • 找到对应的位置,根据情况进行:
    • 直接取值
    • 红黑树里找值
    • 遍历链表找值
    • 返回找到的结果

为什么要把1.7的结构改成了1.8的结构

  • 数据结构(7是segment加链表的结构,8是数组加链表,链表长度达到8之后,会转为红黑树,7的并发度是根据segment的个数决定的,8是每个Node都是独立的)
  • Hash碰撞(7是直接使用链表,8是先使用链表,数量达到8之后,会转成红黑树)
  • 保证并发安全(7是使用分段锁,继承于Lock,8中是使用CAS加synchronized来保证的)
  • 查询复杂度(7查询链表是O(n),8为链表的是相同,为红黑树的时候降为O(logn))
  • 为什么超过8的时候转成红黑树?
    • 没有最开始就使用红黑树而是要后面转,是因为红黑树占用空间比链表大两倍
    • 作者有在注释中描述,链表最终的长度能达到8个概率只有千万分之一,这种概率已经很小了,为了应对这种极端情况,才会转成红黑树,但是一般链表长度根本不会到8

组合操作

  • replace (把一个值根据当前值的情况判断是否要替换)
  • putIfAbsent (如果当前key不存在值,就放进去,否则就取出来)

CopyOnWriteArrayList

诞生的历史和原因

  • 代替VectorSynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因一样
  • VectorSynchronizedList的锁粒度太大,并发效率相对比较低,并且迭代时无法编辑
  • Copy-On-Write并发容器还包括CopyOnWriteArraySet,用来代替同步Set

适用场景

  • 读操作可以尽可能地块,而写即使慢一些也没有太大关系
  • 读多写少: 黑名单,每日更新;监听器: 迭代操作远多余修改操作

读写规则

  • 读写锁规则的升级: 读取是完全不用加锁的,并且更厉害的是写入也不会阻塞读取操作,只有写入和写入之间需要进行同步等待
  • ArrayList在迭代的时候是不允许修改的,检测到修改会直接异常
  • CopyOnWriteArrayList可以在迭代中修改,不过迭代的依然是未修改时的内容

实现原理

  • CopyOnWrite的含义
    • 每次修改或者添加数据都会创建一个新的容器,之后再把变量的引用指向新的容器,旧的就可以回收,旧的容器也不会改变
  • 创建新副本,读写分离
    • 在写入的时候,会将原本的数组整个复制,然后将修改写入原本的数组中,最后再替换回去
  • “不可变”原理
    • 因为每次修改都是一个新的容器,所以对旧的容器来说就是不可变的
  • 迭代的时候
    • 在迭代的时候,如果原本的数组已经被修改过了,但是迭代器是不知道的,它使用的依然是最开始的时候的旧数组(可能会出现数据过期的问题)

缺点

  • 数据一致性问题: CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一直性,所以如果你希望写入的数据,马上就能读到,请不要使用CopyOnWrite容器
  • 内存占用问题: 因为CopyOnWrite的写是复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存

源码分析

  • 底层是基于数组实现的
  • 添加数据的时候,如下图所示,使用Lock锁进行上锁,然后将新建一个比原本数组长1的数组,然后将原本数组的内容全部复制进去,然后将新数据保存到数据的最后一位

CopyOnWriteArrayList的add方法源码示意图

并发队列Queue(阻塞队列、非阻塞队列)

为什么要使用队列

  • 用队列可以在线程间传递数据: 生产者消费者模式,银行转账
  • 考虑锁等线程安全问题的重任从”你”转移到了”队列”上

各并发队列关系图

阻塞队列BlockingQueue

什么是阻塞队列

  • 阻塞队列是具有阻塞功能的队列,所以它首先是一个队列,其次是具有阻塞功能
  • 通常,阻塞队列的一端是给生产者方数据用,另一端给消费者拿数据用,阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的
  • 阻塞功能: 最具有特色的两个带有阻塞功能的方法:
    • take()方法: 获取并移除队列的头结点,一旦如果执行take的时候,队列里无数据,则阻塞,直到队列里有数据
    • put()方法: 插入元素,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间
  • 是否有界(容量有多大): 这是一个非常重要的属性,无界队列意味着里面可以容纳非常多(Integer.MAX_VALUE,约为2的31次方,是一个非常大的数,可以近似认为是无限容量)
  • 阻塞队列和线程池的关系: 阻塞队列是线程池的重要组成部分

ArrayBlockingQueue

  • 有界
  • 指定容量
  • 公平:还可以指定是否需要保证公平,如果想要保证公平的话,那么等待了最长时间的线程会被优先处理,不过这会同时带来一定的性能损耗
  • 示例代码如下所示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    public class ArrayBlockingQueueDemo {

    public static void main(String[] args) {
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
    Interviewer interviewer = new Interviewer(queue);
    Consumer consumer = new Consumer(queue);
    new Thread(interviewer).start();
    new Thread(consumer).start();
    }
    }
    class Interviewer implements Runnable {
    BlockingQueue<String> queue;

    public Interviewer(BlockingQueue<String> queue) {
    this.queue = queue;
    }
    @Override
    public void run() {
    System.out.println("10个候选人都来了");
    try {
    for (int i = 0; i < 10; i++) {
    String candidate = "Candidate " + i;
    queue.put(candidate);
    System.out.println(candidate + " 安排好了");
    }
    queue.put("stop");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    class Consumer implements Runnable{
    BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
    this.queue = queue;
    }
    @Override
    public void run() {
    try {
    Thread.sleep(1000);
    String msg;
    while (!(msg = queue.take()).equals("stop")){
    System.out.println(msg+"面试结束----");
    }
    System.out.println("所有候选人面试结束");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    ArrayBlockingQueue的put方法源码如下,首先判断值是否为空,然后获取锁,之后判断队列是否满了,如果满了就进行等待,如果没满就把值放进去

ArrayBlockingQueue的put源码示意图

LinkedBlockingQueue

  • 无界
  • 容量Integer.MAX_VALUE
  • 内部结构:Node、两把锁
  • 由下图中可以看出,LinkedBlockingQueue中是有两把锁的,所以存放和获取是互不干扰的

LinkedBlockingQueue两把锁源码示意图

  • 从图中可以看出,存值的时候首先会判断值是否为空,然后将其封装成一个node,然后获取putLock锁
  • 再然后判断当前长度是否等于最大长度,是就进行等待,不是就开始存值操作,然后再判断存值之后的长度是否小于最大长度,是的话就唤醒其他的线程继续运行

LinkedBlockingQueue的put方法源码示意图

PriorityBlockingQueue

  • 支持优先级
  • 自然排序(而不是先进先出)
  • 无界队列
  • PriorityQueue的线程安全版本

SynchronousQueue

  • 它的容量为0
  • 需要注意的是,SynchronousQueue的容量不是1而是0,因为SynchronousQueue不需要去持有元素,它所作的就是直接传递
  • 效率很高
  • SynchronousQueue没有peek等函数,因为peek的含义是取出头结点,但是SynchronousQueue的容量是0,所以连头结点都没有,也就没有peek方法,同理,也没有iterate相关方法
  • 是一个极好用的直接传递的并发数据结构
  • SynchronousQueue是线程池Executors.newCachedThreadPool()使用的阻塞队列

DelayQueue

  • 无界
  • 延迟队列,根据延迟时间排序
  • 元素需要实现Delayed接口,规定排序规则

非阻塞队列

  • 并发包中的非阻塞队列只有ConcurrentLinkedQueue这一种,顾名思义ConcurrentLinkedQueue是使用链表作为其数据结构的,使用CAS非阻塞算法来实现线程安全(不具备阻塞功能),适合用在对性能要求较高的并发场景,用的相对比较少一些
  • 看源码的offer方法的CAS思想,内有p.casNext方法,用了UNSAFE.compareAndSwapObject

如何选择适合自己的队列

  • 边界(根据业务情况判断需要的队列的大小)
  • 空间
  • 吞吐量