我是靠谱客的博主 花痴火,最近开发中收集的这篇文章主要介绍opengauss源码分析—指标采集预测与分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

              

一.指标采集、预测与异常检测简介

数据库指标监控与异常检测技术,通过监控数据库指标,并基于时序预测和异常检测等算法,发现异常信息,进而提醒用户采取措施避免异常情况造成严重后果。

二,使用场景

用户操作数据库的某些行为或某些正在运行的业务发生了变化,都可能会导致数据库产生异常,如果不及时发现并处理这些异常,可能会导致严重的后果。通常,数据库监控指标(metric,如CPU使用率、QPS等)能够反映出数据库系统的健康状况。通过对数据库指标进行监控,分析指标数据特征或变化趋势等信息,可及时发现数据库异常状况,并及时将告警信息推送给运维管理人员,从而避免造成损失。

实现原理

指标采集、预测与异常检测是通过同一套系统实现的,在openGauss项目中名为Anomaly-Detection,它的结构如图8-14所示。该工具主要可以分为Agent和Detector两部分,其中Agent是数据库代理模块,负责收集数据库指标数据并将数据推送到Detector;Detector是数据库异常检测与分析模块,该模块主要有三个作用。

(1) 收集Agent端采集的数据并进行转储。

(2) 对收集到的数据进行特征分析与异常检测。

(3) 将检测出来的异常信息推送给运维管理人员。

1. Agent模块的组成

Agent模块负责采集并发送指标数据,该模块由DBSource、MemoryChannel、HttpSink三个子模块组成。

(1) DBSource作为数据源,负责定期去收集数据库指标数据并将数据发送到数据通道MemoryChannel中。

(2) MemoryChannel是内存数据通道,本质是一个FIFO队列,用于数据缓存。HttpSink组件消费MemoryChannel中的数据,为了防止MemoryChannel中的数据过多导致OOM(out of Memory,内存溢出),设置了容量上限,当超过容量上限时,过多的元素会被禁止放入队列中。

(3) HttpSink是数据汇聚点,该模块定期从MemoryChannel中获取数据,并以Http(s)的方式将数据进行转发,数据读取之后从MemoryChannel中清除。

2. Detector 模块组成

Detector模块负责数据检测,该模块由Server、Monitor两个子模块组成。

(1) Server是一个Web服务,为Agent采集到的数据提供接收接口,并将数据存储到本地数据库内部,为了避免数据增多导致数据库占用太多的资源,我们将数据库中的每个表都设置了行数上限。

(2) Monitor模块包含时序预测和异常检测等算法,该模块定期从本地数据库中获取数据库指标数据,并基于现有算法对数据进行预测与分析,如果算法检测出数据库指标在历史或未来某时间段或时刻出现异常,则会及时的将信息推送给用户。

8.5.3 关键源码解析

1 总体流程解析

智能索引推荐工具的路径是openGauss-server/src/gausskernel/dbmind/tools/anomaly_detection,下面的代码详细展示了程序的入口。

def forecast(args):

    …

    # 如果没有指定预测方式,则默认使用’auto_arima’算法

    if not args.forecast_method:

        forecast_alg = get_instance('auto_arima')

    else:

        forecast_alg = get_instance(args.forecast_method)

    # 指标预测功能函数

    def forecast_metric(name, train_ts, save_path=None):

        …

            forecast_alg.fit(timeseries=train_ts)

            dates, values = forecast_alg.forecast(

                period=TimeString(args.forecast_periods).standard)

        date_range = "{start_date}~{end_date}".format(start_date=dates[0],

                                               end_date=dates[-1])

        display_table.add_row(

            [name, date_range, min(values), max(values), sum(values) / len(values)]

        )

# 校验存储路径

        if save_path:

            if not os.path.exists(os.path.dirname(save_path)):

                os.makedirs(os.path.dirname(save_path))

            with open(save_path, mode='w') as f:

                for date, value in zip(dates, values):

                    f.write(date + ',' + str(value) + 'n')

    # 从本地sqlite中抽取需要的数据

    with sqlite_storage.SQLiteStorage(database_path) as db:

        if args.metric_name:

            timeseries = db.get_timeseries(table=args.metric_name, period=max_rows)

            forecast_metric(args.metric_name, timeseries, args.save_path)

        else:

# 获取sqlite中所有的表名

            tables = db.get_all_tables()

            # 从每个表中抽取训练数据进行预测

for table in tables:

                timeseries = db.get_timeseries(table=table, period=max_rows)

                forecast_metric(table, timeseries)

# 输出结果

    print(display_table.get_string())

# 代码远程部署

def deploy(args):

    print('Please input the password of {user}@{host}: '.format(user=args.user, host=args.host))

# 格式化代码远程部署指令

    command = 'sh start.sh --deploy {host} {user} {project_path}'

        .format(user=args.user,

                host=args.host,

                project_path=args.project_path)

    # 判断指令执行情况

if subprocess.call(shlex.split(command), cwd=SBIN_PATH) == 0:

        print("nExecute successfully.")

    else:

        print("nExecute unsuccessfully.")

# 展示当前监控的参数

def show_metrics():

# 项目总入口

def main():

   …

2. DBSource作为数据源,负责定期去收集数据库指标数据并将数据发送到数据通道MemoryChannel中。

3. MemoryChannel是内存数据通道,本质是一个FIFO队列,用于数据缓存。HttpSink组件消费MemoryChannel中的数据,为了防止MemoryChannel中的数据过多导致OOM(out of Memory,内存溢出),设置了容量上限,当超过容量上限时,过多的元素会被禁止放入队列中。

4. HttpSink是数据汇聚点,该模块定期从MemoryChannel中获取数据,并以Http(s)的方式将数据进行转发,数据读取之后从MemoryChannel中清除。

5. Monitor中datahandler

类,该类用于连接数据库并通过时间戳或数字获取数据

6. Monitor中detecor类,该类用于检测预测器预测的结果是否超出预期,如果超出预期,就会在日志文件中产生一个警告

7.Monitor中

Forecast类,通过调用fit()方法,即可根据历史时序数据进行训练,通过预测算法,预测未来走势。

8. 如果没有指定预测算法,就会默认使用auto_arima算法

9. Monitor中metric_monitor文件,

由4个函数构成,前三个函数用于检查各参数是否错误,最后一个用于监测数据库

1.检查监控任务所需的参数,如预测算法和数据库路径

2.检查参数的检测基础

3.检查可选参数

4.根据配置参数检测数据库的指标

10. Monitor类,用来监测数据库的多个指标

2. 关键代码段解析

1) 后台线程的实现。

前面已经介绍过了,本功能可以分为三个角色:Agent、Monitor以及Detector,这三个不同的角色都是常驻后台的进程,各自执行着不同的任务。Daemon类就是负责运行不同业务流程的容器类,下面介绍该类的实现。

class Daemon:

    """

    This class implements the function of running a process in the background."""

   

    def __init__(self):

        …

def daemon_process(self):

        # 注册退出函数

        atexit.register(lambda: os.remove(self.pid_file))

        signal.signal(signal.SIGTERM, handle_sigterm)

# 启动进程

    @staticmethod

    def start(self):

        try:

            self.daemon_process()

        except RuntimeError as msg:

            abnormal_exit(msg)

        self.function(*self.args, **self.kwargs)

    # 停止进程

    def stop(self):

        if not os.path.exists(self.pid_file):

            abnormal_exit("Process not running.")

        read_pid = read_pid_file(self.pid_file)

        if read_pid > 0:

            os.kill(read_pid, signal.SIGTERM)

        if read_pid_file(self.pid_file) < 0:

            os.remove(self.pid_file)

(2) 数据库相关指标采集过程。

数据库的指标采集架构,参考了Apache Flume的设计。将一个完整的信息采集流程拆分为三个部分,分别是Source、Channel以及Sink。上述三个部分被抽象为三个不同的基类,由此可以派生出不同的采集数据源、缓存管道以及数据的接收端。

前文提到过的DBSource即派生自Source、MemoryChannel派生自Channel,HttpSink则派生自Sink。下面这段代码来自metric_agent.py,负责采集指标,在这里将上述模块串联起来了。

def agent_main():

    # 初始化通道管理器

cm = ChannelManager()

# 初始化数据源

    source = DBSource()

    http_sink = HttpSink(interval=params['sink_timer_interval'], url=url, context=context)

    source.channel_manager = cm

    http_sink.channel_manager = cm

    # 获取参数文件里面的功能函数

    for task_name, task_func in get_funcs(metric_task):

        source.add_task(name=task_name,

                        interval=params['source_timer_interval'],

                        task=task_func,

                        maxsize=params['channel_capacity'])

    source.start()

    http_sink.start()

(3) 数据存储与监控部分的实现。

Agent将采集到的指标数据发送到Detector服务器上,并由Detector服务器负责存储。Monitor不断对存储的数据进行检查,以便提前发现异常。

这里实现了一种通过SQLite进行本地化存储的方式,代码位于sqlite_storage.py文件中,实现的类为SQLiteStorage,该类实现的主要方法如下:

# 通过时间戳获取最近一段时间的数据

def select_timeseries_by_timestamp(self, table, period):

# 通过编号获取最近一段时间的数据

def select_timeseries_by_number(self, table, number):

     …

其中,由于不同指标数据是分表存储的,因此上述参数table也代表了不同指标的名称。

异常检测当前主要支持基于时序预测的方法,包括Prophet算法(由Facebook开源的工业级时序预测算法工具)和ARIMA算法,他们分别被封装成类,供Forecaster调用。

上述时序检测的算法类都继承了AlgModel类,该类的结构如下:

class AlgModel(object):

    """

    This is the base class for forecasting algorithms.

    If we want to use our own forecast algorithm, we should follow some rules.

    """

    def __init__(self):

        pass

    @abstractmethod

    def fit(self, timeseries):

        pass

    @abstractmethod

    def forecast(self, period):

        pass

    def save(self, model_path):

        pass

    def load(self, model_path):

        pass

在Forecast类中,通过调用fit()方法,即可根据历史时序数据进行训练,通过forecast()方法预测未来走势。

获取到未来走势后如何判断是否是异常呢?方法比较多,最简单也是最基础的方法是通过阈值来进行判断,在我们的程序中,默认也是采用该方法进行判断的。

使用示例

Anomaly-Detection工具有start、stop、forecast、show_metrics、deploy五种运行模式,各模式说明如表8-12所示。

表8-12 Anomaly-Detection使用模式及说明

模式名称

说明

start

启动本地或者远程服务

stop

停止本地或远程服务

forecast

预测指标未来变化

show_metrics

输出当前监控的参数

deploy

远程部署代码

Anomaly-Detection工具运行模式使用示例如下所示。

① 使用start模式启动本地collector服务,代码如下:

python main.py start –role collector

1

② 使用stop模式停止本地collector服务,代码如下:

python main.py stop –role collector

1

③ 使用start模式启动远程collector服务,代码如下:

python main.py start --user xxx --host xxx.xxx.xxx.xxx –project-path xxx –role collector

1

④ 使用stop模式停止远程collector服务,代码如下:

python main.py stop --user xxx --host xxx.xxx.xxx.xxx –project-path xxx –role collector

1

2

⑤ 显示当前所有的监控参数,代码如下:

python main.py show_metrics

1

⑥ 预测io_read未来60秒的最大值、最小值和平均值,代码如下:

python main.py forecast –metric-name io_read –forecast-periods 60S –save-path predict_result

1

⑦ 将代码部署到远程服务器,代码如下:

python main.py deploy –user xxx –host xxx.xxx.xxx.xxx –project-path xxx

1

最后

以上就是花痴火为你收集整理的opengauss源码分析—指标采集预测与分析的全部内容,希望文章能够帮你解决opengauss源码分析—指标采集预测与分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(55)

评论列表共有 0 条评论

立即
投稿
返回
顶部