生成式AI领域的最新成果都在这里!抢 QCon 展区门票 了解详情
写点什么

聊聊并发(六)——ConcurrentLinkedQueue 的实现原理分析

  • 2013-01-09
  • 本文字数:4051 字

    阅读完需:约 13 分钟

1. 引言

在并发编程中我们有时候需要使用线程安全的队列。如果我们要实现一个线程安全的队列有两种实现方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环 CAS 的方式来实现,本文让我们一起来研究下 Doug Lea 是如何使用非阻塞的方式来实现线程安全队列 ConcurrentLinkedQueue 的,相信从大师身上我们能学到不少并发编程的技巧。

2. ConcurrentLinkedQueue 的介绍

ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在 Michael & Scott 算法上进行了一些修改, Michael & Scott 算法的详细信息可以参见参考资料一

3. ConcurrentLinkedQueue 的结构

我们通过 ConcurrentLinkedQueue 的类图来分析一下它的结构。

(图 1)

ConcurrentLinkedQueue 由 head 节点和 tair 节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用 (next) 组成,节点与节点之间就是通过这个 next 关联起来,从而组成一张链表结构的队列。默认情况下 head 节点存储的元素为空,tair 节点等于 head 节点。

复制代码
private transient volatile Node<E> tail = head;

4. 入队列

入队列就是将入队节点添加到队列的尾部。为了方便理解入队时队列的变化,以及 head 节点和 tair 节点的变化,每添加一个节点我就做了一个队列的快照图。

(图二)

  • 第一步添加元素 1。队列更新 head 节点的 next 节点为元素 1 节点。又因为 tail 节点默认情况下等于 head 节点,所以它们的 next 节点都指向元素 1 节点。
  • 第二步添加元素 2。队列首先设置元素 1 节点的 next 节点为元素 2 节点,然后更新 tail 节点指向元素 2 节点。
  • 第三步添加元素 3,设置 tail 节点的 next 节点为元素 3 节点。
  • 第四步添加元素 4,设置元素 3 的 next 节点为元素 4 节点,然后将 tail 节点指向元素 4 节点。

通过 debug 入队过程并观察 head 节点和 tail 节点的变化,发现入队主要做两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点,如果 tail 节点的 next 节点为空,则将入队节点设置成 tail 的 next 节点,所以 tail 节点不总是尾节点,理解这一点对于我们研究源码会非常有帮助。

上面的分析让我们从单线程入队的角度来理解入队过程,但是多个线程同时进行入队情况就变得更加复杂,因为可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点。让我们再通过源码来详细分析下它是如何使用 CAS 算法来入队的。

复制代码
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
// 入队前,创建一个入队节点
Node<E> n = new Node<E>(e);
retry:
// 死循环,入队不成功反复入队。
for (;;) {
// 创建一个指向 tail 节点的引用
Node<E> t = tail;
//p 用来表示队列的尾节点,默认情况下等于 tail 节点。
Node<E> p = t;
for (int hops = 0; ; hops++) {
// 获得 p 节点的下一个节点。
Node<E> next = succ(p);
//next 节点不为空,说明 p 不是尾节点,需要更新 p 后在将它指向 next 节点
if (next != null) {
// 循环了两次及其以上,并且当前节点还是不等于尾节点
if (hops > HOPS && t != tail)
continue retry;
p = next;
}
// 如果 p 是尾节点,则设置 p 节点的 next 节点为入队节点。
else if (p.casNext(null, n)) {
// 如果 tail 节点有大于等于 1next 节点,则将入队节点设置成 tair 节点,更新失败了也
没关系,因为失败了表示有其他线程成功更新了 tair 节点。
if (hops >= HOPS)
casTail(t, n); // 更新 tail 节点,允许失败
return true;
}
// p 有 next 节点, 表示 p 的 next 节点是尾节点,则重新设置 p 节点
else {
p = succ(p);
}
}
}
}

从源代码角度来看整个入队过程主要做二件事情。第一是定位出尾节点,第二是使用 CAS 算法能将入队节点设置成尾节点的 next 节点,如不成功则重试。

第一步定位尾节点。tail 节点并不总是尾节点,所以每次入队都必须先通过 tail 节点来找到尾节点,尾节点可能就是 tail 节点,也可能是 tail 节点的 next 节点。代码中循环体中的第一个 if 就是判断 tail 是否有 next 节点,有则表示 next 节点可能是尾节点。获取 tail 节点的 next 节点需要注意的是 p 节点等于 p 的 next 节点的情况,只有一种可能就是 p 节点和 p 的 next 节点都等于空,表示这个队列刚初始化,正准备添加第一次节点,所以需要返回 head 节点。获取 p 节点的 next 节点代码如下

复制代码
final Node<E> succ(Node<E> p) {
Node<E> next = p.getNext();
return (p == next) ? head : next;
}

第二步设置入队节点为尾节点。p.casNext(null, n) 方法用于将入队节点设置为当前队列尾节点的 next 节点,p 如果是 null 表示 p 是当前队列的尾节点,如果不为 null 表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。

hops 的设计意图。上面分析过对于先进先出的队列入队所要做的事情就是将入队节点设置成尾节点,doug lea 写的代码和逻辑还是稍微有点复杂。那么我用以下方式来实现行不行?

复制代码
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
Node<E> n = new Node<E>(e);
for (;;) {
Node<E> t = tail;
if (t.casNext(null, n) && casTail(t, n)) {
return true;
}
}
}

让 tail 节点永远作为队列的尾节点,这样实现代码量非常少,而且逻辑非常清楚和易懂。但是这么做有个缺点就是每次都需要使用循环 CAS 更新 tail 节点。如果能减少 CAS 更新 tail 节点的次数,就能提高入队的效率,所以 doug lea 使用 hops 变量来控制并减少 tail 节点的更新频率,并不是每次节点入队后都将 tail 节点更新成尾节点,而是当 tail 节点和尾节点的距离大于等于常量 HOPS 的值(默认等于 1)时才更新 tail 节点,tail 和尾节点的距离越长使用 CAS 更新 tail 节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点,但是这样仍然能提高入队的效率,因为从本质上来看它通过增加对 volatile 变量的读操作来减少了对 volatile 变量的写操作,而对 volatile 变量的写操作开销要远远大于读操作,所以入队效率会有所提升。

复制代码
private static final int HOPS = 1;

还有一点需要注意的是入队方法永远返回 true,所以不要通过返回值判断入队是否成功。

5. 出队列

出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。让我们通过每个节点出队的快照来观察下 head 节点的变化。

从上图可知,并不是每次出队时都更新 head 节点,当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点。只有当 head 节点里没有元素时,出队操作才会更新 head 节点。这种做法也是通过 hops 变量来减少使用 CAS 更新 head 节点的消耗,从而提高出队效率。让我们再通过源码来深入分析下出队过程。

复制代码
public E poll() {
Node<E> h = head;
// p 表示头节点,需要出队的节点
Node<E> p = h;
for (int hops = 0;; hops++) {
// 获取 p 节点的元素
E item = p.getItem();
// 如果 p 节点的元素不为空,使用 CAS 设置 p 节点引用的元素为 null, 如果成功则返回 p 节点的元素。
if (item != null && p.casItem(item, null)) {
if (hops >= HOPS) {
// 将 p 节点下一个节点设置成 head 节点
Node<E> q = p.getNext();
updateHead(h, (q != null) ? q : p);
}
return item;
}
// 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。那么获取 p 节点的下一个节点
Node<> next = succ(p);
// 如果 p 的下一个节点也为空,说明这个队列已经空了
if (next == null) {
// 更新头节点。
updateHead(h, p);
break;
}
// 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
p = next;
}
return null;
}

首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用 CAS 的方式将头节点的引用设置成 null,如果 CAS 成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了 head 节点,导致元素发生了变化,需要重新获取头节点。

6. 参考资料

作者介绍

方腾飞,花名清英,淘宝资深开发工程师,关注并发编程,目前在广告技术部从事无线广告联盟的开发和设计工作。个人博客: http://ifeve.com 微博: http://weibo.com/kirals 欢迎通过我的微博进行技术交流。


感谢张龙对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。

2013-01-09 05:0932377

评论 3 条评论

发布
用户头像
第二步设置入队节点为尾节点。p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next节点,p如果是null表示p是当前队列的尾节点,如果不为null表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。/r/n
改成如下:是p的next不是p /r/n
第二步设置入队节点为尾节点。p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next节点,p的next如果是null表示p是当前队列的尾节点,如果不为null表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。 /r/n
另外图片有问题,老铁修改下啊。/r/n
不支持换行啊。
展开
2018-11-14 17:17
回复
用户头像
第二步设置入队节点为尾节点。p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next节点,p如果是null表示p是当前队列的尾节点,如果不为null表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。
改成如下:是p的next不是p
第二步设置入队节点为尾节点。p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next节点,p的next如果是null表示p是当前队列的尾节点,如果不为null表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。
另外图片有问题,老铁修改下啊。
2018-11-14 17:15
回复
用户头像
图有问题吧
2018-11-08 19:50
回复
没有更多了
发现更多内容

REST API 安全基础知识:保护你的应用程序和用户数据

Apifox

API 企业安全 REST API 安全认证 API 安全

开心档之C++ 多线程

雪奈椰子

解锁企业数据管理的利器——DataOps

数造万象

REST 与 SOAP 之间的差异

Apifox

Rest 协议 soap REST API SOAP Webservice

Nautilus Chain 上首个 DEX PoseiSwap 通证经济学模型解析

威廉META

C++ 循环

雪奈椰子

开心档

WeOpsV3.16持续拓展云平台能力,监管华为ManageOne云平台

嘉为蓝鲸

运维 weops

中台的下一站:行业数字化操作系统

创智荟

中台 操作系统 数字化

深度学习:理解卷积神经网络(CNN)的原理和应用

兴科Sinco

人工智能 神经网络 深度学习 自然语言 图象识别

如何打开 plist 文件

ios 开发 IPA上传

北京 Meetup 邀你来|云上 StarRocks 极速湖仓

StarRocks

数据库 活动 OLAP 大数据分析 StarRocks

集简云已支持GPT-4 API接口,将最新AI模型接入到您的业务流程中

集简云开放平台

人工智能 ChatGPT

华为云FusionInsight引领现代数据平台革新:助力企业数字化转型与增值

科技怪授

C++ 修饰符类型

雪奈椰子

iOS上架 IPA上传

基于低代码开发平台打造新时代OA系统

力软低代码开发平台

一文吃透低代码平台源代码交付的重要性

这我可不懂

软件开发 低代码 JNPF

持续领先同行?看华为云EI如何助力企业智能化转型

爱尚科技

达观助手智能写作,让写作更快更好更有趣!

NLP资深玩家

“云智一体”进化史

百度开发者中心

人工智能 云智一体 文心一言

The era of Wallys/wifi7 has arrived-ipq9574+qcn9274.

Cindy-wallys

qcn9274 ipq9574

Wallys/The IPQ9554+qcn6274 support the new WiFi 7 standard

Cindy-wallys

ipq9554 qcn6274

报名开启!成都首个ChatGPT和大模型专题研讨会,期待您的参与!

NLP资深玩家

OctConv:八度卷积复现

华为云开发者联盟

人工智能 华为云 卷积 华为云开发者联盟 企业号 4 月 PK 榜

Dapr和Rainbond集成,实现云原生BaaS和模块化微服务开发

北京好雨科技有限公司

云原生 #Kubernetes# Baas rainbond 企业号 4 月 PK 榜

华为云EI:引领企业智能化转型,助力全球行业创新

爱尚科技

plist 文件是什么

雪奈椰子

ios 开发 IPA上传

业务导向且支持开发过程的测试

测吧(北京)科技有限公司

测试

什么是plist

雪奈椰子

ios 开发 IPA上传

嘉为蓝鲸携手腾讯云亮相石油石化峰会!

嘉为蓝鲸

能源 嘉为蓝鲸 研运一体化

如何使用 Postman 发送 JSON 数据

Liam

json Postman 接口测试 API API 调试

百度智能云助力达拉特旗入选“数字城市创新成果与实践案例”

百度开发者中心

人工智能 智慧城市 云智一体

聊聊并发(六)——ConcurrentLinkedQueue的实现原理分析_Java_方腾飞_InfoQ精选文章