我是靠谱客的博主 专注天空,最近开发中收集的这篇文章主要介绍Disruptor源码分析(二)RingBuffer,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

RingBuffer,从字面上看,它是一个环形缓冲队列。实际上,它不是一个队列,因为它不具备队列的特性,比如FIFO等。

RingBuffer本身是一个数组,之所以说它是环形队列,因为它通过算法维持了一个类似环形队列的数据结构。

上图是对RingBuffer的一个抽象描述。

假定一个8个槽的RingBuffer,则本质上是一个长度为8的数组,那如何模拟一个环形队列呢?

通过序列号(sequence)来映射数据元素在数组中的下标。Sequence是一个从0开始,一直递增的序列号,其映射关系为:index = sequence & (array length-1)。

RingBufferPad和RingBufferFields源码如下:

abstract class RingBufferPad
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad
{
    private static final int BUFFER_PAD;
    private static final long REF_ARRAY_BASE;
    private static final int REF_ELEMENT_SHIFT;
    private static final Unsafe UNSAFE = Util.getUnsafe();

    static
    {
        final int scale = UNSAFE.arrayIndexScale(Object[].class);
        if (4 == scale)
        {
            REF_ELEMENT_SHIFT = 2;
        }
        else if (8 == scale)
        {
            REF_ELEMENT_SHIFT = 3;
        }
        else
        {
            throw new IllegalStateException("Unknown pointer size");
        }
        BUFFER_PAD = 128 / scale;
        // Including the buffer pad in the array base offset
        REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + 128;
    }

    private final long indexMask;
    private final Object[] entries;
    protected final int bufferSize;
    protected final Sequencer sequencer;

    RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.indexMask = bufferSize - 1;
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }

    private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

    @SuppressWarnings("unchecked")
    protected final E elementAt(long sequence)
    {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
}

RingBufferPad是一个抽象类,定义了7个long类型的变量,填充缓存行以避免伪共享。

RingBufferFields 也是一个抽象类,继承了RingBufferPad。RingBufferFields是RingBuffer很重要的组成部分,它维护了一个Object数组,用以存储各种类型的数据对象。另外,它还定义了对数组进行操作的方法。

RingBufferFields关键属性和方法说明:

  • final int scale = UNSAFE.arrayIndexScale(Object[].class):获取每个数组元素在内存中的大小。此处为Object对象数据,所以数组中每个元素是对象引用,即对象指针。在64位操作系统中,在JVM不启用对象指针压缩的时候(vm参数添加-XX:-UseCompressedOops),scale为8。如果启动对象指针压缩,scale为4。
  • private static final int BUFFER_PAD:缓存行填充,此值为128/scale,即32或16。

在定义Object数组entries时:

  1. this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]

在初始化Object数组entries时:

    private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

在entries数组前后填充32或16个对象引用,保证前后各占用128个字节的内存,这样对于缓存行大小为64或128字节的系统来说,可以避免其它变量与entries数组元素在一个缓存行内,避免伪共享问题。

  • private static final long REF_ARRAY_BASE:数组在内存中的偏移量,也就是第一个元素的内存偏移量。
  1. REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + 128;

由于entries数组被填充了32或16个对象引用,占用了128个字节,所以entries中第一个真实的元素在内存中的偏移量要加128。

  • private static final int REF_ELEMENT_SHIFT:entries数组中元素偏移量位移位数。如果scale为4,则该值为2,表示左移2位。如果scale为8,则该值为3,表示左移3位。
  1. UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
  • this.indexMask = bufferSize – 1:entries数组下标索引掩码。

通过上面的分析,我们可以看出,RingBufferFields在设计上如下特点:

  1. 固定大小数组:由于数组占用一块连续的内存空间,可以利用CPU的缓存策略,预先读取数组元素附近的元素;
  2. 数组预填充:避免了垃圾回收代来的系统开销;
  3. 缓存行填充:解决伪共享问题;
  4. 位操作:加快系统的计算速度;
  5. 使用数组+系列号的这种方法最大限度的提高了速度。因为如果使用传统的队列的话,在多线程环境下对队列头和队列尾的锁竞争是一种很大的系统开销。

RingBuffer源码如下:

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;

    /**
     * Construct a RingBuffer with the full option set.
     *
     * @param eventFactory to newInstance entries for filling the RingBuffer
     * @param sequencer    sequencer to handle the ordering of events moving through the RingBuffer.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     */
    RingBuffer(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        super(eventFactory, sequencer);
    }

    /**
     * Create a new multiple producer RingBuffer with the specified wait strategy.
     *
     * @param <E> Class of the event stored in the ring buffer.
     * @param factory      used to create the events within the ring buffer.
     * @param bufferSize   number of elements to create within the ring buffer.
     * @param waitStrategy used to determine how to wait for new elements to become available.
     * @return a constructed ring buffer.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     * @see MultiProducerSequencer
     */
    public static <E> RingBuffer<E> createMultiProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }

    /**
     * Create a new multiple producer RingBuffer using the default wait strategy  {@link BlockingWaitStrategy}.
     *
     * @param <E> Class of the event stored in the ring buffer.
     * @param factory    used to create the events within the ring buffer.
     * @param bufferSize number of elements to create within the ring buffer.
     * @return a constructed ring buffer.
     * @throws IllegalArgumentException if <code>bufferSize</code> is less than 1 or not a power of 2
     * @see MultiProducerSequencer
     */
    public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize)
    {
        return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
    }

    /**
     * Create a new single producer RingBuffer with the specified wait strategy.
     *
     * @param <E> Class of the event stored in the ring buffer.
     * @param factory      used to create the events within the ring buffer.
     * @param bufferSize   number of elements to create within the ring buffer.
     * @param waitStrategy used to determine how to wait for new elements to become available.
     * @return a constructed ring buffer.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     * @see SingleProducerSequencer
     */
    public static <E> RingBuffer<E> createSingleProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }

    /**
     * Create a new single producer RingBuffer using the default wait strategy  {@link BlockingWaitStrategy}.
     *
     * @param <E> Class of the event stored in the ring buffer.
     * @param factory    used to create the events within the ring buffer.
     * @param bufferSize number of elements to create within the ring buffer.
     * @return a constructed ring buffer.
     * @throws IllegalArgumentException if <code>bufferSize</code> is less than 1 or not a power of 2
     * @see MultiProducerSequencer
     */
    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize)
    {
        return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
    }

    /**
     * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
     *
     * @param <E> Class of the event stored in the ring buffer.
     * @param producerType producer type to use {@link ProducerType}.
     * @param factory      used to create events within the ring buffer.
     * @param bufferSize   number of elements to create within the ring buffer.
     * @param waitStrategy used to determine how to wait for new elements to become available.
     * @return a constructed ring buffer.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     */
    public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        switch (producerType)
        {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }
......略
}
 

RingBuffer是事件生产者与事件处理者之间进行数据交换的数据载体。

Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.

RingBuffer提供了一种线程间进行信息交换的机制。

RingBuffer实现了三个接口,分别是 Cursored、EventSequencer 和 EventSink,以及继承了 RingBufferFields。

RingBuffer提供了一系列的方法,用以操作事件对象数组。

由于方法相对简单,在此不再详述。

最后

以上就是专注天空为你收集整理的Disruptor源码分析(二)RingBuffer的全部内容,希望文章能够帮你解决Disruptor源码分析(二)RingBuffer所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(39)

评论列表共有 0 条评论

立即
投稿
返回
顶部