Disruptor 外围原理剖析及其在i主题业务中的运行 高性能无锁队列
高性能无锁队列 Disruptor 外围原理剖析及其在i主题业务中的运行
本文首先引见了 Disruptor 高性能内存队列的基本概念、经常使用 Demo、高性能原理及源码剖析,最后经过两个例子引见了 Disruptor 在i主题业务中的运行。一、i主题及 Disruptor 简介
i主题是 vivo 旗下的一款主题商店 app,用户可以经过下载主题、壁纸、字体等,成功对手机界面格调的一键改换和自定义。
Disruptor 是英国外汇买卖公司 LMAX 开发的一个高性能的内存队列(用于系统外部线程间传递信息,不同于 RocketMQ、Kafka这种散布式信息队列),基于 Disruptor 开发的系统复线程能撑持每秒600万订单。目前,包括 ApacheStorm、Camel、Log4j 2在内的很多出名名目都运行了 Disruptor 以失掉高性能。在 vivo外部它也有不少运行,比如自定义监控中经常使用 Disruptor 队列来暂存经过监控 SDK上报的监控数据,i主题中也经常使用它来统计本地内存目的数据。
接上去从 Disruptor 和 JDK 内置队列的对比、Disruptor 外围概念、Disruptor 经常使用Demo、Disruptor外围源码、Disruptor 高性能原理、Disruptor 在 i主题业务中的运行几个角度来引见 Disruptor。
二、和 JDK 中内置的队列对比
上方来看下 JDK 中内置的队列和 Disruptor 的对比。队列的底层成功普通分为三种:数组、链表和堆,其中堆普通是为了成功带有优先级特性的队列,暂不思考。另外,像 ConcurrentLinkedQueue 、LinkedTransferQueue属于无界队列,在稳固性要求特意高的系统中,为了防止消费者速渡过快,造成内存溢出,只能选用有界队列。这样 JDK 中剩下可选的线程安保的队列还有ArrayBlockingQueue
由于 LinkedBlockingQueue 是基于链表成功的,由于链表存储的数据在内存里不延续,关于高速缓存并不友好,而且LinkedBlockingQueue 是加锁的,性能较差。ArrayBlockingQueue有雷同的疑问,它也须要加锁,另外,ArrayBlockingQueue 存在伪共享疑问,也会造成性能变差。而当天要引见的 Disruptor是基于数组的有界无锁队列,合乎空间部分性原理,可以很好的应用 CPU 的高速缓存,同时它防止了伪共享,大大优化了性能。
三、Disruptor 外围概念
如下图,从数据流转的角度先对 Disruptor 有一个直观的概念。Disruptor 支持单(多)消费者、单(多)消费者形式。消费时支持广播消费(HandlerA会消费处置一切信息,HandlerB 也会消费处置一切信息)、集群消费(HandlerA 和 HandlerB各消费部分信息),HandlerA 和HandlerB 消费成功后会把信息交给 HandlerC 继续处置。
上方联合 Disruptor 官网的架构图引见下 Disruptor 的外围概念:
四、Disruptor经常使用Demo
Event 是详细的数据实体,消费者消费 Event ,存入 RingBuffer,消费者从 RingBuffer 中消费它启动逻辑处置。Event 就是一个普通的 Java 对象,无需成功 Disruptor 内定义的接口。
public class OrderEvent {private long value;public long getValue() {return value;}public void setValue(long value) {this.value = value;}}
用于创立 Event 对象。
public class OrderEventFactory implements EventFactory<OrderEvent> {public OrderEvent newInstance() {return new OrderEvent();}}
可以看到,生成者关键是持有 RingBuffer 对象启动数据的颁布。这里有几个点须要留意:
public class OrderEventProducer {private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(ByteBuffer>
消费者可以成功 EventHandler 接口,定义自己的处置逻辑。
public class OrderEventHandler implements EventHandler<OrderEvent> {public void onEvent(OrderEvent event,long sequence,boolean endOfBatch) throws Exception {System.out.println("消费者: " + event.getValue());}}
public static void main(String[] args) {OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 4;ExecutorService executor = Executors.newFixedThreadPool(1);/*** 1. 实例化disruptor对象1) eventFactory: 信息(event)工厂对象2) ringBufferSize: 容器的长度3) executor:4) ProducerType: 单消费者还是多消费者5) waitStrategy: 期待战略*/Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,new BlockingWaitStrategy());// 2. 参与消费者的监听disruptor.handleEventsWith(new OrderEventHandler());// 3. 启动disruptordisruptor.start();// 4. 失掉实践存储数据的容器: RingBufferRingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();OrderEventProducer producer = new OrderEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for (long i = 0; i < 5; i++) {bb.putLong(0, i);producer.sendData(bb);}disruptor.shutdown();executor.shutdown();}
五、Disruptor 源码剖析
本文剖析时以单(多)消费者、单消费者为例启动剖析。
首先是经过传入的参数创立 RingBuffer,将创立好的 RingBuffer 与传入的 executor 交给 Disruptor 对象持有。
public Disruptor(final EventFactory<T> eventFactory,final int ringBufferSize,final Executor executor,final ProducerType producerType,final WaitStrategy waitStrategy){this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),executor);}
接上去剖析 RingBuffer 的创立环节,分为单消费者与多消费者。
public static <E> RingBuffer<E> create(ProducerType producerType,EventFactory<E> factory,int bufferSize,WaitStrategy waitStrategy){switch (producerType){case SINGLE:// 单消费者return createSingleProducer(factory, bufferSize, waitStrategy);case MULTI:// 多消费者return createMultiProducer(factory, bufferSize, waitStrategy);default:throw new IllegalStateException(producerType.toString());}}
不论是单消费者还是多消费者,最终都会创立一个 RingBuffer 对象,只是传给 RingBuffer 的 Sequencer 对象不同。可以看到,RingBuffer 外部最终创立了一个Object 数组来存储 Event 数据。这里有几点须要留意:
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy){SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize,waitStrategy);return new RingBuffer<E>(factory, sequencer);}
RingBufferFields(EventFactory<E> eventFactory,Sequencer sequencer){// 省略部分代码...// 额外创立2个填充空间的大小, 首尾填充, 防止数组的有效载荷和其它成员加载到同一缓存行this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];fill(eventFactory);}private void fill(EventFactory<E> eventFactory){for (int i = 0; i < bufferSize; i++){// BUFFER_PAD + i为真正的数组索引entries[BUFFER_PAD + i] = eventFactory.newInstance();}}
参与消费者的外围代码如下所示,外围就是为将一个
而后参与到consumerRepository 中,后续启动 Disruptor 时,会遍历 consumerRepository 中的一切 BatchEventProcessor(成功了Runnable 接口),将 BatchEventProcessor 义务提交到线程池中。
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){// 经过disruptor对象间接调用handleEventsWith方法时传的是空的Sequence数组return createEventProcessors(new Sequence[0], handlers);}
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,final EventHandler<? super T>[] eventHandlers) {// 搜集参与的消费者的序号final Sequence[] processorSequences = new Sequence[eventHandlers.length];// 本批次消费由于参与在同一个节点之后, 因此共享该屏障final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);// 为每个EventHandler创立一个BatchEventProcessorfor (int i = 0, eventHandlersLength = eventHandlers.length;i < eventHandlersLength; i++) {final EventHandler<? super T> eventHandler = eventHandlers[i];final BatchEventProcessor<T> batchEventProcessor =new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);if (exceptionHandler != null){batchEventProcessor.setExceptionHandler(exceptionHandler);}// 参与到消费者信息仓库中consumerRepository.add(batchEventProcessor, eventHandler, barrier);processorSequences[i] = batchEventProcessor.getSequence();}// 降级网关序列(消费者只有要关注一切的末端消费者节点的序列)updateGatingSequencesForNextInChain(barrierSequences, processorSequences);return new EventHandlerGroup<>(this, consumerRepository, processorSequences);}
创立完 Disruptor 对象之后,可以经过 Disruptor 对象参与 EventHandler,这里有一须要留意:经过 Disruptor对象间接调用 handleEventsWith 方法时传的是空的 Sequence 数组,这是什么意思?可以看到createEventProcessors 方法接纳该空 Sequence 数组的字段名是barrierSequences,翻译成中文就是栅栏序号。怎样了解这个字段?
比如经过如下代码给 Disruptor 参与了两个handler,记为 handlerA 和 handlerB,这种是串行消费,关于一个Event,handlerA 消费完后才干轮到 handlerB 去消费。关于 handlerA来说,它没有前置消费者(生成者消费到哪里,消费者就可以消费到哪里),因此它的 barrierSequences 是一个空数组。而关于handlerB 来说,它的前置消费者是 handlerA,因此它的 barrierSequences 就是A的消费进展,也就是说handlerB 的消费进展是要小于 handlerA 的消费进展的。
disruptor.handleEventsWith(handlerA).handleEventsWith(handlerB);
假设是经过如下形式参与的 handler,则 handlerA 和handlerB 会消费一切 Event 数据,相似 MQ 信息中的广播消费,而 handlerC 的 barrierSequences 数组就是蕴含了 handlerA 的消费进展和 handlerB 的消费进展,这也是为什么barrierSequences 是一个数组,后续 handlerC在消费数据时,会取A和B消费进展的较小值启动判别,比如A消费到进展6,B消费到进展4,那么C只能去消费下标为3的数据,这也是barrierSequences 的含意。
disruptor.handleEventsWith(handlerA, handlerB).handleEventsWith(handlerC);
Disruptor的启动逻辑比拟繁复,就是遍历consumerRepository 中搜集的 EventProcessor(成功了Runnable接口),将它提交到创立 Disruptor 时指定的executor中,EventProcessor 的 run 方法会启动一个while 循环,不时尝试从 RingBuffer 中失掉数据启动消费。
disruptor.start();
public RingBuffer<T> start() {checkOnlyStartedOnce();for (final ConsumerInfo consumerInfo : consumerRepository) {consumerInfo.start(executor);}return ringBuffer;}public void start(final Executor executor) {executor.execute(eventprocessor);}
在剖析 Disruptor 的颁布数据的源码前,先来回忆下颁布数据的全体流程。
public void sendData(ByteBuffer>
next 方法自动放开一个序号。nextValue 示意已调配的序号,nextSequence 示意在此基础上再放开n个序号(此处n为1),cachedValue 示意缓存的消费者的最小消费进展。
假定有一个 size 为8的 RingBuffer,下标为6的数据曾经颁布好(nextValue为6),消费者不时未开启消费(cachedValue 和
cachedGatingSequence 为-1),此时消费者想继续颁布数据,调用 next() 方法放开失掉序号为7的位置(nextSequence为7),计算失掉的 wrapPoint 为7-8=-1,此时 wrapPoint 等于
cachedGatingSequence,可以继续颁布数据,如左图。最后将 nextValue 赋值为7,示意序号7的位置曾经被消费者占用了。
接着消费者继续调用 next() 方法放开序号为0的数据,此时 nextValue为7,nextSequence 为8,wrapPoint 等于0, 由于消费者迟迟未消费
(cachedGatingSequence为-1),此时 wrapPoint 大于了 cachedGatingSequence,因此 next 方法的if判别成立,会调用LockSupport.parkNanos 阻塞期待消费者启动消费。其中 getMinimumSequence方法是失掉多个消费者的最小消费进展。
public long next() {return next(1);}
public long next(int n) {/*** 已调配的序号的缓存(已调配到这里), 初始-1. 可以看该方法的前往值nextSequence,* 接上去消费者就会该往该位置写数据, 它赋值给了nextValue, 所以下一次性调用next方* 法时, nextValue位置就是示意曾经消费好了数据, 接上去要放开nextSequece的数据*/long nextValue = this.nextValue;// 本次放开调配的序号long nextSequence = nextValue + n;// 造成环路的点:环形缓冲区或者追尾的点 = 等于本次放开的序号-环形缓冲区大小// 假设该序号大于最慢消费者的进展, 那么示意追尾了, 须要期待long wrapPoint = nextSequence - bufferSize;// 上次缓存的最小网关序号(消费最慢的消费者的进展)long cachedGatingSequence = this.cachedValue;// wrapPoint > cachedGatingSequence 示意消费者追上消费者发生环路(追尾), 即缓冲区已满,// 此时须要失掉消费者们最新的进展, 以确定能否队列满if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {// 拔出StoreLoad内存屏障/栅栏, 保障可见性。// 由于publish经常使用的是set()/putOrderedLong, 并不保障其余消费者能及时看见颁布的数据// 当我再次放开更多的空间时, 必定保障消费者能消费颁布的数据cursor.setVolatile(nextValue);long minSequence;// minSequence是多个消费者的最小序号, 要等一切消费者消费完了才干继续消费while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences,nextValue))) {LockSupport.parkNanos(1L);}// 缓存消费者们最新的消费进展this.cachedValue = minSequence;}// 这里只写了缓存, 并未写volatile变量, 由于只是预调配了空间然而并未被颁布数据,// 不须要让其余消费者感知到。消费者只会感知到真正被颁布的序号this.nextValue = nextSequence;return nextSequence;}
间接经过 Unsafe 工具类失掉指定序号的 Event 对象,此时失掉的是空对象,因此接上去须要对该 Event 对象启动业务赋值,赋值成功后调用 publish 方法启动最终的数据颁布。
OrderEvent event = ringBuffer.get(sequence);
public E get(long sequence) {return elementAt(sequence);}
protected final E elementAt(long sequence) {return (E) UNSAFE.getObject(entries,REF_ARRAY_BASE +((sequence & indexMask) << REF_ELEMENT_SHIFT));}
消费者失掉到可用序号后,首先对该序号处的空 Event 对象启动业务赋值,接着调用 RingBuffer 的 publish 方法颁布数据,RingBuffer 会委托给其持有的sequencer(单消费者和多消费者对应不同的 sequencer)对象启动真正颁布。单消费者的颁布逻辑比拟繁难,降级下 cursor进展(cursor 示意消费者的消费进展,该位置已实践颁布数据,而 next 方法中的 nextSequence示意消费者放开的最大序号,或者还未实践颁布数据),接着唤醒期待的消费者。
waitStrategy 有不同的成功,因此唤醒逻辑也不尽相反,如驳回 BusySpinWaitStrategy战略时,消费者失掉不到数据时自旋期待,而后继续判别能否有新数据可以消费了,因此 BusySpinWaitStrategy 战略的signalAllWhenBlocking 就是一个空成功,啥也不做。
ringBuffer.publish(sequence);
public void publish(long sequence) {sequencer.publish(sequence);}
public void publish(long sequence) {// 更重消费者进展cursor.set(sequence);// 唤醒期待的消费者waitStrategy.signalAllWhenBlocking();}
前面提到,Disruptor 启动时,会将封装 EventHandler 的EventProcessor(此处以 BatchEventProcessor为例)提交到线程池中运转,BatchEventProcessor 的 run 方法会调用 processEvents 方法不时尝试从RingBuffer 中失掉数据启动消费,上方剖析下 processEvents 的逻辑(代码做了精简)。它会开启一个 while 循环,调用sequenceBarrier.waitFor方法失掉最大可用的序号,比如失掉序号一节所提的,消费者继续消费,消费者不时未消费,此时消费者曾经将整个 RingBuffer数据都消费满了,消费者不可再继续消费,消费者此时会阻塞。假定这时刻消费者开局消费,因此 nextSequence 为0,而
availableSequence 为7,此时消费者可以批量消费,将这8条已消费者的数据所有消费完,消费成功后降级下消费进展。降级消费进展后,消费者经过Util.getMinimumSequence 方法就可以感知到最新的消费进展,从而不再阻塞,继续颁布数据了。
private void processEvents() {T event = null;// sequence记载消费者的消费进展, 初始为-1long nextSequence = sequence.get() + 1L;// 死循环,因此不会让出线程,须要独立的线程(每一个EventProcessor都须要独立的线程)while (true) {// 经过屏障失掉到的最大可用序号final long availableSequence = sequenceBarrier.waitFor(nextSequence);// 批量消费while (nextSequence <= availableSequence) {event =>
上方剖析下 SequenceBarrier 的 waitFor 方法。首先它会调用 waitStrategy 的 waitFor 方法失掉最大可用序号,以 BusySpinWaitStrategy 战略为例,它的 waitFor 方法的三个参数的含意区分是:
由于 dependentSequence 分为两种状况,所以 waitFor 的逻辑也可以分为两种状况探讨:
在 waitStrategy 的 waitFor 方法前往,失掉最大可用的序号 availableSequence 后,最后须要再调用下 sequencer 的
getHighestPublishedSequence失掉真正可用的最大序号,这和消费者模型有相关,假设是单消费者,由于数据是延续颁布的,间接前往传入的 availableSequence。而假设是多消费者,由于多消费者是有多个线程在消费数据,颁布的数据是不延续的,因此须要经过
getHighestPublishedSequence 方法失掉已颁布的且延续的最大序号,由于失掉序号启动消费时须要是顺序的,不能腾跃。
public long waitFor(final long sequence)throws AlertException, InterruptedException, TimeoutException {/*** sequence: 消费者希冀失掉的序号* cursorSequence: 消费者的序号* dependentSequence: 消费者须要依赖的序号*/long availableSequence = waitStrategy.waitFor(sequence,cursorSequence,dependentSequence, this);if (availableSequence < sequence) {return availableSequence;}// 目的sequence曾经颁布了, 这里失掉真正的最大序号(和消费者模型无关)return sequencer.getHighestPublishedSequence(sequence, availableSequence);}
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence,final SequenceBarrier barrier) throws AlertException, InterruptedException {long availableSequence;// 确保该序号曾经被我前面的消费者消费(协调与其余消费者的相关)while ((availableSequence = dependentSequence.get()) < sequence) {barrier.checkAlert();// 自旋期待ThreadHints.onSpinWait();}return availableSequence;}
六、Disruptor 高性能原理剖析
前文剖析源码时引见到,RingBuffer 外部保养了一个 Object 数组(也就是真正存储数据的容器),在 RingBuffer 初始化时该 Object数组就曾经经常使用EventFactory 初始化了一些空 Event,后续就不须要在运转时来创立了,防止频繁GC。
另外,RingBuffe的数组中的元素是在初始化时一次性性所有创立的,所以这些元素的内存地址大略率是延续的。消费者在消费时,是遵照空间部分性原理的。消费完第一个Event 时,很快就会消费第二个 Event,而在消费第一个 Event 时,CPU 会把内存中的第一个 Event 的前面的 Event 也加载进Cache 中,这样当消费第二个 Event 时,它曾经在 CPU Cache 中了,所以就不须要从内存中加载了,这样也可以大大优化性能。
如下代码所示,定义了一个 Pointer 类,它有2个 long 类型的成员变量x、y,而后在 main 方法中其中2个线程区分对同一个 Pointer对象的x和y自增 100000000 次,最后统计下方法耗时,在我本机电脑上测试屡次,平均约为3600ms。
public class Pointer {volatile long x;volatile long y;@Overridepublic String toString() {return new StringJoiner(", ", Pointer.class.getSimpleName() + "[", "]").add("x=" + x).add("y=" + y).toString();}}
public static void main(String[] args) throws InterruptedException {Pointer pointer = new Pointer();int num = 100000000;long start = System.currentTimeMillis();Thread t1 = new Thread(() -> {for(int i = 0; i < num; i++){pointer.x++;}});Thread t2 = new Thread(() -> {for(int i = 0; i < num; i++){pointer.y++;}});t1.start();t2.start();t1.join();t2.join();System.out.println(System.currentTimeMillis() - start);System.out.println(pointer);}
接着将 Pointer 类修正如下:在变量x和y之间拔出7个 long 类型的变量,仅此而已,接着继续经过上述的 main方法统计耗时,平均约为500ms。可以看到,修正前的耗时是修正后(防止了伪共享)的7倍多。那么什么是伪共享,为什么防止了伪共享能有这么大的性能优化呢?
public class Pointer {volatile long x;long p1, p2, p3, p4, p5, p6, p7;volatile long y;@Overridepublic String toString() {return new StringJoiner(", ", Pointer.class.getSimpleName() + "[", "]").add("x=" + x).add("y=" + y).toString();}}
6.2.2、防止伪共享为什么可以优化性能
内存的访问速度是远远慢于 CPU 的,为了高效应用 CPU,在 CPU 和内存之间加了缓存,称为 CPU Cache。为了提高性能,须要更多地从 CPU Cache里失掉数据,而不是从内存中失掉数据。CPU Cache 加载内存里的数据,是以缓存行(通常为64字节)为单位加载的。Java 的 long类型是8字节,因此一个缓存行可以寄存8个 long 类型的变量。
然而,这种加载带来了一个坏处,如上述例子所示,假定有一个 long 类型的变量x,另外还有一个 long 类型的变量y紧挨着它,那么当加载x时也会加载y。假设此时 CPU Core1的线程在对x启动修正,另一个 CPU Core2 的线程却在对y启动读取。者修正x时,会把x和y同时加载到 CPU Core1 对应的CPU Cache 中,降级完后x和其它一切蕴含x的缓存行都将失效。而当 CPU Core2的线程读取y时,发现这个缓存行曾经失效了,须要从主内存中从新加载。
这就是伪共享,x和y不相干,然而却由于x的降级造成须要从新从主内存读取,拖慢了程序性能。处置方法之一就是如上述示例中所做,在x和y之间填充7个 long 类型的变量,保障x和y不会被加载到同一个缓存行中去。Java8中也参与了新的注解@Contended(JVM加上启动参数-XX:-RestrictContended 才会失效),也可以防止伪共享。
Disruptor 中经常使用 Sequence 类的 value 字段来示意消费/消费进展,可以看到在该字段前后各填充了7个 long 类型的变量,来防止伪共享。另外,向 RingBuffer 外部的数组、
SingleProducerSequencer 等也经常使用了该技术。
class LhsPadding {protected long p1, p2, p3, p4, p5, p6, p7;}class Value extends LhsPadding {protected volatile long value;}class RhsPadding extends Value {protected long p9, p10, p11, p12, p13, p14, p15;}
消费者消费数据时,须要入队。消费者消费数据时,须要出队。入队时,不能笼罩没有消费的元素。出队时,不能读取没有写入的元素。因此,Disruptor 中须要保养一个入队索引(消费者数据消费到哪里,对应 AbstractSequencer 中的 cursor)和一个出队索引(一切消费者中消费进展最小的序号)。
Disruptor 中最复杂的是入队操作,上方以多消费者(MultiProducerSequencer)的 next(n) 方法(放开n个序号)为例剖析下Disruptor 是如何成功无锁操作的。代码如下所示,判别下能否有足够的序号(空余位置),假设没有,就让出 CPU经常使用权,而后从新判别。假设有,则经常使用 CAS 设置 cursor(入队索引)。
public long next(int n) {do {// cursor相似于入队索引, 指的是上次消费到这里current = cursor.get();// 目的是再消费n个next = current + n;// 前文剖析过, 用于判别消费者能否曾经追上消费进展, 消费者能否放开到n个序号long wrapPoint = next - bufferSize;// 失掉缓存的上一次性的消费进展long cachedGatingSequence = gatingSequenceCache.get();// 第一步:空间无余就继续期待if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {// 从新计算下一切消费者里的最小消费进展long gatingSequence = Util.getMinimumSequence(gatingSequences, current);// 依然没有足够的空间, 让出CPU经常使用权if (wrapPoint > gatingSequence) {LockSupport.parkNanos(1);continue;}// 降级下最新的最小的消费进展gatingSequenceCache.set(gatingSequence);}// 第二步:看见空间足够时尝试CAS竞争空间else if (cursor.compareAndSet(current, next)) {break;}} while (true);return next;}
这个比拟好了解,在前文剖析消费数据的逻辑时引见了,消费者会失掉下最大可用的序号,而后批量消费这些信息。
七、Disruptor 在i主题业务中的经常使用
很多开源名目都经常使用了 Disruptor,比如日志框架 Log4j2 经常使用它来成功异步日志。HBase、Storm 等名目中也经常使用了到了 Disruptor。vivo 的 i主题业务也经常使用了 Disruptor,上方繁难引见下它的2个经常使用场景。
业务监控系统关于企业来说十分关键,可以协助企业及时发现和处置疑问,可以繁难的检测业务目的数据,改良业务决策,从而保障业务的可继续开展。i主题经常使用 Disruptor(多消费者单消费者)来暂存待上报的业务目的数据,而后有定时义务不时提取数据上报到监控平台,如下图所示。
i主题业务中少量经常使用了本地缓存,为了统计本地缓存中key 的个数(去重)以及每种缓存形式 key 的数量,思考经常使用 Disruptor来暂存并消费处置数据。由于业务代码里很多中央触及到本地缓存的访问,也就是说,消费者是多线程的。思考到消费处置比拟繁难,而假设经常使用多线程消费的话又触及到加锁同步,因此消费者驳回复线程形式。
全体流程如下图所示,首先在缓存访问工具类中参与缓存访问统计上报的调用,缓存访问数据进入到 RingBuffer 后,复线程消费者经常使用 HyperLogLog 来去重统计不同key的个数,经常使用正则婚配来统计每种形式key的数量。而后有异步义务定时失掉统计结果,启动展现。
须要留意的是,由于 RingBuffer 队列大小是固定的,假设消费者消费过快而消费者消费不上来,假设经常使用 next方法放开序号,假设残余空间不够会造成消费者阻塞,因此倡导经常使用 tryPublishEvent 方法去颁布数据,它外部是经常使用 tryNext方法放开序号,该方法假设放开不到可用序号会抛出意外,这样消费者感知到了就可以做兼容处置,而不是阻塞期待。
八、经常使用倡导
九、总结
本文首先经过对比 JDK 中内置的线程安保的队列和Disruptor 的特点,引入了高性能无锁内存队列 Disruptor。接着引见了 Disruptor的外围概念和基本经常使用,使读者对 Disruptor 建设后来步的意识。接着从源码和原理角度引见了 Disruptor的外围成功以及高性能原理(空间预调配、防止伪共享、无锁、支持批量消费)。其次,联合i主题业务引见了 Disruptor在通常中的运行。最后,基于上述原理剖析及运行实战,总结了一些 Disruptor 最佳通常战略。
参考文章: