Skip to content

Commit

Permalink
[FLINK-19650][jdbc][table] Support limit push down for the JDBC conne…
Browse files Browse the repository at this point in the history
…ctor

This closes apache#13800
  • Loading branch information
fsk119 authored Dec 27, 2020
1 parent a25b9b8 commit a859e2c
Show file tree
Hide file tree
Showing 13 changed files with 912 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public String dialectName() {
return "Derby";
}

@Override
public String getLimitClause(long limit) {
return String.format("FETCH FIRST %d ROWS ONLY", limit);
}

@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ public interface JdbcDialect extends Serializable {
*/
JdbcRowConverter getRowConverter(RowType rowType);

/**
* Get limit clause to limit the number of emitted row from the jdbc source.
* @param limit number of row to emit. The value of the parameter should be non-negative.
* @return the limit clause.
*/
String getLimitClause(long limit);

/**
* Check if this dialect instance support a specific data type in table schema.
* @param schema the table schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public JdbcRowConverter getRowConverter(RowType rowType) {
return new MySQLRowConverter(rowType);
}

@Override
public String getLimitClause(long limit) {
return "LIMIT " + limit;
}

@Override
public Optional<String> defaultDriverName() {
return Optional.of("com.mysql.jdbc.Driver");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public JdbcRowConverter getRowConverter(RowType rowType) {
return new PostgresRowConverter(rowType);
}

@Override
public String getLimitClause(long limit) {
return "LIMIT " + limit;
}

@Override
public Optional<String> defaultDriverName() {
return Optional.of("org.postgresql.Driver");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
Expand All @@ -42,13 +43,18 @@
* A {@link DynamicTableSource} for JDBC.
*/
@Internal
public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
public class JdbcDynamicTableSource implements
ScanTableSource,
LookupTableSource,
SupportsProjectionPushDown,
SupportsLimitPushDown {

private final JdbcOptions options;
private final JdbcReadOptions readOptions;
private final JdbcLookupOptions lookupOptions;
private TableSchema physicalSchema;
private final String dialectName;
private long limit = -1;

public JdbcDynamicTableSource(
JdbcOptions options,
Expand Down Expand Up @@ -97,7 +103,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
}
final JdbcDialect dialect = options.getDialect();
String query = dialect.getSelectFromStatement(
options.getTableName(), physicalSchema.getFieldNames(), new String[0]);
options.getTableName(), physicalSchema.getFieldNames(), new String[0]);
if (readOptions.getPartitionColumnName().isPresent()) {
long lowerBound = readOptions.getPartitionLowerBound().get();
long upperBound = readOptions.getPartitionUpperBound().get();
Expand All @@ -108,6 +114,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
dialect.quoteIdentifier(readOptions.getPartitionColumnName().get()) +
" BETWEEN ? AND ?";
}
if (limit >= 0) {
query = String.format("%s %s", query, dialect.getLimitClause(limit));
}
builder.setQuery(query);
final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
builder.setRowConverter(dialect.getRowConverter(rowType));
Expand Down Expand Up @@ -156,11 +165,17 @@ public boolean equals(Object o) {
Objects.equals(readOptions, that.readOptions) &&
Objects.equals(lookupOptions, that.lookupOptions) &&
Objects.equals(physicalSchema, that.physicalSchema) &&
Objects.equals(dialectName, that.dialectName);
Objects.equals(dialectName, that.dialectName) &&
Objects.equals(limit, that.limit);
}

@Override
public int hashCode() {
return Objects.hash(options, readOptions, lookupOptions, physicalSchema, dialectName);
return Objects.hash(options, readOptions, lookupOptions, physicalSchema, dialectName, limit);
}

@Override
public void applyLimit(long limit) {
this.limit = limit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.connector.jdbc.JdbcTestBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
import org.apache.flink.test.util.AbstractTestBase;
Expand All @@ -35,12 +36,15 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* ITCase for {@link JdbcDynamicTableSource}.
Expand All @@ -51,8 +55,18 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
public static final String DB_URL = "jdbc:derby:memory:test";
public static final String INPUT_TABLE = "jdbDynamicTableSource";

public static StreamExecutionEnvironment env;
public static TableEnvironment tEnv;

@Before
public void before() throws ClassNotFoundException, SQLException {
env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, envSettings);

System.setProperty("derby.stream.error.field", JdbcTestBase.class.getCanonicalName() + ".DEV_NULL");
Class.forName(DRIVER_CLASS);

Expand Down Expand Up @@ -89,13 +103,6 @@ public void clearOutputTable() throws Exception {

@Test
public void testJdbcSource() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);

tEnv.executeSql(
"CREATE TABLE " + INPUT_TABLE + "(" +
"id BIGINT," +
Expand Down Expand Up @@ -127,13 +134,6 @@ public void testJdbcSource() throws Exception {

@Test
public void testProject() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);

tEnv.executeSql(
"CREATE TABLE " + INPUT_TABLE + "(" +
"id BIGINT," +
Expand Down Expand Up @@ -166,4 +166,39 @@ public void testProject() throws Exception {
.sorted().collect(Collectors.toList());
assertEquals(expected, result);
}

@Test
public void testLimit() throws Exception {
tEnv.executeSql(
"CREATE TABLE " + INPUT_TABLE + "(\n" +
"id BIGINT,\n" +
"timestamp6_col TIMESTAMP(6),\n" +
"timestamp9_col TIMESTAMP(9),\n" +
"time_col TIME,\n" +
"real_col FLOAT,\n" +
"double_col DOUBLE,\n" +
"decimal_col DECIMAL(10, 4)\n" +
") WITH (\n" +
" 'connector'='jdbc',\n" +
" 'url'='" + DB_URL + "',\n" +
" 'table-name'='" + INPUT_TABLE + "',\n" +
" 'scan.partition.column'='id',\n" +
" 'scan.partition.num'='2',\n" +
" 'scan.partition.lower-bound'='1',\n" +
" 'scan.partition.upper-bound'='2'\n" +
")"
);

Iterator<Row> collected = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE + " LIMIT 1").collect();
List<String> result = CollectionUtil.iteratorToList(collected).stream()
.map(Row::toString)
.sorted()
.collect(Collectors.toList());

Set<String> expected = new HashSet<>();
expected.add("1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234");
expected.add("2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234");
assertEquals(1, result.size());
assertTrue("The actual output is not a subset of the expected set.", expected.containsAll(result));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;

import org.junit.Before;
import org.junit.Test;

/**
Expand All @@ -31,8 +32,8 @@ public class JdbcTablePlanTest extends TableTestBase {

private final StreamTableTestUtil util = streamTestUtil(new TableConfig());

@Test
public void testProjectionPushDown() {
@Before
public void setup() {
util.tableEnv().executeSql(
"CREATE TABLE jdbc (" +
"id BIGINT," +
Expand All @@ -48,7 +49,15 @@ public void testProjectionPushDown() {
" 'table-name'='test_table'" +
")"
);
}

@Test
public void testProjectionPushDown() {
util.verifyExecPlan("SELECT decimal_col, timestamp9_col, id FROM jdbc");
}

@Test
public void testLimitPushDown() {
util.verifyExecPlan("SELECT id, time_col FROM jdbc LIMIT 3");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,25 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
<TestCase name="testLimitPushDown">
<Resource name="sql">
<![CDATA[SELECT id, time_col FROM jdbc LIMIT 3]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalSort(fetch=[3])
+- LogicalProject(id=[$0], time_col=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Limit(offset=[0], fetch=[3])
+- Exchange(distribution=[single])
+- TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[id, time_col], limit=[3]]], fields=[id, time_col])
]]>
</Resource>
</TestCase>
<TestCase name="testProjectionPushDown">
<Resource name="sql">
<![CDATA[SELECT decimal_col, timestamp9_col, id FROM jdbc]]>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ object FlinkStreamRuleSets {
PushProjectIntoLegacyTableSourceScanRule.INSTANCE,
PushFilterIntoTableSourceScanRule.INSTANCE,
PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
PushLimitIntoTableSourceScanRule.INSTANCE,

// reorder the projecct and watermark assigner
ProjectWatermarkAssignerTransposeRule.INSTANCE,
Expand Down
Loading

0 comments on commit a859e2c

Please sign in to comment.