概述
表结构说明
字段名称 | 字段描述 |
---|---|
biz_tag | 业务标识 |
max_id | 号段最大值 |
step | 号段最小步长 |
description | 描述信息 |
update_time | 号段切换时间 |
缓存初始化
logger.info("update cache from db");
StopWatch sw = new Slf4JStopWatch();
try {
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
List<String> cacheTags = new ArrayList<String>(cache.keySet());
List<String> insertTags = new ArrayList<String>(dbTags);
List<String> removeTags = new ArrayList<String>(cacheTags);
//db中新加的tags灌进cache
insertTags.removeAll(cacheTags);
for (String tag : insertTags) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
}
//cache中已失效的tags从cache删除
removeTags.removeAll(dbTags);
for (String tag : removeTags) {
cache.remove(tag);
logger.info("Remove tag {} from IdCache", tag);
}
} catch (Exception e) {
logger.warn("update cache from db exception", e);
} finally {
sw.stop("updateCacheFromDb");
}
号段初始化
StopWatch sw = new Slf4JStopWatch();
SegmentBuffer buffer = segment.getBuffer();
LeafAlloc leafAlloc;
if (!buffer.isInitOk()) {
// 号段首次初始化
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); // 根据DB步长更新最大值
buffer.setStep(leafAlloc.getStep()); // 设置当前步长
buffer.setMinStep(leafAlloc.getStep()); // 最小步长为DB配置步长
} else if (buffer.getUpdateTimestamp() == 0) {
// 号段首次切换
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); // 根据DB步长更新最大值
buffer.setUpdateTimestamp(System.currentTimeMillis()); // 记录号段切换时间
buffer.setMinStep(leafAlloc.getStep());
} else { // 号段二次切换
// 计算号段总耗时,如果耗时越短,说明消耗的越快,为了避免频繁切换,步长翻倍,反之减半,但步长不能小于DB设置的步长
long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
int nextStep = buffer.getStep(); // 初始化下一步长
if (duration < SEGMENT_DURATION) { // 如果耗时小于15分钟,
if (nextStep * 2 > MAX_STEP) {
//do nothing
} else {
// 步长翻倍
nextStep = nextStep * 2;
}
} else if (duration < SEGMENT_DURATION * 2) {
//do nothing with nextStep
} else {
// 步长减半,但必须大于最小步长
nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
}
logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
// 根据最新步长,更新最大值
LeafAlloc temp = new LeafAlloc();
temp.setKey(key);
temp.setStep(nextStep);
leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
// 设置号段切换时间
buffer.setUpdateTimestamp(System.currentTimeMillis());
// 设置最新步长
buffer.setStep(nextStep);
buffer.setMinStep(leafAlloc.getStep());
}
// 计算号段起始值
long value = leafAlloc.getMaxId() - buffer.getStep();
// 设置号段起始值、最大值及步长
segment.getValue().set(value);
segment.setMax(leafAlloc.getMaxId());
segment.setStep(buffer.getStep());
sw.stop("updateSegmentFromDb", key + " " + segment);
号段取值
循环从当前号段取值,通过读写锁进行并发控制,号段切换条件:
a)下一号段未准备好
b)当前号段剩余可用小于90%
c)当前无切号线程
1)从当前号段取值
try {
buffer.rLock().lock(); // 获取读锁
final Segment segment = buffer.getCurrent(); // 获取当前号段
long value = segment.getValue().getAndIncrement(); // 获取值并递增
if (value < segment.getMax()) { // 如果值小于号段最大值,正常获取
return new Result(value, Status.SUCCESS);
}
} finally {
buffer.rLock().unlock(); // 释放读锁
}
2)下一号段准备
Segment next = buffer.getSegments()[buffer.nextPos()]; // 获取下一号段
boolean updateOk = false; // 判断下一号段是否准备好
try {
updateSegmentFromDb(buffer.getKey(), next); // 更新下一号段,参考号段初始化
updateOk = true;
logger.info("update segment {} from db {}", buffer.getKey(), next);
} catch (Exception e) {
logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
} finally {
if (updateOk) {
// 如果下一号段更新成功,获取写锁,更新buffer状态
buffer.wLock().lock();
buffer.setNextReady(true);
buffer.getThreadRunning().set(false);
buffer.wLock().unlock();
} else {
buffer.getThreadRunning().set(false);
}
}
3)号段切换
// 自旋并休眠等待下一号段准备
waitAndSleep(buffer);
try {
// 获取写锁,阻塞所有读操作
buffer.wLock().lock();
// 此处可能存在多线程竞争写锁,如果其他线程已切号完成,直接从当前号段取值返回
final Segment segment = buffer.getCurrent();
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
// 判断下一号段是否准备好
if (buffer.isNextReady()) {
// 切换号段
buffer.switchPos();
// 重置下一号段状态
buffer.setNextReady(false);
} else {
logger.error("Both two segments in {} are not ready!", buffer);
return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
}
} finally {
// 释放写锁
buffer.wLock().unlock();
}
最后
以上就是野性音响为你收集整理的Leaf关键源码解析的全部内容,希望文章能够帮你解决Leaf关键源码解析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复