概述
netty的每一个channel都会有一个绑定当前channel的attachment(附件),有点类似ThreadLocal,但在netty里你不能直接用ThreadLocal,因为一个EventLoop可能与多个channel绑定,一个线程可能处理几个channel,这时ThreadLocal无法做到与某一channel绑定。
直接上代码吧,attachment用起来很简单,调用channel的attr方法拿到Attribute,然后get/set即可
private void setLoginFlag(Channel channel, String deviceId) {
channel.attr(ChannelConfig._LOGIN).set(true);
channel.attr(ChannelConfig._DEVICE_ID).set(deviceId);
log.info("device login success ,seviceId:{}", deviceId);
}
然后看看attr这个方法,这里声明了一个volatile的原子数组,实例化在attr方法里懒加载。一个channel可能会被多个线程引用,attr可能有并发问题,需要做同步,但可以看到这里并没有加锁,而是先通过原子更新工具CAS通过反射加载到字段里,看能否更新成功,如果不成功,则说明其他线程把这个attributes数组实例化过了。这个数组是用来保存attachment的容器,index方法用于生成随机的数组下标,具体的算法是key.id & 数组长度-1,key.id保存在AttributeKey里,由一个AtomicInteger递增获得。
然后通过这个数组下标看能不能拿到Atturibute,如果拿不到,那先声明一个链表头,然后把你传进去的key追加到链表尾部,再把链表头head用CAS放到原子数组里,这个CAS的过程如果失败了,说明其他线程把它放进去了,这时就需要用加锁的方法,把head的引用指向attributes.get(i),加锁循环链表直到链表尾部,然后把追加到尾部,这个循环过程也会先判断一下这个key是否已经存在,如果存在了那就直接取出来。
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultAttributeMap, AtomicReferenceArray> updater =
AtomicReferenceFieldUpdater.newUpdater(DefaultAttributeMap.class, AtomicReferenceArray.class, "attributes");
private static final int BUCKET_SIZE = 4;
private static final int MASK = BUCKET_SIZE - 1;
// Initialize lazily to reduce memory consumption; updated by AtomicReferenceFieldUpdater above.
@SuppressWarnings("UnusedDeclaration")
private volatile AtomicReferenceArray<DefaultAttribute<?>> attributes;
@SuppressWarnings("unchecked")
@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
if (key == null) {
throw new NullPointerException("key");
}
AtomicReferenceArray<DefaultAttribute<?>> attributes = this.attributes;
if (attributes == null) {
// Not using ConcurrentHashMap due to high memory consumption.
attributes = new AtomicReferenceArray<DefaultAttribute<?>>(BUCKET_SIZE);
if (!updater.compareAndSet(this, null, attributes)) {
attributes = this.attributes;
}
}
int i = index(key);
DefaultAttribute<?> head = attributes.get(i);
if (head == null) {
// No head exists yet which means we may be able to add the attribute without synchronization and just
// use compare and set. At worst we need to fallback to synchronization and waste two allocations.
head = new DefaultAttribute();
DefaultAttribute<T> attr = new DefaultAttribute<T>(head, key);
head.next = attr;
attr.prev = head;
if (attributes.compareAndSet(i, null, head)) {
// we were able to add it so return the attr right away
return attr;
} else {
head = attributes.get(i);
}
}
synchronized (head) {
DefaultAttribute<?> curr = head;
for (;;) {
DefaultAttribute<?> next = curr.next;
if (next == null) {
DefaultAttribute<T> attr = new DefaultAttribute<T>(head, key);
curr.next = attr;
attr.prev = curr;
return attr;
}
if (next.key == key && !next.removed) {
return (Attribute<T>) next;
}
curr = next;
}
}
}
private static int index(AttributeKey<?> key) {
return key.id() & MASK;
}
最后
以上就是秀丽水蜜桃为你收集整理的netty-channel的attachment源码解读的全部内容,希望文章能够帮你解决netty-channel的attachment源码解读所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复