概述
运行环境中需要支持python2.6 2.7环境的etcdclient,etcd版本为3.4,采用社区的包,由于会引用grpc包,导致import包很慢,测试中发现就引入包用了0.6-0.8S,所要开发的代码是一个实时采集程序,所以决定使用etcd开放的restful api,自己封装一个client,目前只实现了部分接口
代码如下,记录下
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/8/10 17:05
# @Author : fly
# @File : etcd_client.py
# @Version : 0.1
import requests
import base64
import json
class Etcd3Client():
def __init__(self,host,port,user,passwd,timeout=2000):
self.__version__ = 'v3.4_1'
self.host = host
self.port = port
self.user = user
self.passwd = passwd
self.timeout = timeout
self.error = ""
def GetToken(self):
try:
url = "http://{0}:{1}/v3/auth/authenticate".format(self.host, self.port)
params = {
"name": self.user,
"password": self.passwd
}
res = self.conn.post(url=url, data=json.dumps(params), timeout=self.timeout)
if res.status_code == 200:
self.token = res.json()['token']
except Exception as e:
self.error = str(e)
def _enbase64(self,value):
return base64.b64encode(value)
def _debase64(self,value):
return base64.b64decode(value)
def Connect(self):
try:
self.conn = requests.session()
self.GetToken()
except Exception as e:
self.error = str(e)
def Close(self):
self.conn.close()
def increment_last_byte(self,byte_string):
s = bytearray(byte_string)
s[-1] = s[-1] + 1
return bytes(s)
def to_bytes(self,maybe_bytestring):
if isinstance(maybe_bytestring, bytes):
return maybe_bytestring
else:
return maybe_bytestring.encode('utf-8')
def lease_to_id(self,lease):
"""Figure out if the argument is a Lease object, or the lease ID."""
lease_id = 0
if hasattr(lease, 'id'):
lease_id = lease.id
else:
try:
lease_id = int(lease)
except TypeError:
pass
return lease_id
def _get_range(self, key,range_end=None,limit=None,revision=None,sort_order=None,sort_target='key',serializable=False,keys_only=False,count_only=None,min_mod_revision=None,max_mod_revision=None,min_create_revision=None,max_create_revision=None):
try:
url = "http://{0}:{1}/v3/kv/range".format(self.host, self.port)
headers = {
"Authorization": self.token,
"Connection":'keep-alive'
}
params = {}
key = self._enbase64(key)
params['key'] = key
params['keys_only'] = keys_only
if range_end is not None:
params['range_end'] = self._enbase64(range_end)
if sort_order is None:
params['sort_order'] = 'NONE'
elif sort_order == 'ascend':
params['sort_order'] = 'ASCEND'
elif sort_order == 'descend':
params['sort_order'] = 'DESCEND '
if sort_target is None or sort_target == 'key':
params['sort_target'] = 'KEY'
elif sort_target == 'version':
params['sort_target'] = 'VERSION '
elif sort_target == 'create':
params['sort_target'] = 'CREATE '
elif sort_target == 'mod':
params['sort_target'] = 'MOD '
elif sort_target == 'value':
params['sort_target'] = 'VALUE '
params['limit'] = limit
params['revision'] = revision
params['serializable'] = serializable
params['count_only'] = count_only
params['min_mod_revision'] = min_mod_revision
params['max_mod_revision'] = max_mod_revision
params['min_create_revision'] = min_create_revision
params['max_create_revision'] = max_create_revision
data = self.conn.post(url=url, headers=headers, data=json.dumps(params), timeout=self.timeout)
return data
except Exception as e:
self.error = str(e)
def get_response(self,key,serializable=False):
return self._get_range(key=key,serializable=serializable)
def get(self, key, **kwargs):
try:
resp = self.get_response(key=key,**kwargs)
if resp.status_code == 200:
data = resp.json()
if data['count'] < 1:
return (True, None)
else:
return (True,self._debase64(data['kvs'][0]['value']))
else:
return (False,None)
except Exception as e:
self.error = str(e)
def get_prefix_response(self,key_prefix, sort_order=None,sort_target='key', keys_only=False):
return self._get_range(key=key_prefix,range_end=self.increment_last_byte(self.to_bytes(key_prefix)),sort_order=sort_order,sort_target=sort_target,keys_only=keys_only)
def get_prefix(self,key_prefix,**kwargs):
try:
resp = self.get_prefix_response(key_prefix=key_prefix,**kwargs)
if resp.status_code==200:
data = resp.json()
if not data.has_key('count'):
return (0,None)
else:
res = []
for i in data['kvs']:
item={}
if i.has_key('value'):
item['key'] = self._debase64(i['key'])
item['value'] = self._debase64(i['value'])
res.append(item)
else:
res.append(self._debase64(i['key']))
return (True,res)
else:
return (False,None)
except Exception as e:
self.error = str(e)
def put(self,key, value, lease=None, prev_kv=False):
try:
url = "http://{0}:{1}/v3/kv/put".format(self.host, self.port)
headers = {
"Authorization": self.token,
"Connection": 'keep-alive'
}
params = {
"key":self._enbase64(key),
"value":self._enbase64(value),
"lease":self.lease_to_id(lease),
"prev_kv":prev_kv
}
resp = self.conn.post(url=url, headers=headers, data=json.dumps(params), timeout=self.timeout)
if resp.status_code == 200:
return True
else:
return False
except Exception as e:
self.error = str(e)
最后
以上就是爱听歌猫咪为你收集整理的ETCD Restful Client V3的全部内容,希望文章能够帮你解决ETCD Restful Client V3所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复