diff --git a/.gitignore b/.gitignore
index d9b1e0a..8d9557d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,4 +60,8 @@
 cmake-build-debug/
 
 #VSCode
-.vscode
\ No newline at end of file
+.vscode
+
+
+#dist
+*.deb
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 3d316b6..c2c29f0 100644
--- a/Makefile
+++ b/Makefile
@@ -1,12 +1,12 @@
 # Customise these as appropriate
 MODNAME = mod_event_kafka.so
-MODOBJ = mod_event_kafka.o #file.o file2.o file3.o
+MODOBJ = mod_event_kafka.o
 MODCFLAGS = -Wall -Werror
 MODLDFLAGS = -lssl
 
 CXX = g++
 CXXFLAGS = -fPIC -g -ggdb -I/usr/include `pkg-config --cflags freeswitch` $(MODCFLAGS) -std=c++0x
-LDFLAGS = `pkg-config --libs freeswitch` -lrdkafka++ $(MODLDFLAGS)
+LDFLAGS = `pkg-config --libs freeswitch` -lrdkafka -lz -lpthread -lrt $(MODLDFLAGS)
 
 .PHONY: all
 all: $(MODNAME)
diff --git a/distribution/deb/DEBIAN/control b/distribution/deb/DEBIAN/control
index cfd4159..cca1124 100755
--- a/distribution/deb/DEBIAN/control
+++ b/distribution/deb/DEBIAN/control
@@ -3,6 +3,6 @@ Version: 1._VERSION_
 Architecture: all
 Section: base
 Priority: standard
-Depends: curl, unzip
+Depends: librdkafka1 (>= 0.9.3-1)
 Maintainer: Kinshuk Bairagi
 Description: Publish FreeSwitch Event to Kafka
diff --git a/distribution/make-deb.sh b/distribution/make-deb.sh
index e9d2168..4daa339 100755
--- a/distribution/make-deb.sh
+++ b/distribution/make-deb.sh
@@ -11,9 +11,8 @@ mkdir -p $BUILD_ROOT/etc/freeswitch/autoload_configs/
 cp /etc/freeswitch/autoload_configs/event_kafka.conf.xml $BUILD_ROOT/etc/freeswitch/autoload_configs/event_kafka.conf.xml
 cp /usr/lib/freeswitch/mod/mod_event_kafka.so $BUILD_ROOT/usr/lib/freeswitch/mod/mod_event_kafka.so
-cp /usr/local/lib/librdkafka++.* $BUILD_ROOT/usr/local/lib/
 
 sed -i "s/_VERSION_/$VERSION/g" $BUILD_ROOT/DEBIAN/control
 
-dpkg-deb --build $BUILD_ROOT mod_event_kafka.deb
+dpkg-deb --build $BUILD_ROOT mod-event-kafka.deb
 
 rm -rf $BUILD_ROOT
\ No newline at end of file
diff --git a/mod_event_kafka.cpp b/mod_event_kafka.cpp
index 446677d..754a311 100644
--- a/mod_event_kafka.cpp
+++ b/mod_event_kafka.cpp
@@ -63,19 +63,17 @@ namespace mod_event_kafka {
         return SWITCH_STATUS_SUCCESS;
     }
 
-    class KafkaDeliveryReportCallback : public RdKafka::DeliveryReportCb {
-    public:
-        void dr_cb (RdKafka::Message &message) override {
-            if (message.err() == RdKafka::ERR_NO_ERROR){
-                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Message delivered (%zd bytes, partition %d, offset %" PRId64 ") \n",message.len(), message.partition(), message.offset());
-            } else {
-                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Message delivery failed %s \n",message.errstr().c_str());
-            }
-        }
-    };
-
     class KafkaEventPublisher {
 
+        static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
+            if (rkmessage->err)
+                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Message delivery failed %s \n", rd_kafka_err2str(rkmessage->err));
+            else
+                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Message delivered (%zd bytes, partition %d, offset %" PRId64 ") \n", rkmessage->len, rkmessage->partition, rkmessage->offset);
+        }
+
         public:
 
         KafkaEventPublisher(){
@@ -83,31 +81,34 @@
 
             load_config(SWITCH_FALSE);
 
-            RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
-            RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
+            conf = rd_kafka_conf_new();
 
-            if (conf->set("metadata.broker.list", globals.brokers, errstr) != RdKafka::Conf::CONF_OK) {
-                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr.c_str());
+            if (rd_kafka_conf_set(conf, "metadata.broker.list", globals.brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr);
             }
 
-            if (conf->set("queue.buffering.max.messages", "5", errstr) != RdKafka::Conf::CONF_OK) {
-                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr.c_str());
+            if (rd_kafka_conf_set(conf, "queue.buffering.max.messages", "5", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr);
             }
 
-            KafkaDeliveryReportCallback *ex_dr_cb = new KafkaDeliveryReportCallback();
-            conf->set("dr_cb", ex_dr_cb, errstr);
+            rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
 
-            producer = RdKafka::Producer::create(conf, errstr);
+            producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
             if (!producer) {
-                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create new producer: %s", errstr.c_str());
+                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create new producer: %s", errstr);
             }
 
             std::string topic_str = std::string(globals.topic_prefix) + "_" + std::string(switch_core_get_switchname());
             switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "KafkaEventPublisher Topic : %s", topic_str.c_str());
 
-            topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
+            // topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
+            topic = rd_kafka_topic_new(producer, topic_str.c_str(), NULL);
             if (!topic) {
-                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create topic %s object: %s", topic_str.c_str(), errstr.c_str());
+                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create topic %s object: %s", topic_str.c_str(), rd_kafka_err2str(rd_kafka_last_error()));
+                rd_kafka_destroy(producer);
             }
 
             _initialized = 1;
@@ -120,13 +121,13 @@
             size_t len = strlen(event_json);
 
             if(_initialized){
-                RdKafka::ErrorCode resp = send(event_json,0);
-                if (resp != RdKafka::ERR_NO_ERROR){
-                    switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to produce, with error %s", RdKafka::err2str(resp).c_str());
+                int resp = send(event_json,0);
+                if (resp == -1){
+                    switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to produce, with error %s", rd_kafka_err2str(rd_kafka_last_error()));
                 } else {
                     switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"Produced message (%zu bytes)", len);
                 }
-                producer->poll(0);
+                rd_kafka_poll(producer, 0);
             } else {
                 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "PublishEvent without active KafkaPublisher");
                 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, event_json);
@@ -135,28 +136,37 @@
 
         void Shutdown(){
             //flush within 100ms
-            producer->flush(100);
+            rd_kafka_flush(producer, 100);
         }
 
         ~KafkaEventPublisher(){
            switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "KafkaEventPublisher Destroyed");
+           rd_kafka_topic_destroy(topic);
+           rd_kafka_destroy(producer);
         }
 
        private:
 
-        RdKafka::ErrorCode send(char *data, int currentCount = 0){
+        int send(char *data, int currentCount = 0){
             if(++currentCount <= max_retry_limit){
-                RdKafka::ErrorCode result = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
-                    RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
-                    data, strlen(data),
-                    NULL, NULL);
-                if (result == RdKafka::ERR_NO_ERROR){
+                int result = rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA,
+                        RD_KAFKA_MSG_F_COPY /* Copy payload */,
+                        (void *)data, strlen(data),
+                        /* Optional key and its length */
+                        NULL, 0,
+                        /* Message opaque, provided in
+                         * delivery report callback as
+                         * msg_opaque. */
+                        NULL);
+
+                auto last_error = rd_kafka_last_error();
+                if (result != -1){
                     return result;
-                } else if(result == RdKafka::ERR__QUEUE_FULL) {
+                } else if(last_error == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                     switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"queue.buffering.max.messages limit reached, waiting 1sec to flush out.");
-                    std::thread([this, data, currentCount ]() {
+                    std::thread([this, data, currentCount]() {
                         //localqueue is full, hold and flush them.
-                        producer->poll(1000);
+                        rd_kafka_poll(producer, 1000/*block for max 1000ms*/);
                         send(data,currentCount);
                     })
                     .detach(); //TODO: limit number of forked threads
@@ -169,14 +179,16 @@
                 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "KafkaEventPublisher send max_retry_limit hit");
                 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, data);
             }
-            return RdKafka::ERR__FAIL;
+            return 0;
         }
 
         int max_retry_limit = 3;
-        std::string errstr;
         bool _initialized = 0;
-        RdKafka::Producer *producer;
-        RdKafka::Topic *topic;
+
+        rd_kafka_t *producer;
+        rd_kafka_topic_t *topic;
+        rd_kafka_conf_t *conf;
+        char errstr[512];
 
     };
diff --git a/mod_event_kafka.hpp b/mod_event_kafka.hpp
index f1e1159..c515f41 100644
--- a/mod_event_kafka.hpp
+++ b/mod_event_kafka.hpp
@@ -1,7 +1,9 @@
 #ifndef MOD_EVENT_KAFKA_H
 #define MOD_EVENT_KAFKA_H
-
-#include "librdkafka/rdkafkacpp.h"
+
+extern "C" {
+    #include "librdkafka/rdkafka.h"
+}
 
 namespace mod_event_kafka {
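
For reference, below is a minimal, self-contained sketch of the librdkafka C-API producer lifecycle this patch migrates to (configure, create producer, create topic, produce, poll, flush, destroy). It is illustrative only and not part of the patch; the broker address, topic name, payload, and file name kafka_c_api_demo.cpp are placeholders. It builds with: g++ kafka_c_api_demo.cpp -lrdkafka

// kafka_c_api_demo.cpp -- illustrative sketch, not part of the module.
#include <cstdio>
#include <cstring>
extern "C" {
    #include <librdkafka/rdkafka.h>
}

// Delivery report callback, invoked from rd_kafka_poll()/rd_kafka_flush().
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err)
        fprintf(stderr, "delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
    else
        fprintf(stderr, "delivered %zu bytes to partition %d\n",
                rkmessage->len, (int)rkmessage->partition);
}

int main() {
    char errstr[512];

    // Global configuration; "localhost:9092" is a placeholder broker list.
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "metadata.broker.list", "localhost:9092",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        return 1;
    }
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    // rd_kafka_new() takes ownership of conf on success.
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!producer) {
        fprintf(stderr, "failed to create producer: %s\n", errstr);
        return 1;
    }

    // "fs_events_demo" is a placeholder topic name.
    rd_kafka_topic_t *topic = rd_kafka_topic_new(producer, "fs_events_demo", NULL);
    if (!topic) {
        fprintf(stderr, "failed to create topic: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_destroy(producer);
        return 1;
    }

    // Asynchronous produce; returns -1 and sets rd_kafka_last_error() on failure
    // (e.g. RD_KAFKA_RESP_ERR__QUEUE_FULL when the local queue is full).
    const char *payload = "{\"Event-Name\":\"HEARTBEAT\"}";
    if (rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                         (void *)payload, strlen(payload),
                         NULL, 0,      /* no key */
                         NULL) == -1) {
        fprintf(stderr, "produce failed: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
    }

    rd_kafka_poll(producer, 0);       // serve delivery report callbacks
    rd_kafka_flush(producer, 1000);   // wait up to 1s for outstanding deliveries

    rd_kafka_topic_destroy(topic);
    rd_kafka_destroy(producer);
    return 0;
}

The retry path in the module's send() follows the same pattern: when rd_kafka_produce() fails with RD_KAFKA_RESP_ERR__QUEUE_FULL, a background thread calls rd_kafka_poll() to let the local queue drain before retrying.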