Skip to content

Commit

Permalink
[fix][broker] Fix direct memory leak in RawReaderImpl (apache#18928)
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored Dec 20, 2022
1 parent 1154d0a commit 46cacff
Showing 1 changed file with 47 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,32 @@ void tryCompletePending() {
}
}

@Override
protected CompletableFuture<Void> failPendingReceive() {
if (internalPinnedExecutor.isShutdown()) {
failPendingRawReceives();
return CompletableFuture.completedFuture(null);
} else {
CompletableFuture<Void> future = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
try {
failPendingRawReceives();
} finally {
future.complete(null);
}
});
return future;
}
}

private void failPendingRawReceives() {
List<CompletableFuture<RawMessage>> toError = new ArrayList<>();
while (!pendingRawReceives.isEmpty()) {
toError.add(pendingRawReceives.remove());
}
toError.forEach((f) -> f.cancel(false));
}

CompletableFuture<RawMessage> receiveRawAsync() {
CompletableFuture<RawMessage> result = new CompletableFuture<>();
pendingRawReceives.add(result);
Expand All @@ -174,19 +200,22 @@ CompletableFuture<RawMessage> receiveRawAsync() {
}

private void reset() {
List<CompletableFuture<RawMessage>> toError = new ArrayList<>();
synchronized (this) {
while (!pendingRawReceives.isEmpty()) {
toError.add(pendingRawReceives.remove());
}
RawMessageAndCnx m = incomingRawMessages.poll();
while (m != null) {
m.msg.close();
m = incomingRawMessages.poll();
}
incomingRawMessages.clear();
failPendingRawReceives();
clearIncomingRawMessages();
}

private void clearIncomingRawMessages() {
RawMessageAndCnx m = incomingRawMessages.poll();
while (m != null) {
m.msg.close();
m = incomingRawMessages.poll();
}
toError.forEach((f) -> f.cancel(false));
}

@Override
protected void clearIncomingMessages() {
super.clearIncomingMessages();
clearIncomingRawMessages();
}

@Override
Expand All @@ -203,12 +232,17 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {

@Override
public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> closeFuture = super.closeAsync();
reset();
return super.closeAsync();
return closeFuture;
}

@Override
void messageReceived(CommandMessage commandMessage, ByteBuf headersAndPayload, ClientCnx cnx) {
State state = getState();
if (state == State.Closing || state == State.Closed) {
return;
}
MessageIdData messageId = commandMessage.getMessageId();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription,
Expand Down

0 comments on commit 46cacff

Please sign in to comment.