大数据-玩转数据-Flink SQL编程

2023-09-20 16:38:50

一、概念

在这里插入图片描述

1.1 Apache Flink 两种关系型 API

Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。
Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。

Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。这两种 API 中的查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。

Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已经预处理好的数据。
注意:Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的组合都是支持的。

1.2 动态表(Dynamic Tables)

动态表是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。
动态表是随时间变化的,可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。

需要注意的是,连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。

对动态表的一般处理过程: 流->动态表->连续查询处理->动态表->流

二、导入Flink Table API依赖

pom.xml 中添加

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-compress</artifactId>
    <version>1.21</version>
</dependency>

三、表与DataStream的混合使用简单案例

package com.lyh.flink12;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

//必须添加此类才能在表达式中运用$符号
import static org.apache.flink.table.api.Expressions.$;

public class Table_Api_BasicUse {
    public static void main(String[] args) throws Exception {
        // 流运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //并行参数
        env.setParallelism(1);
        // 数据源
             DataStreamSource<WaterSensor> waterSensorStream =
                env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60));
        // 创建表的执行环境
        StreamTableEnvironment TableEnv = StreamTableEnvironment.create(env);
        // 创建表,将流转换成动态表. 表的字段名从pojo的属性名自动抽取
        Table table = TableEnv.fromDataStream(waterSensorStream);
        // 对动态表进行查询
        Table resultTable = table
              .where($("id").isEqual("sensor_1"))
                .select($("id"),$("vc"));
        //把动态表转化为流
        DataStream<Row> dataStream = TableEnv.toAppendStream(resultTable,Row.class);
        dataStream.print();
        env.execute();
    }
}

四、表到流的转换

动态表可以像普通数据库表一样通过 INSERT、UPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 修改,或者介于两者之间的其他表。
在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化:
Append-only 流
仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。
Retract 流
retract 流包含两种类型的 message: add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。
Upsert 流
upsert 流包含两种类型的 message: upsert messages 和delete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message ,将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性,以便正确地应用 message。与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。下图显示了将动态表转换为 upsert 流的过程。

请注意,在将动态表转换为 DataStream 时,只支持 append 流和 retract 流。

五、通过Connector声明读入数据

前面是先得到流, 再转成动态表, 其实动态表也可以直接连接到数据

5.1 File source

// 创建表
//表的元数据信息
Schema schema = new Schema()
    .field("id", DataTypes.STRING())
    .field("ts", DataTypes.BIGINT())
    .field("vc", DataTypes.INT());
// 连接文件, 并创建一个临时表, 其实就是一个动态表
tableEnv.connect(new FileSystem().path("input/sensor.txt"))
    .withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n"))
    .withSchema(schema)
    .createTemporaryTable("sensor");
// 做成表对象, 然后对动态表进行查询
Table sensorTable = tableEnv.from("sensor");
Table resultTable = sensorTable
    .groupBy($("id"))
    .select($("id"), $("id").count().as("cnt"));
//  把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
resultStream.print();

5.2 Kafka Source

// 创建表
// 表的元数据信息
Schema schema = new Schema()
    .field("id", DataTypes.STRING())
    .field("ts", DataTypes.BIGINT())
    .field("vc", DataTypes.INT());
// 连接文件, 并创建一个临时表, 其实就是一个动态表
tableEnv
    .connect(new Kafka()
                 .version("universal")
                 .topic("sensor")
                 .startFromLatest()
                 .property("group.id", "bigdata")
                 .property("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092"))
    .withFormat(new Json())
    .withSchema(schema)
    .createTemporaryTable("sensor");
//对动态表进行查询
Table sensorTable = tableEnv.from("sensor");
Table resultTable = sensorTable
    .groupBy($("id"))
    .select($("id"), $("id").count().as("cnt"));
//把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
resultStream.print();

六、通过Connector声明写出数据

6.1 File Sink

package com.atguigu.flink.java.chapter_11;

import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/1/11 21:43
 */
public class Flink02_TableApi_ToFileSystem {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> waterSensorStream =
            env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                             new WaterSensor("sensor_1", 2000L, 20),
                             new WaterSensor("sensor_2", 3000L, 30),
                             new WaterSensor("sensor_1", 4000L, 40),
                             new WaterSensor("sensor_1", 5000L, 50),
                             new WaterSensor("sensor_2", 6000L, 60));
        // 1. 创建表的执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Table sensorTable = tableEnv.fromDataStream(waterSensorStream);
        Table resultTable = sensorTable
            .where($("id").isEqual("sensor_1") )
            .select($("id"), $("ts"), $("vc"));

        // 创建输出表
        Schema schema = new Schema()
            .field("id", DataTypes.STRING())
            .field("ts", DataTypes.BIGINT())
            .field("vc", DataTypes.INT());
        tableEnv
            .connect(new FileSystem().path("output/sensor_id.txt"))
            .withFormat(new Csv().fieldDelimiter('|'))
            .withSchema(schema)
            .createTemporaryTable("sensor");

        // 把数据写入到输出表中
        resultTable.executeInsert("sensor");
    }
}

6.2 Kafka Sink

package com.atguigu.flink.java.chapter_11;

import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/1/11 21:43
 */
public class Flink03_TableApi_ToKafka {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> waterSensorStream =
            env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                             new WaterSensor("sensor_1", 2000L, 20),
                             new WaterSensor("sensor_2", 3000L, 30),
                             new WaterSensor("sensor_1", 4000L, 40),
                             new WaterSensor("sensor_1", 5000L, 50),
                             new WaterSensor("sensor_2", 6000L, 60));
        // 1. 创建表的执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Table sensorTable = tableEnv.fromDataStream(waterSensorStream);
        Table resultTable = sensorTable
            .where($("id").isEqual("sensor_1") )
            .select($("id"), $("ts"), $("vc"));

        // 创建输出表
        Schema schema = new Schema()
            .field("id", DataTypes.STRING())
            .field("ts", DataTypes.BIGINT())
            .field("vc", DataTypes.INT());
        tableEnv
            .connect(new Kafka()
                         .version("universal")
                         .topic("sink_sensor")
                         .sinkPartitionerRoundRobin()
                         .property("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092"))
            .withFormat(new Json())
            .withSchema(schema)
            .createTemporaryTable("sensor");

        // 把数据写入到输出表中
        resultTable.executeInsert("sensor");
    }
}

七、基本使用

7.1 查询未注册的表

package com.lyh.flink12;

import org.apache.flink.types.Row;
import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Connect_File_source {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> dataStreamSource =
                env.fromElements(new WaterSensor("sensor_1", 1000L, 20),
                new WaterSensor("sensor_1", 2000L, 30),
                new WaterSensor("sensor_1", 3000L, 40),
                new WaterSensor("sensor_1", 4000L, 50),
                new WaterSensor("sensor_1", 5000L, 60));
        // 创建动态表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 使用SQL查询未注册的表
        // 从流中得到一个表
        Table inputTable = tableEnv.fromDataStream(dataStreamSource);
        Table resultTable = tableEnv.sqlQuery("select * from " + inputTable + " where id = 'sensor_1'");
        tableEnv.toAppendStream(resultTable, Row.class).print();
        env.execute();
    }
}

7.2 查询已注册的表

package com.lyh.flink12;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Flink05_SQL_BaseUse_2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> waterSensorStream =
            env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                             new WaterSensor("sensor_1", 2000L, 20),
                             new WaterSensor("sensor_2", 3000L, 30),
                             new WaterSensor("sensor_1", 4000L, 40),
                             new WaterSensor("sensor_1", 5000L, 50),
                             new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 使用sql查询一个已注册的表
        // 1. 从流得到一个表
        Table inputTable = tableEnv.fromDataStream(waterSensorStream);
        // 2. 把注册为一个临时视图
        tableEnv.createTemporaryView("sensor", inputTable);
        // 3. 在临时视图查询数据, 并得到一个新表
        Table resultTable = tableEnv.sqlQuery("select * from sensor where id='sensor_1'");
        // 4. 显示resultTable的数据
        tableEnv.toAppendStream(resultTable, Row.class).print();
        env.execute();
    }
}

7.3 Kafka到Kafka

使用sql从Kafka读数据, 并写入到Kafka中

package com.lyh.flink12;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Sql_kafka_kafka {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table source_sensor (id string, ts bigint, vc int) with("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_source_sensor',"
                + "'properties.bootstrap.servers' = 'hadoop100:9029',"
                + "'properties.group.id' = 'atguigu',"
                + "'scan.startup.mode' = 'latest-offset',"
                + "'format' = 'json'"
                + ")");

        // 2. 注册SinkTable: sink_sensor
        tableEnv.executeSql("create table sink_sensor(id string, ts bigint, vc int) with("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_sink_sensor',"
                + "'properties.bootstrap.servers' = 'hadoop100:9029',"
                + "'format' = 'json'"
                + ")");

        // 3. 从SourceTable 查询数据, 并写入到 SinkTable
        tableEnv.executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'");
    }
}

更多推荐

72、Spring Data JPA 的 Specification 动态查询

Specification:规范、规格★Specification查询它也是SpringData提供的查询——是对JPA本身Criteria动态查询的包装。▲为何要有动态查询页面上常常会让用户添加不同的查询条件,程序就需要根据用户输入的条件,动态地组合不同的查询条件。JPA为动态查询提供了Criteria查询支持。Sp

基于SpringBoot的驾校管理系统

基于SpringBoot+Vue的驾校管理系统、前后端分离开发语言:Java数据库:MySQL技术:SpringBoot、Vue、MybaitsPlus、ELementUI工具:IDEA/Ecilpse、Navicat、Maven【主要功能】角色:管理员、用户、教练管理员:学员管理、教练管理、驾校信息管理、报名信息管理

P7075 [CSP-S2020] 儒略日

题目题目描述为了简便计算,天文学家们使用儒略日(Julianday)来表达时间。所谓儒略日,其定义为从公元前4713年1月1日正午12点到此后某一时刻间所经过的天数,不满一天者用小数表达。若利用这一天文学历法,则每一个时刻都将被均匀的映射到数轴上,从而得以很方便的计算它们的差值。现在,给定一个不含小数部分的儒略日,请你

反向动力学Ik学习

参考文章:(非本人原创)英文原文:InverseKinematicsTechniquesinComputerGraphics:ASurvey(andreasaristidou.com)知乎翻译文章:【游戏开发】逆向运动学(IK)详解-知乎(zhihu.com)概念正向运动学(Forwardkinematics,FK):

如何理解和掌握数据的意义和数据分类?

如今,数据是最重要的资源。在大数据时代,一切被记录的事实都是数据,它既可以变现为数值,也可以变现为任何其他形式。那么对于数据的意义和数据分类我们如何去理解和掌握呢?一般可以从这几个方面入手:第一,明确数据的含义。数据可以是数字、文字、字母、符号、图形、图像、视频、音频等,也可以是这些元素的组合。数据可以是对客观事物的记

运维必会的常用linux命令<建议收藏>

文章目录Linux系统简介国产操作系统基本命令cd命令ls命令查看文本文件内容-cat命令分页查看文本文件-less命令查看CPU信息-lscpu命令查看系统内核版本-uname命令查看IP地址-ifconfig命令创建目录-mkdir命令创建空文件-touch命令查看文件前几行-head命令查看文件后几行-tail命

【云原生持续交付和自动化测试】5.3 持续交付和DevOps实践基础知识

往期回顾:第一章:【云原生概念和技术】第二章:【容器化应用程序设计和开发】第三章:【基于容器的部署、管理和扩展】第四章:【微服务架构设计和实现】第五章:【5.1自动化构建和打包容器镜像】第五章:【5.2自动化测试和集成测试】持续交付和DevOps实践基础知识5.3.1什么是持续交付5.3.2DevOps实践基础知识5.

云原生之深入解析K8S集群内的服务通信

一、传统的服务到服务通信Kubernetes支持服务间通信的3个原生k8s对象分别为:ClusterIPService、DNS和Kube-Proxy:在进入Kubernetes生态系统之前,快速了解一下传统的服务到服务通信:通信是通过IP地址进行的,因此为了让服务A调用服务B,一种方法是为服务B分配一个静态IP地址。现

Qt应用开发(基础篇)——菜单 QMenu

一、前言QMenu类继承于QWidget,它提供了一个菜单样式的小部件,用于菜单栏、上下文菜单和一些弹出式菜单。QMenu菜单的选项是可选的,它可以是一个下拉的菜单,也可以是独立的上下文菜单。下拉菜单通常作用于当用户单击相应的项目或按下指定的快捷键时,使用QMenuBar::addMenu()将菜单插入到菜单栏中,菜单

复杂问题问答

复杂问题问答写在最前面复杂问题问答问答系统分类知识图谱现存问题论文1分类写在最前面希望通过了解,找到目标应用场景的方法具体属于哪一个分支,并初步实现通过阅读文献,找到了另一个研究方向,所以这个就先这样吧hh参考[1]冯钧,李艳,杭婷婷.问答系统中复杂问题分解方法研究综述[J].计算机工程与应用,2022,58(17):

设计模式-中介者模式

每次乘坐高铁出行时,我都会像这样一个问题:这么多列车都可能通过这条轨道,会不会存在冲突的可能呢?同样的,飞机的起飞和降落时对于道路的选择也会有冲突的可能。这些情况都会造成可怕的后果,而阻止这种情况发生的就是机场调度中心。飞机在起飞和降落前都会请求机场调度中心,由机场调度中心来负责协调飞机、地面道路、摆渡车辆等。因此,机

热文推荐