概述
RingBuffer,从字面上看,它是一个环形缓冲队列。实际上,它不是一个队列,因为它不具备队列的特性,比如FIFO等。
RingBuffer本身是一个数组,之所以说它是环形队列,因为它通过算法维持了一个类似环形队列的数据结构。
上图是对RingBuffer的一个抽象描述。
假定一个8个槽的RingBuffer,则本质上是一个长度为8的数组,那如何模拟一个环形队列呢?
通过序列号(sequence)来映射数据元素在数组中的下标。Sequence是一个从0开始,一直递增的序列号,其映射关系为:index = sequence & (array length-1)。
RingBufferPad和RingBufferFields源码如下:
abstract class RingBufferPad
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields<E> extends RingBufferPad
{
private static final int BUFFER_PAD;
private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT;
private static final Unsafe UNSAFE = Util.getUnsafe();
static
{
final int scale = UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale)
{
REF_ELEMENT_SHIFT = 2;
}
else if (8 == scale)
{
REF_ELEMENT_SHIFT = 3;
}
else
{
throw new IllegalStateException("Unknown pointer size");
}
BUFFER_PAD = 128 / scale;
// Including the buffer pad in the array base offset
REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + 128;
}
private final long indexMask;
private final Object[] entries;
protected final int bufferSize;
protected final Sequencer sequencer;
RingBufferFields(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.indexMask = bufferSize - 1;
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
@SuppressWarnings("unchecked")
protected final E elementAt(long sequence)
{
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
}
RingBufferPad是一个抽象类,定义了7个long类型的变量,填充缓存行以避免伪共享。
RingBufferFields 也是一个抽象类,继承了RingBufferPad。RingBufferFields是RingBuffer很重要的组成部分,它维护了一个Object数组,用以存储各种类型的数据对象。另外,它还定义了对数组进行操作的方法。
RingBufferFields关键属性和方法说明:
- final int scale = UNSAFE.arrayIndexScale(Object[].class):获取每个数组元素在内存中的大小。此处为Object对象数据,所以数组中每个元素是对象引用,即对象指针。在64位操作系统中,在JVM不启用对象指针压缩的时候(vm参数添加-XX:-UseCompressedOops),scale为8。如果启动对象指针压缩,scale为4。
- private static final int BUFFER_PAD:缓存行填充,此值为128/scale,即32或16。
在定义Object数组entries时:
- this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]
在初始化Object数组entries时:
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
在entries数组前后填充32或16个对象引用,保证前后各占用128个字节的内存,这样对于缓存行大小为64或128字节的系统来说,可以避免其它变量与entries数组元素在一个缓存行内,避免伪共享问题。
- private static final long REF_ARRAY_BASE:数组在内存中的偏移量,也就是第一个元素的内存偏移量。
- REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + 128;
由于entries数组被填充了32或16个对象引用,占用了128个字节,所以entries中第一个真实的元素在内存中的偏移量要加128。
- private static final int REF_ELEMENT_SHIFT:entries数组中元素偏移量位移位数。如果scale为4,则该值为2,表示左移2位。如果scale为8,则该值为3,表示左移3位。
- UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
- this.indexMask = bufferSize – 1:entries数组下标索引掩码。
通过上面的分析,我们可以看出,RingBufferFields在设计上如下特点:
- 固定大小数组:由于数组占用一块连续的内存空间,可以利用CPU的缓存策略,预先读取数组元素附近的元素;
- 数组预填充:避免了垃圾回收代来的系统开销;
- 缓存行填充:解决伪共享问题;
- 位操作:加快系统的计算速度;
- 使用数组+系列号的这种方法最大限度的提高了速度。因为如果使用传统的队列的话,在多线程环境下对队列头和队列尾的锁竞争是一种很大的系统开销。
RingBuffer源码如下:
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
protected long p1, p2, p3, p4, p5, p6, p7;
/**
* Construct a RingBuffer with the full option set.
*
* @param eventFactory to newInstance entries for filling the RingBuffer
* @param sequencer sequencer to handle the ordering of events moving through the RingBuffer.
* @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
*/
RingBuffer(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
super(eventFactory, sequencer);
}
/**
* Create a new multiple producer RingBuffer with the specified wait strategy.
*
* @param <E> Class of the event stored in the ring buffer.
* @param factory used to create the events within the ring buffer.
* @param bufferSize number of elements to create within the ring buffer.
* @param waitStrategy used to determine how to wait for new elements to become available.
* @return a constructed ring buffer.
* @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
* @see MultiProducerSequencer
*/
public static <E> RingBuffer<E> createMultiProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
/**
* Create a new multiple producer RingBuffer using the default wait strategy {@link BlockingWaitStrategy}.
*
* @param <E> Class of the event stored in the ring buffer.
* @param factory used to create the events within the ring buffer.
* @param bufferSize number of elements to create within the ring buffer.
* @return a constructed ring buffer.
* @throws IllegalArgumentException if <code>bufferSize</code> is less than 1 or not a power of 2
* @see MultiProducerSequencer
*/
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize)
{
return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
}
/**
* Create a new single producer RingBuffer with the specified wait strategy.
*
* @param <E> Class of the event stored in the ring buffer.
* @param factory used to create the events within the ring buffer.
* @param bufferSize number of elements to create within the ring buffer.
* @param waitStrategy used to determine how to wait for new elements to become available.
* @return a constructed ring buffer.
* @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
* @see SingleProducerSequencer
*/
public static <E> RingBuffer<E> createSingleProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
/**
* Create a new single producer RingBuffer using the default wait strategy {@link BlockingWaitStrategy}.
*
* @param <E> Class of the event stored in the ring buffer.
* @param factory used to create the events within the ring buffer.
* @param bufferSize number of elements to create within the ring buffer.
* @return a constructed ring buffer.
* @throws IllegalArgumentException if <code>bufferSize</code> is less than 1 or not a power of 2
* @see MultiProducerSequencer
*/
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize)
{
return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
}
/**
* Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
*
* @param <E> Class of the event stored in the ring buffer.
* @param producerType producer type to use {@link ProducerType}.
* @param factory used to create events within the ring buffer.
* @param bufferSize number of elements to create within the ring buffer.
* @param waitStrategy used to determine how to wait for new elements to become available.
* @return a constructed ring buffer.
* @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
*/
public static <E> RingBuffer<E> create(
ProducerType producerType,
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
switch (producerType)
{
case SINGLE:
return createSingleProducer(factory, bufferSize, waitStrategy);
case MULTI:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}
......略
}
RingBuffer是事件生产者与事件处理者之间进行数据交换的数据载体。
Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.
RingBuffer提供了一种线程间进行信息交换的机制。
RingBuffer实现了三个接口,分别是 Cursored、EventSequencer 和 EventSink,以及继承了 RingBufferFields。
RingBuffer提供了一系列的方法,用以操作事件对象数组。
由于方法相对简单,在此不再详述。
最后
以上就是专注天空为你收集整理的Disruptor源码分析(二)RingBuffer的全部内容,希望文章能够帮你解决Disruptor源码分析(二)RingBuffer所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复