7.2、如何理解Flink中的水位线(Watermark)

2023-09-22 16:04:33

目录

0、版本说明

1、什么是水位线?

2、水位线使用场景?

3、设计水位线主要为了解决什么问题?

4、怎样在flink中生成水位线?

4.1、自定义标记 Watermark 生成器

4.2、自定义周期性 Watermark 生成器

4.3、内置Watermark生成器 - 有序流水位线生成器

4.4、内置Watermark生成器 - 乱序流水位线生成器

4.5、在 读取数据源时 添加水位线

5、水位线和窗口的关系?

6、水位线在各个算子间的传递

6.1、测试用例 - 不设置 withIdleness 超时时间

6.2、测试用例 - 设置 withIdleness 超时时间


0、版本说明

        开发语言:java1.8

        Flink版本:1.17

        官网链接:官网链接

1、什么是水位线?

        Flink中水位线是一条特殊的数据(long timestamp)

        它会以时间戳的形式作为一条标识数据插入到数据流中


2、水位线使用场景?

        使用事件时间(EventTime)做流式计算任务时,需要根据事件时间生成水位线(Watermark)

        通过水位线来触发窗口计算,水位线作为衡量事件时间(EventTime)进展的标识


3、设计水位线主要为了解决什么问题?

        设计水位线主要是为了解决实时流中数据乱序和迟到的问题

        思考:什么原因造成了数据流的乱序呢?

                如今数据采集、数据传输大多都在分布式系统中完成

                各个机器节点因为网络和自身性能的原因 导致了数据的乱序和迟到


4、怎样在flink中生成水位线?

        Flink中支持在 数据源和普通DataStream上添加水位线生成策略(WatermarkStrategy)

4.1、自定义标记 Watermark 生成器

标记 Watermark 生成器特点:

        每条数据到来后,都会为其生成一条 Watermark

适用场景:

        数据量小且数据有序

代码示例:        

Step1:自定义 标记水位线生成器 实现类

// 自定义 标记水位线生成器 实现类
public class PeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {

    // 每进入一条数据,都会调用一次 onEvent 方法
    @Override
    /*
     * 参数说明:
     *   @event : 进入到该方法的事件数据
     *   @eventTimestamp : 时间戳提取器提取的时间戳
     * */
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        //发射水位线
        output.emitWatermark(new Watermark(eventTimestamp));
    }

    // 不需要实现
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
    }
}

Step2:自定义 标记性水位线生成策略 实现类

// TODO 自定义 标记性水位线生成策略
public class PeriodWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {
    // TODO 实例化一个 事件时间提取器
    @Override
    public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        };
        return timestampAssigner;
    }

    // TODO 实例化一个 watermark 生成器
    @Override
    public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PeriodWatermarkGenerator<>();
    }
}

Step3:使用 标记性水位线生成策略

// TODO 使用 自定义标记 Watermark 生成器
public class UserPeriodWatermarkStrategy {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // 3.为 DataStream 添加水位线生成策略 (使用 自定义WatermarkStrategy 实现类)
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(new PeriodWatermarkStrategy());

        // 4.通过 processFunction实例 查看生成的水位线
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 5.触发程序执行
        env.execute();
    }
}

查看运行结果:


4.2、自定义周期性 Watermark 生成器

标记 Watermark 生成器特点:

        基于处理时间,周期性生成 Watermark

适用场景:

        数据量大且可能存在一定程度数据延迟(乱序)

代码示例:        

Step1:自定义 周期性水位线生成器 实现类

// 自定义 周期性水位线生成器
public class PunctuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {
    // 设置变量,用来保存 当前最大的事件时间
    private long currentMaxTimestamp;
    // 设置变量,指定最大的乱序时间(等待时间)
    private final long maxOutOfOrderness = 0000; // 3 秒

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 只更新当前最大时间戳,不再发生水位线
        if (currentMaxTimestamp < eventTimestamp) currentMaxTimestamp = eventTimestamp;
    }

    // 周期性 生成水位线
    // 每个 setAutoWatermarkInterval 时间,调用一次该方法
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
    }
}

Setp2:自定义 周期性水位线生成策略 实现类

// 自定义 周期性水位线生成策略
public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {
    // TODO 实例化一个 事件时间提取器
    @Override
    public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        };

        return timestampAssigner;
    }

    // TODO 实例化一个 watermark 生成器
    @Override
    public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedWatermarkGenerator<>();
    }

}

Step3:周期性水位线生成策略

// TODO 使用 自定义周期性 Watermark 生成器
public class UserPunctuatedWatermarkStrategy {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
        env.getConfig().setAutoWatermarkInterval(3 * 1000L);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // TODO 获取 WatermarkStrategy实例 (方式1:通过 WatermarkStrategy实现类获取)
        PunctuatedWatermarkStrategy punctuatedWatermarkStrategy = new PunctuatedWatermarkStrategy();

        // TODO 获取 WatermarkStrategy实例 (方式2:通过 WatermarkStrategy工具类获取) 推荐
        WatermarkStrategy<Tuple2<String, Long>> punctuatedWatermarkStrategyByUtil = WatermarkStrategy.<Tuple2<String, Long>>forGenerator(context -> new PunctuatedWatermarkGenerator<>())
                .withTimestampAssigner((event, timestamp) -> event.f1);

        // 3.使用 自定义水位线策略实例 来提取时间戳&生成水位线
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(punctuatedWatermarkStrategy);

        // 4.通过 processFunction实例 查看生成的水位线
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 3.触发程序执行
        env.execute();
    }
}

查看运行结果:


4.3、内置Watermark生成器 - 有序流水位线生成器

有序流水位线生成器特点:

        基于处理时间,周期性生成 Watermark,最大乱序时间为0

适用场景:

        大数量有序流

代码示例:

// TODO 内置Watermark生成器 - 有序流水位线生成器
public class UserForMonotonousTimestamps {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
        env.getConfig().setAutoWatermarkInterval(3 * 1000L);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // TODO 创建 内置水位线生成策略
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner((element,recordTimestamp) -> element.f1);

        // 3.使用 内置水位线生成策略
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(watermarkStrategy);

        // 4.通过 processFunction实例 查看生成的水位线
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 3.触发程序执行
        env.execute();
    }
}

查看运行结果:


4.4、内置Watermark生成器 - 乱序流水位线生成器

乱序流水位线生成器特点:

        基于处理时间,周期性生成 Watermark,可以这是最大乱序时间

适用场景:

        大数量乱序流

代码示例:

// TODO 内置Watermark生成器 - 乱序流水位线生成器
public class UserForBoundedOutOfOrderness {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
        env.getConfig().setAutoWatermarkInterval(3 * 1000L);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // TODO 获取 WatermarkStrategy实例
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 设置最大乱序时间为1s
                .withTimestampAssigner((element,recordTimestamp) -> element.f1);

        // 3.使用 内置水位线生成策略
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(watermarkStrategy);

        // 4.通过 processFunction实例 查看生成的水位线
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 3.触发程序执行
        env.execute();
    }
}

查看运行结果:


4.5、在 读取数据源时 添加水位线

// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2.创建 Source 对象
Source source = DataGeneratorSource、KafkaSource...

// 3.读取 source时添加水位线
env
        .fromSource(source, WatermarkStrategy实例, "source name")   
        .print()
;

// 4.触发程序执行
env.execute();

5、水位线和窗口的关系?

窗口什么时候创建?

        当窗口内的第一条数据到达时

窗口什么时候触发计算?

        当阈值水位线到达窗口时


6、水位线在各个算子间的传递

        下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值

测试代码:

// TODO 测试水位线的传递
public class TransmitWaterMark {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3); 

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

        source
                .partitionCustom(
                        new Partitioner<String>() {
                            @Override
                            public int partition(String key, int numPartitions) {
                                if (key.equals("a")) {
                                    return 0;
                                } else if (key.equals("b")) {
                                    return 1;
                                } else {
                                    return 2;
                                }
                            }
                        }, value -> value.split(",")[0]
                )
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2 map(String value) throws Exception {
                        return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                //.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .<Tuple2<String, Long>>forGenerator(new PeriodWatermarkStrategy())
                                .withTimestampAssigner((element,recordTimestamp) -> element.f1)
                                .withIdleness(Duration.ofSeconds(5))  //空闲等待5s
                )
                .process(new ShowProcessFunction()).setParallelism(1)
                .print();
        
        env.execute();
    }
}

6.1、测试用例 - 不设置 withIdleness 超时时间

现象:如果上游某一个子任务一直没有数据更新,下游算子的水位线一直不会变化


6.2、测试用例 - 设置 withIdleness 超时时间

现象:如果上游某一个子任务`在指定时间内`数据更新,下游算子的水位线将不受该子任务最小值的影响

更多推荐

TypeScript入门

目录一:语言特性二:TypeScript安装NPM安装TypeScript三:TypeScript基础语法第一个TypeScript程序四:TypeScript保留关键字空白和换行TypeScript区分大小写TypeScript注释TypeScript支持两种类型的注释五:TypeScript与面向对象六:TypeS

如何使用ChatGPT,而不是生成默认风格的八股文

现在我每天都使用ChatGPT来执行多项任务,包括但不限于内容创建。无论是编写文本还是与我讨论我的业务目标,ChatGPT总是会时不时的用到。但与所有强大的工具一样,ChatGPT和类似的大型语言模型(LLM)也有其局限性。在我从事人工智能工作的过程中,我多次偶然发现它们。如果您在业务中依赖ChatGPT而不了解其局限

正则表达式 - 语法

目录正则表达式-语法普通字符测试工具非打印字符特殊字符限定符定位符选择以下列出?=、?<=、?!、?反向引用实例实例正则表达式-语法正则表达式是一种用于匹配和操作文本的强大工具,它是由一系列字符和特殊字符组成的模式,用于描述要匹配的文本模式。正则表达式可以在文本中查找、替换、提取和验证特定的模式。例如:runoo+b,

125. 验证回文串 【简单题】

题目如果在将所有大写字符转换为小写字符、并移除所有非字母数字字符之后,短语正着读和反着读都一样。则可以认为该短语是一个回文串。字母和数字都属于字母数字字符。给你一个字符串s,如果它是回文串,返回true;否则,返回false。示例1:输入:s="Aman,aplan,acanal:Panama"输出:true解释:"a

amlogic 机顶盒关闭DLNA 后,手机还能搜到盒子

S905L3带有投屏的功能,并通过com.droidlogic.mediacenter.dlna.MediaCenterService服务的启动和停止来开启和关闭DLNA功能,但是在测试中发现机顶盒关闭DLNA后,手机还能搜索到盒子。我在复测中发现关闭后有时很难很久搜索到盒子,有时却很容易搜索到。通过查看日志,发现打开

C语言每日一题(10):无人生还

文章主题:无人生还🔥所属专栏:C语言每日一题📗作者简介:每天不定时更新C语言的小白一枚,记录分享自己每天的所思所想😄🎶个人主页:[₽]的个人主页🏄🌊目录前言编程起因项目介绍情节简介讨论内容找出凶手设计思路1.整体逻辑方法一方法二2.具体逻辑方法一方法二代码展示方法一:依次假设法(最容易想到的方法)方法二:逻

【ABAP】如何理解SAP中的CLIENT (客户端)

💂作者简介:THUNDER王,阿里云社区专家博主,华为云·云享专家,腾讯云社区认证作者,CSDNSAP应用技术领域优质创作者。在学习工作中,我通常使用偏后端的开发语言ABAP,SQL进行任务的完成,对SAP企业管理系统,SAPABAP开发和数据库具有较深入的研究。💅文章概要:MANDT集团永远是无数SAP入门人员无

【STM32】SDIO—SD 卡读写01

基于stm32f103基于零死角玩转STM32—F103指南者简介1.SD卡总共有8个寄存器,用于设定或表示SD卡信息。2.SD卡的寄存器不能像STM32那样访问,而是利用命令访问,SDIO定义了64个命令。SD卡接收到命令后,根据命令要求对SD卡内部寄存器进行修改,程序控制中只需要发送组合命令就可以实现SD卡的控制以

认识微服务、服务拆分和远程调用

文章目录1.认识微服务2.微服务架构的特征3.SpringCloud4.服务拆分5.远程调用6.提供者与消费者1.认识微服务单体架构:将业务的所有功能集中在一个项目中开发,打成一个包部署分布式架构:根据业务功能对系统做拆分,每个业务功能模块作为独立项目开发,称为一个服务架构优点缺点单体架构架构简单,部署成本低,耦合度高

C与C++字符串方法示例

C和C++中的字符串处理方法有所不同。在C语言中,字符串被表示为字符数组,使用字符数组来处理字符串。C语言提供了许多用于处理字符串的库函数,如strcpy,strcat,strcmp等。这些函数可以用于复制,连接和比较字符串等操作。在C++中,字符串被表示为字符串对象,使用字符串对象来处理字符串。C++中的字符串类提供

hadoop HDFS分布式计算概述,MapReduce概述,YARN概述

1、分布式计算概述1.1、什么是(数据)计算我们一直在提及:分布式计算,分布式暂且不论,“计算”到底是指什么呢?大数据体系内的计算,举例:销售额统计、区域销售占比、季度销售占比利润率走势、客单价走势、成本走势品类分析、消费者分析、店铺分析等等一系列,基于数据得出的结论。这些就是我们所说的计算。1.2、分布式(数据)计算

热文推荐