我是靠谱客的博主 欢呼冬天,最近开发中收集的这篇文章主要介绍cockroachdb 替换mysql_CockroachDB 源码闲逛 - II (insert a row),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

前面 3 篇文章更多看事务处理,而事务启停由于语句驱动,本文将看下 CRDB 的启动到收到处理一条简单 SQL 的过程。

启动和连接处理

CRDB 使用 go 里比较流行的 cobra 来让一个 cockroach binary 具有各种功能 cli,在使用 start 命令启动服务端后代码从这里开始运行。

在启动初期就准备好各种日志和 pprof,这里的 pprof 有点意思是支持通过开关控制定期 dump cpu 和 memory(go/jemalloc) 的 pprof, 并且对于过老的 pprof 会 gc 删除下,这个功能解决排查问题解决 oom 了但事故现场 heap 或 profile 找不到有重新复现的问题。

之后就是 Server.start() 中 CRDB 使用一个 port 同时处理 pg / http / grpc 三种协议,这个方法中会有很多处理 gossip 节点发现和 boostrap 的逻辑很复杂, 不过这里这里还是主要关注 SQL 处理也就是 pg 的处理所,客户端以 pg 协议连接上来后就会进入 pgwrire.Server#ServeConn,经过校验 version 等后进入 conn.serveImpl 这个方法是 CRDB 对请求处理的主要逻辑:调用 processCommandsAsync spawn 出处理 cmd 的 "process goroutine"

serveImpl goroutine 之后化身为 "read goroutine", 首先通过 authenticator 来和 "process goroutine" 一起配合完成 auth 过程

之后 "read goroutine" 作为生产者进入不断读取连接数据解析并 push 到 conn.stmtBuf 中并通知消费者 "process goroutine" 来处理, 处理 loop 位于 connExecutor.run

"process goroutine" 则不断处理 stmtBuf 中的 Command

这里每个客户端连接都会通过两个 goroutine 来分别处理读取解析协议和处理命令, 这样在 process 进行中还能接受客户端连接事件(比如在运行一个超级大 SQL 过程中客户端 ctrl-c 两个 goroutine 的方式可以在 sql 运行的同时接收客户端 FIN);另外 stmtBuf 可以用于起到 buf 语句的作用, 正常都是顺序一条执行但后面会看到在事务处理中可以用过这个 stmtBuf 来做 txn 重试或直接跳过 stmt 的操作, 所以 stmtBuf 的实现是一个 cmd 数组 + pos + sync.Cond 实现的可以生产者通知消费者, 消费者可以选择回滚,跳过或依次处理 stmt。

CRDB 中任意节点都可以能接收 SQL 请求, SQL 到达的节点对于当前 SQL 的处理算作 coordinator-side, 之后执行会到实际数据放置的节点进行 replica-side 的处理。

读取并处理语句状态

在处理连接的两个 goroutine 准备好后,客户的发送的语句到达后就开始单个语句在 coordinator-side 的处理。

1) read goroutine(conn#serveImpl)

在收到网络包后,会分发到多个 handle* 方法, 和 mysql 协议类似各个方法分别对不同 pg cmd 进行解析处理:SimpleQuery: 正常的文本执行

Parse/Bind/Exec: prepare 执行

Portal: pg 中的 portal

这里我们这里关注最简单的 simple query 文本执行, handleSimpleQuery 方法逻辑比较简单:使用 parser 将文本 sql 变成多个 stmts 抽象语法树 (因为可能是 ";" 分割的 multi-statements)

之后将每个 stmt 封装成 ExecStmt 然后 push 到 stmtBuf 中

然后 read goroutine 退回到外层循环(serveImp) 继续等待下一个客户端网络事件(e.g. FIN)

为了区分多个 batch, 在 push 完一组 cmd 到 stmtBuf 后会放一个哨兵 Sync 来标示当前批次结束同时标示后面的 cmd 属于下一个 batch。

2) process goroutine (connExecutor#execCmd)

在从 stmtBuff 中获取 Command 后同样是根据不同的 cmd 类型进行分发到不同的 exec*, 比如上面的 simple query 产生的 ExecStmt 会进入到 execStmt, 在进入 execStmt 前会先创建 stmtRes 来封装后续返回客户端响应的 buffer flush 逻辑。

CRDB 的每个客户连接上(准确说是 connExecutor 上)维护了一个 StateMachine, exec* 都会将当前 stmt 根据当前的状态进行执行, 并返回执行后的 fsm.Event 和 Playload, 调用 exec* 的 process goroutine(execStmt) 会根据返回的信息进行 fsm.Event 来决定,响应返回客户端或挪动 stmtBuff, 这个状态机主要和当前连接所属的事务状态相关:

3) Statement machine

CRDB 为状态机封装了一个共有的 pkg, 主要用于维护这里的事务状态和 cdc nemeses 测试中管理状态用, 通过 fsm 工具可以生成上面这这样的状态机图, 不过可能还是看代码 DSL 更直观,定义事务的状态机位于这里, 主要状态是:stateNoTxn: 当前执行命令的连接没在任何事务中当收到 eventTxnStart event 则 transition 到开始事务的状态 stateOpen, 并根据 event 的属性在 stateOpen 中区分标示隐式事务 ImplicitTxn

收到eventNonRetriableErr 但还没开启 txn 则继续保持 noTxn 状态并 skipBatch

stateOpen: 当前执行命令已在事务中如果是 Implicit Txn 相对简单:对 RetriableErr event 且 CanAutoRetry = true 则继续留在 Open(Implicit) 重试

其他的 NoRetriableErr 或 ReiableErr 但 canAutoRetry 为 false 的 event 则 cleanupAndFinish 并返回 noTxn

FinishTxn event 则 finishSQLTxn 且 advanceOne 下个 stmt 并转到 noTxn 状态

如果不是 Implict Txn 即(非 autocommit) 则:对于 RetriableErr event 且 CanAutoRetry = true 和 implict 一样留在 open 重试

NoRetriableErr 或 RetriableErr 但 canAutoRetry 为 false, 并且 isCommit = true 则 NoTxn

但如果 NoRetriableErr 或 ReiableErr 但 canAutoRetry 为 false,isCommit = false 则会流转到 stateAbort 状态

如事务中收到 eventRetryIntentSet 也就是使用了 SAVEPOINT 则会流转到 RetryIntent = true 的特殊 Open, 因为要支持 RELASE SAVEPOINT 和 ROLLBACK TO SAVEPOINT 所以引入了额外的 stateCommitWait 和 stateRestartWait 状态,另外也有特殊的 RetryIntent = true 的 stateAbort

stateAbort: 当前事务已经被 abort 需要等待 rollback收到 rollback event 则 finishSQLTxn 且回到 noTxn

这里不继续说 savepoint 相关的逻辑,有兴趣的朋友可以通过代码定义和图了解系列,其实 noTxn / open / abort + implict 还算简单但引入 savepoint 后状态变得复杂, 所以 CRDB 这种通过 fsm 来管理事务状态可以让代码更清晰,让复杂逻辑更加明显也是简化复杂逻辑的有利手段,后面会看到 CRDB 会利用状态(比如: 是否是 implict txn 或当前是否在 RetryIntent 来进行显示的不同处理和优化处理)。

CRDB 在这部分的维护了一个 stmtBuf 和连接状态机,主要为方便管理隐式事务,savepoint 和事务重试逻辑。

一条 insert 语句的流程

下面我们以为插入一行数据的简单语句insert into t (a) values (1);

为例看下在 CRDB 里怎么处理一条简单的 SQL 。建立链接并接收语句 [coordinator-side]

客户端和 CRDB 建立连接后会启动上面提到的两个 goroutine,在 insert 语句发送到 CRDB 后连接的 read goroutine 会读取解析语句并放置到 stmtBuf。

2. 通过状态机开启隐式事务 [coordinator-side]

process goroutine 从 stmtBuf 拿出 cmd 会看到是一条 ExecStmt 所以会进入 execStmt, 然后因为我们执行这条语句之前并没有 begin, 所以当前连接的 fsm 状态是 stateNoTxn 所以会先进入到 execStmtInNoTxnState, 在没有事务中我们执行的语句并非 begin 或 commit 所以进入 default 分支, 返回一个implict 的 eventTxnStart 的 event 和 eventTxnStartPayload 的 payload, connExecutor 拿到这个 event 和 payload 结果后会去前面提到的状态机定义中 apply 所以根据定义会执行 noTxnToOpen, 因为是 implict txn 所以会让 stmtBuf 停在原地等启动事务后继续执行当前 insert(反之如果是 begin 就必须 advance one 执行下条语句), 然后进入 resetForNewSQLTxn 开始启动事务(implict 也是事务), 这里会 client.NewTxn, 主要完成时间戳获取并准备 sender / coordinator 等工作, 之后会设置 advanceInfo为 advanceOne, noRewind(不用回移 stmtBuf 一般只有重试需要 rewind) 和 txnState 状态为 txnStart, 因为是 txnStart 和 stayInPlace 所以不会做什么, execCmd 会再次从 stmtBuf 中继续取出 insert 再次执行。

这次当前事务状态已经是 open 且 implict, 所以会进入 execStmtInOpenState, 在这个方法里,因为当前 sql 不是 begin/commit/releaseSavePoint 等, 所以挂完 handleAutoCommit 的 defer, 处理完 as of time 的逻辑后, 进入到 dispatchToExecutionEngine :

3. 生成逻辑计划 [coordinator-side]

在 makeExecPlan 中创建出 logic plan

tree | field | description | columns | ordering

+------------------+---------------+------------------+--------------------+----------+

count | | | () |

└── insert | | | () |

│ | into | t(a, rowid) | |

│ | strategy | inserter | |

└── values | | | (column1, column4) |

| size | 2 columns, 1 row | |

| row 0, expr 0 | 1 | |

| row 0, expr 1 | unique_rowid() | |

4. 判断能否 dist 执行 [coordinator-side]

会通过 logic plan 检查看能否 dist 执行,我们这个例子中是 rowCountNode, 目前 insert CRDB 不支持 dist plan(即对于 insert 还不会将 sub-plan 发送到 replica-side 去执行,整个 plan 会在当前节点执行,需要读取或修改其他节点数据会在 plan 执行过程中通过 key 维度的 batchCmd 去和其他节点交互)

5. 准备 Plan 上下文 [coordinator-side]

调用 execWithDistSQLEngine, 首先会创建一个 DistSQLReciever 通过 reciever 抽象了下结果处理逻辑,因为这个 insert 不可以 dist 执行,所以这里会 new 一个 local 的简化版 planCtx(这个和非简化的区别就是在做 physical plan 的时候不需要去访问 meta range 或找节点信息),如果是可以是 dist 执行这一步会准备要访问 range 和节点信息供生产物理计划使用。

6. 生成物理计划 [coordinator-side]

创建 physical plan, 因为我们这个例子中是 rowCount 算子,会直接进入 wrapPlan 生成一个在当前节点执行的 processor(PlanNodeToRowSource) 的物理计划, 因为不是 dist 所以 FinalizePlan 也会将 out stream 设置为 sync_response

5. 生成 flow 并让相关节点建立 flow [coordinator-side]

首先对在同一个 node 的 processors 准备多个 flow, 因为这个例子中的 plan 不可以被 dist 执行所以用于处理向其他 node 建立 flow 的逻辑可以跳过(dist plan 后面会专门看一看)

7. 启动执行当前节点的 flow [coordinator-side]

所以直接进入到 local 执行 的 setup, 并执行 run, flow 的 run 会运行刚才准备的 processor, 首先会 start 也就是 start planNodeToRowSource 实现逻辑是通过 visitor 让 plan node 被 start,所以首先 rowCountNode 会被 start, 进而触发 insertNode 的 BatchNext,BatchNext 就是大家熟悉的 volcano 执行模式不过一次过一个 batch 的多个 row。

8. 执行 insert 算子 [coordinator-side]

insert 的 BatchNext 中就是不停的对 sourceNode(valuesNode) 进 Next 并获取 Values 进行 processSourceRow 到 batch 中。

这里首先有个分批处理,会根据 maxInsertBatchSize(默认 10000)进行分 batch, 对于不是最后一批的 batch 在 flushAndStartNewBatch 中会执行 txn.Run 进而发送到存储节点, 所以如果 sql 是 insert 几万个 values 或 insert into select 个全表的情况, CRDB 会以 10000 行做 batch 将数据发送到存储节点(大家可以想到对于每一行其实会展开为对表对索引和对外键的多个 PutCmd), 需要注意的是发送 batch 到存储节点只会存为 intent,最后需要等 commit 才能实际完成,需要注意的是分批只是在“控制内存”,“控制网络交互”和“提前推送处理”三者的 trade off,并不会分多个 cmd 发送存储节点并不会破坏原子性; 对于非最后一批则会调用 finalize, 在里面如果不是 autocommit 会调用 txn.Run 将这条更新的的数据先发送到存储层然后继续执行其他语句, 如果是 autocommit 则会调用 txn.CommitInBatch 实际执行数据提交。(这里还有种情况是 1pc 对于这个例子刚好是 1pc 在上一篇事务文章中我们有介绍, 前提是 StartTxnCmd 和 EndTxnCmd 在一个 batch 里,结合这里的话可以理解为如果超过 10000 行那种情况一定只会用 2PC 因为一个 Batch 不会有完整 Begin 和 End,如果没有 1000 行且是 auto-commit 则会将 cmd 一条 insert 的 cmd 打包在一个 batch 中如果 batch 中的可以在通过 range meta 定位后都在一个 replica 则可以用 1PC)。

回到 processSourceRow 看下具体插入行数据的处理, 会对子节点 valuesNode 的每一行 datums 补充要插入 table 缺少的 default 列和计算表达式列值,之后调用 checkHelper 检查表约束,这里有有两种 eval 和 input 模式,前者是老逻辑,后者比较有意思,将检查在 insert 之前进行并检查结果作为额外列作为 insert 算子的输入,这里到这里直接检查结果 bool 即可,在 insert 算子之前的好处可以充分利用其他算子的优化(比如 insert 的 select 阶段的向量化主表达式执行),目前在 CBO 模式会使用 input 模式,可以看下这个 commit.

如果检查都 ok 则进入 tableInsert.row 添加到 kv batch 中,在增加 batchSize 计数后, Inserter.InsertRow 中进行后续处理, 对于普通 insert 都是遇到重复 k 值就报错,所以会选择 insertCPutFn(ConditionPut), 之后会对每个 val 进行 marshal, 如果有外键会先在 fkExistenceCheckForInsert 上添加对应外检的 fkCheck 到单独 batch 并 runCheck send 并检查 resp 来发现外键冲突; 之后编码主键和二级索引, 并在 prepareInsertOrUpdateBatch 中对表数据根据 column families 定义生成 kv cput 请求到 batch(注意这个方法也会被 update 用到,所以也有生成 delete 请求到 batch), 最后就是将二级索引也用 insertInvertedPutFn 生成 cput 堆到 batch 中, 因为是 insert 所以二级索引只需要插入就好了,如果是 update 则可以想象这里需要先 delete 再 put,另外这块类似相关处理的还有 upsert 命令这里暂时先不展开不过可以看看代码~ 最后虽然会用 table 定义的顺序将 rows 暴露给 insert 的 consumer

在添加 batch 的时候也会调用 initResult 准备每个 CPut 的 result,如果某个向 batch 添加命令发生了失败时,比如序列化失败,就会在 initResult 的时候在 result 的 err 中保存序列化失败信息。

9. 发送 BatchCommand [coordinator-side]

接下来就是将 insert 准备好的 batch 发送到 replica-side 的处理,对于当前这个一条语句的 insert 来说, 如上所述会在 finalize 中 commitTxn,进而添加一个 EndTransactionRequest 到最后一个 batch 中, 之后开始 txn.Run 这时候 batch 里只有一个 condition put 和一个 end transaction 两个 requests, 首先会在检查下 batch 中 result 是否有带 err 的情况有则失败(其实就是 put 的序列化有没有问题), 之后通过 txnCoordinateSender 后进入 DistSender.Send, 这里会各种尝试根据 range cache 将 batch 切分为多个 request, 不过我们这个 case 里这个 cput(key) 和 endTxn(transaction record author key) 都在一个 range 会一起通过 divideAndSendBatchToRanges 尝试 sendPartialBatch 发送。

最后 DistSender.sendRPC 通过 grpc transport 进行发送, 前面我们介绍过 CRDB 抽象了 transport 他抽象了和其他节点的交互同时屏蔽维护本地和远端节点的细节, 最后会调用 InternalClient.Batch 因为我们当节点启动所以一定调用的本地也就是 internalClientAdapter,请求会直接通过方法调用的方式调用服务提供方(GRPC 好像有 issue 希望优化本地调用但但一直没 address, CRDB 通过 transport 抽象自己 shortcut 了)

10. Replica 接收 BatchCommand [replica-side]

请求就到达了逻辑上的"另一个处理节点"方法 Node.batchInternal, 请求 batch 到达节点的 storage 层,找到当前节点上对应 range 的 replica 对象, 并处理可能的 pushReq 等待逻辑后, 将请求交给 replica 处理。

因为我们这个例子中是写入操作,所以会 useRaft 进入 Replica.executeWriteBatch,在这个方法里会:处理因有 merge 而需要进行 backpressure

收集 batch 触及到的 span(会以 global/local x readOnly/readWrite 分类) 并尝试或 latch(获取后挂上释放的 defer), 不过如前面在《事务优化处理流程》的 Pipeline 优化有提到的这个 latch 会在实际执行中会被 mov 并让 defer 变为 noop, latch 会一直持有等待 raft replicate 完成后才会释放 latch

因为是写请求,进行 leaseholder 检查, 进行 lease 检查会对重复请求做合并减少检查 lease 冲击, 如果检查通过则继续处理,不通过则返回 NotLeaseHolder 来让请求方更新 leaseholderCache 再重试中发到其他 store, 另外这个过程中会处理刚好 transferIn 等中间情况.

查询 TimestampCache 如果有更新的读则将读写事务 timestamp 向前推到和最新的读时间戳之后(注意这一步也需在 latch 获取之后),对于这里有 TimestampCache 的 forward 的情况之后, replica-side 会将请求打回 coordinator-side 做 refreshing。

特别说下 Latch, 在发起 Raft 提议对数据进行操作时,我们希望对有数据交叠的 batch 的串行处理 ,但对非交叠的 batch 尽可能的并行处理,所以 latch 通过维护一个 key range(or key) 到 latch,来和自己操作交叠的的 batch 进行排队等待处理,CRDB 的 latch 实现有些意思,是通过 btree 来维护 key range(or key) 到 []latch 的映射,在使用时首先快速加锁,获取 btree 的 snapshot,然后将自己的 latch 插入到 btree 中(改造后的 btree 使用支持类似 Clojure transients 或 Haskell ST monad 的方式来高速获取 snapshot 和 copy on write),然后释放 lock,再对 snapshot 去做 iter 和 等待 latch,从而要申请的 latch 数和等锁时间不会影响并发。另外 CRDB 的 latch 不仅仅处理写,也会对 batch 中的 read-only span 进行进行排队,不过 readOnly 只需等待小于或等于自己时间戳冲突的 readWrite 操作, 而 readWrite 需要等待所有冲突的 readWrite 和等于大于自己时间戳的 readOnly 操作,另外这里还有个优化是 readOnly 只会在需要的时候(有write)才 lazy 的插入 read btree 中。

11. 执行命令 [replica-side]

BatchCommand 中是 key 维度的各种 command(CPut, EndTxn), Replica.evalAndPropose 则负责进行 eval 这些 command 并对 key 进行修改。

具体是通过 evaluateWriteBatch 将 batch 中的命令各自进行 eval,这个方法中有之前文章介绍过的1PC 优化逻辑 即去掉 Batch 中的 EndTxn 并不写 intent 直接写 mvcc key,本例子中的 sql 刚好可以用这个,如果不能用 1PC 的情况其实 replica-side 的处理本质没有太多差别(除了要写 intent 而不是 mvcc 开关在这句),2PC 的处理更多是 coordinator 在发送 endTxn 时需要注意处理,如果是 parallel commit 可以一起发,如果不能 parallel commit 需要先等其他 cmd 发完才能发 endTxn 来完成 Phase2, 这块还是见之前的《CockroachDB Transaction 小记(II) - 优化流程》

对于每个命令会获取对应 cmd 的处理函数来进行处理, 比如 CPut 会调用 batcheval.ConditionPut 处理,对于写入操作 eval 其实就是将 batch 中的所有 cmd 应用生成一个 engine.Batch 对应于 rocksdb 的 WriteBatch(或他们自己 pebble 的 WriteBatch)。

最后将 engine.Batch 生产 RaftCommand 并包装出 ProposalData, 并将通过 raft 将修改的 有 engine.Batch 的 proposalData 进行提议, 在被 raft 同步到多数节点成功后会将 engine.Batch 应用到存储的 rocksdb 或 pebbel,这里就不多介绍 raft 相关的东西了~

12. 结果返回

在执行成功或失败后会按照原路返回 coordinator 并相应客户端。

小结

本文走读了 CRDB 启动到接受 SQL 连接,收到 SQL 语句并维护会话事务状态,并通过一个非常简单的本地 insert 语句了解了下一个写入 SQL的非意常处理流程,跟随一个 SQL 处理能方便后续阅读代码,后面计划看下重试处理逻辑有些依赖本文介绍的 stmtBuff 和 状态机

最后

以上就是欢呼冬天为你收集整理的cockroachdb 替换mysql_CockroachDB 源码闲逛 - II (insert a row)的全部内容,希望文章能够帮你解决cockroachdb 替换mysql_CockroachDB 源码闲逛 - II (insert a row)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(60)

评论列表共有 0 条评论

立即
投稿
返回
顶部