表结构说明
字段名称 | 字段描述 |
---|---|
biz_tag | 业务标识 |
max_id | 号段最大值 |
step | 号段最小步长 |
description | 描述信息 |
update_time | 号段切换时间 |
缓存初始化
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34logger.info("update cache from db"); StopWatch sw = new Slf4JStopWatch(); try { List<String> dbTags = dao.getAllTags(); if (dbTags == null || dbTags.isEmpty()) { return; } List<String> cacheTags = new ArrayList<String>(cache.keySet()); List<String> insertTags = new ArrayList<String>(dbTags); List<String> removeTags = new ArrayList<String>(cacheTags); //db中新加的tags灌进cache insertTags.removeAll(cacheTags); for (String tag : insertTags) { SegmentBuffer buffer = new SegmentBuffer(); buffer.setKey(tag); Segment segment = buffer.getCurrent(); segment.setValue(new AtomicLong(0)); segment.setMax(0); segment.setStep(0); cache.put(tag, buffer); logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer); } //cache中已失效的tags从cache删除 removeTags.removeAll(dbTags); for (String tag : removeTags) { cache.remove(tag); logger.info("Remove tag {} from IdCache", tag); } } catch (Exception e) { logger.warn("update cache from db exception", e); } finally { sw.stop("updateCacheFromDb"); }
号段初始化
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50StopWatch sw = new Slf4JStopWatch(); SegmentBuffer buffer = segment.getBuffer(); LeafAlloc leafAlloc; if (!buffer.isInitOk()) { // 号段首次初始化 leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); // 根据DB步长更新最大值 buffer.setStep(leafAlloc.getStep()); // 设置当前步长 buffer.setMinStep(leafAlloc.getStep()); // 最小步长为DB配置步长 } else if (buffer.getUpdateTimestamp() == 0) { // 号段首次切换 leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); // 根据DB步长更新最大值 buffer.setUpdateTimestamp(System.currentTimeMillis()); // 记录号段切换时间 buffer.setMinStep(leafAlloc.getStep()); } else { // 号段二次切换 // 计算号段总耗时,如果耗时越短,说明消耗的越快,为了避免频繁切换,步长翻倍,反之减半,但步长不能小于DB设置的步长 long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp(); int nextStep = buffer.getStep(); // 初始化下一步长 if (duration < SEGMENT_DURATION) { // 如果耗时小于15分钟, if (nextStep * 2 > MAX_STEP) { //do nothing } else { // 步长翻倍 nextStep = nextStep * 2; } } else if (duration < SEGMENT_DURATION * 2) { //do nothing with nextStep } else { // 步长减半,但必须大于最小步长 nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep; } logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep); // 根据最新步长,更新最大值 LeafAlloc temp = new LeafAlloc(); temp.setKey(key); temp.setStep(nextStep); leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp); // 设置号段切换时间 buffer.setUpdateTimestamp(System.currentTimeMillis()); // 设置最新步长 buffer.setStep(nextStep); buffer.setMinStep(leafAlloc.getStep()); } // 计算号段起始值 long value = leafAlloc.getMaxId() - buffer.getStep(); // 设置号段起始值、最大值及步长 segment.getValue().set(value); segment.setMax(leafAlloc.getMaxId()); segment.setStep(buffer.getStep()); sw.stop("updateSegmentFromDb", key + " " + segment);
号段取值
循环从当前号段取值,通过读写锁进行并发控制,号段切换条件:
a)下一号段未准备好
b)当前号段剩余可用小于90%
c)当前无切号线程
1)从当前号段取值
复制代码
1
2
3
4
5
6
7
8
9
10
11try { buffer.rLock().lock(); // 获取读锁 final Segment segment = buffer.getCurrent(); // 获取当前号段 long value = segment.getValue().getAndIncrement(); // 获取值并递增 if (value < segment.getMax()) { // 如果值小于号段最大值,正常获取 return new Result(value, Status.SUCCESS); } } finally { buffer.rLock().unlock(); // 释放读锁 }
2)下一号段准备
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20Segment next = buffer.getSegments()[buffer.nextPos()]; // 获取下一号段 boolean updateOk = false; // 判断下一号段是否准备好 try { updateSegmentFromDb(buffer.getKey(), next); // 更新下一号段,参考号段初始化 updateOk = true; logger.info("update segment {} from db {}", buffer.getKey(), next); } catch (Exception e) { logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e); } finally { if (updateOk) { // 如果下一号段更新成功,获取写锁,更新buffer状态 buffer.wLock().lock(); buffer.setNextReady(true); buffer.getThreadRunning().set(false); buffer.wLock().unlock(); } else { buffer.getThreadRunning().set(false); } }
3)号段切换
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26// 自旋并休眠等待下一号段准备 waitAndSleep(buffer); try { // 获取写锁,阻塞所有读操作 buffer.wLock().lock(); // 此处可能存在多线程竞争写锁,如果其他线程已切号完成,直接从当前号段取值返回 final Segment segment = buffer.getCurrent(); long value = segment.getValue().getAndIncrement(); if (value < segment.getMax()) { return new Result(value, Status.SUCCESS); } // 判断下一号段是否准备好 if (buffer.isNextReady()) { // 切换号段 buffer.switchPos(); // 重置下一号段状态 buffer.setNextReady(false); } else { logger.error("Both two segments in {} are not ready!", buffer); return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION); } } finally { // 释放写锁 buffer.wLock().unlock(); }
最后
以上就是野性音响最近收集整理的关于Leaf关键源码解析的全部内容,更多相关Leaf关键源码解析内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复