From ff697ba12a80f51702a5412c2deb6d4f89a966ff Mon Sep 17 00:00:00 2001
From: Jingsong Lee
Date: Tue, 26 Jul 2022 16:43:08 +0800
Subject: [PATCH] [FLINK-28687] BucketSelector is wrong when hashcode is negative

This closes #239
---
 .../store/connector/PredicateITCase.java | 4 +++
 .../store/file/predicate/BucketSelector.java | 25 +++++++++++--------
 .../store/table/sink/SinkRecordConverter.java | 17 ++++++++++---
 .../file/predicate/BucketSelectorTest.java | 6 +++++
 4 files changed, 37 insertions(+), 15 deletions(-)

# Reviewer notes (free text placed between the "---" separator and the first
# "diff --git" line; not part of the commit message and not part of any hunk).
#
# * Before this patch BucketSelector.select() filled its bucket set with
#   `hash % numBucket`, while SinkRecordConverter.bucket() used
#   `Math.abs(hash % numBucket)`. Java's `%` keeps the sign of the dividend,
#   so for a negative hash the selector stored a negative value that could not
#   equal the non-negative bucket computed by SinkRecordConverter.
# * The fix adds two public static helpers to SinkRecordConverter,
#   hashcode(BinaryRowData) and bucket(int, int), and calls them from both
#   classes so that both sides share a single implementation.
# * hashcode() checks its RowKind.INSERT precondition with a Java `assert`,
#   which is only evaluated when assertions are enabled (-ea).
# * BucketSelectorTest pins the bucket set for numBucket = 20, including the
#   cases with negative hash codes; PredicateITCase now queries every
#   inserted key instead of only a = 5.
# * NOTE(review): declarations such as `Set createBucketSet(...)`,
#   `ImmutableSet.Builder builder`, `Optional create(...)` and `List columns`
#   carry no type arguments in this copy; presumably they were lost in
#   transcription -- confirm against the upstream commit.
# * NOTE(review): leading whitespace of the hunk lines below assumes 4-space
#   indentation with 8-space continuation; verify against the target files
#   before applying with strict whitespace matching.

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
index ab122791d8a1..6eeccf7ec64a 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
@@ -48,6 +48,10 @@ public void testAppendFilterBucket() throws Exception {
 
     private void innerTest() throws Exception {
         sql("INSERT INTO T VALUES (1, 2), (3, 4), (5, 6), (7, 8), (9, 10)");
+        assertThat(sql("SELECT * FROM T WHERE a = 1")).containsExactlyInAnyOrder(Row.of(1, 2));
+        assertThat(sql("SELECT * FROM T WHERE a = 3")).containsExactlyInAnyOrder(Row.of(3, 4));
         assertThat(sql("SELECT * FROM T WHERE a = 5")).containsExactlyInAnyOrder(Row.of(5, 6));
+        assertThat(sql("SELECT * FROM T WHERE a = 7")).containsExactlyInAnyOrder(Row.of(7, 8));
+        assertThat(sql("SELECT * FROM T WHERE a = 9")).containsExactlyInAnyOrder(Row.of(9, 10));
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
index 43753a26c277..c451145bcfc2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
@@ -20,7 +20,9 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
@@ -58,16 +60,7 @@ public BucketSelector(int[] hashCodes) {
     }
 
     public boolean select(int bucket, int numBucket) {
-        return buckets.computeIfAbsent(
-                        numBucket,
-                        k -> {
-                            ImmutableSet.Builder builder = new ImmutableSet.Builder<>();
-                            for (int hash : hashCodes) {
-                                builder.add(hash % numBucket);
-                            }
-                            return builder.build();
-                        })
-                .contains(bucket);
+        return buckets.computeIfAbsent(numBucket, k -> createBucketSet(numBucket)).contains(bucket);
     }
 
     @VisibleForTesting
@@ -75,6 +68,15 @@ int[] hashCodes() {
         return hashCodes;
     }
 
+    @VisibleForTesting
+    Set createBucketSet(int numBucket) {
+        ImmutableSet.Builder builder = new ImmutableSet.Builder<>();
+        for (int hash : hashCodes) {
+            builder.add(SinkRecordConverter.bucket(hash, numBucket));
+        }
+        return builder.build();
+    }
+
     public static Optional create(
             Predicate bucketPredicate, RowType bucketKeyType) {
         @SuppressWarnings("unchecked")
@@ -137,7 +139,8 @@ public static Optional create(
     }
 
     private static int hash(List columns, RowDataSerializer serializer) {
-        return serializer.toBinaryRow(GenericRowData.of(columns.toArray())).hashCode();
+        BinaryRowData binaryRow = serializer.toBinaryRow(GenericRowData.of(columns.toArray()));
+        return SinkRecordConverter.hashcode(binaryRow);
     }
 
     private static void assembleRows(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
index 394964ad31b2..6d61d6d42328 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
@@ -115,19 +115,28 @@ private BinaryRowData logPrimaryKey(RowData row) {
     }
 
     private int bucket(RowData row, BinaryRowData bucketKey) {
-        int hash = bucketKey.getArity() == 0 ? hashRow(row) : bucketKey.hashCode();
-        return Math.abs(hash % numBucket);
+        int hash = bucketKey.getArity() == 0 ? hashRow(row) : hashcode(bucketKey);
+        return bucket(hash, numBucket);
     }
 
     private int hashRow(RowData row) {
         if (row instanceof BinaryRowData) {
             RowKind rowKind = row.getRowKind();
             row.setRowKind(RowKind.INSERT);
-            int hash = row.hashCode();
+            int hash = hashcode((BinaryRowData) row);
             row.setRowKind(rowKind);
             return hash;
         } else {
-            return allProjection.apply(row).hashCode();
+            return hashcode(allProjection.apply(row));
         }
     }
+
+    public static int hashcode(BinaryRowData rowData) {
+        assert rowData.getRowKind() == RowKind.INSERT;
+        return rowData.hashCode();
+    }
+
+    public static int bucket(int hashcode, int numBucket) {
+        return Math.abs(hashcode % numBucket);
+    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/BucketSelectorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/BucketSelectorTest.java
index aea241b54ef2..0b1afcc27b28 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/BucketSelectorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/BucketSelectorTest.java
@@ -69,6 +69,7 @@ public void testNormal() {
                 newSelector(and(builder.equal(0, 0), builder.equal(1, 1), builder.equal(2, 2)))
                         .get();
         assertThat(selector.hashCodes()).containsExactly(1141287431);
+        assertThat(selector.createBucketSet(20)).containsExactly(11);
     }
 
     @Test
@@ -81,6 +82,7 @@ public void testIn() {
                                         builder.equal(2, 2)))
                         .get();
         assertThat(selector.hashCodes()).containsExactly(-1272936927, 582056914, -1234868890);
+        assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
     }
 
     @Test
@@ -96,6 +98,7 @@ public void testOr() {
                                         builder.equal(2, 2)))
                         .get();
         assertThat(selector.hashCodes()).containsExactly(-1272936927, 582056914, -1234868890);
+        assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
     }
 
     @Test
@@ -108,6 +111,7 @@ public void testInNull() {
                                         builder.equal(2, 2)))
                         .get();
         assertThat(selector.hashCodes()).containsExactly(-1272936927, 582056914, -1234868890);
+        assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
     }
 
     @Test
@@ -122,6 +126,7 @@ public void testMultipleIn() {
         assertThat(selector.hashCodes())
                 .containsExactly(
                         -1272936927, -1567268077, 582056914, 2124429589, -1234868890, 1063796399);
+        assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9, 10, 19);
     }
 
     @Test
@@ -139,6 +144,7 @@ public void testMultipleOr() {
         assertThat(selector.hashCodes())
                 .containsExactly(
                         -1272936927, -1567268077, 582056914, 2124429589, -1234868890, 1063796399);
+        assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9, 10, 19);
     }
 
     private Optional newSelector(Predicate predicate) {