ConcurrentLinkedQueue充分使用了atomic包的实现打造了一个无锁得并发线程安全的队列。对比锁机制的实现,个人认为使用无锁机 制的难点在于要充分考虑线程间的协调。简单的说就是多个线程对内部数据结构进行访问时,如果其中一个线程执行的中途因为一些原因出现故障,其他的线程能够 检测并帮助完成剩下的操作。这就需要把对数据结构的操作过程精细的划分成多个状态或阶段,考虑每个阶段或状态多线程访问会出现的情况。上述的难点在此次分 析的并发Queue的实现中有很好的说明。首先看看其部分源码:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private static final long serialVersionUID = 196745693267521676L;
private static class Node<E> {
private volatile E item;
private volatile Node<E> next;
private static final
AtomicReferenceFieldUpdater<Node, Node>
nextUpdater =
AtomicReferenceFieldUpdater.newUpdater
(Node.class, Node.class, "next");
private static final
AtomicReferenceFieldUpdater<Node, Object>
itemUpdater =
AtomicReferenceFieldUpdater.newUpdater
(Node.class, Object.class, "item");
Node(E x) { item = x; }
Node(E x, Node<E> n) { item = x; next = n; }
E getItem() {
return item;
}
boolean casItem(E cmp, E val) {
return itemUpdater.compareAndSet(this, cmp, val);
}
void setItem(E val) {
itemUpdater.set(this, val);
}
Node<E> getNext() {
return next;
}
boolean casNext(Node<E> cmp, Node<E> val) {
return nextUpdater.compareAndSet(this, cmp, val);
}
void setNext(Node<E> val) {
nextUpdater.set(this, val);
}
}
private static final
AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
tailUpdater =
AtomicReferenceFieldUpdater.newUpdater
(ConcurrentLinkedQueue.class, Node.class, "tail");
private static final
AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
headUpdater =
AtomicReferenceFieldUpdater.newUpdater
(ConcurrentLinkedQueue.class, Node.class, "head");
private boolean casTail(Node<E> cmp, Node<E> val) {
return tailUpdater.compareAndSet(this, cmp, val);
}
private boolean casHead(Node<E> cmp, Node<E> val) {
return headUpdater.compareAndSet(this, cmp, val);
}
private transient volatile Node<E> head = new Node<E>(null, null);
private transient volatile Node<E> tail = head;
...
}
先看看其内部数据结构Node的实现。由于使用了原子字段更新器AtomicReferenceFieldUpdater<T,V>(其中T 表示持有字段的类的类型,V表示字段的类型),所以其对应的需要更新的字段要使用volatile进行声明。其 newUpdater(Class<U> tclass, Class<W> vclass, String fieldName)方法实例化一个指定字段的更新器,参数分别表示:持有需要更新字段的类,字段的类,要更新的字段的名称。Node的内部变量 item,next分别有对应自己的字段更新器,并且包含了对其原子性操作的方法compareAndSet(T
obj, V expect, V update),其中T是持有被设置字段的对象,后两者分别是期望值和新值。
对于ConcurrentLinkedQueue自身也有两个volatile的线程共享变量:head,tail分别对应队列的头指针和尾指针。要保证 这个队列的线程安全就是保证对这两个Node的引用的访问(更新,查看)的原子性和可见性,由于volatile本身能够保证可见性,所以就是对其修改的 原子性要被保证。下面看看其对应的方法是如何完成的。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> n = new Node<E>(e, null);
for (;;) {
Node<E> t = tail;
Node<E> s = t.getNext();
if (t == tail) { //------------------------------a
if (s == null) { //---------------------------b
if (t.casNext(s, n)) { //-------------------c
casTail(t, n); //------------------------d
return true;
}
} else {
casTail(t, s); //----------------------------e
}
}
}
}
offer()方法都很熟悉了,就是入队的操作。涉及到改变尾指针的操作,所以要看这个方法实现是否保证了原子性。CAS操作配合循环是原子性操作的保 证,这里也不例外。此方法的循环内首先获得尾指针和其next指向的对象,由于tail和Node的next均是volatile的,所以保证了获得的分 别都是最新的值。
代码a:t==tail是最上层的协调,如果其他线程改变了tail的引用,则说明现在获得不是最新的尾指针需要重新循环获得最新的值。
代码b:s==null的判断。静止状态下tail的next一定是指向null的,但是多线程下的另一个状态就是中间态:tail的指向没有改变,但是其next已经指向新的结点,即完成tail引用改变前的状态,这时候s!=null。这里就是协调的典型应用,直接进入
代码e去协调参与中间态的线程去完成最后的更新,然后重新循环获得新的tail开始自己的新一次的入队尝试。另外值得注意的是a,b之间,其他的线程可能会改变tail的指向,使得协调的操作失败。从这个步骤可以看到无锁实现的复杂性。
代码c:t.casNext(s, n)是入队的第一步,因为入队需要两步:更新Node的next,改变tail的指向。代码c之前可能发生tail引用指向的改变或者进入更新的中间态, 这两种情况均会使得t指向的元素的next属性被原子的改变,不再指向null。这时代码c操作失败,重新进入循环。
代码d:这是完成更新的最后一步了,就是更新tail的指向,最有 意思的协调在这儿又有了体现。从代码看casTail(t, n)不管是否成功都会接着返回true标志着更新的成功。首先如果成功则表明本线程完成了两步的更新,返回true是理所当然的;如果 casTail(t, n)不成功呢?要清楚的是完成代码c则代表着更新进入了中间态,代码d不成功则是tail的指向被其他线程改变。意味着对于其他的线程而言:它们得到的是 中间态的更新,s!=null,进入
代码e帮助本线程执行最后一步并且先于本线程成功。这样本线程虽然代码d失败了,但是是由于别的线程的协助先完成了,所以返回true也就理所当然了。
通过分析这个入队的操作,可以清晰的看到无锁实现的每个步骤和状态下多线程之间的协调和工作。理解了入队的整个过程,出队的操作poll()的实现也就变 得简单了。基本上是大同小异的,无非就是同时牵涉到了head和tail的状态,在改变head的同时照顾到tail的协调,在此不多赘述。下面介绍一下 其无锁下的查看访问,其内部不单单是查看更包含了线程间的协调,这是无锁实现的一个特点。不管是contains(),size()还是 isEmpty(),只要获得了head后面第一个最新的Node就可以很轻松的实现,毕竟Node的getNext()和getItem()返回的都是
对应的最新值。所以先看看这些方法内部的first()如何获得最新的第一个Node:
Node<E> first() {
for (;;) {
Node<E> h = head;
Node<E> t = tail;
Node<E> first = h.getNext();
if (h == head) { //---------------------------------------a
if (h == t) { //-----------------------------------------b
if (first == null) //----------------------------------c
return null;
else
casTail(t, first); //--------------------------------d
} else {
if (first.getItem() != null) //------------------------e
return first;
else // remove deleted node and continue
casHead(h, first); //------------------------------f
}
}
}
}
此方法在尝试获得最新的第一个非head结点的时候,在不同的阶段同样在协调着head和tail的更新任务,让人感觉无锁的世界没有纯粹的工作,呵呵。
代码a:还是最上层的协调,head指向没改变的情况下才继续下面的操作。这时侯head只可能是静止的,因为poll()出队操作的步骤是反着的:首先更新head的指向进入中间态,然后更新原head的next的item为null。
代码b:之所以h==t的情况独立于其他的情况(在出队 poll()方法中同样),主要是因为first!=null时可能对应着某一个更新的中间态,而产生中间态的的必要条件就是代码b成立。如果h==t则 表示当前线程获得的首尾指针指向同一个结点,当然代码b执行之后可能其他线程会进行head或者tail的更新。
代码c:first==null表明tail并没有进入更新的中间 态而是处于静止状态,并且由于tail指向的是head的指向,所以返回null是唯一的选择。但是这美好的一切都是建立在代码b和代码c之间没有其他的 线程更新tail。一旦有其他的线程执行了入队的操作并至少进入中间态的话,h==t和first==null都遗憾的成立,这就造成了取得幻象值,而实 际上h.getNext()已经不再为null。
个人认为代码c改成if((first
= h.getNext()) == null)更能提高命中率。
代码d:只要first!=null(不管是本人修改的代码还是源码)本线程则去尝试协调其他的线程先完成tail的更新,等待循环再次获取最新的head和tail。
代码e:此处first一定不为null,tail更新与否不影响 first的item的获取,但是head的更新会有影响。如果head正在被另一个线程更新并进入中间态,既是poll()内的else if (casHead(h, first)) 成功,但是并没有执行first.setItem(null)之前。此时代码e是满足的,返回的也是当前的first的,但是随后head全部更新成功则 first的item为null。所以此处返回的first的item并不一定是item!=null的结点,在使用此方法获得的结点的item时一定要
再次的进行判断,这点在contains(...)等方法内都有体现。
代码f:如果first的item==null,则更新head的 指向。直观上看似乎多余,因为出队的操作是先更新head的指向再更新item为null的。但是另一个方法remove(...)则仅仅更新item的 值而不改变head的指向,所以针对这样的多线程调用,代码f变得非常的必需了。
这样通过这两个方法的分析可以推及对ConcurrentLinkedQueue共享变量的其他操作的实现,这样的无锁的实现印象最深的就是要考虑线程间 的协调。不像锁机制的实现虽然牺牲了一定的性能,但是至少操作这些非线程安全的共享变量时不用过多的考虑其他线程的操作。至此才算体会到无锁实现的复杂 性,这或许就是有得必有失吧,呵呵。
分享到:
相关推荐
Java 多线程与并发(15_26)-JUC集合_ ConcurrentLinkedQueue详解
ConcurrentLinkedQueue源码分析
【2018最新最详细】并发多线程教程,课程结构如下 1.并发编程的优缺点 2.线程的状态转换以及基本操作 3.java内存模型以及happens-before规则 4.彻底理解synchronized 5.彻底理解volatile 6.你以为你真的了解final吗...
ConcurrentLinkedQueue
2、再考虑多线程时候的offer: · 多个线程offer · 部分线程offer,部分线程poll · offer比poll快 · poll比offer快 offer public boolean offer(E e) { checkNotNull(e); // 新建一个node ...
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
并发集合:JUC提供了一些线程安全的集合类,如ConcurrentHashMap、ConcurrentLinkedQueue等,可以在多线程环境下安全地访问和修改集合。 原子操作:JUC提供了一些原子操作类,如AtomicInteger、AtomicLong等,可以...
LinkedBlockingQueue 首先 LinkedBlockingQueue 是一个 “可选且有界” 的阻塞队列实现,你可以根据需要指定队列的大小。 接下来,我将创建一个 LinkedBlockingQueue ,它最多可以包含100个元素: ...
自己动手让springboot异步处理浏览器发送的请求(只需要使用ConcurrentLinkedQueue即可)
主要介绍了Java concurrency集合之ConcurrentLinkedQueue,需要的朋友可以参考下
volatile关键字的非原子性、volatile关键字的使用、AtomicInteger原子性操作、线程安全小例子:多个线程竞争问题、多个线程多个锁问题、创建一个缓存的线程池、多线程使用Vector或者HashTable的示例(简单线程同步...
- **`CopyOnWriteArrayList`** : 线程安全的 `List`,在读多写少的场合性能非常好,远远好于 `Vector`。 - **`ConcurrentLinkedQueue`** : 高效的并发队列,使用链表实现。可以看做一个线程安全的 `LinkedList`,这...
聊聊并发(6)ConcurrentLinkedQueue的实现原理分析Java开发Java经验技巧共18页.pdf.zip
ConcurrentLinkedQueue 提供高效的多线程访问。 队列数据结构提供高效的轮询和提供操作以快速添加/获取资源。 为了防止线程冲突,使用了公平的 ReentrantReadWriteLock。 公平意味着等待线程根据它们等待的时间进行...
│ 高并发编程第二阶段04讲、多线程的休息室WaitSet详细介绍与知识点总结.mp4 │ 高并发编程第二阶段05讲、一个解释volatile关键字作用最好的例子.mp4 │ 高并发编程第二阶段06讲、Java内存模型以及CPU缓存不一致...
│ 高并发编程第二阶段04讲、多线程的休息室WaitSet详细介绍与知识点总结.mp4 │ 高并发编程第二阶段05讲、一个解释volatile关键字作用最好的例子.mp4 │ 高并发编程第二阶段06讲、Java内存模型以及CPU缓存不一致...
我们将详细介绍 JUC 提供的线程安全集合类,如 ConcurrentHashMap 和 ConcurrentLinkedQueue,以及它们在实际应用中的用法。 通过这份资源,您将获得全面的 Java 并发编程知识,从基础概念到高级应用,从工具使用到...
一个简单的jdbc连接池,考虑到线程安全,这里使用了ConcurrentLinkedQueue,新手入门,简单易懂
随着项目的不断迭代,加上产品经理大法(这里加一个弹窗提示,...为了防止多个线程同时操作DialogManager中的queue对象,所以我们采用线程安全的ConcurrentLinkedQueue,这里简单的介绍下ConcurrentLinkedQueue实现和数
了解多线程所带来的安全风险.mp4 从线程的优先级看饥饿问题.mp4 从Java字节码的角度看线程安全性问题.mp4 synchronized保证线程安全的原理(理论层面).mp4 synchronized保证线程安全的原理(jvm层面).mp4 单例问题...