概述
openstack 之 ceilometer: Notification
注:本文以ceilometer stable/kilo代码为例分析
ceilometer Notification实现架构
ceilometer Notification即是从消息队列中获取消息并将获取的消息转换成event事件(部分Notification生成sample)。ceilometer获取并存储Notification的结构图如下 :
ceilometer获取的消息格式
通常,从消息队列中获取的消息格式如下:
{
'_context_roles': ['Member'],
'_context_request_id': 'req-ee35abb9-8bfd-48fa-9213',
'_context_quota_class': None,
'event_type': u'compute.instance.create.start',
'_context_service_catalog': [{
'endpoint': [{
'adminURL': 'http://127.0.0.1:8774/v2/tenant_id',
'region': 'RegionOne',
'id': '28d096fd6bf545e3b5de8a482de06d2d',
'internalURL': 'http://127.0.0.1:8774/v2/tenant_id',
'publicURL': 'http://127.0.0.1:8774/v2/tenant_id'
}],
'endpoints_links': [],
'type': 'compute',
'name': u'nova'}
}],
'timestamp': '2015-08-20 10:50:14.930752',
'_context_user': '329d6d053f83464a9f02e1cc9c560371',
'_context_auth_token': '1d05041c7b434f9c0ea95dccc',
'_context_user_id': '329d6d053f83464a9f02e1cc9c560371',
'payload': {},
'_context_read_deleted': 'no',
'_context_tenant': '079db7f7184c49ac94a5fb05a994904a',
'_context_is_admin': False,
'_context_project_id': '079db7f7184c49ac94a5fb05a994904a',
'_context_timestamp': '2015-08-20T10:50:14.385672',
'publisher_id': 'aabbcc.ubuntu',
'message_id': '581f2ad8-40d7-468b-a692-733127ded3d4',
'_context_remote_address': u'127.0.0.1'
}
消息的主图内容存放在payload中,一般从payload中可以取到消息对应的资源的一些信息。
ceilometer notification plugin
在ceilometer/agent/plugin_base.py中定义了Notification plugin的基础类,代码如下:
@six.add_metaclass(abc.ABCMeta)
class NotificationBase(PluginBase):
"""Base class for plugins that support the notification API."""
def __init__(self, transporter):
super(NotificationBase, self).__init__()
# NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
# messages to an endpoint.
self.filter_rule = oslo_messaging.NotificationFilter(
event_type='|'.join(self.event_types))
self.transporter = transporter
# NOTE(gordc): if no publisher, this isn't a PipelineManager and
# data should be requeued.
self.requeue = False if hasattr(transporter, 'publisher') else True
@abc.abstractproperty
def event_types(self):
"""Return a sequence of strings.
Strings are defining the event types to be given to this plugin.
"""
def get_targets(self, conf):
"""Return a sequence of oslo.messaging.Target.
Sequence is defining the exchange and topics to be connected for this
plugin.
:param conf: Configuration.
"""
# TODO(sileht): Backwards compatibility, remove in J+2
if hasattr(self, 'get_exchange_topics'):
LOG.warn(_('get_exchange_topics API of NotificationPlugin is'
'deprecated, implements get_targets instead.'))
targets = []
for exchange, topics in self.get_exchange_topics(conf):
targets.extend(oslo_messaging.Target(topic=topic,
exchange=exchange)
for topic in topics)
return targets
@abc.abstractmethod
def process_notification(self, message):
"""Return a sequence of Counter instances for the given message.
:param message: Message to process.
"""
def info(self, ctxt, publisher_id, event_type, payload, metadata):
"""RPC endpoint for notification messages
When another service sends a notification over the message
bus, this method receives it.
:param ctxt: oslo.messaging context
:param publisher_id: publisher of the notification
:param event_type: type of notification
:param payload: notification payload
:param metadata: metadata about the notification
"""
notification = messaging.convert_to_old_notification_format(
'info', ctxt, publisher_id, event_type, payload, metadata)
self.to_samples_and_publish(context.get_admin_context(), notification)
def to_samples_and_publish(self, context, notification):
"""Return samples produced by *process_notification*.
Samples produced for the given notification.
:param context: Execution context from the service or RPC call
:param notification: The notification to process.
"""
if self.requeue:
meters = [
utils.meter_message_from_counter(
sample, cfg.CONF.publisher.telemetry_secret)
for sample in self.process_notification(notification)
]
for notifier in self.transporter:
notifier.sample(context.to_dict(),
event_type='ceilometer.pipeline',
payload=meters)
else:
with self.transporter.publisher(context) as p:
p(list(self.process_notification(notification)))
基本上,只需要继承这个类,然后实现event_types、process_notification以及get_targets方法即可。event_types定义了过滤的事件类型,通过fnmatch匹配;
process_notification即为将notification处理成sample的格式;get_targets设置连接消息队列;
下面以compue为例介绍,ceilometer/compute/notifications/__init__py中定义如下:
class ComputeNotificationBase(plugin_base.NotificationBase):
@staticmethod
def get_targets(conf):
"""Return a sequence of oslo.messaging.Target
This sequence is defining the exchange and topics to be connected for
this plugin.
"""
return [oslo.messaging.Target(topic=topic,
exchange=conf.nova_control_exchange)
for topic in conf.notification_topics]
此处实现了get_targets方法,即设置nova中定义的的exchange并监听相关的设置队列。ceilometer/compute/notifications/instance.py中定义了具体的event_types、process_notification,代码实现如下
@six.add_metaclass(abc.ABCMeta)
class UserMetadataAwareInstanceNotificationBase(
notifications.ComputeNotificationBase):
"""Consumes notifications containing instance user metadata."""
def process_notification(self, message):
instance_properties = self.get_instance_properties(message)
if isinstance(instance_properties.get('metadata'), dict):
src_metadata = instance_properties['metadata']
del instance_properties['metadata']
util.add_reserved_user_metadata(src_metadata, instance_properties)
return self.get_sample(message)
def get_instance_properties(self, message):
"""Retrieve instance properties from notification payload."""
return message['payload']
@abc.abstractmethod
def get_sample(self, message):
"""Derive sample from notification payload."""
class InstanceScheduled(UserMetadataAwareInstanceNotificationBase,
plugin_base.NonMetricNotificationBase):
event_types = ['scheduler.run_instance.scheduled']
def get_instance_properties(self, message):
"""Retrieve instance properties from notification payload."""
return message['payload']['request_spec']['instance_properties']
def get_sample(self, message):
yield sample.Sample.from_notification(
name='instance.scheduled',
type=sample.TYPE_DELTA,
volume=1,
unit='instance',
user_id=None,
project_id=message['payload']['request_spec']
['instance_properties']['project_id'],
resource_id=message['payload']['instance_id'],
message=message)
class ComputeInstanceNotificationBase(
UserMetadataAwareInstanceNotificationBase):
"""Convert compute.instance.* notifications into Samples."""
event_types = ['compute.instance.*']
除了要获取从消息队列中获取Notification存储外,还会通过event_types过滤出部分的事件,通过process_notification实现Notification->Smaple的转换。
ceilometer traits处理
在获取了notification之后,怎么处理这些event事件呢?ceilometer中提供了一个event_definations.yaml配置文件,在这个配置文件中,可以将Notification转换成按照event_definations.yaml中定义的格式,以compute事件为例,如下:
---
- event_type: compute.instance.*
traits: &instance_traits
tenant_id:
fields: payload.tenant_id
user_id:
fields: payload.user_id
instance_id:
fields: payload.instance_id
host:
fields: publisher_id
plugin:
name: split
parameters:
segment: 1
max_split: 1
service:
fields: publisher_id
plugin: split
memory_mb:
type: int
fields: payload.memory_mb
disk_gb:
type: int
fields: payload.disk_gb
root_gb:
type: int
fields: payload.root_gb
ephemeral_gb:
type: int
fields: payload.ephemeral_gb
vcpus:
type: int
fields: payload.vcpus
instance_type_id:
type: int
fields: payload.instance_type_id
instance_type:
fields: payload.instance_type
state:
fields: payload.state
os_architecture:
fields: payload.image_meta.'org.openstack__1__architecture'
os_version:
fields: payload.image_meta.'org.openstack__1__os_version'
os_distro:
fields: payload.image_meta.'org.openstack__1__os_distro'
launched_at:
type: datetime
fields: payload.launched_at
deleted_at:
type: datetime
fields: payload.deleted_at
- event_type: compute.instance.exists
traits:
<<: *instance_traits
audit_period_beginning:
type: datetime
fields: payload.audit_period_beginning
audit_period_ending:
type: datetime
fields: payload.audit_period_ending
event_type定义了符合匹配条件的event事件,traits定义从Notification中转换的数据。其中&resource_traits定义基础的traits项,<<: *resource_traits定义的是附加的traits项。
每个trait项由type、fields、plugin组成。type为可选项,默认是str,有datetime,int等可选;fields项,为从Notification获取的值,可以是数组;plugin项为调用已定义好的traits处理函数;
ceilometer Event格式及存储
将Notification根据event_definations.yaml处理完成后,生成的数据被定义成Event,格式如下:
{
message_id: “XXX”,
event_type: “compute.instance.create.end”,
timestamp: “yyyy-MM-dd mm-hh-ss”,
traits: [{key: ‘key’, value: ‘value’, type: ‘type’}……]
}
Event查询
Ceilometer提供了Event事件查询的API接口,可以根据start_timestamp/end_timestamp,event_type/message_id查询Event事件。
如:
curl -X GET -H 'X-Auth-Token: ******' http://127.0.0.1:8774/v2/events?q.field=event_type&q.field=compute.instance.create.end
但实时上,event的这种存储方式比较的难用,如果想查询某个项目下的所有compute相关事件,实时上是非常难查询的,所以本人在实际开发中对event事件做了一些变动,以满足实际中按项目查询的需求。
最后
以上就是阔达哑铃为你收集整理的openstack 之 ceilometer: Notificationopenstack 之 ceilometer: Notification的全部内容,希望文章能够帮你解决openstack 之 ceilometer: Notificationopenstack 之 ceilometer: Notification所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复