Skip to content

Commit

Permalink
[Issue 8311][pulsar-client-go] Fix memory leak in cgo golang client (a…
Browse files Browse the repository at this point in the history
…pache#8325)

* [Issue 8311] Fixes memory leak in cgo golang client

In the C/C++ glue code, file c_Producer.cc function handle_producer_send(),
a new pulsar_message_id_t is created. This needs to be freed at some point.

Because of issues with legacy C clients, it is not possible to call delete
from inside c_Producer.cc. Instead, the client must call pulsar_message_id_free().

Previously, the cgo interface was not freeing this message ID, which caused a
memory leak on every message sent.

* The explicit free inside pulsarProducerSendCallbackProxyWithMsgID() is not required, because the getMessageId() call attaches a finalizer which will take care of things.
  • Loading branch information
bschofield authored Oct 29, 2020
1 parent 97f2e5b commit c5705f2
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pulsar-client-go/pulsar/c_go_pulsar.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static inline void _pulsar_producer_close_async(pulsar_producer_t *producer, voi
pulsar_producer_close_async(producer, pulsarProducerCloseCallbackProxy, ctx);
}

void pulsarProducerSendCallbackProxy(pulsar_result result, pulsar_message_id_t *message, void *ctx);
void pulsarProducerSendCallbackProxy(pulsar_result result, pulsar_message_id_t *messageId, void *ctx);

void pulsarProducerSendCallbackProxyWithMsgID(pulsar_result result, pulsar_message_id_t *messageId, void *ctx);

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-go/pulsar/c_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ func (p *producer) SendAndGetMsgID(ctx context.Context, msg ProducerMessage) (Me
}
}


type sendCallback struct {
message ProducerMessage
callback func(ProducerMessage, error)
Expand All @@ -287,6 +286,7 @@ func pulsarProducerSendCallbackProxy(res C.pulsar_result, messageId *C.pulsar_me
sendCallback.callback(sendCallback.message, newError(res, "Failed to send message"))
} else {
sendCallback.callback(sendCallback.message, nil)
C.pulsar_message_id_free(messageId)
}
}

Expand Down

0 comments on commit c5705f2

Please sign in to comment.