etcd notes, collected by 感性羽毛 during recent development and shared here for reference.

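Overview

Contents

    • Download and Installation
      • Single-Node Test
      • Cluster Test
    • Usage Examples
      • Access Control
      • Cluster
      • KV Operations
      • Leases
      • Maintenance
      • Metrics
      • TLS
      • Watch
      • Concurrency
        • Cluster Election Example
        • Distributed Lock
      • Transactions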

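Download and Installation

Test environment: CentOS 7

The officially documented download method (current version: 3.5.0):

The script is as follows: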

#!/bin/bash
ETCD_VER=v3.5.0
# choose either URL
GOOGLE_URL=https://storage.googleapis.com/etcd
GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD_URL=${GOOGLE_URL}
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test
curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
/tmp/etcd-download-test/etcd --version
/tmp/etcd-download-test/etcdctl version
/tmp/etcd-download-test/etcdutl version

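After the script finishes, the files are extracted to /tmp/etcd-download-test. Move them to another directory, because the contents of /tmp may be cleared when the machine reboots.

# Move to /opt
mv /tmp/etcd-download-test /opt/etcd

Configuration file template. The contents of etcd.conf.yml are shown below (the start commands later in this post load it under the name etcd_config.yaml):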

# This is the configuration file for the etcd server.
# Human-readable name for this member.
name: 'default'
# Path to the data directory.
data-dir:
# Path to the dedicated wal directory.
wal-dir:
# Number of committed transactions to trigger a snapshot to disk.
snapshot-count: 10000
# Time (in milliseconds) of a heartbeat interval.
heartbeat-interval: 100
# Time (in milliseconds) for an election to timeout.
election-timeout: 1000
# Raise alarms when backend size exceeds the given quota. 0 means use the
# default quota.
quota-backend-bytes: 0
# List of comma separated URLs to listen on for peer traffic.
listen-peer-urls: http://localhost:2380
# List of comma separated URLs to listen on for client traffic.
listen-client-urls: http://localhost:2379
# Maximum number of snapshot files to retain (0 is unlimited).
max-snapshots: 5
# Maximum number of wal files to retain (0 is unlimited).
max-wals: 5
# Comma-separated white list of origins for CORS (cross-origin resource sharing).
cors:
# List of this member's peer URLs to advertise to the rest of the cluster.
# The URLs needed to be a comma-separated list.
initial-advertise-peer-urls: http://localhost:2380
# List of this member's client URLs to advertise to the public.
# The URLs needed to be a comma-separated list.
advertise-client-urls: http://localhost:2379
# Discovery URL used to bootstrap the cluster.
discovery:
# Valid values include 'exit', 'proxy'
discovery-fallback: 'proxy'
# HTTP proxy to use for traffic to discovery service.
discovery-proxy:
# DNS domain used to bootstrap initial cluster.
discovery-srv:
# Initial cluster configuration for bootstrapping.
initial-cluster:
# Initial cluster token for the etcd cluster during bootstrap.
initial-cluster-token: 'etcd-cluster'
# Initial cluster state ('new' or 'existing').
initial-cluster-state: 'new'
# Reject reconfiguration requests that would cause quorum loss.
strict-reconfig-check: false
# Enable runtime profiling data via HTTP server
enable-pprof: true
# Valid values include 'on', 'readonly', 'off'
proxy: 'off'
# Time (in milliseconds) an endpoint will be held in a failed state.
proxy-failure-wait: 5000
# Time (in milliseconds) of the endpoints refresh interval.
proxy-refresh-interval: 30000
# Time (in milliseconds) for a dial to timeout.
proxy-dial-timeout: 1000
# Time (in milliseconds) for a write to timeout.
proxy-write-timeout: 5000
# Time (in milliseconds) for a read to timeout.
proxy-read-timeout: 0
client-transport-security:
  # Path to the client server TLS cert file.
  cert-file:
  # Path to the client server TLS key file.
  key-file:
  # Enable client cert authentication.
  client-cert-auth: false
  # Path to the client server TLS trusted CA cert file.
  trusted-ca-file:
  # Client TLS using generated certificates
  auto-tls: false
peer-transport-security:
  # Path to the peer server TLS cert file.
  cert-file:
  # Path to the peer server TLS key file.
  key-file:
  # Enable peer client cert authentication.
  client-cert-auth: false
  # Path to the peer server TLS trusted CA cert file.
  trusted-ca-file:
  # Peer TLS using generated certificates.
  auto-tls: false
# The validity period of the self-signed certificate, the unit is year.
self-signed-cert-validity: 1
# Enable debug-level logging for etcd.
log-level: debug
logger: zap
# Specify 'stdout' or 'stderr' to skip journald logging even when running under systemd.
log-outputs: [stderr]
# Force to create a new one member cluster.
force-new-cluster: false
auto-compaction-mode: periodic
auto-compaction-retention: "1"

For a single-node test, the following items can be changed:

# Data directories
data-dir
wal-dir
# Client listen address; can be changed to http://0.0.0.0:2379
listen-client-urls
Single-Node Test

# Run the start command from the directory that contains the etcd binary
./etcd --config-file=./etcd_config.yaml
# Test
./etcdctl put foo foo1 # --endpoints=0.0.0.0:2379
./etcdctl get foo

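Cluster Test

For a multi-node test, change the following items in the configuration file:

# Name of this node within the cluster
name
# Data directories
data-dir
wal-dir
# Client listen address; can be changed to http://0.0.0.0:2379
listen-client-urls
# Address used to communicate with the other nodes; can be changed to http://0.0.0.0:2380
listen-peer-urls
# All nodes in the cluster (nodes can also be added later through a client), in the format: <node_name1>=<node_url1>,<node_name2>=<node_url2>
initial-cluster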
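
The initial-cluster format described above can be generated instead of typed by hand. The following is a minimal sketch, assuming the node names and peer URLs are kept in a map; the helper name InitialCluster and the sample addresses are illustrative and not part of etcd.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// InitialCluster renders name -> peer URL pairs in the
// <name1>=<url1>,<name2>=<url2> form used by initial-cluster.
// Names are sorted so the output is deterministic.
// Hypothetical helper for illustration only.
func InitialCluster(peers map[string]string) string {
	names := make([]string, 0, len(peers))
	for name := range peers {
		names = append(names, name)
	}
	sort.Strings(names)

	parts := make([]string, 0, len(names))
	for _, name := range names {
		parts = append(parts, name+"="+peers[name])
	}
	return strings.Join(parts, ",")
}

func main() {
	// Sample addresses; replace them with the peer URLs of your own nodes.
	peers := map[string]string{
		"n1": "http://172.20.0.2:2380",
		"n2": "http://172.20.0.3:2380",
		"n3": "http://172.20.0.4:2380",
	}
	fmt.Println(InitialCluster(peers))
}
```

Every node has to receive the same string, so generating it in one place avoids nodes starting with mismatched cluster definitions.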

The image here is built with a Dockerfile; an existing etcd image would also work.

The build is based on centos:7.9.2009. Contents of dockerfile_etcd:

FROM centos:7.9.2009
ADD etcd /opt/etcd
WORKDIR /opt/etcd
# With this start command, a node that has been removed from the cluster fails to start again
CMD ["./etcd_start.sh"]

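Contents of etcd_start.sh:

#!/bin/bash
# Resolve the directory that contains this script
CUR_DIR=$(cd "$(dirname "$0")" && pwd)
# Fall back to the default member name when NODE_NAME is unset or empty
if [ -z "$NODE_NAME" ]; then
    NODE_NAME='default'
fi
# Write the member name and cluster layout into the config file, then start etcd
sed -i "s#name:.*#name: ${NODE_NAME}#g" "${CUR_DIR}/etcd_config.yaml"
sed -i "s#initial-cluster:.*#initial-cluster: ${INITIAL_CLUSTER}#g" "${CUR_DIR}/etcd_config.yaml"
"${CUR_DIR}/etcd" --config-file="${CUR_DIR}/etcd_config.yaml"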

Build the cluster with docker-compose. Contents of etcd.yaml:

version: "3"
services:
  etcd_n1:
    container_name: etcd_n1
    privileged: true
    build:
      context: .
      dockerfile: dockerfile_etcd
    image: etcd:v1.0
    ports:
      - 2379:2379
    networks:
      etcd_network:
        ipv4_address: 172.20.0.2
    environment:
      NODE_NAME: n1
      INITIAL_CLUSTER: "n1=http://172.20.0.2:2380,n2=http://172.20.0.3:2380,n3=http://172.20.0.4:2380"
  etcd_n2:
    container_name: etcd_n2
    privileged: true
    build:
      context: .
      dockerfile: dockerfile_etcd
    image: etcd:v1.0
    ports:
      - 12379:2379
    networks:
      etcd_network:
        ipv4_address: 172.20.0.3
    environment:
      NODE_NAME: n2
      INITIAL_CLUSTER: "n1=http://172.20.0.2:2380,n2=http://172.20.0.3:2380,n3=http://172.20.0.4:2380"
  etcd_n3:
    container_name: etcd_n3
    privileged: true
    build:
      context: .
      dockerfile: dockerfile_etcd
    image: etcd:v1.0
    ports:
      - 22379:2379
    networks:
      etcd_network:
        ipv4_address: 172.20.0.4
    environment:
      NODE_NAME: n3
      INITIAL_CLUSTER: "n1=http://172.20.0.2:2380,n2=http://172.20.0.3:2380,n3=http://172.20.0.4:2380"
networks:
  etcd_network:
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 172.20.0.0/24

Start the cluster and check the nodes:

# Start in the background
./docker-compose -f etcd.yaml up -d
# Pick one node and check the running state and result
docker exec -it etcd_n1 ./etcdctl member list -w table
docker exec -it etcd_n1 ./etcdctl endpoint status --cluster -w table

After testing, remove the containers and images.

Contents of the script rm_test_docker.sh:

#!/bin/bash
container_ids=`docker ps -a | grep etcd | awk '{print $1}'`
images_ids=`docker images | grep etcd | awk '{print $3}' | uniq`
for i in ${container_ids[@]}
do
echo "rm container $i"
docker stop $i
docker rm $i
done
for j in ${images_ids[@]}
do
echo "rm image $j"
docker rmi $j -f
done

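Usage Examples

Note: the bbolt and grpc dependencies pulled in by etcd can run into version conflicts.

Fix:

Replace them with compatible versions in go.mod:

// Replace the bbolt version as suggested by go mod tidy
replace github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.6
// Replace the grpc version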
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0

Access Control

Reference example

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
if _, err = cli.RoleAdd(context.TODO(), "root"); err != nil {
log.Fatal(err)
}
if _, err = cli.UserAdd(context.TODO(), "root", "123"); err != nil {
log.Fatal(err)
}
if _, err = cli.UserGrantRole(context.TODO(), "root", "root"); err != nil {
log.Fatal(err)
}
if _, err = cli.RoleAdd(context.TODO(), "r"); err != nil {
log.Fatal(err)
}
if _, err = cli.RoleGrantPermission(
	context.TODO(),
	"r",   // role name
	"foo", // key
	"zoo", // range end
	clientv3.PermissionType(clientv3.PermReadWrite),
); err != nil {
log.Fatal("RoleGrantPermission error: ", err)
}
if _, err = cli.UserAdd(context.TODO(), "u", "123"); err != nil {
log.Fatal("UserAdd u error: ", err)
}
if _, err = cli.UserGrantRole(context.TODO(), "u", "r"); err != nil {
log.Fatal("UserGrantRole u error: ", err)
}
if _, err = cli.AuthEnable(context.TODO()); err != nil {
log.Fatal(err)
}
cliAuth, err := clientv3.New(clientv3.Config{
	Endpoints:   []string{"0.0.0.0:2379"},
	DialTimeout: time.Second * 5,
	Username:    "u",
	Password:    "123",
})
if err != nil {
log.Fatal("new error:", err)
}
defer cliAuth.Close()
if _, err = cliAuth.Put(context.TODO(), "foo1", "bar"); err != nil {
log.Fatal(err)
}
_, err = cliAuth.Txn(context.TODO()).
If(clientv3.Compare(clientv3.Value("zoo1"), ">", "abc")).
Then(clientv3.OpPut("zoo1", "XYZ")).
Else(clientv3.OpPut("zoo1", "ABC")).
Commit()
fmt.Println("txn error", err)
// now check the permission with the root account
rootCli, err := clientv3.New(clientv3.Config{
	Endpoints:   []string{"0.0.0.0:2379"},
	DialTimeout: time.Second * 5,
	Username:    "root",
	Password:    "123",
})
if err != nil {
log.Fatal(err)
}
defer rootCli.Close()
resp, err := rootCli.RoleGet(context.TODO(), "r")
if err != nil {
log.Fatal("RoleGet r error: ", err)
}
fmt.Printf("user u permission: key %q, range end %q\n", resp.Perm[0].Key, resp.Perm[0].RangeEnd)
if _, err = rootCli.AuthDisable(context.TODO()); err != nil {
log.Fatal("AuthDisable error: ", err)
}
_, err = cli.RoleDelete(context.TODO(), "r")
if err != nil {
log.Fatal("RoleDelete r error: ", err)
}
_, err = cli.RoleDelete(context.TODO(), "root")
if err != nil {
log.Fatal("RoleDelete root error: ", err)
}
_, err = cli.UserDelete(context.TODO(), "u")
if err != nil {
log.Fatal("UserDelete u error: ", err)
}
_, err = cli.UserDelete(context.TODO(), "root")
if err != nil {
log.Fatal("UserDelete root error: ", err)
}
}

Cluster

Reference example

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.MemberList(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Println("members:", len(resp.Members))
for _, v := range resp.Members {
fmt.Println(v)
}
_, err = cli.MemberRemove(context.Background(), resp.Members[1].ID)
if err != nil {
log.Fatal(err)
}
//mresp, err := cli.MemberAddAsLearner(context.Background(), []string{"http://localhost:32381"})
//if err != nil {
//	log.Fatal(err)
//}
mresp, err := cli.MemberAdd(context.Background(), resp.Members[1].PeerURLs)
if err != nil {
log.Fatal(err)
}
fmt.Println("added member.PeerURLs:", mresp.Member.PeerURLs)
fmt.Println("members count:", len(mresp.Members))
// the re-added member gets a new ID, so use the ID returned by MemberAdd
_, err = cli.MemberUpdate(context.Background(), mresp.Member.ID, resp.Members[1].PeerURLs)
if err != nil {
log.Fatal(err)
}
}

KV Operations

Reference example

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"log"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
_, err = cli.Put(ctx, "", "sample_value")
cancel()
if err != nil {
switch err {
case context.Canceled:
fmt.Printf("ctx is canceled by another routine: %v\n", err)
case context.DeadlineExceeded:
fmt.Printf("ctx deadline is exceeded: %v\n", err)
case rpctypes.ErrEmptyKey:
fmt.Printf("client-side error: %v\n", err)
default:
fmt.Printf("bad cluster endpoints, which are not etcd servers: %v\n", err)
}
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
presp, err := cli.Put(ctx, "foo", "bar1")
cancel()
if err != nil {
log.Fatal(err)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
_, err = cli.Put(ctx, "foo", "bar2")
cancel()
if err != nil {
log.Fatal(err)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
resp, err := cli.Get(ctx, "foo")
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
resp, err = cli.Get(ctx, "foo", clientv3.WithRev(presp.Header.Revision))
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
// sorted
for i := range make([]int, 3) {
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
_, err = cli.Put(ctx, fmt.Sprintf("key_%d", i), "value")
cancel()
if err != nil {
log.Fatal(err)
}
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
resp, err = cli.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
// count keys about to be deleted
gresp, err := cli.Get(ctx, "key", clientv3.WithPrefix())
if err != nil {
log.Fatal(err)
}
// delete the keys
dresp, err := cli.Delete(ctx, "key", clientv3.WithPrefix())
cancel()
if err != nil {
log.Fatal(err)
}
fmt.Println("gresp.Kvs len:", len(gresp.Kvs), ", Deleted all keys:", int64(len(gresp.Kvs)) == dresp.Deleted)
// compact: all history older than this revision is discarded
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
resp, err = cli.Get(ctx, "foo")
cancel()
if err != nil {
log.Fatal(err)
}
compRev := resp.Header.Revision // specify compact revision of your choice
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
_, err = cli.Compact(ctx, compRev)
cancel()
if err != nil {
log.Fatal(err)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
resp, err = cli.Get(ctx, "foo", clientv3.WithRev(presp.Header.Revision))
cancel()
fmt.Println(err) // error occurred
// txn
kvc := clientv3.NewKV(cli)
_, err = kvc.Put(context.TODO(), "key", "xyz")
if err != nil {
log.Fatal(err)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
_, err = kvc.Txn(ctx).
// txn value comparisons are lexical
If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
// the "Then" runs, since "xyz" > "abc"
Then(clientv3.OpPut("key", "XYZ")).
// the "Else" does not run
Else(clientv3.OpPut("key", "ABC")).
Commit()
cancel()
if err != nil {
log.Fatal(err)
}
gresp, err = kvc.Get(context.TODO(), "key")
if err != nil {
log.Fatal(err)
}
for _, ev := range gresp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
// do
ops := []clientv3.Op{
clientv3.OpPut("put-key", "123"),
clientv3.OpGet("put-key"),
clientv3.OpPut("put-key", "456")}
for _, op := range ops {
if _, err := cli.Do(context.TODO(), op); err != nil {
log.Fatal(err)
}
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
resp, err = cli.Get(ctx, "put-key")
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
}
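
The transaction in the example above depends on value comparisons being lexical, which can be surprising for numeric strings. The following is a small stand-alone sketch of that byte-wise ordering; it does not contact etcd, and the helper name ValueGreater is made up for illustration.

```go
package main

import (
	"bytes"
	"fmt"
)

// ValueGreater reports whether a sorts after b in byte-wise lexical order,
// the kind of ordering the ">" value comparison in the example relies on.
// Hypothetical helper for illustration only.
func ValueGreater(a, b string) bool {
	return bytes.Compare([]byte(a), []byte(b)) > 0
}

func main() {
	// "xyz" sorts after "abc", so the Then branch runs in the example.
	fmt.Println(ValueGreater("xyz", "abc")) // true
	// Numeric strings compare byte by byte: '1' sorts before '9'.
	fmt.Println(ValueGreater("10", "9")) // false
}
```

If numeric ordering is needed, one common workaround is to store zero-padded, fixed-width numbers so that lexical and numeric order agree.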

Leases

Reference example

Wait for the lease to expire after the specified time:

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// minimum lease TTL is 5-second
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
// after 5 seconds, the key 'foo' will be removed
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
gresp, err := cli.Get(context.TODO(), "foo")
if err != nil {
log.Fatal(err)
}
fmt.Println(gresp.Kvs)
time.Sleep(time.Second * 6)
gresp, err = cli.Get(context.TODO(), "foo")
if err != nil {
log.Fatal(err)
}
fmt.Println(gresp.Kvs)
}

Revoke a lease explicitly:

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
// revoking lease expires the key attached to its lease ID
_, err = cli.Revoke(context.TODO(), resp.ID)
if err != nil {
log.Fatal(err)
}
gresp, err := cli.Get(context.TODO(), "foo")
if err != nil {
log.Fatal(err)
}
fmt.Println("number of keys:", len(gresp.Kvs))
}

Use KeepAlive so that the lease does not expire; KeepAliveOnce renews it only once:

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
// the key 'foo' will be kept forever
ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
if kaerr != nil {
log.Fatal(kaerr)
}
ka := <-ch
if ka != nil {
fmt.Println("ttl:", ka.TTL)
} else {
fmt.Println("Unexpected NULL")
}
time.Sleep(time.Second * 6)
gresp, err := cli.Get(context.TODO(), "foo")
if err != nil {
log.Fatal(err)
}
fmt.Println(gresp.Kvs)
}

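Maintenance

Defragmentation:

  • Releases the space wasted by internal fragmentation on a given etcd member
  • Is only needed after deleting a large number of keys, when you want to reclaim the resources
  • Is an expensive operation; avoid defragmenting multiple members at the same time
  • To defragment multiple members of a cluster, call Defragment multiple times with different endpoints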

Reference example

package main
import (
"context"
"github.com/coreos/etcd/clientv3"
"log"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
_, err = cli.Status(context.Background(), "172.20.20.55:2379")
if err != nil {
log.Fatal(err)
}
if _, err = cli.Defragment(context.TODO(), "172.20.20.55:2379"); err != nil {
log.Fatal(err)
}
}

Metrics

Reference example

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/prometheus/client_golang/prometheus/promhttp"
"io"
"log"
"net"
"net/http"
"strings"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// get a key so it shows up in the metrics as a range RPC
cli.Get(context.TODO(), "test_key")
// listen for all Prometheus metrics
ln, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal(err)
}
donec := make(chan struct{})
go func() {
defer close(donec)
http.Serve(ln, promhttp.Handler())
}()
defer func() {
ln.Close()
<-donec
}()
// make an http request to fetch all Prometheus metrics
url := "http://" + ln.Addr().String() + "/metrics"
resp, err := http.Get(url)
if err != nil {
log.Fatalf("fetch error: %v", err)
}
b, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Fatalf("fetch error: reading %s: %v", url, err)
}
fmt.Println("resp body", string(b))
// confirm range request in metrics
for _, l := range strings.Split(string(b), "\n") {
if strings.Contains(l, `grpc_client_started_total{grpc_method="Range"`) {
fmt.Println(l)
break
}
}
}

TLS

Reference example

package main
import (
"context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"log"
"time"
)
func main() {
tlsInfo := transport.TLSInfo{
	CertFile:      "/tmp/test-certs/test-name-1.pem",
	KeyFile:       "/tmp/test-certs/test-name-1-key.pem",
	TrustedCAFile: "/tmp/test-certs/trusted-ca.pem",
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
log.Fatal(err)
}
cli, err := clientv3.New(clientv3.Config{
	Endpoints:   []string{"172.20.20.55:2379"},
	DialTimeout: time.Second * 5,
	TLS:         tlsConfig,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close() // make sure to close the client
_, err = cli.Put(context.TODO(), "foo", "bar")
if err != nil {
log.Fatal(err)
}
}

Watch

Reference example

Example 1:

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"strconv"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
go func() {
var i = 0
for {
time.Sleep(time.Second)
cli.Put(context.Background(), "foo", "bar"+strconv.Itoa(i))
i++
}
}()
rch := cli.Watch(context.Background(), "foo")
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}

Example 2 (WithPrefix):

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"strconv"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
go func() {
var i = 0
for {
time.Sleep(time.Second)
cli.Put(context.Background(), "foo1", "bar"+strconv.Itoa(i))
i++
}
}()
rch := cli.Watch(context.Background(), "foo", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}

Example 3 (WithRange):

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
go func() {
time.Sleep(time.Second)
cli.Put(context.Background(), "foo1", "bar1")
cli.Put(context.Background(), "foo5", "bar5")
cli.Put(context.Background(), "foo2", "bar2")
cli.Put(context.Background(), "foo3", "bar3")
}()
rch := cli.Watch(context.Background(), "foo1", clientv3.WithRange("foo4"))
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
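
In the WithRange example above, the watch covers the half-open key interval from "foo1" up to but not including "foo4", so the write to "foo5" produces no event. The sketch below only illustrates that membership test locally; InRange is a made-up helper, not a client function.

```go
package main

import "fmt"

// InRange reports whether key lies in the half-open interval [start, end),
// using Go's byte-wise string ordering.
// Hypothetical helper for illustration only.
func InRange(key, start, end string) bool {
	return key >= start && key < end
}

func main() {
	// The same keys that the example writes.
	for _, k := range []string{"foo1", "foo5", "foo2", "foo3"} {
		fmt.Printf("%s watched: %v\n", k, InRange(k, "foo1", "foo4"))
	}
}
```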

Example 4 (WithProgressNotify):

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
rch := cli.Watch(context.Background(), "foo", clientv3.WithProgressNotify())
closedch := make(chan bool)
go func() {
// This assumes that cluster is configured with frequent WatchProgressNotifyInterval
// e.g. WatchProgressNotifyInterval: 200 * time.Millisecond.
time.Sleep(time.Second)
err := cli.Close()
if err != nil {
log.Fatal(err)
}
close(closedch)
}()
wresp := <-rch
fmt.Println("wresp.IsProgressNotify:", wresp.IsProgressNotify())
<-closedch
}

Concurrency

Cluster Election Example

Reference example

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"log"
"sync"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// create two separate sessions for election competition
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
e1 := concurrency.NewElection(s1, "/test/election")
s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
e2 := concurrency.NewElection(s2, "/test/election")
// create competing candidates, with e1 initially losing to e2
var wg sync.WaitGroup
wg.Add(2)
electc := make(chan *concurrency.Election, 2)
go func() {
defer wg.Done()
// delay candidacy so e2 wins first
time.Sleep(3 * time.Second)
log.Println("e1.Campaign")
if err := e1.Campaign(context.Background(), "e1"); err != nil {
log.Fatal(err)
}
log.Println("e1.Campaign success")
electc <- e1
}()
go func() {
defer wg.Done()
log.Println("e2.Campaign")
if err := e2.Campaign(context.Background(), "e2"); err != nil {
log.Fatal(err)
}
log.Println("e2.Campaign success")
electc <- e2
}()
cctx, cancel := context.WithCancel(context.TODO())
defer cancel()
e := <-electc
fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value))
// resign so next candidate can be elected
if err := e.Resign(context.TODO()); err != nil {
log.Fatal(err)
}
e = <-electc
fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value))
wg.Wait()
}

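Distributed Lock

Reference example (newer version, v3.5.0)

Reference example (older version, v3.3.25)

Note: mind the client version. Only newer versions support TryLock; Lock blocks until the lock is acquired.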

package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"log"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/test/my-lock")
s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/test/my-lock")
// Newer-version example (v3.5.0)
// acquire lock for s1
if err = m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")
// TryLock returns immediately instead of blocking while s1 holds the lock
if err = m2.TryLock(context.TODO()); err == nil {
log.Fatal("should not acquire lock")
}
if err == concurrency.ErrLocked {
fmt.Println("cannot acquire lock for s2, as already locked in another session")
}
if err = m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")
if err = m2.TryLock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s2")
// release s2 so that the older-version example below can acquire the lock again
if err = m2.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
// Older-version example (v3.3.25)
// acquire lock for s1
if err := m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")
m2Locked := make(chan struct{})
go func() {
defer close(m2Locked)
// blocks until s1 releases /test/my-lock
if err := m2.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
}()
if err := m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")
<-m2Locked
fmt.Println("acquired lock for s2")
}
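
The difference between a blocking Lock and a non-blocking TryLock can be seen without a cluster. The sketch below is a single-process analogy built on a buffered channel. LocalMutex and this ErrLocked are names local to the sketch; it is not how the etcd concurrency package is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrLocked is returned by TryLock when the lock is already held.
// Local to this sketch; it only mirrors the idea of a "lock held" error.
var ErrLocked = errors.New("locked by another holder")

// LocalMutex is a single-process stand-in for a distributed mutex.
type LocalMutex struct{ slot chan struct{} }

// NewLocalMutex creates an unlocked mutex with one free slot.
func NewLocalMutex() *LocalMutex {
	return &LocalMutex{slot: make(chan struct{}, 1)}
}

// Lock blocks until the slot is free.
func (m *LocalMutex) Lock() { m.slot <- struct{}{} }

// TryLock returns ErrLocked immediately instead of waiting.
func (m *LocalMutex) TryLock() error {
	select {
	case m.slot <- struct{}{}:
		return nil
	default:
		return ErrLocked
	}
}

// Unlock frees the slot.
func (m *LocalMutex) Unlock() { <-m.slot }

func main() {
	m := NewLocalMutex()
	m.Lock()
	fmt.Println("second attempt:", m.TryLock()) // fails fast, does not block
	m.Unlock()
	fmt.Println("after unlock:", m.TryLock()) // succeeds
}
```

A second call to Lock in place of the first TryLock would block forever in this program, which is the situation the older-version example avoids by locking from a separate goroutine.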

Transactions

Example:

package main
import (
"context"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.57:2379"}, DialTimeout: 10 * time.Second})
if err != nil {
log.Fatal("clientv3.New", err)
}
defer cli.Close()
ctx := context.Background()
_, err = cli.Delete(ctx, "test")
if err != nil {
log.Fatal(err)
}
// transaction: set an initial value if the key does not exist
r, err := cli.Txn(ctx).If(clientv3.Compare(clientv3.CreateRevision("test"), "=", 0)).Then(clientv3.OpPut("test", "abc")).Commit()
if err != nil {
log.Fatal(err)
}
log.Println(err, r)
if !r.Succeeded {
log.Println(r.Responses)
}
// transaction: compare, then modify
cmp := clientv3.Compare(clientv3.Value("test"), "=", "123")
opPut := clientv3.OpPut("test", "abc")
opGet := clientv3.OpGet("test")
resp, err := cli.Txn(ctx).If(cmp).Then(opPut).Else(opGet).Commit()
if err != nil {
log.Fatal(err)
}
// if the comparison fails, the Else branch ran, so read the value it fetched
if !resp.Succeeded {
log.Println("resp:", resp.Responses[0].GetResponseRange().Kvs[0])
}
}
