Skip to content

Commit

Permalink
Spark: Add spark's legacy mode write tests to spark3.0 and 3.1 versio…
Browse files Browse the repository at this point in the history
…ns. (apache#3767)

* Add spark's legacy mode write tests to spark3.0 and 3.1 versions backport of tests from apache#3723
  • Loading branch information
SinghAsDev authored Dec 18, 2021
1 parent e62f3b7 commit c5d3d1c
Show file tree
Hide file tree
Showing 2 changed files with 264 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public void before() {

spark.conf().set("hive.exec.dynamic.partition", "true");
spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
spark.conf().set("spark.sql.parquet.writeLegacyFormat", false);
spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));

List<SimpleRecord> expected = Lists.newArrayList(
Expand Down Expand Up @@ -569,6 +570,137 @@ public void schemaEvolutionTestWithSparkSQL() throws Exception {
assertEquals("Output must match", expectedAfterAddColumn, afterMigarteAfterAddResults);
}

@Test
public void testHiveStyleThreeLevelList() throws Exception {
threeLevelList(true);
}

@Test
public void testThreeLevelList() throws Exception {
threeLevelList(false);
}

@Test
public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception {
threeLevelListWithNestedStruct(true);
}

@Test
public void testThreeLevelListWithNestedStruct() throws Exception {
threeLevelListWithNestedStruct(false);
}

@Test
public void testHiveStyleThreeLevelLists() throws Exception {
threeLevelLists(true);
}

@Test
public void testThreeLevelLists() throws Exception {
threeLevelLists(false);
}

@Test
public void testHiveStyleStructOfThreeLevelLists() throws Exception {
structOfThreeLevelLists(true);
}

@Test
public void testStructOfThreeLevelLists() throws Exception {
structOfThreeLevelLists(false);
}

public void threeLevelList(boolean useLegacyMode) throws Exception {
spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

String tableName = sourceName(String.format("threeLevelList_%s", useLegacyMode));
File location = temp.newFolder();
sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2: INT>>)" +
" STORED AS parquet" +
" LOCATION '%s'", tableName, location);

int testValue = 12345;
sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)))", tableName, testValue);
List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));

// migrate table
SparkActions.get().migrateTable(tableName).execute();

// check migrated table is returning expected result
List<Object[]> results = sql("SELECT * FROM %s", tableName);
Assert.assertTrue(results.size() > 0);
assertEquals("Output must match", expected, results);
}

private void threeLevelListWithNestedStruct(boolean useLegacyMode) throws Exception {
spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

String tableName = sourceName(String.format("threeLevelListWithNestedStruct_%s", useLegacyMode));
File location = temp.newFolder();
sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2: STRUCT<col3: INT>>>)" +
" STORED AS parquet" +
" LOCATION '%s'", tableName, location);

int testValue = 12345;
sql("INSERT INTO %s VALUES (ARRAY(STRUCT(STRUCT(%s))))", tableName, testValue);
List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));

// migrate table
SparkActions.get().migrateTable(tableName).execute();

// check migrated table is returning expected result
List<Object[]> results = sql("SELECT * FROM %s", tableName);
Assert.assertTrue(results.size() > 0);
assertEquals("Output must match", expected, results);
}

private void threeLevelLists(boolean useLegacyMode) throws Exception {
spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

String tableName = sourceName(String.format("threeLevelLists_%s", useLegacyMode));
File location = temp.newFolder();
sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2: INT>>, col3 ARRAY<STRUCT<col4: INT>>)" +
" STORED AS parquet" +
" LOCATION '%s'", tableName, location);

int testValue1 = 12345;
int testValue2 = 987654;
sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)), ARRAY(STRUCT(%s)))",
tableName, testValue1, testValue2);
List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));

// migrate table
SparkActions.get().migrateTable(tableName).execute();

// check migrated table is returning expected result
List<Object[]> results = sql("SELECT * FROM %s", tableName);
Assert.assertTrue(results.size() > 0);
assertEquals("Output must match", expected, results);
}

private void structOfThreeLevelLists(boolean useLegacyMode) throws Exception {
spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

String tableName = sourceName(String.format("structOfThreeLevelLists_%s", useLegacyMode));
File location = temp.newFolder();
sql("CREATE TABLE %s (col1 STRUCT<col2: ARRAY<STRUCT<col3: INT>>>)" +
" STORED AS parquet" +
" LOCATION '%s'", tableName, location);

int testValue1 = 12345;
sql("INSERT INTO %s VALUES (STRUCT(STRUCT(ARRAY(STRUCT(%s)))))",
tableName, testValue1);
List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));

// migrate table
SparkActions.get().migrateTable(tableName).execute();

// check migrated table is returning expected result
List<Object[]> results = sql("SELECT * FROM %s", tableName);
Assert.assertTrue(results.size() > 0);
assertEquals("Output must match", expected, results);
}


private SparkTable loadTable(String name) throws NoSuchTableException, ParseException {
return (SparkTable) catalog.loadTable(Spark3Util.catalogAndIdentifier(spark, name).identifier());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public void before() {

spark.conf().set("hive.exec.dynamic.partition", "true");
spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
spark.conf().set("spark.sql.parquet.writeLegacyFormat", false);
spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));

List<SimpleRecord> expected = Lists.newArrayList(
Expand Down Expand Up @@ -569,6 +570,137 @@ public void schemaEvolutionTestWithSparkSQL() throws Exception {
assertEquals("Output must match", expectedAfterAddColumn, afterMigarteAfterAddResults);
}

@Test
public void testHiveStyleThreeLevelList() throws Exception {
threeLevelList(true);
}

@Test
public void testThreeLevelList() throws Exception {
threeLevelList(false);
}

@Test
public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception {
threeLevelListWithNestedStruct(true);
}

@Test
public void testThreeLevelListWithNestedStruct() throws Exception {
threeLevelListWithNestedStruct(false);
}

@Test
public void testHiveStyleThreeLevelLists() throws Exception {
threeLevelLists(true);
}

@Test
public void testThreeLevelLists() throws Exception {
threeLevelLists(false);
}

@Test
public void testHiveStyleStructOfThreeLevelLists() throws Exception {
structOfThreeLevelLists(true);
}

@Test
public void testStructOfThreeLevelLists() throws Exception {
structOfThreeLevelLists(false);
}

public void threeLevelList(boolean useLegacyMode) throws Exception {
spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

String tableName = sourceName(String.format("threeLevelList_%s", useLegacyMode));
File location = temp.newFolder();
sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2: INT>>)" +
" STORED AS parquet" +
" LOCATION '%s'", tableName, location);

int testValue = 12345;
sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)))", tableName, testValue);
List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));

// migrate table
SparkActions.get().migrateTable(tableName).execute();

// check migrated table is returning expected result
List<Object[]> results = sql("SELECT * FROM %s", tableName);
Assert.assertTrue(results.size() > 0);
assertEquals("Output must match", expected, results);
}

private void threeLevelListWithNestedStruct(boolean useLegacyMode) throws Exception {
spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

String tableName = sourceName(String.format("threeLevelListWithNestedStruct_%s", useLegacyMode));
File location = temp.newFolder();
sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2: STRUCT<col3: INT>>>)" +
" STORED AS parquet" +
" LOCATION '%s'", tableName, location);

int testValue = 12345;
sql("INSERT INTO %s VALUES (ARRAY(STRUCT(STRUCT(%s))))", tableName, testValue);
List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));

// migrate table
SparkActions.get().migrateTable(tableName).execute();

// check migrated table is returning expected result
List<Object[]> results = sql("SELECT * FROM %s", tableName);
Assert.assertTrue(results.size() > 0);
assertEquals("Output must match", expected, results);
}

private void threeLevelLists(boolean useLegacyMode) throws Exception {
spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

String tableName = sourceName(String.format("threeLevelLists_%s", useLegacyMode));
File location = temp.newFolder();
sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2: INT>>, col3 ARRAY<STRUCT<col4: INT>>)" +
" STORED AS parquet" +
" LOCATION '%s'", tableName, location);

int testValue1 = 12345;
int testValue2 = 987654;
sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)), ARRAY(STRUCT(%s)))",
tableName, testValue1, testValue2);
List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));

// migrate table
SparkActions.get().migrateTable(tableName).execute();

// check migrated table is returning expected result
List<Object[]> results = sql("SELECT * FROM %s", tableName);
Assert.assertTrue(results.size() > 0);
assertEquals("Output must match", expected, results);
}

private void structOfThreeLevelLists(boolean useLegacyMode) throws Exception {
spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

String tableName = sourceName(String.format("structOfThreeLevelLists_%s", useLegacyMode));
File location = temp.newFolder();
sql("CREATE TABLE %s (col1 STRUCT<col2: ARRAY<STRUCT<col3: INT>>>)" +
" STORED AS parquet" +
" LOCATION '%s'", tableName, location);

int testValue1 = 12345;
sql("INSERT INTO %s VALUES (STRUCT(STRUCT(ARRAY(STRUCT(%s)))))",
tableName, testValue1);
List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));

// migrate table
SparkActions.get().migrateTable(tableName).execute();

// check migrated table is returning expected result
List<Object[]> results = sql("SELECT * FROM %s", tableName);
Assert.assertTrue(results.size() > 0);
assertEquals("Output must match", expected, results);
}


private SparkTable loadTable(String name) throws NoSuchTableException, ParseException {
return (SparkTable) catalog.loadTable(Spark3Util.catalogAndIdentifier(spark, name).identifier());
Expand Down

0 comments on commit c5d3d1c

Please sign in to comment.