Flink——Flink检查点(checkpoint)、保存点(savepoint)的区别与联系

2023-09-20 16:55:55

Flink checkpoint

Checkpoint是Flink实现容错机制最核心的功能,能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。

  1. Checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记
  2. 当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录
  3. 每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐)
  4. 该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入
  5. 最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据

开启checkpoint

 

        //1.1 开启CK
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint处理。

上面代码配置了执行Checkpointing的时间间隔为1分钟。

保存多个checkpoint
默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint

Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:

state.checkpoints.num-retained: 20

如果希望会退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现。

从checkpoint 恢复
如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点,比如chk-860进行回放,执行如下命令

bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
  • 所有的Checkpoint文件都在以Job ID为名称的目录里面
  • 当Job停掉后,重新从某个Checkpoint点(chk-860)进行恢复时,重新生成Job ID
  • Checkpoint编号会从该次运行基于的编号继续连续生成:chk-861、chk-862、chk-863

checkpoint的建议

  • Checkpoint 间隔不要太短
    • 过短的间对于底层分布式文件系统而言,会带来很大的压力。
    • Flink 作业处理 record 与执行 checkpoint 存在互斥锁,过于频繁的checkpoint,可能会影响整体的性能。
  • 合理设置超时时间

Flink savepoint

Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用Flink的Checkpointing机制来创建一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中

Flink程序中包含两种状态数据:

用户定义的状态(User-defined State)是基于Flink的Transformation函数来创建或者修改得到的状态数据
系统状态(System State),是指作为Operator计算一部分的数据Buffer等状态数据,比如在使用Window Function时,在Window内部缓存Streaming数据记录
Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。

设置Operator ID:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

创建Savepoint

创建一个Savepoint,需要指定对应Savepoint目录,有两种方式来指定

需要配置Savepoint的默认路径,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,设置Savepoint存储目录

state.savepoints.dir: hdfs://namenode01.td.com/flink/flink-savepoints

手动执行savepoint命令的时候,指定Savepoint存储目录

bin/flink savepoint :jobId [:targetDirectory]

使用默认配置

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038

为正在运行的Flink Job指定一个目录存储Savepoint数据

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints

从Savepoint恢复

bin/flink run -s :savepointPath [:runArgs]

以上面保存的Savepoint为例,恢复Job运行

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar

会启动一个新的Flink Job,ID为cdbae3af1b7441839e7c03bab0d0eefd

Savepoint 目录结构

1bbc5是Flink Job ID字符串前6个字符,后面bd967f90709b是随机生成的字符串
_metadata文件包含了Savepoint的元数据信息
其他文件内容都是序列化的状态信息

总结

checkpoint和savepoint是Flink为我们提供的作业快照机制,它们都包含有作业状态的持久化副本。

用几句话总结一下。

  1. checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。

  2. savepoint是“通过checkpoint机制”创建的,所以savepoint本质上是特殊的checkpoint。

  3. checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。

  4. checkpoint的频率往往比较高(因为需要尽可能保证作业恢复的准确度),所以checkpoint的存储格式非常轻量级,但作为trade-off牺牲了一切可移植(portable)的东西,比如不保证改变并行度和升级的兼容性。savepoint则以二进制形式存储所有状态数据和元数据,执行起来比较慢而且“贵”,但是能够保证portability,如并行度改变或代码升级之后,仍然能正常恢复。

  5. checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。

更多推荐

初识自动驾驶技术之旅 第一课 学习笔记

​🎬岸边的风:个人主页🔥个人专栏:《VUE》《javaScript》⛺️生活的理想,就是为了理想的生活!​目录📚前言📘1.自动驾驶人才需求与挑战📘2.Apollo8.0开源平台详解📘3.如何使用Apollo学习自动驾驶[上机学习]📘4.如何使用Apollo学习自动驾驶[上车学习]📚总结📚前言讲师介绍:

股票数据分析应用之可视化图表组件

股市是市场经济的必然产物,在一个国家的金融领域之中有着举足轻重的地位。在过去,人们对于市场走势的把握主要依赖于经验和直觉,往往容易受到主观因素的影响,导致决策上出现偏差。如今,通过数据可视化呈现,便可将历年数据和市场情报进行深度挖掘、分析,从中找到规律和趋势,帮助用户做出更准确的判断。回顾2022年A股市场的表现可谓是

翻牌闯关游戏

翻牌闯关游戏3关:关卡由少至多12格、20格、30格图案:12个玩法:点击两张卡牌,图案一到即可消除掉记忆时长(秒):memoryDurationTime:5可配置:默认5提示游戏玩法:showTipsFlag:1可配置:1:判断localStorage值,仅一次提示游戏玩法2:每次游戏第一关(12格关卡)都提示游戏玩

基于Java演唱会购票系统设计实现(源码+lw+部署文档+讲解等)

博主介绍:✌全网粉丝30W+,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌🍅文末获取源码联系🍅👇🏻精彩专栏推荐订阅👇🏻不然下次找不到哟2022-2024年最全的计算机软件毕业设计选题

JavaScript知识系列(3)每天10个小知识点

目录系列文章目录JavaScript知识系列(1)每天10个小知识点JavaScript知识系列(2)每天10个小知识点知识点**20.AJAX**的概念、作用、原理、特性、优点、缺点、区别、使用场景**,实现一个AJAX请求****21.尾调用**的概念、作用、原理、特性、优点、缺点、区别、使用场景**22.ES6模

互联网数字化管理升级,制造企业一站式智能管理,可定制-亿发

在互联网时代,传统机械制造企业面临着未有的挑战和机遇。信息化管理水平成为企业竞争力的关键因素。然而,许多制造企业在信息化管理中常常陷入以下三大问题:1、盲目随潮流,缺乏总体规划互联网时代,科技发展日新月异,让制造企业感到需要跟上潮流。然而,盲目跟风而行往往导致了技术的不合理使用和资源的浪费。解决这个问题的关键在于建立明

运维:Centos7安装解压版mysql5.7

目录1、卸载Centos7默认自带的mariadb数据库,避免冲突2、下载解压版mysql并安装3、配置mysql4、mysql客户端访问MySQL是一种开源的关系型数据库管理系统(RDBMS),它具有许多优点和一些缺点。以下是MySQL的主要优缺点:优点1.开源免费:MySQL是开源软件,可以免费使用,并且有大量的社

机器视觉常见的问题及解决

应该怎样选择相机?选择相机却往往刻不容缓的的问题摆在机器视觉工程师面前,因此,选择相机了解以下几个方面问题:通常您首先需要知道系统精度要求和相机分辨率,可以通过公式:X方向系统精度(X方向像素值)=视野范围(X方向)/CCD芯片像素数量(X方向);Y方向系统精度(Y方向像素值)=视野范围(Y方向)/CCD芯片像素数量(

面试失败的反思:如何从错误中吸取教训

🌷🍁博主猫头虎(🐅🐾)带您GotoNewWorld✨🍁🦄博客首页——🐅🐾猫头虎的博客🎐🐳《面试题大全专栏》🦕文章图文并茂🦖生动形象🐅简单易学!欢迎大家来踩踩~🌺🌊《IDEA开发秘籍专栏》🐾学会IDEA常用操作,工作效率翻倍~💐🌊《100天精通Golang(基础入门篇)》🐅学会Gol

服务器搭建(TCP套接字)-epoll版(服务端)

epoll是一种在Linux系统上用于高效事件驱动编程的I/O多路复用机制。它相比于传统的select和poll函数具有更好的性能和扩展性。epoll的主要特点和原理:1、事件驱动:epoll是基于事件驱动的模型,它通过监听事件来触发相应的回调函数,而不是像传统的阻塞模型那样持续轮询。这样可以避免无效的轮询操作,提高效

开源项目在面试中的作用:如何用你的贡献加分

🌷🍁博主猫头虎(🐅🐾)带您GotoNewWorld✨🍁🦄博客首页——🐅🐾猫头虎的博客🎐🐳《面试题大全专栏》🦕文章图文并茂🦖生动形象🐅简单易学!欢迎大家来踩踩~🌺🌊《IDEA开发秘籍专栏》🐾学会IDEA常用操作,工作效率翻倍~💐🌊《100天精通Golang(基础入门篇)》🐅学会Gol

热文推荐