我是靠谱客的博主 爱听歌猫咪,这篇文章主要介绍ETCD Restful Client V3,现在分享给大家,希望可以做个参考。

运行环境需要一个支持 Python 2.6/2.7 的 etcd 客户端,etcd 版本为 3.4。社区提供的客户端包会引入 grpc,导致 import 很慢:测试中发现仅导入包就耗时 0.6~0.8 秒。而要开发的是一个实时采集程序,因此决定直接使用 etcd 开放的 RESTful API,自己封装一个 client,目前只实现了部分接口。

代码如下,记录下

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2020/8/10 17:05
# @Author  : fly
# @File    : etcd_client.py
# @Version : 0.1
"""Small etcd v3 client that talks to the HTTP/JSON endpoints (``/v3/...``).

Only authentication, range reads (single key and prefix) and put are
implemented.  The code is written to run on Python 2.6/2.7 as well as
Python 3, so it avoids syntax newer than Python 2.6.
"""

import base64
import json

try:
    import requests
except ImportError:  # pragma: no cover - depends on the environment
    # Only Connect() needs requests; tolerating its absence here keeps the
    # pure helpers importable.  Connect() reports the problem explicitly.
    requests = None


class Etcd3Client(object):
    """Client for a single etcd endpoint using HTTP POST + JSON bodies.

    Errors are never raised to the caller by the public request methods;
    the text of the last failure is stored in ``self.error`` instead.
    """

    # Caller-facing sort names -> enum names expected in the JSON request.
    _SORT_ORDERS = {
        None: 'NONE',
        'ascend': 'ASCEND',
        'descend': 'DESCEND',
    }
    _SORT_TARGETS = {
        None: 'KEY',
        'key': 'KEY',
        'version': 'VERSION',
        'create': 'CREATE',
        'mod': 'MOD',
        'value': 'VALUE',
    }

    def __init__(self, host, port, user, passwd, timeout=2000):
        """Store the connection settings; no network traffic happens here.

        :param host: etcd host name or IP.
        :param port: etcd client port.
        :param user: user name sent to ``/v3/auth/authenticate``.
        :param passwd: password sent to ``/v3/auth/authenticate``.
        :param timeout: passed unchanged as the ``timeout`` argument of
            every HTTP POST.
        """
        # NOTE(review): requests interprets ``timeout`` in seconds, so the
        # default of 2000 looks like it was meant as milliseconds.  It is
        # left unchanged so existing callers keep their behaviour - confirm.
        self.__version__ = 'v3.4_1'
        self.host = host
        self.port = port
        self.user = user
        self.passwd = passwd
        self.timeout = timeout
        self.error = ""
        # Initialised here so that using the client before Connect(), or
        # after a failed authentication, cannot raise AttributeError.
        self.conn = None
        self.token = None

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    def _url(self, path):
        """Return the absolute URL for an API ``path`` such as ``/v3/kv/put``."""
        return "http://{0}:{1}{2}".format(self.host, self.port, path)

    def _headers(self):
        """Return request headers; the token is sent only when one exists."""
        headers = {"Connection": 'keep-alive'}
        if self.token:
            headers["Authorization"] = self.token
        return headers

    def _post(self, path, params):
        """POST ``params`` as JSON to ``path`` and return the raw response.

        :raises RuntimeError: if Connect() has not created a session yet.
        """
        if self.conn is None:
            raise RuntimeError("not connected: call Connect() first")
        return self.conn.post(url=self._url(path), headers=self._headers(),
                              data=json.dumps(params), timeout=self.timeout)

    def _enbase64(self, value):
        """Base64-encode ``value`` (text or bytes) into a JSON-safe string."""
        # b64encode needs bytes and returns bytes on Python 3; json.dumps
        # needs text, hence the encode-before / decode-after pair.
        return base64.b64encode(self.to_bytes(value)).decode('ascii')

    def _debase64(self, value):
        """Decode a base64 string from a response into raw bytes."""
        return base64.b64decode(value)

    # ------------------------------------------------------------------
    # connection handling
    # ------------------------------------------------------------------
    def GetToken(self):
        """Authenticate and store the returned token in ``self.token``.

        On any failure ``self.token`` is left untouched and the reason is
        stored in ``self.error``.
        """
        try:
            params = {
                "name": self.user,
                "password": self.passwd
            }
            res = self.conn.post(url=self._url("/v3/auth/authenticate"),
                                 data=json.dumps(params),
                                 timeout=self.timeout)
            if res.status_code == 200:
                self.token = res.json()['token']
            else:
                # Previously a rejected login left no trace at all.
                self.error = "authenticate failed: HTTP {0}".format(
                    res.status_code)
        except Exception as e:
            self.error = str(e)

    def Connect(self):
        """Open an HTTP session and fetch an auth token."""
        try:
            if requests is None:
                raise RuntimeError("the 'requests' package is not installed")
            self.conn = requests.session()
            self.GetToken()
        except Exception as e:
            self.error = str(e)

    def Close(self):
        """Close the HTTP session if one was opened."""
        if self.conn is not None:
            self.conn.close()

    # ------------------------------------------------------------------
    # small utilities
    # ------------------------------------------------------------------
    def increment_last_byte(self, byte_string):
        """Return the exclusive upper bound for keys prefixed by ``byte_string``.

        Trailing ``0xff`` bytes cannot be incremented, so they are dropped
        and the byte before them is incremented instead.  When nothing can
        be incremented (empty input or all ``0xff``), ``b'\\x00'`` is
        returned, which as a ``range_end`` selects every key >= the start
        key.
        """
        s = bytearray(byte_string)
        while s:
            if s[-1] < 0xff:
                s[-1] += 1
                return bytes(s)
            del s[-1]
        return b'\x00'

    def to_bytes(self, maybe_bytestring):
        """Return ``maybe_bytestring`` as bytes, UTF-8 encoding text input."""
        if isinstance(maybe_bytestring, bytes):
            return maybe_bytestring
        else:
            return maybe_bytestring.encode('utf-8')

    def lease_to_id(self, lease):
        """Figure out if the argument is a Lease object, or the lease ID."""
        lease_id = 0
        if hasattr(lease, 'id'):
            lease_id = lease.id
        else:
            try:
                lease_id = int(lease)
            except TypeError:
                # e.g. lease is None: 0 is sent, as in the original code.
                pass
        return lease_id

    # ------------------------------------------------------------------
    # range (read) requests
    # ------------------------------------------------------------------
    def _get_range(self, key, range_end=None, limit=None, revision=None,
                   sort_order=None, sort_target='key', serializable=False,
                   keys_only=False, count_only=None, min_mod_revision=None,
                   max_mod_revision=None, min_create_revision=None,
                   max_create_revision=None):
        """POST a range request and return the raw HTTP response.

        ``sort_order`` accepts ``None``/``'ascend'``/``'descend'`` and
        ``sort_target`` accepts ``None``/``'key'``/``'version'``/
        ``'create'``/``'mod'``/``'value'``; any other value leaves the
        corresponding field out of the request.

        :returns: the response object, or ``None`` when the request could
            not be sent (the reason is stored in ``self.error``).
        """
        try:
            params = {
                'key': self._enbase64(key),
                'keys_only': keys_only,
                'serializable': serializable,
            }
            if range_end is not None:
                params['range_end'] = self._enbase64(range_end)
            # The enum names must match exactly; the original strings had a
            # trailing space ('DESCEND ', 'VERSION ', ...).
            if sort_order in self._SORT_ORDERS:
                params['sort_order'] = self._SORT_ORDERS[sort_order]
            if sort_target in self._SORT_TARGETS:
                params['sort_target'] = self._SORT_TARGETS[sort_target]
            optional = (
                ('limit', limit),
                ('revision', revision),
                ('count_only', count_only),
                ('min_mod_revision', min_mod_revision),
                ('max_mod_revision', max_mod_revision),
                ('min_create_revision', min_create_revision),
                ('max_create_revision', max_create_revision),
            )
            # Unset options are omitted instead of being sent as JSON null.
            for name, value in optional:
                if value is not None:
                    params[name] = value
            return self._post("/v3/kv/range", params)
        except Exception as e:
            self.error = str(e)
            return None

    def get_response(self, key, serializable=False):
        """Return the raw range response for a single ``key``."""
        return self._get_range(key=key, serializable=serializable)

    def get(self, key, **kwargs):
        """Read the value stored under ``key``.

        :returns: ``(True, value_bytes)`` when the key exists,
            ``(True, None)`` when it does not, and ``(False, None)`` on a
            non-200 answer or any error (see ``self.error``).
        """
        try:
            resp = self.get_response(key=key, **kwargs)
            if resp is None:
                # _get_range already recorded the real cause; do not
                # overwrite it with an AttributeError on None.
                return (False, None)
            if resp.status_code != 200:
                return (False, None)
            # Decide on 'kvs' rather than 'count': the original indexed
            # data['count'], which fails when the field is absent.
            kvs = resp.json().get('kvs')
            if not kvs:
                return (True, None)
            # A kv without a 'value' field is treated as an empty value.
            return (True, self._debase64(kvs[0].get('value', '')))
        except Exception as e:
            self.error = str(e)
            return (False, None)

    def get_prefix_response(self, key_prefix, sort_order=None,
                            sort_target='key', keys_only=False):
        """Return the raw range response for every key under ``key_prefix``."""
        range_end = self.increment_last_byte(self.to_bytes(key_prefix))
        return self._get_range(key=key_prefix, range_end=range_end,
                               sort_order=sort_order,
                               sort_target=sort_target,
                               keys_only=keys_only)

    def get_prefix(self, key_prefix, **kwargs):
        """Read all keys (and values) that start with ``key_prefix``.

        :returns: ``(True, items)`` where ``items`` is a list of key bytes
            when ``keys_only=True`` was passed, otherwise a list of
            ``{'key': bytes, 'value': bytes}`` dicts; ``(0, None)`` when
            the response carries no ``count`` field; ``(False, None)`` on a
            non-200 answer or any error (see ``self.error``).
        """
        try:
            resp = self.get_prefix_response(key_prefix=key_prefix, **kwargs)
            if resp is None or resp.status_code != 200:
                return (False, None)
            data = resp.json()
            if 'count' not in data:
                # NOTE(review): kept as (0, None) for backward
                # compatibility, although get() reports "nothing found"
                # as (True, None).
                return (0, None)
            keys_only = kwargs.get('keys_only', False)
            res = []
            for kv in data.get('kvs', []):
                key = self._debase64(kv['key'])
                if keys_only:
                    res.append(key)
                else:
                    # A missing 'value' means an empty value; keep the
                    # dict shape instead of degrading to a bare key.
                    res.append({
                        'key': key,
                        'value': self._debase64(kv.get('value', '')),
                    })
            return (True, res)
        except Exception as e:
            self.error = str(e)
            return (False, None)

    # ------------------------------------------------------------------
    # write requests
    # ------------------------------------------------------------------
    def put(self, key, value, lease=None, prev_kv=False):
        """Store ``value`` under ``key``.

        :param lease: lease object (anything with an ``id`` attribute) or
            lease id; ``None`` sends lease id 0.
        :param prev_kv: forwarded as the ``prev_kv`` request field.
        :returns: ``True`` on HTTP 200, otherwise ``False`` (errors are
            stored in ``self.error``).
        """
        try:
            params = {
                "key": self._enbase64(key),
                "value": self._enbase64(value),
                "lease": self.lease_to_id(lease),
                "prev_kv": prev_kv
            }
            resp = self._post("/v3/kv/put", params)
            return resp.status_code == 200
        except Exception as e:
            self.error = str(e)
            return False

最后

以上就是爱听歌猫咪最近收集整理的关于ETCD Restful Client V3的全部内容,更多相关ETCD内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(60)

评论列表共有 0 条评论

立即
投稿
返回
顶部