Flink1.14 Source概念入门讲解与源码解析

2023-09-22 15:50:22

目录

Flink Source概念

Source

Source源码

getBoundedness()

createReader(SourceReaderContext readerContext)

createEnumerator(SplitEnumeratorContext enumContext)

SplitEnumerator restoreEnumerator(SplitEnumeratorContext enumContext, EnumChkT checkpoint) throws Exception,>

SimpleVersionedSerializer getSplitSerializer()

SimpleVersionedSerializer getEnumeratorCheckpointSerializer()

总结

参考


Flink Source概念

Flink的Source主要是由3个核心部分组成:Splits,SplitEnumerator,SourceReader。

  • Split:split是数据源的一部分切片数据,source端将数据进行切片分发,可以并行去读取数据,而split就是一个切片粒度,一般每一次每个slot读取一个split进行处理。
  • SplitEnumerator:SplitEnumerator是一个单例只产生在JobManager中,产生split切片并且分发给sourceReader(TM里面),主要负责负载均衡,维持等待中的split的积压平衡,并且分发split给source Reader。
  • SourceReader:请求split文件并且进行处理,sourceReader是并行运行在TM的source算子中,并且产出并行的时间流/记录流。

(放一下官网的图。。。)

Flink作为批流一体的架构,Data Source API支持数据文件是无界流或者是有界批文件。

对于有界批文件来说,enumerator会产生一系列的split文件,并且每一个split文件明确是有限大小的;而对于无界流来说,则有两种情况 1)splits文件是无限的 2)enumerator不断的产生新的split文件

具体说明一下:

有界文件

Source数据源存在一个URI/Path路径,并且有固定的format去明确如何解析文件。

  • 一个split切片是一个文件,或者是多个文件(一个区域内)。
  • SplitEnumerator会列出目录下所有的文件,当下一个reader需要split切片文件的时候,就会将下一个split发送过去,一旦所有的文件全部发送完成,那么就会发出一个 NoMoreSplits 的标志。
  • SourceReader请求一个split切片,然后读取解析得到的split文件,如果没有更多的split文件后,即收到了 NoMoreSplits 那么就会停止读取。

有界Kafka

同理,只不过每一个split是一个明确的topic分区的end offset。一旦sourceReader达到了end offset,就会完成这个split文件的读取。当所有的split文件完成后,sourceReader就会结束。

无界文件流

无界的情况下,将永远不会产生 NoMoreSplits 的标志,会周期性监控URI/Path路径下是否会产生新的文件。一旦产生了新文件则会生成新的split切片并分发给可用的sourcereaders。

无界Kafka

Source数据源是一个Kafka的Topic文件,或者是一系列Topic/Topic正则。

  • Split切片文件是一个Kafka的Topic分区。
  • SplitEnumerator会连接broker,列出所有订阅的topic分区。enumerator能有选择的重复去发现订阅了的topics新增的分区数据。
  • sourcereader读取分配的split文件(topics 分区)并不会有一个end标志,所以reader永远也不会有end的情况。

Source

在1.14-1.15版本的时候source api是一个工厂模式的接口,用于创建以下的组件。

  • Split Enumerator
  • Source Reader (在1.16版本之后变为通过SourceReaderFactory接口实现
  • Split Serializer
  • Enumerator Checkpoint Serializer

除此之外,Source 还提供了 Boundedness 的特性,从而使得 Flink 可以选择合适的模式来运行 Flink 任务。

Source 实现应该是可序列化的,因为 Source 实例会在运行时被序列化并上传到 Flink 集群。

Source源码

接下来看看source的源码

import org.apache.flink.annotation.Public;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.Serializable;

/**
 * The interface for Source. It acts like a factory class that helps construct the {@link
 * SplitEnumerator} and {@link SourceReader} and corresponding serializers.
 *
 * @param <T> The type of records produced by the source.
 * @param <SplitT> The type of splits handled by the source.
 * @param <EnumChkT> The type of the enumerator checkpoints.
 */
// 在flink1.16之后,source的接口变为public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends SourceReaderFactory<T, SplitT>
@Public
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {

    /**
     * Get the boundedness of this source.
     *
     * @return the boundedness of this source.
     */
    Boundedness getBoundedness();

    /**
     * Creates a new reader to read data from the splits it gets assigned. The reader starts fresh
     * and does not have any state to resume.
     *
     * @param readerContext The {@link SourceReaderContext context} for the source reader.
     * @return A new SourceReader.
     * @throws Exception The implementor is free to forward all exceptions directly. Exceptions
     *     thrown from this method cause task failure/recovery.
     */
    SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) throws Exception;

    /**
     * Creates a new SplitEnumerator for this source, starting a new input.
     *
     * @param enumContext The {@link SplitEnumeratorContext context} for the split enumerator.
     * @return A new SplitEnumerator.
     * @throws Exception The implementor is free to forward all exceptions directly. * Exceptions
     *     thrown from this method cause JobManager failure/recovery.
     */
    SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext)
            throws Exception;

    /**
     * Restores an enumerator from a checkpoint.
     *
     * @param enumContext The {@link SplitEnumeratorContext context} for the restored split
     *     enumerator.
     * @param checkpoint The checkpoint to restore the SplitEnumerator from.
     * @return A SplitEnumerator restored from the given checkpoint.
     * @throws Exception The implementor is free to forward all exceptions directly. * Exceptions
     *     thrown from this method cause JobManager failure/recovery.
     */
    SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
            SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint) throws Exception;

    // ------------------------------------------------------------------------
    //  serializers for the metadata
    // ------------------------------------------------------------------------

    /**
     * Creates a serializer for the source splits. Splits are serialized when sending them from
     * enumerator to reader, and when checkpointing the reader's current state.
     *
     * @return The serializer for the split type.
     */
    SimpleVersionedSerializer<SplitT> getSplitSerializer();

    /**
     * Creates the serializer for the {@link SplitEnumerator} checkpoint. The serializer is used for
     * the result of the {@link SplitEnumerator#snapshotState()} method.
     *
     * @return The serializer for the SplitEnumerator checkpoint.
     */
    SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}

我们一个一个函数来看,毕竟一堆看上去确实感觉挺头疼的。。。。

getBoundedness()

主要是返回数据源是否有界,返回类型是Boundedness的枚举类,值只有两个BOUNDED 和 CONTINUOUS_UNBOUNDED。

具体的接口实现有四类(后面的实现都是有四类,这边只讲fileSource相关的,就不会过多介绍了。。。)

public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
        implements Source<T, SplitT, PendingSplitsCheckpoint<SplitT>>, ResultTypeQueryable<T> {
    @Override
    public Boundedness getBoundedness() {
        return continuousEnumerationSettings == null
                ? Boundedness.BOUNDED // 有界
                : Boundedness.CONTINUOUS_UNBOUNDED; // 无界
    }
}

public class DorisSource<OUT> implements Source<OUT, DorisSourceSplit, PendingSplitsCheckpoint>, ResultTypeQueryable<OUT> {
    public Boundedness getBoundedness() {
        return this.boundedness;
    }
}

public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
    public Boundedness getBoundedness() {
        return ((HybridSource.SourceListEntry)this.sources.get(this.sources.size() - 1)).boundedness;
    }
}

public class NumberSequenceSource
        implements Source<
                        Long,
                        NumberSequenceSource.NumberSequenceSplit,
                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
                ResultTypeQueryable<Long> {

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }
}

其中,continuousEnumerationSettings主要的作用是设置轮询时间,多久去对于无界的文件进行扫描。

createReader(SourceReaderContext readerContext)

创建一个全新的source reader去读取分配给到它的splits文件,不包含任何状态恢复,返回接口SourceReader。在flink1.16的版本中已经放在了SourceReaderFactory接口中实现。

    // abstractFileSource中的实现 
    @Override
    public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) {
        // fileSourceReader是一种读取方式,从FileSourceSplit中读取记录
        return new FileSourceReader<>(
                readerContext, readerFormat, readerContext.getConfiguration());
    }

其中,readerContext是Flink运行时source的上下文;readerFormat是BulkFormat<T, SplitT>类型(BulkFormat一次读取一批次的数据并且解析),对于reader而言,BulkFormat类主要是一个工厂以及一个配置的持有者,真正读取文件的其实是 BulkFormat.Reader,这个方法是在BulkFormat类中的 createReader(Configuration, FileSourceSplit)方法创建。

createEnumerator(SplitEnumeratorContext<SplitT> enumContext)

为这个source创建新的SplitEnumerator,开始一个新的input。

    @Override
    public SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createEnumerator(
            SplitEnumeratorContext<SplitT> enumContext) {

        final FileEnumerator enumerator = enumeratorFactory.create();

        // read the initial set of splits (which is also the total set of splits for bounded
        // sources)
        final Collection<FileSourceSplit> splits;
        try {
            // TODO - in the next cleanup pass, we should try to remove the need to "wrap unchecked"
            // here
            splits = enumerator.enumerateSplits(inputPaths, enumContext.currentParallelism());
        } catch (IOException e) {
            throw new FlinkRuntimeException("Could not enumerate file splits", e);
        }

        return createSplitEnumerator(enumContext, enumerator, splits, null);
    }

其中,enumerator是由FileEnumerator工厂类产生的,这个类主要任务是找到所有需要读取的文件,切分它们成为FileSourceSplit。并且遍历路径的同时会过滤文件(如果有文件不想要读取可以通过名称进行过滤),决定是否切分文件为多个split,如何去切分的。

splits = enumerator.enumerateSplits(inputPaths, enumContext.currentParallelism());

这里则是进行切分split,里面的函数实现主要是通过递归进行遍历path。顺便提一嘴,具体实现是接口FileEnumerator的具体实现NonSplittingRecursiveEnumerator类。

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException {
        final ArrayList<FileSourceSplit> splits = new ArrayList<>();

        for (Path path : paths) {
            final FileSystem fs = path.getFileSystem();
            final FileStatus status = fs.getFileStatus(path);
            addSplitsForPath(status, fs, splits);
        }

        return splits;
    }

    private void addSplitsForPath(
            FileStatus fileStatus, FileSystem fs, ArrayList<FileSourceSplit> target)
            throws IOException {
        if (!fileFilter.test(fileStatus.getPath())) {
            return;
        }
        // 判断是文件还是目录,如果是文件则转化为source split去读取。
        // 比如hdfs的话,就会去获取datanode的host
        if (!fileStatus.isDir()) {
            convertToSourceSplits(fileStatus, fs, target);
            return;
        }

        final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
        for (FileStatus containedStatus : containedFiles) {
            // 递归遍历文件目录
            addSplitsForPath(containedStatus, fs, target);
        }
    }

最后createSplitEnumerator这个函数则是去根据是有界数据还是无界数据进行划分,如果无界数据存在alreadyProcessedPaths也会直接去划分split,如果alreadyProcessedPaths为空,才会去周期性的监控路径是否产生新文件。(后续再讲。。。)

    private SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createSplitEnumerator(
            SplitEnumeratorContext<SplitT> context,
            FileEnumerator enumerator,
            Collection<FileSourceSplit> splits,
            @Nullable Collection<Path> alreadyProcessedPaths) {

        // cast this to a collection of FileSourceSplit because the enumerator code work
        // non-generically just on that base split type
        @SuppressWarnings("unchecked")
        final SplitEnumeratorContext<FileSourceSplit> fileSplitContext =
                (SplitEnumeratorContext<FileSourceSplit>) context;

        final FileSplitAssigner splitAssigner = assignerFactory.create(splits);

        if (continuousEnumerationSettings == null) {
            // bounded case
            return castGeneric(new StaticFileSplitEnumerator(fileSplitContext, splitAssigner));
        } else {
            // unbounded case
            if (alreadyProcessedPaths == null) {
                alreadyProcessedPaths = splitsToPaths(splits);
            }

            return castGeneric(
                    new ContinuousFileSplitEnumerator(
                            fileSplitContext,
                            enumerator,
                            splitAssigner,
                            inputPaths,
                            alreadyProcessedPaths,
                            continuousEnumerationSettings.getDiscoveryInterval().toMillis()));
        }
    }

    @SuppressWarnings("unchecked")
    private SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> castGeneric(
            final SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint<FileSourceSplit>>
                    enumerator) {

        // cast arguments away then cast them back. Java Generics Hell :-/
        return (SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>>)
                (SplitEnumerator<?, ?>) enumerator;
    }

    private static Collection<Path> splitsToPaths(Collection<FileSourceSplit> splits) {
        return splits.stream()
                .map(FileSourceSplit::path)
                .collect(Collectors.toCollection(HashSet::new));
    }

SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint) throws Exception

主要是通过一个checkpoint去恢复一个枚举器。最后调用的函数与createEnumerator只是多了一个checkpoint.getAlreadyProcessedPaths()参数传递。

    @Override
    public SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> restoreEnumerator(
            SplitEnumeratorContext<SplitT> enumContext,
            PendingSplitsCheckpoint<SplitT> checkpoint) {

        final FileEnumerator enumerator = enumeratorFactory.create();

        // cast this to a collection of FileSourceSplit because the enumerator code work
        // non-generically just on that base split type
        @SuppressWarnings("unchecked")
        final Collection<FileSourceSplit> splits =
                (Collection<FileSourceSplit>) checkpoint.getSplits();

        return createSplitEnumerator(
                enumContext, enumerator, splits, checkpoint.getAlreadyProcessedPaths());
    }


SimpleVersionedSerializer<SplitT> getSplitSerializer()

主要是为source splits创建一个序列化器,在splits从enumerator到reader的时候或者是当reader进行checkpoint的时候执行。

    @Override
    public SimpleVersionedSerializer<FileSourceSplit> getSplitSerializer() {
        return FileSourceSplitSerializer.INSTANCE;
    }


@PublicEvolving
public final class FileSourceSplitSerializer implements SimpleVersionedSerializer<FileSourceSplit> {

    public static final FileSourceSplitSerializer INSTANCE = new FileSourceSplitSerializer();

SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer()

获取SplitEnumerator checkpoint的序列化器,用于处理SplitEnumerator#snapshotState()方法返回的结果

    @Override
    public SimpleVersionedSerializer<PendingSplitsCheckpoint<SplitT>>
            getEnumeratorCheckpointSerializer() {
        return new PendingSplitsCheckpointSerializer<>(getSplitSerializer());
    }

以上就是Source的接口的所有方法,主要包含创建 SourceReaderSplitEnumerator 和对应get序列化器的方法。

总结

目前可以看出,Souce接口的更新,其实是因为Flink在1.12之前将批处理任务与流处理任务分为两种实现模式。

在底层实现中

DataSet API中Source对应的核心借口是InputFormat,功能上主要有三点:

  1. 描述输入的数据如何被划分为不同的InputSplit,继承于 INputSplitSource
  2. 描述如何从单个InputSplit读取记录,具体包括如何打开一个分配到的InputSplit,如何从这个INputSplit读取一条记录,如何得知记录已经读完和如何关闭这个Inputsplit
  3. 描述如何获取输入数据的统计信息(比如文件的大小、记录的数目)

1、3两点主要会被JobManager/JobMaster在调度Exection时使用,而第2点读取数据功能则会在运行时被TaskManager使用。

DataStream API中 Source 对应的核心接口为 SourceFunction 以及 SourceContext。前者直接继承 Function 接口与 Operator 交互,负责通用的状态管理(比如初始化或取消);后者代表运行时的上下文,负责与单条记录级别的数据的交互。此外还有其他一些辅助类型的类或接口。

运行时,Source 主要通过 SourceContext 来控制数据的输出。从 SourceContext 接口的方法即可以看出,Source 在接受到数据后的主要工作有以下几点:

  1. 从外部摄入数据或者生成数据,输出到下游
  2. 为数据生成 Event Time Timestamp(仅在 Time Characteristic 为 Event Time 时有用)
  3. 计算 Watermark 并输出(仅在 Time Characteristic 为 Event Time 时有用)
  4. 当暂时不会有新数据时将自己标记为 Idle ,以避免下游一直等待自己的 Watermark

综上所述,之前的 Source 接口并不能很好的满足批流一体的发展,所以在 FLIP-27中选择重构Source接口,新接口的核心是通过 SplitEnumerator 和 SplitReader,前者负责发现和分配 Split、触发 Checkpoint 等管理工作,后者负责 Split 的实际读取处理。此外,新增 Operator 间的通信机制(复用大部分现有的 RPC 机制),让 Source Subtask 之间可以协调完成 Event Time 对齐等新特性。最后, SplitReader 底层封装了通用的线程模型,相比之前的 SourceFunction 大大简化了 Source 的实现。

参考

漫谈 Flink Source 接口重构 | 时间与精神的小屋

Flink 源码之新 Source 架构 - 简书

数据源 | Apache Flink

更多推荐

瑞芯微RK3568:Debian系统如何安装Docker

本文基于HD-RK3568-IOT评估板演示Debian系统安装Docker,该方法适用于RK356X全系产品。HD-RK3568-IOT评估板基于HD-RK3568-CORE工业级核心板设计(双网口、双CAN、5路串口),接口丰富,适用于工业现场应用需求,亦方便用户评估核心板及CPU的性能。适用于工业自动化控制、人机

网络安全(黑客)自学

前言:想自学网络安全(黑客技术)首先你得了解什么是网络安全!什么是黑客网络安全可以基于攻击和防御视角来分类,我们经常听到的“红队”、“渗透测试”等就是研究攻击技术,而“蓝队”、“安全运营”、“安全运维”则研究防御技术。无论网络、Web、移动、桌面、云等哪个领域,都有攻与防两面性,例如Web安全技术,既有Web渗透,也有

手写模拟Spring的底层原理2.1

先来引入两个问题第一个懒加载还是立即加载的问题,这个问题还是在于就是说,当我们xml配置了一个对象bean的时候,它在spring容器里面是什么时候开始会给我们创建这个对象那如果我们想对某个对象启动懒加载,可以添加@lazy这个注解这个注解一加上,它就只会在得到对象的时候给我们在容器中创建对象也就是在使用下面的方法的时

Reactor 第十二篇 WebFlux集成PostgreSQL

1引言在现代的应用开发中,数据库是存储和管理数据的关键组件。PostgreSQL是一种强大的开源关系型数据库,而WebFlux是Spring框架提供的响应式编程模型。本文将介绍如何使用Reactor和WebFlux集成PostgreSQL,实现响应式的数据库访问。1.环境准备首先,我们需要在项目的pom.xml文件中添

一个例子了解交叉编译

学习嵌入式Linux经常听到交叉编译这个名词,那到底什么是交叉编译,下面通过一个例子来介绍。首先新建一个C文件,其代码如下。#include"stdio.h"voidmain(){inta,b;intc;printf("请输入两个数:\n");scanf("%d%d",&a,&b);c=a+b;printf("a+b=

Ubuntu不能上网解决办法

判断能不能联网1、怎么判断ubuntu确实不能联网?(1)最简单的办法当然是打开一个浏览器,随便输入一个网址,如www.baidu.com,若不能打开该网址,说明可能联网有问题。(2)打开终端,输入ifconfig命令,可以显示当前系统的网络设备,若只出现以下一个设备,表示该系统确实不能联网。(3)同样打开终端,使用p

Python经典练习题(三)

文章目录🍀第一题🍀第二题🍀第三题🍀第一题输入一行字符,分别统计出其中英文字母、空格、数字和其它字符的个数。本题需要我们掌握的知识点在于,判断字符串,是数字还是字母还是啥的,当然在Python内置中几乎都可以找到我们需要的下表我将介绍一些常用的判断函数判断函数描述isalnum()判断是否为字母或数字(字母数字组

SQL Server 数据库变成单个用户怎么办

参考技术A1、首先我们打开SQLSERVER的管理控制台,找到一个要设置角色的用户。2、下面我们将为这个用户赋予创建数据库的角色,我们先用这个用户登录管理工具看一下是否具有创建用户的权限。3、进行数据库创建的时候,提示如下的错误,证明这个用户不具备这个角色的权限。4、下面我们登录sa用户,找到这个用户,右键单击选择属性

拼多多API接口解析,实现根据ID取商品详情

拼多多是一个流行的电商平台,它提供了API接口供开发者使用。要根据ID获取商品详情,您需要使用拼多多API接口并进行相应的请求。以下是使用拼多多API接口根据ID获取商品详情的示例代码(使用Python编写):importrequestsimportjson#拼多多API接口地址api_url="https://api

【漏洞复现】易思智能物流无人值守系统文件上传

本文由掌控安全学院-江月投稿【产品介绍】易思无人值守智能物流系统是一款集成了人工智能、机器人技术和物联网技术的创新产品。它能够自主完成货物存储、检索、分拣、装载以及配送等物流作业,帮助企业实现无人值守的智能物流运营,提高效率、降低成本,为现代物流行业带来新的发展机遇。【漏洞描述】易思无人值守智能物流系统/Sys_Rep

h5下载文件,无兼容问题~

最近写了个页面,打开页面出现文件列表,用户可以下载文件。失败方案使用a标签进行下载,参考代码如下:因为有批量下载的需求,这里将xhr请求单独封装到downloadFile.js中//downloadFile.jsconstdownloadFile=(url,onProgress,xhrAr)=>{console.log

热文推荐