netty的每一个channel都会有一个绑定当前channel的attachment(附件),有点类似ThreadLocal,但在netty里你不能直接用ThreadLocal,因为一个EventLoop可能与多个channel绑定,一个线程可能处理几个channel,这时ThreadLocal无法做到与某一channel绑定。
直接上代码吧,attachment用起来很简单,调用channel的attr方法拿到Attribute,然后get/set即可
1
2
3
4
5private void setLoginFlag(Channel channel, String deviceId) { channel.attr(ChannelConfig._LOGIN).set(true); channel.attr(ChannelConfig._DEVICE_ID).set(deviceId); log.info("device login success ,seviceId:{}", deviceId); }
然后看看attr这个方法,这里声明了一个volatile的原子数组,实例化在attr方法里懒加载。一个channel可能会被多个线程引用,attr可能有并发问题,需要做同步,但可以看到这里并没有加锁,而是先通过原子更新工具CAS通过反射加载到字段里,看能否更新成功,如果不成功,则说明其他线程把这个attributes数组实例化过了。这个数组是用来保存attachment的容器,index方法用于生成随机的数组下标,具体的算法是key.id & 数组长度-1,key.id保存在AttributeKey里,由一个AtomicInteger递增获得。
然后通过这个数组下标看能不能拿到Atturibute,如果拿不到,那先声明一个链表头,然后把你传进去的key追加到链表尾部,再把链表头head用CAS放到原子数组里,这个CAS的过程如果失败了,说明其他线程把它放进去了,这时就需要用加锁的方法,把head的引用指向attributes.get(i),加锁循环链表直到链表尾部,然后把追加到尾部,这个循环过程也会先判断一下这个key是否已经存在,如果存在了那就直接取出来。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67@SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater<DefaultAttributeMap, AtomicReferenceArray> updater = AtomicReferenceFieldUpdater.newUpdater(DefaultAttributeMap.class, AtomicReferenceArray.class, "attributes"); private static final int BUCKET_SIZE = 4; private static final int MASK = BUCKET_SIZE - 1; // Initialize lazily to reduce memory consumption; updated by AtomicReferenceFieldUpdater above. @SuppressWarnings("UnusedDeclaration") private volatile AtomicReferenceArray<DefaultAttribute<?>> attributes; @SuppressWarnings("unchecked") @Override public <T> Attribute<T> attr(AttributeKey<T> key) { if (key == null) { throw new NullPointerException("key"); } AtomicReferenceArray<DefaultAttribute<?>> attributes = this.attributes; if (attributes == null) { // Not using ConcurrentHashMap due to high memory consumption. attributes = new AtomicReferenceArray<DefaultAttribute<?>>(BUCKET_SIZE); if (!updater.compareAndSet(this, null, attributes)) { attributes = this.attributes; } } int i = index(key); DefaultAttribute<?> head = attributes.get(i); if (head == null) { // No head exists yet which means we may be able to add the attribute without synchronization and just // use compare and set. At worst we need to fallback to synchronization and waste two allocations. head = new DefaultAttribute(); DefaultAttribute<T> attr = new DefaultAttribute<T>(head, key); head.next = attr; attr.prev = head; if (attributes.compareAndSet(i, null, head)) { // we were able to add it so return the attr right away return attr; } else { head = attributes.get(i); } } synchronized (head) { DefaultAttribute<?> curr = head; for (;;) { DefaultAttribute<?> next = curr.next; if (next == null) { DefaultAttribute<T> attr = new DefaultAttribute<T>(head, key); curr.next = attr; attr.prev = curr; return attr; } if (next.key == key && !next.removed) { return (Attribute<T>) next; } curr = next; } } } private static int index(AttributeKey<?> key) { return key.id() & MASK; }
最后
以上就是秀丽水蜜桃最近收集整理的关于netty-channel的attachment源码解读的全部内容,更多相关netty-channel内容请搜索靠谱客的其他文章。
发表评论 取消回复