Overview
I. Preface
I have recently been building monitoring for a MySQL InnoDB Cluster and found that the exporter has no metric exposing the cluster's status, so I decided to read through the source code and patch it myself. The source is here: https://github.com/prometheus/mysqld_exporter
II. Reading the source
1. Entry point (the main function)
As the main function below shows, MySQL Exporter exposes two URLs: /, which serves a simple landing page, and /metrics, which serves the collected metrics. Note also that the DSN comes from the DATA_SOURCE_NAME environment variable, falling back to parsing a my.cnf file.
func main() {
    // Generate ON/OFF flags for all scrapers.
    scraperFlags := map[collector.Scraper]*bool{}
    for scraper, enabledByDefault := range scrapers {
        defaultOn := "false"
        if enabledByDefault {
            defaultOn = "true"
        }
        f := kingpin.Flag(
            "collect."+scraper.Name(),
            scraper.Help(),
        ).Default(defaultOn).Bool()
        scraperFlags[scraper] = f
    }

    // Parse flags.
    promlogConfig := &promlog.Config{}
    flag.AddFlags(kingpin.CommandLine, promlogConfig)
    kingpin.Version(version.Print("mysqld_exporter"))
    kingpin.HelpFlag.Short('h')
    kingpin.Parse()
    logger := promlog.New(promlogConfig)

    // landingPage contains the HTML served at '/'.
    // TODO: Make this nicer and more informative.
    var landingPage = []byte(`<html>
<head><title>MySQLd exporter</title></head>
<body>
<h1>MySQLd exporter</h1>
<p><a href='` + *metricPath + `'>Metrics</a></p>
</body>
</html>
`)

    level.Info(logger).Log("msg", "Starting mysqld_exporter", "version", version.Info())
    level.Info(logger).Log("msg", "Build context", version.BuildContext())

    dsn = os.Getenv("DATA_SOURCE_NAME")
    if len(dsn) == 0 {
        var err error
        if dsn, err = parseMycnf(*configMycnf); err != nil {
            level.Info(logger).Log("msg", "Error parsing my.cnf", "file", *configMycnf, "err", err)
            os.Exit(1)
        }
    }

    // Register only scrapers enabled by flag.
    enabledScrapers := []collector.Scraper{}
    for scraper, enabled := range scraperFlags {
        if *enabled {
            level.Info(logger).Log("msg", "Scraper enabled", "scraper", scraper.Name())
            enabledScrapers = append(enabledScrapers, scraper)
        }
    }
    handlerFunc := newHandler(collector.NewMetrics(), enabledScrapers, logger)
    http.Handle(*metricPath, promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, handlerFunc))
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        w.Write(landingPage)
    })

    level.Info(logger).Log("msg", "Listening on address", "address", *listenAddress)
    srv := &http.Server{Addr: *listenAddress}
    if err := web.ListenAndServe(srv, *webConfig, logger); err != nil {
        level.Error(logger).Log("msg", "Error starting HTTP server", "err", err)
        os.Exit(1)
    }
}
2. The /metrics handler
Let's look at the handler behind /metrics. It is produced by newHandler:
func newHandler(metrics collector.Metrics, scrapers []collector.Scraper, logger log.Logger) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        filteredScrapers := scrapers
        params := r.URL.Query()["collect[]"]
        // Use request context for cancellation when connection gets closed.
        ctx := r.Context()
        // If a timeout is configured via the Prometheus header, add it to the context.
        if v := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds"); v != "" {
            timeoutSeconds, err := strconv.ParseFloat(v, 64)
            if err != nil {
                level.Error(logger).Log("msg", "Failed to parse timeout from Prometheus header", "err", err)
            } else {
                if *timeoutOffset >= timeoutSeconds {
                    // Ignore timeout offset if it doesn't leave time to scrape.
                    level.Error(logger).Log("msg", "Timeout offset should be lower than prometheus scrape timeout", "offset", *timeoutOffset, "prometheus_scrape_timeout", timeoutSeconds)
                } else {
                    // Subtract timeout offset from timeout.
                    timeoutSeconds -= *timeoutOffset
                }
                // Create new timeout context with request context as parent.
                var cancel context.CancelFunc
                ctx, cancel = context.WithTimeout(ctx, time.Duration(timeoutSeconds*float64(time.Second)))
                defer cancel()
                // Overwrite request with timeout context.
                r = r.WithContext(ctx)
            }
        }
        level.Debug(logger).Log("msg", "collect[] params", "params", strings.Join(params, ","))

        // Check if we have some "collect[]" query parameters.
        if len(params) > 0 {
            filters := make(map[string]bool)
            for _, param := range params {
                filters[param] = true
            }

            filteredScrapers = nil
            for _, scraper := range scrapers {
                if filters[scraper.Name()] {
                    filteredScrapers = append(filteredScrapers, scraper)
                }
            }
        }

        registry := prometheus.NewRegistry()
        registry.MustRegister(collector.New(ctx, dsn, metrics, filteredScrapers, logger))

        gatherers := prometheus.Gatherers{
            prometheus.DefaultGatherer,
            registry,
        }
        // Delegate http serving to Prometheus client library, which will call collector.Collect.
        h := promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{})
        h.ServeHTTP(w, r)
    }
}
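To make the timeout handling concrete (the numbers here are purely illustrative): if Prometheus sends X-Prometheus-Scrape-Timeout-Seconds: 10 and the exporter was started with a timeout offset of 0.25, the scrape runs under a context that is cancelled after 9.75 seconds; if the offset were 10 or larger, it would be ignored, an error logged, and the full 10 seconds used.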
3. The interfaces
The key is that registry.MustRegister expects an argument that implements the Collector interface. In other words, every time metrics need to be gathered, the registry calls the Collector's Collect method:
type Collector interface {
    Describe(chan<- *Desc)
    Collect(chan<- Metric)
}
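The concrete implementation registered above by collector.New satisfies this interface: its Collect method fans out one goroutine per enabled scraper, so all scrapers run concurrently against the same database handle and write into the same channel. The following is only a simplified sketch of that pattern, with placeholder fields (ctx, db, scrapers, logger) rather than the exporter's real struct layout; the actual code additionally records per-scraper duration and success metrics:
package collector

import (
    "context"
    "database/sql"
    "sync"

    "github.com/go-kit/kit/log"
    "github.com/go-kit/kit/log/level"
    "github.com/prometheus/client_golang/prometheus"
)

// Exporter is a simplified stand-in for the type returned by collector.New.
type Exporter struct {
    ctx      context.Context
    db       *sql.DB
    scrapers []Scraper
    logger   log.Logger
}

// Collect fans out one goroutine per scraper and funnels every metric into
// the channel handed over by the Prometheus registry.
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
    var wg sync.WaitGroup
    for _, scraper := range e.scrapers {
        wg.Add(1)
        go func(s Scraper) {
            defer wg.Done()
            // Each scraper runs its own queries and writes metrics into ch.
            if err := s.Scrape(e.ctx, e.db, ch, e.logger); err != nil {
                level.Error(e.logger).Log("msg", "Error from scraper", "scraper", s.Name(), "err", err)
            }
        }(scraper)
    }
    wg.Wait()
}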
So the collector scrapes all metrics concurrently, and each concrete metric implements the Scraper interface:
// Scraper is a minimal interface that lets you add new prometheus metrics to mysqld_exporter.
type Scraper interface {
    // Name of the Scraper. Should be unique.
    Name() string

    // Help describes the role of the Scraper.
    // Example: "Collect from SHOW ENGINE INNODB STATUS"
    Help() string

    // Version of MySQL from which scraper is available.
    Version() float64

    // Scrape collects data from database connection and sends it over channel as prometheus metric.
    Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error
}
That makes extending the exporter straightforward: to collect a new metric we only have to implement this interface. The actual work happens in the Scrape method, where the data is queried from the database and the needed values are extracted by whatever means fits: text parsing, regular expressions, and so on. Let's look at a simple collector, slave_status.go:
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Scrape `SHOW SLAVE STATUS`.

package collector

import (
    "context"
    "database/sql"
    "fmt"
    "strings"

    "github.com/go-kit/kit/log"
    "github.com/prometheus/client_golang/prometheus"
)

const (
    // Subsystem.
    slaveStatus = "slave_status"
)

var slaveStatusQueries = [2]string{"SHOW ALL SLAVES STATUS", "SHOW SLAVE STATUS"}
var slaveStatusQuerySuffixes = [3]string{" NONBLOCKING", " NOLOCK", ""}

func columnIndex(slaveCols []string, colName string) int {
    for idx := range slaveCols {
        if slaveCols[idx] == colName {
            return idx
        }
    }
    return -1
}

func columnValue(scanArgs []interface{}, slaveCols []string, colName string) string {
    var columnIndex = columnIndex(slaveCols, colName)
    if columnIndex == -1 {
        return ""
    }
    return string(*scanArgs[columnIndex].(*sql.RawBytes))
}

// ScrapeSlaveStatus collects from `SHOW SLAVE STATUS`.
type ScrapeSlaveStatus struct{}

// Name of the Scraper. Should be unique.
func (ScrapeSlaveStatus) Name() string {
    return slaveStatus
}

// Help describes the role of the Scraper.
func (ScrapeSlaveStatus) Help() string {
    return "Collect from SHOW SLAVE STATUS"
}

// Version of MySQL from which scraper is available.
func (ScrapeSlaveStatus) Version() float64 {
    return 5.1
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeSlaveStatus) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
    var (
        slaveStatusRows *sql.Rows
        err             error
    )
    // Try the both syntax for MySQL/Percona and MariaDB
    for _, query := range slaveStatusQueries {
        slaveStatusRows, err = db.QueryContext(ctx, query)
        if err != nil { // MySQL/Percona
            // Leverage lock-free SHOW SLAVE STATUS by guessing the right suffix
            for _, suffix := range slaveStatusQuerySuffixes {
                slaveStatusRows, err = db.QueryContext(ctx, fmt.Sprint(query, suffix))
                if err == nil {
                    break
                }
            }
        } else { // MariaDB
            break
        }
    }
    if err != nil {
        return err
    }
    defer slaveStatusRows.Close()

    slaveCols, err := slaveStatusRows.Columns()
    if err != nil {
        return err
    }

    for slaveStatusRows.Next() {
        // As the number of columns varies with mysqld versions,
        // and sql.Scan requires []interface{}, we need to create a
        // slice of pointers to the elements of slaveData.
        scanArgs := make([]interface{}, len(slaveCols))
        for i := range scanArgs {
            scanArgs[i] = &sql.RawBytes{}
        }

        if err := slaveStatusRows.Scan(scanArgs...); err != nil {
            return err
        }

        masterUUID := columnValue(scanArgs, slaveCols, "Master_UUID")
        masterHost := columnValue(scanArgs, slaveCols, "Master_Host")
        channelName := columnValue(scanArgs, slaveCols, "Channel_Name")       // MySQL & Percona
        connectionName := columnValue(scanArgs, slaveCols, "Connection_name") // MariaDB

        for i, col := range slaveCols {
            if value, ok := parseStatus(*scanArgs[i].(*sql.RawBytes)); ok { // Silently skip unparsable values.
                ch <- prometheus.MustNewConstMetric(
                    prometheus.NewDesc(
                        prometheus.BuildFQName(namespace, slaveStatus, strings.ToLower(col)),
                        "Generic metric from SHOW SLAVE STATUS.",
                        []string{"master_host", "master_uuid", "channel_name", "connection_name"},
                        nil,
                    ),
                    prometheus.UntypedValue,
                    value,
                    masterHost, masterUUID, channelName, connectionName,
                )
            }
        }
    }
    return nil
}

// check interface
var _ Scraper = ScrapeSlaveStatus{}
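Back to the original goal: the InnoDB Cluster state I was missing lives in performance_schema.replication_group_members, and collecting it follows exactly the same recipe. The sketch below is my own illustration rather than code from the upstream repository (which ships its own perf_schema.replication_group_members collector); the scraper name, the metric name and the ONLINE-to-1 mapping are choices made for this example:
// Scrape `performance_schema.replication_group_members` for Group Replication /
// InnoDB Cluster member state. Illustrative sketch only, not upstream code.
package collector

import (
    "context"
    "database/sql"

    "github.com/go-kit/kit/log"
    "github.com/prometheus/client_golang/prometheus"
)

const groupMembersQuery = `
    SELECT MEMBER_HOST, MEMBER_PORT, MEMBER_STATE
      FROM performance_schema.replication_group_members`

// memberOnlineDesc reports 1 when the member is ONLINE, 0 otherwise.
// namespace is the "mysql" constant already defined in the collector package.
var memberOnlineDesc = prometheus.NewDesc(
    prometheus.BuildFQName(namespace, "group_replication", "member_online"),
    "Whether the group replication member is in the ONLINE state.",
    []string{"member_host", "member_port", "member_state"},
    nil,
)

// ScrapeGRMemberState collects InnoDB Cluster / Group Replication member state.
type ScrapeGRMemberState struct{}

// Name of the Scraper. Should be unique.
func (ScrapeGRMemberState) Name() string { return "group_replication.member_state" }

// Help describes the role of the Scraper.
func (ScrapeGRMemberState) Help() string {
    return "Collect member state from performance_schema.replication_group_members"
}

// Version of MySQL from which scraper is available.
func (ScrapeGRMemberState) Version() float64 { return 5.7 }

// Scrape queries the members table and emits one gauge per cluster member.
func (ScrapeGRMemberState) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
    rows, err := db.QueryContext(ctx, groupMembersQuery)
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        var host, port, state string
        if err := rows.Scan(&host, &port, &state); err != nil {
            return err
        }
        online := 0.0
        if state == "ONLINE" {
            online = 1.0
        }
        ch <- prometheus.MustNewConstMetric(
            memberOnlineDesc, prometheus.GaugeValue, online,
            host, port, state,
        )
    }
    return rows.Err()
}

// check interface
var _ Scraper = ScrapeGRMemberState{}
Wiring it in is then just a matter of adding ScrapeGRMemberState{} to the scrapers map described in the next section.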
4. The set of scrapers
From the code above we know that mysqld_exporter collects metrics by having each one implement the Scraper interface, which makes it easy to extend. We can describe the monitoring scope in terms of sets. First, the exporter's scrapers map defines the default collection scope, set A:
var scrapers = map[collector.Scraper]bool{
    collector.ScrapeGlobalStatus{}:                        true,
    collector.ScrapeGlobalVariables{}:                     true,
    collector.ScrapeSlaveStatus{}:                         true,
    collector.ScrapeProcesslist{}:                         false,
    collector.ScrapeUser{}:                                false,
    collector.ScrapeTableSchema{}:                         false,
    collector.ScrapeInfoSchemaInnodbTablespaces{}:         false,
    collector.ScrapeInnodbMetrics{}:                       false,
    collector.ScrapeAutoIncrementColumns{}:                false,
    collector.ScrapeBinlogSize{}:                          false,
    collector.ScrapePerfTableIOWaits{}:                    false,
    collector.ScrapePerfIndexIOWaits{}:                    false,
    collector.ScrapePerfTableLockWaits{}:                  false,
    collector.ScrapePerfEventsStatements{}:                false,
    collector.ScrapePerfEventsStatementsSum{}:             false,
    collector.ScrapePerfEventsWaits{}:                     false,
    collector.ScrapePerfFileEvents{}:                      false,
    collector.ScrapePerfFileInstances{}:                   false,
    collector.ScrapePerfMemoryEvents{}:                    false,
    collector.ScrapePerfReplicationGroupMembers{}:         false,
    collector.ScrapePerfReplicationGroupMemberStats{}:     false,
    collector.ScrapePerfReplicationApplierStatsByWorker{}: false,
    collector.ScrapeUserStat{}:                            false,
    collector.ScrapeClientStat{}:                          false,
    collector.ScrapeTableStat{}:                           false,
    collector.ScrapeSchemaStat{}:                          false,
    collector.ScrapeInnodbCmp{}:                           true,
    collector.ScrapeInnodbCmpMem{}:                        true,
    collector.ScrapeQueryResponseTime{}:                   true,
    collector.ScrapeEngineTokudbStatus{}:                  false,
    collector.ScrapeEngineInnodbStatus{}:                  false,
    collector.ScrapeHeartbeat{}:                           false,
    collector.ScrapeSlaveHosts{}:                          false,
    collector.ScrapeReplicaHost{}:                         false,
}
The exporter also allows the collection scope to be overridden at startup through command-line flags, which defines set B. When B is not given, set A is in effect; when B is given, B takes effect and A is ignored. When Prometheus scrapes the exporter, it may pass collect[] query parameters that define scope C. When C is absent, the final scope is A or B (whichever is in effect); when C is present, the final scope is the intersection of C with A or B (whichever is in effect).
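As a concrete illustration (using the exporter's default listen address :9104, and recalling that each flag name is simply "collect." plus the scraper's Name()): starting the exporter with a flag along the lines of --collect.perf_schema.replication_group_members adds that scraper to set B, and a request to http://localhost:9104/metrics?collect[]=global_status&collect[]=perf_schema.replication_group_members then acts as set C, so only those two scrapers run; any collect[] name that was not enabled at startup is simply ignored.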
III. References
1. https://jiajunhuang.com/articles/2018_12_16-prometheus_mysqld_exporter.md.html
2. https://github.com/prometheus/mysqld_exporter