Flink目前对于外部Exactly-Once写支持提供了两种的sink,一个是Kafka-Sink,另一个是Hdfs-Sink,这两种sink实现的Exactly-Once都是基于Flink checkpoint提供的hook来实现的两阶段提交模式来保证的,主要应用在实时数仓、topic拆分、基于小时分析处理等场景下。本篇将会介绍StreamingFileSink的基本用法、如何压缩数据以及合并产生的小文件。
StreamingFileSink提供了基于行、列两种文件写入格式,用法:
//行
StreamingFileSink.forRowFormat(new Path(path),
new SimpleStringEncoder<T>())
.withBucketAssigner(new PaulAssigner<>()) //分桶策略
.withRollingPolicy(new PaulRollingPolicy<>()) //滚动策略
.withBucketCheckInterval(CHECK_INTERVAL) //检查周期
.build();
//列 parquet
StreamingFileSink.forBulkFormat(new Path(path),
ParquetAvroWriters.forReflectRecord(clazz))
.withBucketAssigner(new PaulBucketAssigner<>())
.withBucketCheckInterval(CHECK_INTERVAL)
.build();
//ParquetWriter.Builder中
private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
public class PaulParquetAvroWriters {
public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type,CompressionCodecName compressionCodecName) {
final String schemaString = SpecificData.get().getSchema(type).toString();
final ParquetBuilder<T> builder = (out) -> createAvroParquetWriter(schemaString, SpecificData.get(), out,compressionCodecName);
return new ParquetWriterFactory<>(builder);
}
//compressionCodecName 压缩算法
public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema,CompressionCodecName compressionCodecName) {
final String schemaString = schema.toString();
final ParquetBuilder<GenericRecord> builder = (out) -> createAvroParquetWriter(schemaString, GenericData.get(), out,compressionCodecName);
return new ParquetWriterFactory<>(builder);
}
//compressionCodecName 压缩算法
public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type,CompressionCodecName compressionCodecName) {
final String schemaString = ReflectData.get().getSchema(type).toString();
final ParquetBuilder<T> builder = (out) -> createAvroParquetWriter(schemaString, ReflectData.get(), out,compressionCodecName);
return new ParquetWriterFactory<>(builder);
}
//compressionCodecName 压缩算法
private static <T> ParquetWriter<T> createAvroParquetWriter(
String schemaString,
GenericData dataModel,
OutputFile out,
CompressionCodecName compressionCodecName) throws IOException {
final Schema schema = new Schema.Parser().parse(schemaString);
return AvroParquetWriter.<T>builder(out)
.withSchema(schema)
.withDataModel(dataModel)
.withCompressionCodec(compressionCodecName)//压缩算法
.build();
}
private PaulParquetAvroWriters() {}
}
不管是Flink还是SparkStreaming写hdfs不可避免需要关注的一个点就是如何处理小文件,众多的小文件会带来两个影响:
理想状态下是按照设置的文件大小滚动,那为什么会产生小文件呢?这与文件滚动周期、checkpoint时间间隔设置相关,如果滚动周期较短、checkpoint时间也比较短或者数据流量有低峰期达到文件不活跃的时间间隔,很容易产生小文件,接下来介绍几种处理小文件的方式:
回顾一下文件生成格式:part + subtaskIndex + connter,其中subtaskIndex代表着任务并行度的序号,也就是代表着当前的一个写task,越大的并行度代表着越多的subtaskIndex,数据就越分散,如果我们减小并行度,数据写入由更少的task来执行,写入就相对集中,这个在一定程度上减少的文件的个数,但是在减少并行的同时意味着任务的并发能力下降;
以parquet写分析为例,parquet写文件由processing状态变为pending状态发生在checkpoint的snapshotState阶段中,如果checkpoint周期时间较短,就会更快发生文件滚动,增大checkpoint周期,那么文件就能积累更多数据之后发生滚动,但是这种增加时间的方式带来的是数据的一定延时;