概述
karbor 是openstack中的数据保护项目。对于数据备份最重要的当然是备份,其次重要的就是可以设置定时和周期进行备份。karbor进行周期备份主要按以下步骤进行:
- 创建一个plan------指名要保护的对象和使用的provider
- 创建一个triger------一个时间相关的触发器,定义了从什么时候开始备份,以及备份的时间间隔
- 执行schedule operation------就是用上面的triger来执行定义好的plan
本文档就分析以下karbor 是如何实现定时备份的,主要从创建的triger是如何实现定时以及调度是如何使用triger来定时执行plan 的。
Schedule operation
在horizon中或者命令行中可以对指定的plan进行调度。调度时发出的http 请求如下面形式:
Create a scheduled operation, reequest body: {u'scheduled_operation': {u'operation_type': u'protect', u'name': u'test', u'trigger_id': u'854eda55-d39f-457e-8786-a47ef0b23a39', u'operation_definition': {u'provider_id': u'cf56bd3e-97a7-4078-b6d5-f36246333fd9', u'plan_id': u'31bdb8af-0b47-41e3-a762-5f1ac9f0ad62'}}}create from (pid=17244) /opt/stack/karbor/karbor/api/v1/scheduled_operations.py:89
从上面的内容可以看出请求发送到了/karbor/karbor/api/v1/scheduled_operations.py的中的create函数。从该函数一直往下追踪到/home/kele/Code/openstack/karbor/karbor/services/operationengine/manager.py:
OperationEngineManager::create_scheduled_operation,该函数的代码内容如下:
def create_scheduled_operation(self, context, operation):
LOG.debug("Create scheduled operation.")
self.operation_manager.check_operation_definition(
operation.operation_type,
operation.operation_definition,
)
# 1.注册operation到trigger中
self.trigger_manager.register_operation(operation.trigger_id,
operation.id)
trust_id = self.user_trust_manager.add_operation(
context, operation.id)
#2. 创建 ScheduledOperationState 记录
state_info = {
"operation_id": operation.id,
"service_id": self._service_id,
"trust_id": trust_id,
"state": constants.OPERATION_STATE_REGISTERED
}
operation_state = objects.ScheduledOperationState(
context, **state_info)
try:
operation_state.create()
except Exception:
self.trigger_manager.unregister_operation(
operation.trigger_id, operation.id)
raise
上面的函数主要完成两件事:
- 注册operation到trigger中,从http 请求的body中我们可以看到operation所包含的内容:trigger_id, plan_id, provider_id等。
- 创建ScheduledOperationState的记录
Trigger
在schedule_operation中将operation注册到trigger中,在trigger中完成了主要的工作。下面看一下trigger中时如何完成工作的。首先从上面的registe_operation开始,这个函数时trigger的一个方法,这里我们以time_trigger为例。TimeTrigger中该函数的内容如下:
def register_operation(self, operation_id, **kwargs):
if operation_id in self._operation_ids:
msg = (_("The operation_id(%s) is exist") % operation_id)
raise exception.ScheduledOperationExist(msg)
if self._greenthread and not self._greenthread.running:
raise exception.TriggerIsInvalid(trigger_id=self._id)
self._operation_ids.add(operation_id)
#对于每一个trigger实例都会创建一个greenthread
if self._greenthread is None:
self._start_greenthread()
def _start_greenthread(self):
# Find the first time.
# We don't known when using this trigger first time.
timer = self._get_timer(self._trigger_property)
first_run_time = self._compute_next_run_time(
datetime.utcnow(), self._trigger_property['end_time'], timer)
if not first_run_time:
raise exception.TriggerIsInvalid(trigger_id=self._id)
self._create_green_thread(first_run_time, timer)
def _create_green_thread(self, first_run_time, timer):
func = functools.partial(
self._trigger_operations,
trigger_property=self._trigger_property.copy(),
timer=timer)
self._greenthread = TriggerOperationGreenThread(
first_run_time, func)
上面的代码最后调用了TriggerOperationGreenThread实例化了一个绿色线程,主要的逻辑都在里面完成:
class TriggerOperationGreenThread(object):
def __init__(self, first_run_time, function):
super(TriggerOperationGreenThread, self).__init__()
self._is_sleeping = True
self._pre_run_time = None
self._running = False
self._thread = None
self._function = function
self._start(first_run_time)
def _start(self, first_run_time):
self._running = True
now = datetime.utcnow()
#计算创建线程的时间
initial_delay = 0 if first_run_time <= now else (
int(timeutils.delta_seconds(now, first_run_time)))
#调用eventlet的spawn_after在上面计算出的时间之后启动一个线程
self._thread = eventlet.spawn_after(
initial_delay, self._run, first_run_time)
self._thread.link(self._on_done)
def _run(self, expect_run_time):
while self._running:
self._is_sleeping = False
self._pre_run_time = expect_run_time
#执行保护操作,并返回下次期望执行的时间
expect_run_time = self._function(expect_run_time)
if expect_run_time is None or not self._running:
break
self._is_sleeping = True
now = datetime.utcnow()
#计算到下次执行所需要的时间
idle_time = 0 if expect_run_time <= now else int(
timeutils.delta_seconds(now, expect_run_time))
eventlet.sleep(idle_time)
上面的的代码逻辑中首先创建一个线程,在线程中循环执行保护任务。每一次执行完任务之后睡眠一段时间,而这个时间间隔是这次执行到下次执行时间之间的间隔。这就是karbor中定时时行保护计划的逻辑。数据保护具体流程这里不做分析。
最后
以上就是狂野招牌为你收集整理的karbor 中定时备份代码逻辑分析Schedule operationTrigger的全部内容,希望文章能够帮你解决karbor 中定时备份代码逻辑分析Schedule operationTrigger所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复