概述
package worker
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
// 分布式锁(TXN事务)
type JobLock struct {
// etcd客户端
Client *clientv3.Client
Kv clientv3.KV
Lease clientv3.Lease
JobName string // 任务名
CancelFunc context.CancelFunc // 用于终止自动续租
LeaseId clientv3.LeaseID // 租约ID
IsLocked bool // 是否上锁成功
}
var(
G_jobLock *JobLock
)
//初始化任务锁
func InitJobLock() (err error) {
var (
config clientv3.Config
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
)
//创建config
config = clientv3.Config{
Endpoints:[]string{"127.0.0.1:2379"},
DialTimeout: 5000 * time.Millisecond,
}
//创建client
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
}
//创建kv, lease
kv = clientv3.NewKV(client)
lease = clientv3.NewLease(client)
//赋值单例
G_jobLock = &JobLock{
Client:client,
Kv:kv,
Lease:lease,
}
return
}
//尝试上锁
func (jobLock *JobLock) TryLock(jobName string) (err error){
var(
ctx context.Context
cancelFunc context.CancelFunc
leaseGrantRes *clientv3.LeaseGrantResponse
leaseId clientv3.LeaseID
KeepResChan <- chan *clientv3.LeaseKeepAliveResponse
txn clientv3.Txn
lockKey string
txnRes *clientv3.TxnResponse
)
//创建上下文
ctx, cancelFunc = context.WithCancel(context.TODO())
//创建续租
leaseGrantRes, err = jobLock.Lease.Grant(ctx, 5)
//续租id
leaseId = leaseGrantRes.ID
//保持续租
if KeepResChan, err = jobLock.Lease.KeepAlive(ctx, leaseId); err != nil{
fmt.Println(err)
goto FAIL
}
//每隔1秒续约续租
go func() {
var keepRes *clientv3.LeaseKeepAliveResponse
for{
select {
case keepRes = <- KeepResChan:
//如果续约失败
if keepRes == nil{
goto END
}
}
time.Sleep(1*time.Second)
}
END:
}()
//创建事务txn
txn = jobLock.Kv.Txn(context.TODO())
//锁路径
lockKey = common.JOB_LOCK_DIR + jobName
//事务枪锁
txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet(lockKey))
//提交事务
if txnRes, err = txn.Commit(); err != nil{
fmt.Println("提交创建锁失败", err)
goto FAIL
}
//如果抢锁失败
if !txnRes.Succeeded{
err = "锁被占用"
goto FAIL
}
jobLock.CancelFunc = cancelFunc
jobLock.LeaseId = leaseId
jobLock.IsLocked = true
return
FAIL:
//释放上下文,取消续约
cancelFunc()
jobLock.Lease.Revoke(ctx, leaseId) //释放租约
return
}
//释放锁
func (jobLock *JobLock)UnLock() {
if jobLock.IsLocked{
jobLock.CancelFunc() //取消自动续租协程
jobLock.Lease.Revoke(context.TODO(), jobLock.LeaseId) // 释放租约
}
}
复制代码
转载于:https://juejin.im/post/5d0637bc518825294d2fee14
最后
以上就是危机电脑为你收集整理的go 调用etcd实现分布式锁的全部内容,希望文章能够帮你解决go 调用etcd实现分布式锁所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复