gRPC负载均衡
范围
本文档解释了gRPC内的负载均衡设计。
背景
每个呼叫负载均衡
值得注意的是,gRPC内部的负载均衡是基于每个呼叫而不是每个连接的基础上发生的。换句话说,即使所有请求来自单个客户端,我们仍然希望它们在所有服务器之间进行负载均衡。
负载均衡的方法
在任何gRPC细节之前,我们探讨一些常用的方法来实现负载均衡。
代理模式
使用代理提供了一个坚实可靠的客户端,可以向负载均衡系统报告负载。代理通常需要更多的资源来操作,因为它们具有RPC请求和响应的临时副本。这个模型也增加了RPC的延迟。
在考虑像存储这样的大请求量服务时,代理模式被认为是低效的。
均衡感知客户端
这个厚重客户端将更多的负载均衡逻辑放在客户端中。例如,客户端可以包含许多用于从列表中选择服务器的负载均衡策略(循环[Round Robin], 随机[Random],等)。在这个模型中,服务器列表将在客户端被静态配置,由名称解析系统(一个外部负载均衡器)提供,等等。无论如何,客户端负责从列表中选择首选服务器。
这种方法的缺点之一是以多种语言和/或客户端版本编写和维护负载均衡策略。这些策略可能相当复杂。某些算法还需要客户端到服务器的通信,因此除了发送用户请求的RPC之外,客户端还需要变得更厚以支持额外的RPC以获取运行状况或负载信息。
外部负载均衡服务
客户端负载均衡代码保持简单和便携,实现用于服务器选择的众所周知的算法(例如循环[Round Robin])。负载均衡器提供复杂的负载均衡算法。客户端依靠负载平衡器来提供负载平衡配置以及客户端应该向其发送请求的服务器列表。均衡器根据需要更新服务器列表以均衡负载以及处理服务器不可用性或健康问题。负载均衡器将做出任何必要的复杂决策并通知客户端。负载均衡器可以与后端服务器通信以收集负载和健康信息。
要求
简单的API和客户端
gRPC客户端负载均衡代码必须简单且便携。客户端应该只包含用于服务器选择的简单算法(例如循环)。对于复杂的算法,客户端应该依靠负载均衡器来提供负载均衡配置以及客户端应该向其发送请求的服务器列表。均衡器根据需要更新服务器列表以均衡负载以及处理服务器不可用性或健康问题。负载均衡器将做出任何必要的复杂决策并通知客户端。负载均衡器可以与后端服务器通信以收集负载和健康信息。
安全
负载均衡器可能与实际的服务器后端分离,而负载均衡器的折衷只会导致负载均衡功能的损害。换句话说,在负载均衡的情况下,受损的负载均衡器不应该能够导致客户端信任(可能是恶意的)后端服务器。
架构
概貌
gRPC中负载均衡的主要机制是外部负载均衡,外部负载均衡器为简单的客户端提供最新的服务器列表。
gRPC客户端确实支持用于内置负载均衡策略的API。但是,其中只有少数(其中一个是实现外部负载平衡的grpclb
策略),并且不鼓励用户尝试通过添加更多的来扩展gRPC。相反,应该在外部负载均衡器中实施新的负载均衡策略。
工作流程
在名称解析和连接到服务器之间,负载均衡策略适合gRPC客户端工作流程。 它是这样工作的:
在启动时,gRPC客户端发出服务器名称的名称解析请求。该名称将解析为一个或多个IP地址,每个IP地址将指示它是服务器地址还是负载均衡器地址,以及指示要使用哪个客户端负载均衡策略(例如,
round_robin
或grpclb
)的服务配置。客户端实例化负载均衡策略。
- 注意:如果解析器返回的任何一个地址都是均衡器地址,则客户端将使用
grpclb
策略,而不管服务配置请求的什么负载平衡策略。否则,客户端将使用服务配置请求的负载均衡策略。如果服务配置没有请求负载均衡策略,则客户端将默认选择一个策略来选择第一个可用的服务器地址。
- 注意:如果解析器返回的任何一个地址都是均衡器地址,则客户端将使用
负载均衡策略为每个服务器地址创建一个子通道。
对于除
grpclb
之外的所有策略,这意味着解析器返回的每个地址都有一个子通道。请注意,这些策略会忽略解析器返回的任何均衡器地址。在
grpclb
策略的情况下,工作流程如下所示:a: 该策略将打开流解析器返回的其中一个均衡器地址的流。它用客户机最初请求的服务器名称向均衡器请求服务器地址(即,最初传递给名称解析器的同一个服务器名称)。
- 注意:在
grpclb
策略中,解析器返回的非均衡器地址用作后备,以防LB策略启动时无法联系到均衡器。
- 注意:在
b: 如果负载均衡器的配置需要该信息,则负载均衡器指示客户端请求的gRPC服务器可以向负载平衡器报告负载。
c: 负载均衡器将服务器列表返回给gRPC客户端的grpclb策略。然后grpclb策略将为列表中的每个服务器创建一个子通道。
对于每个发送的RPC,负载均衡策略决定哪个子通道(即,哪个服务器)RPC应该被发送到。
- 在
grpclb
策略的情况下,客户端将按照负载均衡器返回的顺序向服务器发送请求。如果服务器列表为空,则呼叫将被阻塞,直到收到非空的服务器列表。
- 在
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
type SubConn interface {
// UpdateAddresses更新此SubConn中使用的地址。
// gRPC检查当前连接的地址是否仍在新列表中。
// 如果它在列表中,连接将被保留。
// 如果它不在列表中,则连接将优雅地关闭,并创建一个新的连接。
//
// 这将触发SubConn的状态转换。
UpdateAddresses([]resolver.Address)
// Connect开始连接这个SubConn。
Connect()
}
// 创建子连接附属信息
type NewSubConnOptions struct{}
type ClientConn interface {
// NewSubConn由均衡器调用来创建一个新的SubConn。
// 它不阻塞并等待连接建立。SubConn的行为可以通过选项来控制。
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
// RemoveSubConn从ClientConn中删除SubConn。SubConn将被关闭。
RemoveSubConn(SubConn)
// UpdateBalancerState被均衡器调用,以通知gRPC均衡器中的一些内部状态已经改变。
//
// gRPC将更新ClientConn的连接状态,并且会在新的择取器上选择新的SubConn。
UpdateBalancerState(s connectivity.State, p Picker)
// 目标返回此ClientConn的拨号目标。
Target() string
}
// 连接择取器选择连接时附属信息
type PickOptions struct{}
// DoneInfo包含完成的附属信息。
type DoneInfo struct {
// Err是RPC完成的rpc错误。可能是零。
Err error
}
// 连接择取器
type Picker interface {
// 选择返回用于发送RPC的SubConn。返回的SubConn必须是由NewSubConn()返回的。
//
// 预计这个函数会返回:
// - READY状态的的SubConn;
// - 如果没有SubConn可用,但正在取得进展(例如,一些SubConn处于CONNECTING模式)时,返回ErrNoSubConnAvailable;
// - 如果没有活动连接正在发生(例如,所有的SubConn在TRANSIENT_FAILURE模式), 返回其他错误。
//
// 如果SubConn被返回:
// - 如果是READY,gRPC将发送RPC;
// - 如果没有准备好,或者返回后没有准备好,gRPC将会阻塞,直到UpdateBalancerState()被调用,并且生成新的择取器。
//
// 如果返回的错误不是零:
// - 如果错误是ErrNoSubConnAvailable,则gRPC将阻塞,直到UpdateBalancerState()
// - 如果错误是ErrTransientFailure:
//
- 如果RPC正在等待准备好,则gRPC将会阻塞,直到UpdateBalancerState()被调用来再次选择;
//
- 否则,RPC将失败并出现不可用的错误。
// - 其他(错误是其他非零错误):
//
- RPC将失败并出现不可用的错误。
Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
}
// 负载均衡器
type Balancer interface {
// HandleSubConnStateChange由gRPC调用当子连接的连接状态改变时。
// 均衡器应聚合所有子连接的状态并将其报告回gRPC。
// 当新的状态改变了内部状态时,均衡器也应该能够生成和更新子连接择取器。
HandleSubConnStateChange(sc SubConn, state connectivity.State)
// HandleResolvedAddrs被gRPC调用以将解析后的更新地址发送给均衡器。
// 均衡器可以创建新的子连接或使用地址删除已淘汰的旧子连接。
// 如果解析器向gRPC返回非零错误,在这里则会传递一个空的地址片和一个非零错误。
HandleResolvedAddrs([]resolver.Address, error)
// 关闭均衡器. 均衡器不需要调用ClientConn.RemoveSubConn移除所有存在的子连接。
Close()
}
接下来我们挑选其中round_robin均衡器实现分析:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package roundrobin
type rrBuilder struct{}
func (*rrBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
return &rrBalancer{
cc:
cc,
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr:
&connectivityStateEvaluator{},
// 将picker初始化为总是返回ErrNoSubConnAvailable的选择器,
// 因为当SubConn的状态发生变化时,我们可以用这个选择器调用UpdateBalancerState。
picker: newPicker([]balancer.SubConn{}, nil),
}
}
func (*rrBuilder) Name() string {
return "round_robin"
}
type rrBalancer struct {
cc balancer.ClientConn // gRPC客户端连接
csEvltr *connectivityStateEvaluator // 均衡器的连接状态评估器
state
connectivity.State // 均衡器的连接状态
subConns map[resolver.Address]balancer.SubConn // 均衡器内所有子连接
scStates map[balancer.SubConn]connectivity.State // 均衡器内所有子连接状态
picker
*picker // 子连接择取器
}
// 根据解析到的后台服务器地址列表和错误,更新均衡器内维护的子连接和状态信息
func (b *rrBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
grpclog.Infof("roundrobin.rrBalancer: HandleResolvedAddrs called with error %v", err)
return
}
grpclog.Infoln("roundrobin.rrBalancer: got new resolved addresses: ", addrs)
addrsSet := make(map[resolver.Address]struct{})
for _, a := range addrs {
addrsSet[a] = struct{}{}
if _, ok := b.subConns[a]; !ok {
// 当前地址不存在相应的子连接,则构建新的子连接并置为空闲状态,并尝试为新服务器地址创建一个子通道。
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("roundrobin.rrBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns[a] = sc
b.scStates[sc] = connectivity.Idle
sc.Connect()
}
}
for a, sc := range b.subConns {
if _, ok := addrsSet[a]; !ok { // 如果已存在的子连接在新的地址列表中不存在,则表示已被淘汰应该删除
b.cc.RemoveSubConn(sc)
delete(b.subConns, a)
// 此处仍保持了子连接的状态信息,直到状态变为Shutdown才将在HandleSubConnStateChange中被移除
}
}
}
func (b *rrBalancer) regeneratePicker() {
// 如果是ErrTransientFailure状态,则总是返回ErrTransientFailure错误
if b.state == connectivity.TransientFailure {
b.picker = newPicker(nil, balancer.ErrTransientFailure)
return
}
var readySCs []balancer.SubConn
for sc, st := range b.scStates { // 挑选所有ready态的子连接,用于循环择取进行rpc call
if st == connectivity.Ready {
readySCs = append(readySCs, sc)
}
}
b.picker = newPicker(readySCs, nil)
}
func (b *rrBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
grpclog.Infof("roundrobin.rrBalancer: handle SubConn state change: %p, %v", sc, s)
oldS, ok := b.scStates[sc]
if !ok {
grpclog.Infof("roundrobin.rrBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
return
}
b.scStates[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
delete(b.scStates, sc)
}
oldAggrState := b.state
b.state = b.csEvltr.recordTransition(oldS, s)
// 当以下几种情况发生时,需要重新生成择取器
//
- 子连接的状态从not-ready变为ready
//
- 子连接的状态从ready变为not-ready
//
- 均衡器的聚合状态由于non-TransientFailure而成为TransientFailure
//
- 均衡器的聚合状态由于TransientFailure而成为non-TransientFailure
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
b.regeneratePicker()
}
b.cc.UpdateBalancerState(b.state, b.picker)
return
}
// Close是一个nop,因为roundrobin平衡器没有内部状态来清除,也不需要移除子连接。
func (b *rrBalancer) Close() {
}
type picker struct {
// 如果err不是零,Pick总是返回这个错误。在创建picker之后它是不可变的。
err error
// subConns是创建此选取器时的roundrobin均衡器的快照。切片是不可改变的,每次都会从中循环选择,并返回已选中的子连接。
subConns []balancer.SubConn
mu
sync.Mutex
next int
}
func newPicker(scs []balancer.SubConn, err error) *picker {
grpclog.Infof("roundrobinPicker: newPicker called with scs: %v, %v", scs, err)
if err != nil {
return &picker{err: err}
}
return &picker{
subConns: scs,
}
}
func (p *picker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
if p.err != nil {
return nil, nil, p.err
}
if len(p.subConns) <= 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}
p.mu.Lock()
sc := p.subConns[p.next]
p.next = (p.next + 1) % len(p.subConns) // 循环选择子连接
p.mu.Unlock()
return sc, nil, nil
}
type connectivityStateEvaluator struct {
numReady
uint64 // ready状态的连接数.
numConnecting
uint64 // connecting状态的连接数.
numTransientFailure uint64 // transientFailure状态的连接数.
}
// 均衡器的连接状态只能在 Ready, Connecting和TransientFailure这三个状态间转换。
// 其他状态(Idle和Shutdown)被ClientConn转换,子连接创建是Idle态,关闭是Shutdown态。
//
// recordTransition只能从同一个goroutine同步调用。
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // 旧状态减1, 新状态加1。 例:从旧状态[connecting]->新状态[ready]
switch state {
case connectivity.Ready:
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
}
}
if cse.numReady > 0 {
return connectivity.Ready
}
if cse.numConnecting > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
}
参考链接: gRPC load-balancing & gRPC-Go1.8.0#balancer.go & gRPC-Go1.8.0#roundrobin.go
最后
以上就是舒心心情最近收集整理的关于gRPC LBgRPC负载均衡的全部内容,更多相关gRPC内容请搜索靠谱客的其他文章。
发表评论 取消回复