大数据面试题:Flink延迟数据是怎么解决的

2023-09-22 12:01:58

最近朋友面试某猪的时候,被问到一个问题答得面试官不太满意,问的是前司数据延迟问题是怎么解决的,我稍作整理。

一、什么是延迟数据

大数据处理过程中 Join 的场景太多太多了,几乎所有公司的 APP 都会涉及到两条流数据之间的维度拼接,将表变宽等场景,避免不了进行多流 Join 操作。同时join场景中受网络或物理设备等因素影响也有可能,以致出现不同的流式数据到达计算引擎的时间不一定,那这些数据称为延迟数据。即延迟数据是指系统的事件时间戳在经过延迟元素时间戳之后的时间到达的数据。所以延迟数据可以说是一种特殊的乱序数据,它没有被watermark和Window机制处理,因为是在窗口关闭后才到达的数据。一般这种情况有三种处理办法:

  • 重新激活已经关闭的窗口并重新计算以修正结果。

  • 将迟到数据收集起来另外处理。

  • 将迟到数据直接丢弃。

Flink默认采用第三种方法,将迟到数据直接丢弃。接下来我们演示一个延迟数据案例,之后使用Flink提供的waterMark、allowedLateness机制、sideOutput机制解决延迟数据的场景。

二、Flink延迟数据场景

这里是模拟将两个Socket流使用事件时间进行5秒滚动窗口Left Outer join后,延迟数据被丢失的场景,延迟数据没有被正常输出。

1. 代码

object MyCoGroupJoin {

def main(args: Array[String]): Unit = {

// 创建环境变量

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 指定事件时间

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// 创建Socket源数据流,内容格式 "时间戳 单词"

val s1 = env.socketTextStream("hadoop52", 9999)

// 设置事件时间戳字段

.assignAscendingTimestamps(_.split(" ")(0).toLong)

.map(line => {

val strs = line.split(" ")

(strs(0).toLong, strs(1))

})

// 创建Socket源数据流,内容格式 "时间戳 单词"

val s2 = env.socketTextStream("hadoop52", 8888)

// 设置事件时间戳字段

.assignAscendingTimestamps(_.split(" ")(0).toLong)

.map(line => {

val strs = line.split(" ")

(strs(0).toLong, strs(1))

})

//将两个数据流进行合并统计,这里是将两数据流利用窗口进行单词拼串处理

s1.coGroup(s2)

.where(_._2)

.equalTo(_._2)

.window(TumblingEventTimeWindows.of(Time.seconds(5))) //滚动窗口,窗口大小5秒

.apply(new CoGroupFunction[(Long, String), (Long, String), String] {

override def coGroup(first: lang.Iterable[(Long, String)], second: lang.Iterable[(Long, String)], out: Collector[String]): Unit = {

// 对两数据流中的数据进行循环遍历,并拼串下发

first.forEach(r1 => {

second.forEach(r2 => {

println(s"${r1}::${r2}")

val str = r1._2 + r2._2

out.collect(str)

})

})

}

})

.print()

env.execute("cogroupjoin")

}

}

2. Socket数据源1

[atguigu@hadoop52 ~]$ nc -l 9999

1001000 hello

1005000 java

1003000 hello [这条被丢弃了]

1010000 xixi

3. Socket数据源2

[atguigu@hadoop52 ~]$ nc -l 8888

1002000 hello

1005000 java

1001000 hello [这条被丢弃了]

1010000 xixi

4. 程序执行控制台输出结果

(1001000,hello)::(1002000,hello)

4> hellohello

(1005000,java)::(1005000,java)

2> javajava

我们发现在Socket数据源输入 "1005000 java" 后,会统计1005000时间戳之前的数据,而在1005000时间戳之后输入的hello就没有被统计输出。当输入 "1010000 xixi" 后,触发了第2个窗口,只输出了java,还是没有后输入的hello统计结果,这也更明确了1005000时间戳之后输入的hello被丢弃了。数据很重要,不想丢弃怎么办,我们可以使用Flink提供的水印位(waterMark)解决。

三、解决方案之waterMark

watermark是flink为了处理event time窗口计算提出的一种机制,本质上就是一个时间戳,代表着比这个时间早的事件已经全部进入到相应的窗口,后续不会再有比这个时间小的事件出现,基于这个前提我们才有可能将event time窗口视为完整并触发窗口的计算。

1. 程序代码

object MyCoGroupJoin {

def main(args: Array[String]): Unit = {

// 创建环境变量

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 指定事件时间

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// 创建Socket源数据流,内容格式 "时间戳 单词"

val s1 = env.socketTextStream("hadoop52", 9999)

// 设置事件时间戳字段

// .assignAscendingTimestamps(_.split(" ")(0).toLong)

// 这里可以指定周期性产生WaterMark 或 间歇性产生WaterMark,分别使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks来实现

// 这里使用周期性产生WaterMark,延长2秒

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(2)) {

override def extractTimestamp(element: String): Long = {

val strs = element.split(" ")

strs(0).toLong

}

})

.map(line => {

val strs = line.split(" ")

(strs(0).toLong, strs(1))

})

// 创建Socket源数据流,内容格式 "时间戳 单词"

val s2 = env.socketTextStream("hadoop52", 8888)

// 设置事件时间戳字段

// .assignAscendingTimestamps(_.split(" ")(0).toLong)

// 这里可以指定周期性产生WaterMark 或 间歇性产生WaterMark,分别使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks来实现

// 这里使用周期性产生WaterMark,延长2秒

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(2)) {

override def extractTimestamp(element: String): Long = {

val strs = element.split(" ")

strs(0).toLong

}

})

.map(line => {

val strs = line.split(" ")

(strs(0).toLong, strs(1))

})

//将两个数据流进行合并统计,这里是将两数据流利用窗口进行单词拼串处理

s1.coGroup(s2)

.where(_._2)

.equalTo(_._2)

.window(TumblingEventTimeWindows.of(Time.seconds(5))) //滚动窗口,窗口大小5秒

.apply(new CoGroupFunction[(Long, String), (Long, String), String] {

override def coGroup(first: lang.Iterable[(Long, String)], second: lang.Iterable[(Long, String)], out: Collector[String]): Unit = {

// 对两数据流中的数据进行循环遍历,并拼串下发

first.forEach(r1 => {

second.forEach(r2 => {

println(s"${r1}::${r2}")

val str = r1._2 + r2._2

out.collect(str)

})

})

}

})

.print()

env.execute("cogroupjoin")

}

}

2. Socket数据源1

[atguigu@hadoop52 ~]$ nc -l 9999

1001000 hello

1005000 java

1003000 hello

1007000 java

3. Socket数据源2

[atguigu@hadoop52 ~]$ nc -l 8888

1002000 hello

1005000 java

1001000 hello

1007000 java

4. 程序执行控制台输出结果

(1001000,hello)::(1002000,hello)

4> hellohello

(1001000,hello)::(1001000,hello)

4> hellohello

(1003000,hello)::(1002000,hello)

4> hellohello

(1003000,hello)::(1001000,hello)

4> hellohello

当我们使用WaterMark后,我们可以发现在两个Socket终端输入"1005000 java"时,控制台并没有立刻统计输出信息。而是在两个Socket终端输入 "1007000 java"

后,控制台才将统计结果输出出来且在时间戳"1005000"之后输入的hello也同时给统计出来了,上面的问题可以解决了,但是 "1007000 java" 之后我们再输入 hello ,你会发现还是存在问题,没有输出又给丢弃了。继续测试如下。

5. Socket数据源1

[atguigu@hadoop52 ~]$ nc -l 9999

1001000 hello

1005000 java

1003000 hello

1007000 java

1003000 hello

1012000 spark

6. Socket数据源2

[atguigu@hadoop52 ~]$ nc -l 8888

1002000 hello

1005000 java

1001000 hello

1007000 java

1004000 hello

1012000 spark

7. 程序执行控制台输出结果

(1001000,hello)::(1002000,hello)

4> hellohello

(1001000,hello)::(1001000,hello)

4> hellohello

(1003000,hello)::(1002000,hello)

4> hellohello

(1003000,hello)::(1001000,hello)

4> hellohello

(1005000,java)::(1005000,java)

2> javajava

(1005000,java)::(1007000,java)

2> javajava

(1007000,java)::(1005000,java)

2> javajava

(1007000,java)::(1007000,java)

2> javajava

所以waterMark只能在一定程度上解决这种问题。我们再来看看allowedLateness机制。

四、解决方案之allowedLateness机制

默认情况下,当watermark通过end-of-window之后,再有之前的数据到达时,这些数据会被丢弃。为了避免有些迟到的数据被丢弃,因此产生了allowedLateness机制。简单来讲,allowedLateness就是针对event time而言,对于watermark超过end-of-window之后还允许有一段时间(也是以event time来衡量)来等待之后的数据到达,以便再次处理这些数据。

1. 程序代码

object MyCoGroupJoin {

def main(args: Array[String]): Unit = {

// 创建环境变量

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 指定事件时间

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// 创建Socket源数据流,内容格式 "时间戳 单词"

val s1 = env.socketTextStream("hadoop52", 9999)

// 设置事件时间戳字段

// .assignAscendingTimestamps(_.split(" ")(0).toLong)

// 这里可以指定周期性产生WaterMark 或 间歇性产生WaterMark,分别使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks来实现

// 这里使用周期性产生WaterMark,延长2秒

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(2)) {

override def extractTimestamp(element: String): Long = {

val strs = element.split(" ")

strs(0).toLong

}

})

.map(line => {

val strs = line.split(" ")

(strs(0).toLong, strs(1))

})

// 创建Socket源数据流,内容格式 "时间戳 单词"

val s2 = env.socketTextStream("hadoop52", 8888)

// 设置事件时间戳字段

// .assignAscendingTimestamps(_.split(" ")(0).toLong)

// 这里可以指定周期性产生WaterMark 或 间歇性产生WaterMark,分别使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks来实现

// 这里使用周期性产生WaterMark,延长2秒

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(2)) {

override def extractTimestamp(element: String): Long = {

val strs = element.split(" ")

strs(0).toLong

}

})

.map(line => {

val strs = line.split(" ")

(strs(0).toLong, strs(1))

})

//将两个数据流进行合并统计,这里是将两数据流利用窗口进行单词拼串处理

s1.coGroup(s2)

.where(_._2)

.equalTo(_._2)

.window(TumblingEventTimeWindows.of(Time.seconds(5))) //滚动窗口,窗口大小5秒

// 允许数据迟到2秒,窗口触发后2秒内过来的数据还可以重新被计算

.allowedLateness(Time.seconds(2))

.apply(new CoGroupFunction[(Long, String), (Long, String), String] {

override def coGroup(first: lang.Iterable[(Long, String)], second: lang.Iterable[(Long, String)], out: Collector[String]): Unit = {

// 对两数据流中的数据进行循环遍历,并拼串下发first.forEach(r1 => {

second.forEach(r2 => {

println(s"${r1}::${r2}")

val str = r1._2 + r2._2

out.collect(str)

})

})

}

})

.print()

env.execute("cogroupjoin")

}

}

2. Socket数据源1

[atguigu@hadoop52 ~]$ nc -l 9999

1001000 hello

1007000 java

3. Socket数据源2

[atguigu@hadoop52 ~]$ nc -l 8888

1002000 hello

1007000 java

1003000 hello

4. 程序执行控制台输出结果

(1001000,hello)::(1002000,hello)

4> hellohello

(1001000,hello)::(1002000,hello)

4> hellohello

(1001000,hello)::(1003000,hello)

4> hellohello

到这里估计有朋友又有疑问了,allowedLateness机制解决数据延迟设置的时间段,那之后再来的延迟数据呢,还是被丢弃了并没有彻底解决问题。别慌,针对allowedLateness机制之后来的延迟数据Flink还提供了另一种方案就是sideOutput机制。

五、解决方案之sideOutput机制

Side Output简单来说就是在程序执行过程中,将主流stream流中的不同的业务类型或者不同条件的数据分别输出到不同的地方。如果我们想对没能及时在Flink窗口计算的延迟数据专门处理,也就是窗口已经计算了,但后面才来的数据专门处理,我们可以使用旁路输出到侧流中去处理。

1. 程序代码

object MyCoGroupJoin {

def main(args: Array[String]): Unit = {

// 创建环境变量

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 指定事件时间

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// 创建Socket源数据流,内容格式 "时间戳 单词"

val s1 = env.socketTextStream("hadoop52", 9999)

// 这里可以指定周期性产生WaterMark 或 间歇性产生WaterMark,分别使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks来实现

// 这里使用周期性产生WaterMark,延长2秒
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(2)) {

override def extractTimestamp(element: String): Long = {

val strs = element.split(" ")

strs(0).toLong

}

})

.map(line => {

val strs = line.split(" ")

(strs(0).toLong, strs(1))

})

// 定义一个侧输出流

val lateData: OutputTag[(Long, String)] = new OutputTag[(Long, String)]("late")

val s2 = s1.timeWindowAll(Time.seconds(5))

//允许迟到2秒数据

.allowedLateness(Time.seconds(2))

//迟到长于2秒的数据将被保存到lateData侧数据流中

.sideOutputLateData(lateData)

.process(new ProcessAllWindowFunction[(Long, String), (Long, String), TimeWindow] {

override def process(context: Context, elements: Iterable[(Long, String)], out: Collector[(Long, String)]): Unit = {

elements.foreach(ele => {

out.collect(ele)

})

}

})

s2.print("主流")

s2.getSideOutput(lateData).print("侧流")

env.execute("cogroupjoin")

}

}

2. Socket数据源

[atguigu@hadoop52 ~]$ nc -l 9999

1001000 hello

1005000 java

1007000 python

1002000 hello

1009000 java

1001000 xixi

1002000 haha

3. 程序执行控制台输出结果

主流:10> (1001000,hello)

主流:11> (1001000,hello)

主流:12> (1002000,hello)

侧流:1> (1001000,xixi)

侧流:2> (1002000,haha)

通过上面测试可以发现晚于allowedLateness机制的延迟数据,Flink没有丢弃而是输出到了侧输出流中等待处理了,这样延迟数据就完美解决了。

六、总结

sideOutput机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。

allowedLateness机制允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。

更多推荐

[C++ 网络协议] 多种I/O函数

1.Linux的send&recv函数1.1send函数和recv函数#include<sys/socket.h>ssize_tsend(intsockfd,//套接字文件描述符constvoid*buf,//保存待传输数据的缓冲地址值size_tnbytes,//待传输的字节数intflags//传输数据时指定的可选

JavaScript 测试基础,TDD、BDD、Benchmark

目录测试框架TDDBDDBenchmark使用场景结果报告示例示例代码当我写一段测试的时候,我在想些什么设计思路思路一思路二当我以测试驱动开发的时候,我在想些什么原则:谁开发,谁测试。注意:原则上应该先写测试,再进行编码;如果需求时间紧,可以先进行功能实现,但务必后续维护时候将测试代码补充完善。BDD(优先)+TDD(

【业务功能篇112】maven 指定打包某个module子模块 打包命令参数

-am--also-make构建所列模块的所有依赖模块;-pl--projects<arg>构建制定的模块,模块间用逗号分隔;指定settings.xml打包mvncleanpackage--settingsC:\Users\Admini\.m2\settings.xml如果需要根据指定的settings.xml文件打

html综合案例2

<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metahttp-equiv="X-UA-Compatible"content="IE=edge"><metaname="viewport"content="width=device-width,initi

软信天成:如何提高云数据仓库的数据质量?

随着云计算的深入普及、5G和边缘计算等技术带来了数据量的爆发增长,数据驱动商业运作向实时化和自动化迈进,越来越多的企业开始考虑嵌入基于云计算的企业数据仓库,以Snowflake、MicrosoftSynapseAnalytics、GoogleBigQuery、AmazonRedshift等为代表的云数据仓库或成为数据时

神经网络案例分析

神经网络(neuralnetwork)是一种模拟人脑神经思维方式的数据模型,神经网络有多种,包括BP神经网络、卷积神经网络,多层感知器MLP等,最为经典为神经网络为多层感知器MLP(Multi-LayerPerception),SPSSAU默认使用该模型。类似其它的机器学习模型(比如决策树、随机森林、支持向量机SVM等

STM32 CAN使用记录:FDCAN基础通讯

文章目录目的基础说明关键配置与代码轮询方式中断方式收发测试示例链接总结目的CAN是非常常用的一种数据总线,被广泛用在各种车辆系统中。这篇文章将对STM32中FDCAN的使用做个示例。CAN的一些基础介绍与使用可以参考下面文章:《CAN基础概念》https://blog.csdn.net/Naisu_kun/articl

贸易战的影响:跨境电商的“黑洞”风险与机遇

当今全球贸易局势充满了不确定性和动荡。贸易战的阴云笼罩下,跨境电商企业面临着前所未有的挑战,但与此同时,也蕴藏着巨大的机遇。本文将深入探讨贸易战对跨境电商的影响,以及企业在这个新现实中如何应对风险并寻找机遇。贸易战的背景贸易战是国家之间为争夺贸易利益而采取的一系列贸易政策和关税措施的冲突。近年来,中美之间的贸易战引起了

大模型股票交易-挖掘新闻和情绪价值

埃隆·马斯克(ElonMusk)的星际飞船于2023年4月20日升空后爆炸。想象一下,当时您正在观察股市,突然出现新闻,您会如何交易TSLA股票?我希望您不要与我争论,您作为交易者(而不是投资者)要做的第一件事就是摆脱现有的多头头寸并可能做空股票。让我们看看这样的交易是否有利可图。根据此链接SpaceXrocketla

飞驰的高铁-第15届蓝桥杯第一次STEMA测评Scratch真题精选

[导读]:超平老师的《Scratch蓝桥杯真题解析100讲》已经全部完成,后续会不定期解读蓝桥杯真题,这是Scratch蓝桥杯真题解析第150讲。飞驰的高铁,本题是2023年8月20日举行的第15届蓝桥杯STEMA测评Scratch编程中级组编程第2题,题目要求编程实现模拟高铁飞驰前进的效果。当按下数字1时,画面中的景

游戏创业小知识:游戏运营的步骤和流程

游戏运营是确保游戏在持续运行中保持活跃和成功的过程。以下是游戏运营的一般步骤流程:1.游戏发布前准备游戏选择:了解并熟悉游戏的核心概念、目标受众和游戏玩法。开发团队:组建开发团队,包括程序员、设计师、艺术家和声音设计师等。技术基础设施:建立游戏服务器、数据库和其他必要的技术基础设施。资金筹集:获取足够的资金来支持游戏的

热文推荐