Skip to content

Commit

Permalink
Flink: Upgrade to flink 1.13.2 (apache#3116)
Browse files Browse the repository at this point in the history
  • Loading branch information
Flyangz authored Sep 26, 2021
1 parent 0fda1c8 commit bae5ce9
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 6 deletions.
14 changes: 14 additions & 0 deletions api/src/test/java/org/apache/iceberg/AssertHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,18 @@ public static void assertEmptyAvroField(GenericRecord record, String field) {
AvroRuntimeException.class,
() -> record.get(field));
}

/**
* Same as {@link AssertHelpers#assertThrowsCause}, but this method compares root cause.
*/
public static void assertThrowsRootCause(String message,
Class<? extends Exception> expected,
String containedInMessage,
Runnable runnable) {
Assertions.assertThatThrownBy(runnable::run)
.as(message)
.getRootCause()
.isInstanceOf(expected)
.hasMessageContaining(containedInMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.descriptors.CatalogDescriptorValidator;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -70,6 +69,9 @@ public class FlinkCatalogFactory implements CatalogFactory {
public static final String BASE_NAMESPACE = "base-namespace";
public static final String CACHE_ENABLED = "cache-enabled";

public static final String TYPE = "type";
public static final String PROPERTY_VERSION = "property-version";

/**
* Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter.
*
Expand Down Expand Up @@ -104,8 +106,8 @@ static CatalogLoader createCatalogLoader(String name, Map<String, String> proper
@Override
public Map<String, String> requiredContext() {
Map<String, String> context = Maps.newHashMap();
context.put(CatalogDescriptorValidator.CATALOG_TYPE, "iceberg");
context.put(CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION, "1");
context.put(TYPE, "iceberg");
context.put(PROPERTY_VERSION, "1");
return context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.iceberg.flink.FlinkDynamicTableFactory
org.apache.iceberg.flink.FlinkDynamicTableFactory
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void testDowngradeTableToFormatV1ThroughTablePropertyFails() throws Excep
Assert.assertEquals("should create table using format v2",
2, ops.refresh().formatVersion());

AssertHelpers.assertThrowsCause("should fail to downgrade to v1",
AssertHelpers.assertThrowsRootCause("should fail to downgrade to v1",
IllegalArgumentException.class,
"Cannot downgrade v2 table to v1",
() -> sql("ALTER TABLE tl SET('format-version'='1')"));
Expand Down
2 changes: 1 addition & 1 deletion versions.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
org.slf4j:* = 1.7.25
org.apache.avro:avro = 1.10.1
org.apache.calcite:* = 1.10.0
org.apache.flink:* = 1.12.1
org.apache.flink:* = 1.13.2
org.apache.hadoop:* = 2.7.3
org.apache.hive:hive-metastore = 2.3.8
org.apache.hive:hive-serde = 2.3.8
Expand Down

0 comments on commit bae5ce9

Please sign in to comment.