Flink: Implement the listPartitions method in FlinkCatalog (apache#1815)
zhangjun0x01 authored Nov 26, 2020
1 parent 7383b9d commit 7a306e5
Showing 4 changed files with 153 additions and 5 deletions.
39 changes: 36 additions & 3 deletions flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -46,15 +46,19 @@
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.util.StringUtils;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
@@ -65,6 +69,7 @@
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -88,6 +93,7 @@ public class FlinkCatalog extends AbstractCatalog {
private final String[] baseNamespace;
private final SupportsNamespaces asNamespaceCatalog;
private final Closeable closeable;
private final boolean cacheEnabled;

// TODO - Update baseNamespace to use Namespace class
// https://github.com/apache/iceberg/issues/1541
@@ -100,6 +106,7 @@ public FlinkCatalog(
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.baseNamespace = baseNamespace;
this.cacheEnabled = cacheEnabled;

Catalog originalCatalog = catalogLoader.loadCatalog();
icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
@@ -303,7 +310,12 @@ public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException

Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
try {
- return icebergCatalog.loadTable(toIdentifier(tablePath));
+ Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
+ if (cacheEnabled) {
+ table.refresh();
+ }
+
+ return table;
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new TableNotExistException(getName(), tablePath, e);
}
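
Note: when table caching is enabled, CachingCatalog.wrap returns memoized Table instances, so the explicit refresh() above is what guarantees that callers such as listPartitions see snapshots committed after the table was first cached. A minimal sketch of the pattern (the class and method names below are illustrative assumptions, not part of this commit):

import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

class CacheRefreshSketch {
  // A cached Table may be stale; refresh() re-reads metadata so the
  // latest committed snapshot becomes visible.
  static Table loadFresh(Catalog original, TableIdentifier id) {
    Catalog cached = CachingCatalog.wrap(original);
    Table table = cached.loadTable(id); // possibly a memoized instance
    table.refresh();
    return table;
  }
}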
@@ -618,8 +630,29 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio

@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
- throws CatalogException {
- throw new UnsupportedOperationException();
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ Table table = loadIcebergTable(tablePath);
+
+ if (table.spec().isUnpartitioned()) {
+ throw new TableNotPartitionedException(icebergCatalog.name(), tablePath);
+ }
+
+ Set<CatalogPartitionSpec> set = Sets.newHashSet();
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ for (DataFile dataFile : CloseableIterable.transform(tasks, FileScanTask::file)) {
+ Map<String, String> map = Maps.newHashMap();
+ StructLike structLike = dataFile.partition();
+ PartitionSpec spec = table.specs().get(dataFile.specId());
+ for (int i = 0; i < structLike.size(); i++) {
+ map.put(spec.fields().get(i).name(), String.valueOf(structLike.get(i, Object.class)));
+ }
+ set.add(new CatalogPartitionSpec(map));
+ }
+ } catch (IOException e) {
+ throw new CatalogException(String.format("Failed to list partitions of table %s", tablePath), e);
+ }
+
+ return Lists.newArrayList(set);
}

@Override
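For context, here is a hedged usage sketch of the new method through Flink's catalog API; the catalog name, catalog properties, and table identifier are assumptions for illustration, not part of this commit:

import java.util.List;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;

public class ListPartitionsExample {
  public static void main(String[] args) throws Exception {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
    // Register an Iceberg catalog; the property keys follow FlinkCatalogFactory.
    tEnv.executeSql(
        "CREATE CATALOG iceberg_catalog WITH (" +
        "'type'='iceberg', 'catalog-type'='hadoop', " +
        "'warehouse'='file:///tmp/warehouse', 'cache-enabled'='false')");
    Catalog catalog = tEnv.getCatalog("iceberg_catalog").get();
    // Each CatalogPartitionSpec maps partition field names to stringified values,
    // e.g. {data=a}, because the implementation uses String.valueOf on each value.
    List<CatalogPartitionSpec> partitions =
        catalog.listPartitions(new ObjectPath("db", "sample"));
    partitions.forEach(spec -> System.out.println(spec.getPartitionSpec()));
  }
}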
4 changes: 3 additions & 1 deletion flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -66,6 +66,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
public static final String HIVE_CONF_DIR = "hive-conf-dir";
public static final String DEFAULT_DATABASE = "default-database";
public static final String BASE_NAMESPACE = "base-namespace";
public static final String CACHE_ENABLED = "cache-enabled";

/**
* Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter.
@@ -117,6 +118,7 @@ public List<String> supportedProperties() {
properties.add(CatalogProperties.WAREHOUSE_LOCATION);
properties.add(CatalogProperties.HIVE_URI);
properties.add(CatalogProperties.HIVE_CLIENT_POOL_SIZE);
properties.add(CACHE_ENABLED);
return properties;
}

@@ -131,7 +133,7 @@ protected Catalog createCatalog(String name, Map<String, String> properties, Con
String[] baseNamespace = properties.containsKey(BASE_NAMESPACE) ?
Splitter.on('.').splitToList(properties.get(BASE_NAMESPACE)).toArray(new String[0]) :
new String[0];
boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault("cache-enabled", "true"));
boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
}

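The new CACHE_ENABLED constant documents the previously hard-coded "cache-enabled" key as a supported property. A small sketch of the parsing semantics (the class name is an illustrative assumption): caching stays on when the key is absent, and any value other than "true" (case-insensitive) disables it, since Boolean.parseBoolean never throws.

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class CacheFlagSketch {
  // Mirrors the parsing in FlinkCatalogFactory#createCatalog.
  static boolean cacheEnabled(Map<String, String> properties) {
    return Boolean.parseBoolean(properties.getOrDefault("cache-enabled", "true"));
  }

  public static void main(String[] args) {
    System.out.println(cacheEnabled(ImmutableMap.of())); // true (default)
    System.out.println(cacheEnabled(ImmutableMap.of("cache-enabled", "false"))); // false
    System.out.println(cacheEnabled(ImmutableMap.of("cache-enabled", "yes"))); // false, not an error
  }
}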
2 changes: 1 addition & 1 deletion flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -84,7 +84,7 @@ public static Iterable<Object[]> parameters() {
protected final String[] baseNamespace;
protected final Catalog validationCatalog;
protected final SupportsNamespaces validationNamespaceCatalog;
- private final Map<String, String> config = Maps.newHashMap();
+ protected final Map<String, String> config = Maps.newHashMap();

protected final String flinkDatabase;
protected final Namespace icebergNamespace;
113 changes: 113 additions & 0 deletions flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import java.util.List;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.flink.FlinkCatalogFactory.CACHE_ENABLED;

public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {

private String tableName = "test_table";

private final FileFormat format;

@Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, cacheEnabled={3}")
public static Iterable<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format : new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) {
for (Boolean cacheEnabled : new Boolean[] {true, false}) {
for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
String catalogName = (String) catalogParams[0];
String[] baseNamespace = (String[]) catalogParams[1];
parameters.add(new Object[] {catalogName, baseNamespace, format, cacheEnabled});
}
}
}
return parameters;
}

public TestFlinkCatalogTablePartitions(String catalogName, String[] baseNamespace, FileFormat format,
boolean cacheEnabled) {
super(catalogName, baseNamespace);
this.format = format;
config.put(CACHE_ENABLED, String.valueOf(cacheEnabled));
}

@Before
public void before() {
super.before();
sql("CREATE DATABASE %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
}

@After
public void cleanNamespaces() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
super.clean();
}

@Test
public void testListPartitionsWithUnpartitionedTable() {
sql("CREATE TABLE %s (id INT, data VARCHAR) with ('write.format.default'='%s')",
tableName, format.name());
sql("INSERT INTO %s SELECT 1,'a'", tableName);

ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
FlinkCatalog flinkCatalog = (FlinkCatalog) getTableEnv().getCatalog(catalogName).get();
AssertHelpers.assertThrows("Should not list partitions for unpartitioned table.",
TableNotPartitionedException.class, () -> flinkCatalog.listPartitions(objectPath));
}

@Test
public void testListPartitionsWithPartitionedTable() throws TableNotExistException, TableNotPartitionedException {
sql("CREATE TABLE %s (id INT, data VARCHAR) PARTITIONED BY (data) " +
"with ('write.format.default'='%s')", tableName, format.name());
sql("INSERT INTO %s SELECT 1,'a'", tableName);
sql("INSERT INTO %s SELECT 2,'b'", tableName);

ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
FlinkCatalog flinkCatalog = (FlinkCatalog) getTableEnv().getCatalog(catalogName).get();
List<CatalogPartitionSpec> list = flinkCatalog.listPartitions(objectPath);
Assert.assertEquals("Should have 2 partition", 2, list.size());

List<CatalogPartitionSpec> expected = Lists.newArrayList();
CatalogPartitionSpec partitionSpec1 = new CatalogPartitionSpec(ImmutableMap.of("data", "a"));
CatalogPartitionSpec partitionSpec2 = new CatalogPartitionSpec(ImmutableMap.of("data", "b"));
expected.add(partitionSpec1);
expected.add(partitionSpec2);
Assert.assertEquals("Should produce the expected catalog partition specs.", list, expected);
}
}
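
One caveat: listPartitions builds its result from a HashSet, so the order of the returned list is not guaranteed. An order-insensitive form of the final assertion (an assumed alternative, not part of this commit) would compare sets instead, using the relocated Guava helper org.apache.iceberg.relocated.com.google.common.collect.Sets:

Assert.assertEquals("Should produce the expected catalog partition specs.",
    Sets.newHashSet(expected), Sets.newHashSet(list));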
