Spark 3.4: Add REST catalog to Spark integration tests (apache#11698)
nastra authored Dec 16, 2024
1 parent 16cc4e9 commit 791d0fa
Showing 12 changed files with 242 additions and 20 deletions.
------------------------------------------------------------
22 changes: 22 additions & 0 deletions spark/v3.4/build.gradle
@@ -107,9 +107,14 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts')
testImplementation (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements')) {
transitive = false
}
testImplementation libs.sqlite.jdbc
testImplementation libs.awaitility
testImplementation libs.junit.vintage.engine
// runtime dependencies for running REST Catalog based integration tests
testRuntimeOnly libs.jetty.servlet
}

test {
@@ -174,6 +179,12 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
testImplementation (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements')) {
transitive = false
}
// runtime dependencies for running REST Catalog based integration tests
testRuntimeOnly libs.jetty.servlet
testRuntimeOnly libs.sqlite.jdbc

testImplementation libs.avro.avro
testImplementation libs.parquet.hadoop
@@ -252,6 +263,17 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}") {
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')

// runtime dependencies for running Hive Catalog based integration tests
integrationRuntimeOnly project(':iceberg-hive-metastore')
// runtime dependencies for running REST Catalog based integration tests
integrationRuntimeOnly project(path: ':iceberg-core', configuration: 'testArtifacts')
integrationRuntimeOnly (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements')) {
transitive = false
}
integrationRuntimeOnly libs.jetty.servlet
integrationRuntimeOnly libs.sqlite.jdbc

// Not allowed on our classpath, only the runtime jar is allowed
integrationCompileOnly project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}")
integrationCompileOnly project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}")
------------------------------------------------------------
@@ -528,6 +528,8 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {

spark.createDataFrame(newRecords, newSparkSchema).coalesce(1).writeTo(tableName).append();

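// refresh so the snapshot just committed through Spark is visible on this table handle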
table.refresh();

Long currentSnapshotId = table.currentSnapshot().snapshotId();

Dataset<Row> actualFilesDs =
------------------------------------------------------------
@@ -460,12 +460,15 @@ public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
Table table = Spark3Util.loadIcebergTable(spark, tableName);

String statsFileName = "stats-file-" + UUID.randomUUID();
String location = table.location();
// not every catalog returns a file: scheme for local directories,
// e.g. the Hadoop and Hive catalogs do, while the JDBC and REST catalogs do not
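// (new File(new URI(path)) below requires an absolute file: URI and throws otherwise)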
if (!location.startsWith("file:")) {
location = "file:" + location;
}

File statsLocation =
- new File(new URI(table.location()))
- .toPath()
- .resolve("data")
- .resolve(statsFileName)
- .toFile();
+ new File(new URI(location)).toPath().resolve("data").resolve(statsFileName).toFile();
StatisticsFile statisticsFile;
try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
long snapshotId = table.currentSnapshot().snapshotId();
------------------------------------------------------------
RESTServerRule.java (new file)
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest;

import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.rules.ExternalResource;

/**
* Exposes the {@link RESTCatalogServer} to JUnit4 tests as a rule, mirroring what {@link
* RESTServerExtension} provides for JUnit5.
*/
public class RESTServerRule extends ExternalResource {
public static final String FREE_PORT = "0";

private volatile RESTCatalogServer localServer;
private volatile RESTCatalog client;
private final Map<String, String> config;

public RESTServerRule() {
config = Maps.newHashMap();
}

public RESTServerRule(Map<String, String> config) {
Map<String, String> conf = Maps.newHashMap(config);
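// FREE_PORT ("0") is a sentinel: resolve it to a real free port before the server starts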
if (conf.containsKey(RESTCatalogServer.REST_PORT)
&& conf.get(RESTCatalogServer.REST_PORT).equals(FREE_PORT)) {
conf.put(RESTCatalogServer.REST_PORT, String.valueOf(RCKUtils.findFreePort()));
}
this.config = conf;
}

public Map<String, String> config() {
return config;
}

public RESTCatalog client() {
if (null == client) {
try {
maybeInitClientAndServer();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

return client;
}

public String uri() {
return client().properties().get(CatalogProperties.URI);
}

private void maybeInitClientAndServer() throws Exception {
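// double-checked locking: at most one server/client pair even with concurrent callers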
if (null == localServer) {
synchronized (this) {
if (null == localServer) {
this.localServer = new RESTCatalogServer(config);
this.localServer.start(false);
this.client = RCKUtils.initCatalogClient(config);
}
}
}
}

@Override
protected void before() throws Throwable {
maybeShutdownClientAndServer();
maybeInitClientAndServer();
}

@Override
protected void after() {
maybeShutdownClientAndServer();
}

private void maybeShutdownClientAndServer() {
try {
if (localServer != null) {
localServer.stop();
localServer = null;
}
if (client != null) {
client.close();
client = null;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
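A minimal JUnit4 usage sketch of the rule (the test class and test name below are hypothetical, not part of this change; SparkTestBaseWithCatalog further down wires the rule up the same way):

import static org.junit.Assert.assertNotNull;

import java.util.Map;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.RESTCatalogServer;
import org.apache.iceberg.rest.RESTServerRule;
import org.junit.ClassRule;
import org.junit.Test;

public class ExampleRESTCatalogTest {
  // one embedded server per test class; the rule starts and stops it around the class
  @ClassRule
  public static final RESTServerRule REST_SERVER =
      new RESTServerRule(Map.of(RESTCatalogServer.REST_PORT, RESTServerRule.FREE_PORT));

  @Test
  public void serverHandsOutAClient() {
    RESTCatalog catalog = REST_SERVER.client();
    assertNotNull(catalog);
    assertNotNull(REST_SERVER.uri());
  }
}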
------------------------------------------------------------
SparkCatalogConfig.java
@@ -34,6 +34,10 @@ public enum SparkCatalogConfig {
"testhadoop",
SparkCatalog.class.getName(),
ImmutableMap.of("type", "hadoop", "cache-enabled", "false")),
REST(
"testrest",
SparkCatalog.class.getName(),
ImmutableMap.of("type", "rest", "cache-enabled", "false")),
SPARK(
"spark_catalog",
SparkSessionCatalog.class.getName(),
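For reference, SparkTestBaseWithCatalog (further down) registers each enum entry with the Spark session, so the new REST entry amounts to roughly the following sketch; restUri stands for the URI handed out by the embedded REST server and is illustrative, not part of this change:

// sketch of the session conf derived from SparkCatalogConfig.REST
spark.conf().set("spark.sql.catalog.testrest", "org.apache.iceberg.spark.SparkCatalog");
spark.conf().set("spark.sql.catalog.testrest.type", "rest");
spark.conf().set("spark.sql.catalog.testrest.cache-enabled", "false");
spark.conf().set("spark.sql.catalog.testrest.uri", restUri);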
------------------------------------------------------------
@@ -19,6 +19,8 @@
package org.apache.iceberg.spark;

import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
@@ -45,6 +47,14 @@ public static Object[][] parameters() {
SparkCatalogConfig.SPARK.catalogName(),
SparkCatalogConfig.SPARK.implementation(),
SparkCatalogConfig.SPARK.properties()
},
{
SparkCatalogConfig.REST.catalogName(),
SparkCatalogConfig.REST.implementation(),
ImmutableMap.builder()
.putAll(SparkCatalogConfig.REST.properties())
.put(CatalogProperties.URI, REST_SERVER_RULE.uri())
.build()
}
};
}
------------------------------------------------------------
SparkTestBaseWithCatalog.java
@@ -18,6 +18,12 @@
*/
package org.apache.iceberg.spark;

import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST;

import java.io.File;
import java.io.IOException;
import java.util.Map;
@@ -31,20 +37,42 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.RESTCatalogServer;
import org.apache.iceberg.rest.RESTServerRule;
import org.apache.iceberg.util.PropertyUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

public abstract class SparkTestBaseWithCatalog extends SparkTestBase {
protected static File warehouse = null;

@ClassRule
public static final RESTServerRule REST_SERVER_RULE =
new RESTServerRule(
Map.of(
RESTCatalogServer.REST_PORT,
RESTServerRule.FREE_PORT,
// An in-memory sqlite database is by default private to the connection that created it.
// If more than one JDBC connection backed by in-memory sqlite is created behind a single
// JdbcCatalog, different connections could see different views of table state, even
// within the same catalog. Reference:
// https://www.sqlite.org/inmemorydb.html
CatalogProperties.CLIENT_POOL_SIZE,
"1"));

protected static RESTCatalog restCatalog;

@BeforeClass
public static void createWarehouse() throws IOException {
SparkTestBaseWithCatalog.warehouse = File.createTempFile("warehouse", null);
Assert.assertTrue(warehouse.delete());
restCatalog = REST_SERVER_RULE.client();
}

@AfterClass
@@ -60,8 +88,8 @@ public static void dropWarehouse() throws IOException {

protected final String catalogName;
protected final Map<String, String> catalogConfig;
- protected final Catalog validationCatalog;
- protected final SupportsNamespaces validationNamespaceCatalog;
+ protected Catalog validationCatalog;
+ protected SupportsNamespaces validationNamespaceCatalog;
protected final TableIdentifier tableIdent = TableIdentifier.of(Namespace.of("default"), "table");
protected final String tableName;

@@ -77,11 +105,7 @@ public SparkTestBaseWithCatalog(
String catalogName, String implementation, Map<String, String> config) {
this.catalogName = catalogName;
this.catalogConfig = config;
- this.validationCatalog =
- catalogName.equals("testhadoop")
- ? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse)
- : catalog;
- this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
+ configureValidationCatalog();

spark.conf().set("spark.sql.catalog." + catalogName, implementation);
config.forEach(
@@ -127,4 +151,32 @@ protected void configurePlanningMode(String table, PlanningMode planningMode) {
TableProperties.DELETE_PLANNING_MODE,
planningMode.modeName());
}

private void configureValidationCatalog() {
if (catalogConfig.containsKey(ICEBERG_CATALOG_TYPE)) {
switch (catalogConfig.get(ICEBERG_CATALOG_TYPE)) {
case ICEBERG_CATALOG_TYPE_HADOOP:
this.validationCatalog =
new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse);
break;
case ICEBERG_CATALOG_TYPE_REST:
this.validationCatalog = restCatalog;
break;
case ICEBERG_CATALOG_TYPE_HIVE:
this.validationCatalog = catalog;
break;
default:
throw new IllegalArgumentException("Unknown catalog type");
}
} else if (catalogConfig.containsKey(CATALOG_IMPL)) {
switch (catalogConfig.get(CATALOG_IMPL)) {
case "org.apache.iceberg.inmemory.InMemoryCatalog":
this.validationCatalog = new InMemoryCatalog();
break;
default:
throw new IllegalArgumentException("Unknown catalog impl");
}
}
this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
}
}
------------------------------------------------------------
@@ -116,6 +116,7 @@ public void testComputeTableStatsAction() throws NoSuchTableException, ParseException {
new SimpleRecord(4, "d"));
spark.createDataset(records, Encoders.bean(SimpleRecord.class)).writeTo(tableName).append();
SparkActions actions = SparkActions.get();
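// refresh so the stats action sees the snapshot appended through Spark above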
table.refresh();
ComputeTableStats.Result results =
actions.computeTableStats(table).columns("id", "data").execute();
assertThat(results).isNotNull();
------------------------------------------------------------
@@ -18,8 +18,11 @@
*/
package org.apache.iceberg.spark.sql;

import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.util.Map;
import org.apache.iceberg.catalog.Namespace;
Expand All @@ -32,7 +35,6 @@
import org.apache.spark.sql.AnalysisException;
import org.junit.After;
import org.junit.Assert;
- import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

@@ -293,8 +295,13 @@ public void testAlterColumnPositionFirst() {

@Test
public void testTableRename() {
- Assume.assumeFalse(
- "Hadoop catalog does not support rename", validationCatalog instanceof HadoopCatalog);
+ assumeThat(catalogConfig.get(ICEBERG_CATALOG_TYPE))
+ .as(
+ "need to fix https://github.com/apache/iceberg/issues/11154 before enabling this for the REST catalog")
+ .isNotEqualTo(ICEBERG_CATALOG_TYPE_REST);
+ assumeThat(validationCatalog)
+ .as("Hadoop catalog does not support rename")
+ .isNotInstanceOf(HadoopCatalog.class);

Assert.assertTrue("Initial name should exist", validationCatalog.tableExists(tableIdent));
Assert.assertFalse("New name should not exist", validationCatalog.tableExists(renamedIdent));
------------------------------------------------------------
@@ -394,6 +394,8 @@ public void testCreateRTASWithPartitionSpecChanging() {
+ "FROM %s ORDER BY 3, 1",
tableName, sourceName);

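// the RTAS above replaced the table, so refresh before checking the new schema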
rtasTable.refresh();

Schema expectedSchema =
new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),