【Flink实战】玩转Flink里面核心的Source Operator实战

2023-09-13 15:08:59

🚀 作者 :“大数据小禅”

🚀 文章简介 :【Flink实战】玩转Flink里面核心的Source Operator实战

🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


Flink 的API层级介绍Source Operator速览

  • Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象

    • 第一层是最底层的抽象为有状态实时流处理,抽象实现是 Process Function,用于底层处理

    • 第二层抽象是 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发

      • 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
    • 第三层抽象是 Table API。 是以表Table为中心的声明式编程API,Table API 使用起来很简洁但是表达能力差

      • 类似数据库中关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等
      • 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用
    • 第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式

      • SQL 抽象与 Table API 抽象之间的关联是非常紧密的
    • 注意:Table和SQL层变动多,还在持续发展中,大致知道即可,核心是第一和第二层
      在这里插入图片描述

  • Flink编程模型

在这里插入图片描述

  • Source来源

    • 元素集合

      • env.fromElements
      • env.fromColletion
      • env.fromSequence(start,end);
    • 文件/文件系统

      • env.readTextFile(本地文件);
      • env.readTextFile(HDFS文件);
    • 基于Socket

      • env.socketTextStream(“ip”, 8888)
    • 自定义Source,实现接口自定义数据源,rich相关的api更丰富

      • 并行度为1

        • SourceFunction
        • RichSourceFunction
      • 并行度大于1

        • ParallelSourceFunction
        • RichParallelSourceFunction
  • Connectors与第三方系统进行对接(用于source或者sink都可以)

    • Flink本身提供Connector例如kafka、RabbitMQ、ES等
    • 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败
  • Apache Bahir连接器

    • 里面也有kafka、RabbitMQ、ES的连接器更多
  • 总结 和外部系统进行读取写入的

    • 第一种 Flink 里面预定义的 source 和 sink。
    • 第二种 Flink 内部也提供部分 Boundled connectors。
    • 第三种是第三方 Apache Bahir 项目中的连接器。
    • 第四种是通过异步 IO 方式
      • 异步I/O是Flink提供的非常底层的与外部系统交互

Flink 预定义的Source 数据源 案例实战

  • Source来源
    • 元素集合
      • env.fromElements
      • env.fromColletion
      • env.fromSequence(start,end);
 public static void main(String [] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //相同类型元素的数据流 source
        DataStream<String> stringDS1 = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");
        stringDS1.print("stringDS1");

        DataStream<String> stringDS2 = env.fromCollection(Arrays.asList("微服务项目大课,java","alibabacloud,rabbitmq","hadoop,hbase"));
        stringDS2.print("stringDS2");

        DataStreamSource<Long> longDS3 = env.fromSequence(0,10);
        longDS3.print("longDS3");

        //DataStream需要调用execute,可以取个名称
        env.execute("xdclass job");
    }

  • 文件/文件系统
    • env.readTextFile(本地文件);
    • env.readTextFile(HDFS文件);
public static void main(String [] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> textDS = env.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");
        //DataStream<String> textDS = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");
        textDS.print();
        env.execute("xdclass job");
}
  • 基于Socket
    • env.socketTextStream(“ip”, 8888)
   public static void main(String [] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);
        stringDataStream.print();
        env.execute(" job");
}

Flink自定义的Source 数据源案例-订单来源实战

  • 自定义Source,实现接口自定义数据源

    • 并行度为1

      • SourceFunction
      • RichSourceFunction
    • 并行度大于1

      • ParallelSourceFunction
      • RichParallelSourceFunction
    • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等

  • 创建接口

@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
    private String tradeNo;
    private String title;
    private int money;
    private int userId;
    private Date createTime;

}


public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {

	private volatile Boolean flag = true;

    private  Random random = new Random();

    private static List<String> list = new ArrayList<>();
    static {
        list.add("spring boot2.x课程");
        list.add("微服务SpringCloud课程");
        list.add("RabbitMQ消息队列");
        list.add("Kafka课程");
        list.add("Flink流式技术课程");
        list.add("工业级微服务项目大课训练营");
        list.add("Linux课程");
    }

    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {
        while (flag){
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            ctx.collect(new VideoOrder(id,title,money,userId,new Date()));
        }
    }

    /**
     * 取消任务
     */
    @Override
    public void cancel() {
        flag = false;
    }
}
  • 案例
public static void main(String [] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<VideoOrder> videoOrderDataStream = env.addSource(new VideoOrderSource());
        videoOrderDataStream.print();

        //DataStream需要调用execute,可以取个名称
        env.execute("custom source job");
    }

不断产生很多订单

在这里插入图片描述

更多推荐

Vue3自定义指令

文章目录Vue3自定义指令1.自定义全局指令v-focus2.自定义局部指令v-focus3.指令定义的钩子函数3.1概念3.2钩子函数参数3.3vnode&prevNode3.4简写3.5指令函数接受JavaScript表达式Vue3自定义指令1.自定义全局指令v-focus除了默认设置的核心指令(v-model和v

【python】入门第一课:了解基本语法(数据类型)

目录一、介绍1、什么是python?2、python的几个特点二、实例1、注释2、数据类型2.1、字符串str2.2、整数int2.3、浮点数float2.4、布尔bool2.5、列表list2.6、元组tuple2.7、集合set2.8、字典dict一、介绍1、什么是python?Python是一种通用的高级编程语言

ffmpeg安装及使用

centoslinux下安装ffmpeg1、下载解压wgethttp://www.ffmpeg.org/releases/ffmpeg-3.1.tar.gztar-zxvfffmpeg-3.1.tar.gz2、进入解压后目录,输入如下命令/usr/local/ffmpeg为自己指定的安装目录cdffmpeg-3.1./

融云受邀参加 Web3.0 顶级峰会「Meta Era Summit 2023」

本周四19:00-20:00,融云直播课社交泛娱乐出海最短变现路径如何快速实现一款1V1视频应用?欢迎点击上方小程序报名~9月12日,由中国香港Web3.0媒体MetaEra主办的“MetaEraSummit2023”在新加坡收官,融云作为战略合作伙伴参与了峰会。关注【融云全球互联网通信云】了解更多大会以“Metave

【苹果】SpringBoot监听Iphone15邮件提醒,Selenium+Python自动化抢购脚本

前言🍊缘由Iphone15来了,两年之约你还记得吗?两年前,与特别的人有一个特别的约定。虽物是人非,但思念仍在。遂整合之前iphone13及iphone14的相关抢购代码,完成一个SpringBoot监听Iphone15有货邮件提醒+python自动化脚本小功能。后端基于SpringBoot,通过苹果官网进行有货接口

Vue.js vs React:哪一个更适合你的项目?

🌷🍁博主猫头虎(🐅🐾)带您GotoNewWorld✨🍁🦄博客首页——🐅🐾猫头虎的博客🎐🐳《面试题大全专栏》🦕文章图文并茂🦖生动形象🐅简单易学!欢迎大家来踩踩~🌺🌊《IDEA开发秘籍专栏》🐾学会IDEA常用操作,工作效率翻倍~💐🌊《100天精通Golang(基础入门篇)》🐅学会Gol

Spring Cloud实战案例 │ Apollo和Zuul的整合开发

Apollo是携程研发的开源配置管理中心,能够集中管理应用于不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性。本案例结合一个案例介绍Apollo和Zuul的整合开发。整个应用分为4个微服务项目,分别是Eureka服务器项目mweathereurekaserver、服务提供者项

OpenSergo & Spring Cloud Alibaba 带来的服务治理能力

博主介绍:✌全网粉丝3W+,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌博主作品:《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis-plus+MySQL+Vue等前后端

云原生微服务 第四章 Spring Cloud Netflix 之 Eureka

系列文章目录第一章Java线程池技术应用第二章CountDownLatch和Semaphone的应用第三章SpringCloud简介第四章SpringCloudNetflix之Eureka文章目录系列文章目录@[TOC](文章目录)前言1、Eureka两大组件2、Eureka服务注册与发现3、案例3.1、创建主工程3.

jq命令安装与使用

目录一、简介二、下载及安装1.Linux安装2.Windows安装3.测试安装结果三、jq用法1.基本语法2.常见用法1)格式化JSON2)获取属性3)属性不存在情况处理4)数组遍历、截取、展开5)管道、逗号、加号6)数据构造7)基础函数8)过滤、排序、分组函数9)字符串操作函数10)日期函数11)高级用法官网地址:h

网络安全(黑客技术)学习笔记

1.网络安全是什么网络安全可以基于攻击和防御视角来分类,我们经常听到的“红队”、“渗透测试”等就是研究攻击技术,而“蓝队”、“安全运营”、“安全运维”则研究防御技术。2.网络安全市场一、是市场需求量高;二、则是发展相对成熟入门比较容易。3.所需要的技术水平需要掌握的知识点偏多(举例):外围打点能力渗透漏洞挖掘流量分析代

热文推荐