Skip to content

Commit

Permalink
Fix NPE when send a large message and don't release batchedMessageMet…
Browse files Browse the repository at this point in the history
…adataAndPayload when discard in batch message container. (apache#5748)

Fixes apache#5746 apache#5747

### Motivation

Fix NPE and release an already released ByteBuf when publish an oversize message.

Here is error log:
```
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) ~[netty-buffer-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88) ~[netty-common-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.util.ReferenceCountUtil.safeRelease(ReferenceCountUtil.java:113) [netty-common-4.1.43.Final.jar:4.1.43.Final]
	at org.apache.pulsar.client.impl.BatchMessageKeyBasedContainer$KeyedBatch.discard(BatchMessageKeyBasedContainer.java:244) [classes/:?]
	at org.apache.pulsar.client.impl.BatchMessageKeyBasedContainer.createOpSendMsg(BatchMessageKeyBasedContainer.java:125) [classes/:?]
	at org.apache.pulsar.client.impl.BatchMessageKeyBasedContainer.createOpSendMsgs(BatchMessageKeyBasedContainer.java:145) [classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:1426) [classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.triggerFlush(ProducerImpl.java:1411) [classes/:?]
	at org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:112) [classes/:?]
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) [classes/:?]
	at org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:63) [classes/:?]
	at org.apache.pulsar.broker.service.BatchMessageTest.testSendOverSizeMessage(BatchMessageTest.java:875) [test-classes/:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_201]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_201]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_201]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124) [testng-6.14.3.jar:?]
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583) [testng-6.14.3.jar:?]
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719) [testng-6.14.3.jar:?]
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989) [testng-6.14.3.jar:?]
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125) [testng-6.14.3.jar:?]
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109) [testng-6.14.3.jar:?]
	at org.testng.TestRunner.privateRun(TestRunner.java:648) [testng-6.14.3.jar:?]
	at org.testng.TestRunner.run(TestRunner.java:505) [testng-6.14.3.jar:?]
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455) [testng-6.14.3.jar:?]
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450) [testng-6.14.3.jar:?]
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415) [testng-6.14.3.jar:?]
	at org.testng.SuiteRunner.run(SuiteRunner.java:364) [testng-6.14.3.jar:?]
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52) [testng-6.14.3.jar:?]
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84) [testng-6.14.3.jar:?]
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208) [testng-6.14.3.jar:?]
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137) [testng-6.14.3.jar:?]
	at org.testng.TestNG.runSuites(TestNG.java:1049) [testng-6.14.3.jar:?]
	at org.testng.TestNG.run(TestNG.java:1017) [testng-6.14.3.jar:?]
	at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:73) [testng-plugin.jar:?]
	at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:123) [testng-plugin.jar:?]



16:19:13.850 [main:org.apache.pulsar.client.impl.ProducerImpl@1439] WARN  org.apache.pulsar.client.impl.ProducerImpl - [persistent://prop/ns-abc/testSendOverSizeMessage-623833fc-d9f7-4b28-aead-27955928fae9] [test-0-0] error while create opSendMsg by batch message container
java.lang.NullPointerException: null
	at org.apache.pulsar.client.impl.ProducerImpl.releaseSemaphoreForSendOp(ProducerImpl.java:858) ~[classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.processOpSendMsg(ProducerImpl.java:1477) ~[classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:1432) [classes/:?]
	at org.apache.pulsar.client.impl.ProducerImpl.triggerFlush(ProducerImpl.java:1411) [classes/:?]
	at org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:112) [classes/:?]
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) [classes/:?]
	at org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:63) [classes/:?]
	at org.apache.pulsar.broker.service.BatchMessageTest.testSendOverSizeMessage(BatchMessageTest.java:875) [test-classes/:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_201]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_201]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_201]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124) [testng-6.14.3.jar:?]
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583) [testng-6.14.3.jar:?]
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719) [testng-6.14.3.jar:?]
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989) [testng-6.14.3.jar:?]
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125) [testng-6.14.3.jar:?]
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109) [testng-6.14.3.jar:?]
	at org.testng.TestRunner.privateRun(TestRunner.java:648) [testng-6.14.3.jar:?]
	at org.testng.TestRunner.run(TestRunner.java:505) [testng-6.14.3.jar:?]
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455) [testng-6.14.3.jar:?]
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450) [testng-6.14.3.jar:?]
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415) [testng-6.14.3.jar:?]
	at org.testng.SuiteRunner.run(SuiteRunner.java:364) [testng-6.14.3.jar:?]
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52) [testng-6.14.3.jar:?]
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84) [testng-6.14.3.jar:?]
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208) [testng-6.14.3.jar:?]
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137) [testng-6.14.3.jar:?]
	at org.testng.TestNG.runSuites(TestNG.java:1049) [testng-6.14.3.jar:?]
	at org.testng.TestNG.run(TestNG.java:1017) [testng-6.14.3.jar:?]
	at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:73) [testng-plugin.jar:?]
	at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:123) [testng-plugin.jar:?]
```

### Modifications

Add check for processOpSendMsg, if the op is null, just return.
Don't release the batchedMessageMetadataAndPayload since it is already released in getCompressedBatchMetadataAndPayload() method.
  • Loading branch information
codelipenghui authored and sijie committed Nov 28, 2019
1 parent 2e30c08 commit ecf7792
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ public void discard(Exception ex) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
lowestSequenceId, t);
}
ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
clear();
}

Expand All @@ -174,6 +173,11 @@ public boolean isMultiBatches() {
@Override
public OpSendMsg createOpSendMsg() throws IOException {
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
discard(new PulsarClientException.InvalidMessageException(
"Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
return null;
}
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
messageMetadata.setHighestSequenceId(highestSequenceId);
ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
Expand All @@ -182,16 +186,6 @@ public OpSendMsg createOpSendMsg() throws IOException {
OpSendMsg op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), firstCallback);

if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
cmd.release();
discard(new PulsarClientException.InvalidMessageException(
"Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
if (op != null) {
op.recycle();
}
return null;
}

op.setNumMessagesInBatch(numMessagesInBatch);
op.setBatchSizeByte(currentBatchSizeBytes);
lowestSequenceId = -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public boolean isMultiBatches() {

private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) throws IOException {
ByteBuf encryptedPayload = producer.encryptMessage(keyedBatch.messageMetadata, keyedBatch.getCompressedBatchMetadataAndPayload());
if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
keyedBatch.discard(new PulsarClientException.InvalidMessageException(
"Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
return null;
}

final int numMessagesInBatch = keyedBatch.messages.size();
long currentBatchSizeBytes = 0;
for (MessageImpl<?> message : keyedBatch.messages) {
Expand All @@ -120,15 +126,6 @@ private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) throws IOE

ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(keyedBatch.messages, cmd, keyedBatch.sequenceId, keyedBatch.firstCallback);

if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
cmd.release();
keyedBatch.discard(new PulsarClientException.InvalidMessageException(
"Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
if (op != null) {
op.recycle();
}
return null;
}
op.setNumMessagesInBatch(numMessagesInBatch);
op.setBatchSizeByte(currentBatchSizeBytes);
return op;
Expand Down Expand Up @@ -241,7 +238,6 @@ public void discard(Exception ex) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
sequenceId, t);
}
ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,9 @@ private void batchMessageAndSend() {
}

private void processOpSendMsg(OpSendMsg op) {
if (op == null) {
return;
}
try {
if (op.msg != null && isBatchMessagingEnabled()) {
batchMessageAndSend();
Expand Down

0 comments on commit ecf7792

Please sign in to comment.