Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[fix][client]Fix client memory limit currentUsage leak and semaphore …
…release duplicated in ProducerImpl (apache#16837) ### Motivation Fix client memory limit `currentUsage` leak in `ProducerImpl`. When our pulsar cluster occur some error, producer send message fail and we find the `currentUsage` always keep high value like the leaked, and cause the producer send rate is slow. And find producer semaphore release duplicated when `createOpSendMsg` occur some excrption. Follow 1 point only release the message count semaphore, but not release the memory limit. **memory limit currentUsage leak point** https://github.com/apache/pulsar/blob/c217b8f559292fd34c6a4fb4b30aab213720d962/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2031-L2033 **producer semaphore release duplicated** https://github.com/apache/pulsar/blob/4d64e2e66689381ebbb94fbfc03eb4e1dfba0405/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2116-L2120 ``` After the exception the memory limit leak occured. org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer XXXX-366-15151 can not send message to the topic persistent://XXXX/XXXX/XXXX within given timeout : createdAt 30.005 seconds ago, firstSentAt 30.005 seconds ago, lastSentAt 30.005 seconds ago, retryCount 1 at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1287) at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:1826) at java.base/java.util.ArrayDeque.forEach(ArrayDeque.java:889) at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1369) at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:1816) at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$19(ProducerImpl.java:1848) at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:834) ``` ### Modifications 1. add the `MemoryLimitController` release. ### Documentation - [X] `doc-not-needed`
- Loading branch information