Skip to content

Commit

Permalink
[FLINK-12965][table][hive] unify catalog view implementations
Browse files Browse the repository at this point in the history
This PR unified implementations of CatalogView.

This closes apache#8882.
  • Loading branch information
bowenli86 committed Jun 27, 2019
1 parent f21ce9b commit 1ab79f6
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@
import org.apache.flink.batch.connectors.hive.HiveTableFactory;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.AbstractCatalogView;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.CatalogViewImpl;
import org.apache.flink.table.catalog.GenericCatalogFunction;
import org.apache.flink.table.catalog.GenericCatalogPartition;
import org.apache.flink.table.catalog.GenericCatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.catalog.config.CatalogTableConfig;
Expand Down Expand Up @@ -498,23 +499,12 @@ private static CatalogBaseTable instantiateCatalogTable(Table hiveTable) {
}

if (isView) {
if (isGeneric) {
return new GenericCatalogView(
return new CatalogViewImpl(
hiveTable.getViewOriginalText(),
hiveTable.getViewExpandedText(),
tableSchema,
properties,
comment
);
} else {
return new HiveCatalogView(
hiveTable.getViewOriginalText(),
hiveTable.getViewExpandedText(),
tableSchema,
properties,
comment
);
}
comment);
} else {
return new CatalogTableImpl(tableSchema, partitionKeys, properties, comment);
}
Expand Down Expand Up @@ -546,7 +536,7 @@ private static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTabl

// Table columns and partition keys
if (table instanceof CatalogTableImpl) {
CatalogTableImpl catalogTable = (CatalogTableImpl) table;
CatalogTable catalogTable = (CatalogTableImpl) table;

if (catalogTable.isPartitioned()) {
int partitionKeySize = catalogTable.getPartitionKeys().size();
Expand All @@ -559,8 +549,8 @@ private static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTabl
sd.setCols(allColumns);
hiveTable.setPartitionKeys(new ArrayList<>());
}
} else if (table instanceof AbstractCatalogView) {
AbstractCatalogView view = (AbstractCatalogView) table;
} else if (table instanceof CatalogViewImpl) {
CatalogView view = (CatalogViewImpl) table;

// TODO: [FLINK-12398] Support partitioned view in catalog API
sd.setCols(allColumns);
Expand All @@ -571,7 +561,7 @@ private static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTabl
hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
} else {
throw new CatalogException(
"HiveCatalog only supports CatalogTable and HiveCatalogView");
"HiveCatalog only supports CatalogTableImpl and CatalogViewImpl");
}

return hiveTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogTestBase;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.GenericCatalogFunction;
import org.apache.flink.table.catalog.GenericCatalogView;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BinaryType;
Expand All @@ -39,7 +37,6 @@
import org.junit.Test;

import java.util.Arrays;
import java.util.HashMap;

import static org.junit.Assert.assertEquals;

Expand Down Expand Up @@ -278,26 +275,6 @@ protected boolean isGeneric() {
return true;
}

@Override
public CatalogView createView() {
return new GenericCatalogView(
String.format("select * from %s", t1),
String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
createTableSchema(),
new HashMap<>(),
"This is a view");
}

@Override
public CatalogView createAnotherView() {
return new GenericCatalogView(
String.format("select * from %s", t2),
String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
createAnotherTableSchema(),
new HashMap<>(),
"This is another view");
}

@Override
protected CatalogFunction createFunction() {
return new GenericCatalogFunction(MyScalarFunction.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogTestBase;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
Expand Down Expand Up @@ -140,26 +139,6 @@ public CatalogTable createStreamingTable() {
);
}

@Override
public CatalogView createView() {
return new HiveCatalogView(
String.format("select * from %s", t1),
String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
createTableSchema(),
new HashMap<>(),
"This is a hive view");
}

@Override
public CatalogView createAnotherView() {
return new HiveCatalogView(
String.format("select * from %s", t2),
String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
createAnotherTableSchema(),
new HashMap<>(),
"This is another hive view");
}

@Override
protected CatalogFunction createFunction() {
return new HiveCatalogFunction("test.class.name");
Expand All @@ -175,18 +154,6 @@ public CatalogPartition createPartition() {
return new HiveCatalogPartition(getBatchTableProperties());
}

@Override
protected void checkEquals(CatalogView v1, CatalogView v2) {
assertEquals(v1.getSchema(), v1.getSchema());
assertEquals(v1.getComment(), v2.getComment());
assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());

// Hive views may have properties created by itself
// thus properties of Hive view is a super set of those in its corresponding Flink view
assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
}

@Override
protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
assertTrue(expected instanceof HiveCatalogPartition && actual instanceof HiveCatalogPartition);
Expand Down
22 changes: 2 additions & 20 deletions flink-python/pyflink/table/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
__all__ = ['Catalog', 'CatalogDatabase', 'CatalogBaseTable', 'CatalogPartition', 'CatalogFunction',
'ObjectPath', 'CatalogPartitionSpec', 'CatalogTableStatistics',
'CatalogColumnStatistics', 'HiveCatalog', 'HiveCatalogFunction',
'HiveCatalogPartition', 'HiveCatalogView']
'HiveCatalogPartition']


class Catalog(object):
Expand Down Expand Up @@ -626,11 +626,7 @@ def __init__(self, j_catalog_base_table):

@staticmethod
def _get(j_catalog_base_table):
if j_catalog_base_table.getClass().getName() == \
"org.apache.flink.table.catalog.hive.HiveCatalogView":
return HiveCatalogView(j_hive_catalog_view=j_catalog_base_table)
else:
return CatalogBaseTable(j_catalog_base_table)
return CatalogBaseTable(j_catalog_base_table)

def get_properties(self):
"""
Expand Down Expand Up @@ -1011,17 +1007,3 @@ def __int__(self, properties=None, location=None, j_hive_catalog_partition=None)

def get_location(self):
return self._j_catalog_partition.getLocation()


class HiveCatalogView(CatalogBaseTable):
"""
A Hive catalog view implementation.
"""

def __init__(self, original_query=None, expanded_query=None, table_schema=None,
properties=None, comment=None, j_hive_catalog_view=None):
gateway = get_gateway()
if j_hive_catalog_view is None:
j_hive_catalog_view = gateway.jvm.org.apache.flink.table.catalog.hive.HiveCatalogView(
original_query, expanded_query, table_schema._j_table_schema, properties, comment)
super(HiveCatalogView, self).__init__(j_hive_catalog_view)
4 changes: 2 additions & 2 deletions flink-python/pyflink/table/tests/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def create_another_partitioned_table():
def create_view():
gateway = get_gateway()
table_schema = CatalogTestBase.create_table_schema()
j_view = gateway.jvm.GenericCatalogView(
j_view = gateway.jvm.CatalogViewImpl(
"select * from t1",
"select * from test-catalog.db1.t1",
table_schema._j_table_schema,
Expand All @@ -167,7 +167,7 @@ def create_view():
def create_another_view():
gateway = get_gateway()
table_schema = CatalogTestBase.create_another_table_schema()
j_view = gateway.jvm.GenericCatalogView(
j_view = gateway.jvm.CatalogViewImpl(
"select * from t2",
"select * from test-catalog.db2.t2",
table_schema._j_table_schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,36 @@
* limitations under the License.
*/

package org.apache.flink.table.catalog.hive;
package org.apache.flink.table.catalog;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.AbstractCatalogView;
import org.apache.flink.table.catalog.CatalogBaseTable;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* A Hive catalog view implementation.
* An implementation of catalog view.
*/
public class HiveCatalogView extends AbstractCatalogView {

public HiveCatalogView(
public class CatalogViewImpl extends AbstractCatalogView {
public CatalogViewImpl(
String originalQuery,
String expandedQuery,
TableSchema tableSchema,
TableSchema schema,
Map<String, String> properties,
String comment) {
super(originalQuery, expandedQuery, tableSchema, properties, comment);
super(originalQuery, expandedQuery, schema, properties, comment);
}

@Override
public CatalogBaseTable copy() {
return new HiveCatalogView(
this.getOriginalQuery(), this.getExpandedQuery(), this.getSchema().copy(), new HashMap<>(this.getProperties()), getComment());
return new CatalogViewImpl(
getOriginalQuery(),
getExpandedQuery(),
getSchema().copy(),
new HashMap<>(getProperties()),
getComment()
);
}

@Override
Expand All @@ -53,8 +55,6 @@ public Optional<String> getDescription() {

@Override
public Optional<String> getDetailedDescription() {
// TODO: return a detailed description
return Optional.ofNullable(getComment());
return Optional.of("This is a catalog view implementation");
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,26 @@ public CatalogTable createAnotherPartitionedTable() {
TEST_COMMENT);
}

@Override
public CatalogView createView() {
return new CatalogViewImpl(
String.format("select * from %s", t1),
String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
createTableSchema(),
getBatchTableProperties(),
"This is a view");
}

@Override
public CatalogView createAnotherView() {
return new CatalogViewImpl(
String.format("select * from %s", t2),
String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
createAnotherTableSchema(),
getBatchTableProperties(),
"This is another view");
}

protected Map<String, String> getBatchTableProperties() {
return new HashMap<String, String>() {{
put(IS_STREAMING, "false");
Expand Down
Loading

0 comments on commit 1ab79f6

Please sign in to comment.