[ARCTIC-925] Introduce SpillableMap to avoid OOM for optimizing and merge-on-read (#949)

* [ARCTIC-925] Introduce a disk spillable map for optimizer to avoid OOM
Co-authored-by: majin1102 <[email protected]>
majin1102 authored Dec 19, 2022
1 parent 6b429b0 commit ad48dbd
Showing 111 changed files with 1,815 additions and 823 deletions.
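The SpillableMap class itself sits further down in the full 111-file diff (under com.netease.arctic.utils.map, alongside the StructLikeMemoryMap rename recorded in the LICENSE hunk below). As a rough sketch of the technique the commit title names — keep entries on-heap until a threshold, then spill the overflow to local disk so optimize planning and merge-on-read stop running out of memory — something like the following; the class name, the count-based threshold, and the file-per-entry layout are illustrative assumptions, not the actual Arctic implementation:

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a disk-spillable map; not the ARCTIC-925 code. */
public class SpillableMapSketch<K, V extends Serializable> {
  private final int maxInMemoryEntries;                  // spill beyond this many entries
  private final Map<K, V> inMemory = new HashMap<>();    // hot, on-heap entries
  private final Map<K, Path> spilled = new HashMap<>();  // key -> file holding the serialized value
  private final Path spillDir;

  public SpillableMapSketch(int maxInMemoryEntries) throws IOException {
    this.maxInMemoryEntries = maxInMemoryEntries;
    this.spillDir = Files.createTempDirectory("spillable-map");
  }

  public void put(K key, V value) throws IOException {
    Path stale = spilled.remove(key);                    // drop any stale spilled copy
    if (stale != null) {
      Files.deleteIfExists(stale);
    }
    if (inMemory.containsKey(key) || inMemory.size() < maxInMemoryEntries) {
      inMemory.put(key, value);                          // still fits on-heap
      return;
    }
    Path file = Files.createTempFile(spillDir, "entry-", ".bin");
    try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
      out.writeObject(value);                            // spill the overflow entry to disk
    }
    spilled.put(key, file);
  }

  @SuppressWarnings("unchecked")
  public V get(K key) throws IOException, ClassNotFoundException {
    if (inMemory.containsKey(key)) {
      return inMemory.get(key);
    }
    Path file = spilled.get(key);
    if (file == null) {
      return null;                                       // never stored
    }
    try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
      return (V) in.readObject();                        // read the spilled value back
    }
  }
}

A real implementation would also track byte sizes rather than entry counts and clean up spill files on close; the sketch only shows the spill-on-overflow idea.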
4 changes: 2 additions & 2 deletions LICENSE
@@ -208,8 +208,8 @@ This product includes code from Apache Iceberg.
* org.apache.iceberg.data.DeleteFilter copied and modified to com.netease.arctic.iceberg.optimize.DeleteFilter
* org.apache.iceberg.deletes.Deletes copied and modified to com.netease.arctic.iceberg.optimize.Deletes
* org.apache.iceberg.data.InternalRecordWrapper copied and modified to com.netease.arctic.iceberg.optimize.InternalRecordWrapper
-* org.apache.iceberg.util.StructLikeMap copied and modified to com.netease.arctic.iceberg.optimize.StructLikeMap
-* org.apache.iceberg.util.StructLikeSet copied and modified to com.netease.arctic.iceberg.optimize.StructLikeSet
+* org.apache.iceberg.util.StructLikeMap copied and modified to com.netease.arctic.utils.map.StructLikeMemoryMap
+* org.apache.iceberg.util.StructLikeSet copied and modified to com.netease.arctic.utils.StructLikeSet
* org.apache.iceberg.util.StructLikeWrapper copied and modified to com.netease.arctic.iceberg.optimize.StructLikeWrapper
* org.apache.iceberg.util.StructProjection copied and modified to com.netease.arctic.iceberg.optimize.StructProjection
* org.apache.iceberg.data.parquet.BaseParquetReaders copied and modified to org.apache.iceberg.data.parquet.AdaptHiveBaseParquetReaders
@@ -20,6 +20,7 @@

import com.alibaba.fastjson.JSONObject;
import com.netease.arctic.ams.api.properties.AmsHAProperties;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;
@@ -18,16 +18,14 @@

package com.netease.arctic.ams.api;

+import java.util.Random;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

-import java.io.IOException;
-import java.util.Random;

/**
* Provides mock zookeeper server.
2 changes: 1 addition & 1 deletion ams/ams-dashboard/src/assets/icons/svg/slide.svg
@@ -33,10 +33,9 @@
import com.netease.arctic.hive.utils.HiveTableUtil;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.TableProperties;
-import com.netease.arctic.utils.SerializationUtil;
+import com.netease.arctic.utils.SerializationUtils;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.Snapshot;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,19 +102,19 @@ protected BaseOptimizeTask buildOptimizeTask(@Nullable List<DataTreeNode> source

List<ByteBuffer> baseFileBytesList =
baseFiles.stream()
-.map(SerializationUtil::toByteBuffer)
+.map(SerializationUtils::toByteBuffer)
.collect(Collectors.toList());
List<ByteBuffer> insertFileBytesList =
insertFiles.stream()
-.map(SerializationUtil::toByteBuffer)
+.map(SerializationUtils::toByteBuffer)
.collect(Collectors.toList());
List<ByteBuffer> deleteFileBytesList =
deleteFiles.stream()
-.map(SerializationUtil::toByteBuffer)
+.map(SerializationUtils::toByteBuffer)
.collect(Collectors.toList());
List<ByteBuffer> posDeleteFileBytesList =
posDeleteFiles.stream()
-.map(SerializationUtil::toByteBuffer)
+.map(SerializationUtils::toByteBuffer)
.collect(Collectors.toList());
optimizeTask.setBaseFiles(baseFileBytesList);
optimizeTask.setInsertFiles(insertFileBytesList);
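For context on the SerializationUtil → SerializationUtils rename above: optimize tasks carry their file lists as List<ByteBuffer>, so every DataFile/DeleteFile is serialized on the AMS side and rehydrated by the optimizer. A minimal sketch of that round-trip, using only method names visible in this diff (exact parameter and return types are assumptions):

import com.netease.arctic.utils.SerializationUtils;
import org.apache.iceberg.DataFile;

import java.nio.ByteBuffer;

final class RoundTripSketch {
  // toByteBuffer, toIcebergContentFile, and asDataFile are the calls used in
  // the hunks above; their precise signatures are assumed here.
  static DataFile roundTrip(DataFile file) {
    ByteBuffer buffer = SerializationUtils.toByteBuffer(file); // file -> bytes for the task payload
    return SerializationUtils.toIcebergContentFile(buffer)     // bytes -> IcebergContentFile wrapper
        .asDataFile();                                         // unwrap back to a DataFile
  }
}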
@@ -30,7 +30,7 @@
import com.netease.arctic.data.IcebergContentFile;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.TableProperties;
-import com.netease.arctic.utils.SerializationUtil;
+import com.netease.arctic.utils.SerializationUtils;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
@@ -118,25 +118,25 @@ protected BaseOptimizeTask buildOptimizeTask(List<DataFile> insertFiles,
baseFiles.stream().map(dataFile -> {
IcebergContentFile icebergContentFile =
new IcebergContentFile(dataFile, sequenceNumberFetcher.sequenceNumberOf(dataFile.path().toString()));
-return SerializationUtil.toByteBuffer(icebergContentFile);
+return SerializationUtils.toByteBuffer(icebergContentFile);
}).collect(Collectors.toList());
List<ByteBuffer> insertFileBytesList =
insertFiles.stream().map(dataFile -> {
IcebergContentFile icebergContentFile =
new IcebergContentFile(dataFile, sequenceNumberFetcher.sequenceNumberOf(dataFile.path().toString()));
-return SerializationUtil.toByteBuffer(icebergContentFile);
+return SerializationUtils.toByteBuffer(icebergContentFile);
}).collect(Collectors.toList());
List<ByteBuffer> eqDeleteFileBytesList =
eqDeleteFiles.stream().map(deleteFile -> {
IcebergContentFile icebergContentFile =
new IcebergContentFile(deleteFile, sequenceNumberFetcher.sequenceNumberOf(deleteFile.path().toString()));
-return SerializationUtil.toByteBuffer(icebergContentFile);
+return SerializationUtils.toByteBuffer(icebergContentFile);
}).collect(Collectors.toList());
List<ByteBuffer> posDeleteFileBytesList =
posDeleteFiles.stream().map(deleteFile -> {
IcebergContentFile icebergContentFile =
new IcebergContentFile(deleteFile, sequenceNumberFetcher.sequenceNumberOf(deleteFile.path().toString()));
-return SerializationUtil.toByteBuffer(icebergContentFile);
+return SerializationUtils.toByteBuffer(icebergContentFile);
}).collect(Collectors.toList());
optimizeTask.setBaseFiles(baseFileBytesList);
optimizeTask.setInsertFiles(insertFileBytesList);
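A note on the pattern in this hunk: for native Iceberg tables a bare DataFile/DeleteFile does not carry its sequence number, so each file is wrapped in an IcebergContentFile together with the sequence number looked up by path before it is serialized into the task. Isolated as a sketch (the fetcher interface below is inferred from the variable usage above and is not the real type):

import com.netease.arctic.data.IcebergContentFile;
import com.netease.arctic.utils.SerializationUtils;
import org.apache.iceberg.DataFile;

import java.nio.ByteBuffer;

final class SequenceWrapSketch {
  // Assumed shape of the fetcher, inferred from sequenceNumberFetcher.sequenceNumberOf(...) above.
  interface SequenceNumberFetcher {
    long sequenceNumberOf(String path);
  }

  static ByteBuffer wrap(DataFile dataFile, SequenceNumberFetcher fetcher) {
    IcebergContentFile withSequence = new IcebergContentFile(
        dataFile, fetcher.sequenceNumberOf(dataFile.path().toString())); // attach the sequence number
    return SerializationUtils.toByteBuffer(withSequence);                // serialize for the optimize task
  }
}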
@@ -31,8 +31,8 @@
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.trace.SnapshotSummary;
import com.netease.arctic.utils.ArcticDataFiles;
-import com.netease.arctic.utils.FileUtil;
-import com.netease.arctic.utils.SerializationUtil;
+import com.netease.arctic.utils.SerializationUtils;
+import com.netease.arctic.utils.TableFileUtils;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -100,7 +100,7 @@ public boolean commit(long baseSnapshotId) throws Exception {
// tasks in partition
if (task.getOptimizeTask().getTaskId().getType() == OptimizeType.Minor) {
task.getOptimizeRuntime().getTargetFiles().stream()
-.map(SerializationUtil::toInternalTableFile)
+.map(SerializationUtils::toInternalTableFile)
.forEach(minorAddFiles::add);

minorDeleteFiles.addAll(selectDeletedFiles(task, minorAddFiles));
@@ -128,7 +128,7 @@ public boolean commit(long baseSnapshotId) throws Exception {
partitionOptimizeType.put(entry.getKey(), OptimizeType.Minor);
} else {
task.getOptimizeRuntime().getTargetFiles().stream()
-.map(SerializationUtil::toInternalTableFile)
+.map(SerializationUtils::toInternalTableFile)
.forEach(majorAddFiles::add);
majorDeleteFiles.addAll(selectDeletedFiles(task, new HashSet<>()));
partitionOptimizeType.put(entry.getKey(), task.getOptimizeTask().getTaskId().getType());
@@ -395,29 +395,29 @@ private static Set<ContentFile<?>> selectMinorOptimizeDeletedFiles(BaseOptimizeT
Set<ContentFile<?>> addPosDeleteFiles) {
Set<DataTreeNode> newFileNodes = addPosDeleteFiles.stream().map(contentFile -> {
if (contentFile.content() == FileContent.POSITION_DELETES) {
-return FileUtil.parseFileNodeFromFileName(contentFile.path().toString());
+return TableFileUtils.parseFileNodeFromFileName(contentFile.path().toString());
}

return null;
}).filter(Objects::nonNull).collect(Collectors.toSet());

-return optimizeTask.getPosDeleteFiles().stream().map(SerializationUtil::toInternalTableFile)
+return optimizeTask.getPosDeleteFiles().stream().map(SerializationUtils::toInternalTableFile)
.filter(posDeleteFile ->
-newFileNodes.contains(FileUtil.parseFileNodeFromFileName(posDeleteFile.path().toString())))
+newFileNodes.contains(TableFileUtils.parseFileNodeFromFileName(posDeleteFile.path().toString())))
.collect(Collectors.toSet());
}

private static Set<ContentFile<?>> selectMajorOptimizeDeletedFiles(BaseOptimizeTask optimizeTask,
BaseOptimizeTaskRuntime optimizeTaskRuntime) {
// add base deleted files
Set<ContentFile<?>> result = optimizeTask.getBaseFiles().stream()
-.map(SerializationUtil::toInternalTableFile).collect(Collectors.toSet());
+.map(SerializationUtils::toInternalTableFile).collect(Collectors.toSet());

// if full optimize or new DataFiles is empty, can delete DeleteFiles
if (optimizeTask.getTaskId().getType() == OptimizeType.FullMajor ||
CollectionUtils.isEmpty(optimizeTaskRuntime.getTargetFiles())) {
result.addAll(optimizeTask.getPosDeleteFiles().stream()
-.map(SerializationUtil::toInternalTableFile).collect(Collectors.toSet()));
+.map(SerializationUtils::toInternalTableFile).collect(Collectors.toSet()));
}

return result;
@@ -32,8 +32,8 @@
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.utils.CompatiblePropertyUtil;
-import com.netease.arctic.utils.FileUtil;
import com.netease.arctic.utils.IdGenerator;
+import com.netease.arctic.utils.TableFileUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -219,12 +219,12 @@ private List<BaseOptimizeTask> collectKeyedTableTasks(String partition, FileTree
if (!baseFiles.isEmpty()) {
List<DataTreeNode> sourceNodes = Collections.singletonList(subTree.getNode());
Set<DataTreeNode> baseFileNodes = baseFiles.stream()
-.map(dataFile -> FileUtil.parseFileNodeFromFileName(dataFile.path().toString()))
+.map(dataFile -> TableFileUtils.parseFileNodeFromFileName(dataFile.path().toString()))
.collect(Collectors.toSet());
List<DeleteFile> posDeleteFiles = partitionPosDeleteFiles
.computeIfAbsent(partition, e -> Collections.emptyList()).stream()
.filter(deleteFile ->
-baseFileNodes.contains(FileUtil.parseFileNodeFromFileName(deleteFile.path().toString())))
+baseFileNodes.contains(TableFileUtils.parseFileNodeFromFileName(deleteFile.path().toString())))
.collect(Collectors.toList());

if (nodeTaskNeedBuild(posDeleteFiles, baseFiles)) {
@@ -288,7 +288,7 @@ private void addBaseFileIntoFileTree() {

private long getMaxTransactionId(List<DataFile> dataFiles) {
OptionalLong maxTransactionId = dataFiles.stream()
-.mapToLong(file -> FileUtil.parseFileTidFromFileName(file.path().toString())).max();
+.mapToLong(file -> TableFileUtils.parseFileTidFromFileName(file.path().toString())).max();
if (maxTransactionId.isPresent()) {
return maxTransactionId.getAsLong();
}
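The FileUtil → TableFileUtils rename in this commit touches the two file-name parsers the planners rely on: parseFileNodeFromFileName recovers the DataTreeNode a file was written to, and parseFileTidFromFileName recovers the legacy transaction id. A hedged usage sketch (method names and argument/return shapes are taken from this diff; the DataTreeNode import path is assumed):

import com.netease.arctic.data.DataTreeNode;
import com.netease.arctic.utils.TableFileUtils;

final class FileNameParsingSketch {
  // Both parsers read metadata that Arctic encodes into data file names.
  static void demo(String dataFilePath) {
    DataTreeNode node = TableFileUtils.parseFileNodeFromFileName(dataFilePath); // tree node (mask/index)
    long transactionId = TableFileUtils.parseFileTidFromFileName(dataFilePath); // legacy transaction id
    System.out.println(node + " @ txn " + transactionId);
  }
}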
@@ -25,7 +25,7 @@
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.trace.SnapshotSummary;
-import com.netease.arctic.utils.SerializationUtil;
+import com.netease.arctic.utils.SerializationUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -77,14 +77,14 @@ public boolean commit(long baseSnapshotId) throws Exception {
// tasks in partition
if (task.getOptimizeTask().getTaskId().getType() == OptimizeType.Minor) {
task.getOptimizeRuntime().getTargetFiles().stream()
-.map(SerializationUtil::toInternalTableFile)
+.map(SerializationUtils::toInternalTableFile)
.forEach(minorAddFiles::add);

minorDeleteFiles.addAll(selectDeletedFiles(task));
partitionOptimizeType.put(entry.getKey(), OptimizeType.Minor);
} else {
task.getOptimizeRuntime().getTargetFiles().stream()
-.map(SerializationUtil::toInternalTableFile)
+.map(SerializationUtils::toInternalTableFile)
.forEach(majorAddFiles::add);
majorDeleteFiles.addAll(selectDeletedFiles(task));
partitionOptimizeType.put(entry.getKey(), task.getOptimizeTask().getTaskId().getType());
@@ -248,15 +248,15 @@ private static Set<ContentFile<?>> selectMinorOptimizeDeletedFiles(BaseOptimizeT
if (CollectionUtils.isNotEmpty(optimizeTask.getInsertFiles())) {
// small data files
for (ByteBuffer insertFile : optimizeTask.getInsertFiles()) {
-deletedFiles.add(SerializationUtil.toIcebergContentFile(insertFile).asDataFile());
+deletedFiles.add(SerializationUtils.toIcebergContentFile(insertFile).asDataFile());
}
} else {
// delete files
for (ByteBuffer eqDeleteFile : optimizeTask.getDeleteFiles()) {
-deletedFiles.add(SerializationUtil.toIcebergContentFile(eqDeleteFile).asDeleteFile());
+deletedFiles.add(SerializationUtils.toIcebergContentFile(eqDeleteFile).asDeleteFile());
}
for (ByteBuffer posDeleteFile : optimizeTask.getPosDeleteFiles()) {
-deletedFiles.add(SerializationUtil.toIcebergContentFile(posDeleteFile).asDeleteFile());
+deletedFiles.add(SerializationUtils.toIcebergContentFile(posDeleteFile).asDeleteFile());
}
}

@@ -267,18 +267,18 @@ private static Set<ContentFile<?>> selectMajorOptimizeDeletedFiles(BaseOptimizeT
Set<ContentFile<?>> deletedFiles = new HashSet<>();
// data files
for (ByteBuffer insertFile : optimizeTask.getInsertFiles()) {
-deletedFiles.add(SerializationUtil.toIcebergContentFile(insertFile).asDataFile());
+deletedFiles.add(SerializationUtils.toIcebergContentFile(insertFile).asDataFile());
}
for (ByteBuffer baseFile : optimizeTask.getBaseFiles()) {
-deletedFiles.add(SerializationUtil.toIcebergContentFile(baseFile).asDataFile());
+deletedFiles.add(SerializationUtils.toIcebergContentFile(baseFile).asDataFile());
}

// delete files
for (ByteBuffer eqDeleteFile : optimizeTask.getDeleteFiles()) {
-deletedFiles.add(SerializationUtil.toIcebergContentFile(eqDeleteFile).asDeleteFile());
+deletedFiles.add(SerializationUtils.toIcebergContentFile(eqDeleteFile).asDeleteFile());
}
for (ByteBuffer posDeleteFile : optimizeTask.getPosDeleteFiles()) {
-deletedFiles.add(SerializationUtil.toIcebergContentFile(posDeleteFile).asDeleteFile());
+deletedFiles.add(SerializationUtils.toIcebergContentFile(posDeleteFile).asDeleteFile());
}

return deletedFiles;
@@ -32,7 +32,7 @@
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.utils.CompatiblePropertyUtil;
-import com.netease.arctic.utils.FileUtil;
+import com.netease.arctic.utils.TableFileUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -258,12 +258,12 @@ private List<BaseOptimizeTask> collectKeyedTableTasks(String partition, FileTree
if (!baseFiles.isEmpty()) {
List<DataTreeNode> sourceNodes = Collections.singletonList(subTree.getNode());
Set<DataTreeNode> baseFileNodes = baseFiles.stream()
-.map(dataFile -> FileUtil.parseFileNodeFromFileName(dataFile.path().toString()))
+.map(dataFile -> TableFileUtils.parseFileNodeFromFileName(dataFile.path().toString()))
.collect(Collectors.toSet());
List<DeleteFile> posDeleteFiles = partitionPosDeleteFiles
.computeIfAbsent(partition, e -> Collections.emptyList()).stream()
.filter(deleteFile ->
-baseFileNodes.contains(FileUtil.parseFileNodeFromFileName(deleteFile.path().toString())))
+baseFileNodes.contains(TableFileUtils.parseFileNodeFromFileName(deleteFile.path().toString())))
.collect(Collectors.toList());

if (nodeTaskNeedBuild(posDeleteFiles, baseFiles)) {
@@ -32,7 +32,7 @@
import com.netease.arctic.table.KeyedTable;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.utils.CompatiblePropertyUtil;
-import com.netease.arctic.utils.FileUtil;
+import com.netease.arctic.utils.TableFileUtils;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.iceberg.ContentFile;
@@ -247,7 +247,7 @@ public long getTransactionId() {

public long getLegacyTransactionId() {
if (legacyTransactionId == -1) {
-legacyTransactionId = FileUtil.parseFileTidFromFileName(dataFileInfo.getPath());
+legacyTransactionId = TableFileUtils.parseFileTidFromFileName(dataFileInfo.getPath());
}
return legacyTransactionId;
}
@@ -338,12 +338,12 @@ private List<BaseOptimizeTask> collectKeyedTableTasks(String partition, FileTree
sourceNodes = Collections.singletonList(subTree.getNode());
}
Set<DataTreeNode> baseFileNodes = baseFiles.stream()
-.map(dataFile -> FileUtil.parseFileNodeFromFileName(dataFile.path().toString()))
+.map(dataFile -> TableFileUtils.parseFileNodeFromFileName(dataFile.path().toString()))
.collect(Collectors.toSet());
List<DeleteFile> posDeleteFiles = partitionPosDeleteFiles
.computeIfAbsent(partition, e -> Collections.emptyList()).stream()
.filter(deleteFile ->
-baseFileNodes.contains(FileUtil.parseFileNodeFromFileName(deleteFile.path().toString())))
+baseFileNodes.contains(TableFileUtils.parseFileNodeFromFileName(deleteFile.path().toString())))
.collect(Collectors.toList());
// if no insert files and no eq-delete file, skip
if (CollectionUtils.isEmpty(insertFiles) && CollectionUtils.isEmpty(deleteFiles)) {
