我是靠谱客的博主 霸气裙子,这篇文章主要介绍Spark度量系统相关讲解,现在分享给大家,希望可以做个参考。

Spark的Metrics System的度量系统,有两个部分组成:source,sink,创建的时候需要制定instance。度量系统会周期的将source的指标数据被sink周期性的拉去,sink可以有很多。

Instance代表着使用度量系统的角色。在spark内部,目前master,worker,Executor,client driver,这些角色都会因为要去做监控而创建使用度量系统。目前,spark内部实现的instance有:master,worker,Executor,Driver,Applications。

Source指定定义了如何去收取度量指标。目前,已经存在以下两种source:

1.Spark内部的source,比如MasterSource,WorkerSource,ExecutorSource,

DAGSchedulerSource,BlockManagerSource,ApplicationSource。这些source会收集spark内部部件的状态。这些source都跟instance相关,在创建度量系统的时候会被加入。

2.公共的source,比如JVMSource,收集的是更加底层的状态,可以用配置文件配置并且是通过反射机制加载的。

Sink定义了度量指标数据输出的位置。同时可以共存很多sinks,指标数据会发给所有的sinks。

Source和sink的绑定

复制代码
1
2
3
4
5
6
7
def start() { require(!running, "Attempting to start a MetricsSystem that is already running") running = true registerSources() registerSinks() sinks.foreach(_.start) }

指标配置的格式如下:

复制代码
1
[instance].[sink|source].[name].[options] = xxxx

[instance]可以是master,worker,executor,driver,applications.配置了就意味着只有指定的instance由此属性。可以粗犷的用*代替instance name,这就意味着所有的instance都将由此属性。

[sink|source].代表着该属性是source还是sink。只能是二选一。

[name]指定sink或者source的名字。

[options]指定sink或者source的属性

具体例子如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
## Examples # Enable JmxSink for all instances by class name #*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink # Enable ConsoleSink for all instances by class name #*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink # Polling period for ConsoleSink #*.sink.console.period=10 #*.sink.console.unit=seconds # Master instance overlap polling period #master.sink.console.period=15 #master.sink.console.unit=seconds # Enable CsvSink for all instances #*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink # Polling period for CsvSink #*.sink.csv.period=1 #*.sink.csv.unit=minutes # Polling directory for CsvSink #*.sink.csv.directory=/tmp/ # Worker instance overlap polling period #worker.sink.csv.period=10 #worker.sink.csv.unit=minutes # Enable Slf4jSink for all instances by class name #*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink # Polling period for Slf4JSink #*.sink.slf4j.period=1 #*.sink.slf4j.unit=minutes

注意事项

1,添加新的sink的时候,设置class option时需要是全名。

2,有些sink支持周期的拉去数据。最小拉去数据的周期是1秒钟。

3,有些特殊的属性支持通配符,例如:master.sink.console.period->*.sink.console.period

4,metrics.properties文件如果放在 ${SPARK_HOME}/conf目录下可以被自动加载

如果想自定义目录需要用-Dspark.metrics.conf=xxx,指定java属性配置的方式去指定。

5,MetricsServlet作为默认的sink,只支持,master,worker,client driver,可以通过发送http请求/metrics/json,可以以json的格式获取所有已经注册的指标数据。

 

由于Spark生产中大部分运行于yarn上

Driver端的度量指标的请求方式

/proxy/application_1494227937369_0084/metrics/json

主要source源是:

StreamingSource,DAGSchedulerSource,BlockManagerSource,

ExecutorAllocationManagerSource

driver端的度量系统的初始化细节

在SparkContext里面

初始化度量系统

构建度量系统对象是在Sparkenv中做的

复制代码
1
MetricsSystem.createMetricsSystem("driver", conf, securityManager)

SparkContext只是引用了SparkEnv的对象

复制代码
1
2
metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null

启动度量系统并且绑定ServletHandler

复制代码
1
2
3
4
5
// The metrics system for Driver need to be set spark.app.id to app ID. // So it should start after we get app ID from the task scheduler and set spark.app.id. metricsSystem.start() // Attach the driver metrics servlet handler to the web ui after the metrics system is started. metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

注册source

复制代码
1
2
3
4
5
_env.metricsSystem.registerSource(_dagScheduler.metricsSource) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) }

Executor端的Source:

ExecutorSource

Executor端度量系统的初始化机启动

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
val metricsSystem = if (isDriver) { // Don't start metrics system right now for Driver. // We need to wait for the task scheduler to give us an app ID. // Then we can start the metrics system. MetricsSystem.createMetricsSystem("driver", conf, securityManager) } else { // We need to set the executor ID before the MetricsSystem is created because sources and // sinks specified in the metrics configuration file will want to incorporate this executor's // ID into the metrics they report. conf.set("spark.executor.id", executorId) val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager) ms.start() ms } 构建ExecutorSource并注册 private val executorSource = new ExecutorSource(threadPool, executorId) if (!isLocal) { env.metricsSystem.registerSource(executorSource) env.blockManager.initialize(conf.getAppId) }

可以看到Executor端并没有绑定ServletHandler,故而无法通过http请求到度量指标。

最后

以上就是霸气裙子最近收集整理的关于Spark度量系统相关讲解的全部内容,更多相关Spark度量系统相关讲解内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(55)

评论列表共有 0 条评论

立即
投稿
返回
顶部