Skip to content

Commit

Permalink
memory leak fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
kingster committed May 10, 2019
1 parent 652b10f commit f2b22d2
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 22 deletions.
6 changes: 6 additions & 0 deletions distribution/make-deb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ set -ex

BUILD_ROOT=$(mktemp -d)
VERSION=$(date +%s)

pushd /fs/mod_event_kafka
make
make install
popd

cp -r debian/* $BUILD_ROOT/

mkdir -p $BUILD_ROOT/usr/local/lib/
Expand Down
45 changes: 23 additions & 22 deletions mod_event_kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
* Contributor(s):
*
* Kinshuk Bairagi <[email protected]>
* Anthony Minessale II <[email protected]>
* Neal Horman <neal at wanlink dot com>
*
*
* mod_event_kafka.c -- Sends FreeSWITCH events to an Kafka broker
*
Expand Down Expand Up @@ -98,15 +95,18 @@ namespace mod_event_kafka {

producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!producer) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create new producer: %s", errstr);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create new producer: %s \n", errstr);
}

std::string topic_str = std::string(globals.topic_prefix) + "_" + std::string(switch_core_get_switchname());
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "KafkaEventPublisher Topic : %s", topic_str.c_str());
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "KafkaEventPublisher Topic : %s \n", topic_str.c_str());

rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
rd_kafka_topic_conf_set(tconf, "message.timeout.ms", "30000", NULL, 0);

topic = rd_kafka_topic_new(producer, topic_str.c_str(), NULL);
if (!topic) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create topic %s object: %s", topic_str.c_str(), rd_kafka_err2str(rd_kafka_last_error()));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create topic %s object: %s \n", topic_str.c_str(), rd_kafka_err2str(rd_kafka_last_error()));
}

_initialized = 1;
Expand All @@ -121,15 +121,16 @@ namespace mod_event_kafka {
if(_initialized){
int resp = send(event_json, uuid ,0);
if (resp == -1){
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to produce, with error %s", rd_kafka_err2str(rd_kafka_last_error()));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to produce, with error %s \n", rd_kafka_err2str(rd_kafka_last_error()));
} else {
//size_t len = strlen(event_json);
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"Produced message (%zu bytes)", len);
}
rd_kafka_poll(producer, 0);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "PublishEvent without active KafkaPublisher");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, event_json);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "PublishEvent without active KafkaPublisher\n %s \n",event_json);
delete uuid;
delete event_json;
}
}

Expand All @@ -139,7 +140,7 @@ namespace mod_event_kafka {
}

~KafkaEventPublisher(){
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "KafkaEventPublisher Destroyed");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "KafkaEventPublisher Destroyed\n");
rd_kafka_topic_destroy(topic);
rd_kafka_destroy(producer);
}
Expand All @@ -151,29 +152,28 @@ namespace mod_event_kafka {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, " Message delivery failed %s \n",rd_kafka_err2str(rkmessage->err));
}
else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Message delivered (%zd bytes, partition %d, offset %" PRId64 ") \n",rkmessage->len, rkmessage->partition, rkmessage->offset);
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Message delivered (%zd bytes, partition %d, offset %" PRId64 ") \n",rkmessage->len, rkmessage->partition, rkmessage->offset);
// rd_kafka_message_destroy ((rd_kafka_message_t *)rkmessage);
}
}

int send(char *data, char *key, int currentCount = 0){
int send(char *data, char *key, int currentCount){
if(++currentCount <= max_retry_limit){
int key_length = key == NULL ? 0 : strlen(key);
int result = rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY /* Copy payload */,
RD_KAFKA_MSG_F_FREE /* Auto Clear Payload */,
(void *)data, strlen(data),
/* Optional key and its length */
key, key_length,
(const void *)key, key_length,
/* Message opaque, provided in
* delivery report callback as
* msg_opaque. */
NULL);

auto last_error = rd_kafka_last_error();
if (result != -1){
if(result == 0){
return result;
} else if(last_error == RD_KAFKA_RESP_ERR__QUEUE_FULL) {

switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"queue.buffering.max.messages limit reached, waiting 1sec to flush out.");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"queue.buffering.max.messages limit reached, waiting 1sec to flush out.\n");
std::thread([this, data, currentCount, key]() {
//localqueue is full, hold and flush them.
rd_kafka_poll(producer, 1000/*block for max 1000ms*/);
Expand All @@ -186,8 +186,10 @@ namespace mod_event_kafka {
return result;
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "KafkaEventPublisher send max_retry_limit hit");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, data);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "KafkaEventPublisher send max_retry_limit hit.\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s\n",data);
// delete data; //TODO: Doesn't work, throws segment fault.
// delete key;
}
return 0;
}
Expand Down Expand Up @@ -287,8 +289,7 @@ namespace mod_event_kafka {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down Kafka Event module: %s\n",
ex.what());
} catch(...) { // Exceptions must not propogate to C caller
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,
"Unknown error shutting down Kafka Event module\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown error shutting down Kafka Event module\n");
}
return SWITCH_STATUS_SUCCESS;
}
Expand Down

0 comments on commit f2b22d2

Please sign in to comment.