Skip to content

Commit

Permalink
Spark: Reimplement RewriteDatafilesAction with partial progress (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
RussellSpitzer authored Jul 11, 2021
1 parent bed47a4 commit 25eaeba
Show file tree
Hide file tree
Showing 19 changed files with 1,826 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.iceberg.actions;

import java.util.Map;
import java.util.List;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;

Expand Down Expand Up @@ -68,7 +68,7 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri
* independently and asynchronously.
**/
String MAX_CONCURRENT_FILE_GROUP_REWRITES = "max-concurrent-file-group-rewrites";
int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
int MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT = 1;

/**
* The output file size that this rewrite strategy will attempt to generate when rewriting files. By default this
Expand Down Expand Up @@ -100,14 +100,14 @@ default RewriteDataFiles binPack() {
* will report a total failure for the job.
*/
interface Result {
Map<FileGroupInfo, FileGroupRewriteResult> resultMap();
List<FileGroupRewriteResult> rewriteResults();

default int addedDataFilesCount() {
return resultMap().values().stream().mapToInt(FileGroupRewriteResult::addedDataFilesCount).sum();
return rewriteResults().stream().mapToInt(FileGroupRewriteResult::addedDataFilesCount).sum();
}

default int rewrittenDataFilesCount() {
return resultMap().values().stream().mapToInt(FileGroupRewriteResult::rewrittenDataFilesCount).sum();
return rewriteResults().stream().mapToInt(FileGroupRewriteResult::rewrittenDataFilesCount).sum();
}
}

Expand All @@ -116,6 +116,8 @@ default int rewrittenDataFilesCount() {
* which were formerly part of the table but have been rewritten.
*/
interface FileGroupRewriteResult {
FileGroupInfo info();

int addedDataFilesCount();

int rewrittenDataFilesCount();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
import org.apache.iceberg.actions.RewriteDataFiles.FileGroupRewriteResult;

public class BaseFileGroupRewriteResult implements FileGroupRewriteResult {
private final int addedDataFilesCount;
private final int rewrittenDataFilesCount;
private final FileGroupInfo info;

public BaseFileGroupRewriteResult(FileGroupInfo info, int addedFilesCount, int rewrittenFilesCount) {
this.info = info;
this.addedDataFilesCount = addedFilesCount;
this.rewrittenDataFilesCount = rewrittenFilesCount;
}

@Override
public FileGroupInfo info() {
return info;
}

@Override
public int addedDataFilesCount() {
return addedDataFilesCount;
}

@Override
public int rewrittenDataFilesCount() {
return rewrittenDataFilesCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

public class BaseRewriteDataFilesFileGroupInfo implements RewriteDataFiles.FileGroupInfo {
private final int globalIndex;
private final int partitionIndex;
private final StructLike partition;

public BaseRewriteDataFilesFileGroupInfo(int globalIndex, int partitionIndex, StructLike partition) {
this.globalIndex = globalIndex;
this.partitionIndex = partitionIndex;
this.partition = partition;
}

@Override
public int globalIndex() {
return globalIndex;
}

@Override
public int partitionIndex() {
return partitionIndex;
}

@Override
public StructLike partition() {
return partition;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("globalIndex", globalIndex)
.add("partitionIndex", partitionIndex)
.add("partition", partition)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import java.util.List;
import org.apache.iceberg.actions.RewriteDataFiles.FileGroupRewriteResult;
import org.apache.iceberg.actions.RewriteDataFiles.Result;

public class BaseRewriteDataFilesResult implements Result {
private final List<FileGroupRewriteResult> rewriteResults;

public BaseRewriteDataFilesResult(List<FileGroupRewriteResult> rewriteResults) {
this.rewriteResults = rewriteResults;
}

@Override
public List<FileGroupRewriteResult> rewriteResults() {
return rewriteResults;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* more files than {@link #MIN_INPUT_FILES} or would produce at least one file of
* {@link RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
*/
abstract class BinPackStrategy implements RewriteStrategy {
public abstract class BinPackStrategy implements RewriteStrategy {

/**
* The minimum number of files that need to be in a file group for it to be considered for
Expand Down
Loading

0 comments on commit 25eaeba

Please sign in to comment.