Skip to content

Commit

Permalink
add separate ipaccount thread
Browse files Browse the repository at this point in the history
  • Loading branch information
rbucek committed Feb 12, 2015
1 parent 95a4943 commit 3cc07ec
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 111 deletions.
26 changes: 14 additions & 12 deletions calltable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ extern char mac[32];

unsigned int last_register_clean = 0;

extern CustPhoneNumberCache *custPnCache;
extern int opt_onewaytimeout;
extern int opt_saveaudio_reversestereo;

Expand Down Expand Up @@ -1891,17 +1890,20 @@ Call::saveToDb(bool enableBatchIfPossible) {
cdr.add(whohanged ? "callee" : "caller", "whohanged");
}

if(get_customers_pn_query[0] && custPnCache) {
cust_reseller cr;
cr = custPnCache->getCustomerByPhoneNumber(caller);
if(cr.cust_id) {
cdr.add(cr.cust_id, "caller_customer_id");
cdr.add(cr.reseller_id, "caller_reseller_id");
}
cr = custPnCache->getCustomerByPhoneNumber(called);
if(cr.cust_id) {
cdr.add(cr.cust_id, "called_customer_id");
cdr.add(cr.reseller_id, "called_reseller_id");
if(get_customers_pn_query[0]) {
CustPhoneNumberCache *custPnCache = getCustPnCache();
if(custPnCache) {
cust_reseller cr;
cr = custPnCache->getCustomerByPhoneNumber(caller);
if(cr.cust_id) {
cdr.add(cr.cust_id, "caller_customer_id");
cdr.add(cr.reseller_id, "caller_reseller_id");
}
cr = custPnCache->getCustomerByPhoneNumber(called);
if(cr.cust_id) {
cdr.add(cr.cust_id, "called_customer_id");
cdr.add(cr.reseller_id, "called_reseller_id");
}
}
}

Expand Down
288 changes: 209 additions & 79 deletions ipaccount.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ and insert them into Call class.
#include <syslog.h>
#include <semaphore.h>
#include <algorithm>
#include <iomanip>

#include "ipaccount.h"
#include "flags.h"
Expand Down Expand Up @@ -84,28 +85,65 @@ extern char mysql_user[256];
extern char mysql_password[256];

extern MySqlStore *sqlStore;
extern int terminating;

typedef map<unsigned int, octects_live_t*> t_ipacc_live;
t_ipacc_live ipacc_live;

typedef map<string, octects_t*> t_ipacc_buffer;
static t_ipacc_buffer ipacc_buffer[2];
static volatile int sync_save_ipacc_buffer[2];
Ipacc *IPACC;

static unsigned int last_flush_interval_time = 0;
CustIpCache *custIpCache = NULL;
static NextIpCache *nextIpCache = NULL;
CustPhoneNumberCache *custPnCache = NULL;

SqlDb *sqlDbSaveIpacc = NULL;
inline void *_Ipacc_outThreadFunction(void *arg) {
return(((Ipacc*)arg)->outThreadFunction());
}

Ipacc::Ipacc() {
sync_save_ipacc_buffer[0] = 0;
sync_save_ipacc_buffer[1] = 0;
last_flush_interval_time = 0;
custIpCache = NULL;
nextIpCache = NULL;
custPnCache = NULL;
sqlDbSave = NULL;

qringmax = 10000;
readit = 0;
writeit = 0;
qring = new packet[qringmax];
for(unsigned int i = 0; i < qringmax; i++) {
qring[i].used = 0;
}
memset(this->threadPstatData, 0, sizeof(this->threadPstatData));
pthread_create(&this->out_thread_handle, NULL, _Ipacc_outThreadFunction, this);

init();
}

void ipacc_save(int indexIpaccBuffer, unsigned int interval_time_limit = 0) {
Ipacc::~Ipacc() {
delete [] qring;
term();
}

if(!sqlDbSaveIpacc) {
sqlDbSaveIpacc = createSqlObject();
inline void Ipacc::push(time_t timestamp, unsigned int saddr, unsigned int daddr, int port, int proto, int packetlen, int voippacket) {
while(this->qring[this->writeit].used != 0) {
usleep(10);
}
packet *_packet = &this->qring[this->writeit];
_packet->timestamp = timestamp;
_packet->saddr = saddr;
_packet->daddr = daddr;
_packet->port = port;
_packet->proto = proto;
_packet->packetlen = packetlen;
_packet->voippacket = voippacket;
_packet->used = 1;
if((this->writeit + 1) == this->qringmax) {
this->writeit = 0;
} else {
this->writeit++;
}
}

void Ipacc::save(int indexIpaccBuffer, unsigned int interval_time_limit) {
if(custIpCache) {
custIpCache->flush();
}
Expand Down Expand Up @@ -212,7 +250,7 @@ void ipacc_save(int indexIpaccBuffer, unsigned int interval_time_limit = 0) {
row.add(ipacc_data->numpackets, "numpackets");
row.add(ipacc_data->voippacket, "voip");
row.add(opt_ipacc_sniffer_agregate ? 0 : 1, "do_agr_trigger");
sqlDbSaveIpacc->insert("ipacc", row);
sqlDbSave->insert("ipacc", row);
}
++_counter;

Expand Down Expand Up @@ -266,7 +304,7 @@ void ipacc_save(int indexIpaccBuffer, unsigned int interval_time_limit = 0) {
//printf("flush\n");
}

void ipacc_add_octets(time_t timestamp, unsigned int saddr, unsigned int daddr, int port, int proto, int packetlen, int voippacket) {
inline void Ipacc::add_octets(time_t timestamp, unsigned int saddr, unsigned int daddr, int port, int proto, int packetlen, int voippacket) {
string key;
char buf[100];
octects_t *octects_data;
Expand All @@ -281,7 +319,7 @@ void ipacc_add_octets(time_t timestamp, unsigned int saddr, unsigned int daddr,
int saveIndexIpaccBuffer = indexIpaccBuffer == 0 ? 1 : 0;
if(!__sync_fetch_and_add(&sync_save_ipacc_buffer[saveIndexIpaccBuffer], 1)) {
last_flush_interval_time = cur_interval_time;
ipacc_save(saveIndexIpaccBuffer, last_flush_interval_time);
save(saveIndexIpaccBuffer, last_flush_interval_time);
}
}

Expand All @@ -305,7 +343,136 @@ void ipacc_add_octets(time_t timestamp, unsigned int saddr, unsigned int daddr,
tmp->voippacket = voippacket;
// printf("key[%s] %u\n", key.c_str(), tmp->octects);
}
}

void Ipacc::init() {
sqlDbSave = createSqlObject();

if(get_customer_by_ip_sql_driver[0] && get_customer_by_ip_odbc_dsn[0]) {
custIpCache = new CustIpCache();
custIpCache->setConnectParams(
get_customer_by_ip_sql_driver,
get_customer_by_ip_odbc_dsn,
get_customer_by_ip_odbc_user,
get_customer_by_ip_odbc_password,
get_customer_by_ip_odbc_driver);
custIpCache->setConnectParamsRadius(
get_radius_ip_driver,
get_radius_ip_host,
get_radius_ip_db,
get_radius_ip_user,
get_radius_ip_password);
custIpCache->setQueryes(
get_customer_by_ip_query,
get_customers_ip_query);
custIpCache->setQueryesRadius(
get_customers_radius_name_query,
get_radius_ip_query,
get_radius_ip_query_where);
custIpCache->connect();
if(get_customers_ip_query[0]) {
custIpCache->fetchAllIpQueryFromDb();
custIpCache->setMaxQueryPass(2);
}
}
if(isSqlDriver("mysql")) {
nextIpCache = new NextIpCache();
nextIpCache->connect();
nextIpCache->fetch();
nextIpCache->setMaxQueryPass(2);
}
if(get_customer_by_pn_sql_driver[0] && get_customer_by_pn_odbc_dsn[0]) {
custPnCache = new CustPhoneNumberCache();
custPnCache->setConnectParams(
get_customer_by_pn_sql_driver,
get_customer_by_pn_odbc_dsn,
get_customer_by_pn_odbc_user,
get_customer_by_pn_odbc_password,
get_customer_by_pn_odbc_driver);
custPnCache->setQueryes(get_customers_pn_query);
custPnCache->connect();
if(get_customers_pn_query[0]) {
custPnCache->fetchPhoneNumbersFromDb();
custPnCache->setMaxQueryPass(2);
}
}
}

void Ipacc::term() {
if(custIpCache) {
delete custIpCache;
}
if(nextIpCache) {
delete nextIpCache;
}
if(custPnCache) {
delete custPnCache;
}
t_ipacc_buffer::iterator iter;
for(int i = 0; i < 2; i++) {
for(iter = ipacc_buffer[i].begin(); iter != ipacc_buffer[i].end(); ++iter) {
delete iter->second;
}
}
delete sqlDbSave;
}

int Ipacc::refreshCustIpCache() {
if(!custIpCache) {
return(0);
}
custIpCache->clear();
return(custIpCache->fetchAllIpQueryFromDb());
}

void Ipacc::preparePstatData() {
if(this->outThreadId) {
if(this->threadPstatData[0].cpu_total_time) {
this->threadPstatData[1] = this->threadPstatData[0];
}
pstat_get_data(this->outThreadId, this->threadPstatData);
}
}

double Ipacc::getCpuUsagePerc(bool preparePstatData) {
if(preparePstatData) {
this->preparePstatData();
}
if(this->outThreadId) {
double ucpu_usage, scpu_usage;
if(this->threadPstatData[0].cpu_total_time && this->threadPstatData[1].cpu_total_time) {
pstat_calc_cpu_usage_pct(
&this->threadPstatData[0], &this->threadPstatData[1],
&ucpu_usage, &scpu_usage);
return(ucpu_usage + scpu_usage);
}
}
return(-1);
}

void *Ipacc::outThreadFunction() {
this->outThreadId = get_unix_tid();
syslog(LOG_NOTICE, "start Ipacc out thread %i", this->outThreadId);
while(!terminating) {
if(this->qring[this->readit].used == 1) {
packet *_packet = &this->qring[this->readit];
add_octets(_packet->timestamp, _packet->saddr, _packet->daddr, _packet->port, _packet->proto, _packet->packetlen, _packet->voippacket);
_packet->used = 0;
if((this->readit + 1) == this->qringmax) {
this->readit = 0;
} else {
this->readit++;
}
} else {
usleep(1000);
}
}
return(NULL);
}

inline void ipacc_add_octets(time_t timestamp, unsigned int saddr, unsigned int daddr, int port, int proto, int packetlen, int voippacket) {
IPACC[0].push(timestamp, saddr, daddr, port, proto, packetlen, voippacket);

t_ipacc_live::iterator it;
octects_live_t *data;
for(it = ipacc_live.begin(); it != ipacc_live.end();) {
Expand Down Expand Up @@ -1109,75 +1276,38 @@ void octects_live_t::setFilter(const char *ipfilter) {
std::sort(this->ipfilter.begin(), this->ipfilter.end());
}

CustIpCache *getCustIpCache() {
return(IPACC ? IPACC[0].getCustIpCache() : NULL);
}

CustPhoneNumberCache *getCustPnCache() {
return(IPACC ? IPACC[0].getCustPnCache() : NULL);
}

int refreshCustIpCache() {
return(IPACC ? IPACC[0].refreshCustIpCache() : 0);
}

unsigned int lengthIpaccBuffer() {
return(ipacc_buffer[0].size() + ipacc_buffer[1].size());
return(IPACC ? IPACC[0].lengthBuffer() : 0);
}

void initIpacc() {
if(get_customer_by_ip_sql_driver[0] && get_customer_by_ip_odbc_dsn[0]) {
custIpCache = new CustIpCache();
custIpCache->setConnectParams(
get_customer_by_ip_sql_driver,
get_customer_by_ip_odbc_dsn,
get_customer_by_ip_odbc_user,
get_customer_by_ip_odbc_password,
get_customer_by_ip_odbc_driver);
custIpCache->setConnectParamsRadius(
get_radius_ip_driver,
get_radius_ip_host,
get_radius_ip_db,
get_radius_ip_user,
get_radius_ip_password);
custIpCache->setQueryes(
get_customer_by_ip_query,
get_customers_ip_query);
custIpCache->setQueryesRadius(
get_customers_radius_name_query,
get_radius_ip_query,
get_radius_ip_query_where);
custIpCache->connect();
if(get_customers_ip_query[0]) {
custIpCache->fetchAllIpQueryFromDb();
custIpCache->setMaxQueryPass(2);
}
}
if(isSqlDriver("mysql")) {
nextIpCache = new NextIpCache();
nextIpCache->connect();
nextIpCache->fetch();
nextIpCache->setMaxQueryPass(2);
}
if(get_customer_by_pn_sql_driver[0] && get_customer_by_pn_odbc_dsn[0]) {
custPnCache = new CustPhoneNumberCache();
custPnCache->setConnectParams(
get_customer_by_pn_sql_driver,
get_customer_by_pn_odbc_dsn,
get_customer_by_pn_odbc_user,
get_customer_by_pn_odbc_password,
get_customer_by_pn_odbc_driver);
custPnCache->setQueryes(get_customers_pn_query);
custPnCache->connect();
if(get_customers_pn_query[0]) {
custPnCache->fetchPhoneNumbersFromDb();
custPnCache->setMaxQueryPass(2);
string getIpaccCpuUsagePerc() {
ostringstream outStr;
if(IPACC) {
outStr << fixed;
double tipacc = IPACC[0].getCpuUsagePerc(true);
if(tipacc > 0) {
outStr << setprecision(1) << tipacc << "%";
}
}
return(outStr.str());
}

void freeMemIpacc() {
if(custIpCache) {
delete custIpCache;
}
if(nextIpCache) {
delete nextIpCache;
}
if(custPnCache) {
delete custPnCache;
}
t_ipacc_buffer::iterator iter;
for(int i = 0; i < 2; i++) {
for(iter = ipacc_buffer[i].begin(); iter != ipacc_buffer[i].end(); ++iter) {
delete iter->second;
}
}
void initIpacc() {
IPACC = new Ipacc[1];
}

void termIpacc() {
delete [] IPACC;
}
Loading

0 comments on commit 3cc07ec

Please sign in to comment.