Skip to content

Commit

Permalink
Flink: Make flink 1.16 work
Browse files Browse the repository at this point in the history
  • Loading branch information
hililiwei authored and stevenzwu committed Nov 3, 2022
1 parent f318235 commit 9e76a84
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 66 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flink-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ jobs:
strategy:
matrix:
jvm: [8, 11]
flink: ['1.14', '1.15']
flink: ['1.14', '1.15', '1.16']
env:
SPARK_LOCAL_IP: localhost
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/java-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ jobs:
with:
distribution: zulu
java-version: 8
- run: ./gradlew -DflinkVersions=1.14,1.15 -DsparkVersions=2.4,3.0,3.1,3.2,3.3 -DhiveVersions=2,3 build -x test -x javadoc -x integrationTest
- run: ./gradlew -DflinkVersions=1.14,1.15,1.16 -DsparkVersions=2.4,3.0,3.1,3.2,3.3 -DhiveVersions=2,3 build -x test -x javadoc -x integrationTest

build-javadoc:
runs-on: ubuntu-20.04
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-snapshot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ jobs:
java-version: 8
- run: |
./gradlew printVersion
./gradlew -DflinkVersions=1.14,1.15 -DsparkVersions=2.4,3.0,3.1,3.2,3.3 -DhiveVersions=2,3 publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
./gradlew -DflinkVersions=1.14,1.15,1.16 -DsparkVersions=2.4,3.0,3.1,3.2,3.3 -DhiveVersions=2,3 publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
./gradlew -DflinkVersions= -DsparkVersions=3.2,3.3 -DscalaVersion=2.13 -DhiveVersions= publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
2 changes: 1 addition & 1 deletion dev/stage-binaries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#

SCALA_VERSION=2.12
FLINK_VERSIONS=1.14,1.15
FLINK_VERSIONS=1.14,1.15,1.16
SPARK_VERSIONS=2.4,3.0,3.1,3.2,3.3
HIVE_VERSIONS=2,3

Expand Down
4 changes: 4 additions & 0 deletions flink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ if (flinkVersions.contains("1.14")) {
if (flinkVersions.contains("1.15")) {
apply from: file("$projectDir/v1.15/build.gradle")
}

if (flinkVersions.contains("1.16")) {
apply from: file("$projectDir/v1.16/build.gradle")
}
4 changes: 2 additions & 2 deletions flink/v1.16/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
* under the License.
*/

String flinkVersion = '1.15.0'
String flinkMajorVersion = '1.15'
String flinkVersion = '1.16.0'
String flinkMajorVersion = '1.16'
String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")

project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.flink.sink.FlinkSink;
Expand Down Expand Up @@ -69,16 +73,20 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
List<String> equalityColumns =
tableSchema.getPrimaryKey().map(UniqueConstraint::getColumns).orElseGet(ImmutableList::of);

return (DataStreamSinkProvider)
(providerContext, dataStream) ->
FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
.setAll(writeProps)
.flinkConf(readableConfig)
.append();
return new DataStreamSinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
return FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
.setAll(writeProps)
.flinkConf(readableConfig)
.append();
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,53 +602,8 @@ public void testFilterPushDown2Literal() {
"Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
}

/**
* NaN is not supported by Flink now, so we add this test case to assert the parse error. When we
* upgrade to a Flink version that supports NaN, we will delete this method and add some test
* cases to test NaN.
*/
@Test
public void testSqlParseError() {
String sqlParseErrorEqual =
String.format("SELECT * FROM %s WHERE d = CAST('NaN' AS DOUBLE) ", TABLE_NAME);
AssertHelpers.assertThrows(
"The NaN is not supported by flink now. ",
NumberFormatException.class,
() -> sql(sqlParseErrorEqual));

String sqlParseErrorNotEqual =
String.format("SELECT * FROM %s WHERE d <> CAST('NaN' AS DOUBLE) ", TABLE_NAME);
AssertHelpers.assertThrows(
"The NaN is not supported by flink now. ",
NumberFormatException.class,
() -> sql(sqlParseErrorNotEqual));

String sqlParseErrorGT =
String.format("SELECT * FROM %s WHERE d > CAST('NaN' AS DOUBLE) ", TABLE_NAME);
AssertHelpers.assertThrows(
"The NaN is not supported by flink now. ",
NumberFormatException.class,
() -> sql(sqlParseErrorGT));

String sqlParseErrorLT =
String.format("SELECT * FROM %s WHERE d < CAST('NaN' AS DOUBLE) ", TABLE_NAME);
AssertHelpers.assertThrows(
"The NaN is not supported by flink now. ",
NumberFormatException.class,
() -> sql(sqlParseErrorLT));

String sqlParseErrorGTE =
String.format("SELECT * FROM %s WHERE d >= CAST('NaN' AS DOUBLE) ", TABLE_NAME);
AssertHelpers.assertThrows(
"The NaN is not supported by flink now. ",
NumberFormatException.class,
() -> sql(sqlParseErrorGTE));

String sqlParseErrorLTE =
String.format("SELECT * FROM %s WHERE d <= CAST('NaN' AS DOUBLE) ", TABLE_NAME);
AssertHelpers.assertThrows(
"The NaN is not supported by flink now. ",
NumberFormatException.class,
() -> sql(sqlParseErrorLTE));
@Test
public void testSqlParseNaN() {
// TODO: add test cases covering NaN once Flink supports it
}
}
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

jmhOutputPath=build/reports/jmh/human-readable-output.txt
jmhIncludeRegex=.*
systemProp.defaultFlinkVersions=1.15
systemProp.knownFlinkVersions=1.14,1.15
systemProp.defaultFlinkVersions=1.16
systemProp.knownFlinkVersions=1.14,1.15,1.16
systemProp.defaultHiveVersions=2
systemProp.knownHiveVersions=2,3
systemProp.defaultSparkVersions=3.3
Expand Down
9 changes: 9 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ if (flinkVersions.contains("1.15")) {
project(":iceberg-flink:flink-runtime-1.15").name = "iceberg-flink-runtime-1.15"
}

if (flinkVersions.contains("1.16")) {
include ":iceberg-flink:flink-1.16"
include ":iceberg-flink:flink-runtime-1.16"
project(":iceberg-flink:flink-1.16").projectDir = file('flink/v1.16/flink')
project(":iceberg-flink:flink-1.16").name = "iceberg-flink-1.16"
project(":iceberg-flink:flink-runtime-1.16").projectDir = file('flink/v1.16/flink-runtime')
project(":iceberg-flink:flink-runtime-1.16").name = "iceberg-flink-runtime-1.16"
}

if (sparkVersions.contains("3.0")) {
include ':iceberg-spark:spark-3.0_2.12'
include ':iceberg-spark:spark-extensions-3.0_2.12'
Expand Down

0 comments on commit 9e76a84

Please sign in to comment.