我是靠谱客的博主 腼腆小鸭子,最近开发中收集的这篇文章主要介绍Flink容错机制源码分析Flink容错机制,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Flink容错机制

(基于FLIP6的YARN架构)

  1. YARN application master启动:YarnFlinkApplicationMasterRunner
  2. runApplicationMaster(flinkConfig)
    • 启动RPC服务
    • 初始化resource manager
    • 初始化job master参数
    • 启动resource manager
    • 启动job manager runner
  3. JobManagerRunner流程:
    • 从jar包加载作业图:jobGraph = loadJobGraph(config);
    • BlobLibraryCacheManager注册作业,库和class loader
    • 设置ha服务
    • 启动JobManager
  4. JobManager流程:
    • 初始化作业(name, jobid)
    • 初始化重启策略
    • 初始化任务槽池
    • 构建执行图ExecutionGraph
    • 为ExecutionGraph注册状态监听器
  5. 通过作业图构建执行图buildGraph:
    • 初始化具有initializeOnMaster钩子的vertex(输出格式创建目录,输入格式创建splits)
    • 将 job vertices 拓扑排序,并附加到执行图
    • 配置state checkpointing
    • 创建执行图的metrics
  6. executionGraph.enableCheckpointing:创建CheckpointCoordinator,触发和提交检查点,保存状态。
  7. 执行图的监听器监听作业状态,到达JobStatus.RUNNING时启动检查点调度器startCheckpointScheduler(),定期触发triggerCheckpoint()。
  8. triggerCheckpoint():发送消息给任务管理器,触发检查点:execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
  9. 任务管理器接收到TriggerCheckpoint消息:从消息里解出3个参数并传给task触发:task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
  10. task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions):异步执行快照动作。

  11. CheckpointMetaData(checkpointID, checkpointTimestamp)

  12. StreamTask.performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);

    • 向下游operatorChain广播: CheckpointBarrier(id, timestamp, checkpointOptions)

    • checkpointingOperation.executeCheckpointing();

    • 提交一个AsyncCheckpointRunnable线程异步执行
  13. State Backend执行snapshot()操作。

最后

以上就是腼腆小鸭子为你收集整理的Flink容错机制源码分析Flink容错机制的全部内容,希望文章能够帮你解决Flink容错机制源码分析Flink容错机制所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(60)

评论列表共有 0 条评论

立即
投稿
返回
顶部