Merge branch 'c'

kingster committed Mar 8, 2018
2 parents 0866420 + 0126530 commit 0501d4f
Showing 6 changed files with 66 additions and 49 deletions.
6 changes: 5 additions & 1 deletion .gitignore
@@ -60,4 +60,8 @@ cmake-build-debug/


#VSCode
.vscode
.vscode


#dist
*.deb
4 changes: 2 additions & 2 deletions Makefile
@@ -1,12 +1,12 @@
# Customise these as appropriate
MODNAME = mod_event_kafka.so
MODOBJ = mod_event_kafka.o #file.o file2.o file3.o
MODOBJ = mod_event_kafka.o
MODCFLAGS = -Wall -Werror
MODLDFLAGS = -lssl

CXX = g++
CXXFLAGS = -fPIC -g -ggdb -I/usr/include `pkg-config --cflags freeswitch` $(MODCFLAGS) -std=c++0x
LDFLAGS = `pkg-config --libs freeswitch` -lrdkafka++ $(MODLDFLAGS)
LDFLAGS = `pkg-config --libs freeswitch` -lrdkafka -lz -lpthread -lrt $(MODLDFLAGS)

.PHONY: all
all: $(MODNAME)
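With the new flags, the compile and link recipes (not shown in this hunk) would expand to roughly the commands below. This is only a sketch assuming the usual object-then-shared-object pattern for a FreeSWITCH module; only the flags themselves come from the Makefile above:

    g++ -fPIC -g -ggdb -I/usr/include `pkg-config --cflags freeswitch` -Wall -Werror -std=c++0x -c mod_event_kafka.cpp -o mod_event_kafka.o
    g++ -shared -o mod_event_kafka.so mod_event_kafka.o `pkg-config --libs freeswitch` -lrdkafka -lz -lpthread -lrt -lssl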
2 changes: 1 addition & 1 deletion distribution/deb/DEBIAN/control
@@ -3,6 +3,6 @@ Version: 1._VERSION_
Architecture: all
Section: base
Priority: standard
Depends: curl, unzip
Depends: librdkafka1 (>= 0.9.3-1)
Maintainer: Kinshuk Bairagi <[email protected]>
Description: Publish FreeSwitch Event to Kafka
3 changes: 1 addition & 2 deletions distribution/make-deb.sh
@@ -11,9 +11,8 @@ mkdir -p $BUILD_ROOT/etc/freeswitch/autoload_configs/

cp /etc/freeswitch/autoload_configs/event_kafka.conf.xml $BUILD_ROOT/etc/freeswitch/autoload_configs/event_kafka.conf.xml
cp /usr/lib/freeswitch/mod/mod_event_kafka.so $BUILD_ROOT/usr/lib/freeswitch/mod/mod_event_kafka.so
cp /usr/local/lib/librdkafka++.* $BUILD_ROOT/usr/local/lib/

sed -i "s/_VERSION_/$VERSION/g" $BUILD_ROOT/DEBIAN/control
dpkg-deb --build $BUILD_ROOT mod_event_kafka.deb
dpkg-deb --build $BUILD_ROOT mod-event-kafka.deb

rm -rf $BUILD_ROOT
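For completeness, one way the resulting package might be installed on a target machine (an assumed flow, not part of this script; the package name matches the new dpkg-deb output above):

    dpkg -i mod-event-kafka.deb || apt-get -f install   # -f install pulls in the librdkafka1 dependency declared in DEBIAN/control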
94 changes: 53 additions & 41 deletions mod_event_kafka.cpp
@@ -63,51 +63,52 @@ namespace mod_event_kafka {
return SWITCH_STATUS_SUCCESS;
}

class KafkaDeliveryReportCallback : public RdKafka::DeliveryReportCb {
public:
void dr_cb (RdKafka::Message &message) override {
if (message.err() == RdKafka::ERR_NO_ERROR){
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Message delivered (%zd bytes, partition %d, offset %" PRId64 ") \n",message.len(), message.partition(), message.offset());
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Message delivery failed %s \n",message.errstr().c_str());
}
}
};

class KafkaEventPublisher {

static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, " Message delivery failed %s \n",rd_kafka_err2str(rkmessage->err));
else
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Message delivered (%zd bytes, partition %d, offset %" PRId64 ") \n",rkmessage->len, rkmessage->partition, rkmessage->offset);
}
};

public:
KafkaEventPublisher(){

switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "KafkaEventPublisher Initialising...");

load_config(SWITCH_FALSE);

RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
conf = rd_kafka_conf_new();

if (conf->set("metadata.broker.list", globals.brokers, errstr) != RdKafka::Conf::CONF_OK) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr.c_str());
if (rd_kafka_conf_set(conf, "metadata.broker.list", globals.brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr);
}

if (conf->set("queue.buffering.max.messages", "5", errstr) != RdKafka::Conf::CONF_OK) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr.c_str());
if (rd_kafka_conf_set(conf, "queue.buffering.max.messages", "5", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr);
}

KafkaDeliveryReportCallback *ex_dr_cb = new KafkaDeliveryReportCallback();
conf->set("dr_cb", ex_dr_cb, errstr);

producer = RdKafka::Producer::create(conf, errstr);
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!producer) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create new producer: %s", errstr.c_str());
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create new producer: %s", errstr);
}

std::string topic_str = std::string(globals.topic_prefix) + "_" + std::string(switch_core_get_switchname());
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "KafkaEventPublisher Topic : %s", topic_str.c_str());

topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
// topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
topic = rd_kafka_topic_new(producer, topic_str.c_str(), NULL);
if (!topic) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create topic %s object: %s", topic_str.c_str(), errstr.c_str());
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create topic %s object: %s", topic_str.c_str(), rd_kafka_err2str(rd_kafka_last_error()));
}

switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create topic %s object: %s", topic.c_str(), rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_destroy(rk);
}

_initialized = 1;
@@ -120,13 +121,13 @@ namespace mod_event_kafka {
size_t len = strlen(event_json);

if(_initialized){
RdKafka::ErrorCode resp = send(event_json,0);
if (resp != RdKafka::ERR_NO_ERROR){
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to produce, with error %s", RdKafka::err2str(resp).c_str());
int resp = send(event_json,0);
if (resp == -1){
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to produce, with error %s", rd_kafka_err2str(rd_kafka_last_error()));
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"Produced message (%zu bytes)", len);
}
producer->poll(0);
rd_kafka_poll(producer, 0);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "PublishEvent without active KafkaPublisher");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, event_json);
@@ -135,28 +136,37 @@

void Shutdown(){
//flush within 100ms
producer->flush(100);
rd_kafka_flush(producer, 100);
}

~KafkaEventPublisher(){
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "KafkaEventPublisher Destroyed");
rd_kafka_topic_destroy(topic);
rd_kafka_destroy(producer);
}

private:

RdKafka::ErrorCode send(char *data, int currentCount = 0){
int send(char *data, int currentCount = 0){
if(++currentCount <= max_retry_limit){
RdKafka::ErrorCode result = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
data, strlen(data),
NULL, NULL);
if (result == RdKafka::ERR_NO_ERROR){
int result = rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY /* Copy payload */,
(void *)data, strlen(data),
/* Optional key and its length */
NULL, 0,
/* Message opaque, provided in
* delivery report callback as
* msg_opaque. */
NULL);

auto last_error = rd_kafka_last_error();
if (result != -1){
return result;
} else if(result == RdKafka::ERR__QUEUE_FULL) {
} else if(last_error == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"queue.buffering.max.messages limit reached, waiting 1sec to flush out.");
std::thread([this, data, currentCount ]() {
std::thread([this, data, currentCount]() {
//localqueue is full, hold and flush them.
producer->poll(1000);
rd_kafka_poll(producer, 1000/*block for max 1000ms*/);
send(data,currentCount);
})
.detach(); //TODO: limit number of forked threads
@@ -169,14 +179,16 @@
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "KafkaEventPublisher send max_retry_limit hit");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, data);
}
return RdKafka::ERR__FAIL;
return 0;
}

int max_retry_limit = 3;
std::string errstr;
bool _initialized = 0;
RdKafka::Producer *producer;
RdKafka::Topic *topic;

rd_kafka_t *producer;
rd_kafka_topic_t *topic;
rd_kafka_conf_t *conf;
char errstr[512];

};

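For context, a minimal standalone sketch of the librdkafka C-API lifecycle the rewritten publisher follows: configure, register the delivery-report callback, create the producer and topic handles, produce, poll, flush, and tear down. The broker address, topic name, and payload below are placeholder values, not taken from the module:

    #include <string.h>
    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    /* Delivery reports are served from rd_kafka_poll()/rd_kafka_flush(). */
    static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
        if (rkmessage->err)
            fprintf(stderr, "delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
    }

    int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Placeholder broker list. */
        if (rd_kafka_conf_set(conf, "metadata.broker.list", "localhost:9092",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
            fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

        /* On success the producer takes ownership of conf. */
        rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!producer)
            return 1;

        rd_kafka_topic_t *topic = rd_kafka_topic_new(producer, "fs_events", NULL);

        const char *payload = "{\"Event-Name\":\"HEARTBEAT\"}";
        if (rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                             (void *)payload, strlen(payload),
                             NULL, 0,   /* no key */
                             NULL) == -1)
            fprintf(stderr, "produce failed: %s\n", rd_kafka_err2str(rd_kafka_last_error()));

        rd_kafka_poll(producer, 0);      /* serve delivery reports */
        rd_kafka_flush(producer, 1000);  /* wait up to 1s for in-flight messages */

        rd_kafka_topic_destroy(topic);
        rd_kafka_destroy(producer);
        return 0;
    }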
6 changes: 4 additions & 2 deletions mod_event_kafka.hpp
@@ -1,7 +1,9 @@
#ifndef MOD_EVENT_KAFKA_H
#define MOD_EVENT_KAFKA_H

#include "librdkafka/rdkafkacpp.h"

extern "C" {
#include "librdkafka/rdkafka.h"
}

namespace mod_event_kafka {

