我是靠谱客的博主 感性羽毛,这篇文章主要介绍etcd记录,现在分享给大家,希望可以做个参考。

文章目录

    • 下载安装
      • 单节点测试
      • 集群测试
    • 使用示例
      • 权限管理
      • 集群
      • KV操作
      • 租约
      • 维护
      • Metrics
      • TLS
      • Watch
      • 并发
        • 集群选举示例
        • 分布式锁
      • 事务

下载安装

测试环境:centos7

官方给的下载方法(当前版本3.5.0):

脚本如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/bin/bash ETCD_VER=v3.5.0 # choose either URL GOOGLE_URL=https://storage.googleapis.com/etcd GITHUB_URL=https://github.com/etcd-io/etcd/releases/download DOWNLOAD_URL=${GOOGLE_URL} rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1 rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz /tmp/etcd-download-test/etcd --version /tmp/etcd-download-test/etcdctl version /tmp/etcd-download-test/etcdutl version

安装完成之后文件解压到了/tmp/etcd-download-test目录,可以移动到其他目录(tmp目录在机器关机重启后会被删除)

复制代码
1
2
3
# 移动到/opt目录下 mv /tmp/etcd-download-test /opt/etcd

配置文件模板,etcd.conf.yml内容如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# This is the configuration file for the etcd server. # Human-readable name for this member. name: 'default' # Path to the data directory. data-dir: # Path to the dedicated wal directory. wal-dir: # Number of committed transactions to trigger a snapshot to disk. snapshot-count: 10000 # Time (in milliseconds) of a heartbeat interval. heartbeat-interval: 100 # Time (in milliseconds) for an election to timeout. election-timeout: 1000 # Raise alarms when backend size exceeds the given quota. 0 means use the # default quota. quota-backend-bytes: 0 # List of comma separated URLs to listen on for peer traffic. listen-peer-urls: http://localhost:2380 # List of comma separated URLs to listen on for client traffic. listen-client-urls: http://localhost:2379 # Maximum number of snapshot files to retain (0 is unlimited). max-snapshots: 5 # Maximum number of wal files to retain (0 is unlimited). max-wals: 5 # Comma-separated white list of origins for CORS (cross-origin resource sharing). cors: # List of this member's peer URLs to advertise to the rest of the cluster. # The URLs needed to be a comma-separated list. initial-advertise-peer-urls: http://localhost:2380 # List of this member's client URLs to advertise to the public. # The URLs needed to be a comma-separated list. advertise-client-urls: http://localhost:2379 # Discovery URL used to bootstrap the cluster. discovery: # Valid values include 'exit', 'proxy' discovery-fallback: 'proxy' # HTTP proxy to use for traffic to discovery service. discovery-proxy: # DNS domain used to bootstrap initial cluster. discovery-srv: # Initial cluster configuration for bootstrapping. initial-cluster: # Initial cluster token for the etcd cluster during bootstrap. initial-cluster-token: 'etcd-cluster' # Initial cluster state ('new' or 'existing'). initial-cluster-state: 'new' # Reject reconfiguration requests that would cause quorum loss. strict-reconfig-check: false # Enable runtime profiling data via HTTP server enable-pprof: true # Valid values include 'on', 'readonly', 'off' proxy: 'off' # Time (in milliseconds) an endpoint will be held in a failed state. proxy-failure-wait: 5000 # Time (in milliseconds) of the endpoints refresh interval. proxy-refresh-interval: 30000 # Time (in milliseconds) for a dial to timeout. proxy-dial-timeout: 1000 # Time (in milliseconds) for a write to timeout. proxy-write-timeout: 5000 # Time (in milliseconds) for a read to timeout. proxy-read-timeout: 0 client-transport-security: # Path to the client server TLS cert file. cert-file: # Path to the client server TLS key file. key-file: # Enable client cert authentication. client-cert-auth: false # Path to the client server TLS trusted CA cert file. trusted-ca-file: # Client TLS using generated certificates auto-tls: false peer-transport-security: # Path to the peer server TLS cert file. cert-file: # Path to the peer server TLS key file. key-file: # Enable peer client cert authentication. client-cert-auth: false # Path to the peer server TLS trusted CA cert file. trusted-ca-file: # Peer TLS using generated certificates. auto-tls: false # The validity period of the self-signed certificate, the unit is year. self-signed-cert-validity: 1 # Enable debug-level logging for etcd. log-level: debug logger: zap # Specify 'stdout' or 'stderr' to skip journald logging even when running under systemd. log-outputs: [stderr] # Force to create a new one member cluster. force-new-cluster: false auto-compaction-mode: periodic auto-compaction-retention: "1"

单节点测试可以修改下列项:

复制代码
1
2
3
4
5
6
# 数据保存目录 data-dir wal-dir # 监听地址,可改为http://0.0.0.0:2379 listen-client-urls

单节点测试

复制代码
1
2
3
4
5
6
# 在etcd命令所在目录执行启动命令 ./etcd --config-file=./etcd_config.yaml # 测试 ./etcdctl put foo foo1 # --endpoints=0.0.0.0:2379 ./etcdctl get foo

集群测试

多节点测试需修改配置文件的如下项:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
# 集群中的节点名称 name # 数据保存目录 data-dir wal-dir # 监听地址,可改为http://0.0.0.0:2379 listen-client-urls # 与其他节点通信的地址,可改为http://0.0.0.0:2380 listen-peer-urls # 集群包含的所有节点,也可在后续通过客户端添加,格式为:<node_name1>=<node_url1>,<node_name2>=<node_url2> initial-cluster

使用dockerfile构建,也可使用已有的etcd镜像,这里使用dockerfile构建

这里基于centos:7.9.2009来构建,dockerfile_etcd内容:

复制代码
1
2
3
4
5
FROM centos:7.9.2009 ADD etcd /opt/etcd WORKDIR /opt/etcd CMD ["./etcd_start.sh"] #这个启动命令在将某一个节点移除时会出现该节点启动不了的问题

etcd_start.sh内容:

复制代码
1
2
3
4
5
6
7
8
9
#!/bin/bash CUR_DIR=`cd $(dirname $0) && pwd` if [ -z $NODE_NAME ]; then NODE_NAME='default' fi sed -i "s#name:.*#name: ${NODE_NAME}#g" ${CUR_DIR}/etcd_config.yaml sed -i "s#initial-cluster:.*#initial-cluster: ${INITIAL_CLUSTER}#g" ${CUR_DIR}/etcd_config.yaml ${CUR_DIR}/etcd --config-file=${CUR_DIR}/etcd_config.yaml

使用docker-compose构建集群,etcd.yaml文件内容:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
version: "3" services: etcd_n1: container_name: etcd_n1 privileged: true build: context: . dockerfile: dockerfile_etcd image: etcd:v1.0 ports: - 2379:2379 networks: etcd_network: ipv4_address: 172.20.0.2 environment: NODE_NAME: n1 INITIAL_CLUSTER: "n1=http://172.20.0.2:2380,n2=http://172.20.0.3:2380,n3=http://172.20.0.4:2380" etcd_n2: container_name: etcd_n2 privileged: true build: context: . dockerfile: dockerfile_etcd image: etcd:v1.0 ports: - 12379:2379 networks: etcd_network: ipv4_address: 172.20.0.3 environment: NODE_NAME: n2 INITIAL_CLUSTER: "n1=http://172.20.0.2:2380,n2=http://172.20.0.3:2380,n3=http://172.20.0.4:2380" etcd_n3: container_name: etcd_n3 privileged: true build: context: . dockerfile: dockerfile_etcd image: etcd:v1.0 ports: - 22379:2379 networks: etcd_network: ipv4_address: 172.20.0.4 environment: NODE_NAME: n3 INITIAL_CLUSTER: "n1=http://172.20.0.2:2380,n2=http://172.20.0.3:2380,n3=http://172.20.0.4:2380" networks: etcd_network: driver: bridge ipam: driver: default config: - subnet: 172.20.0.0/24

启动并查看节点情况:

复制代码
1
2
3
4
5
6
# 后台启动 ./docker-compose -f etcd.yaml up -d # 选择一个节点查看运行状态和结果 docker exec -it etcd_n1 ./etcdctl member list -w table docker exec -it etcd_n1 ./etcdctl endpoint status --cluster -w table

测试完成后删除容器和镜像

脚本rm_test_docker.sh内容

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/bin/bash container_ids=`docker ps -a | grep etcd | awk '{print $1}'` images_ids=`docker images | grep etcd | awk '{print $3}' | uniq` for i in ${container_ids[@]} do echo "rm container $i" docker stop $i docker rm $i done for j in ${images_ids[@]} do echo "rm image $j" docker rmi $j -f done

使用示例

注: etcd中使用的bbolt和grpc存在版本冲突的问题

解决方法:

在go.mod中替换适配的版本

复制代码
1
2
3
4
5
// 根据go mod tidy替换bbolt版本 replace github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.6 // 替换grpc版本 replace google.golang.org/grpc => google.golang.org/grpc v1.26.0

权限管理

参考示例

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "log" "time" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() if _, err = cli.RoleAdd(context.TODO(), "root"); err != nil { log.Fatal(err) } if _, err = cli.UserAdd(context.TODO(), "root", "123"); err != nil { log.Fatal(err) } if _, err = cli.UserGrantRole(context.TODO(), "root", "root"); err != nil { log.Fatal(err) } if _, err = cli.RoleAdd(context.TODO(), "r"); err != nil { log.Fatal(err) } if _, err = cli.RoleGrantPermission( context.TODO(), "r", // role name "foo", // key "zoo", // range end clientv3.PermissionType(clientv3.PermReadWrite), ); err != nil { log.Fatal("RoleGrantPermission error: ", err) } if _, err = cli.UserAdd(context.TODO(), "u", "123"); err != nil { log.Fatal("UserAdd u error: ", err) } if _, err = cli.UserGrantRole(context.TODO(), "u", "r"); err != nil { log.Fatal("UserGrantRole u error: ", err) } if _, err = cli.AuthEnable(context.TODO()); err != nil { log.Fatal(err) } cliAuth, err := clientv3.New(clientv3.Config{ Endpoints: []string{"0.0.0.0:2379"}, DialTimeout: time.Second * 5, Username: "u", Password: "123", }) if err != nil { log.Fatal("new error:", err) } defer cliAuth.Close() if _, err = cliAuth.Put(context.TODO(), "foo1", "bar"); err != nil { log.Fatal(err) } _, err = cliAuth.Txn(context.TODO()). If(clientv3.Compare(clientv3.Value("zoo1"), ">", "abc")). Then(clientv3.OpPut("zoo1", "XYZ")). Else(clientv3.OpPut("zoo1", "ABC")). Commit() fmt.Println("txn error", err) // now check the permission with the root account rootCli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"0.0.0.0:2379"}, DialTimeout: time.Second * 5, Username: "root", Password: "123", }) if err != nil { log.Fatal(err) } defer rootCli.Close() resp, err := rootCli.RoleGet(context.TODO(), "r") if err != nil { log.Fatal("RoleGet r error: ", err) } fmt.Printf("user u permission: key %q, range end %qn", resp.Perm[0].Key, resp.Perm[0].RangeEnd) if _, err = rootCli.AuthDisable(context.TODO()); err != nil { log.Fatal("AuthDisable error: ", err) } _, err = cli.RoleDelete(context.TODO(), "r") if err != nil { log.Fatal("RoleDelete r error: ", err) } _, err = cli.RoleDelete(context.TODO(), "root") if err != nil { log.Fatal("RoleDelete root error: ", err) } _, err = cli.UserDelete(context.TODO(), "u") if err != nil { log.Fatal("UserDelete u error: ", err) } _, err = cli.UserDelete(context.TODO(), "root") if err != nil { log.Fatal("UserDelete root error: ", err) } }

集群

参考示例

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "log" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() resp, err := cli.MemberList(context.Background()) if err != nil { log.Fatal(err) } fmt.Println("members:", len(resp.Members)) for _, v := range resp.Members { fmt.Println(v) } _, err = cli.MemberRemove(context.Background(), resp.Members[1].ID) if err != nil { log.Fatal(err) } //mresp, err := cli.MemberAddAsLearner(context.Background(), []string{"http://localhost:32381"}) //if err != nil { // log.Fatal(err) //} mresp, err := cli.MemberAdd(context.Background(), resp.Members[1].PeerURLs) if err != nil { log.Fatal(err) } fmt.Println("added member.PeerURLs:", mresp.Member.PeerURLs) fmt.Println("members count:", len(mresp.Members)) _, err = cli.MemberUpdate(context.Background(), resp.Members[1].ID, resp.Members[1].PeerURLs) if err != nil { log.Fatal(err) } }

KV操作

参考示例

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "log" "time" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) _, err = cli.Put(ctx, "", "sample_value") cancel() if err != nil { switch err { case context.Canceled: fmt.Printf("ctx is canceled by another routine: %vn", err) case context.DeadlineExceeded: fmt.Printf("ctx is attached with a deadline is exceeded: %vn", err) case rpctypes.ErrEmptyKey: fmt.Printf("client-side error: %vn", err) default: fmt.Printf("bad cluster endpoints, which are not etcd servers: %vn", err) } } ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) presp, err := cli.Put(ctx, "foo", "bar1") cancel() if err != nil { log.Fatal(err) } ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) _, err = cli.Put(ctx, "foo", "bar2") cancel() if err != nil { log.Fatal(err) } ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) resp, err := cli.Get(ctx, "foo") cancel() if err != nil { log.Fatal(err) } for _, ev := range resp.Kvs { fmt.Printf("%s : %sn", ev.Key, ev.Value) } ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) resp, err = cli.Get(ctx, "foo", clientv3.WithRev(presp.Header.Revision)) cancel() if err != nil { log.Fatal(err) } for _, ev := range resp.Kvs { fmt.Printf("%s : %sn", ev.Key, ev.Value) } // sorted for i := range make([]int, 3) { ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) _, err = cli.Put(ctx, fmt.Sprintf("key_%d", i), "value") cancel() if err != nil { log.Fatal(err) } } ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) resp, err = cli.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) cancel() if err != nil { log.Fatal(err) } for _, ev := range resp.Kvs { fmt.Printf("%s : %sn", ev.Key, ev.Value) } ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) // count keys about to be deleted gresp, err := cli.Get(ctx, "key", clientv3.WithPrefix()) if err != nil { log.Fatal(err) } // delete the keys dresp, err := cli.Delete(ctx, "key", clientv3.WithPrefix()) if err != nil { log.Fatal(err) } fmt.Println("gresp.Kvs len:", len(gresp.Kvs), ", Deleted all keys:", int64(len(gresp.Kvs)) == dresp.Deleted) // compact 所有比此版本低的历史都将删除 ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) resp, err = cli.Get(ctx, "foo") cancel() if err != nil { log.Fatal(err) } compRev := resp.Header.Revision // specify compact revision of your choice ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) _, err = cli.Compact(ctx, compRev) cancel() if err != nil { log.Fatal(err) } ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) resp, err = cli.Get(ctx, "foo", clientv3.WithRev(presp.Header.Revision)) cancel() fmt.Println(err) // error occurred // txn kvc := clientv3.NewKV(cli) _, err = kvc.Put(context.TODO(), "key", "xyz") if err != nil { log.Fatal(err) } ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) _, err = kvc.Txn(ctx). // txn value comparisons are lexical If(clientv3.Compare(clientv3.Value("key"), ">", "abc")). // the "Then" runs, since "xyz" > "abc" Then(clientv3.OpPut("key", "XYZ")). // the "Else" does not run Else(clientv3.OpPut("key", "ABC")). Commit() cancel() if err != nil { log.Fatal(err) } gresp, err = kvc.Get(context.TODO(), "key") if err != nil { log.Fatal(err) } for _, ev := range gresp.Kvs { fmt.Printf("%s : %sn", ev.Key, ev.Value) } // do ops := []clientv3.Op{ clientv3.OpPut("put-key", "123"), clientv3.OpGet("put-key"), clientv3.OpPut("put-key", "456")} for _, op := range ops { if _, err := cli.Do(context.TODO(), op); err != nil { log.Fatal(err) } } ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) resp, err = cli.Get(ctx, "put-key") cancel() if err != nil { log.Fatal(err) } for _, ev := range resp.Kvs { fmt.Printf("%s : %sn", ev.Key, ev.Value) } }

租约

参考示例

等待租约在指定时间后过期:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "log" "time" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() // minimum lease TTL is 5-second resp, err := cli.Grant(context.TODO(), 5) if err != nil { log.Fatal(err) } // after 5 seconds, the key 'foo' will be removed _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID)) if err != nil { log.Fatal(err) } gresp, err := cli.Get(context.TODO(), "foo") if err != nil { log.Fatal(err) } fmt.Println(gresp.Kvs) time.Sleep(time.Second * 6) gresp, err = cli.Get(context.TODO(), "foo") if err != nil { log.Fatal(err) } fmt.Println(gresp.Kvs) }

主动撤销租约:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "log" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() resp, err := cli.Grant(context.TODO(), 5) if err != nil { log.Fatal(err) } _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID)) if err != nil { log.Fatal(err) } // revoking lease expires the key attached to its lease ID _, err = cli.Revoke(context.TODO(), resp.ID) if err != nil { log.Fatal(err) } gresp, err := cli.Get(context.TODO(), "foo") if err != nil { log.Fatal(err) } fmt.Println("number of keys:", len(gresp.Kvs)) }

增加KeepAlive使租约不过期,也可使用KeepAliveOnce只更新一次:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "log" "time" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() resp, err := cli.Grant(context.TODO(), 5) if err != nil { log.Fatal(err) } _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID)) if err != nil { log.Fatal(err) } // the key 'foo' will be kept forever ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID) if kaerr != nil { log.Fatal(kaerr) } ka := <-ch if ka != nil { fmt.Println("ttl:", ka.TTL) } else { fmt.Println("Unexpected NULL") } time.Sleep(time.Second * 6) gresp, err := cli.Get(context.TODO(), "foo") if err != nil { log.Fatal(err) } fmt.Println(gresp.Kvs) }

维护

碎片整理:

  • 从给定 etcd 成员的内部碎片中释放浪费的空间
  • 只有在删除大量key并想回收资源时才需要进行碎片整理
  • 碎片整理是一项昂贵的操作。 用户应避免同时对多个成员进行碎片整理
  • 要对集群中的多个成员进行碎片整理,用户需要使用不同的端点多次调用碎片整理

参考示例

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main import ( "context" "github.com/coreos/etcd/clientv3" "log" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() _, err = cli.Status(context.Background(), "172.20.20.55:2379") if err != nil { log.Fatal(err) } if _, err = cli.Defragment(context.TODO(), "172.20.20.55:2379"); err != nil { log.Fatal(err) } }

Metrics

参考示例

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "github.com/prometheus/client_golang/prometheus/promhttp" "io" "log" "net" "net/http" "strings" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() // get a key so it shows up in the metrics as a range RPC cli.Get(context.TODO(), "test_key") // listen for all Prometheus metrics ln, err := net.Listen("tcp", ":0") if err != nil { log.Fatal(err) } donec := make(chan struct{}) go func() { defer close(donec) http.Serve(ln, promhttp.Handler()) }() defer func() { ln.Close() <-donec }() // make an http request to fetch all Prometheus metrics url := "http://" + ln.Addr().String() + "/metrics" resp, err := http.Get(url) if err != nil { log.Fatalf("fetch error: %v", err) } b, err := io.ReadAll(resp.Body) resp.Body.Close() if err != nil { log.Fatalf("fetch error: reading %s: %v", url, err) } fmt.Println("resp body", string(b)) // confirm range request in metrics for _, l := range strings.Split(string(b), "n") { if strings.Contains(l, `grpc_client_started_total{grpc_method="Range"`) { fmt.Println(l) break } } }

TLS

参考示例

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main import ( "context" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/pkg/transport" "log" "time" ) func main() { tlsInfo := transport.TLSInfo{ CertFile: "/tmp/test-certs/test-name-1.pem", KeyFile: "/tmp/test-certs/test-name-1-key.pem", TrustedCAFile: "/tmp/test-certs/trusted-ca.pem", } tlsConfig, err := tlsInfo.ClientConfig() if err != nil { log.Fatal(err) } cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"172.20.20.55:2379"}, DialTimeout: time.Second*5, TLS: tlsConfig, }) if err != nil { log.Fatal(err) } defer cli.Close() // make sure to close the client _, err = cli.Put(context.TODO(), "foo", "bar") if err != nil { log.Fatal(err) } }

Watch

参考示例

示例1:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "log" "strconv" "time" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() go func() { var i = 0 for { time.Sleep(time.Second) cli.Put(context.Background(), "foo", "bar"+strconv.Itoa(i)) i++ } }() rch := cli.Watch(context.Background(), "foo") for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value) } } }

示例2(WithPrefix):

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "log" "strconv" "time" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() go func() { var i = 0 for { time.Sleep(time.Second) cli.Put(context.Background(), "foo1", "bar"+strconv.Itoa(i)) i++ } }() rch := cli.Watch(context.Background(), "foo", clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value) } } }

示例3(WithRange):

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "log" "time" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() go func() { time.Sleep(time.Second) cli.Put(context.Background(), "foo1", "bar1") cli.Put(context.Background(), "foo5", "bar5") cli.Put(context.Background(), "foo2", "bar2") cli.Put(context.Background(), "foo3", "bar3") }() rch := cli.Watch(context.Background(), "foo1", clientv3.WithRange("foo4")) for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value) } } }

示例4(WithProgessNotify):

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "log" "time" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() rch := cli.Watch(context.Background(), "foo", clientv3.WithProgressNotify()) closedch := make(chan bool) go func() { // This assumes that cluster is configured with frequent WatchProgressNotifyInterval // e.g. WatchProgressNotifyInterval: 200 * time.Millisecond. time.Sleep(time.Second) err := cli.Close() if err != nil { log.Fatal(err) } close(closedch) }() wresp := <-rch fmt.Println("wresp.IsProgressNotify:", wresp.IsProgressNotify()) <-closedch }

并发

集群选举示例

参考示例

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" "log" "sync" "time" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() // create two separate sessions for election competition s1, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } defer s1.Close() e1 := concurrency.NewElection(s1, "/test/election") s2, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } defer s2.Close() e2 := concurrency.NewElection(s2, "/test/election") // create competing candidates, with e1 initially losing to e2 var wg sync.WaitGroup wg.Add(2) electc := make(chan *concurrency.Election, 2) go func() { defer wg.Done() // delay candidacy so e2 wins first time.Sleep(3 * time.Second) log.Println("e1.Campaign") if err := e1.Campaign(context.Background(), "e1"); err != nil { log.Fatal(err) } log.Println("e1.Campaign success") electc <- e1 }() go func() { defer wg.Done() log.Println("e2.Campaign") if err := e2.Campaign(context.Background(), "e2"); err != nil { log.Fatal(err) } log.Println("e2.Campaign success") electc <- e2 }() cctx, cancel := context.WithCancel(context.TODO()) defer cancel() e := <-electc fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value)) // resign so next candidate can be elected if err := e.Resign(context.TODO()); err != nil { log.Fatal(err) } e = <-electc fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value)) wg.Wait() }

分布式锁

参考示例_高版本v3.5.0

参考示例_低版本v3.3.25

注: 注意版本,高版本才支持TryLock,Lock函数会阻塞

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" "log" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}}) if err != nil { log.Fatal(err) } defer cli.Close() s1, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } defer s1.Close() m1 := concurrency.NewMutex(s1, "/test/my-lock") s2, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } defer s2.Close() m2 := concurrency.NewMutex(s2, "/test/my-lock") // 高版本示例 // acquire lock for s1 if err = m1.Lock(context.TODO()); err != nil { log.Fatal(err) } fmt.Println("acquired lock for s1") if err = m2.Lock(context.TODO()); err == nil { log.Fatal("should not acquire lock") } if err == concurrency.ErrLocked { fmt.Println("cannot acquire lock for s2, as already locked in another session") } if err = m1.Unlock(context.TODO()); err != nil { log.Fatal(err) } fmt.Println("released lock for s1") if err = m2.TryLock(context.TODO()); err != nil { log.Fatal(err) } fmt.Println("acquired lock for s2") // 低版本示例 // acquire lock for s1 if err := m1.Lock(context.TODO()); err != nil { log.Fatal(err) } fmt.Println("acquired lock for s1") m2Locked := make(chan struct{}) go func() { defer close(m2Locked) // wait until s1 is locks /my-lock/ if err := m2.Lock(context.TODO()); err != nil { log.Fatal(err) } }() if err := m1.Unlock(context.TODO()); err != nil { log.Fatal(err) } fmt.Println("released lock for s1") <-m2Locked fmt.Println("acquired lock for s2") }

事务

示例:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main import ( "context" "github.com/coreos/etcd/clientv3" "log" "time" ) func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.57:2379"}, DialTimeout: 10 * time.Second}) if err != nil { log.Fatal("clientv3.New", err) } defer cli.Close() ctx := context.Background() _, err = cli.Delete(ctx, "test") if err != nil { log.Fatal(err) } // 事务,key不存在,则设置初始值 r, err := cli.Txn(ctx).If(clientv3.Compare(clientv3.CreateRevision("test"), "=", 0)).Then(clientv3.OpPut("test", "abc")).Commit() if err != nil { log.Fatal(err) } log.Println(err, r) if !r.Succeeded { log.Println(r.Responses) } // 事务,比较后修改 cmp := clientv3.Compare(clientv3.Value("test"), "=", "123") opPut := clientv3.OpPut("test", "abc") opGet := clientv3.OpGet("test") resp, err := cli.Txn(ctx).If(cmp).Then(opPut).Else(opGet).Commit() if err != nil { log.Fatal(err) } // 如果事务执行中比较失败,则获取值 if !resp.Succeeded { log.Println("resp:", resp.Responses[0].GetResponseRange().Kvs[0]) } }

最后

以上就是感性羽毛最近收集整理的关于etcd记录的全部内容,更多相关etcd记录内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(82)

评论列表共有 0 条评论

立即
投稿
返回
顶部