Skip to content

Commit

Permalink
[Java Client] Fix flaky BatchMessageTest by initializing lastBatchSen…
Browse files Browse the repository at this point in the history
…dNanoTime (apache#15406)

### Motivation

`BatchMessageTest` is very flaky, some tests failed at checking the
backlog of a topic. This flakiness, maybe the bug, was introduced from
apache#14185. `batchFlushTask` calculates
the time duration between now and `lastBatchSendNanoTime`. However,
for the first time `batchFlushTask` is called, `lastBatchSendNanoTime`
is not initialized, so the time diff duration satified so that
`batchMessageAndSend` will always be called for the first time.

For example, if all messages can be split into two batches for a given
batch size limit, there is a chance that when first few messages are in
the batch, `batchFlushTask` is called at the same time and these
messages could be sent first.

Here are some extra logs added by myself:

```
2022-05-01T12:35:23,545+0800 [main] INFO  org.apache.pulsar.client.impl.BatchMessageKeyBasedContainer - XYZ add 0 | 1 (0) 12 (600) | false
2022-05-01T12:35:23,545+0800 [pulsar-client-io-35-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - XYZ batchFlushTask 52779290583 us (5000000)
2022-05-01T12:35:23,545+0800 [pulsar-client-io-35-1] INFO  org.apache.pulsar.client.impl.BatchMessageKeyBasedContainer - XYZ before create: [0]
2022-05-01T12:35:23,546+0800 [pulsar-client-io-35-1] INFO  org.apache.pulsar.client.impl.BatchMessageKeyBasedContainer - XYZ create 1 Ops: 1 (0) 12 (600)
```

We can see `batchFlushTask` was called while the batch contains only 1
message. The time duration is nearly 15 hours, which is appearly wrong.

### Modifications

Initialize `lastBatchSendNanoTime` with the current time point
immediately after the batch container is constructed.
  • Loading branch information
BewareMyPower authored May 1, 2022
1 parent bf15e83 commit 72bbb97
Showing 1 changed file with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
}
this.batchMessageContainer = (BatchMessageContainerBase) containerBuilder.build();
this.batchMessageContainer.setProducer(this);
this.lastBatchSendNanoTime = System.nanoTime();
} else {
this.batchMessageContainer = null;
}
Expand Down Expand Up @@ -2303,4 +2304,4 @@ CompletableFuture<Void> getOriginalLastSendFuture() {
}

private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);
}
}

0 comments on commit 72bbb97

Please sign in to comment.