Skip to content

Commit

Permalink
[pulsar-io] Refine the key in redis sink when key is null (apache#11192)
Browse files Browse the repository at this point in the history
### Motivation

When the record has a null key, someone maybe confused by a NPE warn as below:

```
11:26:22.730 [pool-5-thread-1] WARN  org.apache.pulsar.io.redis.sink.RedisSink - Record flush thread was exception
java.lang.NullPointerException: null
        at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011) ~[?:1.8.0_291]
        at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) ~[?:1.8.0_291]
        at org.apache.pulsar.io.redis.sink.RedisSink.flush(RedisSink.java:127) ~[pulsar-io-redis-2.8.0.nar-unpacked/:?]
        at org.apache.pulsar.io.redis.sink.RedisSink.lambda$write$1(RedisSink.java:94) ~[pulsar-io-redis-2.8.0.nar-unpacked/:?]
```

We can use an empty string as key when its key is null as other sinks do.

### Modifications

- Use an empty string as key when its key is null.
  • Loading branch information
murong00 authored Jul 14, 2021
1 parent 481feea commit 27797d9
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ private void flush() {
if (CollectionUtils.isNotEmpty(recordsToFlush)) {
for (Record<byte[]> record: recordsToFlush) {
try {
// records with null keys or values will be ignored
byte[] key = record.getKey().isPresent() ? record.getKey().get().getBytes(StandardCharsets.UTF_8) : null;
// use an empty string as key when the key is null
String recordKey = record.getKey().isPresent() ? record.getKey().get() : "";
byte[] key = recordKey.getBytes(StandardCharsets.UTF_8);
byte[] value = record.getValue();
recordsToSet.put(key, value);
} catch (Exception e) {
Expand Down

0 comments on commit 27797d9

Please sign in to comment.