Skip to content

Commit

Permalink
Add TempFileSinkFactory abstraction
Browse files Browse the repository at this point in the history
This allows using a customized ORC data sink for temporary files when
writing Hive sorted bucketed partitions.
  • Loading branch information
wenleix committed Sep 26, 2018
1 parent f8c51c3 commit 621fac1
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class HivePageSinkProvider
private final EventClient eventClient;
private final HiveSessionProperties hiveSessionProperties;
private final HiveWriterStats hiveWriterStats;
private final OrcFileWriterFactory orcFileWriterFactory;

@Inject
public HivePageSinkProvider(
Expand All @@ -79,7 +80,8 @@ public HivePageSinkProvider(
NodeManager nodeManager,
EventClient eventClient,
HiveSessionProperties hiveSessionProperties,
HiveWriterStats hiveWriterStats)
HiveWriterStats hiveWriterStats,
OrcFileWriterFactory orcFileWriterFactory)
{
this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
Expand All @@ -100,6 +102,7 @@ public HivePageSinkProvider(
this.eventClient = requireNonNull(eventClient, "eventClient is null");
this.hiveSessionProperties = requireNonNull(hiveSessionProperties, "hiveSessionProperties is null");
this.hiveWriterStats = requireNonNull(hiveWriterStats, "stats is null");
this.orcFileWriterFactory = requireNonNull(orcFileWriterFactory, "orcFileWriterFactory is null");
}

@Override
Expand Down Expand Up @@ -150,7 +153,8 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
nodeManager,
eventClient,
hiveSessionProperties,
hiveWriterStats);
hiveWriterStats,
orcFileWriterFactory);

return new HivePageSink(
writerFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public class HiveWriterFactory

private final HiveWriterStats hiveWriterStats;

private final OrcFileWriterFactory orcFileWriterFactory;

public HiveWriterFactory(
Set<HiveFileWriterFactory> fileWriterFactories,
String schemaName,
Expand All @@ -159,7 +161,8 @@ public HiveWriterFactory(
NodeManager nodeManager,
EventClient eventClient,
HiveSessionProperties hiveSessionProperties,
HiveWriterStats hiveWriterStats)
HiveWriterStats hiveWriterStats,
OrcFileWriterFactory orcFileWriterFactory)
{
this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
this.schemaName = requireNonNull(schemaName, "schemaName is null");
Expand Down Expand Up @@ -248,6 +251,8 @@ public HiveWriterFactory(
}

this.hiveWriterStats = requireNonNull(hiveWriterStats, "hiveWriterStats is null");

this.orcFileWriterFactory = requireNonNull(orcFileWriterFactory, "orcFileWriterFactory is null");
}

public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt bucketNumber)
Expand Down Expand Up @@ -519,7 +524,8 @@ else if (insertExistingPartitionsBehavior == InsertExistingPartitionsBehavior.ER
types,
sortFields,
sortOrders,
pageSorter);
pageSorter,
(fs, p) -> orcFileWriterFactory.createOrcDataSink(session, fs, p));
}

return new HiveWriter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.hive.util.SortBuffer;
import com.facebook.presto.hive.util.TempFileReader;
import com.facebook.presto.hive.util.TempFileWriter;
import com.facebook.presto.orc.OrcDataSink;
import com.facebook.presto.orc.OrcDataSource;
import com.facebook.presto.orc.OrcDataSourceId;
import com.facebook.presto.spi.Page;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class SortingFileWriter
private final List<SortOrder> sortOrders;
private final HiveFileWriter outputWriter;
private final SortBuffer sortBuffer;
private final TempFileSinkFactory tempFileSinkFactory;
private final Queue<TempFile> tempFiles = new PriorityQueue<>(comparing(TempFile::getSize));
private final AtomicLong nextFileId = new AtomicLong();

Expand All @@ -83,7 +85,8 @@ public SortingFileWriter(
List<Type> types,
List<Integer> sortFields,
List<SortOrder> sortOrders,
PageSorter pageSorter)
PageSorter pageSorter,
TempFileSinkFactory tempFileSinkFactory)
{
checkArgument(maxOpenTempFiles >= 2, "maxOpenTempFiles must be at least two");
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
Expand All @@ -94,6 +97,7 @@ public SortingFileWriter(
this.sortOrders = ImmutableList.copyOf(requireNonNull(sortOrders, "sortOrders is null"));
this.outputWriter = requireNonNull(outputWriter, "outputWriter is null");
this.sortBuffer = new SortBuffer(maxMemory, types, sortFields, sortOrders, pageSorter);
this.tempFileSinkFactory = tempFileSinkFactory;
}

@Override
Expand Down Expand Up @@ -237,7 +241,7 @@ private void writeTempFile(Consumer<TempFileWriter> consumer)
{
Path tempFile = getTempFileName();

try (TempFileWriter writer = new TempFileWriter(types, fileSystem.create(tempFile))) {
try (TempFileWriter writer = new TempFileWriter(types, tempFileSinkFactory.createSink(fileSystem, tempFile))) {
consumer.accept(writer);
writer.close();
tempFiles.add(new TempFile(tempFile, writer.getWrittenBytes()));
Expand Down Expand Up @@ -297,4 +301,10 @@ public String toString()
.toString();
}
}

/**
 * Factory for creating the {@link OrcDataSink} used to write sorted temporary
 * files, allowing callers to customize how temp-file bytes are persisted.
 * Single abstract method so call sites may supply a lambda.
 */
@FunctionalInterface
public interface TempFileSinkFactory
{
    /**
     * Creates a sink for writing a temporary ORC file at the given path.
     *
     * @param fileSystem file system on which the temporary file is created
     * @param path location of the temporary file
     * @throws IOException if the sink cannot be created
     */
    OrcDataSink createSink(FileSystem fileSystem, Path path)
            throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@
*/
package com.facebook.presto.hive.util;

import com.facebook.presto.orc.OrcDataSink;
import com.facebook.presto.orc.OrcWriteValidation.OrcWriteValidationMode;
import com.facebook.presto.orc.OrcWriter;
import com.facebook.presto.orc.OrcWriterOptions;
import com.facebook.presto.orc.OrcWriterStats;
import com.facebook.presto.orc.OutputStreamOrcDataSink;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.IntStream;
Expand All @@ -42,9 +41,9 @@ public class TempFileWriter
{
private final OrcWriter orcWriter;

public TempFileWriter(List<Type> types, OutputStream output)
public TempFileWriter(List<Type> types, OrcDataSink sink)
{
this.orcWriter = createOrcFileWriter(output, types);
this.orcWriter = createOrcFileWriter(sink, types);
}

public void writePage(Page page)
Expand All @@ -69,22 +68,21 @@ public long getWrittenBytes()
return orcWriter.getWrittenBytes();
}

private static OrcWriter createOrcFileWriter(OutputStream output, List<Type> types)
private static OrcWriter createOrcFileWriter(OrcDataSink sink, List<Type> types)
{
List<String> columnNames = IntStream.range(0, types.size())
.mapToObj(String::valueOf)
.collect(toImmutableList());

return new OrcWriter(
new OutputStreamOrcDataSink(output),
sink,
columnNames,
types,
ORC,
LZ4,
new OrcWriterOptions()
.withMaxStringStatisticsLimit(new DataSize(0, BYTE))
.withStripeMinSize(new DataSize(4, MEGABYTE))
.withStripeMaxSize(new DataSize(4, MEGABYTE))
.withStripeMinSize(new DataSize(64, MEGABYTE))
.withDictionaryMaxMemory(new DataSize(1, MEGABYTE)),
ImmutableMap.of(),
UTC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveDataStreamFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveFileWriterFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveRecordCursorProvider;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultOrcFileWriterFactory;
import static com.facebook.presto.hive.HiveTestUtils.getTypes;
import static com.facebook.presto.hive.HiveTestUtils.mapType;
import static com.facebook.presto.hive.HiveTestUtils.rowType;
Expand Down Expand Up @@ -777,7 +778,8 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
new TestingNodeManager("fake-environment"),
new HiveEventClient(),
new HiveSessionProperties(hiveClientConfig, new OrcFileWriterConfig()),
new HiveWriterStats());
new HiveWriterStats(),
getDefaultOrcFileWriterFactory(hiveClientConfig));
pageSourceProvider = new HivePageSourceProvider(hiveClientConfig, hdfsEnvironment, getDefaultHiveRecordCursorProvider(hiveClientConfig), getDefaultHiveDataStreamFactories(hiveClientConfig), TYPE_MANAGER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveDataStreamFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveFileWriterFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveRecordCursorProvider;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultOrcFileWriterFactory;
import static com.facebook.presto.hive.HiveTestUtils.getTypes;
import static com.facebook.presto.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
Expand Down Expand Up @@ -208,7 +209,8 @@ protected void setup(String host, int port, String databaseName, Function<HiveCl
new TestingNodeManager("fake-environment"),
new HiveEventClient(),
new HiveSessionProperties(config, new OrcFileWriterConfig()),
new HiveWriterStats());
new HiveWriterStats(),
getDefaultOrcFileWriterFactory(config));
pageSourceProvider = new HivePageSourceProvider(config, hdfsEnvironment, getDefaultHiveRecordCursorProvider(config), getDefaultHiveDataStreamFactories(config), TYPE_MANAGER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,22 @@ public static Set<HiveFileWriterFactory> getDefaultHiveFileWriterFactories(HiveC
HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig);
return ImmutableSet.<HiveFileWriterFactory>builder()
.add(new RcFileFileWriterFactory(testHdfsEnvironment, TYPE_MANAGER, new NodeVersion("test_version"), hiveClientConfig, new FileFormatDataSourceStats()))
.add(new OrcFileWriterFactory(
testHdfsEnvironment,
TYPE_MANAGER,
new NodeVersion("test_version"),
hiveClientConfig,
new FileFormatDataSourceStats(),
new OrcFileWriterConfig()))
.add(getDefaultOrcFileWriterFactory(hiveClientConfig))
.build();
}

/**
 * Builds the default {@link OrcFileWriterFactory} used by tests, backed by a
 * test HDFS environment derived from the supplied client config.
 */
public static OrcFileWriterFactory getDefaultOrcFileWriterFactory(HiveClientConfig hiveClientConfig)
{
    return new OrcFileWriterFactory(
            createTestHdfsEnvironment(hiveClientConfig),
            TYPE_MANAGER,
            new NodeVersion("test_version"),
            hiveClientConfig,
            new FileFormatDataSourceStats(),
            new OrcFileWriterConfig());
}

public static List<Type> getTypes(List<? extends ColumnHandle> columnHandles)
{
ImmutableList.Builder<Type> types = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveDataStreamFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveFileWriterFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveRecordCursorProvider;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultOrcFileWriterFactory;
import static com.facebook.presto.hive.HiveType.HIVE_DATE;
import static com.facebook.presto.hive.HiveType.HIVE_DOUBLE;
import static com.facebook.presto.hive.HiveType.HIVE_INT;
Expand Down Expand Up @@ -263,7 +264,8 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio
new TestingNodeManager("fake-environment"),
new HiveEventClient(),
new HiveSessionProperties(config, new OrcFileWriterConfig()),
stats);
stats,
getDefaultOrcFileWriterFactory(config));
return provider.createPageSink(transaction, getSession(config), handle);
}

Expand Down

0 comments on commit 621fac1

Please sign in to comment.