Overview
References
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/back_pressure.html
https://blog.csdn.net/u011750989/article/details/82191298
https://blog.csdn.net/u013343882/article/details/82454408
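After the click, a packet capture shows the following request:
We take this as the starting point of the investigation. First, which service is listening on port 8088?
The main class of the corresponding process is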
org.apache.hadoop.yarn.server.resourcemanager.ResourceManager
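So this is the YARN ResourceManager, and we could look inside the RM for the corresponding handling. To save time, however, we first locate the JobManager's web service address and see how it receives the request.
First, look at the running processes:
This is the main class of the JobManager process; we start from this class.
First we set a breakpoint: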
stop at org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap:129
The call stack is captured successfully:
[1] org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap.<init> (WebFrontendBootstrap.java:129)
[2] org.apache.flink.runtime.webmonitor.WebRuntimeMonitor.<init> (WebRuntimeMonitor.java:388)
[3] sun.reflect.NativeConstructorAccessorImpl.newInstance0 (native method)
[4] sun.reflect.NativeConstructorAccessorImpl.newInstance (NativeConstructorAccessorImpl.java:62)
[5] sun.reflect.DelegatingConstructorAccessorImpl.newInstance (DelegatingConstructorAccessorImpl.java:45)
[6] java.lang.reflect.Constructor.newInstance (Constructor.java:423)
[7] org.apache.flink.runtime.webmonitor.WebMonitorUtils.startWebRuntimeMonitor (WebMonitorUtils.java:159)
[8] org.apache.flink.runtime.clusterframework.BootstrapTools.startWebMonitorIfConfigured (BootstrapTools.java:203)
[9] org.apache.flink.yarn.YarnApplicationMasterRunner.runApplicationMaster (YarnApplicationMasterRunner.java:328)
[10] org.apache.flink.yarn.YarnApplicationMasterRunner$1.call (YarnApplicationMasterRunner.java:178)
[11] org.apache.flink.yarn.YarnApplicationMasterRunner$1.call (YarnApplicationMasterRunner.java:175)
[12] org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$8.360936478.run (null)
[13] java.security.AccessController.doPrivileged (native method)
[14] javax.security.auth.Subject.doAs (Subject.java:422)
[15] org.apache.hadoop.security.UserGroupInformation.doAs (UserGroupInformation.java:1,692)
[16] org.apache.flink.runtime.security.HadoopSecurityContext.runSecured (HadoopSecurityContext.java:41)
[17] org.apache.flink.yarn.YarnApplicationMasterRunner.run (YarnApplicationMasterRunner.java:175)
[18] org.apache.flink.yarn.YarnApplicationMasterRunner.main (YarnApplicationMasterRunner.java:122)
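The purpose of capturing this call stack is to understand the JobManager startup flow, so that we can decide where to start debugging.
It is better to start from the following breakpoints:
stop at org.apache.flink.yarn.YarnApplicationMasterRunner:200
stop at org.apache.flink.yarn.YarnApplicationMasterRunner:300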
stop at org.apache.flink.runtime.webmonitor.WebRuntimeMonitor:181
stop at org.apache.flink.runtime.webmonitor.WebRuntimeMonitor:214
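The breakpoints take effect. Stepping through the code, the following object is created along the way:
Note the refresh interval of the web UI: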
/**
* The config parameter defining the refresh interval for the web-frontend.
*/
public static final ConfigOption<Long> REFRESH_INTERVAL =
key("web.refresh-interval")
.defaultValue(3000L)
.withDeprecatedKeys("jobmanager.web.refresh-interval");
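The option above is read with a fallback: the current key first, then the deprecated key, then the default. A minimal sketch of that lookup order follows. RefreshIntervalLookup and its plain Map are hypothetical stand-ins used only for illustration; they are not Flink's Configuration class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, NOT Flink's Configuration API. It only illustrates the
// "current key, then deprecated key, then default" resolution order.
class RefreshIntervalLookup {
    static final String KEY = "web.refresh-interval";
    static final String DEPRECATED_KEY = "jobmanager.web.refresh-interval";
    static final long DEFAULT_VALUE = 3000L;

    static long refreshInterval(Map<String, String> conf) {
        String value = conf.get(KEY);
        if (value == null) {
            // Fall back to the old key so that existing config files keep working.
            value = conf.get(DEPRECATED_KEY);
        }
        return value == null ? DEFAULT_VALUE : Long.parseLong(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(DEPRECATED_KEY, "5000");
        System.out.println(refreshInterval(conf));
    }
}
```

If both keys are present, the current key wins in this sketch.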
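Next come some initialization steps related to back pressure:
Then the routes are constructed:
Then Netty is set up:
At this point, the natural next step is to find where Netty handles the HTTP requests.
So we can go straight to the implementation behind Handler handler = new Handler(WebFrontendBootstrap.this.router);
We therefore set breakpoints here: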
stop in org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0
stop at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.DualAbstractHandler:48
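It quickly turns out that this path is a dead end: the Netty jar shaded by Flink is compiled without debug information, so even printing variables is awkward. We keep looking.
Bingo. Once the handler is found, the rest is easy; step right in: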
stop in org.apache.flink.runtime.rest.handler.legacy.JobVertexBackPressureHandler.handleRequest
When the following is clicked in the web UI:
the breakpoint is triggered. The call stack is:
[1] org.apache.flink.runtime.rest.handler.legacy.JobVertexBackPressureHandler.handleRequest (JobVertexBackPressureHandler.java:75)
[2] org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler.handleRequest (AbstractJobVertexRequestHandler.java:48)
[3] org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler.lambda$handleJsonRequest$1 (AbstractExecutionGraphRequestHandler.java:73)
[4] org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler$$Lambda$65.1369709571.apply (null)
[5] java.util.concurrent.CompletableFuture.uniCompose (CompletableFuture.java:952)
[6] java.util.concurrent.CompletableFuture$UniCompose.tryFire (CompletableFuture.java:926)
[7] java.util.concurrent.CompletableFuture$Completion.run (CompletableFuture.java:442)
[8] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
[9] java.util.concurrent.FutureTask.run (FutureTask.java:266)
[10] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180)
[11] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293)
[12] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
[13] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
[14] java.lang.Thread.run (Thread.java:748)
Let us analyze it:
The code also states explicitly that if a sample is already in flight, the current request is ignored.
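The "ignore the request while a sample is in flight" behaviour can be sketched as a guard over a set of pending vertices. This is a simplified illustration, not the Flink source; SampleTrigger and its method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of an in-flight guard; not the actual Flink tracker.
class SampleTrigger {
    private final Set<String> pending = new HashSet<>();

    // Returns true if a new sample was started for this vertex,
    // false if one is already in flight and the request is ignored.
    synchronized boolean trigger(String vertexId) {
        return pending.add(vertexId);
    }

    // Called when the sample for this vertex has completed (or failed).
    synchronized void complete(String vertexId) {
        pending.remove(vertexId);
    }

    public static void main(String[] args) {
        SampleTrigger tracker = new SampleTrigger();
        System.out.println(tracker.trigger("vertex-1")); // starts a sample
        System.out.println(tracker.trigger("vertex-1")); // ignored while pending
    }
}
```

The guard keeps repeated UI refreshes from piling up sampling requests for the same vertex.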
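An explanation of the following:
The exact number depends on your parallelism; the details are not covered here.
The other parameters do the following:
Then the following code is executed in order:
Note that up to this point all of the code runs in the JobManager. Continuing:
Print the variables:
The remaining question is how the TaskManager (TM) handles the request.
We go back to org.apache.flink.yarn.YarnTaskManager and watch the TM start up, to see whether it offers any clues.
Nothing is easy to find right away, but there is a simple workaround: while debugging, have the JobManager send a malformed message to the TM as a probe. The TM should then log an error, which would reveal the location.
That did not work either. Following the official documentation, I searched the code for .getStackTrace() and looked through the several places where it appears.
For reference, see: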
https://blog.csdn.net/u011750989/article/details/82191298
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/back_pressure.html
Then we set a breakpoint here:
stop in org.apache.flink.runtime.taskexecutor.TaskExecutor.requestStackTraceSample
Still no hit, so we keep looking.
Another breakpoint:
stop at org.apache.flink.runtime.taskmanager.TaskManager:715
This one hits:
[1] org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleStackTraceSampleMessage (TaskManager.scala:715)
[2] org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse (TaskManager.scala:291)
[3] scala.runtime.AbstractPartialFunction.apply (AbstractPartialFunction.scala:36)
[4] org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse (LeaderSessionMessageFilter.scala:49)
[5] scala.runtime.AbstractPartialFunction.apply (AbstractPartialFunction.scala:36)
[6] org.apache.flink.runtime.LogMessages$$anon$1.apply (LogMessages.scala:33)
[7] org.apache.flink.runtime.LogMessages$$anon$1.apply (LogMessages.scala:28)
[8] scala.PartialFunction$class.applyOrElse (PartialFunction.scala:123)
[9] org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse (LogMessages.scala:28)
[10] akka.actor.Actor$class.aroundReceive (Actor.scala:502)
[11] org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive (TaskManager.scala:122)
[12] akka.actor.ActorCell.receiveMessage (ActorCell.scala:526)
[13] akka.actor.ActorCell.invoke (ActorCell.scala:495)
[14] akka.dispatch.Mailbox.processMailbox (Mailbox.scala:257)
[15] akka.dispatch.Mailbox.run (Mailbox.scala:224)
[16] akka.dispatch.Mailbox.exec (Mailbox.scala:234)
[17] scala.concurrent.forkjoin.ForkJoinTask.doExec (ForkJoinTask.java:260)
[18] scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask (ForkJoinPool.java:1,339)
[19] scala.concurrent.forkjoin.ForkJoinPool.runWorker (ForkJoinPool.java:1,979)
[20] scala.concurrent.forkjoin.ForkJoinWorkerThread.run (ForkJoinWorkerThread.java:107)
After talking it over with a friend, the understanding is as follows:
So we need to set a breakpoint here:
stop at org.apache.flink.runtime.taskmanager.TaskManager:746
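This part needs no explanation.
Now let us see how the stack is obtained. The object above is the physical thread, so calling getStackTrace() on it directly returns the stack. If 100 samples are requested, the remaining samples are taken afterwards in the same way.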
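The sampling described above can be sketched with plain java.lang.Thread calls. This is a simplification under stated assumptions: StackSampler, numSamples and delayMillis are hypothetical names, and a simple loop with a sleep stands in for however the TaskManager actually schedules the follow-up samples.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: repeatedly capture the stack trace of a target thread.
class StackSampler {
    static List<StackTraceElement[]> sample(Thread target, int numSamples, long delayMillis) {
        List<StackTraceElement[]> samples = new ArrayList<>();
        for (int i = 0; i < numSamples && target.isAlive(); i++) {
            // getStackTrace() returns a snapshot of the thread's current frames.
            samples.add(target.getStackTrace());
            if (i + 1 < numSamples) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop sampling early.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return samples;
    }

    public static void main(String[] args) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException ignored) {
                // Woken up by main; just exit.
            }
        });
        worker.setDaemon(true);
        worker.start();
        List<StackTraceElement[]> samples = sample(worker, 5, 10L);
        System.out.println(samples.size() + " samples taken");
        worker.interrupt();
    }
}
```

Each sample is an independent snapshot, so the spacing between samples determines how representative the set is of what the thread spends its time on.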
Next breakpoint:
stop at org.apache.flink.runtime.taskmanager.TaskManager:771
Here the result is sent back to the JobManager, so next we look at how the JobManager handles the response.
The handler function is:
stop in org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl$StackTraceSampleCompletionCallback.createStatsFromSample
Look at the implementation:
This is where the calculation happens.
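As a rough illustration of the calculation: the Flink 1.5 back pressure documentation listed in the references describes the ratio as the fraction of sampled stack traces that were stuck in an internal buffer-request call, with OK for ratios up to 0.10, LOW up to 0.5, and HIGH above that. The sketch below follows that description only. BackPressureRatio is a hypothetical class, the class and method name to match are passed in as parameters rather than assumed, and scanning every frame of each trace is a simplification.

```java
import java.util.List;

// Hypothetical sketch of the ratio computation; not the Flink implementation.
class BackPressureRatio {
    // Fraction of samples that contain a frame of className.methodName.
    static double ratio(List<StackTraceElement[]> samples, String className, String methodName) {
        if (samples.isEmpty()) {
            return 0.0;
        }
        int blocked = 0;
        for (StackTraceElement[] trace : samples) {
            for (StackTraceElement frame : trace) {
                if (frame.getClassName().equals(className)
                        && frame.getMethodName().equals(methodName)) {
                    blocked++;
                    break; // count each sample at most once
                }
            }
        }
        return (double) blocked / samples.size();
    }

    // Thresholds as given in the Flink 1.5 back pressure documentation.
    static String level(double ratio) {
        if (ratio <= 0.10) {
            return "OK";
        }
        if (ratio <= 0.5) {
            return "LOW";
        }
        return "HIGH";
    }
}
```

For example, if 1 out of 100 samples is blocked in the matched method, the ratio is 0.01 and the level is OK.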
Reposted from: https://my.oschina.net/qiangzigege/blog/3024384