概述
目录
Flink的类型分类
Flink的序列化过程
声明类型信息
总结
Flink的类型分类
Flink 在其内部构建了一套自己的类型系统,Flink 现阶段支持的类型分类如图所示,从图中可以看到 Flink 类型可以分为基础类型(Basic)、数组(Arrays)、复合类型(Composite)、辅助类型(Auxiliary)、泛型和其它类型(Generic)。
以上如此多的类型,在Flink中,统一使用TypeInformation类表示。比如,POJO在Flink内部使用PojoTypeInfo来表示,PojoTypeInfo继承自CompositeType,CompositeType继承自TypeInformation。下图展示了TypeInformation的继承关系,可以看到,前面提到的诸多数据类型,在Flink中都有对应的类型。TypeInformation的一个重要的功能就是创建TypeSerializer序列化器,为该类型的数据做序列化。每种类型都有一个对应的序列化器来进行序列化。
Flink的序列化过程
对于大多数数据类型 Flink 可以自动生成对应的序列化器,能非常高效地对数据集进行序列化和反序列化,比如,BasicTypeInfo、WritableTypeIno 等,但针对 GenericTypeInfo类型,Flink会使用Kyro进行序列化和反序列化。其中,Tuple、Pojo 和 CaseClass 类型是复合类型,它们可能嵌套一个或者多个数据类型。在这种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。
Flink 自带了很多 TypeSerializer 子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用,如果内建的数据类型和序列化方式不能满足你的需求,Flink 的类型信息系统也支持用户拓展。若用户有一些特殊的需求,只需要实现 TypeInformation、TypeSerializer 和 TypeComparator 即可定制自己类型的序列化和比较大小方式,来提升数据类型在序列化和比较时的性能。
序列化就是将数据结构或者对象转换成一个二进制串的过程,在Java 里面可以简单地理解成一个 byte 数组。而反序列化恰恰相反,就是将序列化过程中所生成的二进制串转换成数据结构或者对象的过程。下面就以内嵌型的 Tuple3 这个对象为例,简述一下它的序列化过程。Tuple3包含三个层面,一是 int 类型,一是 double 类型,还有一个是 Person。Person 包含两个字段,一是 int 型的id,另一个是 String 类型的name,它在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到 Tuple3 会把 int 类型通过IntSerializer进行序列化操作,此时int只需要占用四个字节就可以了。根据 int 占用四个字节,这个能够体现出 Flink 可序列化过程中的一个优势,即在知道数据类型的前提下,可以更好的进行相应的序列化与反序列化操作。
Person 类会被当成一个 Pojo 对象来进行处理,PojoSerializer 序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由 MemorySegment 去支持。
声明类型信息
因为Java通常会擦除类型信息,所以你需要将数据类型传递给TypeInformation构造函数:
- 对于非泛型类,直接传入 class 对象即可
PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);
- 对于泛型类,需要通过 TypeHint 来保存泛型类型信息。
final TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});
- 预定义的快捷方式
例如 BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本类型的类型声明,可以直接使用。当然,如果觉得 BasicTypeInfo 还是太长,Flink 还提供了完全等价的 Types 类(org.apache.flink.api.common.typeinfo.Types)
Types.TUPLE(Types.STRING, Types.INT)
- 自定义 TypeInfo 和 TypeInfoFactory
通过自定义 TypeInfo 为任意类提供 Flink 原生内存管理(而非 Kryo),可令存储更紧凑,运行时也更高效。开发者在自定义类上使用 @TypeInfo 注解,随后创建相应的 TypeInfoFactory 并覆盖 createTypeInfo 方法。注意需要继承 TypeInformation 类,为每个字段定义类型,并覆盖元数据方法,例如是否是基本类型(isBasicType)、是否是 Tuple(isTupleType)、元数(对于一维的 Row 类型,等于字段的个数)等等,从而为 TypeExtractor 提供决策依据。
总结
个人的理解,不一定对,如有错误希望大家指正。
- 一般情况下,flink自己的类型系统和序列化器能满足大部分的情形;
- 若用户有一些特殊的需求,需要继承TypeInformation、TypeSerializer自定义类和自定义序列化器,也可以实现Flink原生内存管理,而非Kryo;
- 对于不适用于flink的序列化框架的数据类型,flink会使用Kryo来进行序列化,并不是所有的类型都与Kryo无缝连接(例如Google Protobuf、Apache Thrift等),此时可以注册额外的序列化类(env.getConfig().addDefaultKryoSerializer(clazz, serializer))
参考:
Data Types & Serialization
Apache Flink 进阶(五):数据类型和序列化
Flink 类型和序列化机制简介
最后
以上就是潇洒人生为你收集整理的Flink Data Types & Serialization的全部内容,希望文章能够帮你解决Flink Data Types & Serialization所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复