该文所涉及的 netty 源码版本为 4.1.6。
在 Netty 的核心中的核心成员 NioEventLoop 中,其中任务队列的实现 taskQueue 便是 MpscLinkedQueue。MpscLinkedQueue 是 Netty 所实现的一个基于多生产者单消费者的无锁队列,针对 NioEventLoop 中任务队列的特点,其单消费者的场景在一开始就避免了从队列中取数据时加锁的必要,而其最精妙的地方便是在多生产者并发从队列中添加数据的时候也没有加锁,达到 Netty 所期望的高性能实现。这是如何实现的?
首先,MpscLinkedQueue 继承自 AtomicReference,也就是说 MpscLinkedQueue 通过继承自 AtomicReference 的方式,显式地维护了一个提供原子读写能力的变量 value。而在 MpscLinkedQueue 中,这个 value 是其内部维护的队列的尾结点。
而后,来看 MpscLinkedQueue 的构造方法。
MpscLinkedQueue() {
MpscLinkedQueueNode<E> tombstone = new DefaultNode<E>(null);
headRef = new FullyPaddedReference<MpscLinkedQueueNode<E>>();
headRef.set(tombstone);
setTail(tombstone);
}
在 MpscLinkedQueue 中,维护着 headRef 头结点字段,其队列内部节点的实现是一个 MpscLinkedQueueNode。MpscLinkedQueueNode 是一个除了存放具体队列元素外只有 next 字段的节点,也就是说,MpscLinkedQueue 的队列是单向的。在构造方法的最后,通过 setTail()方法的,将 MpscLinkedQueue 的尾结点字段 value 也设置为头结点。MpscLinkedQueue 的头结点字段 headRef 的存在可以方便后续直接从头结点开始的队列操作,消费者可以简单判断头尾节点是否相等来确认队列中是否有元素可以消费。
@Override
@SuppressWarnings("unchecked")
public boolean offer(E value) {
if (value == null) {
throw new NullPointerException("value");
}
final MpscLinkedQueueNode<E> newTail;
if (value instanceof MpscLinkedQueueNode) {
newTail = (MpscLinkedQueueNode<E>) value;
newTail.setNext(null);
} else {
newTail = new DefaultNode<E>(value);
}
MpscLinkedQueueNode<E> oldTail = replaceTail(newTail);
oldTail.setNext(newTail);
return true;
}
private MpscLinkedQueueNode<E> replaceTail(MpscLinkedQueueNode<E> node) {
return getAndSet(node);
}
MpscLinkedQueue 的 offer()方法很简短,但是恰恰就是整个添加队列元素加入的流程,当元素被加入的时候,首先判断加入的元素是否是 MpscLinkedQueueNode,如果不是则进行封装。之后便是整个操作的重点:
在 MpscLinkedQueue 中,是不支持 remove()的方法去从队列中移除任意一个元素的。原因很简单,消费者和生产者是无锁的,消费者可以通过比较队首和队尾元素是否一致来保证线程安全地从队首取数据,但是 remove()从队列中任意位置修改数据是线程不安全的,主要体现在移除队尾元素可能会导致正在加入的新元素被丢弃。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。