概述
1、为了防止一下子启动过多goroutine,使用一个基于带缓冲channel的信号量来限制并发运行的goroutine数量;其中TryAcquire(select + default实现的非阻塞获取)是非常新颖的用法
package semaphore
import ()
// Semaphore is a counting semaphore backed by a buffered channel.
// Every held permit occupies one slot of the channel buffer, so the
// number of permits that can be held at once equals the buffer size.
type Semaphore struct {
	bufSize int       // total number of permits (the channel capacity)
	channel chan int8 // one buffered element per permit currently held
}

// NewSemaphore creates a semaphore that hands out at most concurrencyNum
// permits at the same time.
func NewSemaphore(concurrencyNum int) *Semaphore {
	sem := new(Semaphore)
	sem.bufSize = concurrencyNum
	sem.channel = make(chan int8, concurrencyNum)
	return sem
}

// TryAcquire takes a permit without blocking. It reports true when a
// permit was obtained and false when all permits are currently held.
func (s *Semaphore) TryAcquire() bool {
	acquired := false
	select {
	case s.channel <- int8(0):
		acquired = true
	default:
		// Buffer is full: every permit is taken, give up immediately.
	}
	return acquired
}

// Acquire takes a permit, blocking until one becomes available.
func (s *Semaphore) Acquire() {
	s.channel <- int8(0)
}

// Release returns one permit. It blocks if no permit is currently held,
// because it receives from the underlying channel.
func (s *Semaphore) Release() {
	<-s.channel
}

// AvailablePermits returns how many permits can still be acquired right now.
func (s *Semaphore) AvailablePermits() int {
	held := len(s.channel)
	return s.bufSize - held
}
transfer 将数据通过rpc发送到judge:
// forward2JudgeTask is the Judge forwarding loop: it drains the Judge send
// queue Q and ships the data to the Judge node through the RPC connection
// pool. It never returns.
//
// Q is the send queue, node is the key used to look up the Judge address in
// the configured cluster, and concurrent bounds the number of sender
// goroutines that may be in flight at once.
func forward2JudgeTask(Q *list.SafeListLimited, node string, concurrent int) {
	batch := g.Config().Judge.Batch // at most batch items are sent per call
	addr := g.Config().Judge.Cluster[node]
	sema := nsema.NewSemaphore(concurrent)

	for {
		items := Q.PopBackBy(batch)
		if len(items) == 0 {
			// Nothing queued: back off before polling again.
			time.Sleep(DefaultSendTaskSleepInterval)
			continue
		}

		count := len(items)
		judgeItems := make([]*cmodel.JudgeItem, count)
		for idx := range judgeItems {
			judgeItems[idx] = items[idx].(*cmodel.JudgeItem)
		}

		// Synchronous Call + bounded concurrency: the permit is taken here,
		// before the goroutine starts, so the loop blocks once `concurrent`
		// senders are already running.
		sema.Acquire()
		go func() {
			defer sema.Release()

			reply := &cmodel.SimpleRpcResponse{}
			var err error
			// Up to 3 attempts, pausing 10ms after every failed one.
			for attempt := 0; attempt < 3; attempt++ {
				err = JudgeConnPools.Call(addr, "Judge.Send", judgeItems, reply)
				if err == nil {
					break
				}
				time.Sleep(time.Millisecond * 10)
			}

			// statistics
			if err != nil {
				log.Printf("send judge %s:%s fail: %v", node, addr, err)
				proc.SendToJudgeFailCnt.IncrBy(int64(count))
				return
			}
			proc.SendToJudgeCnt.IncrBy(int64(count))
		}()
	}
}
rpc调用模块:
可以看出处理超时的部分设计得非常好,注意 done 是一个带1个缓冲的channel
// Call performs a synchronous RPC: it returns only after the call has
// completed or the timeout has elapsed.
//
// addr selects the connection pool, method is the RPC method name, and args
// and resp are passed through to the RPC client. The timeout is
// this.CallTimeout, interpreted as milliseconds.
//
// An error is returned when addr has no pool, when no connection can be
// fetched, when the call times out, or when the RPC itself fails. On timeout
// or RPC failure the connection is force-closed; on success it is released
// back to the pool.
func (this *SafeRpcConnPools) Call(addr, method string, args interface{}, resp interface{}) error {
	connPool, exists := this.Get(addr)
	if !exists {
		return fmt.Errorf("%s has no connection pool", addr)
	}

	conn, err := connPool.Fetch()
	if err != nil {
		return fmt.Errorf("%s get connection fail: conn %v, err %v. proc: %s", addr, conn, err, connPool.Proc())
	}

	rpcClient := conn.(*rpcpool.RpcClient)
	callTimeout := time.Duration(this.CallTimeout) * time.Millisecond

	// done is buffered with capacity 1 so the goroutine below can always
	// deliver its result and exit, even if the timeout fired first and nobody
	// is receiving any more.
	done := make(chan error, 1)
	go func() {
		done <- rpcClient.Call(method, args, resp)
	}()

	// Use an explicit timer instead of time.After so it can be stopped as
	// soon as the call returns, rather than lingering until it fires.
	timer := time.NewTimer(callTimeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		connPool.ForceClose(conn)
		return fmt.Errorf("%s, call timeout", addr)
	case err = <-done:
		if err != nil {
			connPool.ForceClose(conn)
			err = fmt.Errorf("%s, call failed, err %v. proc: %s", addr, err, connPool.Proc())
		} else {
			connPool.Release(conn)
		}
		return err
	}
}
rpc连接池设计与实现:
package conn_pool
import (
"fmt"
"io"
"sync"
"time"
)
// ErrMaxConn is returned by Fetch when there is no idle connection and the
// pool already holds MaxConns connections.
var ErrMaxConn = fmt.Errorf("maximum connections reached")

// NConn is a named connection: it can be closed, reports its name, and
// reports whether it has been closed.
type NConn interface {
	io.Closer
	Name() string
	Closed() bool
}

// ConnPool is a pool of named connections guarded by the embedded RWMutex.
//
// New must be set by the user before Fetch is called; it is the factory used
// to create a connection with the given name.
type ConnPool struct {
	sync.RWMutex

	Name     string
	Address  string
	MaxConns int32 // upper bound on connections held by the pool (idle + handed out)
	MaxIdle  int32 // upper bound on the length of the idle list
	Cnt      int64 // number of connections created so far; also used in connection names
	New      func(name string) (NConn, error)

	active int32            // connections currently held by the pool (idle + handed out)
	free   []NConn          // idle connections, handed out in FIFO order
	all    map[string]NConn // every live connection, keyed by its Name()
}

// NewConnPool creates an empty pool. The New factory is left unset and must
// be assigned by the caller.
func NewConnPool(name string, address string, maxConns int32, maxIdle int32) *ConnPool {
	return &ConnPool{
		Name:     name,
		Address:  address,
		MaxConns: maxConns,
		MaxIdle:  maxIdle,
		all:      make(map[string]NConn),
	}
}

// Proc returns a one-line summary of the pool's counters.
func (p *ConnPool) Proc() string {
	p.RLock()
	defer p.RUnlock()

	return fmt.Sprintf("Name:%s,Cnt:%d,active:%d,all:%d,free:%d",
		p.Name, p.Cnt, p.active, len(p.all), len(p.free))
}

// Fetch returns an idle connection if one exists; otherwise it creates a new
// one. It returns ErrMaxConn when the pool is full, or the factory's error
// when creating a connection fails.
func (p *ConnPool) Fetch() (NConn, error) {
	p.Lock()
	defer p.Unlock()

	// Prefer an idle connection.
	if conn := p.fetchFree(); conn != nil {
		return conn, nil
	}

	if p.overMax() {
		return nil, ErrMaxConn
	}

	// Nothing idle and still below the limit: create a new connection.
	conn, err := p.newConn()
	if err != nil {
		return nil, err
	}
	p.increActive()
	return conn, nil
}

// Release hands conn back to the pool. If the idle list is already at
// MaxIdle the connection is closed and dropped; otherwise it is appended to
// the idle list. A nil conn is ignored.
func (p *ConnPool) Release(conn NConn) {
	if conn == nil {
		// Nothing to return; never put a nil entry into the idle list.
		return
	}

	p.Lock()
	defer p.Unlock()

	if p.overMaxIdle() {
		p.deleteConn(conn)
		p.decreActive()
		return
	}
	p.addFree(conn)
}

// ForceClose closes conn and removes it from the pool. A nil conn is ignored.
func (p *ConnPool) ForceClose(conn NConn) {
	if conn == nil {
		return
	}

	p.Lock()
	defer p.Unlock()

	p.deleteConn(conn)
	p.decreActive()
}

// Destroy closes every connection the pool knows about and resets the pool
// to its empty state.
func (p *ConnPool) Destroy() {
	p.Lock()
	defer p.Unlock()

	for _, conn := range p.free {
		if conn != nil && !conn.Closed() {
			_ = conn.Close() // best effort: the pool is being torn down
		}
	}
	for _, conn := range p.all {
		if conn != nil && !conn.Closed() {
			_ = conn.Close() // best effort: the pool is being torn down
		}
	}

	p.active = 0
	p.free = nil
	p.all = make(map[string]NConn)
}

// The helpers below are internal and not safe for concurrent use; callers
// must hold the pool lock.

// newConn creates a connection through the New factory and registers it in
// the all map. It returns an error when the factory is unset, fails, or
// yields a nil connection.
func (p *ConnPool) newConn() (NConn, error) {
	if p.New == nil {
		return nil, fmt.Errorf("conn pool %s: New factory is not set", p.Name)
	}

	name := fmt.Sprintf("%s_%d_%d", p.Name, p.Cnt, time.Now().Unix())
	conn, err := p.New(name)
	if err != nil {
		if conn != nil {
			_ = conn.Close() // discard the half-built connection
		}
		return nil, err
	}
	if conn == nil {
		return nil, fmt.Errorf("conn pool %s: New returned a nil connection", p.Name)
	}

	if p.all == nil {
		// Pool was not built with NewConnPool; create the map lazily.
		p.all = make(map[string]NConn)
	}
	p.Cnt++
	p.all[conn.Name()] = conn
	return conn, nil
}

// deleteConn closes conn and removes it from the all map. A nil conn is
// ignored.
func (p *ConnPool) deleteConn(conn NConn) {
	if conn == nil {
		return
	}
	_ = conn.Close() // the connection is being discarded either way
	delete(p.all, conn.Name())
}

// addFree appends conn to the idle list.
func (p *ConnPool) addFree(conn NConn) {
	p.free = append(p.free, conn)
}

// fetchFree pops the oldest idle connection, or returns nil when the idle
// list is empty.
func (p *ConnPool) fetchFree() NConn {
	if len(p.free) == 0 {
		return nil
	}

	conn := p.free[0]
	p.free[0] = nil // drop the reference held by the backing array
	p.free = p.free[1:]
	return conn
}

// increActive increments the count of connections held by the pool.
func (p *ConnPool) increActive() {
	p.active++
}

// decreActive decrements the count of connections held by the pool.
func (p *ConnPool) decreActive() {
	p.active--
}

// overMax reports whether the pool has reached MaxConns.
func (p *ConnPool) overMax() bool {
	return p.active >= p.MaxConns
}

// overMaxIdle reports whether the idle list has reached MaxIdle.
func (p *ConnPool) overMaxIdle() bool {
	return int32(len(p.free)) >= p.MaxIdle
}
最后
以上就是无辜小伙为你收集整理的open-falcon transfer rpc实现的全部内容,希望文章能够帮你解决open-falcon transfer rpc实现所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复