【Flink实战】玩转Flink里面核心的Sink Operator实战

2023-09-13 16:24:01

🚀 作者 :“大数据小禅”

🚀 文章简介 :玩转Flink里面核心的Sink Operator实战

🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


Flink Sink Operator简介

  • 在Flink中,Sink Operator(也称为Sink Function或Sink)是指负责将DataStream或DataSet的数据发送到外部存储或外部系统的操作符。Sink Operator是Flink的数据输出端,它的作用是将处理过的数据写入目标位置,如数据库、文件系统、消息队列等。

  • Sink Operator通过将数据传输到外部系统来完成最终的数据存储、展示或其他类型的处理。它可以将数据单个地或批量地发送到目标系统,具体取决于Sink操作符的实现。例如,可以将数据写入关系型数据库、NoSQL数据库、消息队列、文件系统等。

  • 在Flink中,可以使用预定义的Sink操作符,如addSink()方法,或自定义Sink函数来实现数据的输出。预定义的Sink操作符可以满足一般的输出需求,而自定义Sink函数可以根据具体的业务逻辑实现特定的输出操作。

  • 自定义Sink函数需要实现SinkFunction接口或RichSinkFunction抽象类,并重写其中的方法。这些方法包括open()、invoke()和close()等,用于初始化和管理连接,以及处理数据发送等操作。

使用Sink Operator时,需要考虑以下几个方面:

  • 目标系统的可用性和容错性:保证目标系统的可用性,并确保在故障发生时能够进行重试或恢复。
  • 写入的一致性:根据需求选择适当的写入一致性级别,如精确一次(exactly-once)或最少一次(at-least-once)语义。
  • 并行度和性能:根据目标系统的特性和可用资源,设置合适的并行度以提高任务并行处理和整体性能。

Flink 核心知识 Sink Operator速览

  • Flink编程模型
    在这里插入图片描述
  • Sink 输出源
    • 预定义
      • print
      • writeAsText (过期)
    • 自定义
      • SinkFunction
      • RichSinkFunction
        • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
    • flink官方提供 Bundle Connector
      • kafka、ES 等
    • Apache Bahir
      • kafka、ES、Redis等

Flink 自定义的Sink 连接Mysql存储商品订单案例实战

  • 自定义

    • SinkFunction
    • RichSinkFunction
      • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
  • Flink连接mysql的几种方式(都需要加jdbc驱动)

    • 方式一:自带flink-connector-jdbc 需要加依赖包
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    
    • 方式二:自定义sink
  • 保存视频订单到Mysql

    CREATE TABLE `video_order` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
      `user_id` int(11) DEFAULT NULL,
      `money` int(11) DEFAULT NULL,
      `title` varchar(32) DEFAULT NULL,
      `trade_no` varchar(64) DEFAULT NULL,
      `create_time` date DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    
    • 添加jdbc依赖
    <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.25</version>
    </dependency>
    
    • 编码
    public class MysqlSink extends RichSinkFunction<VideoOrder> {
    
        private Connection conn = null;
        private PreparedStatement ps = null;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/xd_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "xdclass.net");   //url user passwd
            String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
            ps = conn.prepareStatement(sql);
        }
    
        @Override
        public void close() throws Exception {
            if (conn != null) {
                conn.close();
            }
            if (ps != null) {
                ps.close();
            }
        }
    
        @Override
        public void invoke(VideoOrder videoOrder, Context context) throws Exception {
            //给ps中的?设置具体值
            ps.setInt(1,videoOrder.getUserId());
            ps.setInt(2,videoOrder.getMoney());
            ps.setString(3,videoOrder.getTitle());
            ps.setString(4,videoOrder.getTradeNo());
            ps.setDate(5,new Date(videoOrder.getCreateTime().getTime()));
    
            ps.executeUpdate();
        }
    }
    

    在这里插入图片描述
    在这里插入图片描述

更多推荐

大数据-玩转数据-Flink恶意登录监控

一、恶意登录对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。这是电商网

图解Raft协议

前言分布式系统设计中,在极大提高可用性、容错性的同时,带来了一致性问题(CAP理论),Raft协议就是解决分布式中的一致性问题。最近研究了Raft协议,谈谈自己对Raft协议的理解。希望这篇文章能够帮助大家理解。raft协议是什么?Raft协议是一种分布式一致性算法(共识算法),共识就是多个节点对某一个事件达成一致的算

网络编程套接字 | TCP套接字

前面的文章中我们使用UDP套接字编写了四个版本,不同的UDP服务器,在本文中我们将要对TCP套接字进行几个不同的版本的代码的编写,首先测试一下TCP套接字的代码,然后是将这个版本进行修改成多进程版本的,再将代码修改成多线程版本的,最后在编写一个线程池版本的代码。在编写TCP套接字之前我们会使用如下的一些APIsocke

QT基础教程(QT中的文件操作)

文章目录前言一、文件操作方法二、QFileInfo类四、QTemporaryFile类总结前言本篇文章我们来讲解QT中的文件操作,文件操作对于QT来说是非常重要的一个点,那么下面的话将给大家详细的讲解QT中的文件操作。一、文件操作方法在QT中,文件操作是通过Qt的文件和目录处理类来完成的。以下是一些常用的文件操作功能:

FLASK中的鉴权的插件Flask-HTTPAuth

在Web应用中,我们经常需要保护我们的api,以避免非法访问。比如,只允许登录成功的用户发表评论等。Flask-HTTPAuth扩展可以很好地对HTTP的请求进行认证,不依赖于Cookie和Session。本文主要介绍两种认证的方式:基于密码和基于令牌(token)。1、安装$pipinstallFlask-HTTPA

数学建模——微分方程介绍

一、基础知识1、一阶微分方程称为一阶微分方程。y(x0)=y0为定解条件。其常规求解方法:(1)变量分离再两边积分就可以求出通解。(2)一阶线性求解公式通解公式:有些一阶微分方程需要通过整体代换,比如u=x+y,u=xy,u=x/y,u=1/yn等化为以上两种类型求解后再还原。2、二阶常系数微分方程【1】【2】【1】为

Java运行时数据区域

运行时数据区域Java虚拟机在执行Java程序的过程中会把它所管理的内存划分为若干个不同的数据区域。根据《Java虚拟机规范》的规定,Java虚拟机所管理的内存将会包括以下几个运行时数据区域:程序计数器程序计数器(ProgramCounterRegister)是一块较小的内存空间,可以看作是当前线程所执行的字节码的行号

5.10.WebRTC接口宏

那今天呢?我给大家介绍一下webrtc的接口宏,那之所以在现成的章节中要介绍接口宏。是由于接口在调用的过程中啊,会发生线程的切换,所以把接口宏这部分知识我们放在线程这一章还算比较合适的。那另外呢,我们对于接口宏的介绍可能要花费三节到四节的时间,那之所以要用这么大的篇幅来介绍接口宏,是由于接口宏本身是比较复杂的。里边儿涉

python学习--函数

函数的创建与调用什么是函数函数就是执行特定任务或完成特定功能的一段代码为什么需要函数复用代码隐藏实现细节提高可维护性提高可读性便于调试函数的创建def函数名([输入函数]):函数体[returnxxx]defcalc(a,b):#a,b称为形式参数,简称形参,形参的位置是在函数定义处c=a+breturncresult

【Redis GEO】1、地理位置类型的基本用法

1、RedisGEO介绍RedisGEO主要用于存储地理位置信息,并对存储的信息进行操作,该功能在Redis3.2版本新增。RedisGEO操作方法有:geoadd:添加地理位置的坐标。geopos:获取地理位置的坐标。geodist:计算两个位置之间的距离。georadius:根据用户给定的经纬度坐标来获取指定范围内

jQuery 指定区域的内容循环滚动

需求:页面指定区域内的内容循环滚动,但是内容形式、高度都不固定,是接口从编辑器提取出来的内容。代码:<divid="container5"><divclass="content"id="f12red1">自2023年9月20日24时起,国内汽、<br>柴油价格(标准品,下同)每吨分别提高70元。<br>自2023年9月

热文推荐