我是靠谱客的博主 魁梧心情,最近开发中收集的这篇文章主要介绍容器环境-雪花算法基于Redis的workerId的自动分配,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

雪花算法在docker 容器环境中,每次启动都是新的pod,无法为每个pod设置固定且唯一workerId。使用redis或zk获取随机唯一的workerId是一种好办法。

下面以Redis为例,为Mybatis-Plus设置雪花算法的WorkerId
功能:
1.保证workerId唯一。随机WorkerId,使用Redis保证不重复(使用setNx保证)
2.采用redis过期机制 + 不断心跳续期维持workerId的占用
3.冲突自动恢复。特殊情况下,发生workerId冲突(redis的value不一致说明冲突了),尝试进行自动恢复——即获取新的workerId

import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.security.SecureRandom;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* 基于redis自动分配雪花算法的workerId
* @author eric
* @date 2022/9/31
*/
@Component
@Slf4j
@Order(Ordered.HIGHEST_PRECEDENCE)
public class SnowFlaskWorkerIdAllocator {
/**
* workerId可以是0-1023
*/
private static final int MAX_WORKER_ID = 1024;
private static final String WORKER_PREFIX = "SnowFlakeWorker-";
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* worker名称,同一个redis,不同workerName,workerId可以重复,用于共用Redis的情况
*/
@Value("${idGenerator.workerName:appName}")
private String workerName;
/**
* workerKey设置值的ttl
*/
@Value("${idGenerator.redis.ttl:3600}")
private int ttl = 3600;
/**
* redis续期一次
*/
@Value("${idGenerator.redis.heartBeatIntervalSecond:60}")
private long heartBeatIntervalSecond = 60;
/**
* spring容器关闭destroy调用后多久后,redisKey才真正删除(通过设置ttl实现)
*/
@Value("${idGenerator.redis.workerKeyDelayRemoveSecond:60}")
private long workerKeyDelayRemoveSecond;
private int snowFlaskWorkerId;
private String redisValue;
private ExecutorService executorService;
private boolean shutdown = false;
private final ReentrantLock lock = new ReentrantLock();
@PostConstruct
public void init() {
setNextSnowFlaskWorkerId();
executorService = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10), r -> new Thread(r, "SnowFlakeWorkerId-HeartBeat-Thread"));
executorService.execute(new HeartBeatTask());
}
/**
* 设置雪花算法workerId
*/
private void setNextSnowFlaskWorkerId() {
lock.lock();
try {
snowFlaskWorkerId = getSnowFlaskWorkerId();
log.info("雪花算法workerId={}设置成功", snowFlaskWorkerId);
// 高5bits是数据中心Id,低5big是workerId,
int workerId = snowFlaskWorkerId & 0x1F;
int dateCenterId = snowFlaskWorkerId >> 5;
log.info("设置IdWorker, dataCenterId={}, workerId={}", dateCenterId, workerId);
IdWorker.initSequence(workerId, dateCenterId);
// 获取一个雪花id,验证工作机器id位等于snowFlaskWorkerId
Assert.isTrue(((IdWorker.getId() >> 12) & (MAX_WORKER_ID - 1)) == snowFlaskWorkerId, "IdWorker校验机器id,机器id错误");
} finally {
lock.unlock();
}
}
/**
* 关闭做资源和workerId的释放
* 关闭心跳线程,删除redis key
*/
@PreDestroy
public void destroy() {
lock.lock();
try {
shutdown = true;
redisValue = null;
} finally {
lock.unlock();
}
redisTemplate.expire(getWorkerKey(snowFlaskWorkerId), workerKeyDelayRemoveSecond, TimeUnit.SECONDS);
log.info("关闭雪花算法workerId心跳线程");
executorService.shutdownNow();
}
/**
* 获得可用workerId
* @return 可用workerId
*/
private int getSnowFlaskWorkerId() {
SecureRandom random = new SecureRandom();
int workerId = random.nextInt(MAX_WORKER_ID);
String uuid = UUID.randomUUID().toString().replace("-", "");
boolean success = false;
// 从随机的位置开始遍历尝试workerId是否已被占用
for (int i = 0; i < MAX_WORKER_ID && !success; i++) {
log.info("尝试锁定workerId: {}", workerId);
success = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(getWorkerKey(workerId), uuid, ttl, TimeUnit.SECONDS));
if (!success) {
log.info("workerId: {} 锁定失败", workerId);
workerId = (workerId + 1) % MAX_WORKER_ID;
}
}
// 无法找到workerId,抛出异常
if (!success) {
// 1024个workerId都尝试了
throw new RuntimeException("遍历了1-1024个workerId全被占用,无法获取到WorkerId. ");
}
redisValue = uuid;
return workerId;
}
/**
* 存到redis的key
* @param workerId worker
* @return key
*/
private String getWorkerKey(int workerId) {
return WORKER_PREFIX + workerName + "-" + workerId;
}
/**
* 心跳线程类
*/
class HeartBeatTask implements Runnable {
@Override
public void run() {
lock.lock();
try {
log.info("启动雪花算法workerId心跳线程, key={}, workerId={},每{}s设置ttl={}",
getWorkerKey(snowFlaskWorkerId), snowFlaskWorkerId, heartBeatIntervalSecond, ttl);
} finally {
lock.unlock();
}
while (true) {
lock.lock();
try {
if (shutdown) {
break;
}
// 设置redis ttl,当做是心跳
doHeartBeat();
} catch (Throwable e) {
log.error("雪花算法心跳失败,{}s后重试", heartBeatIntervalSecond, e);
} finally {
lock.unlock();
}
try {
Thread.sleep(heartBeatIntervalSecond * 1000);
} catch (InterruptedException e) {
log.info("线程名称:{}已被中断", Thread.currentThread().getName());
Thread.currentThread().interrupt();
}
}
}
/**
* 进行心跳
*/
private void doHeartBeat() {
String workerKey = getWorkerKey(snowFlaskWorkerId);
String value = redisTemplate.opsForValue().get(workerKey);
if (!redisValue.equals(value)) {
log.error("雪花算法雪花WorkerId失效,workerId={}已失效或发生冲突, 尝试从新获取雪花Id修复问题", snowFlaskWorkerId);
// 尝试重新获取workerId
tryResumeWorkerId();
} else {
redisTemplate.expire(workerKey, ttl, TimeUnit.SECONDS);
log.info("心跳成功:雪花算法workerId={}, ttl={}, redisKey={},redisValue={}", snowFlaskWorkerId,
ttl, workerKey, redisValue);
}
}
/**
* 尝试从错误中恢复,重新获取workerId
*/
private void tryResumeWorkerId() {
try {
setNextSnowFlaskWorkerId();
log.error("雪花算法WorkerId失效问题已修复, 新的雪花workerId={}", snowFlaskWorkerId);
} catch (Exception e) {
log.error("致命Error,雪花算法失败,workerId={} 问题修复失败,请尽快重启部署单元!!!", snowFlaskWorkerId, e);
}
}
}
}

最后

以上就是魁梧心情为你收集整理的容器环境-雪花算法基于Redis的workerId的自动分配的全部内容,希望文章能够帮你解决容器环境-雪花算法基于Redis的workerId的自动分配所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(61)

评论列表共有 0 条评论

立即
投稿
返回
顶部