概述
目录
数据类型管理
TypeInformation
数据类型:
类型推断
主动声明类型
数据类型管理
Flink框架内部是自主进行内存管理的,其实现大量使用堆外内存,对开发者隐藏无论类型与逻辑类型的转换细节,Flink自己实现了一套TypeInformation,用于推断与转换数据类型,实现serializers以及comparators。
但在某些情况下却无法直接做到,例如定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,使得Flink并不能很容易地获取到数据集中的数据类型信息。同时在Scala API和Java API中,Flink分别使用了不同的方式重构了数据类型信息。
同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。
TypeInformation
TypeInformation是为DataStream API设计的,用于描述对象的类型信息,运行时则根据不同的类型进行序列化与反序列化。
Flink能够支持任意的Java或Scala的数据类型,能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样能够及时有效地避免用户在使用Flink编写应用的过程中的数据类型问题。
下面将针对DataStream的类型系统做分析说明:
数据类型:
物理类型是真实存在的类型,开发者使用的是物理类型。
逻辑类型是物理类型的描述,Flink运行时会根据逻辑类型进行数据的序列化和反序列化。
BasicTypeInfo:能够支持任意Java 原生基本类型(装箱)或 String 类型,例如Integer、String、Double等
BasicArrayTypeInfo:对应的是Java基本类型数组(装箱)或 String对象的数组
TupleTypeInfo:Flink在Java接口中定义了元祖类(Tuple)供用户使用。Flink Tuples是固定长度固定类型的Java Tuple实现,不支持空值存储。目前支持任意的Flink Java Tuple类型字段数量上限为25,如果字段数量超过上限,可以通过继承Tuple类的方式进行拓展。
PojoTypeInfo:用以描述任意的POJOs,包括Java 和Scala类。在Flink中使用POJOs类可以通过字段名称获取字段,例如dataStream.join(otherStream).where("name").equalTo("personName"),对于用户做数据处理则非常透明和简单。如果在Flink中使用POJOs数据类型,需要遵循以下要求:
- POJOs 类必须是Public修饰且必须独立定义,不能是内部类;
- POJOs类中必须含有默认空构造器;
- POJOs类中所有的Fields必须是Public或者具有Public修饰的getter和setter方法;
- POJOs类中的字段类型必须是Flink支持的。
类型推断
目的就是在物理类型与逻辑类型间做封装与解析,下文以Java做简述。
在使用DataStream接口的时候,会触发类型的提取。
自动推断类型的核心是TypeExtractor,它是用于对类进行反射分析的工具类,用于确定转换函数实现的返回类型。
自动类型提取是一项复杂的业务,它依赖于很多变量,比如泛型,编译器、接口等。
Java对泛型擦除的机制,会导致Flink在处理Lambda表达式的类型提取会出现异常,进而导致无法推断,因此无论何时使用此类的方法,都需要确保提供传递自定义键入信息作为兜底。如下代码段:
TypeInformation.of(new TypeHint<Tuple2<Integer,String>>(){})
Lambda类型提取的说明:Flink类型的提取是依赖继承机制,但Lambda函数的特殊性在于其类型提取是匿名的(javac编译器不会留下类型信息在字节码中),所以获取不到该信息;所以Flink借鉴了GSON的TypeToken机制,使用TypeHint来保留类型信息。
主动声明类型
我们一般情况下,可以通过TypeInformation.of()方法来显式创建一个类型信息的对象,如下:
PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);
最后
以上就是幸福紫菜为你收集整理的Flink框架源码——数据类型数据类型管理TypeInformation的全部内容,希望文章能够帮你解决Flink框架源码——数据类型数据类型管理TypeInformation所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复