概述
上篇文章中介绍过flink实时写入hdfs之BucketingSink,今天介绍的这种方式相比旧版更加灵活强大,官网已经说明:BucketingSink在flink1.9弃用,将在后续版本中被删除。请改用StreamingFileSink。
StreamingFileSink
支持row-wise encoding formats and bulk-encoding formats
- Row-encoded
- 这也是我们常用的方式,例如写入hdfs, 直接上代码:
- 默认粪桶格式类
DateTimeBucketAssigner
public class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {
private static final long serialVersionUID = 1L;
//这个是默认的粪桶路径格式,默认根据`dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()))`也就是处理时 间来粪桶, 比如/xxx/2020-01-01--00,这样的路径
private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
//这里也可以传入自定义的格式
private final String formatString;
private final ZoneId zoneId;
private transient DateTimeFormatter dateTimeFormatter;
public DateTimeBucketAssigner() {
this(DEFAULT_FORMAT_STRING);
}
public DateTimeBucketAssigner(String formatString) {
this(formatString, ZoneId.systemDefault());
}
...
BucketAssigner<A, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai"));
- 默认滚动策略
DefaultRollingPolicy
DefaultRollingPolicy<Row, String> rollingPolicy = DefaultRollingPolicy
.create()
// 设置每个文件的最大大小 ,默认是128M。
.withMaxPartSize(parameterTool.getLong("", DEFAULT_MAX_PART_SIZE))
// 滚动写入新文件的时间,默认60s。
.withRolloverInterval(parameterTool.getLong("", DEFAULT_ROLLOVER_INTERVAL))
// 60s空闲,就滚动写入新的文件
.withInactivityInterval(parameterTool.getLong("", DEFAULT_INACTIVITY_INTERVAL))
.build();
- 实例
//系统默认值
private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
private static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L*1000L;
public static SinkFunction<Row> getStreamingFileSink() {
StreamingFileSink<Row> sink = StreamingFileSink
.forRowFormat(new Path("/xxx"), new SimpleStringEncoder<Row>("UTF-8"))
.withRollingPolicy(rollingPolicy)
.withBucketAssigner(assigner)
.build();
return sink;
}
//实时流
stream.addSink(sink)
bulk-encoding
- 这种是批量编码格式,可以将数据存为自定义格式例如:
parquet
StreamingFileSink<Prti> bulkSink = StreamingFileSink.
forBulkFormat(new Path("/xxx"), ParquetAvroWriters.forReflectRecord(Xxxx.class))
.withBucketAssigner(new DateTimeBucketAssigner<>()).withBucketCheckInterval(600000)
.build();
- 官网做了一个提示
IMPORTANT: Bulk-encoding formats can only be combined with the
OnCheckpointRollingPolicy
, which rolls the in-progress part file on every checkpoint.
也就是说只能按照每个checkpoint来压缩,这样的话如果ck间隔太短,会产生许多小文件,间隔太长就失去了实时性,个人觉得不如用 Row-encoded的方式,然后定期去离线压缩
- 小提示
粪桶=分桶
最后
以上就是忧郁鸭子为你收集整理的flink实时写入hdfs之StreamingFileSink的全部内容,希望文章能够帮你解决flink实时写入hdfs之StreamingFileSink所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复