我是靠谱客的博主 勤劳星星,最近开发中收集的这篇文章主要介绍redis 源码系列(11):command 处理流程浅析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

经过上一篇文章的分析,我们已经知道 redis 是如何处理 client 的请求,解析处一个完整的 command,进一步处理这个 command,并且向 client 发送响应。今天来学习一下 redis 的 command 相关代码。

redisCommand

struct redisCommand {
// 命令名字
char *name;
// command 对应的处理函数
redisCommandProc *proc;
// 参数个数,如果是负数,代表参数个数需要大于 -arity
int arity;
// 字符串表示的 FLAG
char *sflags;
// 实际 FLAG,有如下几个:
/*
w: 写入
r: 读
m: 可能会分配内存,当服务器已经处于 oom 时禁止调用
a: 管理员命令,比如 SAVE
p: PUB/SUB 相关命令
f: 与 aof 相关,后续再说
s: 不允许在脚本中使用的命令
R: 随机命令,相同输入的结果可能不同
还有几个本文不涉及,略过
*/
int flags;
// 从命令中判断命令的键参数。在 Redis 集群转向时使用。
redisGetKeysProc *getkeys_proc;
// 指定哪些参数是 key
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey;
/* The last argument that's a key */
int keystep;
/* The step between first and last key */
// 统计字段,耗时、调用次数
long long microseconds, calls;
};

在 src/redis.c 文件中,有一个 commandTable,保存了所有 redis 支持的命令:

struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
// 略
};

因为数组的查询效率较低,所以 redis 在服务启动前,将所有的 command 转为一个 dict 来存储,方便后续的查询(这个也符合 table driven 编程的方法):

// 根据 commandTable 构造 server.commands 查询 dict
void populateCommandTable(void) {
int j;
// 命令的数量
int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
for (j = 0; j < numcommands; j++) {
// 指定命令
struct redisCommand *c = redisCommandTable+j;
// 取出字符串 FLAG
char *f = c->sflags;
int retval1, retval2;
// 解析 sflags,计算 flags
while(*f != '') {
switch(*f) {
case 'w': c->flags |= REDIS_CMD_WRITE; break;
case 'r': c->flags |= REDIS_CMD_READONLY; break;
case 'm': c->flags |= REDIS_CMD_DENYOOM; break;
case 'a': c->flags |= REDIS_CMD_ADMIN; break;
case 'p': c->flags |= REDIS_CMD_PUBSUB; break;
case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
case 'R': c->flags |= REDIS_CMD_RANDOM; break;
case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
case 'l': c->flags |= REDIS_CMD_LOADING; break;
case 't': c->flags |= REDIS_CMD_STALE; break;
case 'M': c->flags |= REDIS_CMD_SKIP_MONITOR; break;
case 'k': c->flags |= REDIS_CMD_ASKING; break;
default: redisPanic("Unsupported command flag"); break;
}
f++;
}
// 在 server.commands 中添加命令
retval1 = dictAdd(server.commands, sdsnew(c->name), c);
// redis 配置文件中,可以使用 rename-command 指令修改 command 对应的名字
// 这里将原始的 command name 存在另一个不会被修改的 orig_commands 中
retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
redisAssert(retval1 == DICT_OK && retval2 == DICT_OK);
}
}
struct redisCommand *lookupCommand(sds name) {
return dictFetchValue(server.commands, name);
}
struct redisCommand *lookupCommandByCString(char *s) {
struct redisCommand *cmd;
sds name = sdsnew(s);
cmd = dictFetchValue(server.commands, name);
sdsfree(name);
return cmd;
}
struct redisCommand *lookupCommandOrOriginal(sds name) {
// 先在 commands 中查询
struct redisCommand *cmd = dictFetchValue(server.commands, name);
// 如果有需要的话,查找原始表
if (!cmd) cmd = dictFetchValue(server.orig_commands,name);
return cmd;
}

书接上回,我们看看当客户端发送了一条完整命令以后,redis 如何处理:

// 如果当前客户端发送的是一个事务命令,设置 REDIS_DITRTY_EXEC flag
// 让之后的 EXEC command 失败,每次命令出错的时候都应该调用
void flagTransaction(redisClient *c) {
if (c->flags & REDIS_MULTI)
c->flags |= REDIS_DIRTY_EXEC;
}
int processCommand(redisClient *c) {
// 特别处理 quit 命令
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= REDIS_CLOSE_AFTER_REPLY;
return REDIS_ERR;
}
// 查找命令,并进行命令合法性检查,以及命令参数个数检查
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
// 没找到指定的命令
flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return REDIS_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
// 参数个数错误
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return REDIS_OK;
}
// 鉴权,如果配置中需要密码,那么 client 执行的第一个命令必须是 auth
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
flagTransaction(c);
addReply(c,shared.noautherr);
return REDIS_OK;
}
// 集群模式相关,先略过后面讲到集群我们再回头看
if (server.cluster_enabled &&
!(c->flags & REDIS_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
{
int hashslot;
// 集群已下线
if (server.cluster->state != REDIS_CLUSTER_OK) {
flagTransaction(c);
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more informationrn"));
return REDIS_OK;
// 集群运作正常
} else {
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
// 不能执行多键处理命令
if (n == NULL) {
flagTransaction(c);
if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slotrn"));
} else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
/* The request spawns mutliple keys in the same slot,
* but the slot is not "stable" currently as there is
* a migration or import in progress. */
addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slotrn"));
} else {
redisPanic("getNodeByQuery() unknown error.");
}
return REDIS_OK;
// 命令针对的槽和键不是本节点处理的,进行转向
} else if (n != server.cluster->myself) {
flagTransaction(c);
// -<ASK or MOVED> <slot> <ip>:<port>
// 例如 -ASK 10086 127.0.0.1:12345
addReplySds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%drn",
(error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));
return REDIS_OK;
}
// 如果执行到这里,说明键 key 所在的槽由本节点处理
// 或者客户端执行的是无参数命令
}
}
// oom 状态下拒绝执行 REDIS_CMD_DENYOOM command
if (server.maxmemory) {
// 如果内存已超过限制,那么尝试通过删除过期键来释放内存
int retval = freeMemoryIfNeeded();
if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
flagTransaction(c);
addReply(c, shared.oomerr);
return REDIS_OK;
}
}
// 如果是 master 节点,而且 BGSAVE 或者 AOF 写入出现问题,拒绝写入
// 防止造成数据损坏
if (((server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == REDIS_ERR) ||
server.aof_last_write_status == REDIS_ERR) &&
server.masterhost == NULL &&
(c->cmd->flags & REDIS_CMD_WRITE ||
c->cmd->proc == pingCommand))
{
flagTransaction(c);
if (server.aof_last_write_status == REDIS_OK)
addReply(c, shared.bgsaveerr);
else
addReplySds(c,
sdscatprintf(sdsempty(),
"-MISCONF Errors writing to the AOF file: %srn",
strerror(server.aof_last_write_errno)));
return REDIS_OK;
}
// 当配置了关于主从同步的字段(min_slaves_to_write 和 min_salves_max_lag) 
// 在线 salves 数目不足时,拒绝写入请求
if (server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & REDIS_CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
flagTransaction(c);
addReply(c, shared.noreplicaserr);
return REDIS_OK;
}
// 如果这个服务器是一个只读 slave 的话,那么拒绝执行写命令
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & REDIS_MASTER) &&
c->cmd->flags & REDIS_CMD_WRITE)
{
addReply(c, shared.roslaveerr);
return REDIS_OK;
}
// 在订阅于发布模式的上下文中,只能执行订阅和退订相关的命令
if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
&&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
return REDIS_OK;
}
// server.repl_server_stale_data 代表的是如果 salve 节点与 master 节点
// 的连接断开情况下,还能不能处理请求,如果不允许处理,那么返回错误
if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & REDIS_CMD_STALE))
{
flagTransaction(c);
addReply(c, shared.masterdownerr);
return REDIS_OK;
}
// 如果服务正在从硬盘载入数据,只处理 REDIS_CMD_LOADING command
if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
addReply(c, shared.loadingerr);
return REDIS_OK;
}
// Lua 脚本超时,只允许执行限定的操作,比如 SHUTDOWN 和 SCRIPT KILL
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != replconfCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
return REDIS_OK;
}
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 在事务上下文中,除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令
// 是在接受到以后立马执行,其他所有命令都会被入队到事务队列中
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// 执行命令
call(c,REDIS_CALL_FULL);
c->woff = server.master_repl_offset;
// 处理那些解除了阻塞的键
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return REDIS_OK;
}
// 执行命令,注意这个 flags 和 command 的 flags 不一样
// 函数中屡次提到的传播是在 AOF 和 redis 高可用(主从)中用到的概念,暂时可以忽略
void call(redisClient *c, int flags) {
long long dirty, start, duration;
// 记录命令开始执行前的 FLAG
int client_old_flags = c->flags;
// 如果可以的话,将命令发送到 MONITOR
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & REDIS_CMD_SKIP_MONITOR))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
// 清除 REDIS_FORCE_AOF 和 REDIS_FORCE_REPL 位
c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
redisOpArrayInit(&server.also_propagate);
// 保留旧 dirty 计数器值,与 AOF 相关
dirty = server.dirty;
// 计算命令开始执行的时间
start = ustime();
// 执行实现函数
c->cmd->proc(c);
// 计算命令执行耗费的时间
duration = ustime()-start;
// 计算命令执行之后的 dirty 值
dirty = server.dirty-dirty;
// 不将从 Lua 中发出的命令放入 SLOWLOG ,也不进行统计
if (server.loading && c->flags & REDIS_LUA_CLIENT)
flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);
// 如果调用者是 Lua ,那么根据命令 FLAG 和客户端 FLAG
// 打开传播(propagate)标志
if (c->flags & REDIS_LUA_CLIENT && server.lua_caller) {
if (c->flags & REDIS_FORCE_REPL)
server.lua_caller->flags |= REDIS_FORCE_REPL;
if (c->flags & REDIS_FORCE_AOF)
server.lua_caller->flags |= REDIS_FORCE_AOF;
}
// 如果有需要,将命令放到 SLOWLOG 里面
if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand)
slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
// 更新命令的统计信息
if (flags & REDIS_CALL_STATS) {
c->cmd->microseconds += duration;
c->cmd->calls++;
}
// 将命令复制到 AOF 和 slave 节点
if (flags & REDIS_CALL_PROPAGATE) {
int flags = REDIS_PROPAGATE_NONE;
// 强制 REPL 传播
if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
// 强制 AOF 传播
if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
// 如果数据库有被修改,那么启用 REPL 和 AOF 传播
if (dirty)
flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
if (flags != REDIS_PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
}
// 将客户端的 FLAG 恢复到命令执行之前
c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL);
// 传播额外的命令
if (server.also_propagate.numops) {
int j;
redisOp *rop;
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);
}
redisOpArrayFree(&server.also_propagate);
}
server.stat_numcommands++;
}

redis 命令中,可以粗略分为两种命令:

  1. 事务命令
  2. 非事务命令

对于事务命令,客户端会发送若干 command 过来,在最终执行前,redis 需要将一个事物内的 commands 按照顺序入队,最后在客户端调用 EXEC 以后,统一执行(有一些类似 discard、watch 的 command 也会立即执行)。非事务命令则很简单了,收到请求,校验通过以后执行就可以了。

总结

  1. redis 使用一个 dict 保存所有的 command,这符合 table driven 的编程范式
  2. 事务命令由多个命令共同构成一个事务,在组成完整事务以后在依次执行事务命令队列里面的命令

最后

以上就是勤劳星星为你收集整理的redis 源码系列(11):command 处理流程浅析的全部内容,希望文章能够帮你解决redis 源码系列(11):command 处理流程浅析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部