Skip to content

Commit

Permalink
Spark: Implement copy-on-write DELETE (apache#1862)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Dec 4, 2020
1 parent 7d7a51d commit af5f600
Show file tree
Hide file tree
Showing 14 changed files with 1,250 additions and 17 deletions.
1 change: 1 addition & 0 deletions .baseline/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
org.apache.commons.lang3.Validate.*,
org.apache.iceberg.expressions.Expressions.*,
org.apache.iceberg.expressions.Expression.Operation.*,
org.apache.iceberg.IsolationLevel.*,
org.apache.iceberg.NullOrder.*,
org.apache.iceberg.MetadataTableType.*,
org.apache.iceberg.SortDirection.*,
Expand Down
39 changes: 39 additions & 0 deletions core/src/main/java/org/apache/iceberg/IsolationLevel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

/**
* An isolation level in a table.
* <p>
* Two isolation levels are supported: serializable and snapshot isolation. Both of them provide
* a read consistent view of the table to all operations and allow readers to see only already
* committed data. While serializable is the strongest isolation level in databases,
* snapshot isolation is beneficial for environments with many concurrent writers.
* <p>
* The serializable isolation level guarantees that an ongoing UPDATE/DELETE/MERGE operation
* fails if a concurrent transaction commits a new file that might contain rows matching
* the condition used in UPDATE/DELETE/MERGE. For example, if there is an ongoing update
* on a subset of rows and a concurrent transaction adds a new file with records
* that potentially match the update condition, the update operation must fail under
* the serializable isolation but can still commit under the snapshot isolation.
*/
public enum IsolationLevel {
SERIALIZABLE, SNAPSHOT
}
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,10 @@ private TableProperties() {

public static final String MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep";
public static final int MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1;

public static final String DELETE_ISOLATION_LEVEL = "write.delete.isolation-level";
public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable";

public static final String DELETE_MODE = "write.delete.mode";
public static final String DELETE_MODE_DEFAULT = "copy-on-write";
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) =>
// TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
val writeInfo = newWriteInfo(r.schema)
val mergeBuilder = r.table.asMergeable.newMergeBuilder(writeInfo)
val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)

val scanPlan = buildScanPlan(r.table, r.output, mergeBuilder, cond)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.extensions;

import java.util.Objects;

public class Employee {
private Integer id;
private String dep;

public Employee() {
}

public Employee(Integer id, String dep) {
this.id = id;
this.dep = dep;
}

public Integer getId() {
return id;
}

public void setId(Integer id) {
this.id = id;
}

public String getDep() {
return dep;
}

public void setDep(String dep) {
this.dep = dep;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
} else if (other == null || getClass() != other.getClass()) {
return false;
}

Employee employee = (Employee) other;
return Objects.equals(id, employee.id) && Objects.equals(dep, employee.dep);
}

@Override
public int hashCode() {
return Objects.hash(id, dep);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.extensions;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTestBase {

private static final Random RANDOM = ThreadLocalRandom.current();

protected final String fileFormat;
protected final boolean vectorized;

public SparkRowLevelOperationsTestBase(String catalogName, String implementation,
Map<String, String> config, String fileFormat,
boolean vectorized) {
super(catalogName, implementation, config);
this.fileFormat = fileFormat;
this.vectorized = vectorized;
}

@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, format = {3}, vectorized = {4}")
public static Object[][] parameters() {
return new Object[][] {
{ "testhive", SparkCatalog.class.getName(),
ImmutableMap.of(
"type", "hive",
"default-namespace", "default"
),
"orc",
true
},
{ "testhadoop", SparkCatalog.class.getName(),
ImmutableMap.of(
"type", "hadoop"
),
"parquet",
RANDOM.nextBoolean()
},
{ "spark_catalog", SparkSessionCatalog.class.getName(),
ImmutableMap.of(
"type", "hive",
"default-namespace", "default",
"clients", "1",
"parquet-enabled", "false",
"cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
),
"avro",
false
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.extensions;

import java.util.Map;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class TestCopyOnWriteDelete extends TestDelete {

public TestCopyOnWriteDelete(String catalogName, String implementation, Map<String, String> config,
String fileFormat, Boolean vectorized) {
super(catalogName, implementation, config, fileFormat, vectorized);
}

@Override
protected Map<String, String> extraTableProperties() {
return ImmutableMap.of(TableProperties.DELETE_MODE, "copy-on-write");
}
}
Loading

0 comments on commit af5f600

Please sign in to comment.