From 4e1cf86942bd238961190b37ce986db27fa50faa Mon Sep 17 00:00:00 2001 From: rbucek Date: Thu, 16 Jul 2015 12:55:10 +0200 Subject: [PATCH] ticket VG-12 eliminate exit in new-config mode if failed connect to database clean code, remove obsolete code --- config_mysql.cpp | 47 +- config_mysql.h | 4 +- manager.cpp | 2 + pcap_queue.cpp | 1 - skinny.cpp | 1 - sniff.cpp | 582 ++++------------------- sniff.h | 56 +-- sql_db.cpp | 35 +- sql_db.h | 6 +- voipmonitor.cpp | 1145 ++++++++++++++++++++-------------------------- voipmonitor.h | 13 +- 11 files changed, 631 insertions(+), 1261 deletions(-) diff --git a/config_mysql.cpp b/config_mysql.cpp index 2b8115c85..767917883 100644 --- a/config_mysql.cpp +++ b/config_mysql.cpp @@ -1,3 +1,5 @@ +/* obsolete + #include #include #include @@ -680,33 +682,22 @@ config_load_mysql(bool checkConnect) { opt_saveaudio_reversestereo = atoi(row["saveaudio_reversestereo"].c_str()); } - /* - - packetbuffer default configuration - - packetbuffer_enable = no - packetbuffer_block_maxsize = 500 #kB - packetbuffer_block_maxtime = 500 #ms - packetbuffer_total_maxheap = 500 #MB - packetbuffer_thread_maxheap = 500 #MB - packetbuffer_file_totalmaxsize = 20000 #MB - packetbuffer_file_path = /var/spool/voipmonitor/packetbuffer - packetbuffer_file_maxfilesize = 1000 #MB - packetbuffer_file_maxtime = 5000 #ms - packetbuffer_compress = yes - #mirror_destination_ip = - #mirror_destination_port = - #mirror_source_ip = - #mirror_source_port = - */ - -#ifdef QUEUE_NONBLOCK2 - if(opt_scanpcapdir[0] != '\0') { - opt_pcap_queue = 0; - } -#else - opt_pcap_queue = 0; -#endif +// packetbuffer default configuration +// +// packetbuffer_enable = no +// packetbuffer_block_maxsize = 500 #kB +// packetbuffer_block_maxtime = 500 #ms +// packetbuffer_total_maxheap = 500 #MB +// packetbuffer_thread_maxheap = 500 #MB +// packetbuffer_file_totalmaxsize = 20000 #MB +// packetbuffer_file_path = /var/spool/voipmonitor/packetbuffer +// packetbuffer_file_maxfilesize = 1000 #MB +// packetbuffer_file_maxtime = 5000 #ms +// packetbuffer_compress = yes +// #mirror_destination_ip = +// #mirror_destination_port = +// #mirror_source_ip = +// #mirror_source_port = if(opt_pcap_queue) { if(!opt_pcap_queue_disk_folder.length() || !opt_pcap_queue_store_queue_max_disk_size) { @@ -740,3 +731,5 @@ config_load_mysql(bool checkConnect) { } delete sqlDb; } + +*/ \ No newline at end of file diff --git a/config_mysql.h b/config_mysql.h index 56adc8ec4..1eec757b9 100644 --- a/config_mysql.h +++ b/config_mysql.h @@ -1,3 +1,5 @@ +/* obsolete + #include #include #include @@ -154,7 +156,6 @@ extern int opt_pcap_queue_iface_dedup_separate_threads_extend; extern int opt_pcap_queue_iface_separate_threads; extern int opt_pcap_queue_receive_dlt; extern int opt_pcap_split; -extern int opt_pcap_threaded; extern int opt_printinsertid; extern int opt_promisc; extern int opt_read_from_file; @@ -268,3 +269,4 @@ extern unsigned int opt_openfile_max; void config_load_mysql(bool checkConnect = false); +*/ diff --git a/manager.cpp b/manager.cpp index d3c89cb86..179a24ed9 100644 --- a/manager.cpp +++ b/manager.cpp @@ -682,6 +682,7 @@ int parse_command(char *buf, int size, int client, int eof, const char *buf_long return -1; } } else if(strstr(buf, "listcalls") != NULL) { + if(calltable) { //list::iterator call; map::iterator callMAPIT; Call *call; @@ -787,6 +788,7 @@ int parse_command(char *buf, int size, int client, int eof, const char *buf_long return -1; } delete [] resbuf; + } return 0; } else if(strstr(buf, "d_lc_for_destroy") != NULL) { ostringstream outStr; diff --git a/pcap_queue.cpp b/pcap_queue.cpp index 153256d59..d4b615c0d 100644 --- a/pcap_queue.cpp +++ b/pcap_queue.cpp @@ -149,7 +149,6 @@ static bool __config_BYPASS_FIFO = true; static bool __config_USE_PCAP_FOR_FIFO = false; static bool __config_ENABLE_TOGETHER_READ_WRITE_FILE = false; -int opt_pcap_queue = 1; #if TEST_DEBUG_PARAMS > 0 u_int opt_pcap_queue_block_max_time_ms = 500; size_t opt_pcap_queue_block_max_size = OPT_PCAP_BLOCK_STORE_MAX_ITEMS * AVG_PACKET_SIZE; diff --git a/skinny.cpp b/skinny.cpp index 889f31f69..380b4a918 100644 --- a/skinny.cpp +++ b/skinny.cpp @@ -65,7 +65,6 @@ extern int opt_mirroronly; extern char opt_scanpcapdir[2048]; extern int opt_ipaccount; extern int rtp_threaded; -extern int opt_pcap_threaded; extern int opt_rtpnosip; extern char opt_cachedir[1024]; extern int opt_savewav_force; diff --git a/sniff.cpp b/sniff.cpp index de7d20d29..18341d75d 100644 --- a/sniff.cpp +++ b/sniff.cpp @@ -76,12 +76,6 @@ and insert them into Call class. extern MirrorIP *mirrorip; -#if defined(QUEUE_MUTEX) || defined(QUEUE_NONBLOCK) -extern "C" { -#include "liblfds.6/inc/liblfds.h" -} -#endif - #define MAXLIVEFILTERS 10 using namespace std; @@ -107,7 +101,6 @@ volatile unsigned int glob_last_packet_time; Calltable *calltable; extern volatile int calls_counter; -extern int opt_pcap_queue; extern int opt_saveSIP; // save SIP packets to pcap file? extern int opt_saveRTP; // save RTP packets to pcap file? extern int opt_saveRTCP; // save RTCP packets to pcap file? @@ -160,18 +153,12 @@ extern SIP_HEADERfilter *sipheaderfilter; extern SIP_HEADERfilter *sipheaderfilter_reload; extern volatile int sipheaderfilter_reload_do; extern int rtp_threaded; -extern int opt_pcap_threaded; extern int opt_rtpsave_threaded; extern int opt_rtpnosip; extern char opt_cachedir[1024]; extern int opt_savewav_force; extern int opt_saveudptl; extern nat_aliases_t nat_aliases; -extern pcap_packet *pcap_qring; -extern volatile unsigned int pcap_readit; -extern volatile unsigned int pcap_writeit; -extern unsigned int pcap_qring_max; -extern unsigned int pcap_qring_usleep; extern int opt_enable_preprocess_packet; extern unsigned int opt_preprocess_packets_qring_length; extern unsigned int opt_preprocess_packets_qring_usleep; @@ -227,10 +214,6 @@ extern bool _save_sip_history_all_requests; extern bool _save_sip_history_all_responses;extern int absolute_timeout; unsigned int glob_ssl_calls = 0; -#ifdef QUEUE_MUTEX -extern sem_t readpacket_thread_semaphore; -#endif - char * gettag(const void *ptr, unsigned long len, const char *tag, unsigned long *gettaglen, unsigned long *limitLen = NULL, ParsePacket *parsePacket = NULL); static void logPacketSipMethodCall(u_int64_t packet_number, int sip_method, int lastSIPresponseNum, pcap_pkthdr *header, @@ -1308,133 +1291,61 @@ void add_to_rtp_thread_queue(Call *call, unsigned char *data, int datalen, int d rtp_read_thread *params = &(rtp_threads[call->thread_num]); -#if defined(QUEUE_MUTEX) || defined(QUEUE_NONBLOCK) - rtp_packet *rtpp = new FILE_LINE rtp_packet; - rtpp->data = new FILE_LINE unsigned char[datalen]; -#endif - -#ifdef QUEUE_NONBLOCK2 - rtp_packet *rtpp; - if(!opt_pcap_queue) { - rtpp = &(params->vmbuffer[params->writeit % params->vmbuffermax]); - while(params->vmbuffer[params->writeit % params->vmbuffermax].free == 0) { - // no room left, loop until there is room - usleep(100); - } + if(block_store) { + block_store->lock_packet(block_store_index); } -#endif - - if(opt_pcap_queue) { - if(block_store) { - block_store->lock_packet(block_store_index); - } - if(params->rtpp_queue_quick || - params->rtpp_queue_quick_boost) { - rtp_packet_pcap_queue rtpp_pq; - rtpp_pq.call = call; - rtpp_pq.saddr = saddr; - rtpp_pq.daddr = daddr; - rtpp_pq.sport = sport; - rtpp_pq.dport = dport; - rtpp_pq.iscaller = iscaller; - rtpp_pq.is_rtcp = is_rtcp; - rtpp_pq.save_packet = enable_save_packet; - rtpp_pq.packet = packet; - rtpp_pq.istcp = istcp; - rtpp_pq.dlt = dlt; - rtpp_pq.sensor_id = sensor_id; - rtpp_pq.data = data; - rtpp_pq.datalen = datalen; - rtpp_pq.dataoffset = dataoffset; - rtpp_pq.header = *header; - rtpp_pq.block_store = block_store; - rtpp_pq.block_store_index =block_store_index; - if(params->rtpp_queue_quick) { - params->rtpp_queue_quick->push(&rtpp_pq, true, opt_enable_process_rtp_packet > 1); - } else { - params->rtpp_queue_quick_boost->push(&rtpp_pq, true, opt_enable_process_rtp_packet > 1); - } + if(params->rtpp_queue_quick || + params->rtpp_queue_quick_boost) { + rtp_packet_pcap_queue rtpp_pq; + rtpp_pq.call = call; + rtpp_pq.saddr = saddr; + rtpp_pq.daddr = daddr; + rtpp_pq.sport = sport; + rtpp_pq.dport = dport; + rtpp_pq.iscaller = iscaller; + rtpp_pq.is_rtcp = is_rtcp; + rtpp_pq.save_packet = enable_save_packet; + rtpp_pq.packet = packet; + rtpp_pq.istcp = istcp; + rtpp_pq.dlt = dlt; + rtpp_pq.sensor_id = sensor_id; + rtpp_pq.data = data; + rtpp_pq.datalen = datalen; + rtpp_pq.dataoffset = dataoffset; + rtpp_pq.header = *header; + rtpp_pq.block_store = block_store; + rtpp_pq.block_store_index =block_store_index; + if(params->rtpp_queue_quick) { + params->rtpp_queue_quick->push(&rtpp_pq, true, opt_enable_process_rtp_packet > 1); } else { - params->rtpp_queue->lock(); - rtp_packet_pcap_queue *rtpp_pq; - while((rtpp_pq = params->rtpp_queue->push_get_pointer()) == NULL) { - usleep(10); - } - rtpp_pq->call = call; - rtpp_pq->saddr = saddr; - rtpp_pq->daddr = daddr; - rtpp_pq->sport = sport; - rtpp_pq->dport = dport; - rtpp_pq->iscaller = iscaller; - rtpp_pq->is_rtcp = is_rtcp; - rtpp_pq->save_packet = enable_save_packet; - rtpp_pq->packet = packet; - rtpp_pq->istcp = istcp; - rtpp_pq->dlt = dlt; - rtpp_pq->sensor_id = sensor_id; - rtpp_pq->data = data; - rtpp_pq->datalen = datalen; - rtpp_pq->dataoffset = dataoffset; - rtpp_pq->header = *header; - rtpp_pq->block_store = block_store; - rtpp_pq->block_store_index =block_store_index; - params->rtpp_queue->unlock(); + params->rtpp_queue_quick_boost->push(&rtpp_pq, true, opt_enable_process_rtp_packet > 1); } } else { - rtpp->call = call; - rtpp->datalen = datalen; - rtpp->dataoffset = dataoffset; - rtpp->saddr = saddr; - rtpp->daddr = daddr; - rtpp->sport = sport; - rtpp->dport = dport; - rtpp->iscaller = iscaller; - rtpp->is_rtcp = is_rtcp; - rtpp->save_packet = enable_save_packet; - rtpp->packet = packet; - rtpp->istcp = istcp; - rtpp->dlt = dlt; - rtpp->sensor_id = sensor_id; - - memcpy(&rtpp->header, header, sizeof(struct pcap_pkthdr)); - memcpy(&rtpp->header_ip, (struct iphdr2*)(data - sizeof(struct iphdr2) - sizeof(udphdr2)), sizeof(struct iphdr2)); - if(datalen > MAXPACKETLENQRING) { - syslog(LOG_ERR, "error: packet is to large [%d]b for RTP QRING[%d]b", header->caplen, MAXPACKETLENQRING); - return; - } - if(opt_skiprtpdata) { - memcpy(rtpp->data, data, MIN((unsigned int)datalen, sizeof(RTPFixedHeader))); - } else { - memcpy(rtpp->data, data, datalen); - } - } - -#ifdef QUEUE_NONBLOCK2 - if(!opt_pcap_queue) { - params->vmbuffer[params->writeit % params->vmbuffermax].free = 0; - if((params->writeit + 1) == params->vmbuffermax) { - params->writeit = 0; - } else { - params->writeit++; - } - } -#endif - -#ifdef QUEUE_MUTEX - pthread_mutex_lock(&(threads[call->thread_num].qlock)); - threads[call->thread_num].pqueue.push(rtpp); - pthread_mutex_unlock(&(threads[call->thread_num].qlock)); - sem_post(&threads[call->thread_num].semaphore); -#endif - -#ifdef QUEUE_NONBLOCK - if(queue_enqueue(threads[call->thread_num].pqueue, (void*)rtpp) == 0) { - // enqueue failed, try to raise queue - if(queue_guaranteed_enqueue(threads[call->thread_num].pqueue, (void*)rtpp) == 0) { - syslog(LOG_ERR, "error: add_to_rtp_thread_queue cannot allocate memory"); - } + params->rtpp_queue->lock(); + rtp_packet_pcap_queue *rtpp_pq; + while((rtpp_pq = params->rtpp_queue->push_get_pointer()) == NULL) { + usleep(10); + } + rtpp_pq->call = call; + rtpp_pq->saddr = saddr; + rtpp_pq->daddr = daddr; + rtpp_pq->sport = sport; + rtpp_pq->dport = dport; + rtpp_pq->iscaller = iscaller; + rtpp_pq->is_rtcp = is_rtcp; + rtpp_pq->save_packet = enable_save_packet; + rtpp_pq->packet = packet; + rtpp_pq->istcp = istcp; + rtpp_pq->dlt = dlt; + rtpp_pq->sensor_id = sensor_id; + rtpp_pq->data = data; + rtpp_pq->datalen = datalen; + rtpp_pq->dataoffset = dataoffset; + rtpp_pq->header = *header; + rtpp_pq->block_store = block_store; + rtpp_pq->block_store_index =block_store_index; + params->rtpp_queue->unlock(); } -#endif #if RTP_PROF if(preSyncRtp) { @@ -1445,123 +1356,50 @@ void add_to_rtp_thread_queue(Call *call, unsigned char *data, int datalen, int d void *rtp_read_thread_func(void *arg) { - rtp_packet *rtpp = NULL; rtp_packet_pcap_queue rtpp_pq; rtp_read_thread *params = (rtp_read_thread*)arg; while(1) { -#ifdef QUEUE_MUTEX - sem_wait(¶ms->semaphore); - - pthread_mutex_lock(&(params->qlock)); - rtpp = params->pqueue.front(); - params->pqueue.pop(); - pthread_mutex_unlock(&(params->qlock)); -#endif - -#ifdef QUEUE_NONBLOCK - if(queue_dequeue(params->pqueue, (void **)&rtpp) != 1) { - // queue is empty - if(is_terminating() || readend) { - return NULL; + if(params->rtpp_queue_quick) { + if(!params->rtpp_queue_quick->pop(&rtpp_pq, true) && + is_terminating()) { + return(NULL); } - usleep(rtp_qring_usleep); - continue; - }; -#endif - -#ifdef QUEUE_NONBLOCK2 - if(opt_pcap_queue) { - if(params->rtpp_queue_quick) { - if(!params->rtpp_queue_quick->pop(&rtpp_pq, true) && - is_terminating()) { - return(NULL); - } - } else if(params->rtpp_queue_quick_boost) { - if(!params->rtpp_queue_quick_boost->pop(&rtpp_pq, true) && - is_terminating()) { - return(NULL); - } - } else { - if(!params->rtpp_queue->pop(&rtpp_pq, true)) { - if(is_terminating() || readend) { - return NULL; - } - // no packet to read, wait and try again - usleep(rtp_qring_usleep); - continue; - } + } else if(params->rtpp_queue_quick_boost) { + if(!params->rtpp_queue_quick_boost->pop(&rtpp_pq, true) && + is_terminating()) { + return(NULL); } } else { - - if(params->vmbuffer[params->readit % params->vmbuffermax].free == 1) { + if(!params->rtpp_queue->pop(&rtpp_pq, true)) { if(is_terminating() || readend) { return NULL; } // no packet to read, wait and try again usleep(rtp_qring_usleep); continue; - } else { - rtpp = &(params->vmbuffer[params->readit % params->vmbuffermax]); } } -#endif - if(opt_pcap_queue) { - if(rtpp_pq.is_rtcp) { - rtpp_pq.call->read_rtcp(rtpp_pq.data, rtpp_pq.datalen, rtpp_pq.dataoffset, &rtpp_pq.header, rtpp_pq.saddr, rtpp_pq.daddr, rtpp_pq.sport, rtpp_pq.dport, rtpp_pq.iscaller, - rtpp_pq.save_packet, rtpp_pq.packet, rtpp_pq.istcp, rtpp_pq.dlt, rtpp_pq.sensor_id); - } else { - int monitor; - rtpp_pq.call->read_rtp(rtpp_pq.data, rtpp_pq.datalen, rtpp_pq.dataoffset, &rtpp_pq.header, NULL, rtpp_pq.saddr, rtpp_pq.daddr, rtpp_pq.sport, rtpp_pq.dport, rtpp_pq.iscaller, &monitor, - rtpp_pq.save_packet, rtpp_pq.packet, rtpp_pq.istcp, rtpp_pq.dlt, rtpp_pq.sensor_id, - rtpp_pq.block_store && rtpp_pq.block_store->ifname[0] ? rtpp_pq.block_store->ifname : NULL); - } - rtpp_pq.call->set_last_packet_time(rtpp_pq.header.ts.tv_sec); - if(rtpp_pq.block_store) { - rtpp_pq.block_store->unlock_packet(rtpp_pq.block_store_index); - } - } else { - if(rtpp->is_rtcp) { - rtpp->call->read_rtcp((unsigned char*)rtpp->data, rtpp->datalen, rtpp->dataoffset, &rtpp->header, rtpp->saddr, rtpp->daddr, rtpp->sport, rtpp->dport, rtpp->iscaller, - rtpp->save_packet, rtpp->packet, rtpp->istcp, rtpp->dlt, rtpp->sensor_id); - } else { - int monitor; - rtpp->call->read_rtp(rtpp->data, rtpp->datalen, rtpp->dataoffset, &rtpp->header, &rtpp->header_ip, rtpp->saddr, rtpp->daddr, rtpp->sport, rtpp->dport, rtpp->iscaller, &monitor, - rtpp->save_packet, rtpp->packet, rtpp->istcp, rtpp->dlt, rtpp->sensor_id); - } - rtpp->call->set_last_packet_time(rtpp->header.ts.tv_sec); + if(rtpp_pq.is_rtcp) { + rtpp_pq.call->read_rtcp(rtpp_pq.data, rtpp_pq.datalen, rtpp_pq.dataoffset, &rtpp_pq.header, rtpp_pq.saddr, rtpp_pq.daddr, rtpp_pq.sport, rtpp_pq.dport, rtpp_pq.iscaller, + rtpp_pq.save_packet, rtpp_pq.packet, rtpp_pq.istcp, rtpp_pq.dlt, rtpp_pq.sensor_id); + } else { + int monitor; + rtpp_pq.call->read_rtp(rtpp_pq.data, rtpp_pq.datalen, rtpp_pq.dataoffset, &rtpp_pq.header, NULL, rtpp_pq.saddr, rtpp_pq.daddr, rtpp_pq.sport, rtpp_pq.dport, rtpp_pq.iscaller, &monitor, + rtpp_pq.save_packet, rtpp_pq.packet, rtpp_pq.istcp, rtpp_pq.dlt, rtpp_pq.sensor_id, + rtpp_pq.block_store && rtpp_pq.block_store->ifname[0] ? rtpp_pq.block_store->ifname : NULL); } - -#if defined(QUEUE_MUTEX) || defined(QUEUE_NONBLOCK) - delete [] rtpp->data; - delete rtpp; -#endif - -#ifdef QUEUE_NONBLOCK2 - if(!opt_pcap_queue) { - params->vmbuffer[params->readit % params->vmbuffermax].free = 1; - if((params->readit + 1) == params->vmbuffermax) { - params->readit = 0; - } else { - params->readit++; - } + rtpp_pq.call->set_last_packet_time(rtpp_pq.header.ts.tv_sec); + if(rtpp_pq.block_store) { + rtpp_pq.block_store->unlock_packet(rtpp_pq.block_store_index); } -#endif - if(opt_pcap_queue) { - #if SYNC_CALL_RTP - __sync_sub_and_fetch(&rtpp_pq.call->rtppcaketsinqueue, 1); - #else - ++rtpp_pq.call->rtppcaketsinqueue_m; - #endif - } else { - #if SYNC_CALL_RTP - __sync_sub_and_fetch(&rtpp->call->rtppcaketsinqueue, 1); - #else - ++rtpp->call->rtppcaketsinqueue_m; - #endif - } + #if SYNC_CALL_RTP + __sync_sub_and_fetch(&rtpp_pq.call->rtppcaketsinqueue, 1); + #else + ++rtpp_pq.call->rtppcaketsinqueue_m; + #endif } @@ -2967,7 +2805,8 @@ Call *process_packet(bool is_ssl, u_int64_t packet_number, sendCallInfoEvCall(call, sSciInfo::sci_18X, header->ts); call->onCall_18X = true; } - call->destroy_call_at = header->ts.tv_sec + absolute_timeout; + call->destroy_call_at = 0; + call->destroy_call_at_bye = 0; } // if the call ends with some of SIP [456]XX response code, we can shorten timeout when the call will be closed @@ -3694,13 +3533,8 @@ void process_packet__parse_custom_headers(Call *call, char *data, int datalen) { } void process_packet__cleanup(pcap_pkthdr *header, pcap_t *handle) { - static int pcapstatres = 0; - static unsigned int lostpacket = 0; - static unsigned int lostpacketif = 0; - - //if(verbosity > 0) syslog(LOG_NOTICE, "Active calls [%d] calls in sql queue [%d] calls in delete queue [%d]\n", (int)calltable->calls_listMAP.size(), (int)calltable->calls_queue.size(), (int)calltable->calls_deletequeue.size()); - if(verbosity > 0 && !opt_pcap_queue) { + if(verbosity > 0 && is_read_from_file_simple()) { if(opt_dup_check) { syslog(LOG_NOTICE, "Active calls [%d] calls in sql queue [%d] skipped dupe pkts [%u]\n", (int)calltable->calls_listMAP.size(), (int)calltable->calls_queue.size(), duplicate_counter); @@ -3714,22 +3548,6 @@ void process_packet__cleanup(pcap_pkthdr *header, pcap_t *handle) { calltable->cleanup(header->ts.tv_sec); } - /* also do every 10 seconds pcap statistics */ - if(!opt_pcap_queue) { - pcap_drop_flag = 0; - pcapstatres = pcap_stats(handle, &pcapstat); - if (pcapstatres == 0 && (lostpacket < pcapstat.ps_drop || lostpacketif < pcapstat.ps_ifdrop)) { - if(pcapstatresCount) { - syslog(LOG_ERR, "warning: libpcap or interface dropped packets! rx:%u pcapdrop:%u ifdrop:%u increase --ring-buffer (kernel >= 2.6.31 and libpcap >= 1.0.0)\n", pcapstat.ps_recv, pcapstat.ps_drop, pcapstat.ps_ifdrop); - } else { - // do not show first error, it is normal on startup. - pcapstatresCount++; - } - lostpacket = pcapstat.ps_drop; - lostpacketif = pcapstat.ps_ifdrop; - pcap_drop_flag = 1; - } - } process_packet__last_cleanup = header->ts.tv_sec; if(!(preProcessPacket && opt_enable_preprocess_packet == 2)) { @@ -4288,192 +4106,6 @@ void readdump_libnids(pcap_t *handle) { } #endif -void *pcap_read_thread_func(void *arg) { - pcap_packet *pp; - struct iphdr2 *header_ip; - struct udphdr2 *header_udp; - struct udphdr2 header_udp_tmp; - struct tcphdr2 *header_tcp; - char *data; - int datalen; - int istcp = 0; - int was_rtp; - unsigned int packets = 0; - bool useTcpReassemblyHttp; - bool useTcpReassemblyWebrtc; - bool useTcpReassemblySsl; - u_int64_t packet_counter = 0; - -#if defined(QUEUE_MUTEX) || defined(QUEUE_NONBLOCK) - int res = 0; -#endif - - while(1) { - - ++packet_counter; - -#ifdef QUEUE_MUTEX - int res = sem_wait(&readpacket_thread_semaphore); - if(res != 0) { - printf("Error pcap_read_thread_func sem_wait returns != 0\n"); - } - - pthread_mutex_lock(&readpacket_thread_queue_lock); - pp = readpacket_thread_queue.front(); - readpacket_thread_queue.pop(); - pthread_mutex_unlock(&readpacket_thread_queue_lock); -#endif - -#ifdef QUEUE_NONBLOCK - if((res = queue_dequeue(qs_readpacket_thread_queue, (void **)&pp)) != 1) { - // queue is empty - if(is_terminating() || readend) { - //printf("packets: [%u]\n", packets); - return NULL; - } - usleep(qringusleep); - continue; - }; -#endif - -#ifdef QUEUE_NONBLOCK2 - if(pcap_qring[pcap_readit % pcap_qring_max].free == 1) { - // no packet to read - if(is_terminating() || readend) { - //printf("packets: [%u]\n", packets); - return NULL; - } - usleep(pcap_qring_usleep); - continue; - } else { - pp = &(pcap_qring[pcap_readit % pcap_qring_max]); - } -#endif - packets++; - - int destroypp = 0; - u_char *packet = pp->packet2 ? pp->packet2 : pp->packet; - if(pp->packet2) { - destroypp = 1; - } - - header_ip = (struct iphdr2 *) ((char*)packet + pp->offset); - - bool nextPass; - do { - nextPass = false; - if(header_ip->protocol == IPPROTO_IPIP) { - // ip in ip protocol - header_ip = (struct iphdr2 *) ((char*)header_ip + sizeof(iphdr2)); - } else if(header_ip->protocol == IPPROTO_GRE) { - // gre protocol - header_ip = convertHeaderIP_GRE(header_ip); - if(header_ip) { - nextPass = true; - } - } - } while(nextPass); - if(!header_ip) { - continue; - } - - header_udp = &header_udp_tmp; - useTcpReassemblyHttp = false; - useTcpReassemblyWebrtc = false; - useTcpReassemblySsl = false; - if (header_ip->protocol == IPPROTO_UDP) { - // prepare packet pointers - header_udp = (struct udphdr2 *) ((char *) header_ip + sizeof(*header_ip)); - data = (char *) header_udp + sizeof(*header_udp); - datalen = (int)(pp->header.caplen - ((char*)data - (char*)packet)); - istcp = 0; - } else if (header_ip->protocol == IPPROTO_TCP) { - header_tcp = (struct tcphdr2 *) ((char *) header_ip + sizeof(*header_ip)); - // dokončit nezbytné paměťové operace pro udržení obsahu paketu !!!! - // zatím reassemblování v módu bez pb zakázáno - /* - if(opt_enable_http && (httpportmatrix[htons(header_tcp->source)] || httpportmatrix[htons(header_tcp->dest)])) { - tcpReassembly->push(&pp->header, header_ip, packet); - useTcpReassemblyHttp = true; - } else if(opt_enable_webrtc && (webrtcportmatrix[htons(header_tcp->source)] || webrtcportmatrix[htons(header_tcp->dest)])) { - tcpReassemblyWebrtc->push(&pp->header, header_ip, packet); - useTcpReassemblyWebrtc = true; - } els if(opt_enable_ssl && - (isSslIpPort(htonl(header_ip->saddr), htons(header_tcp->source)) || - isSslIpPort(htonl(header_ip->daddr), htons(header_tcp->dest)))) { - tcpReassemblySsl->push(&pp->header, header_ip, packet); - useTcpReassemblySsl = true; - } else*/{ - istcp = 1; - // prepare packet pointers - data = (char *) header_tcp + (header_tcp->doff * 4); - datalen = (int)(pp->header.caplen - ((char*)data - (char*)packet)); - header_udp->source = header_tcp->source; - header_udp->dest = header_tcp->dest; - } - } else { - //packet is not UDP and is not TCP, we are not interested, go to the next packet - // - interested only for ipaccount - if(opt_ipaccount) { - ipaccount(pp->header.ts.tv_sec, (struct iphdr2 *) ((char*)(packet) + pp->offset), pp->header.len - pp->offset, false); - } -#ifdef QUEUE_NONBLOCK2 - if(destroypp) { - delete [] pp->packet2; - pp->packet2 = NULL; - } - pcap_qring[pcap_readit % pcap_qring_max].free = 1; - if((pcap_readit + 1) == pcap_qring_max) { - pcap_readit = 0; - } else { - pcap_readit++; - } -#endif - continue; - } - - if(opt_mirrorip && (sipportmatrix[htons(header_udp->source)] || sipportmatrix[htons(header_udp->dest)])) { - mirrorip->send((char *)header_ip, (int)(pp->header.caplen - ((char*)header_ip - (char*)packet))); - } - int voippacket = 0; - if(!useTcpReassemblyHttp && !useTcpReassemblyWebrtc && !useTcpReassemblySsl && - opt_enable_http < 2 && opt_enable_webrtc < 2 && opt_enable_ssl < 2) { - process_packet(false, packet_counter, - header_ip->saddr, htons(header_udp->source), header_ip->daddr, htons(header_udp->dest), - data, datalen, data - (char*)packet, - global_pcap_handle, &pp->header, packet, - istcp, &was_rtp, header_ip, &voippacket, 0, - NULL, 0, global_pcap_dlink, opt_id_sensor); - } - - // if packet was VoIP add it to ipaccount - if(opt_ipaccount) { - ipaccount(pp->header.ts.tv_sec, (struct iphdr2 *) ((char*)(packet) + pp->offset), pp->header.len - pp->offset, voippacket); - } - -#ifdef QUEUE_NONBLOCK2 - if(destroypp) { - delete [] pp->packet2; - pp->packet2 = NULL; - } - pcap_qring[pcap_readit % pcap_qring_max].free = 1; - if((pcap_readit + 1) == pcap_qring_max) { - pcap_readit = 0; - } else { - pcap_readit++; - } -#endif - -#if defined(QUEUE_MUTEX) || defined(QUEUE_NONBLOCK) - delete [] pp->packet; - delete pp; -#endif - } - //printf("packets: [%u]\n", packets); - - return NULL; -} - /* defragment packets from queue and allocates memory for new header and packet which is returned @@ -4775,64 +4407,6 @@ void readdump_libpcap(pcap_t *handle) { continue; } - if(opt_pcap_threaded) { - //add packet to queue -#if defined(QUEUE_MUTEX) || defined(QUEUE_NONBLOCK) - pcap_packet *pp = new FILE_LINE pcap_packet; - pp->packet = new FILE_LINE u_char[header->caplen]; - pp->offset = ppd.header_ip_offset; - memcpy(&pp->header, header, sizeof(struct pcap_pkthdr)); - memcpy(pp->packet, packet, header->caplen); -#endif - -#ifdef QUEUE_NONBLOCK2 - while(pcap_qring[pcap_writeit % pcap_qring_max].free == 0) { - // no room left, loop until there is room - usleep(100); - } - if(header->caplen > MAXPACKETLENQRING) { - //allocate special structure - //syslog(LOG_ERR, "error: packet is to large [%d]b for QRING[%d]b", header->caplen, MAXPACKETLENQRING); - pcap_qring[pcap_writeit % pcap_qring_max].packet2 = new FILE_LINE u_char[header->caplen]; - memcpy(pcap_qring[pcap_writeit % pcap_qring_max].packet2, packet, header->caplen); - } else { - pcap_qring[pcap_writeit % pcap_qring_max].packet2 = NULL; - memcpy(&pcap_qring[pcap_writeit % pcap_qring_max].packet, packet, header->caplen); - } - memcpy(&pcap_qring[pcap_writeit % pcap_qring_max].header, header, sizeof(struct pcap_pkthdr)); - pcap_qring[pcap_writeit % pcap_qring_max].offset = ppd.header_ip_offset; - pcap_qring[pcap_writeit % pcap_qring_max].free = 0; - if((pcap_writeit + 1) == pcap_qring_max) { - pcap_writeit = 0; - } else { - pcap_writeit++; - } -#endif - - if(header->caplen > header->caplen) { - syslog(LOG_ERR, "error: header->caplen > header->caplen FIX!"); - } - -#ifdef QUEUE_MUTEX - pthread_mutex_lock(&readpacket_thread_queue_lock); - readpacket_thread_queue.push(pp); - pthread_mutex_unlock(&readpacket_thread_queue_lock); -#endif - -#ifdef QUEUE_NONBLOCK - if(queue_enqueue(qs_readpacket_thread_queue, (void*)pp) == 0) { - // enqueue failed, try to raise queue - if(queue_guaranteed_enqueue(qs_readpacket_thread_queue, (void*)pp) == 0) { - syslog(LOG_ERR, "error: readpacket_queue cannot allocate memory"); - } - } -#endif - - //sem_post(&readpacket_thread_semaphore); - if(destroy) { delete header; delete [] packet;}; - continue; - } - if(opt_mirrorall || (opt_mirrorip && (sipportmatrix[htons(ppd.header_udp->source)] || sipportmatrix[htons(ppd.header_udp->dest)]))) { mirrorip->send((char *)ppd.header_ip, (int)(header->caplen - ((unsigned long) ppd.header_ip - (unsigned long) packet))); } diff --git a/sniff.h b/sniff.h index e1ad61248..87bdfb3c4 100644 --- a/sniff.h +++ b/sniff.h @@ -23,12 +23,6 @@ #define MAXPACKETLENQRING 1600 #define RTP_FIXED_HEADERLEN 12 -#ifdef QUEUE_NONBLOCK -extern "C" { -#include "liblfds.6/inc/liblfds.h" -} -#endif - #define IP_DF 0x4000 /* Flag: "Don't Fragment" */ #define IP_MF 0x2000 /* Flag: "More Fragments" */ #define IP_OFFSET 0x1FFF /* "Fragment Offset" part */ @@ -60,7 +54,6 @@ struct iphdr2 { #endif void *rtp_read_thread_func(void *arg); -void *pcap_read_thread_func(void *arg); void readdump_libnids(pcap_t *handle); void readdump_libpcap(pcap_t *handle); @@ -92,12 +85,7 @@ struct udphdr2 { typedef struct { Call *call; -#if defined(QUEUE_MUTEX) || defined(QUEUE_NONBLOCK) - unsigned char *data; -#endif -#ifdef QUEUE_NONBLOCK2 unsigned char data[MAXPACKETLENQRING]; -#endif int datalen; int dataoffset; u_int32_t saddr; @@ -137,55 +125,18 @@ typedef struct { int block_store_index; } rtp_packet_pcap_queue; -#ifdef QUEUE_NONBLOCK2 -extern int opt_pcap_queue; -#endif - struct rtp_read_thread { rtp_read_thread() { - #ifdef QUEUE_NONBLOCK - this->pqueue = NULL; - #endif - #ifdef QUEUE_NONBLOCK2 - this->vmbuffer = NULL; - this->vmbuffermax = 0; - this->readit = 0; - this->writeit = 0; - this->rtpp_queue = NULL; - this->rtpp_queue_quick = NULL; - this->rtpp_queue_quick_boost = NULL; - #endif + this->rtpp_queue = NULL; + this->rtpp_queue_quick = NULL; + this->rtpp_queue_quick_boost = NULL; } pthread_t thread; // ID of worker storing CDR thread -#ifdef QUEUE_MUTEX - queue pqueue; - pthread_mutex_t qlock; - sem_t semaphore; -#endif -#ifdef QUEUE_NONBLOCK - struct queue_state *pqueue; -#endif -#ifdef QUEUE_NONBLOCK2 - rtp_packet *vmbuffer; - int vmbuffermax; - volatile int readit; - volatile int writeit; rqueue *rtpp_queue; rqueue_quick *rtpp_queue_quick; rqueue_quick_boost *rtpp_queue_quick_boost; -#endif }; -#if defined(QUEUE_MUTEX) || defined(QUEUE_NONBLOCK) -typedef struct { - struct pcap_pkthdr header; - u_char *packet; - int offset; -} pcap_packet; -#endif - - -#if defined(QUEUE_NONBLOCK2) typedef struct { struct pcap_pkthdr header; u_char packet[MAXPACKETLENQRING]; @@ -193,7 +144,6 @@ typedef struct { int offset; volatile char free; } pcap_packet; -#endif #define MAXLIVEFILTERS 10 #define MAXLIVEFILTERSCHARS 32 diff --git a/sql_db.cpp b/sql_db.cpp index 510b3ced8..9e9f5220a 100644 --- a/sql_db.cpp +++ b/sql_db.cpp @@ -831,11 +831,11 @@ bool SqlDb_mysql::createRoutine(string routine, string routineName, string routi bool rslt = this->query(string("create ") + (routineType == procedure ? "PROCEDURE" : "FUNCTION") + " " + routineName + routineParamsAndReturn + " " + routine); if(!rslt && abortIfFailed) { - string abortString = + string errorString = string("create routine ") + routineName + " failed\n" + "tip: SET GLOBAL log_bin_trust_function_creators = 1 or put it in my.cnf configuration or grant SUPER privileges to your voipmonitor mysql user."; - syslog(LOG_ERR, abortString.c_str()); - abort(); + syslog(LOG_ERR, errorString.c_str()); + vm_terminate_error(errorString.c_str()); } return(rslt); } else { @@ -2752,7 +2752,7 @@ string prepareQueryForPrintf(string &query) { } -void SqlDb_mysql::createSchema(SqlDb *sourceDb) { +bool SqlDb_mysql::createSchema(SqlDb *sourceDb) { const char *cdrMainTables[] = { "cdr", @@ -2775,6 +2775,7 @@ void SqlDb_mysql::createSchema(SqlDb *sourceDb) { sql_disable_next_attempt_if_error = 1; this->multi_off(); + /* obsolete #if 1 this->query( "CREATE TABLE IF NOT EXISTS `sensor_conf` (\ @@ -2883,7 +2884,21 @@ void SqlDb_mysql::createSchema(SqlDb *sourceDb) { PRIMARY KEY (`id`)\ ) ENGINE=InnoDB DEFAULT CHARSET=latin1;"); #endif - + */ + + bool okTableFilterIp = false; + this->query("show tables like 'filter_ip'"); + if(this->fetchRow()) { + if(this->query("select * from filter_ip")) { + okTableFilterIp = true; + } else { + if(this->getLastError() == ER_NO_DB_ERROR || + this->getLastError() == ER_DBACCESS_DENIED_ERROR) { + return(false); + } + } + } + this->query( "CREATE TABLE IF NOT EXISTS `filter_ip` (\ `id` int NOT NULL AUTO_INCREMENT,\ @@ -2903,6 +2918,10 @@ void SqlDb_mysql::createSchema(SqlDb *sourceDb) { `remove_at` date default NULL,\ PRIMARY KEY (`id`)\ ) ENGINE=InnoDB DEFAULT CHARSET=latin1;"); + + if(!okTableFilterIp && this->getLastError()) { + return(false); + } this->query( "CREATE TABLE IF NOT EXISTS `filter_telnum` (\ @@ -4462,6 +4481,8 @@ void SqlDb_mysql::createSchema(SqlDb *sourceDb) { sql_disable_next_attempt_if_error = 0; syslog(LOG_DEBUG, "done"); + + return(true); } void SqlDb_mysql::checkDbMode() { @@ -4865,7 +4886,7 @@ vector SqlDb_mysql::getSourceTables(int typeTables) { } -void SqlDb_odbc::createSchema(SqlDb *sourceDb) { +bool SqlDb_odbc::createSchema(SqlDb *sourceDb) { this->query( "IF NOT EXISTS (SELECT * FROM sys.objects WHERE name = 'filter_ip') BEGIN\ @@ -5562,6 +5583,8 @@ void SqlDb_odbc::createSchema(SqlDb *sourceDb) { BEGIN\ RETURN DATEADD(SECOND, -DATEDIFF(second, 0, @time2), @time1)\ END"); + + return(true); } void SqlDb_odbc::createTable(const char *tableName) { diff --git a/sql_db.h b/sql_db.h index 26008c016..b77eea249 100644 --- a/sql_db.h +++ b/sql_db.h @@ -138,7 +138,7 @@ class SqlDb { } virtual void cleanFields(); virtual void clean() = 0; - virtual void createSchema(SqlDb *sourceDb = NULL) = 0; + virtual bool createSchema(SqlDb *sourceDb = NULL) = 0; virtual void createTable(const char *tableName) = 0; virtual void checkDbMode() = 0; virtual void checkSchema() = 0; @@ -229,7 +229,7 @@ class SqlDb_mysql : public SqlDb { } bool checkLastError(string prefixError, bool sysLog = false,bool clearLastError = false); void clean(); - void createSchema(SqlDb *sourceDb = NULL); + bool createSchema(SqlDb *sourceDb = NULL); void createTable(const char *tableName); void checkDbMode(); void checkSchema(); @@ -315,7 +315,7 @@ class SqlDb_odbc : public SqlDb { bool checkLastError(string prefixError, bool sysLog = false,bool clearLastError = false); void cleanFields(); void clean(); - void createSchema(SqlDb *sourceDb = NULL); + bool createSchema(SqlDb *sourceDb = NULL); void createTable(const char *tableName); void checkDbMode(); void checkSchema(); diff --git a/voipmonitor.cpp b/voipmonitor.cpp index d9981f747..adc16c5e6 100644 --- a/voipmonitor.cpp +++ b/voipmonitor.cpp @@ -74,7 +74,6 @@ #include "ip_frag.h" #include "cleanspool.h" #include "regcache.h" -#include "config_mysql.h" #include "fraud.h" #include "rrd.h" #include "heap_safe.h" @@ -84,12 +83,6 @@ #include "send_call_info.h" #include "config_param.h" -#if defined(QUEUE_MUTEX) || defined(QUEUE_NONBLOCK) -extern "C" { -#include "liblfds.6/inc/liblfds.h" -} -#endif - #ifndef FREEBSD #define BACKTRACE 1 #endif @@ -217,7 +210,6 @@ int opt_audio_format = FORMAT_WAV; // define format for audio writing (if -W opt int opt_manager_port = 5029; // manager api TCP port char opt_manager_ip[32] = "127.0.0.1"; // manager api listen IP address int opt_manager_nonblock_mode = 0; -int opt_pcap_threaded = 0; // run reading packets from pcap in one thread and process packets in another thread via queue int opt_rtpsave_threaded = 1; int opt_norecord_header = 0; // if = 1 SIP call with X-VoipMonitor-norecord header will be not saved although global configuration says to record. int opt_rtpnosip = 0; // if = 1 RTP stream will be saved into calls regardless on SIP signalizatoin (handy if you need extract RTP without SIP) @@ -385,7 +377,6 @@ char opt_php_path[1024]; struct pcap_stat pcapstat; -extern int opt_pcap_queue; extern u_int opt_pcap_queue_block_max_time_ms; extern size_t opt_pcap_queue_block_max_size; extern u_int opt_pcap_queue_file_store_max_time_ms; @@ -556,7 +547,6 @@ volatile int sipheaderfilter_reload_do = 0; // for reload in main thread pthread_t storing_cdr_thread; // ID of worker storing CDR thread pthread_t scanpcapdir_thread; //pthread_t destroy_calls_thread; -pthread_t readdump_libpcap_thread; pthread_t manager_thread = 0; // ID of worker manager thread pthread_t manager_client_thread; // ID of worker manager thread pthread_t manager_ssh_thread; @@ -588,11 +578,6 @@ int opt_sdp_reverse_ipport = 0; volatile unsigned int pcap_readit = 0; volatile unsigned int pcap_writeit = 0; int global_livesniffer = 0; -unsigned int pcap_qring_max = 12500; -unsigned int pcap_qring_usleep = 10000; -#if defined(QUEUE_MUTEX) || defined(QUEUE_NONBLOCK) || defined(QUEUE_NONBLOCK2) -pcap_packet *pcap_qring; -#endif pcap_t *global_pcap_handle = NULL; // pcap handler pcap_t *global_pcap_handle_dead_EN10MB = NULL; @@ -605,14 +590,6 @@ pthread_mutex_t mysqlconnect_lock; pthread_mutex_t vm_rrd_lock; pthread_t pcap_read_thread; -#ifdef QUEUE_MUTEX -pthread_mutex_t readpacket_thread_queue_lock; -sem_t readpacket_thread_semaphore; -#endif - -#ifdef QUEUE_NONBLOCK -struct queue_state *qs_readpacket_thread_queue = NULL; -#endif nat_aliases_t nat_aliases; // net_aliases[local_ip] = extern_ip @@ -1170,10 +1147,7 @@ void *storing_cdr( void *dummy ) { firstIter = false; if(opt_autocleanspool && - isSqlDriver("mysql") && - !(opt_pcap_queue && - !opt_pcap_queue_receive_from_ip_port && - opt_pcap_queue_send_to_ip_port)) { + is_enable_cleanspool()) { time_t actTime = time(NULL); if(!checkDiskFreeAt) { checkDiskFreeAt = actTime; @@ -1183,32 +1157,13 @@ void *storing_cdr( void *dummy ) { } } - if(request_iptelnum_reload == 1) { reload_capture_rules(); request_iptelnum_reload = 0;}; + if(request_iptelnum_reload == 1) { reload_capture_rules(); request_iptelnum_reload = 0;} - if(verbosity > 0 && !opt_pcap_queue) { + if(verbosity > 0 && is_read_from_file_simple()) { ostringstream outStr; outStr << "calls[" << calls_counter << "]"; -#ifdef HAVE_LIBGNUTLS - extern string getSslStat(); - string sslStat = getSslStat(); - if(!sslStat.empty()) { - outStr << sslStat; - } -#endif - if(opt_ipaccount) { - outStr << " ipacc_buffer[" << lengthIpaccBuffer() << "]"; - } - #ifdef QUEUE_NONBLOCK2 - if(!opt_pcap_queue) { - outStr << " qring[" << (pcap_writeit >= pcap_readit ? pcap_writeit - pcap_readit : pcap_writeit + pcap_qring_max - pcap_readit) - << " (w" << pcap_writeit << ",r" << pcap_readit << ")]"; - } - #endif - syslog(LOG_NOTICE, outStr.str().c_str()); } - if(request_iptelnum_reload == 1) { reload_capture_rules(); request_iptelnum_reload = 0;}; - size_t calls_queue_size = 0; for(int pass = 0; pass < 10; pass++) { @@ -1285,7 +1240,7 @@ void *storing_cdr( void *dummy ) { } calltable->unlock_calls_queue(); } - if(verbosity || !opt_read_from_file) { + if(verbosity && !opt_nocdr) { syslog(LOG_NOTICE, "terminated - storing cdr / message / register"); } return NULL; @@ -1469,8 +1424,6 @@ void reload_config(const char *jsonConfig) { if(!opt_nocdr && isSqlDriver("mysql") && opt_mysqlloadconfig) { if(useNewCONFIG) { CONFIG.setFromMysql(); - } else { - config_load_mysql(); } } if(useNewCONFIG && jsonConfig) { @@ -1648,13 +1601,16 @@ void resetTerminating() { } -void *readdump_libpcap_thread_fce(void *handle); void test(); PcapQueue_readFromFifo *pcapQueueR; PcapQueue_readFromInterface *pcapQueueI; PcapQueue_readFromFifo *pcapQueueQ; +int main_init_read(); +void main_term_read(); +void main_init_sqlstore(); + int main(int argc, char *argv[]) { extern unsigned int HeapSafeCheck; bool memoryStatInArg = false; @@ -1743,8 +1699,6 @@ int main(int argc, char *argv[]) { pthread_mutex_init(&terminate_packetbuffer_lock, NULL); // if the system has more than one CPU enable threading - opt_pcap_threaded = sysconf( _SC_NPROCESSORS_ONLN ) > 1; - opt_pcap_threaded = 1; // TODO: this must be enabled for now. num_threads = sysconf( _SC_NPROCESSORS_ONLN ) - 1; if(num_threads <= 0) num_threads = 1; set_mac(); @@ -1773,8 +1727,6 @@ int main(int argc, char *argv[]) { isSqlDriver("mysql") && opt_mysqlloadconfig) { if(useNewCONFIG) { CONFIG.setFromMysql(true); - } else { - config_load_mysql(true); } } get_command_line_arguments(); @@ -1845,11 +1797,7 @@ int main(int argc, char *argv[]) { thread_setup(); // end init - if(opt_ipaccount && !opt_test) { - initIpacc(); - } - - if(opt_rrd && opt_read_from_file) { + if(opt_rrd && is_read_from_file()) { //disable update of rrd statistics when reading packets from file opt_rrd = 0; } @@ -1940,7 +1888,7 @@ int main(int argc, char *argv[]) { } } - if(opt_fork && !opt_read_from_file && reloadLoopCounter == 0) { + if(opt_fork && !is_read_from_file() && reloadLoopCounter == 0) { #if ENABLE_SEMAPHOR_FORK_MODE for(int pass = 0; pass < 2; pass ++) { globalSemaphore = sem_open(SEMAPHOR_FORK_MODE_NAME().c_str(), O_CREAT | O_EXCL); @@ -2037,6 +1985,8 @@ int main(int argc, char *argv[]) { return 1; } #endif + + daemonize(); } if(opt_generator) { @@ -2050,19 +2000,28 @@ int main(int argc, char *argv[]) { return 0; } + // start manager thread + if(opt_manager_port > 0) { + pthread_create(&manager_thread, NULL, manager_server, NULL); + // start reversed manager thread + if(opt_clientmanager[0] != '\0') { + pthread_create(&manager_client_thread, NULL, manager_client, NULL); + } + }; + cout << "SQL DRIVER: " << sql_driver << endl; if(!opt_nocdr && - !(opt_pcap_threaded && opt_pcap_queue && - !opt_pcap_queue_receive_from_ip_port && - opt_pcap_queue_send_to_ip_port)) { + !is_sender()) { + bool connectError = false; SqlDb *sqlDb = createSqlObject(); + bool rsltConnect = false; for(int pass = 0; pass < 2; pass++) { - if(sqlDb->connect(true, true)) { + if((rsltConnect = sqlDb->connect(true, true))) { break; } sleep(1); } - if(sqlDb->connected()) { + if(rsltConnect && sqlDb->connected()) { if(isSqlDriver("mysql")) { sql_noerror = 1; sqlDb->query("repair table mysql.proc"); @@ -2070,284 +2029,194 @@ int main(int argc, char *argv[]) { } sqlDb->checkDbMode(); if(!opt_database_backup) { - sqlDb->createSchema(); - sqlDb->checkSchema(); + if(sqlDb->createSchema()) { + sqlDb->checkSchema(); + } else { + connectError = true; + } } sensorsMap.fillSensors(); } else { - syslog(LOG_ERR, "Can't connect to MySQL server - exit!"); - return 1; + connectError = true; + } + if(connectError) { + string error = sqlDb->getLastErrorString(); + if(useNewCONFIG && !is_read_from_file()) { + vm_terminate_error(error.c_str()); + } else { + syslog(LOG_ERR, (error + " - exit!").c_str()); + return 1; + } } delete sqlDb; } - if(opt_database_backup) { - if (opt_fork) { - daemonize(); - } - sqlStore = new FILE_LINE MySqlStore(mysql_host, mysql_user, mysql_password, mysql_database, cloud_host, cloud_token); - custom_headers_cdr = new CustomHeaders(CustomHeaders::cdr); - custom_headers_message = new CustomHeaders(CustomHeaders::message); - pthread_create(&database_backup_thread, NULL, database_backup, NULL); - pthread_join(database_backup_thread, NULL); - return(0); - // reload loop not supported - } + if(!is_terminating()) { - if(isSqlDriver("mysql")) { - if(opt_load_query_from_files != 2) { - sqlStore = new FILE_LINE MySqlStore(mysql_host, mysql_user, mysql_password, mysql_database, cloud_host, cloud_token); - if(opt_save_query_to_files) { - sqlStore->queryToFiles(opt_save_query_to_files, opt_save_query_to_files_directory, opt_save_query_to_files_period); + if(opt_test) { + ipfilter = new FILE_LINE IPfilter; + telnumfilter = new FILE_LINE TELNUMfilter; + domainfilter = new FILE_LINE DOMAINfilter; + sipheaderfilter = new FILE_LINE SIP_HEADERfilter; + _parse_packet_global.setStdParse(); + test(); + if(sqlStore) { + delete sqlStore; } + return(0); } - if(opt_load_query_from_files) { - loadFromQFiles = new FILE_LINE MySqlStore(mysql_host, mysql_user, mysql_password, mysql_database); - loadFromQFiles->loadFromQFiles(opt_load_query_from_files, opt_load_query_from_files_directory, opt_load_query_from_files_period); - } - if(opt_load_query_from_files != 2) { - if(!opt_nocdr) { - sqlStore->connect(STORE_PROC_ID_CDR_1); - sqlStore->connect(STORE_PROC_ID_MESSAGE_1); - } - if(opt_mysqlstore_concat_limit) { - sqlStore->setDefaultConcatLimit(opt_mysqlstore_concat_limit); + + if(!opt_database_backup && opt_load_query_from_files != 2) { + main_init_sqlstore(); + int rslt_main_init_read = main_init_read(); + if(rslt_main_init_read) { + return(rslt_main_init_read); } - for(int i = 0; i < opt_mysqlstore_max_threads_cdr; i++) { - if(opt_mysqlstore_concat_limit_cdr) { - sqlStore->setConcatLimit(STORE_PROC_ID_CDR_1 + i, opt_mysqlstore_concat_limit_cdr); - } - if(i) { - sqlStore->setEnableAutoDisconnect(STORE_PROC_ID_CDR_1 + i); - } - if(opt_mysql_enable_transactions_cdr) { - sqlStore->setEnableTransaction(STORE_PROC_ID_CDR_1 + i); - } - if(opt_cdr_check_duplicity_callid_in_next_pass_insert) { - sqlStore->setEnableFixDeadlock(STORE_PROC_ID_CDR_1 + i); + main_term_read(); + } else { + if(opt_database_backup) { + sqlStore = new FILE_LINE MySqlStore(mysql_host, mysql_user, mysql_password, mysql_database, cloud_host, cloud_token); + custom_headers_cdr = new CustomHeaders(CustomHeaders::cdr); + custom_headers_message = new CustomHeaders(CustomHeaders::message); + pthread_create(&database_backup_thread, NULL, database_backup, NULL); + pthread_join(database_backup_thread, NULL); + } else if(opt_load_query_from_files == 2) { + main_init_sqlstore(); + loadFromQFiles->loadFromQFiles_start(); + unsigned int counter; + while(!is_terminating()) { + sleep(1); + if(!(++counter % 10) && verbosity) { + string stat = loadFromQFiles->getLoadFromQFilesStat(); + syslog(LOG_NOTICE, "SQLf: [%s]", stat.c_str()); + } } } - for(int i = 0; i < opt_mysqlstore_max_threads_message; i++) { - if(opt_mysqlstore_concat_limit_message) { - sqlStore->setConcatLimit(STORE_PROC_ID_MESSAGE_1 + i, opt_mysqlstore_concat_limit_message); - } - if(i) { - sqlStore->setEnableAutoDisconnect(STORE_PROC_ID_MESSAGE_1 + i); - } - if(opt_mysql_enable_transactions_message) { - sqlStore->setEnableTransaction(STORE_PROC_ID_MESSAGE_1 + i); - } - if(opt_message_check_duplicity_callid_in_next_pass_insert) { - sqlStore->setEnableFixDeadlock(STORE_PROC_ID_MESSAGE_1 + i); - } + if(sqlStore) { + delete sqlStore; + sqlStore = NULL; } - for(int i = 0; i < opt_mysqlstore_max_threads_register; i++) { - if(opt_mysqlstore_concat_limit_register) { - sqlStore->setConcatLimit(STORE_PROC_ID_REGISTER_1 + i, opt_mysqlstore_concat_limit_register); - } - if(i) { - sqlStore->setEnableAutoDisconnect(STORE_PROC_ID_REGISTER_1 + i); - } - if(opt_mysql_enable_transactions_register) { - sqlStore->setEnableTransaction(STORE_PROC_ID_REGISTER_1 + i); - } + if(loadFromQFiles) { + delete loadFromQFiles; + loadFromQFiles = NULL; } - for(int i = 0; i < opt_mysqlstore_max_threads_http; i++) { - if(opt_mysqlstore_concat_limit_http) { - sqlStore->setConcatLimit(STORE_PROC_ID_HTTP_1 + i, opt_mysqlstore_concat_limit_http); - } - if(i) { - sqlStore->setEnableAutoDisconnect(STORE_PROC_ID_HTTP_1 + i); - } - if(opt_mysql_enable_transactions_http) { - sqlStore->setEnableTransaction(STORE_PROC_ID_HTTP_1 + i); - } + if(custom_headers_cdr) { + delete custom_headers_cdr; + custom_headers_cdr = NULL; } - for(int i = 0; i < opt_mysqlstore_max_threads_webrtc; i++) { - if(opt_mysqlstore_concat_limit_webrtc) { - sqlStore->setConcatLimit(STORE_PROC_ID_WEBRTC_1 + i, opt_mysqlstore_concat_limit_webrtc); - } - if(i) { - sqlStore->setEnableAutoDisconnect(STORE_PROC_ID_WEBRTC_1 + i); - } - if(opt_mysql_enable_transactions_webrtc) { - sqlStore->setEnableTransaction(STORE_PROC_ID_WEBRTC_1 + i); - } + if(custom_headers_message) { + delete custom_headers_message; + custom_headers_message = NULL; } - if(opt_mysqlstore_concat_limit_ipacc) { - for(int i = 0; i < opt_mysqlstore_max_threads_ipacc_base; i++) { - sqlStore->setConcatLimit(STORE_PROC_ID_IPACC_1 + i, opt_mysqlstore_concat_limit_ipacc); - } - for(int i = STORE_PROC_ID_IPACC_AGR_INTERVAL; i <= STORE_PROC_ID_IPACC_AGR_DAY; i++) { - sqlStore->setConcatLimit(i, opt_mysqlstore_concat_limit_ipacc); - } - for(int i = 0; i < opt_mysqlstore_max_threads_ipacc_agreg2; i++) { - sqlStore->setConcatLimit(STORE_PROC_ID_IPACC_AGR2_HOUR_1 + i, opt_mysqlstore_concat_limit_ipacc); + } + + } + + bool _break = false; + + if(useNewCONFIG && !is_read_from_file()) { + string _terminating_error = terminating_error; + if(!hot_restarting && _terminating_error.empty()) { + _break = true; + } + if(!_terminating_error.empty()) { + clear_terminating(); + while(!is_terminating()) { + syslog(LOG_NOTICE, "%s - wait for terminating or hot restarting", _terminating_error.c_str()); + for(int i = 0; i < 10 && !is_terminating(); i++) { + sleep(1); } } - if(!opt_nocdr && opt_autoload_from_sqlvmexport) { - sqlStore->autoloadFromSqlVmExport(); + if(!hot_restarting) { + _break = true; } } + terminating_error = ""; + } else { + _break = true; } - if(opt_load_query_from_files == 2) { - if (opt_fork) { - daemonize(); - } - loadFromQFiles->loadFromQFiles_start(); - unsigned int counter; - while(!is_terminating()) { - sleep(1); - if(!(++counter % 10) && verbosity) { - string stat = loadFromQFiles->getLoadFromQFilesStat(); - syslog(LOG_NOTICE, "SQLf: [%s]", stat.c_str()); - } + //wait for manager to properly terminate + if(opt_manager_port && manager_thread > 0) { + int res; + res = shutdown(manager_socket_server, SHUT_RDWR); // break accept syscall in manager thread + if(res == -1) { + // if shutdown failed it can happen when reding very short pcap file and the bind socket was not created in manager + usleep(10000); + res = shutdown(manager_socket_server, SHUT_RDWR); // break accept syscall in manager thread } - delete loadFromQFiles; - return(0); - // reaload loop not supported + struct timespec ts; + ts.tv_sec = 1; + ts.tv_nsec = 0; + // wait for thread max 1 sec +#ifndef FREEBSD + //TODO: solve it for freebsd + pthread_timedjoin_np(manager_thread, NULL, &ts); +#endif + } + + if(_break) { + break; } - calltable = new FILE_LINE Calltable; + } + // END RELOAD LOOP + + _parse_packet_global.free(); + + delete [] sipportmatrix; + delete [] httpportmatrix; + delete [] webrtcportmatrix; - // preparing pcap reading and pcap filters + delete regfailedcache; - bpf_u_int32 mask; // Holds the subnet mask associated with device. - char errbuf[PCAP_ERRBUF_SIZE]; // Returns error text and is only set when the pcap_lookupnet subroutine fails. +#ifdef HAVE_LIBGNUTLS + ssl_clean(); +#endif - if(opt_test) { - ipfilter = new FILE_LINE IPfilter; - telnumfilter = new FILE_LINE TELNUMfilter; - domainfilter = new FILE_LINE DOMAINfilter; - sipheaderfilter = new FILE_LINE SIP_HEADERfilter; - _parse_packet_global.setStdParse(); - test(); - if(sqlStore) { - delete sqlStore; - } - return(0); - // test - reaload loop not supported + if(sverb.memory_stat) { + cout << "memory stat at end" << endl; + printMemoryStat(true); + } + if (opt_fork){ + unlink(opt_pidfile); } + return(0); +} + +int main_init_read() { + calltable = new FILE_LINE Calltable; + rtp_threaded = num_threads > 0; // check if sniffer will be reading pcap files from dir and if not if it reads from eth interface or read only one file - if(opt_scanpcapdir[0] == '\0') { - if (opt_read_from_file_fname[0] == '\0' && (ifname[0] != '\0' || opt_pcap_queue_receive_from_ip_port)) { - if(!opt_pcap_queue) { - bpf_u_int32 net; - - printf("Capturing on interface: %s\n", ifname); - // Find the properties for interface - if (pcap_lookupnet(ifname, &net, &mask, errbuf) == -1) { - // if not available, use default - mask = PCAP_NETMASK_UNKNOWN; - } - /* - global_pcap_handle = pcap_open_live(ifname, 1600, opt_promisc, 1000, errbuf); - if (global_pcap_handle == NULL) { - fprintf(stderr, "Couldn't open inteface '%s': %s\n", ifname, errbuf); - return(2); - } - */ - - /* to set own pcap_set_buffer_size it must be this way and not useing pcap_lookupnet */ - - int status = 0; - if((global_pcap_handle = pcap_create(ifname, errbuf)) == NULL) { - fprintf(stderr, "pcap_create failed on iface '%s': %s\n", ifname, errbuf); - return(2); - } - if((status = pcap_set_snaplen(global_pcap_handle, 3200)) != 0) { - fprintf(stderr, "error pcap_set_snaplen\n"); - return(2); - } - if((status = pcap_set_promisc(global_pcap_handle, opt_promisc)) != 0) { - fprintf(stderr, "error pcap_set_promisc\n"); - return(2); - } - if((status = pcap_set_timeout(global_pcap_handle, 1000)) != 0) { - fprintf(stderr, "error pcap_set_timeout\n"); - return(2); - } - - /* this is not possible for libpcap older than 1.0.0 so now voipmonitor requires libpcap > 1.0.0 - set ring buffer size to 5M to prevent packet drops whan CPU goes high or on very high traffic - - default is 2MB for libpcap > 1.0.0 - - for libpcap < 1.0.0 it is controled by /proc/sys/net/core/rmem_default which is very low - */ - if((status = pcap_set_buffer_size(global_pcap_handle, opt_ringbuffer * 1024 * 1024)) != 0) { - fprintf(stderr, "error pcap_set_buffer_size\n"); - return(2); - } - - if((status = pcap_activate(global_pcap_handle)) != 0) { - fprintf(stderr, "libpcap error: [%s]\n", pcap_geterr(global_pcap_handle)); - return(2); - } - } - if(opt_convert_dlt_sll_to_en10) { - global_pcap_handle_dead_EN10MB = pcap_open_dead(DLT_EN10MB, 65535); - } - } else { - // if reading file - rtp_threaded = 0; - opt_mirrorip = 0; // disable mirroring packets when reading pcap files from file + if(is_read_from_file_simple()) { + // if reading file + rtp_threaded = 0; + opt_mirrorip = 0; // disable mirroring packets when reading pcap files from file // opt_cachedir[0] = '\0'; //disabling cache if reading from file - opt_pcap_threaded = 0; //disable threading because it is useless while reading packets from file - //opt_cleanspool_interval = 0; // disable cleaning spooldir when reading from file - opt_maxpoolsize = 0; - opt_maxpooldays = 0; - opt_maxpoolsipsize = 0; - opt_maxpoolsipdays = 0; - opt_maxpoolrtpsize = 0; - opt_maxpoolrtpdays = 0; - opt_maxpoolgraphsize = 0; - opt_maxpoolgraphdays = 0; - opt_maxpoolaudiosize = 0; - opt_maxpoolaudiodays = 0; - - opt_manager_port = 0; // disable cleaning spooldir when reading from file - printf("Reading file: %s\n", opt_read_from_file_fname); - mask = PCAP_NETMASK_UNKNOWN; - global_pcap_handle = pcap_open_offline_zip(opt_read_from_file_fname, errbuf); - if(global_pcap_handle == NULL) { - fprintf(stderr, "Couldn't open pcap file '%s': %s\n", opt_read_from_file_fname, errbuf); - return(2); - } - } + //opt_cleanspool_interval = 0; // disable cleaning spooldir when reading from file + opt_maxpoolsize = 0; + opt_maxpooldays = 0; + opt_maxpoolsipsize = 0; + opt_maxpoolsipdays = 0; + opt_maxpoolrtpsize = 0; + opt_maxpoolrtpdays = 0; + opt_maxpoolgraphsize = 0; + opt_maxpoolgraphdays = 0; + opt_maxpoolaudiosize = 0; + opt_maxpoolaudiodays = 0; - if(!opt_pcap_queue) { - if(opt_mirrorip) { - if(opt_mirrorip_dst[0] == '\0') { - syslog(LOG_ERR, "Mirroring SIP packets disabled because mirroripdst was not set"); - opt_mirrorip = 0; - } else { - syslog(LOG_NOTICE, "Starting SIP mirroring [%s]->[%s]", opt_mirrorip_src, opt_mirrorip_dst); - mirrorip = new FILE_LINE MirrorIP(opt_mirrorip_src, opt_mirrorip_dst); - } - } - - char filter_exp[2048] = ""; // The filter expression - struct bpf_program fp; // The compiled filter - - if(*user_filter != '\0') { - snprintf(filter_exp, sizeof(filter_exp), "%s", user_filter); - - // Compile and apply the filter - if (pcap_compile(global_pcap_handle, &fp, filter_exp, 0, mask) == -1) { - fprintf(stderr, "Couldn't parse filter %s: %s\n", filter_exp, pcap_geterr(global_pcap_handle)); - return(2); - } - if (pcap_setfilter(global_pcap_handle, &fp) == -1) { - fprintf(stderr, "Couldn't install filter %s: %s\n", filter_exp, pcap_geterr(global_pcap_handle)); - return(2); - } - } + opt_manager_port = 0; // disable cleaning spooldir when reading from file + printf("Reading file: %s\n", opt_read_from_file_fname); + char errbuf[PCAP_ERRBUF_SIZE]; + global_pcap_handle = pcap_open_offline_zip(opt_read_from_file_fname, errbuf); + if(global_pcap_handle == NULL) { + fprintf(stderr, "Couldn't open pcap file '%s': %s\n", opt_read_from_file_fname, errbuf); + return(2); } } - //opt_pcap_threaded = 0; //disable threading because it is useless while reading packets from file chdir(opt_chdir); @@ -2382,9 +2251,7 @@ int main(int argc, char *argv[]) { domainfilter = new FILE_LINE DOMAINfilter; sipheaderfilter = new FILE_LINE SIP_HEADERfilter; if(!opt_nocdr && - !(opt_pcap_threaded && opt_pcap_queue && - !opt_pcap_queue_receive_from_ip_port && - opt_pcap_queue_send_to_ip_port)) { + !is_sender()) { ipfilter->load(); telnumfilter->load(); domainfilter->load(); @@ -2397,16 +2264,15 @@ int main(int argc, char *argv[]) { _parse_packet_global.setStdParse(); + if(opt_ipaccount && !opt_test) { + initIpacc(); + } + if(opt_ipaccount and !ipaccountportmatrix) { ipaccountportmatrix = new FILE_LINE char[65537]; memset(ipaccountportmatrix, 0, 65537); } - // filters are ok, we can daemonize - if (opt_fork && !opt_read_from_file && reloadLoopCounter == 0) { - daemonize(); - } - if(opt_save_query_to_files) { sqlStore->queryToFiles_start(); } @@ -2433,19 +2299,13 @@ int main(int argc, char *argv[]) { asyncClose->startThreads(opt_pcap_dump_writethreads, opt_pcap_dump_writethreads_max); } - if(!opt_nocdr && - isSqlDriver("mysql") && - !(opt_pcap_queue && - !opt_pcap_queue_receive_from_ip_port && - opt_pcap_queue_send_to_ip_port) && + if(is_enable_cleanspool() && isSetCleanspoolParameters()) { runCleanSpoolThread(); } // start thread processing queued cdr and sql queue - supressed if run as sender - if(!(opt_pcap_threaded && opt_pcap_queue && - !opt_pcap_queue_receive_from_ip_port && - opt_pcap_queue_send_to_ip_port)) { + if(!is_sender()) { pthread_create(&storing_cdr_thread, NULL, storing_cdr, NULL); /* pthread_create(&destroy_calls_thread, NULL, destroy_calls, NULL); @@ -2457,15 +2317,6 @@ int main(int argc, char *argv[]) { pthread_create(&cachedir_thread, NULL, moving_cache, NULL); } - // start manager thread - if(opt_manager_port > 0) { - pthread_create(&manager_thread, NULL, manager_server, NULL); - // start reversed manager thread - if(opt_clientmanager[0] != '\0') { - pthread_create(&manager_client_thread, NULL, manager_client, NULL); - } - }; - // start tar dumper if(opt_pcap_dump_tar) { pthread_create(&tarqueuethread, NULL, TarQueueThread, NULL); @@ -2478,88 +2329,39 @@ int main(int argc, char *argv[]) { #endif // start reading threads - if(rtp_threaded && - !(opt_pcap_threaded && opt_pcap_queue && - !opt_pcap_queue_receive_from_ip_port && - opt_pcap_queue_send_to_ip_port)) { + if(is_enable_rtp_threads()) { rtp_threads = new FILE_LINE rtp_read_thread[num_threads]; for(int i = 0; i < num_threads; i++) { -#ifdef QUEUE_MUTEX - pthread_mutex_init(&(rtp_threads[i].qlock), NULL); - sem_init(&(rtp_threads[i].semaphore), 0, 0); -#endif - -#ifdef QUEUE_NONBLOCK - rtp_threads[i].pqueue = NULL; - queue_new(&(rtp_threads[i].pqueue), 10000); -#endif - -#ifdef QUEUE_NONBLOCK2 - if(opt_pcap_queue) { - size_t _rtp_qring_length = rtp_qring_length ? - rtp_qring_length : - rtpthreadbuffer * 1024 * 1024 / sizeof(rtp_packet_pcap_queue); - if(rtp_qring_quick == 2) { - rtp_threads[i].rtpp_queue_quick_boost = new FILE_LINE rqueue_quick_boost( - 100, rtp_qring_usleep, - &terminating, - __FILE__, __LINE__); - } else if(rtp_qring_quick) { - rtp_threads[i].rtpp_queue_quick = new FILE_LINE rqueue_quick( - _rtp_qring_length, + size_t _rtp_qring_length = rtp_qring_length ? + rtp_qring_length : + rtpthreadbuffer * 1024 * 1024 / sizeof(rtp_packet_pcap_queue); + if(rtp_qring_quick == 2) { + rtp_threads[i].rtpp_queue_quick_boost = new FILE_LINE rqueue_quick_boost( 100, rtp_qring_usleep, - &terminating, true, + &terminating, __FILE__, __LINE__); - } else { - rtp_threads[i].rtpp_queue = new FILE_LINE rqueue(_rtp_qring_length / 2, _rtp_qring_length / 5, _rtp_qring_length * 1.5); - char rtpp_queue_name[20]; - sprintf(rtpp_queue_name, "rtp thread %i", i + 1); - rtp_threads[i].rtpp_queue->setName(rtpp_queue_name); - } + } else if(rtp_qring_quick) { + rtp_threads[i].rtpp_queue_quick = new FILE_LINE rqueue_quick( + _rtp_qring_length, + 100, rtp_qring_usleep, + &terminating, true, + __FILE__, __LINE__); } else { - rtp_threads[i].vmbuffermax = rtp_qring_length ? - rtp_qring_length : - rtpthreadbuffer * 1024 * 1024 / sizeof(rtp_packet); - rtp_threads[i].writeit = 0; - rtp_threads[i].readit = 0; - rtp_threads[i].vmbuffer = new FILE_LINE rtp_packet[rtp_threads[i].vmbuffermax + 1]; - for(int j = 0; j < rtp_threads[i].vmbuffermax + 1; j++) { - rtp_threads[i].vmbuffer[j].free = 1; - } + rtp_threads[i].rtpp_queue = new FILE_LINE rqueue(_rtp_qring_length / 2, _rtp_qring_length / 5, _rtp_qring_length * 1.5); + char rtpp_queue_name[20]; + sprintf(rtpp_queue_name, "rtp thread %i", i + 1); + rtp_threads[i].rtpp_queue->setName(rtpp_queue_name); } -#endif - pthread_create(&(rtp_threads[i].thread), NULL, rtp_read_thread_func, (void*)&rtp_threads[i]); } } - if(opt_pcap_threaded) { -#ifdef QUEUE_MUTEX - pthread_mutex_init(&readpacket_thread_queue_lock, NULL); - sem_init(&readpacket_thread_semaphore, 0, 0); -#endif - -#ifdef QUEUE_NONBLOCK - queue_new(&qs_readpacket_thread_queue, 100000); - pthread_create(&pcap_read_thread, NULL, pcap_read_thread_func, NULL); -#endif - -#ifdef QUEUE_NONBLOCK2 - if(!opt_pcap_queue) { - pcap_qring = new FILE_LINE pcap_packet[pcap_qring_max + 1]; - for(unsigned int i = 0; i < pcap_qring_max + 1; i++) { - pcap_qring[i].free = 1; - } - pthread_create(&pcap_read_thread, NULL, pcap_read_thread_func, NULL); - } -#endif - } if((opt_enable_preprocess_packet || opt_enable_ssl) && - !opt_read_from_file) { + !is_read_from_file_simple()) { preProcessPacket = new FILE_LINE PreProcessPacket(); } if(opt_enable_process_rtp_packet && - !opt_read_from_file) { + !is_read_from_file_simple()) { for(int i = 0; i < opt_enable_process_rtp_packet; i++) { processRtpPacket[i] = new FILE_LINE ProcessRtpPacket(i); } @@ -2608,7 +2410,7 @@ int main(int argc, char *argv[]) { sslData = new FILE_LINE SslData; tcpReassemblySsl->setDataCallback(sslData); tcpReassemblySsl->setLinkTimeout(opt_ssl_link_timeout); - if(opt_pb_read_from_file[0]) { + if(is_read_from_file_by_pb()) { tcpReassemblySsl->setEnableWildLink(); } } @@ -2630,187 +2432,171 @@ int main(int argc, char *argv[]) { syslog(LOG_NOTICE, "reindex date %s completed", maxSpoolDate.c_str()); } } + + readend = 0; - if(opt_pcap_threaded) { - if(opt_pcap_queue) { - - PcapQueue_init(); - - if(opt_pb_read_from_file[0] && opt_enable_http) { - if(opt_tcpreassembly_thread) { - tcpReassemblyHttp->setIgnoreTerminating(true); - } - } - if(opt_pb_read_from_file[0] && opt_enable_webrtc) { - if(opt_tcpreassembly_thread) { - tcpReassemblyWebrtc->setIgnoreTerminating(true); - } - } - if(opt_pb_read_from_file[0] && opt_enable_ssl) { - if(opt_tcpreassembly_thread) { - tcpReassemblySsl->setIgnoreTerminating(true); - } - } + if(is_enable_packetbuffer()) { + PcapQueue_init(); - if(ifname[0]) { - pcapQueueI = new FILE_LINE PcapQueue_readFromInterface("interface"); - pcapQueueI->setInterfaceName(ifname); - pcapQueueI->setEnableAutoTerminate(false); - } - - pcapQueueQ = new FILE_LINE PcapQueue_readFromFifo("queue", opt_pcap_queue_disk_folder.c_str()); - if(pcapQueueI) { - pcapQueueQ->setInstancePcapHandle(pcapQueueI); + if(is_read_from_file_by_pb() && opt_enable_http) { + if(opt_tcpreassembly_thread) { + tcpReassemblyHttp->setIgnoreTerminating(true); } - pcapQueueQ->setEnableAutoTerminate(false); - - if(opt_pcap_queue_receive_from_ip_port) { - pcapQueueQ->setPacketServer(opt_pcap_queue_receive_from_ip_port, PcapQueue_readFromFifo::directionRead); - } else if(opt_pcap_queue_send_to_ip_port) { - pcapQueueQ->setPacketServer(opt_pcap_queue_send_to_ip_port, PcapQueue_readFromFifo::directionWrite); - } - - pcapQueueQ->start(); - if(pcapQueueI) { - pcapQueueI->start(); - pcapQueueInterface = pcapQueueI; + } + if(is_read_from_file_by_pb() && opt_enable_webrtc) { + if(opt_tcpreassembly_thread) { + tcpReassemblyWebrtc->setIgnoreTerminating(true); } - pcapQueueStatInterface = pcapQueueQ; - - if(opt_scanpcapdir[0] != '\0') { - pthread_create(&scanpcapdir_thread, NULL, scanpcapdir, NULL); + } + if(is_read_from_file_by_pb() && opt_enable_ssl) { + if(opt_tcpreassembly_thread) { + tcpReassemblySsl->setIgnoreTerminating(true); } - - uint64_t _counter = 0; - int _pcap_stat_period = sverb.pcap_stat_period ? sverb.pcap_stat_period : 10; - while(!is_terminating()) { - if(_counter && (verbosityE > 0 || !(_counter % _pcap_stat_period))) { - pthread_mutex_lock(&terminate_packetbuffer_lock); - pcapQueueQ->pcapStat(verbosityE > 0 ? 1 : _pcap_stat_period); - pthread_mutex_unlock(&terminate_packetbuffer_lock); - if(sverb.memory_stat_log) { - printMemoryStat(); - } - if(tcpReassemblyHttp) { - tcpReassemblyHttp->setDoPrintContent(); - } - if(tcpReassemblyWebrtc) { - tcpReassemblyWebrtc->setDoPrintContent(); - } - #if RTP_PROF - for(int i = 0; i < opt_enable_process_rtp_packet; i++) if(processRtpPacket[i]) { - unsigned long long ___prof__ProcessRtpPacket_outThreadFunction = processRtpPacket[i]->__prof__ProcessRtpPacket_outThreadFunction; - unsigned long long ___prof__ProcessRtpPacket_outThreadFunction__usleep = processRtpPacket[i]->__prof__ProcessRtpPacket_outThreadFunction__usleep; - unsigned long long ___prof__ProcessRtpPacket_rtp = processRtpPacket[i]->__prof__ProcessRtpPacket_rtp; - unsigned long long ___prof__ProcessRtpPacket_rtp__hashfind = processRtpPacket[i]->__prof__ProcessRtpPacket_rtp__hashfind; - unsigned long long ___prof__ProcessRtpPacket_rtp__fill_call_array = processRtpPacket[i]->__prof__ProcessRtpPacket_rtp__fill_call_array; - unsigned long long ___prof__process_packet__rtp = processRtpPacket[i]->__prof__process_packet__rtp; - unsigned long long ___prof__add_to_rtp_thread_queue = processRtpPacket[i]->__prof__add_to_rtp_thread_queue; - unsigned long long ___prof__ProcessRtpPacket_outThreadFunction2 = ___prof__ProcessRtpPacket_outThreadFunction - ___prof__ProcessRtpPacket_outThreadFunction__usleep; - cout << fixed - << "RTP PROF - " << (processRtpPacket[i]->indexThread + 1) << "/" << processRtpPacket[i]->outThreadId - << endl - << left << setw(50) << "ProcessRtpPacket::outThreadFunction" - << right << setw(15) << ___prof__ProcessRtpPacket_outThreadFunction - << endl - << left << setw(50) << "ProcessRtpPacket::outThreadFunction / usleep" - << right << setw(15) << ___prof__ProcessRtpPacket_outThreadFunction__usleep - << endl - << left << setw(50) << "ProcessRtpPacket::outThreadFunction / process time" - << right << setw(15) << ___prof__ProcessRtpPacket_outThreadFunction2 - << endl - << left << setw(50) << " ProcessRtpPacket::rtp" - << right << setw(15) << ___prof__ProcessRtpPacket_rtp - << setw(15) << setprecision(5) - << ((double)___prof__ProcessRtpPacket_rtp / ___prof__ProcessRtpPacket_outThreadFunction2) * 100 << "%" - << endl - << left << setw(50) << " ProcessRtpPacket::rtp / hashfind" - << right << setw(15) << ___prof__ProcessRtpPacket_rtp__hashfind - << setw(15) << setprecision(5) - << ((double)___prof__ProcessRtpPacket_rtp__hashfind / ___prof__ProcessRtpPacket_outThreadFunction2) * 100 << "%" - << endl - << left << setw(50) << " ProcessRtpPacket::rtp / fill call array" - << right << setw(15) << ___prof__ProcessRtpPacket_rtp__fill_call_array - << setw(15) << setprecision(5) - << ((double)___prof__ProcessRtpPacket_rtp__fill_call_array / ___prof__ProcessRtpPacket_outThreadFunction2) * 100 << "%" - << endl - << left << setw(50) << " process_packet__rtp" - << right << setw(15) << ___prof__process_packet__rtp - << setw(15) << setprecision(5) - << ((double)___prof__process_packet__rtp / ___prof__ProcessRtpPacket_outThreadFunction2) * 100 << "%" - << endl - << left << setw(50) << " add_to_rtp_thread_queue" - << right << setw(15) << ___prof__add_to_rtp_thread_queue - << setw(15) << setprecision(5) - << ((double)___prof__add_to_rtp_thread_queue / ___prof__ProcessRtpPacket_outThreadFunction2) * 100 << "%" - << endl; - processRtpPacket[i]->__prof__ProcessRtpPacket_outThreadFunction_begin = rdtsc(); - processRtpPacket[i]->__prof__ProcessRtpPacket_outThreadFunction__usleep = 0; - processRtpPacket[i]->__prof__ProcessRtpPacket_rtp = 0; - processRtpPacket[i]->__prof__ProcessRtpPacket_rtp__hashfind = 0; - processRtpPacket[i]->__prof__ProcessRtpPacket_rtp__fill_call_array = 0; - processRtpPacket[i]->__prof__process_packet__rtp = 0; - processRtpPacket[i]->__prof__add_to_rtp_thread_queue = 0; - } - #endif + } + + if(ifname[0] || is_read_from_file_by_pb()) { + pcapQueueI = new FILE_LINE PcapQueue_readFromInterface("interface"); + pcapQueueI->setInterfaceName(ifname); + pcapQueueI->setEnableAutoTerminate(false); + } + + pcapQueueQ = new FILE_LINE PcapQueue_readFromFifo("queue", opt_pcap_queue_disk_folder.c_str()); + if(pcapQueueI) { + pcapQueueQ->setInstancePcapHandle(pcapQueueI); + } + pcapQueueQ->setEnableAutoTerminate(false); + + if(opt_pcap_queue_receive_from_ip_port) { + pcapQueueQ->setPacketServer(opt_pcap_queue_receive_from_ip_port, PcapQueue_readFromFifo::directionRead); + } else if(opt_pcap_queue_send_to_ip_port) { + pcapQueueQ->setPacketServer(opt_pcap_queue_send_to_ip_port, PcapQueue_readFromFifo::directionWrite); + } + + pcapQueueQ->start(); + if(pcapQueueI) { + pcapQueueI->start(); + pcapQueueInterface = pcapQueueI; + } + pcapQueueStatInterface = pcapQueueQ; + + if(opt_scanpcapdir[0] != '\0') { + pthread_create(&scanpcapdir_thread, NULL, scanpcapdir, NULL); + } + + uint64_t _counter = 0; + int _pcap_stat_period = sverb.pcap_stat_period ? sverb.pcap_stat_period : 10; + while(!is_terminating()) { + if(_counter && (verbosityE > 0 || !(_counter % _pcap_stat_period))) { + pthread_mutex_lock(&terminate_packetbuffer_lock); + pcapQueueQ->pcapStat(verbosityE > 0 ? 1 : _pcap_stat_period); + pthread_mutex_unlock(&terminate_packetbuffer_lock); + if(sverb.memory_stat_log) { + printMemoryStat(); } - sleep(1); - ++_counter; - } - - if(opt_scanpcapdir[0] != '\0') { - //pthread_join(scanpcapdir_thread, NULL); // failed - stop at: scanpcapdir::'len = read(fd, buff, 4096);' - sleep(2); - } - - terminate_packetbuffer(); - - if(opt_pb_read_from_file[0] && (opt_enable_http || opt_enable_webrtc || opt_enable_ssl)) { - sleep(2); + if(tcpReassemblyHttp) { + tcpReassemblyHttp->setDoPrintContent(); + } + if(tcpReassemblyWebrtc) { + tcpReassemblyWebrtc->setDoPrintContent(); + } + #if RTP_PROF + for(int i = 0; i < opt_enable_process_rtp_packet; i++) if(processRtpPacket[i]) { + unsigned long long ___prof__ProcessRtpPacket_outThreadFunction = processRtpPacket[i]->__prof__ProcessRtpPacket_outThreadFunction; + unsigned long long ___prof__ProcessRtpPacket_outThreadFunction__usleep = processRtpPacket[i]->__prof__ProcessRtpPacket_outThreadFunction__usleep; + unsigned long long ___prof__ProcessRtpPacket_rtp = processRtpPacket[i]->__prof__ProcessRtpPacket_rtp; + unsigned long long ___prof__ProcessRtpPacket_rtp__hashfind = processRtpPacket[i]->__prof__ProcessRtpPacket_rtp__hashfind; + unsigned long long ___prof__ProcessRtpPacket_rtp__fill_call_array = processRtpPacket[i]->__prof__ProcessRtpPacket_rtp__fill_call_array; + unsigned long long ___prof__process_packet__rtp = processRtpPacket[i]->__prof__process_packet__rtp; + unsigned long long ___prof__add_to_rtp_thread_queue = processRtpPacket[i]->__prof__add_to_rtp_thread_queue; + unsigned long long ___prof__ProcessRtpPacket_outThreadFunction2 = ___prof__ProcessRtpPacket_outThreadFunction - ___prof__ProcessRtpPacket_outThreadFunction__usleep; + cout << fixed + << "RTP PROF - " << (processRtpPacket[i]->indexThread + 1) << "/" << processRtpPacket[i]->outThreadId + << endl + << left << setw(50) << "ProcessRtpPacket::outThreadFunction" + << right << setw(15) << ___prof__ProcessRtpPacket_outThreadFunction + << endl + << left << setw(50) << "ProcessRtpPacket::outThreadFunction / usleep" + << right << setw(15) << ___prof__ProcessRtpPacket_outThreadFunction__usleep + << endl + << left << setw(50) << "ProcessRtpPacket::outThreadFunction / process time" + << right << setw(15) << ___prof__ProcessRtpPacket_outThreadFunction2 + << endl + << left << setw(50) << " ProcessRtpPacket::rtp" + << right << setw(15) << ___prof__ProcessRtpPacket_rtp + << setw(15) << setprecision(5) + << ((double)___prof__ProcessRtpPacket_rtp / ___prof__ProcessRtpPacket_outThreadFunction2) * 100 << "%" + << endl + << left << setw(50) << " ProcessRtpPacket::rtp / hashfind" + << right << setw(15) << ___prof__ProcessRtpPacket_rtp__hashfind + << setw(15) << setprecision(5) + << ((double)___prof__ProcessRtpPacket_rtp__hashfind / ___prof__ProcessRtpPacket_outThreadFunction2) * 100 << "%" + << endl + << left << setw(50) << " ProcessRtpPacket::rtp / fill call array" + << right << setw(15) << ___prof__ProcessRtpPacket_rtp__fill_call_array + << setw(15) << setprecision(5) + << ((double)___prof__ProcessRtpPacket_rtp__fill_call_array / ___prof__ProcessRtpPacket_outThreadFunction2) * 100 << "%" + << endl + << left << setw(50) << " process_packet__rtp" + << right << setw(15) << ___prof__process_packet__rtp + << setw(15) << setprecision(5) + << ((double)___prof__process_packet__rtp / ___prof__ProcessRtpPacket_outThreadFunction2) * 100 << "%" + << endl + << left << setw(50) << " add_to_rtp_thread_queue" + << right << setw(15) << ___prof__add_to_rtp_thread_queue + << setw(15) << setprecision(5) + << ((double)___prof__add_to_rtp_thread_queue / ___prof__ProcessRtpPacket_outThreadFunction2) * 100 << "%" + << endl; + processRtpPacket[i]->__prof__ProcessRtpPacket_outThreadFunction_begin = rdtsc(); + processRtpPacket[i]->__prof__ProcessRtpPacket_outThreadFunction__usleep = 0; + processRtpPacket[i]->__prof__ProcessRtpPacket_rtp = 0; + processRtpPacket[i]->__prof__ProcessRtpPacket_rtp__hashfind = 0; + processRtpPacket[i]->__prof__ProcessRtpPacket_rtp__fill_call_array = 0; + processRtpPacket[i]->__prof__process_packet__rtp = 0; + processRtpPacket[i]->__prof__add_to_rtp_thread_queue = 0; + } + #endif } - - PcapQueue_term(); - - } else { - pthread_create(&readdump_libpcap_thread, NULL, readdump_libpcap_thread_fce, global_pcap_handle); - pthread_join(readdump_libpcap_thread, NULL); + sleep(1); + ++_counter; } + + if(opt_scanpcapdir[0] != '\0') { + //pthread_join(scanpcapdir_thread, NULL); // failed - stop at: scanpcapdir::'len = read(fd, buff, 4096);' + sleep(2); + } + + terminate_packetbuffer(); + + if(is_read_from_file_by_pb() && (opt_enable_http || opt_enable_webrtc || opt_enable_ssl)) { + sleep(2); + } + + PcapQueue_term(); } else { readdump_libpcap(global_pcap_handle); } + + return(0); +} +void main_term_read() { readend = 1; -#ifdef QUEUE_NONBLOCK2 - if(opt_pcap_threaded && !opt_pcap_queue) { - pthread_join(pcap_read_thread, NULL); - } -#endif - // wait for RTP threads - if(rtp_threaded && - !(opt_pcap_threaded && opt_pcap_queue && - !opt_pcap_queue_receive_from_ip_port && - opt_pcap_queue_send_to_ip_port)) { + if(rtp_threads) { for(int i = 0; i < num_threads; i++) { pthread_join((rtp_threads[i].thread), NULL); -#ifdef QUEUE_NONBLOCK2 - if(opt_pcap_queue) { - if(rtp_threads[i].rtpp_queue_quick) { - delete rtp_threads[i].rtpp_queue_quick; - } else { - delete rtp_threads[i].rtpp_queue; - } + if(rtp_threads[i].rtpp_queue_quick) { + delete rtp_threads[i].rtpp_queue_quick; } else { - delete [] rtp_threads[i].vmbuffer; + delete rtp_threads[i].rtpp_queue; } -#endif } delete [] rtp_threads; rtp_threads = NULL; } - if(!opt_pcap_queue && global_pcap_handle) { + if(is_read_from_file_simple() && global_pcap_handle) { pcap_close(global_pcap_handle); } if(global_pcap_handle_dead_EN10MB) { @@ -2936,9 +2722,7 @@ int main(int argc, char *argv[]) { } } - if(!(opt_pcap_threaded && opt_pcap_queue && - !opt_pcap_queue_receive_from_ip_port && - opt_pcap_queue_send_to_ip_port)) { + if(storing_cdr_thread) { terminating_storing_cdr = 1; pthread_join(storing_cdr_thread, NULL); } @@ -3016,80 +2800,109 @@ int main(int argc, char *argv[]) { thread_cleanup(); _parse_packet_global.clear(); - - bool _break = false; - - if(useNewCONFIG) { - string _terminating_error = terminating_error; - if(!hot_restarting && _terminating_error.empty()) { - _break = true; +} + +void main_init_sqlstore() { + if(isSqlDriver("mysql")) { + if(opt_load_query_from_files != 2) { + sqlStore = new FILE_LINE MySqlStore(mysql_host, mysql_user, mysql_password, mysql_database, cloud_host, cloud_token); + if(opt_save_query_to_files) { + sqlStore->queryToFiles(opt_save_query_to_files, opt_save_query_to_files_directory, opt_save_query_to_files_period); + } } - if(!_terminating_error.empty()) { - clear_terminating(); - while(!is_terminating()) { - syslog(LOG_NOTICE, "%s - wait for terminating or hot restarting", _terminating_error.c_str()); - for(int i = 0; i < 10 && !is_terminating(); i++) { - sleep(1); + if(opt_load_query_from_files) { + loadFromQFiles = new FILE_LINE MySqlStore(mysql_host, mysql_user, mysql_password, mysql_database); + loadFromQFiles->loadFromQFiles(opt_load_query_from_files, opt_load_query_from_files_directory, opt_load_query_from_files_period); + } + if(opt_load_query_from_files != 2) { + if(!opt_nocdr) { + sqlStore->connect(STORE_PROC_ID_CDR_1); + sqlStore->connect(STORE_PROC_ID_MESSAGE_1); + } + if(opt_mysqlstore_concat_limit) { + sqlStore->setDefaultConcatLimit(opt_mysqlstore_concat_limit); + } + for(int i = 0; i < opt_mysqlstore_max_threads_cdr; i++) { + if(opt_mysqlstore_concat_limit_cdr) { + sqlStore->setConcatLimit(STORE_PROC_ID_CDR_1 + i, opt_mysqlstore_concat_limit_cdr); + } + if(i) { + sqlStore->setEnableAutoDisconnect(STORE_PROC_ID_CDR_1 + i); + } + if(opt_mysql_enable_transactions_cdr) { + sqlStore->setEnableTransaction(STORE_PROC_ID_CDR_1 + i); + } + if(opt_cdr_check_duplicity_callid_in_next_pass_insert) { + sqlStore->setEnableFixDeadlock(STORE_PROC_ID_CDR_1 + i); } } - if(!hot_restarting) { - _break = true; + for(int i = 0; i < opt_mysqlstore_max_threads_message; i++) { + if(opt_mysqlstore_concat_limit_message) { + sqlStore->setConcatLimit(STORE_PROC_ID_MESSAGE_1 + i, opt_mysqlstore_concat_limit_message); + } + if(i) { + sqlStore->setEnableAutoDisconnect(STORE_PROC_ID_MESSAGE_1 + i); + } + if(opt_mysql_enable_transactions_message) { + sqlStore->setEnableTransaction(STORE_PROC_ID_MESSAGE_1 + i); + } + if(opt_message_check_duplicity_callid_in_next_pass_insert) { + sqlStore->setEnableFixDeadlock(STORE_PROC_ID_MESSAGE_1 + i); + } + } + for(int i = 0; i < opt_mysqlstore_max_threads_register; i++) { + if(opt_mysqlstore_concat_limit_register) { + sqlStore->setConcatLimit(STORE_PROC_ID_REGISTER_1 + i, opt_mysqlstore_concat_limit_register); + } + if(i) { + sqlStore->setEnableAutoDisconnect(STORE_PROC_ID_REGISTER_1 + i); + } + if(opt_mysql_enable_transactions_register) { + sqlStore->setEnableTransaction(STORE_PROC_ID_REGISTER_1 + i); + } + } + for(int i = 0; i < opt_mysqlstore_max_threads_http; i++) { + if(opt_mysqlstore_concat_limit_http) { + sqlStore->setConcatLimit(STORE_PROC_ID_HTTP_1 + i, opt_mysqlstore_concat_limit_http); + } + if(i) { + sqlStore->setEnableAutoDisconnect(STORE_PROC_ID_HTTP_1 + i); + } + if(opt_mysql_enable_transactions_http) { + sqlStore->setEnableTransaction(STORE_PROC_ID_HTTP_1 + i); + } + } + for(int i = 0; i < opt_mysqlstore_max_threads_webrtc; i++) { + if(opt_mysqlstore_concat_limit_webrtc) { + sqlStore->setConcatLimit(STORE_PROC_ID_WEBRTC_1 + i, opt_mysqlstore_concat_limit_webrtc); + } + if(i) { + sqlStore->setEnableAutoDisconnect(STORE_PROC_ID_WEBRTC_1 + i); + } + if(opt_mysql_enable_transactions_webrtc) { + sqlStore->setEnableTransaction(STORE_PROC_ID_WEBRTC_1 + i); + } + } + if(opt_mysqlstore_concat_limit_ipacc) { + for(int i = 0; i < opt_mysqlstore_max_threads_ipacc_base; i++) { + sqlStore->setConcatLimit(STORE_PROC_ID_IPACC_1 + i, opt_mysqlstore_concat_limit_ipacc); + } + for(int i = STORE_PROC_ID_IPACC_AGR_INTERVAL; i <= STORE_PROC_ID_IPACC_AGR_DAY; i++) { + sqlStore->setConcatLimit(i, opt_mysqlstore_concat_limit_ipacc); + } + for(int i = 0; i < opt_mysqlstore_max_threads_ipacc_agreg2; i++) { + sqlStore->setConcatLimit(STORE_PROC_ID_IPACC_AGR2_HOUR_1 + i, opt_mysqlstore_concat_limit_ipacc); + } + } + if(!opt_nocdr && opt_autoload_from_sqlvmexport) { + sqlStore->autoloadFromSqlVmExport(); } } - terminating_error = ""; - } else { - _break = true; - } - - //wait for manager to properly terminate - if(opt_manager_port && manager_thread > 0) { - int res; - res = shutdown(manager_socket_server, SHUT_RDWR); // break accept syscall in manager thread - if(res == -1) { - // if shutdown failed it can happen when reding very short pcap file and the bind socket was not created in manager - usleep(10000); - res = shutdown(manager_socket_server, SHUT_RDWR); // break accept syscall in manager thread - } - struct timespec ts; - ts.tv_sec = 1; - ts.tv_nsec = 0; - // wait for thread max 1 sec -#ifndef FREEBSD - //TODO: solve it for freebsd - pthread_timedjoin_np(manager_thread, NULL, &ts); -#endif - } - - if(_break) { - break; - } - - } - // END RELOAD LOOP - - _parse_packet_global.free(); - - delete [] sipportmatrix; - delete [] httpportmatrix; - delete [] webrtcportmatrix; - - delete regfailedcache; - -#ifdef HAVE_LIBGNUTLS - ssl_clean(); -#endif - - if(sverb.memory_stat) { - cout << "memory stat at end" << endl; - printMemoryStat(true); - } - if (opt_fork){ - unlink(opt_pidfile); } } void terminate_packetbuffer() { - if(opt_pcap_threaded && opt_pcap_queue) { + if(is_enable_packetbuffer()) { pthread_mutex_lock(&terminate_packetbuffer_lock); extern bool pstat_quietly_errors; pstat_quietly_errors = true; @@ -3098,7 +2911,7 @@ void terminate_packetbuffer() { pcapQueueI->terminate(); } sleep(1); - if(opt_pb_read_from_file[0] && (opt_enable_http || opt_enable_webrtc || opt_enable_ssl) && opt_tcpreassembly_thread) { + if(is_read_from_file_by_pb() && (opt_enable_http || opt_enable_webrtc || opt_enable_ssl) && opt_tcpreassembly_thread) { if(opt_enable_http) { tcpReassemblyHttp->setIgnoreTerminating(false); } @@ -3137,11 +2950,6 @@ void terminate_packetbuffer() { } } -void *readdump_libpcap_thread_fce(void *handle) { - readdump_libpcap((pcap_t*)handle); - return(NULL); -} - #include "rqueue.h" #include "fraud.h" @@ -4043,8 +3851,6 @@ void cConfig::addConfigItems() { addConfigItem(new cConfigItem_integer("preprocess_packets_qring_usleep", &opt_preprocess_packets_qring_usleep)); addConfigItem(new cConfigItem_integer("process_rtp_packets_qring_length", &opt_process_rtp_packets_qring_length)); addConfigItem(new cConfigItem_integer("process_rtp_packets_qring_usleep", &opt_process_rtp_packets_qring_usleep)); - addConfigItem(new cConfigItem_integer("pcap_qring_length", &pcap_qring_max)); - addConfigItem(new cConfigItem_integer("pcap_qring_usleep", &pcap_qring_usleep)); addConfigItem(new cConfigItem_integer("rtp_qring_length", &rtp_qring_length)); addConfigItem(new cConfigItem_integer("rtp_qring_usleep", &rtp_qring_usleep)); addConfigItem((new cConfigItem_yesno("rtp_qring_quick", &rtp_qring_quick)) @@ -4152,7 +3958,6 @@ void cConfig::addConfigItems() { addConfigItem((new cConfigItem_integer("max_buffer_mem")) ->setNaDefaultValueStr()); advanced(); - addConfigItem(new cConfigItem_yesno("packetbuffer_enable", &opt_pcap_queue)); addConfigItem(new cConfigItem_yesno("packetbuffer_compress", &opt_pcap_queue_compress)); addConfigItem(new cConfigItem_integer("pcap_queue_dequeu_window_length", &opt_pcap_queue_dequeu_window_length)); expert(); @@ -4176,9 +3981,6 @@ void cConfig::addConfigItems() { addConfigItem(new cConfigItem_yesno("savertp-threaded", &opt_rtpsave_threaded)); obsolete(); addConfigItem(new cConfigItem_yesno("pcap_dispatch", &opt_pcap_dispatch)); - addConfigItem((new cConfigItem_integer("vmbuffer", &pcap_qring_max)) - ->setMaximum(4000) - ->setMultiple(1024.0 * 1024 / (unsigned int)sizeof(pcap_packet))); // JITTERBUFFER group("jitterbuffer"); advanced(); @@ -4729,9 +4531,6 @@ void get_command_line_arguments() { case 'E': rtpthreadbuffer = atoi(optarg); break; - case 'T': - pcap_qring_max = (unsigned int)((unsigned int)MIN(atoi(optarg), 4000) * 1024 * 1024 / (unsigned int)sizeof(pcap_packet)); - break; case 's': opt_id_sensor = atoi(optarg); break; @@ -4886,7 +4685,6 @@ void get_command_line_arguments() { opt_read_from_file = 1; opt_scanpcapdir[0] = '\0'; //opt_cachedir[0] = '\0'; - opt_pcap_queue = 0; opt_enable_preprocess_packet = 0; opt_enable_process_rtp_packet = 0; } @@ -4974,11 +4772,9 @@ void get_command_line_arguments() { } void set_context_config() { - #ifndef QUEUE_NONBLOCK2 - opt_pcap_queue = 0; - #endif - if(opt_pcap_queue && !opt_read_from_file && !opt_untar_gui_params && command_line_data.size()) { + if(!is_read_from_file_simple() && + !opt_untar_gui_params && command_line_data.size()) { // restore orig values buffersControl.restoreMaxBufferMemFromOrig(); static u_int64_t opt_pcap_queue_store_queue_max_memory_size_orig = 0; @@ -5162,8 +4958,8 @@ void set_context_config() { } bool check_complete_parameters() { - if (opt_read_from_file_fname[0] == '\0' && ifname[0] == '\0' && opt_scanpcapdir[0] == '\0' && - !opt_untar_gui_params && !printConfigStruct && !opt_pcap_queue_receive_from_ip_port && + if (!is_read_from_file() && ifname[0] == '\0' && opt_scanpcapdir[0] == '\0' && + !opt_untar_gui_params && !printConfigStruct && !is_receiver() && !opt_test){ /* Ruler to assist with keeping help description to max. 80 chars wide: 1 2 3 4 5 6 7 8 @@ -5876,9 +5672,6 @@ int eval_config(string inistr) { if((value = ini.GetValue("general", "norecord-dtmf", NULL))) { opt_norecord_dtmf = yesno(value); } - if((value = ini.GetValue("general", "vmbuffer", NULL))) { - pcap_qring_max = (unsigned int)((unsigned int)MIN(atoi(value), 4000) * 1024 * 1024 / (unsigned int)sizeof(pcap_packet)); - } if((value = ini.GetValue("general", "matchheader", NULL))) { snprintf(opt_match_header, sizeof(opt_match_header), "\n%s:", value); } @@ -6296,9 +6089,6 @@ int eval_config(string inistr) { opt_enable_webrtc_table = yesno(value); } - if((value = ini.GetValue("general", "packetbuffer_enable", NULL))) { - opt_pcap_queue = yesno(value); - } //EXPERT VALUES if((value = ini.GetValue("general", "packetbuffer_block_maxsize", NULL))) { opt_pcap_queue_block_max_size = atol(value) * 1024; @@ -6764,13 +6554,6 @@ int eval_config(string inistr) { opt_process_rtp_packets_qring_usleep = atol(value); } - if((value = ini.GetValue("general", "pcap_qring_length", NULL))) { - pcap_qring_max = atol(value); - } - if((value = ini.GetValue("general", "pcap_qring_usleep", NULL))) { - pcap_qring_usleep = atol(value); - } - if((value = ini.GetValue("general", "rtp_qring_length", NULL))) { rtp_qring_length = atol(value); } @@ -6985,3 +6768,43 @@ int load_config(char *fname) { } return res; } + + +bool is_read_from_file() { + return(is_read_from_file_simple() || + is_read_from_file_by_pb()); +} + +bool is_read_from_file_simple() { + return(opt_read_from_file); +} + +bool is_read_from_file_by_pb() { + return(opt_pb_read_from_file[0]); +} + +bool is_enable_packetbuffer() { + return(!is_read_from_file_simple()); +} + +bool is_enable_rtp_threads() { + return(is_enable_packetbuffer() && + rtp_threaded && + !is_sender()); +} + +bool is_enable_cleanspool() { + return(!opt_nocdr && + isSqlDriver("mysql") && + !is_read_from_file() && + !is_sender()); +} + +bool is_receiver() { + return(opt_pcap_queue_receive_from_ip_port); +} + +bool is_sender() { + return(!opt_pcap_queue_receive_from_ip_port && + opt_pcap_queue_send_to_ip_port); +} diff --git a/voipmonitor.h b/voipmonitor.h index 1dcc39003..bb0380931 100644 --- a/voipmonitor.h +++ b/voipmonitor.h @@ -41,8 +41,6 @@ #define SNIFFER_INLINE_FUNCTIONS true #define TCPREPLAY_WORKARROUND false -#define QUEUE_NONBLOCK2 1 - #define SYNC_PCAP_BLOCK_STORE true #define SYNC_CALL_RTP true @@ -56,8 +54,6 @@ /* choose what method wil be used to synchronize threads. NONBLOCK is the fastest. Do not enable both at once */ // this is now defined in Makefile -//#define QUEUE_NONBLOCK -//#define QUEUE_MUTEX /* if you want to see all new calls in syslog enable DEBUG_INVITE */ //#define DEBUG_INVITE @@ -162,4 +158,13 @@ bool is_terminating() { } bool is_terminating_without_error(); +bool is_read_from_file(); +bool is_read_from_file_simple(); +bool is_read_from_file_by_pb(); +bool is_enable_packetbuffer(); +bool is_enable_rtp_threads(); +bool is_enable_cleanspool(); +bool is_receiver(); +bool is_sender(); + #endif