Merge pull request #21 from jeffchao/master
Add a time-based semantic to writers to supplement the size-based one
jeffchao authored Jul 28, 2020
2 parents 270b1ae + 99c4094 commit f3e7f6c
Showing 10 changed files with 358 additions and 101 deletions.
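The heart of this change is in the IcebergWriterStage Transformer below: a periodic timer stream is merged into the record stream so the writer flushes either when the appender passes a byte threshold or when the flush interval elapses with buffered data. Here is a minimal, self-contained sketch of that merge-a-timer pattern using RxJava 1.x; the sentinel value, thresholds, and println are illustrative assumptions, not the connector's code:

import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.schedulers.Schedulers;

// Illustrative sketch, not the connector's code: merge a periodic "tick"
// sentinel into the data stream so a downstream accumulator can flush on
// either a size threshold or an elapsed-time threshold, whichever hits first.
public class TimeOrSizeFlushSketch {

    private static final String TICK = "__tick__"; // hypothetical sentinel

    public static void main(String[] args) throws InterruptedException {
        Observable<String> data = Observable.interval(30, TimeUnit.MILLISECONDS)
                .map(i -> "record-" + i);

        Observable<String> ticks = Observable.interval(200, TimeUnit.MILLISECONDS)
                .map(i -> TICK);

        data.mergeWith(ticks)
                .observeOn(Schedulers.computation())
                .scan(new StringBuilder(), (buffer, element) -> {
                    boolean timedOut = TICK.equals(element);
                    if (!timedOut) {
                        buffer.append(element).append(' '); // "write" the record
                    }
                    // Flush on size, or on timeout if there is buffered data.
                    if (buffer.length() >= 64 || (timedOut && buffer.length() > 0)) {
                        System.out.println("flush: " + buffer);
                        return new StringBuilder();         // "close and reopen"
                    }
                    return buffer;
                })
                .subscribe();

        Thread.sleep(1000); // let the asynchronous streams run briefly
    }
}

The real Transformer below does the same thing with a Trigger accumulator and an IcebergWriter in place of the StringBuilder.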
4 changes: 2 additions & 2 deletions mantis-connector-iceberg/build.gradle
@@ -26,8 +26,8 @@ dependencies {
// We only need the Configuration interface. Users can bring their own hadoop-common version.
shadow "org.apache.hadoop:hadoop-common:$hadoopVersion"

api "org.apache.iceberg:iceberg-api:$icebergVersion"
// Exclude in case there are difference in SHAs in 0.7.0 incubation version. We only use interfaces anyway.
// Exclude in case there are differences in SHAs between 0.7.0 incubation versions.
shadow "org.apache.iceberg:iceberg-api:$icebergVersion"
shadow "org.apache.iceberg:iceberg-core:$icebergVersion"
shadow "org.apache.iceberg:iceberg-data:$icebergVersion"
shadow "org.apache.iceberg:iceberg-parquet:$icebergVersion"
@@ -30,6 +30,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
@@ -42,7 +43,7 @@
* For example, this class may be used with an S3 compatible filesystem library
* which progressively uploads (multipart) to S3 on each write operation for
* optimizing latencies.
*
* <p>
* Users have the flexibility to choose the semantics of opening, writing, and closing
* this Writer, for example, closing the underlying appender after some number
* of Bytes written and opening a new appender.
@@ -74,9 +75,9 @@ public BaseIcebergWriter(WriterConfig config, WorkerInfo workerInfo, Table table

/**
* Opens a {@link FileAppender} for a specific {@link FileFormat}.
*
* <p>
* A filename is automatically generated for this appender.
*
* <p>
* Supports Parquet. Avro, Orc, and others unsupported.
*/
@Override
@@ -88,7 +89,7 @@ public void open() throws IOException {
logger.info("opening new {} file appender {}", format, path);
file = HadoopOutputFile.fromPath(path, config.getHadoopConfig());

switch(format) {
switch (format) {
case PARQUET:
appender = Parquet.write(file)
.schema(table.schema())
@@ -106,34 +107,38 @@ public void open() throws IOException {

/**
* Closes the currently opened file appender and builds a DataFile.
*
* <p>
* Users are expected to {@link IcebergWriter#open()} a new file appender for this writer
* if they want to continue writing. Users can check for status of the file appender
* using {@link IcebergWriter#isClosed()}.
*
* @return a DataFile representing metadata about the records written.
*/
@Override
public DataFile close() throws IOException {
public DataFile close() throws IOException, RuntimeIOException {
if (appender == null) {
return null;
}

appender.close();

DataFile dataFile = DataFiles.builder(spec)
.withPath(file.location())
.withInputFile(file.toInputFile())
.withFileSizeInBytes(appender.length())
.withPartition(spec.fields().size() == 0 ? null : key)
.withMetrics(appender.metrics())
.withSplitOffsets(appender.splitOffsets())
.build();

appender = null;
file = null;

return dataFile;
// Calls to FileAppender#close can fail if the backing file system fails to close.
// For example, this can happen for an S3-backed file system where it might fail
// to GET the status of the file. The file would have already been closed.
// Callers should open a new appender.
try {
appender.close();

return DataFiles.builder(spec)
.withPath(file.location())
.withInputFile(file.toInputFile())
.withFileSizeInBytes(appender.length())
.withPartition(spec.fields().size() == 0 ? null : key)
.withMetrics(appender.metrics())
.withSplitOffsets(appender.splitOffsets())
.build();
} finally {
appender = null;
file = null;
}
}

public boolean isClosed() {
@@ -142,7 +147,7 @@ public boolean isClosed() {

/**
* Returns the current file size (in Bytes) written using this writer's appender.
*
* <p>
* Users should be careful calling this method in a tight loop because it can
* be expensive depending on the file format, for example in Parquet.
*
@@ -20,14 +20,15 @@

import org.apache.iceberg.DataFile;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.RuntimeIOException;

public interface IcebergWriter {

void open() throws IOException;

void write(Record record);

DataFile close() throws IOException;
DataFile close() throws IOException, RuntimeIOException;

boolean isClosed();

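Given the interface above, a caller is expected to open a fresh appender after close(), including after a close that fails. A hedged caller-side sketch (the batch helper and its error handling are assumptions, and it presumes it lives in the same package as IcebergWriter):

import java.io.IOException;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.RuntimeIOException;

// Illustrative usage only, not part of the connector.
public class WriterLoopSketch {

    // Writes one batch and returns the resulting DataFile, or null if the
    // close failed after the underlying file was already closed.
    public static DataFile writeBatch(IcebergWriter writer, Iterable<Record> records) throws IOException {
        if (writer.isClosed()) {
            writer.open();                      // open lazily before writing
        }
        for (Record record : records) {
            writer.write(record);
        }
        try {
            return writer.close();              // emits metadata about the written records
        } catch (RuntimeIOException e) {
            // Per the BaseIcebergWriter Javadoc, the file is already closed here;
            // the caller simply opens a new appender on the next batch.
            return null;
        }
    }
}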
@@ -36,15 +36,20 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.exceptions.Exceptions;
import rx.schedulers.Schedulers;

/**
* Processing stage which writes records to Iceberg through a backing file store.
@@ -80,6 +85,11 @@ public static List<ParameterDefinition<?>> parameters() {
.validator(Validators.alwaysPass())
.defaultValue(WriterProperties.WRITER_FLUSH_FREQUENCY_BYTES_DEFAULT)
.build(),
new StringParameter().name(WriterProperties.WRITER_FLUSH_FREQUENCY_MSEC)
.description(WriterProperties.WRITER_FLUSH_FREQUENCY_MSEC_DESCRIPTION)
.validator(Validators.alwaysPass())
.defaultValue(WriterProperties.WRITER_FLUSH_FREQUENCY_MSEC_DEFAULT)
.build(),
new StringParameter().name(WriterProperties.WRITER_FILE_FORMAT)
.description(WriterProperties.WRITER_FILE_FORMAT_DESCRIPTION)
.validator(Validators.alwaysPass())
@@ -92,7 +102,7 @@ public static List<ParameterDefinition<?>> parameters() {
* Use this method to create an Iceberg Writer independent of a Mantis Processing Stage.
* <p>
* This is useful for optimizing network utilization by using the writer directly within another
* processing stage instead of having to traverse stage boundaries.
* processing stage to avoid crossing stage boundaries.
* <p>
* This incurs a debuggability trade-off where a processing stage will do multiple things.
*/
@@ -128,7 +138,7 @@ public void init(Context context) {

IcebergWriter writer = newIcebergWriter(config, workerInfo, table);
WriterMetrics metrics = new WriterMetrics();
transformer = new Transformer(config, metrics, writer);
transformer = new Transformer(config, metrics, writer, Schedulers.computation(), Schedulers.io());
}

@Override
@@ -152,28 +162,51 @@ public static class Transformer implements Observable.Transformer<Record, DataFi
.withRecordCount(0L)
.build();

private static final Schema TIMER_SCHEMA = new Schema(
Types.NestedField.required(1, "ts_utc_msec", Types.LongType.get()));

private static final Record TIMER_RECORD = GenericRecord.create(TIMER_SCHEMA);

private final WriterConfig config;
private final WriterMetrics metrics;
private final IcebergWriter writer;
private final Scheduler timerScheduler;
private final Scheduler transformerScheduler;

public Transformer(WriterConfig config, WriterMetrics metrics, IcebergWriter writer) {
public Transformer(
WriterConfig config,
WriterMetrics metrics,
IcebergWriter writer,
Scheduler timerScheduler,
Scheduler transformerScheduler) {
this.config = config;
this.metrics = metrics;
this.writer = writer;
this.timerScheduler = timerScheduler;
this.transformerScheduler = transformerScheduler;
}

/**
* Opens an IcebergWriter FileAppender, writes records to a file. Check the file appender
* size on a configured count, and if over a configured threshold, close the file, build
* size on a configured row count, and if over a configured threshold, close the file, build
* and emit a DataFile, and open a new FileAppender.
* <p>
* If the size threshold is not met, secondarily check a time threshold. If there's data, then
* perform the same actions as above. If there's no data, then no-op.
* <p>
* Pair this with a progressive multipart file uploader backend for better latencies.
*/
@Override
public Observable<DataFile> call(Observable<Record> source) {
return source
Observable<Record> timer = Observable.interval(
config.getWriterFlushFrequencyMsec(), TimeUnit.MILLISECONDS, timerScheduler)
.map(i -> TIMER_RECORD);

return source.mergeWith(timer)
.observeOn(transformerScheduler)
.doOnNext(record -> {
if (writer.isClosed()) {
// If closed _and_ timer record, don't open. Only open on new events from `source`.
if (writer.isClosed() && !record.struct().fields().equals(TIMER_SCHEMA.columns())) {
try {
writer.open();
metrics.increment(WriterMetrics.OPEN_SUCCESS_COUNT);
@@ -183,25 +216,30 @@ public Observable<DataFile> call(Observable<Record> source) {
}
}
})
.scan(new Counter(config.getWriterRowGroupSize()), (counter, record) -> {
try {
writer.write(record);
counter.increment();
metrics.increment(WriterMetrics.WRITE_SUCCESS_COUNT);
} catch (RuntimeException e) {
metrics.increment(WriterMetrics.WRITE_FAILURE_COUNT);
logger.error("error writing record {}", record);
.scan(new Trigger(config.getWriterRowGroupSize()), (trigger, record) -> {
if (record.struct().fields().equals(TIMER_SCHEMA.columns())) {
trigger.timeout();
} else {
try {
writer.write(record);
trigger.increment();
metrics.increment(WriterMetrics.WRITE_SUCCESS_COUNT);
} catch (RuntimeException e) {
metrics.increment(WriterMetrics.WRITE_FAILURE_COUNT);
logger.debug("error writing record {}", record);
}
}
return counter;
return trigger;
})
.filter(Counter::shouldReset)
.filter(counter -> writer.length() >= config.getWriterFlushFrequencyBytes())
.map(counter -> {
.filter(this::shouldFlush)
// Writer can be closed if there are no events, yet timer is still ticking.
.filter(trigger -> !writer.isClosed())
.map(trigger -> {
try {
DataFile dataFile = writer.close();
counter.reset();
trigger.reset();
return dataFile;
} catch (IOException | RuntimeIOException e) {
} catch (IOException | RuntimeException e) {
metrics.increment(WriterMetrics.BATCH_FAILURE_COUNT);
logger.error("error writing DataFile", e);
return ERROR_DATA_FILE;
@@ -214,54 +252,72 @@ public Observable<DataFile> call(Observable<Record> source) {
metrics.setGauge(WriterMetrics.BATCH_SIZE, dataFile.recordCount());
metrics.setGauge(WriterMetrics.BATCH_SIZE_BYTES, dataFile.fileSizeInBytes());
})
.doOnSubscribe(() -> {
.doOnTerminate(() -> {
try {
writer.open();
metrics.increment(WriterMetrics.OPEN_SUCCESS_COUNT);
logger.info("closing writer on rx terminate signal");
writer.close();
} catch (IOException e) {
metrics.increment(WriterMetrics.OPEN_FAILURE_COUNT);
throw Exceptions.propagate(e);
}
})
.doOnTerminate(() -> {
if (!writer.isClosed()) {
try {
logger.info("closing writer on rx terminate signal");
writer.close();
} catch (IOException e) {
throw Exceptions.propagate(e);
}
}
});
}

private boolean isErrorDataFile(DataFile dataFile) {
return ERROR_DATA_FILE.path() == dataFile.path() &&
return Comparators.charSequences().compare(ERROR_DATA_FILE.path(), dataFile.path()) == 0 &&
ERROR_DATA_FILE.fileSizeInBytes() == dataFile.fileSizeInBytes() &&
ERROR_DATA_FILE.recordCount() == dataFile.recordCount();
}

private static class Counter {
/**
* Flush on size or time.
*/
private boolean shouldFlush(Trigger trigger) {
// Check the trigger to short-circuit if the count is over the threshold first
// because implementations of `writer.length()` may be expensive if called in a tight loop.
return (trigger.isOverCountThreshold() && writer.length() >= config.getWriterFlushFrequencyBytes())
|| trigger.isTimedOut();
}

private static class Trigger {

private final int threshold;
private final int countThreshold;
private int counter;
private boolean timedOut;

Counter(int threshold) {
this.threshold = threshold;
this.counter = 0;
Trigger(int countThreshold) {
this.countThreshold = countThreshold;
}

void increment() {
counter++;
}

void timeout() {
timedOut = true;
}

void reset() {
counter = 0;
timedOut = false;
}

boolean isOverCountThreshold() {
return counter >= countThreshold;
}

boolean shouldReset() {
return counter >= threshold;
boolean isTimedOut() {
return timedOut;
}

@Override
public String toString() {
return "Trigger{"
+ " countThreshold=" + countThreshold
+ ", counter=" + counter
+ ", timedOut=" + timedOut
+ '}';
}
}
}

}
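One likely reason the Transformer now takes its Schedulers as constructor arguments is testability: a test can inject a TestScheduler for the timer and advance the flush interval in virtual time. A small illustrative sketch of that idea (standalone, not a test from this repository):

import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;

// Illustrative only: injecting the timer Scheduler (as the new Transformer
// constructor does) lets a test drive the flush interval in virtual time
// instead of sleeping in real time.
public class TimerSchedulerTestSketch {

    public static void main(String[] args) {
        TestScheduler timerScheduler = new TestScheduler();
        TestSubscriber<Long> subscriber = new TestSubscriber<>();

        Observable.interval(500, TimeUnit.MILLISECONDS, timerScheduler)
                .subscribe(subscriber);

        subscriber.assertNoValues();                        // nothing has ticked yet

        timerScheduler.advanceTimeBy(1, TimeUnit.SECONDS);  // advance virtual time
        subscriber.assertValueCount(2);                     // ticks at 500 ms and 1000 ms
    }
}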