我是靠谱客的博主 难过小蚂蚁,这篇文章主要介绍MetricSystem指标系统简介MetricsSystem的创建与启动给Sinks增加Jetty的ServletContextHandlerSources向Sinks发送数据,现在分享给大家,希望可以做个参考。

文章目录

  • 简介
  • MetricsSystem的创建与启动
    • 加载配置文件
    • 注册Sources
    • 注册并启动Sinks
  • 给Sinks增加Jetty的ServletContextHandler
  • Sources向Sinks发送数据
    • Source中通过MetricRegistry实现指标收集
    • 将Source中的MetricRegistry注册到MetricSystem中
    • 将MetricSystem中的metricRegistry注册到Sinks中
    • Sink通过注册进来的MetricRegistry获取Source的数据

简介

MetricSystem通过一个特定的Instance创建,由sources和sinks组成,并周期性地将source中的指标数据推送到目标sink中去。MetricSystem使用codahale提供的第三方测量仓库Metrics。MetricSystem中有三个主要概念:

  • Instance:指定了谁在使用测量系统。目前在Spark中,已经实现的Instance有:master、worker、applications、driver、executor和shuffleService。
  • Source:指定了从哪里收集测量数据。在指标系统中,有两种类型的souce:
    • Spark internal source,如MasterSource和WorkerSource等,将会收集spark组件的内部状态。
    • Common source,如JvmSource,将会收集低层状态,它们是通过configuration进行配置并利用反射进行加载。
  • Sink:指定了往哪里发送测量数据。多个sinks可以共存,并且指标可以被同时发送给它们。Spark目前提供的Sink有ConsoleSink、CsvSink、JmxSink、MetricsServlet、GraphiteSink等。Spark中使用MetricsServlet作为默认的Sink。

指标的配置格式如下:

复制代码
1
2
[instance].[sink|source].[name].[options] = xxxx
  • [instance]:可以是master、worker、executor、driver或者applications,这意味着只有对应的instance会有这个属性。通配符*则意味着所有的instance将会有这个属性。
  • [sink|source]:表明这个属性是属于source还是sink,这个部分的选项只能是source或sink。
  • [name]:表明sink或source的名称。
  • [options]:表示这个source或sink的特定属性。

下面以driver实例为例来介绍下MetricsSystem的创建与启动。

MetricsSystem的创建与启动

driver实例的MetricsSystem创建启动代码位于SparkContext类中。可以看到,创建好MetricsSystem后,则将所有的ServletContextHandler添加到Spark UI中,然后调用其start()方法启动,最后注册其他几个source对象。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 创建SparkEnv,在SparkEnv创建过程中创建MetricsSystem _env = createSparkEnv(_conf, isLocal, listenerBus) SparkEnv.set(_env) // Driver的metrics system需要将app ID设置到spark.app.id属性值中, // 因此它应该在我们从task scheduler中得到app ID(在SchedulerBackend构造函数中创建)并且设置spark.app.id属性后再启动 // 启动MetricsSystem _env.metricsSystem.start() // 在metrics system启动后,将driver metrics servlet handler添加到web ui中 _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler))) // 等待taskScheduler的backend就绪 _taskScheduler.postStartHook() _env.metricsSystem.registerSource(_dagScheduler.metricsSource) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) }

DAGSchedulerSource测量的信息是dagScheduler.failedStages、dagScheduler.runningStages、dagScheduler.waitingStages、dagScheduler.numTotalJobs、dagScheduler.activeJobs。

BlockManagerSource测量的信息是memory.maxMem_MB、memory.maxOnHeapMem_MB、memory.maxOffHeapMem_MB、memory.remainingMem_MB、memory.remainingOnHeapMem_MB、memory.remainingOffHeapMem_MB、memory.memUsed_MB、memory.onHeapMemUsed_MB、memory.offHeapMemUsed_MB、disk.diskSpaceUsed_MB。

MetricsSystem类部分代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private[spark] class MetricsSystem private ( val instance: String, conf: SparkConf, securityMgr: SecurityManager) extends Logging { // 存储指标配置信息 private[this] val metricsConfig = new MetricsConfig(conf) // 存储sinks和sources private val sinks = new mutable.ArrayBuffer[Sink] private val sources = new mutable.ArrayBuffer[Source] private val registry = new MetricRegistry() // 没有定义为volatile,也即没有考虑多线程场景 private var running: Boolean = false // 将MetricsServlet看成一个特殊的sink,因为它会暴露给web ui的添加handlers操作 private var metricsServlet: Option[MetricsServlet] = None // 得到该metrics system使用到的所有UI handlers;只能在start()方法调用后才能被调用 def getServletHandlers: Array[ServletContextHandler] = { require(running, "Can only call getServletHandlers on a running MetricsSystem") metricsServlet.map(_.getHandlers(conf)).getOrElse(Array()) } // 加载配置信息 metricsConfig.initialize() // 启动该metrics system def start() { // 强制不能调用多次,而且不是多线程安全的,也即可能防止单线程中反复调用start方法场景 require(!running, "Attempting to start a MetricsSystem that is already running") // 假如running=true执行后被hang住了,后面语句没有执行?也即未考虑多线程场景 running = true StaticSources.allSources.foreach(registerSource) registerSources() registerSinks() sinks.foreach(_.start) } ... }

总结来说,MetricsSystem的启动过程包括以下步骤:

  • 加载配置文件;
  • 注册Sources;
  • 注册并启动Sinks;
  • 给Sinks增加Jetty的ServletContextHandler。

MetricsSystem启动完毕后,会遍历与Sinks有关的ServletContextHandler,并调用attachHandler将它们绑定到Spark UI上。

加载配置文件

创建MetricsSystem时,在其构造函数中调用metricsConfig.initialize()来加载配置信息,具体实现如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/** * 基于优先级从多个位置加载配置信息 * 如果同一个配置在这个方法中被再次赋值,则旧配置值会被覆盖 */ def initialize() { // 设置默认配置值以防止没有配置文件存在 setDefaultProperties(properties) // 从配置文件中加载配置,可通过"spark.metrics.conf"指定,默认值为metrics.properties loadPropertiesFromFile(conf.getOption("spark.metrics.conf")) // 加载SparkConf中以"spark.metrics.conf."为前缀的配置值 val prefix = "spark.metrics.conf." conf.getAll.foreach { case (k, v) if k.startsWith(prefix) => properties.setProperty(k.substring(prefix.length()), v) case _ => } // 得到每个instance配置的子配置值。配置以"."分隔,第一个"."前的为instance作为key, // 并将余下的配置kv值统一作为这个instance的key的value值。 // 比如输入属性值为("*.sink.servlet.class"->"class1", "*.sink.servlet.path"->"path1"), // 那么返回Map("*" -> Properties("sink.servlet.class" -> "class1", "sink.servlet.path" -> "path1")) perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX) // 如果instance包含通配符"*"配置,则将该配置设置给所有具体的instance。 if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) { val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX); (k, v) <- defaultSubProperties if (prop.get(k) == null)) { prop.put(k, v) } } }

注册Sources

注册Sources在start()中被调用,包括两个步骤:

  • 注册固定sources,包括CodegenMetrics和HiveCatalogMetrics;

  • 注册配置的该instance对应的所有sources

    复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // 注册固定sources,allSources = Seq(CodegenMetrics, HiveCatalogMetrics) StaticSources.allSources.foreach(registerSource) // 注册该instance对应的sources registerSources() private def registerSources() { // 得到某instance对应的配置信息 val instConfig = metricsConfig.getInstance(instance) // 得到source的配置信息 val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX) // 注册该instance相关的所有source sourceConfigs.foreach { kv => val classPath = kv._2.getProperty("class") try { val source = Utils.classForName(classPath).newInstance() registerSource(source.asInstanceOf[Source]) } catch { case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e) } } }

注册并启动Sinks

注册并启动Sinks在start()中被调用:

复制代码
1
2
3
4
registerSinks() // 调用每个具体Sink的start方法进行启动,start方法在子类中被实现 sinks.foreach(_.start)

注册Sinks主要包括两个步骤:

  • 得到该instance对应的所有sinks的配置。

  • 将子属性class对应的类metricsServlet反射得到MetricsServlet实例。如果属性的key是servlet,将其设置为metricsServlet;如果是Sink,则加入到ArrayBuffer[Sink]中。

    复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    private def registerSinks() { // 得到该instance对应的所有sinks的配置 val instConfig = metricsConfig.getInstance(instance) val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX) sinkConfigs.foreach { kv => val classPath = kv._2.getProperty("class") if (null != classPath) { try { val sink = Utils.classForName(classPath) .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) .newInstance(kv._2, registry, securityMgr) if (kv._1 == "servlet") { metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) } else { sinks += sink.asInstanceOf[Sink] } } catch { case e: Exception => logError("Sink class " + classPath + " cannot be instantiated") throw e } } } }

给Sinks增加Jetty的ServletContextHandler

为了能够在SparkUI访问到测量数据,所以需要给Sinks增加Jetty的ServletContextHandler,这里主要用到MetricSystem的getServletHandlers方法实现。生成的ServletContextHandler通过SparkUI的attachHandler方法,被绑定到SparkUI。

复制代码
1
2
3
4
5
6
7
8
// 在metrics system启动后,将driver metrics servlet handler添加到web ui中 _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler))) def getServletHandlers: Array[ServletContextHandler] = { require(running, "Can only call getServletHandlers on a running MetricsSystem") metricsServlet.map(_.getHandlers(conf)).getOrElse(Array()) }

我们可以使用以下这些地址访问测量数据(具体截图待补充):

  • http://localhost:4040/metrics/applications/json
  • http://localhost:4040/metrics/json
  • http://localhost:4040/metrics/master/json

Sources向Sinks发送数据

Spark使用第三方库codahale中的MetricRegistry来连接Source和Sink。

Source中通过MetricRegistry实现指标收集

Source接口的定义如下,可以看到每个Source中都包含MetricRegistry成员。

复制代码
1
2
3
4
5
6
7
import com.codahale.metrics.MetricRegistry private[spark] trait Source { def sourceName: String def metricRegistry: MetricRegistry }

MetricRegistry类继承com.codahale.metrics.MetricSet中的getMetrics()方法来实现数据收集工作。

复制代码
1
2
public class MetricRegistry implements MetricSet {}

以DAGSchedulerSource为例,向MetricRegistry中注册要收集的指标及指标收集类型与方法。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
private[scheduler] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source { override val metricRegistry = new MetricRegistry() override val sourceName = "DAGScheduler" metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] { override def getValue: Int = dagScheduler.failedStages.size }) ... }

将Source中的MetricRegistry注册到MetricSystem中

向MetricSystem中注册Source时,将其MetricRegistry注册到MetricSystem的metricRegistry中。

复制代码
1
2
3
4
5
6
7
8
9
10
def registerSource(source: Source) { sources += source try { val regName = buildRegistryName(source) registry.register(regName, source.metricRegistry) } catch { case e: IllegalArgumentException => logInfo("Metrics already registered", e) } }

将MetricSystem中的metricRegistry注册到Sinks中

在注册每个Sink时都会将MetricSystem中的metricRegistry传递进去:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private def registerSinks() { val instConfig = metricsConfig.getInstance(instance) val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX) sinkConfigs.foreach { kv => val classPath = kv._2.getProperty("class") if (null != classPath) { try { val sink = Utils.classForName(classPath) .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) .newInstance(kv._2, registry, securityMgr) if (kv._1 == "servlet") { metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) } else { sinks += sink.asInstanceOf[Sink] } } catch { case e: Exception => logError("Sink class " + classPath + " cannot be instantiated") throw e } } } }

Sink通过注册进来的MetricRegistry获取Source的数据

以MetricsServlet这个Sink为例,其在收到请求后将registry的内容转换为字符串发送出去。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private[spark] class MetricsServlet( val property: Properties, val registry: MetricRegistry, securityMgr: SecurityManager) extends Sink { def getHandlers(conf: SparkConf): Array[ServletContextHandler] = { Array[ServletContextHandler]( createServletHandler(servletPath, new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr, conf) ) } def getMetricsSnapshot(request: HttpServletRequest): String = { mapper.writeValueAsString(registry) } }

最后

以上就是难过小蚂蚁最近收集整理的关于MetricSystem指标系统简介MetricsSystem的创建与启动给Sinks增加Jetty的ServletContextHandlerSources向Sinks发送数据的全部内容,更多相关MetricSystem指标系统简介MetricsSystem内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部