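Overview
1. Cluster scaling
Scaling a cluster changes the number of shards (more when expanding, fewer when shrinking), which in turn changes how data is distributed: the same 16384 slots that used to map to N shards now map to M shards.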
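To make the N-to-M remapping concrete, the sketch below computes how many slots each shard would own if the 16384 slots were spread as evenly as possible. The even split and the names SlotShare and slotsPerShard are assumptions made for illustration; Redis itself does not expose such a function.

```java
import java.util.Arrays;

class SlotShare {
    static final int TOTAL_SLOTS = 16384;

    /** Splits the 16384 slots as evenly as possible across the given number of shards. */
    static int[] slotsPerShard(int shards) {
        if (shards <= 0) {
            throw new IllegalArgumentException("shards must be positive");
        }
        int[] counts = new int[shards];
        int base = TOTAL_SLOTS / shards;
        int remainder = TOTAL_SLOTS % shards;
        for (int i = 0; i < shards; i++) {
            // The first `remainder` shards take one extra slot each.
            counts[i] = base + (i < remainder ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("3 shards: " + Arrays.toString(slotsPerShard(3)));
        System.out.println("4 shards: " + Arrays.toString(slotsPerShard(4)));
    }
}
```

Comparing the output for N and M shards gives a rough idea of how many slots each existing shard has to give up when a node is added.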
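1.1 Expanding the cluster
Adding the new node to the cluster
- Start the new node: redis-server redis.conf
- Join it to the cluster with cluster meet (be careful here: if the new node already belongs to another cluster, meeting it can mix the two clusters together)
- After a period of ping/pong gossip, every node discovers the new node and saves its state to the local nodes.conf
  - To check whether the other nodes have discovered the new node, use cluster nodes. To produce the reply, the server calls clusterGenNodesDescription, which builds the result from the latest in-memory state
  - Each node's local nodes.conf is updated by clusterSaveConfig
- A newly joined node always starts out as a master that owns no slots. There are two ways to handle it next:
  - Migrate slots and data to it, which expands the cluster
  - Turn it into a replica of an existing master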
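Migrating slots and data
- Decide which slots the new node will own (after migration every shard should own roughly the same number of slots)
- Migrate the slots one at a time. For each slot:
  1. Send cluster setslot {slot} importing {sourceNodeId} to the target node
  2. Send cluster setslot {slot} migrating {targetNodeId} to the source node
  3. On the source node, run cluster getkeysinslot {slot} {count} to fetch up to count keys that live in the slot
  4. On the source node, run migrate {targetIp} {targetPort} "" 0 {timeout} keys {keys…}. The batch form of migrate is available from Redis 3.0.6 onward; batching reduces the number of network round trips between nodes
  5. Repeat steps 3 and 4 until no keys of the slot remain on the source node
  6. Send cluster setslot {slot} node {targetNodeId} to all nodes in the cluster to tell them the slot now belongs to the target node (this makes sure the new assignment propagates promptly)
- redis-trib can perform this resharding for you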
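The per-slot procedure above can be sketched as a small program that only builds the command sequence, without talking to a real cluster. SlotMigrationPlan, planForSlot and the keyBatches parameter are hypothetical names; in a real migration the key batches come from repeated cluster getkeysinslot calls rather than being known up front.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SlotMigrationPlan {
    /**
     * Lists the commands for moving one slot, each prefixed with the node that receives it.
     * keyBatches stands in for the results of repeated CLUSTER GETKEYSINSLOT calls.
     */
    static List<String> planForSlot(int slot, String sourceId, String targetId,
                                    String targetHost, int targetPort, int timeoutMs,
                                    List<List<String>> keyBatches) {
        List<String> steps = new ArrayList<>();
        steps.add("[target] CLUSTER SETSLOT " + slot + " IMPORTING " + sourceId);
        steps.add("[source] CLUSTER SETSLOT " + slot + " MIGRATING " + targetId);
        for (List<String> batch : keyBatches) {
            // Fetch a batch of keys, then move that batch in a single MIGRATE call.
            steps.add("[source] CLUSTER GETKEYSINSLOT " + slot + " " + batch.size());
            steps.add("[source] MIGRATE " + targetHost + " " + targetPort + " \"\" 0 "
                    + timeoutMs + " KEYS " + String.join(" ", batch));
        }
        steps.add("[all nodes] CLUSTER SETSLOT " + slot + " NODE " + targetId);
        return steps;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("user:1", "user:2"),
                Arrays.asList("user:3"));
        planForSlot(42, "src-id", "tgt-id", "10.0.0.2", 6379, 5000, batches)
                .forEach(System.out::println);
    }
}
```

Printing the plan first is a cheap way to review the order of operations; the node IDs, address and timeout in main are placeholders.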
The newly added master also needs replicas of its own to remain highly available.
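1.2 Shrinking the cluster
Main steps:
- Deal with the slots owned by the node that is about to be removed (see the slot and data migration procedure above)
- Only when the node owns no slots, or is a replica, can the other nodes in the cluster be told to take it offline
- cluster forget {downNodeId}: when a node receives this command it puts downNodeId on its ban list. While a node is on the ban list it is not re-added through gossip messages, but the ban only lasts 60s; after that, gossip about the node is accepted again. [So every node in the cluster has to be told to forget the node within those 60s]
- If only a master is being removed, decide first where its replicas should go
- If a master and its replicas are all being removed, remove the replicas first and the master last
# 60s TTL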
/* -----------------------------------------------------------------------------
* CLUSTER nodes blacklist
*
* The nodes blacklist is just a way to ensure that a given node with a given
* Node ID is not readded before some time elapsed (this time is specified
* in seconds in CLUSTER_BLACKLIST_TTL).
*
* This is useful when we want to remove a node from the cluster completely:
* when CLUSTER FORGET is called, it also puts the node into the blacklist so
* that even if we receive gossip messages from other nodes that still remember
* about the node we want to remove, we don't re-add it before some time.
*
* Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
* that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
* in the cluster without dealing with the problem of other nodes re-adding
* back the node to nodes we already sent the FORGET command to.
*
* The data structure used is a hash table with an sds string representing
* the node ID as key, and the time when it is ok to re-add the node as
* value.
* -------------------------------------------------------------------------- */
#define CLUSTER_BLACKLIST_TTL 60 /* 1 minute. */
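The data structure described in the comment above (node ID as key, earliest re-add time as value) can be modelled in a few lines. This is only a sketch: NodeBlacklist and its methods are made-up names, and the caller passes the current time in so the behaviour is easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

class NodeBlacklist {
    static final long TTL_SECONDS = 60; // same value as CLUSTER_BLACKLIST_TTL

    // Node ID -> time (in seconds) after which the node may be re-added.
    private final Map<String, Long> expireAt = new HashMap<>();

    /** Called when the node is forgotten: ban it for TTL_SECONDS from now. */
    void add(String nodeId, long nowSeconds) {
        expireAt.put(nodeId, nowSeconds + TTL_SECONDS);
    }

    /** True while gossip about this node should still be ignored. */
    boolean isBanned(String nodeId, long nowSeconds) {
        // Drop entries whose ban has run out before answering.
        expireAt.values().removeIf(expiry -> expiry < nowSeconds);
        return expireAt.containsKey(nodeId);
    }

    public static void main(String[] args) {
        NodeBlacklist blacklist = new NodeBlacklist();
        blacklist.add("node-a", 1000);
        System.out.println(blacklist.isBanned("node-a", 1030));
        System.out.println(blacklist.isBanned("node-a", 1100));
    }
}
```

The short TTL is why the forget command has to reach every node quickly: once an entry expires, gossip from a node that still remembers the removed node can bring it back.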
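2. Request routing
Redis Cluster changes the client communication protocol considerably: clients connect directly to the cluster nodes, with no proxy in between.
2.1 Request redirection
- In cluster mode, when a node receives a command that involves a key, it first computes the key's slot and then looks up which node owns that slot. If the node itself is the owner, it executes the command; otherwise it replies to the client with a MOVED redirection error that names the correct node (MOVED {slot} {ip}:{port}). The client then connects to the correct node and runs the command there.
- cluster keyslot {key} returns the slot a key maps to
- redis-cli follows redirections automatically when started with -c. redis-cli is itself just a client connected to the cluster: when a key does not belong to the node it is connected to, it re-sends the request to the correct node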
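A client that follows redirections first has to pull the slot and address out of the error text. The sketch below parses replies of the form MOVED {slot} {ip}:{port} (and the analogous ASK form); the class name Redirect and the choice to return null for anything else are assumptions for illustration, not how any particular client library does it.

```java
class Redirect {
    final boolean ask;   // true for ASK, false for MOVED
    final int slot;
    final String host;
    final int port;

    private Redirect(boolean ask, int slot, String host, int port) {
        this.ask = ask;
        this.slot = slot;
        this.host = host;
        this.port = port;
    }

    /** Returns the parsed redirection, or null if the text is not a MOVED/ASK error. */
    static Redirect parse(String error) {
        String[] parts = error.trim().split(" ");
        if (parts.length != 3) {
            return null;
        }
        boolean ask = parts[0].equals("ASK");
        if (!ask && !parts[0].equals("MOVED")) {
            return null;
        }
        // Split on the last ':' so that hosts containing ':' still work.
        int colon = parts[2].lastIndexOf(':');
        if (colon <= 0) {
            return null;
        }
        try {
            int slot = Integer.parseInt(parts[1]);
            int port = Integer.parseInt(parts[2].substring(colon + 1));
            return new Redirect(ask, slot, parts[2].substring(0, colon), port);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        Redirect r = Redirect.parse("MOVED 3999 127.0.0.1:6381");
        System.out.println(r.slot + " -> " + r.host + ":" + r.port);
    }
}
```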
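Slot calculation
The slot is the CRC16 hash of the effective part of the key, modulo 16384.
- See the keyHashSlot source code below
- Effective part of the key: if the key contains no {}, it is the whole key; if it contains a non-empty {}, it is the content between the braces [this content is called the hash_tag]
- With a hash_tag, different keys can be made to map to the same slot, which is commonly used to optimize Redis I/O
  - To run a batch call with mget in cluster mode, all keys in the list must belong to the same slot, otherwise the command fails; a hash_tag solves this
  - A pipeline can only send its batch of commands to a single node, so the keys used in the pipeline also need to belong to the same slot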
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}
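For readers on the client side, here is a Java port of the same logic, offered as a sketch rather than as the code any client actually ships. The C function above calls crc16 without showing it; the bitwise CRC16 below (polynomial 0x1021, initial value 0) is the variant Redis Cluster uses, and the class name KeySlot is made up.

```java
import java.nio.charset.StandardCharsets;

class KeySlot {
    /** CRC16 with polynomial 0x1021 and initial value 0, computed bit by bit. */
    static int crc16(byte[] data, int offset, int length) {
        int crc = 0;
        for (int i = offset; i < offset + length; i++) {
            crc ^= (data[i] & 0xFF) << 8;
            for (int bit = 0; bit < 8; bit++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** Mirrors keyHashSlot: hash the hash_tag if there is a non-empty one, else the whole key. */
    static int slot(String key) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        int s = 0;
        while (s < k.length && k[s] != '{') s++;
        if (s == k.length) return crc16(k, 0, k.length) & 0x3FFF;   // no '{'
        int e = s + 1;
        while (e < k.length && k[e] != '}') e++;
        if (e == k.length || e == s + 1) return crc16(k, 0, k.length) & 0x3FFF; // no '}' or "{}"
        return crc16(k, s + 1, e - s - 1) & 0x3FFF;
    }

    public static void main(String[] args) {
        System.out.println(slot("{user1000}.following"));
        System.out.println(slot("{user1000}.followers"));
    }
}
```

Both keys in main share the hash_tag user1000, so they print the same slot and can be used together in one mget or pipeline.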
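2.2 Redis Cluster clients
Clients generally come in two kinds: dummy clients and smart clients.
- dummy client: connects to a random node every time and re-sends the request if the reply is MOVED
- smart client: maintains the slot-to-node mapping internally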
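The core of a smart client is a local table from slot number to node. A minimal sketch, assuming node addresses are plain strings and using the hypothetical names SlotCache, assignRange and onMoved:

```java
class SlotCache {
    static final int SLOTS = 16384;

    // Index = slot number, value = "host:port" of the node that serves it.
    private final String[] nodeBySlot = new String[SLOTS];

    /** Records that slots from..to (inclusive) are served by the given node. */
    void assignRange(int from, int to, String node) {
        for (int slot = from; slot <= to; slot++) {
            nodeBySlot[slot] = node;
        }
    }

    /** Looks up the node for a slot; null means the slot is not known yet. */
    String nodeFor(int slot) {
        return nodeBySlot[slot];
    }

    /** Applies a MOVED redirection for one slot. */
    void onMoved(int slot, String node) {
        nodeBySlot[slot] = node;
    }

    public static void main(String[] args) {
        SlotCache cache = new SlotCache();
        cache.assignRange(0, 5460, "10.0.0.1:6379");
        cache.assignRange(5461, 10922, "10.0.0.2:6379");
        cache.assignRange(10923, 16383, "10.0.0.3:6379");
        System.out.println(cache.nodeFor(12182));
    }
}
```

Updating a single slot in onMoved is a simplification chosen for this sketch; a client may instead rebuild the whole table when it sees MOVED, since one moved slot often means others have moved too.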
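The Jedis framework for Java
References:
- Documentation: https://javadoc.io/doc/redis.clients/jedis/latest/index.html
- Code: https://github.com/redis/jedis (the code quoted below is from the master branch at the time of writing; the latest release was 4.2.3)
- JedisCluster picks a node and sends cluster slots to obtain the slot-to-node mapping
- JedisCluster parses the cluster slots reply, caches it locally, and creates a dedicated JedisPool connection pool for each node
- The cluster slots reply is parsed in JedisClusterInfoCache
  - https://javadoc.io/doc/redis.clients/jedis/latest/redis/clients/jedis/JedisClusterInfoCache.html
- How JedisCluster executes a key command: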
public final <T> T executeCommand(CommandObject<T> commandObject) {
  Instant deadline = Instant.now().plus(maxTotalRetriesDuration);

  JedisRedirectionException redirect = null;
  int consecutiveConnectionFailures = 0;
  Exception lastException = null;
  for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) {
    Connection connection = null;
    try {
      if (redirect != null) {
        // Get a connection to the node named in the redirection
        connection = provider.getConnection(redirect.getTargetNode());
        if (redirect instanceof JedisAskDataException) {
          // TODO: Pipeline asking with the original command to make it faster....
          connection.executeCommand(Protocol.Command.ASKING);
        }
      } else {
        connection = provider.getConnection(commandObject.getArguments());
      }

      return connection.executeCommand(commandObject);

    } catch (JedisClusterOperationException jnrcne) {
      throw jnrcne;
    } catch (JedisConnectionException jce) {
      // If the connection failed, handleConnectionProblem goes through other nodes,
      // calls discoverClusterSlots and refreshes the mapping.
      // Refreshing the slot-to-node mapping is done under a lock.
      lastException = jce;
      ++consecutiveConnectionFailures;
      log.debug("Failed connecting to Redis: {}", connection, jce);
      // "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet
      boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline);
      if (reset) {
        consecutiveConnectionFailures = 0;
        redirect = null;
      }
    } catch (JedisRedirectionException jre) {
      // avoid updating lastException if it is a connection exception
      if (lastException == null || lastException instanceof JedisRedirectionException) {
        lastException = jre;
      }
      log.debug("Redirected by server to {}", jre.getTargetNode());
      consecutiveConnectionFailures = 0;
      redirect = jre;
      // if MOVED redirection occurred, refresh the cached slot-to-node mapping
      if (jre instanceof JedisMovedDataException) {
        // it rebuilds cluster's slot cache recommended by Redis cluster specification
        provider.renewSlotCache(connection);
      }
    } finally {
      IOUtils.closeQuietly(connection);
    }
    if (Instant.now().isAfter(deadline)) {
      throw new JedisClusterOperationException("Cluster retry deadline exceeded.");
    }
  }

  JedisClusterOperationException maxAttemptsException
      = new JedisClusterOperationException("No more cluster attempts left.");
  maxAttemptsException.addSuppressed(lastException);
  throw maxAttemptsException;
}
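- With this approach, when the cluster is large the client has to maintain many connections and uses a lot of memory
- Compared with earlier versions, the latest version improves how JedisCluster runs renewSlotCache:
  - rediscoverLock guarantees that only one thread updates the slots cache at any given time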
public void renewClusterSlots(Connection jedis) {
  // If rediscovering is already in process - no need to start one more same rediscovering, just return
  if (rediscoverLock.tryLock()) {
    try {
      // First, if jedis is available, use jedis renew.
      if (jedis != null) {
        try {
          discoverClusterSlots(jedis);
          return;
        } catch (JedisException e) {
          // try nodes from all pools
        }
      }

      // Then, we use startNodes to try, as long as startNodes is available,
      // whether it is vip, domain, or physical ip, it will succeed.
      if (startNodes != null) {
        for (HostAndPort hostAndPort : startNodes) {
          try (Connection j = new Connection(hostAndPort, clientConfig)) {
            discoverClusterSlots(j);
            return;
          } catch (JedisConnectionException e) {
            // try next nodes
          }
        }
      }

      // Finally, we go back to the ShuffledNodesPool and try the remaining physical nodes.
      for (ConnectionPool jp : getShuffledNodesPool()) {
        try (Connection j = jp.getResource()) {
          // If already tried in startNodes, skip this node.
          if (startNodes != null && startNodes.contains(j.getHostAndPort())) {
            continue;
          }
          discoverClusterSlots(j);
          return;
        } catch (JedisConnectionException e) {
          // try next nodes
        }
      }
    } finally {
      rediscoverLock.unlock();
    }
  }
}
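The tryLock pattern used above can be isolated in a small self-contained sketch. SlotCacheRefresher and its renew method are hypothetical names, and the Runnable stands in for the real discovery logic; the point is only that a thread that fails to get the lock returns immediately instead of queueing up a second refresh.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class SlotCacheRefresher {
    private final ReentrantLock rediscoverLock = new ReentrantLock();
    private final AtomicInteger refreshCount = new AtomicInteger();

    /** Runs discover unless another thread is already refreshing; returns whether it ran. */
    boolean renew(Runnable discover) {
        if (!rediscoverLock.tryLock()) {
            return false; // a refresh is already in progress, so skip this one
        }
        try {
            discover.run();
            refreshCount.incrementAndGet();
            return true;
        } finally {
            rediscoverLock.unlock();
        }
    }

    int refreshCount() {
        return refreshCount.get();
    }

    public static void main(String[] args) {
        SlotCacheRefresher refresher = new SlotCacheRefresher();
        boolean ran = refresher.renew(() -> System.out.println("discovering slots"));
        System.out.println("ran=" + ran + ", refreshes=" + refresher.refreshCount());
    }
}
```

Using tryLock rather than lock means callers never block on a refresh: under a burst of MOVED errors one thread rebuilds the cache and the rest carry on with their retries.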
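Defining a JedisCluster client
- JedisCluster has many constructors; here is one of them
  - soTimeout: read/write timeout
  - poolConfig: JedisCluster creates a connection pool for every node of the Redis cluster
  - Note: you normally should not call close() on a JedisCluster, because it destroys all of its JedisPool instances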
public JedisCluster(HostAndPort node, int connectionTimeout, int soTimeout, int maxAttempts,
    final GenericObjectPoolConfig<Connection> poolConfig) {
  this(Collections.singleton(node), connectionTimeout, soTimeout, maxAttempts, poolConfig);
}
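- Some special commands have to be run on multiple nodes, for example keys, flushall, or deleting keys that match a pattern
  - In code, this means getting the JedisPool of every node and running the command on each node in turn
- For transactions and Lua scripts, use a hash_tag to place the keys in the same slot, then get the connection pool for that slot and execute there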
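ASK redirection in JedisCluster
Client-side ASK redirection flow
- ASK redirection happens while data is being migrated inside the cluster. While a slot is being moved from node A to node B, some of its keys may still be on A while others are already on B
- The client sends the command to the source node according to its local slots cache. If the key is still on source node A, A executes the command and replies directly; if the key is already on target node B, A replies with an ASK redirection error
- The client extracts the target node from the ASK error, sends the asking command to the target node to set the flag on that client connection, and then runs the key command
JedisCluster refreshes its slots cache when it receives a MOVED redirection but not when it receives an ASK redirection, because ASK only reflects a temporary, migration-in-progress state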
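The difference between the two redirections can be shown with a small simulation that needs no real cluster. Everything here is a simplification made for the sketch: RedirectLoop and Node are made-up names, the cache is keyed by key instead of by slot, and replies are plain strings of the form "MOVED <node>" or "ASK <node>" without the slot number.

```java
import java.util.HashMap;
import java.util.Map;

class RedirectLoop {
    /** Hypothetical stand-in for a node: returns a value, "MOVED <node>" or "ASK <node>". */
    interface Node {
        String execute(String key, boolean asking);
    }

    static String get(String key, Map<String, Node> nodes, Map<String, String> cache,
                      int maxAttempts) {
        String target = cache.get(key);
        boolean asking = false;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String reply = nodes.get(target).execute(key, asking);
            asking = false; // ASKING only covers the single command that follows it
            if (reply != null && reply.startsWith("MOVED ")) {
                target = reply.substring("MOVED ".length());
                cache.put(key, target);      // permanent change: remember it
            } else if (reply != null && reply.startsWith("ASK ")) {
                target = reply.substring("ASK ".length());
                asking = true;               // temporary: retry once, leave the cache alone
            } else {
                return reply;
            }
        }
        throw new IllegalStateException("too many redirections for key " + key);
    }

    public static void main(String[] args) {
        Map<String, Node> nodes = new HashMap<>();
        nodes.put("A", (key, asking) -> "ASK B");                  // key already migrated away
        nodes.put("B", (key, asking) -> asking ? "value" : "MOVED A");
        Map<String, String> cache = new HashMap<>();
        cache.put("k", "A");
        System.out.println(get("k", nodes, cache, 5) + ", cache still points to " + cache.get("k"));
    }
}
```

In main, node B only serves the key when the request was preceded by asking, which is how an importing node behaves before the slot is officially assigned to it.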
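How a Redis node handles this internally
- When a node receives a key command, it processes the command according to the migration-related fields in clusterState.
- Note that asking is a one-shot command: after each command is executed, the client flag is reset to its original state. See the code below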
/* resetClient prepare the client to process the next command */
void resetClient(client *c) {
    redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;

    freeClientArgv(c);
    c->reqtype = 0;
    c->multibulklen = 0;
    c->bulklen = -1;

    /* We clear the ASKING flag as well if we are not inside a MULTI, and
     * if what we just executed is not the ASKING command itself. */
    if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
        c->flags &= ~CLIENT_ASKING;

    /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
     * to the next command will be sent, but set the flag if the command
     * we just processed was "CLIENT REPLY SKIP". */
    c->flags &= ~CLIENT_REPLY_SKIP;
    if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
        c->flags |= CLIENT_REPLY_SKIP;
        c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
    }
}
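The effect of the ASKING check in resetClient is easier to see in a toy model. The sketch below is a simplified Java model written for this explanation, not a translation of the server: the class name and the bit values are made up, and only the ASKING and MULTI flags are modelled.

```java
class ClientFlagsModel {
    // Illustrative bit values, not the ones defined in the Redis headers.
    static final int CLIENT_MULTI = 1 << 0;
    static final int CLIENT_ASKING = 1 << 1;

    int flags;

    /** Simulates running one command and then the reset step that follows it. */
    void execute(String command) {
        if (command.equalsIgnoreCase("ASKING")) flags |= CLIENT_ASKING;
        if (command.equalsIgnoreCase("MULTI")) flags |= CLIENT_MULTI;
        if (command.equalsIgnoreCase("EXEC")) flags &= ~CLIENT_MULTI;

        // Same condition as in resetClient: keep ASKING only inside MULTI,
        // or right after the ASKING command itself.
        if ((flags & CLIENT_MULTI) == 0 && !command.equalsIgnoreCase("ASKING")) {
            flags &= ~CLIENT_ASKING;
        }
    }

    boolean asking() {
        return (flags & CLIENT_ASKING) != 0;
    }

    public static void main(String[] args) {
        ClientFlagsModel client = new ClientFlagsModel();
        client.execute("ASKING");
        System.out.println("after ASKING: " + client.asking());
        client.execute("GET");
        System.out.println("after GET: " + client.asking());
    }
}
```

This is why a client has to send asking again before every redirected command: the flag survives exactly one following command, or one whole transaction if that command is inside MULTI.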
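- What happens to batch operations (such as pipeline or mget) in a Redis cluster while a slot is being migrated?
  - JedisCluster running mget: fails if the keys do not all belong to the same slot
  - JedisCluster running a pipeline: the commands are executed one by one and each result is placed in the result set. If the result set contains a JedisAskDataException, the client can add its own logic to re-send that request. Why is this possible?
    - Because pipeline results come back strictly in order, even when some of them are errors
    - Following the ASK redirection mechanism, we can drive the ASK flow by hand so that the pipeline still executes correctly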
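The retry idea for pipelines can be sketched without any client library. In the code below, Ask is a hypothetical marker that stands in for a JedisAskDataException found in the result set, and resendWithAsking is a callback the caller would implement by sending asking and then the original command to the named node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class PipelineAskRetry {
    /** Marker for an ASK error at some position of the pipeline results. */
    static final class Ask {
        final String node;

        Ask(String node) {
            this.node = node;
        }
    }

    /**
     * Returns a copy of results in which every Ask entry is replaced by the reply
     * of re-sending the command at the same index to the node named in the error.
     */
    static List<Object> resolve(List<String> commands, List<Object> results,
                                BiFunction<String, String, Object> resendWithAsking) {
        List<Object> fixed = new ArrayList<>(results);
        for (int i = 0; i < fixed.size(); i++) {
            if (fixed.get(i) instanceof Ask) {
                Ask ask = (Ask) fixed.get(i);
                // Results are in command order, so index i identifies the command to retry.
                fixed.set(i, resendWithAsking.apply(ask.node, commands.get(i)));
            }
        }
        return fixed;
    }

    public static void main(String[] args) {
        List<String> commands = Arrays.asList("GET a", "GET b", "GET c");
        List<Object> results = Arrays.asList("1", new Ask("10.0.0.2:6379"), "3");
        List<Object> fixed = resolve(commands, results,
                (node, command) -> "reply of '" + command + "' from " + node);
        fixed.forEach(System.out::println);
    }
}
```

The sketch relies on the ordering guarantee stated above: because the result at index i always belongs to the command at index i, an error can be repaired in place without disturbing the other replies.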