
Overview

OpenStack Ceilometer: Notification

Note: this article analyzes the ceilometer stable/kilo code.

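Ceilometer Notification architecture

The Ceilometer notification agent takes messages from the message queue and converts them into events (some notifications also produce samples).
[Figure: how Ceilometer receives and stores notifications (image not available)]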

Message format received by Ceilometer

A message taken from the message queue typically looks like this:

{
  '_context_roles': ['Member'],
  '_context_request_id': 'req-ee35abb9-8bfd-48fa-9213',
  '_context_quota_class': None,
  'event_type': u'compute.instance.create.start',
  '_context_service_catalog': [{
    'endpoint': [{
      'adminURL': 'http://127.0.0.1:8774/v2/tenant_id',
      'region': 'RegionOne', 
      'id': '28d096fd6bf545e3b5de8a482de06d2d', 
      'internalURL': 'http://127.0.0.1:8774/v2/tenant_id',
      'publicURL': 'http://127.0.0.1:8774/v2/tenant_id'
    }], 
    'endpoints_links': [],
    'type': 'compute', 
    'name': u'nova'
  }],
  'timestamp': '2015-08-20 10:50:14.930752', 
  '_context_user': '329d6d053f83464a9f02e1cc9c560371', 
  '_context_auth_token': '1d05041c7b434f9c0ea95dccc',
  '_context_user_id': '329d6d053f83464a9f02e1cc9c560371',  
  'payload': {}, 
  '_context_read_deleted': 'no',
  '_context_tenant': '079db7f7184c49ac94a5fb05a994904a', 
  '_context_is_admin': False,
  '_context_project_id': '079db7f7184c49ac94a5fb05a994904a', 
  '_context_timestamp':  '2015-08-20T10:50:14.385672',
  'publisher_id': 'aabbcc.ubuntu',
  'message_id': '581f2ad8-40d7-468b-a692-733127ded3d4', 
  '_context_remote_address': u'127.0.0.1'
}

The main content of a message is stored in payload, which usually carries information about the resource the message refers to.
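As a rough illustration of reading such a message, the sketch below pulls the event type, the publisher, and two payload fields out of a notification dict. The summarize helper and the payload keys instance_id and tenant_id are assumptions made for this example (the sample message above has an empty payload); this is not Ceilometer code.

```python
def summarize(notification):
    """Return the fields a consumer usually looks at first."""
    # payload may be missing or None, so fall back to an empty dict.
    payload = notification.get('payload') or {}
    return {
        'event_type': notification.get('event_type'),
        'publisher_id': notification.get('publisher_id'),
        'message_id': notification.get('message_id'),
        # Resource details live under payload; any key may be absent.
        'instance_id': payload.get('instance_id'),
        'tenant_id': payload.get('tenant_id'),
    }


# Hypothetical message, trimmed to the keys used above.
message = {
    'event_type': 'compute.instance.create.start',
    'publisher_id': 'aabbcc.ubuntu',
    'message_id': '581f2ad8-40d7-468b-a692-733127ded3d4',
    'payload': {'instance_id': 'hypothetical-instance-id',
                'tenant_id': '079db7f7184c49ac94a5fb05a994904a'},
}
summary = summarize(message)
print(summary['event_type'], summary['instance_id'])
```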

Ceilometer notification plugin

The base class for notification plugins is defined in ceilometer/agent/plugin_base.py:

@six.add_metaclass(abc.ABCMeta)
class NotificationBase(PluginBase):
    """Base class for plugins that support the notification API."""
    def __init__(self, transporter):
        super(NotificationBase, self).__init__()
        # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
        # messages to an endpoint.
        self.filter_rule = oslo_messaging.NotificationFilter(
            event_type='|'.join(self.event_types))
        self.transporter = transporter
        # NOTE(gordc): if no publisher, this isn't a PipelineManager and
        # data should be requeued.
        self.requeue = False if hasattr(transporter, 'publisher') else True

    @abc.abstractproperty
    def event_types(self):
        """Return a sequence of strings.
        Strings are defining the event types to be given to this plugin.
        """

    def get_targets(self, conf):
        """Return a sequence of oslo.messaging.Target.
        Sequence is defining the exchange and topics to be connected for this
        plugin.
        :param conf: Configuration.
        """

        # TODO(sileht): Backwards compatibility, remove in J+2
        if hasattr(self, 'get_exchange_topics'):
            LOG.warn(_('get_exchange_topics API of NotificationPlugin is'
                       'deprecated, implements get_targets instead.'))

            targets = []
            for exchange, topics in self.get_exchange_topics(conf):
                targets.extend(oslo_messaging.Target(topic=topic,
                                                     exchange=exchange)
                               for topic in topics)
            return targets

    @abc.abstractmethod
    def process_notification(self, message):
        """Return a sequence of Counter instances for the given message.
        :param message: Message to process.
        """

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """RPC endpoint for notification messages
        When another service sends a notification over the message
        bus, this method receives it.
        :param ctxt: oslo.messaging context
        :param publisher_id: publisher of the notification
        :param event_type: type of notification
        :param payload: notification payload
        :param metadata: metadata about the notification
        """
        notification = messaging.convert_to_old_notification_format(
            'info', ctxt, publisher_id, event_type, payload, metadata)
        self.to_samples_and_publish(context.get_admin_context(), notification)

    def to_samples_and_publish(self, context, notification):
        """Return samples produced by *process_notification*.
        Samples produced for the given notification.
        :param context: Execution context from the service or RPC call
        :param notification: The notification to process.
        """
        if self.requeue:
            meters = [
                utils.meter_message_from_counter(
                    sample, cfg.CONF.publisher.telemetry_secret)
                for sample in self.process_notification(notification)
            ]
            for notifier in self.transporter:
                notifier.sample(context.to_dict(),
                                event_type='ceilometer.pipeline',
                                payload=meters)
        else:
            with self.transporter.publisher(context) as p:
                p(list(self.process_notification(notification)))

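In essence, a plugin only needs to inherit from this class and implement event_types, process_notification, and get_targets. event_types defines which event types the plugin accepts; in the code above the entries are joined with '|' and passed to oslo_messaging.NotificationFilter as the event_type pattern.
process_notification converts a notification into samples, and get_targets declares which exchange and topics on the message queue to listen to.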
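The base class above joins event_types with '|' and hands the result to NotificationFilter. Assuming the filter compiles that string as a regular expression and matches it from the start of each incoming event type, the effect can be sketched with the standard re module. The accepts helper is hypothetical and only mimics the filtering step.

```python
import re

# Event types as declared by the plugins shown in this article.
event_types = ['compute.instance.*', 'scheduler.run_instance.scheduled']

# Same construction as filter_rule in NotificationBase: one alternation.
filter_regex = re.compile('|'.join(event_types))


def accepts(event_type):
    """Return True if the joined pattern matches at the start."""
    return filter_regex.match(event_type) is not None


for candidate in ('compute.instance.create.start',
                  'scheduler.run_instance.scheduled',
                  'volume.create.end'):
    print(candidate, accepts(candidate))
```

Because the entries are treated as regular expressions in this sketch, the trailing `.*` in `compute.instance.*` means "any characters", not a shell-style wildcard.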
The compute plugin serves as an example. ceilometer/compute/notifications/__init__.py defines:

class ComputeNotificationBase(plugin_base.NotificationBase):
    @staticmethod
    def get_targets(conf):
        """Return a sequence of oslo.messaging.Target
        This sequence is defining the exchange and topics to be connected for
        this plugin.
        """
        return [oslo_messaging.Target(topic=topic,
                                      exchange=conf.nova_control_exchange)
                for topic in conf.notification_topics]

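This implements get_targets: it uses the exchange configured for nova (nova_control_exchange) and listens on the configured notification topics. ceilometer/compute/notifications/instance.py defines the concrete event_types and process_notification: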

@six.add_metaclass(abc.ABCMeta)
class UserMetadataAwareInstanceNotificationBase(
        notifications.ComputeNotificationBase):
    """Consumes notifications containing instance user metadata."""

    def process_notification(self, message):
        instance_properties = self.get_instance_properties(message)
        if isinstance(instance_properties.get('metadata'), dict):
            src_metadata = instance_properties['metadata']
            del instance_properties['metadata']
            util.add_reserved_user_metadata(src_metadata, instance_properties)
        return self.get_sample(message)

    def get_instance_properties(self, message):
        """Retrieve instance properties from notification payload."""
        return message['payload']

    @abc.abstractmethod
    def get_sample(self, message):
        """Derive sample from notification payload."""


class InstanceScheduled(UserMetadataAwareInstanceNotificationBase,
                        plugin_base.NonMetricNotificationBase):
    event_types = ['scheduler.run_instance.scheduled']

    def get_instance_properties(self, message):
        """Retrieve instance properties from notification payload."""
        return message['payload']['request_spec']['instance_properties']

    def get_sample(self, message):
        yield sample.Sample.from_notification(
            name='instance.scheduled',
            type=sample.TYPE_DELTA,
            volume=1,
            unit='instance',
            user_id=None,
            project_id=message['payload']['request_spec']
                              ['instance_properties']['project_id'],
            resource_id=message['payload']['instance_id'],
            message=message)


class ComputeInstanceNotificationBase(
        UserMetadataAwareInstanceNotificationBase):
    """Convert compute.instance.* notifications into Samples."""
    event_types = ['compute.instance.*']

Besides receiving notifications from the message queue for storage, a plugin uses event_types to select a subset of events and process_notification to convert each notification into samples.
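The notification-to-sample step can be sketched without Ceilometer installed. In the sketch below, the Sample namedtuple is a reduced stand-in for ceilometer.sample.Sample, and the payload keys user_id, tenant_id, and instance_id are assumed to be present in a compute.instance.* notification; the values are made up.

```python
from collections import namedtuple

# Reduced stand-in for ceilometer.sample.Sample (assumption, not the real class).
Sample = namedtuple('Sample', 'name type volume unit user_id project_id '
                              'resource_id timestamp')


def process_notification(message):
    """Yield one gauge sample describing the instance in the message."""
    payload = message['payload']
    yield Sample(
        name='instance',
        type='gauge',
        volume=1,
        unit='instance',
        user_id=payload.get('user_id'),
        project_id=payload.get('tenant_id'),
        resource_id=payload.get('instance_id'),
        timestamp=message.get('timestamp'),
    )


# Hypothetical notification with only the keys the sketch reads.
message = {
    'event_type': 'compute.instance.create.end',
    'timestamp': '2015-08-20 10:50:14.930752',
    'payload': {'user_id': '329d6d053f83464a9f02e1cc9c560371',
                'tenant_id': '079db7f7184c49ac94a5fb05a994904a',
                'instance_id': 'hypothetical-instance-id'},
}
samples = list(process_notification(message))
print(samples[0].name, samples[0].resource_id)
```

Like the real plugins, the sketch is a generator, so a single notification can yield zero, one, or several samples.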

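Ceilometer traits processing

Once a notification has been received, how is it turned into an event? Ceilometer provides the event_definitions.yaml configuration file, which describes how a notification is converted into an event with the traits defined there. The definition for compute events, for example, looks like this: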

---
- event_type: compute.instance.*
  traits: &instance_traits
    tenant_id:
      fields: payload.tenant_id
    user_id:
      fields: payload.user_id
    instance_id:
      fields: payload.instance_id
    host:
      fields: publisher_id
      plugin:
        name: split
        parameters:
          segment: 1
          max_split: 1
    service:
      fields: publisher_id
      plugin: split
    memory_mb:
      type: int
      fields: payload.memory_mb
    disk_gb:
      type: int
      fields: payload.disk_gb
    root_gb:
      type: int
      fields: payload.root_gb
    ephemeral_gb:
      type: int
      fields: payload.ephemeral_gb
    vcpus:
      type: int
      fields: payload.vcpus
    instance_type_id:
      type: int
      fields: payload.instance_type_id
    instance_type:
      fields: payload.instance_type
    state:
      fields: payload.state
    os_architecture:
      fields: payload.image_meta.'org.openstack__1__architecture'
    os_version:
      fields: payload.image_meta.'org.openstack__1__os_version'
    os_distro:
      fields: payload.image_meta.'org.openstack__1__os_distro'
    launched_at:
      type: datetime
      fields: payload.launched_at
    deleted_at:
      type: datetime
      fields: payload.deleted_at
- event_type: compute.instance.exists
  traits:
    <<: *instance_traits
    audit_period_beginning:
      type: datetime
      fields: payload.audit_period_beginning
    audit_period_ending:
      type: datetime
      fields: payload.audit_period_ending

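event_type selects the events a definition applies to, and traits describes the data extracted from the notification. &instance_traits is a YAML anchor that names the base set of traits; <<: *instance_traits merges that base set into another definition, which can then add further traits of its own.
Each trait consists of type, fields, and plugin. type is optional and defaults to str; other choices include datetime and int. fields names the notification field the value is read from and may also be a list of fields. plugin calls a predefined trait-processing function.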
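To make the roles of fields, type, and plugin concrete, here is a simplified sketch of trait extraction. It is not Ceilometer's implementation: it only handles plain dotted paths (not quoted keys or field lists), the split_plugin helper merely imitates a string-splitting plugin under assumed defaults (separator '.', segment 0), and the datetime format is an assumption.

```python
from datetime import datetime


def lookup(notification, path):
    """Follow a simple dotted path such as 'payload.tenant_id'."""
    value = notification
    for key in path.split('.'):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def split_plugin(value, separator='.', segment=0, max_split=-1):
    """Return one segment of a split string, or None if out of range."""
    parts = value.split(separator, max_split)
    return parts[segment] if segment < len(parts) else None


CONVERTERS = {
    'str': str,
    'int': int,
    'datetime': lambda v: datetime.strptime(v, '%Y-%m-%dT%H:%M:%S.%f'),
}


def extract_traits(notification, definitions):
    """Build a list of {key, type, value} traits from a notification."""
    traits = []
    for name, spec in definitions.items():
        value = lookup(notification, spec['fields'])
        if value is not None and 'plugin' in spec:
            value = split_plugin(value, **spec['plugin'].get('parameters', {}))
        if value is None:
            continue  # a missing field simply produces no trait
        trait_type = spec.get('type', 'str')
        traits.append({'key': name, 'type': trait_type,
                       'value': CONVERTERS[trait_type](value)})
    return traits


# Python form of a few entries from the YAML above.
definitions = {
    'tenant_id': {'fields': 'payload.tenant_id'},
    'memory_mb': {'type': 'int', 'fields': 'payload.memory_mb'},
    'host': {'fields': 'publisher_id',
             'plugin': {'name': 'split',
                        'parameters': {'segment': 1, 'max_split': 1}}},
    'service': {'fields': 'publisher_id', 'plugin': {'name': 'split'}},
    'launched_at': {'type': 'datetime', 'fields': 'payload.launched_at'},
}
notification = {
    'publisher_id': 'aabbcc.ubuntu',
    'payload': {'tenant_id': '079db7f7184c49ac94a5fb05a994904a',
                'memory_mb': '2048'},
}
traits = extract_traits(notification, definitions)
for trait in traits:
    print(trait)
```

With the publisher_id used in this article, the host trait takes the part after the first dot and the service trait the part before it; launched_at is skipped because the payload does not contain it.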

Ceilometer Event format and storage

After a notification has been processed according to event_definitions.yaml, the resulting data is called an Event and has the following format:

{
  message_id: "XXX",
  event_type: "compute.instance.create.end",
  timestamp: "yyyy-MM-dd HH:mm:ss",
  traits: [{key: 'key', value: 'value', type: 'type'}, ...]
}

Event queries

Ceilometer provides an API for querying events by start_timestamp/end_timestamp and by event_type/message_id.
For example:

curl -X GET -H 'X-Auth-Token: ******' 'http://127.0.0.1:8777/v2/events?q.field=event_type&q.op=eq&q.value=compute.instance.create.end'
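The v2 API expresses a filter as a q.field / q.op / q.value triple. The sketch below only builds such a query URL and sends nothing. The events_url helper is hypothetical, and the base URL assumes the API listens on port 8777 (commonly the default for the Ceilometer API); adjust it to the actual endpoint.

```python
from urllib.parse import urlencode


def events_url(base, field, value, op='eq'):
    """Build a /v2/events URL with a single q.field/q.op/q.value filter."""
    query = urlencode([('q.field', field), ('q.op', op), ('q.value', value)])
    return '{}/v2/events?{}'.format(base.rstrip('/'), query)


url = events_url('http://127.0.0.1:8777', 'event_type',
                 'compute.instance.create.end')
print(url)
```

When the URL is used from a shell, it needs to be quoted, because an unquoted `&` would otherwise be interpreted by the shell.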

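In practice, however, this way of storing events is awkward to use: querying all compute-related events of a given project, for example, is very hard. In my own development work I therefore made some changes to the event handling so that events can be queried by project.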
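One way to see the difficulty is a client-side workaround: fetch events, then filter them by a trait. The sketch below is not the change described above (which is not shown in this article); it assumes events shaped like the Event format given earlier, with the project stored in a tenant_id trait, and both helper names are hypothetical.

```python
def trait_value(event, key):
    """Return the value of the first trait with the given key, or None."""
    for trait in event.get('traits', []):
        if trait.get('key') == key:
            return trait.get('value')
    return None


def compute_events_for_project(events, project_id):
    """Keep compute.* events whose tenant_id trait equals project_id."""
    return [event for event in events
            if event.get('event_type', '').startswith('compute.')
            and trait_value(event, 'tenant_id') == project_id]


# Hypothetical events in the format described in this article.
events = [
    {'message_id': 'm1', 'event_type': 'compute.instance.create.end',
     'traits': [{'key': 'tenant_id', 'value': 'project-a', 'type': 'str'}]},
    {'message_id': 'm2', 'event_type': 'compute.instance.delete.end',
     'traits': [{'key': 'tenant_id', 'value': 'project-b', 'type': 'str'}]},
    {'message_id': 'm3', 'event_type': 'volume.create.end',
     'traits': [{'key': 'tenant_id', 'value': 'project-a', 'type': 'str'}]},
]
matches = compute_events_for_project(events, 'project-a')
print([event['message_id'] for event in matches])
```

Filtering after the fact means every event has to be transferred and scanned, which is why a server-side change is the more practical route for large deployments.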
