6.3、Flink数据写入到MySQL

2023-09-14 15:34:09

目录

1、添加POM依赖

2、这一个完整的案例

3、何时批量写入MySQL呢?

4、容错性的保证(精确一次&至少一次)

4.1、 至少一次

4.2、精确一次


1、添加POM依赖

Apache Flink 集成了通用的 JDBC 连接器,使用时需要根据生产环境的版本引入相应的依赖

官网链接:官网

<!-- 引入 flink jdbc连接器 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.17-SNAPSHOT</version>
</dependency>

<!-- 引入 mysql数据库的驱动 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

2、这一个完整的案例

开发语言:java1.8

flink版本:flink1.17.0

package com.baidu.datastream.sink;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

// 将 Flink数据写入到 MySQL
/*
* TODO Step1、在 开启socket服务,输入下列数据
*       1,红楼梦,曹雪芹,19.8,1
*       1,红楼梦,曹雪芹,19.8,2
*       1,红楼梦,曹雪芹,19.8,3
*       1,红楼梦,曹雪芹,19.8,4
*       1,红楼梦,曹雪芹,19.8,5
*       1,红楼梦,曹雪芹,19.8,6
*
* TODO Step2、MySQL book表DDL
*       create table books(id int, title varchar(99), author varchar(99), price double, qty int);
*
* */
public class SinkMySQL {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启checkpoint后,会触发 数据写入MySQL操作
        //env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 3.初始化 JdbcSink实例
        SinkFunction<String> jdbcSink = JdbcSink.sink(
                // TODO 1、指定要执行的SQL
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                // TODO 2、指定 将 dataStream数据 封装到 SQL的占位符中
                new JdbcStatementBuilder<String>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, String s) throws SQLException {
                        int id = Integer.parseInt(s.split(",")[0]);
                        String title = s.split(",")[1];
                        String author = s.split(",")[2];
                        double price = Double.parseDouble(s.split(",")[3]);
                        int qty = Integer.parseInt(s.split(",")[4]);

                        preparedStatement.setInt(1, id);
                        preparedStatement.setString(2, title);
                        preparedStatement.setString(3, author);
                        preparedStatement.setDouble(4, price);
                        preparedStatement.setInt(5, qty);
                    }
                },
                // TODO 3、指定 批量写入MySQL大小和频率 (当满足 设置的批次或者提交时间时 会触发写入MySQL)
                //  重要:这里的设置非常重要,它控制着flink写入MySQL的延迟程度
                //  当不设置 JdbcExecutionOptions 时,将使用默认配置 (缓存5000条数据后触发写入操作)
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3) // 插入发生异常重试次数 注意:只支持SQL Exception 异常及其子类异常重试(默认值为3)
                        .withBatchSize(1)  // 批量的大小:条数(默认值为5000)
                        .withBatchIntervalMs(2000) // 批次的时间间隔 (默认值0L,表示无限长的时间间隔)
                        .build(),
                // TODO 4、指定 MySQL的连接信息
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("gaocun123")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );

        streamSource.addSink(jdbcSink);

        // 3.触发程序执行
        env.execute();
    }
}

3、何时批量写入MySQL呢?

可以通过 JdbcExecutionOptions 来控制写入MySQL的数据量和时间频率,这也决定了Flink写入MySQL的延迟程度。

JdbcExecutionOptions 三个常用的参数:

  • withMaxRetries(3) :插入操作发生异常时的重试次数
    • 注意:只支持SQL Exception 异常及其子类异常重试(默认值为3)
  • withBatchSize(100)  :    批量写入数据的条数(默认值为5000)   
  • withBatchIntervalMs(2000)  :  批量写入的时间间隔 (默认值0L,表示关闭定时写入)
触发批量写入的条件

实时写入MySQL应该如何配置:

// TODO 实时写入MySQL
JdbcExecutionOptions.builder()
    .withMaxRetries(3) // 插入发生异常重试次数 注意:只支持SQL Exception 异常及其子类异常重试(默认值为3)
    .withBatchSize(1)  // 批量的大小设置为1,表示产生一条数据就会被写入MySQL
    .withBatchIntervalMs(0) // 批次的时间间隔为0,表示关闭定时写入
    .build()

4、容错性的保证(精确一次&至少一次)

4.1、 至少一次

使用Flink提供的 JDBC Sink 能够保证至少一次的语义

注意:

        这里的至少一次的保证指的是 MySQL故障后,数据不会丢失

4.2、精确一次

       对于 JDBC Sink,例如 MySQL,要实现故障时的精确一次的保证通过 upsert 语句或幂等更新实现

MySQL 中常用的 upsert 语句:       

        在MySQL中,"upsert"是指一种操作,它根据一定的条件在表中插入新的行,或者如果已经存在满足条件的行,则更新这些行的数据

使用 upset语句的前提是:

        表具有唯一键(UNIQUE KEY)或主键(PRIMARY KEY),以便在插入行时进行冲突检测


DDL:

-- TODO 要想使用 upsert语句,表必须具有 PRIMARY KEY 或者 UNIQUE约束
create table books
(
    id     int PRIMARY KEY,
    title  varchar(99) UNIQUE,
    author varchar(99),
    price  double,
    qty    int
);

insert into 语句

功能:向 表中写入数据,如果 主键字段(id)或UNIQUE字段(title) 存在时,则插入数据失败

示例:

insert into books (id, title, author, price, qty) values (1,'红楼梦','红楼梦',19.9,1);


insert ignore 语句

功能: 向 表中写入数据,如果 主键字段(id)或UNIQUE字段(title) 存在时,则忽略这次插入行为

示例:

-- insert ignore 语句
insert ignore books (id, title, author, price, qty) values (1,'红楼梦','曹雪芹1',19.9,1);

replace into 语句 (更新所有字段)

功能:向 表中写入数据,如果 主键字段(id)或UNIQUE字段(title) 存在时,会删除原有数据,再将这次插入数据写入 

示例:

-- replace into 语句
replace into books (id, title, author, price, qty) values (1,'红楼梦','曹雪芹',19.8,2);


insert on duplicate key update 语句 (更新局部字段)

功能:向 表中写入数据,如果 主键字段(id)或UNIQUE字段(title) 存在时,只会对 on duplicate key update 后指定的字段进行更新

示例:

-- insert on duplicate key update 语句
insert ignore books (id, title, author, price, qty) values (1,'红楼梦','曹雪芹?',8.8,3)
on duplicate key update author = 'XueQinCao';
;

更多推荐

01_Elasticsearch入门介绍

01_Elasticsearch入门介绍Elasticsearch是什么1、什么是搜索?2、如果用数据库做搜索会怎么样?3、什么是全文检索和Lucene?4、什么是Elasticsearch?5、Elasticsearch的功能6、Elasticsearch的适用场景7、Elasticsearch的特点什么是Elast

Axure原型设计累加器计时器设计效果(职业院校技能大赛物联网技术应用项目原型设计题目)

目录前言一、本题实现效果二、操作步骤1.新建文件2.界面设计2.1文本框2.2按钮2.3设计界面完成3.交互3.1启动交互设置3.2分别设置三个属性3.2.1设置值为“0”3.2.2文字于文本框3.2.3获取焦点时3.3停止按钮的交互动作3.3.1设置变量值3.4重置按钮的交互设置3.4.1设置文字3.5文本框交互设置

【网络基础】——HTTPS

目录HTTPS背景知识HTTPS是什么?加密解密为什么要加密常见的加密方式对称加密非对称加密数据摘要&&数据指纹数字签名HTTPS工作过程探究方案1:只使用对称加密方案2:只使用非对称加密方案3:双方都使用非对称加密​编辑方案4:非对称加密+对称加密中间人攻击证书的引入数据签名查看CA机构方案5:非对称加密+对称加密+

掌动智能浅谈UI自动化测试工具的重要性

在现代软件开发中,用户界面(UI)的质量和可靠性对于一个应用的成功至关重要。为了确保应用在各种环境和设备上都能正常运行,开发团队需要进行全面的UI测试。为了提高测试效率和减少人为错误,UI自动化测试工具成为不可或缺的工具。本文将探讨UI自动化测试工具的重要性,以及其如何提升软件质量与效率。本文将探讨UI自动化测试工具的

文件上传漏洞实验

实验目的掌握上传漏洞的利用。实验工具火狐:MozillaFirefox,中文俗称“火狐”(正式缩写为Fx或fx,非正式缩写为FF),是一个自由及开放源代码网页浏览器,使用Gecko排版引擎,支持多种操作系统,如Windows、MacOSX及GNU/Linux等。实验内容什么是网站(web)漏洞?web漏洞通常是指网站程

中文人物关系知识图谱(含码源):中文人物关系图谱构建、数据回标、基于远程监督人物关系抽取、知识问答等应用.

项目设计集合(人工智能方向):助力新人快速实战掌握技能、自主完成项目设计升级,提升自身的硬实力(不仅限NLP、知识图谱、计算机视觉等领域):汇总有意义的项目设计集合,助力新人快速实战掌握技能,助力用户更好利用CSDN平台,自主完成项目设计升级,提升自身的硬实力。专栏订阅:项目大全提升自身的硬实力[专栏详细介绍:项目设计

论文阅读《2022ICLR:Are Message Passing Neural Networks Really Helpful for Knowledge Graph Completion? 》

论文链接工作简介最近有关知识图谱补全的工作都集中在使用图神经网络来学习实体、关系的嵌入。基于GNN的模型带来的改进归因于增强的信息聚合过程。因此目前关于为KGs开发更好的GNN的研究仍然主要集中在推进信息聚合过程。作者发现:基于GNN的模型中的信息聚合过程并不是所报告的KGC性能改进的最关键原因。具体来说,作者用简单的

Kafka

一,为什么需要消息队列主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发toomanyconnection错误,引发雪崩效应。我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队列常应用于异步处理,流量削峰,应用解

Java 基于 SPringBoot 的校园二手书交易平台,附源码、教程

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝30W+、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌文章目录1.介绍2.技术栈3.需求分析4系统设计4.1数据库设计5系统详细设计5.1系统功能模块5.2管理员功能模块5.3卖家用户功能模块6、源码获取1

港联证券:“火箭蛋”来袭 蛋价涨势能否延续?

上个交易周(9月11日至15日),鸡蛋期货商场呈现了意想不到的涨势。9月15日,鸡蛋期货多个合约大涨,其中2310合约涨超5.6%,主力合约2311盘中两度触及涨停,最终收涨6%。业内人士以为,鸡蛋期货呈现大幅上涨,一方面在于旺季阶段贴水起伏较大,质料本钱抬升,商场预期转强,带动近月合约大幅上涨;另一方面表现在现货端价

Java面向对象编程

主机甲和乙已建立了TCP连接,甲始终以MSS=1KB大小的段发送数据,并一直有数据发送;乙每收到一个数据段都会发出一个接收窗口为10KB的确认段。若甲在t时刻发生超时时拥塞窗口为8KB,则从t时刻起,不再发生超时的情况下,经过10个RTT后,甲的发送窗口是()A.10KBB.12KBC.14KBD.15KB答案:A某主

热文推荐