Add Delta Lake table property change_data_feed_enabled
homar authored and ebyhr committed Feb 21, 2023
1 parent 58f3d69 commit 511e105
Showing 8 changed files with 152 additions and 17 deletions.
9 changes: 6 additions & 3 deletions docs/src/main/sphinx/connector/delta-lake.rst
@@ -540,7 +540,7 @@ in the metastore. As a result, any Databricks engine can write to the table::
The Delta Lake connector also supports creating tables using the :doc:`CREATE
TABLE AS </sql/create-table-as>` syntax.

There are three table properties available for use in table creation.
The following properties are available for use:

.. list-table:: Delta Lake table properties
:widths: 40, 60
@@ -554,14 +554,17 @@ There are three table properties available for use in table creation.
- Set partition columns.
* - ``checkpoint_interval``
- Set the checkpoint interval in seconds.
* - ``change_data_feed_enabled``
- Enables storing change data feed entries.

The following example uses all three table properties::
The following example uses all four table properties::

CREATE TABLE example.default.example_partitioned_table
WITH (
location = 's3://my-bucket/a/path',
partitioned_by = ARRAY['regionkey'],
checkpoint_interval = 5
checkpoint_interval = 5,
change_data_feed_enabled = true
)
AS SELECT name, comment, regionkey FROM tpch.tiny.nation;

@@ -177,9 +177,11 @@
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isLegacyCreateTableWithExistingLocationEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isTableStatisticsEnabled;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHANGE_DATA_FEED_ENABLED_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.PARTITIONED_BY_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getChangeDataFeedEnabled;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getCheckpointInterval;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getLocation;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getPartitionedBy;
@@ -474,6 +476,9 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
Optional<Long> checkpointInterval = tableHandle.getMetadataEntry().getCheckpointInterval();
checkpointInterval.ifPresent(value -> properties.put(CHECKPOINT_INTERVAL_PROPERTY, value));

Optional<Boolean> changeDataFeedEnabled = tableHandle.getMetadataEntry().isChangeDataFeedEnabled();
changeDataFeedEnabled.ifPresent(value -> properties.put(CHANGE_DATA_FEED_ENABLED_PROPERTY, value));

return new ConnectorTableMetadata(
tableHandle.getSchemaTableName(),
columns,
@@ -708,6 +713,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
ensurePathExists(session, targetPath);
Path deltaLogDirectory = getTransactionLogDir(targetPath);
Optional<Long> checkpointInterval = getCheckpointInterval(tableMetadata.getProperties());
Optional<Boolean> changeDataFeedEnabled = getChangeDataFeedEnabled(tableMetadata.getProperties());

try {
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
@@ -725,6 +731,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
Map<String, Boolean> columnsNullability = tableMetadata.getColumns().stream()
.collect(toImmutableMap(ColumnMetadata::getName, ColumnMetadata::isNullable));
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, targetPath.toString());

appendTableEntries(
0,
transactionLogWriter,
@@ -734,7 +741,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
columnComments,
columnsNullability,
deltaLakeColumns.stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> ImmutableMap.of())),
configurationForNewTable(checkpointInterval),
configurationForNewTable(checkpointInterval, changeDataFeedEnabled),
CREATE_TABLE_OPERATION,
session,
nodeVersion,
@@ -863,7 +870,8 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
location,
getCheckpointInterval(tableMetadata.getProperties()),
external,
tableMetadata.getComment());
tableMetadata.getComment(),
getChangeDataFeedEnabled(tableMetadata.getProperties()));
}

private Optional<String> getSchemaLocation(Database database)
@@ -975,7 +983,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
ImmutableMap.of(),
handle.getInputColumns().stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> true)),
handle.getInputColumns().stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> ImmutableMap.of())),
configurationForNewTable(handle.getCheckpointInterval()),
configurationForNewTable(handle.getCheckpointInterval(), handle.getChangeDataFeedEnabled()),
CREATE_TABLE_AS_OPERATION,
session,
nodeVersion,
@@ -36,6 +36,7 @@ public class DeltaLakeOutputTableHandle
private final Optional<Long> checkpointInterval;
private final boolean external;
private final Optional<String> comment;
private final Optional<Boolean> changeDataFeedEnabled;

@JsonCreator
public DeltaLakeOutputTableHandle(
@@ -45,7 +46,8 @@ public DeltaLakeOutputTableHandle(
@JsonProperty("location") String location,
@JsonProperty("checkpointInterval") Optional<Long> checkpointInterval,
@JsonProperty("external") boolean external,
@JsonProperty("comment") Optional<String> comment)
@JsonProperty("comment") Optional<String> comment,
@JsonProperty("changeDataFeedEnabled") Optional<Boolean> changeDataFeedEnabled)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
@@ -54,6 +56,7 @@ public DeltaLakeOutputTableHandle(
this.checkpointInterval = checkpointInterval;
this.external = external;
this.comment = requireNonNull(comment, "comment is null");
this.changeDataFeedEnabled = requireNonNull(changeDataFeedEnabled, "changeDataFeedEnabled is null");
}

@JsonProperty
@@ -106,4 +109,10 @@ public Optional<String> getComment()
{
return comment;
}

@JsonProperty
public Optional<Boolean> getChangeDataFeedEnabled()
{
return changeDataFeedEnabled;
}
}
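
Handles like this one are serialized to JSON when the engine ships them from the coordinator to the workers, which is why the new field gets the @JsonCreator/@JsonProperty plumbing above. A minimal, hypothetical sketch of that round trip, using plain Jackson with the Jdk8 module for Optional support (Trino itself wires this up through Airlift's JSON codecs); MiniHandle is an illustrative stand-in, not the real handle class:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

import java.util.Optional;

public class OutputTableHandleRoundTrip
{
    // Hypothetical, trimmed-down stand-in for the output table handle: only the new field,
    // enough to show the serialization contract of the added accessor.
    public static class MiniHandle
    {
        private final Optional<Boolean> changeDataFeedEnabled;

        @JsonCreator
        public MiniHandle(@JsonProperty("changeDataFeedEnabled") Optional<Boolean> changeDataFeedEnabled)
        {
            this.changeDataFeedEnabled = changeDataFeedEnabled;
        }

        @JsonProperty
        public Optional<Boolean> getChangeDataFeedEnabled()
        {
            return changeDataFeedEnabled;
        }
    }

    public static void main(String[] args)
            throws Exception
    {
        // Jdk8Module teaches Jackson how to serialize and deserialize Optional values.
        ObjectMapper mapper = new ObjectMapper().registerModule(new Jdk8Module());

        String json = mapper.writeValueAsString(new MiniHandle(Optional.of(true)));
        System.out.println(json); // {"changeDataFeedEnabled":true}

        MiniHandle roundTripped = mapper.readValue(json, MiniHandle.class);
        System.out.println(roundTripped.getChangeDataFeedEnabled()); // Optional[true]
    }
}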
@@ -27,6 +27,7 @@

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.longProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static io.trino.spi.type.VarcharType.VARCHAR;
@@ -38,6 +39,7 @@ public class DeltaLakeTableProperties
public static final String LOCATION_PROPERTY = "location";
public static final String PARTITIONED_BY_PROPERTY = "partitioned_by";
public static final String CHECKPOINT_INTERVAL_PROPERTY = "checkpoint_interval";
public static final String CHANGE_DATA_FEED_ENABLED_PROPERTY = "change_data_feed_enabled";

private final List<PropertyMetadata<?>> tableProperties;

@@ -67,6 +69,11 @@ public DeltaLakeTableProperties()
"Checkpoint interval",
null,
false))
.add(booleanProperty(
CHANGE_DATA_FEED_ENABLED_PROPERTY,
"Enables storing change data feed entries",
null,
false))
.build();
}

@@ -97,4 +104,9 @@ public static Optional<Long> getCheckpointInterval(Map<String, Object> tableProp

return checkpointInterval;
}

public static Optional<Boolean> getChangeDataFeedEnabled(Map<String, Object> tableProperties)
{
return Optional.ofNullable((Boolean) tableProperties.get(CHANGE_DATA_FEED_ENABLED_PROPERTY));
}
}
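
The new booleanProperty is registered with a null default, so a table that never set change_data_feed_enabled is distinguishable from one that explicitly set it to false; that is why getChangeDataFeedEnabled returns Optional<Boolean> rather than a plain boolean. A small stand-alone sketch of that tri-state read, using a plain map in place of the connector property map (no Trino types; purely illustrative):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class TriStateTableProperty
{
    private static final String CHANGE_DATA_FEED_ENABLED_PROPERTY = "change_data_feed_enabled";

    // Mirrors getChangeDataFeedEnabled: a missing key yields Optional.empty(),
    // an explicit false yields Optional.of(false).
    static Optional<Boolean> getChangeDataFeedEnabled(Map<String, Object> tableProperties)
    {
        return Optional.ofNullable((Boolean) tableProperties.get(CHANGE_DATA_FEED_ENABLED_PROPERTY));
    }

    public static void main(String[] args)
    {
        Map<String, Object> unset = new HashMap<>();
        Map<String, Object> disabled = new HashMap<>();
        disabled.put(CHANGE_DATA_FEED_ENABLED_PROPERTY, false);

        System.out.println(getChangeDataFeedEnabled(unset));    // Optional.empty
        System.out.println(getChangeDataFeedEnabled(disabled)); // Optional[false]
    }
}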
@@ -36,6 +36,7 @@ public class MetadataEntry
public static final String DELTA_CHECKPOINT_WRITE_STATS_AS_STRUCT_PROPERTY = "delta.checkpoint.writeStatsAsStruct";

private static final String DELTA_CHECKPOINT_INTERVAL_PROPERTY = "delta.checkpointInterval";
private static final String DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY = "delta.enableChangeDataFeed";

private final String id;
private final String name;
@@ -156,11 +157,28 @@ public Optional<Long> getCheckpointInterval()
}
}

public static Map<String, String> configurationForNewTable(Optional<Long> checkpointInterval)
@JsonIgnore
public Optional<Boolean> isChangeDataFeedEnabled()
{
if (this.getConfiguration() == null) {
return Optional.empty();
}

String value = this.getConfiguration().get(DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY);
if (value == null) {
return Optional.empty();
}

boolean changeDataFeedEnabled = Boolean.parseBoolean(value);
return Optional.of(changeDataFeedEnabled);
}

public static Map<String, String> configurationForNewTable(Optional<Long> checkpointInterval, Optional<Boolean> changeDataFeedEnabled)
{
return checkpointInterval
.map(value -> ImmutableMap.of(DELTA_CHECKPOINT_INTERVAL_PROPERTY, String.valueOf(value)))
.orElseGet(ImmutableMap::of);
ImmutableMap.Builder<String, String> configurationMapBuilder = ImmutableMap.builder();
checkpointInterval.ifPresent(interval -> configurationMapBuilder.put(DELTA_CHECKPOINT_INTERVAL_PROPERTY, String.valueOf(interval)));
changeDataFeedEnabled.ifPresent(enabled -> configurationMapBuilder.put(DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY, String.valueOf(enabled)));
return configurationMapBuilder.buildOrThrow();
}
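
A self-contained sketch of the two halves added to MetadataEntry, reduced to plain maps so it can run outside Trino: the write path only records properties that were explicitly set, and the read path parses the delta.enableChangeDataFeed string back into an Optional<Boolean>. The class and method shapes below are illustrative only; the property keys are the ones defined above.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class ChangeDataFeedConfigurationSketch
{
    private static final String DELTA_CHECKPOINT_INTERVAL_PROPERTY = "delta.checkpointInterval";
    private static final String DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY = "delta.enableChangeDataFeed";

    // Write path: only properties that were explicitly set end up in the new table's configuration.
    static Map<String, String> configurationForNewTable(Optional<Long> checkpointInterval, Optional<Boolean> changeDataFeedEnabled)
    {
        Map<String, String> configuration = new LinkedHashMap<>();
        checkpointInterval.ifPresent(interval -> configuration.put(DELTA_CHECKPOINT_INTERVAL_PROPERTY, String.valueOf(interval)));
        changeDataFeedEnabled.ifPresent(enabled -> configuration.put(DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY, String.valueOf(enabled)));
        return configuration;
    }

    // Read path: an absent key stays Optional.empty(); the stored "true"/"false" string is parsed back to a boolean.
    static Optional<Boolean> isChangeDataFeedEnabled(Map<String, String> configuration)
    {
        return Optional.ofNullable(configuration.get(DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY))
                .map(Boolean::parseBoolean);
    }

    public static void main(String[] args)
    {
        Map<String, String> configuration = configurationForNewTable(Optional.of(5L), Optional.of(true));
        System.out.println(configuration);                          // {delta.checkpointInterval=5, delta.enableChangeDataFeed=true}
        System.out.println(isChangeDataFeedEnabled(configuration)); // Optional[true]
        System.out.println(isChangeDataFeedEnabled(Map.of()));      // Optional.empty
    }
}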

@Override
@@ -851,6 +851,17 @@ public void testTableWithNonNullableColumns()
assertQuery("SELECT * FROM " + tableName, "VALUES(1, null, 100), (2, null, 200)");
}

@Test
public void testThatEnableCdfTablePropertyIsShownForCtasTables()
{
String tableName = "test_show_create_show_property_for_table_created_with_ctas_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + "(page_url, views)" +
"WITH (change_data_feed_enabled = true) " +
"AS VALUES ('url1', 1), ('url2', 2)", 2);
assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName))
.contains("change_data_feed_enabled = true");
}

@Override
protected void verifyAddNotNullColumnToNonEmptyTableFailurePermissible(Throwable e)
{
@@ -158,7 +158,8 @@ private static ConnectorPageSink createPageSink(Path outputPath, DeltaLakeWriter
outputPath.toString(),
Optional.of(deltaLakeConfig.getDefaultCheckpointWritingInterval()),
true,
Optional.empty());
Optional.empty(),
Optional.of(false));

DeltaLakePageSinkProvider provider = new DeltaLakePageSinkProvider(
new GroupByHashPageIndexerFactory(new JoinCompiler(new TypeOperators()), new BlockTypeOperators()),
@@ -13,6 +13,12 @@
*/
package io.trino.tests.product.deltalake;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.trino.tempto.BeforeTestWithContext;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;

import static io.trino.tempto.assertions.QueryAssert.Row.row;
@@ -29,15 +35,28 @@
public class TestDeltaLakeDatabricksChangeDataFeedCompatibility
extends BaseTestDeltaLakeS3Storage
{
@Inject
@Named("s3.server_type")
private String s3ServerType;

private AmazonS3 s3Client;

@BeforeTestWithContext
public void setup()
{
super.setUp();
s3Client = new S3ClientFactory().createS3Client(s3ServerType);
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
public void testUpdateTableWithCdf()
{
String tableName = "test_updates_to_table_with_cdf_" + randomNameSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (col1 STRING, updated_column INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");
onTrino().executeQuery("CREATE TABLE delta.default." + tableName + " (col1 VARCHAR, updated_column INT) " +
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "', change_data_feed_enabled = true)");

Assertions.assertThat(onTrino().executeQuery("SHOW CREATE TABLE " + tableName).getOnlyValue().toString()).contains("change_data_feed_enabled = true");

onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue1', 1)");
onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue2', 2)");
Expand All @@ -59,7 +78,7 @@ public void testUpdateTableWithCdf()
row("testValue4", 4, "update_postimage", 5L));
}
finally {
onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName);
onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." + tableName);
}
}

@@ -477,4 +496,58 @@ public void testDeleteFromNullPartitionWithCdfEnabled()
onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName);
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
public void testThatCdfDoesntWorkWhenPropertyIsNotSet()
{
String tableName1 = "test_cdf_doesnt_work_when_property_is_not_set_1_" + randomNameSuffix();
String tableName2 = "test_cdf_doesnt_work_when_property_is_not_set_2_" + randomNameSuffix();
assertThereIsNoCdfFileGenerated(tableName1, "");
assertThereIsNoCdfFileGenerated(tableName2, "change_data_feed_enabled = false");
}

private void assertThereIsNoCdfFileGenerated(String tableName, String tableProperty)
{
try {
onTrino().executeQuery("CREATE TABLE delta.default." + tableName + " (col1 VARCHAR, updated_column INT) " +
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
(tableProperty.isEmpty() ? "" : ", " + tableProperty) + ")");

if (tableProperty.isEmpty()) {
Assertions.assertThat(onTrino().executeQuery("SHOW CREATE TABLE " + tableName).getOnlyValue().toString())
.doesNotContain("change_data_feed_enabled");
}
else {
Assertions.assertThat(onTrino().executeQuery("SHOW CREATE TABLE " + tableName).getOnlyValue().toString())
.contains(tableProperty);
}
onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue1', 1)");
onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue2', 2)");
onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue3', 3)");

// as INSERTs don't generate CDF files, another operation is needed; UPDATE will do
onTrino().executeQuery("UPDATE delta.default." + tableName +
" SET updated_column = 5 WHERE col1 = 'testValue3'");
onDelta().executeQuery("UPDATE default." + tableName +
" SET updated_column = 4 WHERE col1 = 'testValue2'");

assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
.containsOnly(
row("testValue1", 1),
row("testValue2", 4),
row("testValue3", 5));

assertThatThereIsNoChangeDataFiles(tableName);
}
finally {
onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." + tableName);
}
}

private void assertThatThereIsNoChangeDataFiles(String tableName)
{
String prefix = "databricks-compatibility-test-" + tableName + "/_change_data/";
ListObjectsV2Result listResult = s3Client.listObjectsV2(bucketName, prefix);
Assertions.assertThat(listResult.getObjectSummaries()).isEmpty();
}
}
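
The helper above boils down to listing the table's _change_data/ prefix and asserting that it is empty, since Delta only writes change data feed files under that directory when the feature is enabled. A stand-alone sketch of the same check with the AWS SDK v1 client the test already uses; the bucket name, table name, and default-credentials client setup are placeholder assumptions (the product test obtains its client from S3ClientFactory and its bucket name from the test context):

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Result;

public class ChangeDataPrefixCheck
{
    public static void main(String[] args)
    {
        // Placeholder values for illustration only.
        String bucketName = "my-test-bucket";
        String tableName = "my_table";
        String prefix = "databricks-compatibility-test-" + tableName + "/_change_data/";

        AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();
        ListObjectsV2Result listResult = s3Client.listObjectsV2(bucketName, prefix);

        // With change_data_feed_enabled unset or false, nothing should exist under the prefix.
        if (!listResult.getObjectSummaries().isEmpty()) {
            throw new AssertionError("Unexpected change data files under " + prefix);
        }
        System.out.println("No change data files under " + prefix);
    }
}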
