我是靠谱客的博主 忧郁鸭子,最近开发中收集的这篇文章主要介绍flink实时写入hdfs之StreamingFileSink,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

上篇文章中介绍过flink实时写入hdfs之BucketingSink,今天介绍的这种方式相比旧版更加灵活强大,官网已经说明:BucketingSink在flink1.9弃用,将在后续版本中被删除。请改用StreamingFileSink。

  • StreamingFileSink支持row-wise encoding formats and bulk-encoding formats
  1. Row-encoded
  • 这也是我们常用的方式,例如写入hdfs, 直接上代码:
  • 默认粪桶格式类DateTimeBucketAssigner
public class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {

	private static final long serialVersionUID = 1L;
	//这个是默认的粪桶路径格式,默认根据`dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()))`也就是处理时			 间来粪桶, 比如/xxx/2020-01-01--00,这样的路径
	private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
	//这里也可以传入自定义的格式
	private final String formatString;
	private final ZoneId zoneId;
	private transient DateTimeFormatter dateTimeFormatter;
	public DateTimeBucketAssigner() {
		this(DEFAULT_FORMAT_STRING);
	}
	public DateTimeBucketAssigner(String formatString) {
		this(formatString, ZoneId.systemDefault());
	}
	...
	
BucketAssigner<A, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai"));
  • 默认滚动策略DefaultRollingPolicy
DefaultRollingPolicy<Row, String> rollingPolicy = DefaultRollingPolicy
    .create()
     // 设置每个文件的最大大小 ,默认是128M。
     .withMaxPartSize(parameterTool.getLong("", DEFAULT_MAX_PART_SIZE))
     // 滚动写入新文件的时间,默认60s。
     .withRolloverInterval(parameterTool.getLong("", DEFAULT_ROLLOVER_INTERVAL))
     // 60s空闲,就滚动写入新的文件
     .withInactivityInterval(parameterTool.getLong("", DEFAULT_INACTIVITY_INTERVAL))
     .build();
  • 实例
//系统默认值
private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
private static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L*1000L;
public static SinkFunction<Row> getStreamingFileSink() {
    StreamingFileSink<Row> sink = StreamingFileSink
            .forRowFormat(new Path("/xxx"), new SimpleStringEncoder<Row>("UTF-8"))
            .withRollingPolicy(rollingPolicy)
            .withBucketAssigner(assigner)
            .build();
    return sink;
}

//实时流
stream.addSink(sink)
  1. bulk-encoding
  • 这种是批量编码格式,可以将数据存为自定义格式例如:parquet
StreamingFileSink<Prti> bulkSink = StreamingFileSink.
    forBulkFormat(new Path("/xxx"), ParquetAvroWriters.forReflectRecord(Xxxx.class))
    .withBucketAssigner(new DateTimeBucketAssigner<>()).withBucketCheckInterval(600000)
    .build();
  • 官网做了一个提示

IMPORTANT: Bulk-encoding formats can only be combined with the OnCheckpointRollingPolicy, which rolls the in-progress part file on every checkpoint.
也就是说只能按照每个checkpoint来压缩,这样的话如果ck间隔太短,会产生许多小文件,间隔太长就失去了实时性,个人觉得不如用 Row-encoded的方式,然后定期去离线压缩

  • 小提示

粪桶=分桶

最后

以上就是忧郁鸭子为你收集整理的flink实时写入hdfs之StreamingFileSink的全部内容,希望文章能够帮你解决flink实时写入hdfs之StreamingFileSink所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(51)

评论列表共有 0 条评论

立即
投稿
返回
顶部