Flink 类型机制 及 Stream API和Table API类型推断和转换

2023-09-14 17:35:49

注:本文使用flink 版本是0.13

一、类型体系

Flink 有两大API (1)stream API 和 (2)Table API ,分别对应TypeInformation 和 DataType类型体系。

1.1 TypeInformation系统

TypeInformation系统是使用Stream一定会用到的。TypeInformation 以下简称TypeInfo。
TypeInfo 本质就是一对一的类型映射。在java中一个typeInfo就对应着一个确定的java类型。所以在stream api 中某些情况下。给定数据flink可以根据数据自动推断出TypeInfo。
但现在Table API大行其道,Flink已经有意在用DataType替代TypeInfo了。所以Flink中有 DataType To TypeInfo 的API(org.apache.flink.table.runtime.typeutils.InternalTypeInfo虽然只能将DataType转为Table API所规定的TypeInfo),却没有提供 TypeInfo To DataType 的API。
在这里插入图片描述
在Stream API中默认使用的承载行数据的类型是org.apache.flink.types.Row
注意:创建流后如果是复杂类型,比Row类型,非标准的Pojo类型,必须明确告诉Flink是审美类型,Flink无法自动推断出的。
(1)可以在创建流时候提供如org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addSource(org.apache.flink.streaming.api.functions.source.SourceFunction<OUT>, java.lang.String, org.apache.flink.api.common.typeinfo.TypeInformation<OUT>)
(2) 可以在使用function时候提供,如org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#returns(org.apache.flink.api.common.typeinfo.TypeInformation<T>)
其中return方法参数也有三种(非常重要):

// 比如 DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5); 提供 Integer.class就可以了。当然不提供FLink也会推断出的。
public SingleOutputStreamOperator<T> returns(Class<T> typeClass);
// 用于创安含有泛型的TypeInfo
// 如:TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
public SingleOutputStreamOperator<T> returns(TypeHint<T> typeHint);
// TypeInfo的创建方式
// 1. 基础类型从org.apache.flink.api.common.typeinfo.BasicTypeInfo选择即可。
// 2. 不含泛型的类型可从org.apache.flink.api.common.typeinfo.TypeInformation#of方法创建。
// 3. DataStream<Row> 可由 org.apache.flink.api.java.typeutils.RowTypeInfo 创建
// 4. DataStream<RowData> 只能由 org.apache.flink.table.runtime.typeutils.InternalTypeInfo#of(org.apache.flink.table.types.logical.RowType)等方法创建。注意创建出来字段的Field TypeInfo的也都是Flink Table API预先指定好的。
public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) ;

(3)可以实现接口org.apache.flink.api.java.typeutils.ResultTypeQueryable提供。

1.2 DataType系统

Table API 中所有的类型都是围绕着DataType构建的。如org.apache.flink.table.api.Schemaorg.apache.flink.table.catalog.ResolvedSchema
前者代表Table api中的Table对象的表结构,后者代表从catalog中获取的表结构。
DataType本质由两部分组成:

    protected final LogicalType logicalType;
    protected final Class<?> conversionClass;

logicalType 即逻辑类型,是和数据库中的类型对应的。
conversionClass 即运行时java类型,是实际承载数据的类型。

可以说DataType 与物理类型也是一对一的关系,并有conversionClass确定。
举个例子:
DataType与数据库中日期对应的类型是DateType ,它有一个支持conversionClass 的列表。分别支持 Date , LocalDate ,Integer。其中 如果使用Integer作为实际承载数据的类型,此时存储的值是与1970-01-01的天数差值。

@PublicEvolving
public final class DateType extends LogicalType {
    private static final String FORMAT = "DATE";
    private static final Set<String> NULL_OUTPUT_CONVERSION = conversionSet(new String[]{Date.class.getName(), LocalDate.class.getName(), Integer.class.getName()});
    private static final Class<?> DEFAULT_CONVERSION;
 }

如果需要创建DataType,Flink提供的入口是org.apache.flink.table.api.DataTypes 类,提供了各种DataType类型的创建方法。需要注意的是创建的类型都使用了默认 conversionClass ,其就是LogicalType 实现类中指定的DEFAULT_CONVERSION。如果需要指定运行时的类型就需要使用org.apache.flink.table.types.AbstractDataType#bridgedTo方法。

而在Table API中默认使用的承载行数据类型是org.apache.flink.table.data.RowData,是一个接口。常用的实现类是org.apache.flink.table.data.GenericRowData

在Table API中实际承载数据的类型必须使用FLink指定的类型。比如Date类型必须使用Integer,而不能使用java的LocalDate类型。具体类型可以从方法 org.apache.flink.table.types.utils.DataTypeUtils#toInternalDataType(org.apache.flink.table.types.DataType) 得出。

1.3 Stream API 和 Table API相互转换中的类型

转换为核心是 围绕着实际承载数据的类型 即DataType的conversionClass 应为
转换过程均由TableAPI的核心org.apache.flink.table.api.bridge.java.StreamTableEnvironment完成。
刚刚提及了Schema本质就是DataType,并可由DataType创建org.apache.flink.table.api.Schema.Builder#fromRowDataType 。以下就将Schema代指为DataType了。

如下各个方法中
Stream <–相互转化–>Table 中可以指定DataType,也可以不指定DataType(Schema)。

1.3.1 Stream To Table

1.3.1.1 不指定DataType

Flink会从DataStream的TypeInfo中推断DataType类型。
比如 TypeInfo是 org.apache.flink.api.common.typeinfo.BasicTypeInfo#BYTE_TYPE_INFO 或者TypeInfomation.of(Integer.class) 在创建流时候 是Integer类型,则Flink会自动推断出需要使用DataTypes.INT() 创建的对象并把其实际的承载类conversionClass指定为Integer.class

具体可以参考:flink 类型推断data-type-extraction 章节:

# 注意在scala中不要使用primitives 类型需要使用包装类型。因为原始类型不允许为空。
If you intend to implement classes in Scala, it is recommended to use boxed types (e.g. java.lang.Integer) instead of Scala’s primitives. Scala’s primitives (e.g. Int or Double) are compiled to JVM primitives (e.g. int/double) and result in NOT NULL semantics
# 对于没有被列举的类型,是需要额外提供类型的。比如使用@DataTypeHint
Other JVM bridging classes mentioned in this document require a @DataTypeHint annotation.

下图是官方的java类型推断成为FLink DataType的类型
在这里插入图片描述

1.3.1.2 指定DataType

DataType是有LogicalType的,指定了DataType也就指定了LogicalType逻辑类型。
比如现在Row中有一个字段的TypeInfo还是 org.apache.flink.api.common.typeinfo.BasicTypeInfo#BYTE_TYPE_INFO 或者TypeInfomation.of(Integer.class) 也就是java中的int 或 Integer。但指定DataType时候使用DataTypes.DATE().bridgedTo(Integer.class)。此时就已经告诉Flink这里我虽然给你提供的是Integer,但实际代表的逻辑是Date日期类型数据了。以后就可以使用Table API所有关于Date日期的转换方法了。

1.3.2 Table To Stream

Table中都是包含DataType的,可从方法获得,如DataType dt = tbl.getResolvedSchema().toPhysicalRowDataType();

1.3.2.1 不指定DataType

不指定情况比较简单Flink Table API 每种LogicalType逻辑类型都有默认的java类型。
如:

TableDataTypeJavaType(conversionClass)代表内容
intInteger.class
bigintLong.class
dateInteger与1900-01-01天数差
timeInteger.class当天的毫秒数

所以不指定情况下,得到的DataStream中的原Date日期类型的数据一定会转为Integer.class 。并不是java中常用的 LocalDate.class ,也不是 LogicalType.DEFAULT_CONVERSION所规定的DEFAULT_CONVERSION = LocalDate.class;

1.3.2.2 指定DataType

指定情况就会在DataStream中获得想要的java类型了。Flink会在此过程中给提供类型转换服务。
如在表中能获取到DataType,如 DataType dt = tbl.getResolvedSchema().toPhysicalRowDataType();
但在toDataStream或toChangelogStream 可以提供DataType。
当两个DataType不一样的时候Flink就会将Table中的java(物理类型)转换为 提供的。

如下流程:
(1)使用 DataStream<Integer>
(2)指定DataTypes.DATE().bridgedTo(Integer.class)创建Table。此时table中的java数据类型仍然是Integer.class。
(3)使用Table创建Stream,并指定 DataTypes.DATE().bridgedTo(LocalDate.class) ,此时得到的流DataStream<LocalDate>
借此流程就实现了DataStream<Integer>Stream<LocalDate> 的转化。

public interface StreamTableEnvironment extends TableEnvironment {

<T> Table fromDataStream(DataStream<T> dataStream);

<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

Table fromChangelogStream(DataStream<Row> dataStream);

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

DataStream<Row> toDataStream(Table table);

<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
DataStream<Row> toChangelogStream(Table table);

DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

}

总结

综上,Flink中比较容易从DataType得到 TypeInfo 。而从TypeInfo中转化为想要的逻辑类型,八成额外提供DataType(如Integer转Date)。所以,还是建议直接使用Table API中的DataType更加方便。

参考文章:
Flink之数据类型详解
Flink类型系统
聊聊Java类型擦除、Flink中使用Lambda表达式丢失信息和Flink类型暗示机制
聊聊Java泛型类型擦除及Flink类型暗示(type hint)机制
Flink数据类型&&序列化&&序列化器

更多推荐

Jumia、Shein流量逐渐上升,测评自养号如何实现订单突破?

Jumia是全球领先的非洲跨境电商平台,也是非洲第一家在美国主板上市的非洲科技企业。作为100%面向非洲市场的互联网公司,业务范围覆盖尼日利亚、肯尼亚等11个非洲国家。Shein是一家全球领先的时尚和生活方式在线零售商,通过按需生产的模式赋能供应商共同打造敏捷柔性供应链,从而减少浪费,并向全球消费者提供丰富且具有性价比

分布式锁的三种实现方式!

分布式锁是一种用于保证分布式系统中多个进程或线程同步访问共享资源的技术。同时它又是面试中的常见问题,所以我们本文就重点来看分布式锁的具体实现(含实现代码)。在分布式系统中,由于各个节点之间的网络通信延迟、故障等原因,可能会导致数据不一致的问题。分布式锁通过协调多个节点的行为,保证在任何时刻只有一个节点可以访问共享资源,

IntelliJ IDEA 2023.2新特性详解第二弹!

4性能分析器4.1从Run(运行)工具窗口使用分析功能2023.2中,可直接从Run(运行)工具窗口轻松访问IntelliJ分析器的功能。使用新按钮,点击即可调用:AttachIntelliJProfiler(附加IntelliJ分析器)CaptureMemorySnapshot(捕获内存快照)无需打开Profiler

IntelliJ IDEA 2023.2 主要更新了什么?(纯文本介绍版)

🌷🍁博主猫头虎带您GotoNewWorld.✨🍁🦄博客首页——猫头虎的博客🎐🐳《面试题大全专栏》文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺🌊《IDEA开发秘籍专栏》学会IDEA常用操作,工作效率翻倍~💐🌊《100天精通Golang(基础入门篇)》学会Golang语言,畅玩云原生,走遍大

IntelliJ IDEA 2023.2 主要更新了什么?(图文版)

🌷🍁博主猫头虎带您GotoNewWorld.✨🍁🦄博客首页——猫头虎的博客🎐🐳《面试题大全专栏》文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺🌊《IDEA开发秘籍专栏》学会IDEA常用操作,工作效率翻倍~💐🌊《100天精通Golang(基础入门篇)》学会Golang语言,畅玩云原生,走遍大

Golang中的GMP调度模型

GMP调度模型Golang调度器的由来单进程时代不需要调度器1.单一的执行流程,计算机只能一个任务一个任务处理。2.进程阻塞所带来的CPU时间浪费。后来操作系统就具有了最早的并发能力:多进程并发,当一个进程阻塞的时候,切换到另外等待执行的进程,这样就能尽量把CPU利用起来,CPU就不浪费了多进程/线程时代有了调度器需求

C++ PrimerPlus 复习 第七章 函数——C++的编程模块(上)

第一章命令编译链接文件make文件第二章进入c++第三章处理数据第四章复合类型(上)第四章复合类型(下)第五章循环和关系表达式第六章分支语句和逻辑运算符第七章函数——C++的编程模块(上)本章重要点注意函数指针,const指针参数。其他的其实都简简单单第七章函数——C++的编程模块(上)函数基本知识;函数原型(函数声明

【数据结构】AVL树的删除(解析有点东西哦)

文章目录前言一、普通二叉搜索树的删除1.删除结点的左右结点都不为空2.删除结点的左结点为空,右节点不为空3.删除结点的右结点为空,左节点不为空4.删除结点的左右结点都不为空二、AVL树的删除1.删除结点,整棵树的高度不变化1.1parent的平衡因子在删除结点之前为01.1.1删除结点为parent的左节点1.1.2删

就只说 3 个 Java 面试题

在面试时,即使是经验丰富的开发人员,也可能会发现这是一些很棘手的问题:1、Java中“transient”关键字的用途是什么?如何才能实现这一目标?在Java中,“transient”关键字用于指示类的特定字段不应包含在对象的序列化形式中。这意味着当对象被序列化时,其状态将转换为可以写入文件或通过网络发送的字节序列。通

Mybatis学习笔记10 高级映射及延迟加载

Mybatis学习笔记9动态SQL_biubiubiu0706的博客-CSDN博客无论简单映射(前面所学的单表和对象之间的映射关系)还是高级映射说到底都是java对象和数据库表记录之间的映射关系准备数据库表:一个班级对应多个学生.班级表:t_class学生表:s_stu(自增)新建模块项目整体结构pom.xml<?xm

深度学习——卷积神经网络

卷积神经网络1计算机视觉(ComputerVision)2边缘检测示例(EdgeDetectionExample)3更多边缘检测内容(MoreEdgeDetectionExample)4Padding5卷积步长(StridedConvolutions)6三维卷积(ConvolutionsOverVolumes)7单层卷

热文推荐