Skip to content

Commit

Permalink
[FLINK-28687] BucketSelector is wrong when hashcode is negative
Browse files Browse the repository at this point in the history
This closes apache#239
  • Loading branch information
JingsongLi committed Jul 26, 2022
1 parent ac950a2 commit ff697ba
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public void testAppendFilterBucket() throws Exception {

/**
 * Inserts five {@code (a, a + 1)} rows for the odd values 1..9 and checks that an equality
 * filter on {@code a} returns exactly the matching row for each of them.
 */
private void innerTest() throws Exception {
    sql("INSERT INTO T VALUES (1, 2), (3, 4), (5, 6), (7, 8), (9, 10)");
    // One point lookup per inserted key: a = 1, 3, 5, 7, 9.
    for (int a = 1; a <= 9; a += 2) {
        assertThat(sql("SELECT * FROM T WHERE a = " + a))
                .containsExactlyInAnyOrder(Row.of(a, a + 1));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -58,23 +60,23 @@ public BucketSelector(int[] hashCodes) {
}

/**
 * Reports whether {@code bucket} belongs to the set of buckets derived from this selector's
 * hash codes for the given {@code numBucket}. The derived set is cached in {@code buckets},
 * keyed by {@code numBucket}.
 */
public boolean select(int bucket, int numBucket) {
// NOTE(review): this listing is a diff shown without +/- markers, so two return statements
// appear back to back. The first looks like the replaced version: `hash % numBucket` is
// negative for a negative hash, so such an entry can never equal a non-negative bucket.
// Confirm against the repository.
return buckets.computeIfAbsent(
numBucket,
k -> {
ImmutableSet.Builder<Integer> builder = new ImmutableSet.Builder<>();
for (int hash : hashCodes) {
builder.add(hash % numBucket);
}
return builder.build();
})
.contains(bucket);
// Apparent replacement: delegates set construction to createBucketSet(numBucket).
return buckets.computeIfAbsent(numBucket, k -> createBucketSet(numBucket)).contains(bucket);
}

/** Returns the hash codes held by this selector (the backing array itself, not a copy). */
@VisibleForTesting
int[] hashCodes() {
    return this.hashCodes;
}

/**
 * Maps every hash code of this selector to its bucket via
 * {@link SinkRecordConverter#bucket(int, int)} and collects the results into an immutable set,
 * added in the order of the hash codes.
 *
 * @param numBucket bucket count passed through to {@code SinkRecordConverter.bucket}
 * @return the set of buckets the hash codes map to
 */
@VisibleForTesting
Set<Integer> createBucketSet(int numBucket) {
    ImmutableSet.Builder<Integer> bucketIds = ImmutableSet.builder();
    for (int i = 0; i < hashCodes.length; i++) {
        bucketIds.add(SinkRecordConverter.bucket(hashCodes[i], numBucket));
    }
    return bucketIds.build();
}

public static Optional<BucketSelector> create(
Predicate bucketPredicate, RowType bucketKeyType) {
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -137,7 +139,8 @@ public static Optional<BucketSelector> create(
}

/**
 * Hashes a list of column values: the values are wrapped in a {@link GenericRowData},
 * converted to a binary row by {@code serializer}, and that binary row is hashed.
 */
private static int hash(List<Object> columns, RowDataSerializer serializer) {
// NOTE(review): diff shown without +/- markers; this first return appears to be the
// replaced line (it calls hashCode() on the binary row directly) — confirm.
return serializer.toBinaryRow(GenericRowData.of(columns.toArray())).hashCode();
// Apparent replacement: hashes through SinkRecordConverter.hashcode instead.
BinaryRowData binaryRow = serializer.toBinaryRow(GenericRowData.of(columns.toArray()));
return SinkRecordConverter.hashcode(binaryRow);
}

private static void assembleRows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,28 @@ private BinaryRowData logPrimaryKey(RowData row) {
}

/**
 * Computes the bucket of a record. When {@code bucketKey} has no fields the whole {@code row}
 * is hashed via {@code hashRow}; otherwise {@code bucketKey} is hashed. The hash is then
 * reduced to a bucket using {@code numBucket}.
 */
private int bucket(RowData row, BinaryRowData bucketKey) {
// NOTE(review): diff shown without +/- markers; the next two lines appear to be the
// replaced implementation and the two after them its replacement — confirm.
int hash = bucketKey.getArity() == 0 ? hashRow(row) : bucketKey.hashCode();
return Math.abs(hash % numBucket);
// Apparent replacement: uses the static hashcode(...) and bucket(int, int) helpers.
int hash = bucketKey.getArity() == 0 ? hashRow(row) : hashcode(bucketKey);
return bucket(hash, numBucket);
}

/**
 * Hashes a full row. A {@code BinaryRowData} is hashed in place with its row kind temporarily
 * set to {@code RowKind.INSERT} and restored afterwards; any other row is first passed
 * through {@code allProjection} and the result is hashed.
 */
private int hashRow(RowData row) {
if (row instanceof BinaryRowData) {
// Remember the caller's row kind so it can be restored after hashing.
RowKind rowKind = row.getRowKind();
row.setRowKind(RowKind.INSERT);
// NOTE(review): diff shown without +/- markers; the first `int hash` line appears to be
// the replaced one and the second its replacement — confirm.
int hash = row.hashCode();
int hash = hashcode((BinaryRowData) row);
row.setRowKind(rowKind);
return hash;
} else {
// NOTE(review): same interleaving here — first return looks replaced by the second.
return allProjection.apply(row).hashCode();
return hashcode(allProjection.apply(row));
}
}

/**
 * Returns the hash code of {@code rowData}.
 *
 * <p>When assertions are enabled, the row kind must be {@code RowKind.INSERT}.
 * NOTE(review): presumably the row kind contributes to the row's hash code, which would
 * explain this precondition — confirm against {@code BinaryRowData}.
 *
 * @param rowData row to hash
 * @return {@code rowData.hashCode()}
 */
public static int hashcode(BinaryRowData rowData) {
    assert RowKind.INSERT == rowData.getRowKind();
    int rowHash = rowData.hashCode();
    return rowHash;
}

/**
 * Maps a hash code to a non-negative bucket index.
 *
 * <p>The remainder is taken first and its sign dropped afterwards, so the magnitude is always
 * below {@code |numBucket|} and negating it cannot overflow, even for
 * {@code Integer.MIN_VALUE}.
 *
 * @param hashcode hash code to map; may be negative
 * @param numBucket number of buckets
 * @return the absolute value of {@code hashcode % numBucket}
 * @throws ArithmeticException if {@code numBucket} is zero
 */
public static int bucket(int hashcode, int numBucket) {
    int remainder = hashcode % numBucket;
    return remainder < 0 ? -remainder : remainder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public void testNormal() {
newSelector(and(builder.equal(0, 0), builder.equal(1, 1), builder.equal(2, 2)))
.get();
assertThat(selector.hashCodes()).containsExactly(1141287431);
assertThat(selector.createBucketSet(20)).containsExactly(11);
}

@Test
Expand All @@ -81,6 +82,7 @@ public void testIn() {
builder.equal(2, 2)))
.get();
assertThat(selector.hashCodes()).containsExactly(-1272936927, 582056914, -1234868890);
assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
}

@Test
Expand All @@ -96,6 +98,7 @@ public void testOr() {
builder.equal(2, 2)))
.get();
assertThat(selector.hashCodes()).containsExactly(-1272936927, 582056914, -1234868890);
assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
}

@Test
Expand All @@ -108,6 +111,7 @@ public void testInNull() {
builder.equal(2, 2)))
.get();
assertThat(selector.hashCodes()).containsExactly(-1272936927, 582056914, -1234868890);
assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
}

@Test
Expand All @@ -122,6 +126,7 @@ public void testMultipleIn() {
assertThat(selector.hashCodes())
.containsExactly(
-1272936927, -1567268077, 582056914, 2124429589, -1234868890, 1063796399);
assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9, 10, 19);
}

@Test
Expand All @@ -139,6 +144,7 @@ public void testMultipleOr() {
assertThat(selector.hashCodes())
.containsExactly(
-1272936927, -1567268077, 582056914, 2124429589, -1234868890, 1063796399);
assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9, 10, 19);
}

private Optional<BucketSelector> newSelector(Predicate predicate) {
Expand Down

0 comments on commit ff697ba

Please sign in to comment.