From d3fdcb12b1c9fbea016da0c8143b60c482954aec Mon Sep 17 00:00:00 2001 From: Radek Bucek Date: Mon, 2 Sep 2013 11:27:19 +0000 Subject: [PATCH] add: - convert from dlt ssl to en10 for pcap save (via configuration parameter convert_dlt_sll_to_en10) - tcpreassembly & http - part 1 (uncomplete) --- .gitattributes | 4 + http.cpp | 331 +++++++++ http.h | 20 + pcap_queue.cpp | 64 +- pcap_queue_block.h | 69 +- skinny.cpp | 18 +- sniff.cpp | 91 ++- sql_db.cpp | 16 +- sql_db.h | 5 +- tcpreassembly.cpp | 1652 ++++++++++++++++++++++++++++++++++++++++++++ tcpreassembly.h | 567 +++++++++++++++ tools.h | 14 + voipmonitor.cpp | 108 ++- voipmonitor.h | 1 + 14 files changed, 2860 insertions(+), 100 deletions(-) create mode 100644 http.cpp create mode 100644 http.h create mode 100644 tcpreassembly.cpp create mode 100644 tcpreassembly.h diff --git a/.gitattributes b/.gitattributes index dc4a050d6..ed1cac1cc 100644 --- a/.gitattributes +++ b/.gitattributes @@ -60,6 +60,8 @@ gzstream/test_gzip.C -text gzstream/version -text /hash.cpp -text /hash.h -text +/http.cpp -text +/http.h -text /install-sh -text /ipaccount.cpp -text /ipaccount.h -text @@ -234,6 +236,8 @@ simpleini/testsi.cpp -text /sniff.h -text /sql_db.cpp -text /sql_db.h -text +/tcpreassembly.cpp -text +/tcpreassembly.h -text /tools.cpp -text /tools.h -text /voipmonitor.cpp -text diff --git a/http.cpp b/http.cpp new file mode 100644 index 000000000..43c4e73da --- /dev/null +++ b/http.cpp @@ -0,0 +1,331 @@ +#include + +#include "http.h" +#include "sql_db.h" + + +using namespace std; + +extern int opt_id_sensor; +extern SqlDb *sqlDb; +extern MySqlStore *sqlStore; + + +void HttpData::processData(u_int32_t ip_src, u_int32_t ip_dst, + u_int16_t port_src, u_int16_t port_dst, + TcpReassemblyData *data) { + string request; + string response; + string expectContinue; + /* + for(size_t i = 0; i < data->request.size(); i++) { + if(i) { + request += "\r\n\r\n---\r\n\r\n"; + } + request += (char*)data->request[i].data; + } + for(size_t i = 0; i < data->response.size(); i++) { + if(i) { + response += "\r\n\r\n---\r\n\r\n"; + } + response += (char*)data->response[i].data; + } + for(size_t i = 0; i < data->expectContinue.size(); i++) { + if(i) { + expectContinue += "\r\n\r\n---\r\n\r\n"; + } + expectContinue += (char*)data->expectContinue[i].data; + } + */ + TcpReassemblyDataItem *request_data; + TcpReassemblyDataItem *response_data; + + string uri; + string http; + string body; + size_t contentLength; + + bool ok = false; + for(size_t i_request = 0; + i_request < /*(data->expectContinue.size() ? */data->request.size() /*: 1)*/; + i_request++) { + + /* + if(strstr(request.c_str(), "POST /jj-api/public/v1/sessions/0-2BLJ4pLJ0KKW18bglZ4-3D_503a622c-a3bb-4222-8424-b45431d8284e/dispatchAction/?ccid=0-2BLJ4pLJ0KKW18bglZ4-3D HTTP/1.1")) { + cout << " -- ***** -- "; + } + */ + if(!data->expectContinue.size() && + strcasestr(request.c_str(), "Expect: 100-continue") && + data->request.size() > i_request) { + request += (char*)data->request[i_request].data; + } else { + request_data = &data->request[i_request]; + request = (char*)request_data->data; + } + + uri = this->getUri(request); + if(!uri.length()) { + continue; + /*delete data; + return;*/ + } + contentLength = atol(this->getTag(request, "Content-Length").c_str()); + if(!contentLength) { + continue; + /*delete data; + return;*/ + } + response_data = NULL; + if(data->response.size()) { + response_data = &data->response[0]; + response = (char*)response_data->data; + } + if(data->expectContinue.size()) { + expectContinue = (char*)data->expectContinue[0].data; + if(data->expectContinueResponse.size()) { + response_data = &data->expectContinueResponse[0]; + response = (char*)response_data->data; + } + } + + if(expectContinue.length() && !data->forceAppendExpectContinue) { + body = expectContinue; + } else if(contentLength) { + if(data->forceAppendExpectContinue) { + request += expectContinue; + } + char *pointToBeginBody = (char*)strstr(request.c_str(), "\r\n\r\n"); + if(pointToBeginBody) { + char oldEndChar = *pointToBeginBody; + *pointToBeginBody = 0; + http = request.c_str(); + *pointToBeginBody = oldEndChar; + if(strlen(pointToBeginBody) > 4) { + body = pointToBeginBody + 4; + } + } + } + if(!body.length() || + contentLength != body.length()) { + if(body.length() == contentLength - 1 && + body[0] == '{' && body[body.length() - 1] != '}') { + body += "}"; + } else if(body.length() == contentLength + 1 && + body[0] == '{' && body[body.length()] == '}') { + // OK + } else { + continue; + /*delete data; + return;*/ + } + } + ok = true; + break; + + } + if(!ok) { + delete data; + return; + } + + + if(!http.length()) { + http = request; + } + string externalTransactionId = this->getTag(request, "External-Transaction-Id"); + string sessionid = this->getUriValue(uri, "jajahsessionid"); + if(!sessionid.length() && body.length()) { + sessionid = this->getJsonValue(body, "variable_jjSessionId"); + } + if(!sessionid.length()) { + sessionid = this->getUriPathValue(uri, "sessions"); + } + string callid; + if(body.length()) { + callid = this->getJsonValue(body, "variable_sip_call_id"); + } + if(externalTransactionId.length() || sessionid.length() || callid.length()) { + string queryInsert; + static int saveCounter; + cout << "SAVE " << (++saveCounter) << " time: " << sqlDateTimeString(request_data->time.tv_sec) + << (response.length() ? " with response" : "") + << endl; + SqlDb_row rowRequest; + rowRequest.add(sqlDateTimeString(request_data->time.tv_sec), "timestamp"); + rowRequest.add(request_data->time.tv_usec, "usec"); + rowRequest.add(htonl(ip_src), "srcip"); + rowRequest.add(htonl(ip_dst), "dstip"); + rowRequest.add(sqlEscapeString(uri), "url"); + rowRequest.add((const char*)NULL, "type"); + rowRequest.add(sqlEscapeString(http), "http"); + rowRequest.add(sqlEscapeString(body).c_str(), "body"); + rowRequest.add(sqlEscapeString(callid).c_str(), "callid"); + rowRequest.add(sqlEscapeString(sessionid).c_str(), "sessid"); + rowRequest.add(sqlEscapeString(externalTransactionId).c_str(), "external_transaction_id"); + rowRequest.add(opt_id_sensor > 0 ? opt_id_sensor : 0, "id_sensor", opt_id_sensor <= 0); + queryInsert = sqlDb->insertQuery("http_jj", rowRequest); + if(response.length()) { + queryInsert += ";\n"; + queryInsert += "set @http_jj_id = last_insert_id();\n"; + size_t responseContentLength = atol(this->getTag(response, "Content-Length").c_str()); + string responseHttp; + string responseBody; + if(responseContentLength) { + char *pointToBeginBody = (char*)strstr(response.c_str(), "\r\n\r\n"); + if(pointToBeginBody) { + char oldEndChar = *pointToBeginBody; + *pointToBeginBody = 0; + responseHttp = response.c_str(); + *pointToBeginBody = oldEndChar; + if(strlen(pointToBeginBody) > 4) { + responseBody = pointToBeginBody + 4; + } + } + } + if(!responseHttp.length()) { + responseHttp = response; + } + SqlDb_row rowRequest; + rowRequest.add("_\\_'SQL'_\\_:@http_jj_id", "master_id"); + rowRequest.add(sqlDateTimeString(response_data->time.tv_sec), "timestamp"); + rowRequest.add(response_data->time.tv_usec, "usec"); + rowRequest.add(htonl(ip_dst), "srcip"); + rowRequest.add(htonl(ip_src), "dstip"); + rowRequest.add("", "url"); + rowRequest.add("http_ok", "type"); + rowRequest.add(sqlEscapeString(responseHttp), "http"); + rowRequest.add(sqlEscapeString(responseBody).c_str(), "body"); + rowRequest.add("", "callid"); + rowRequest.add("", "sessid"); + rowRequest.add("", "external_transaction_id"); + rowRequest.add(opt_id_sensor > 0 ? opt_id_sensor : 0, "id_sensor", opt_id_sensor <= 0); + queryInsert += sqlDb->insertQuery("http_jj", rowRequest, true); + } + sqlStore->lock(STORE_PROC_ID_HTTP); + sqlStore->query(queryInsert.c_str(), STORE_PROC_ID_HTTP); + sqlStore->unlock(STORE_PROC_ID_HTTP); + } + delete data; +} + +string HttpData::getUri(string &request) { + const char *requestTypes[] = { "POST", "GET", "HEADER" }; + int requestTypeIndex = -1; + for(int i = 0; i < (int)(sizeof(requestTypes) / sizeof(requestTypes[0])); i++) { + if(!strncmp(request.c_str(), requestTypes[i], strlen(requestTypes[i])) && + request.c_str()[strlen(requestTypes[i])] == ' ') { + requestTypeIndex = i; + break; + } + } + if(requestTypeIndex >= 0) { + char *pointToBeginUri = (char*)(request.c_str() + strlen(requestTypes[requestTypeIndex]) + 1); + char *pointToEndUri = strstr(pointToBeginUri, "\r\n"); + if(pointToEndUri) { + char *pointToBeginHttpType = strstr(pointToBeginUri, " HTTP"); + if(pointToBeginHttpType && pointToBeginHttpType < pointToEndUri) { + pointToEndUri = pointToBeginHttpType; + } + char oldEndChar = *pointToEndUri; + *pointToEndUri = 0; + string rslt = pointToBeginUri; + *pointToEndUri = oldEndChar; + return(rslt); + } + } + return(""); +} + +string HttpData::getUriValue(string &uri, const char *valueName) { + char *pointToBeginName = (char*)strcasestr(uri.c_str(), valueName); + if(pointToBeginName) { + char *pointToBeginValue = pointToBeginName + strlen(valueName); + if(*pointToBeginValue == '=') { + ++pointToBeginValue; + } + char *pointToEndValue = (char*)(uri.c_str() + uri.length()); + char *pointToEndValue2 = strchr(pointToBeginValue, '&'); + if(pointToEndValue2) { + pointToEndValue = pointToEndValue2; + } + if(pointToEndValue > pointToBeginValue) { + char oldEndChar = *pointToEndValue; + *pointToEndValue = 0; + string rslt = pointToBeginValue; + *pointToEndValue = oldEndChar; + return(rslt); + } + } + return(""); +} + +string HttpData::getUriPathValue(string &uri, const char *valueName) { + char *pointToBeginName = (char*)strcasestr(uri.c_str(), valueName); + if(pointToBeginName) { + char *pointToBeginValue = pointToBeginName + strlen(valueName); + while(*pointToBeginValue == '/') { + ++pointToBeginValue; + } + char *pointToEndValue = (char*)(uri.c_str() + uri.length()); + char *pointToEndValue2 = strchr(pointToBeginValue, '/'); + if(pointToEndValue2) { + pointToEndValue = pointToEndValue2; + } + if(pointToEndValue > pointToBeginValue) { + char oldEndChar = *pointToEndValue; + *pointToEndValue = 0; + string rslt = pointToBeginValue; + *pointToEndValue = oldEndChar; + return(rslt); + } + } + return(""); +} + +string HttpData::getTag(string &data, const char *tag) { + char *pointToBeginTag = (char*)strcasestr(data.c_str(), tag); + if(pointToBeginTag) { + char *pointToEndValue = strstr(pointToBeginTag, "\r\n"); + if(pointToEndValue) { + char *pointToBeginValue = pointToBeginTag + strlen(tag); + while(pointToBeginValue < pointToEndValue && + (*pointToBeginValue == ':' || *pointToBeginValue == ' ')) { + ++pointToBeginValue; + } + while(pointToEndValue > pointToBeginValue && + *pointToEndValue == ' ') { + --pointToEndValue; + } + if(pointToEndValue > pointToBeginValue) { + char oldEndChar = *pointToEndValue; + *pointToEndValue = 0; + string rslt = pointToBeginValue; + *pointToEndValue = oldEndChar; + return(rslt); + } + } + } + return(""); +} + +string HttpData::getJsonValue(string &data, const char *valueName) { + string valueNameWithQuot = string("\"") + valueName + "\""; + char *pointToBeginName = (char*)strcasestr(data.c_str(), valueNameWithQuot.c_str()); + if(pointToBeginName) { + char *pointToBeginValue = pointToBeginName + strlen(valueName) + 2; + while(*pointToBeginValue == ' ' || *pointToBeginValue == '\t' || + *pointToBeginValue == ':' || *pointToBeginValue == '"') { + ++pointToBeginValue; + } + char *pointToEndValue = strchr(pointToBeginValue, '"'); + if(pointToEndValue && pointToEndValue > pointToBeginValue) { + char oldEndChar = *pointToEndValue; + *pointToEndValue = 0; + string rslt = pointToBeginValue; + *pointToEndValue = oldEndChar; + return(rslt); + } + } + return(""); +} \ No newline at end of file diff --git a/http.h b/http.h new file mode 100644 index 000000000..4a4cc84d5 --- /dev/null +++ b/http.h @@ -0,0 +1,20 @@ +#ifndef HTTP_H +#define HTTP_H + +#include "pcap_queue_block.h" +#include "tcpreassembly.h" + + +class HttpData : public TcpReassemblyProcessData { +public: + void processData(u_int32_t ip_src, u_int32_t ip_dst, + u_int16_t port_src, u_int16_t port_dst, + TcpReassemblyData *data); + string getUri(string &request); + string getUriValue(string &uri, const char *valueName); + string getUriPathValue(string &uri, const char *valueName); + string getTag(string &data, const char *tag); + string getJsonValue(string &data, const char *valueName); +}; + +#endif diff --git a/pcap_queue.cpp b/pcap_queue.cpp index bd4054c20..b19308da4 100644 --- a/pcap_queue.cpp +++ b/pcap_queue.cpp @@ -21,6 +21,7 @@ #include "mirrorip.h" #include "ipaccount.h" #include "filter_mysql.h" +#include "tcpreassembly.h" #define TEST_DEBUG_PARAMS 0 @@ -54,8 +55,6 @@ #define MAX_TCPSTREAMS 1024 #define FILE_BUFFER_SIZE 1000000 -//#define TEST_PCAP_FILE "/mnt/www_JX/voipmonitor/pcaps/3.pcap" - using namespace std; @@ -74,9 +73,11 @@ extern int opt_dup_check; extern int opt_mirrorip; extern char opt_mirrorip_src[20]; extern char opt_mirrorip_dst[20]; +extern int opt_enable_tcpreassembly; extern pcap_t *handle; extern char *sipportmatrix; +extern char *httpportmatrix; extern unsigned int duplicate_counter; extern struct tcp_stream2_t *tcp_streams_hashed[MAX_TCPSTREAMS]; extern MirrorIP *mirrorip; @@ -89,6 +90,8 @@ extern int telnumfilter_reload_do; extern char user_filter[2048]; extern Calltable *calltable; extern volatile int calls; +extern TcpReassembly *tcpReassembly; +extern char opt_pb_read_from_file[256]; extern int pcap_dlink; void *_PcapQueue_threadFunction(void* arg); @@ -1132,7 +1135,7 @@ PcapQueue_readFromInterface::PcapQueue_readFromInterface(const char *nameQueue) // CONFIG extern int opt_promisc; extern int opt_ringbuffer; - this->pcap_snaplen = 3200; + this->pcap_snaplen = opt_enable_tcpreassembly ? 5000 : 3200; this->pcap_promisc = opt_promisc; this->pcap_timeout = 1000; this->pcap_buffer_size = opt_ringbuffer * 1024 * 1024; @@ -1193,7 +1196,7 @@ void* PcapQueue_readFromInterface::threadFunction(void* ) { while(!TERMINATING) { res = this->pcap_next_ex(this->pcapHandle, &header, &packet); if(res == -1) { - #ifdef TEST_PCAP_FILE + if(opt_pb_read_from_file[0]) { blockStoreBypassQueue.push(blockStore); ++sumBlocksCounterIn[0]; blockStore = NULL; @@ -1201,7 +1204,7 @@ void* PcapQueue_readFromInterface::threadFunction(void* ) { calltable->cleanup(0); this->pcapStat(); terminating = 1; - #endif + } break; } else if(res == 0) { continue; @@ -1318,11 +1321,12 @@ bool PcapQueue_readFromInterface::openFifoForWrite() { bool PcapQueue_readFromInterface::startCapture() { char errbuf[PCAP_ERRBUF_SIZE]; - #ifdef TEST_PCAP_FILE - this->pcapHandle = pcap_open_offline(TEST_PCAP_FILE, errbuf); - this->pcapLinklayerHeaderType = pcap_datalink(this->pcapHandle);; + if(opt_pb_read_from_file[0]) { + this->pcapHandle = pcap_open_offline(opt_pb_read_from_file, errbuf); + this->pcapLinklayerHeaderType = pcap_datalink(this->pcapHandle); handle = this->pcapHandle; - #else + return(true); + } if(VERBOSE) { syslog(LOG_NOTICE, "packetbuffer %s: capturing on interface %s", this->nameQueue.c_str(), this->interfaceName.c_str()); } @@ -1391,7 +1395,6 @@ bool PcapQueue_readFromInterface::startCapture() { sprintf(pname, "/var/spool/voipmonitor/voipmonitordump-%u.pcap", (unsigned int)time(NULL)); this->pcapDumpHandle = pcap_dump_open(this->pcapHandle, pname); } - #endif return(true); } @@ -1453,7 +1456,6 @@ void PcapQueue_readFromInterface::initStat_interface() { int PcapQueue_readFromInterface::pcapProcess(pcap_pkthdr** header, u_char** packet, bool *destroy) { *destroy = false; - switch(this->pcapLinklayerHeaderType) { case DLT_LINUX_SLL: ppd.header_sll = (sll_header*)*packet; @@ -1562,8 +1564,9 @@ int PcapQueue_readFromInterface::pcapProcess(pcap_pkthdr** header, u_char** pack ppd.data = (char*) ppd.header_tcp + (ppd.header_tcp->doff * 4); ppd.datalen = (int)((*header)->caplen - ((unsigned long) ppd.data - (unsigned long) *packet)); //if (datalen == 0 || !(sipportmatrix[htons(header_tcp->source)] || sipportmatrix[htons(header_tcp->dest)])) { - if (!(sipportmatrix[htons(ppd.header_tcp->source)] || sipportmatrix[htons(ppd.header_tcp->dest)]) - and !(opt_skinny && (htons(ppd.header_tcp->source) == 2000 || htons(ppd.header_tcp->dest) == 2000))) { + if (!(sipportmatrix[htons(ppd.header_tcp->source)] || sipportmatrix[htons(ppd.header_tcp->dest)]) && + !(opt_enable_tcpreassembly && (httpportmatrix[htons(ppd.header_tcp->source)] || httpportmatrix[htons(ppd.header_tcp->dest)])) && + !(opt_skinny && (htons(ppd.header_tcp->source) == 2000 || htons(ppd.header_tcp->dest) == 2000))) { // not interested in TCP packet other than SIP port if(opt_ipaccount == 0 && !DEBUG_ALL_PACKETS) { return(0); @@ -1584,7 +1587,8 @@ int PcapQueue_readFromInterface::pcapProcess(pcap_pkthdr** header, u_char** pack } /* check for duplicate packets (md5 is expensive operation - enable only if you really need it */ - if(ppd.datalen > 0 && opt_dup_check && ppd.prevmd5s != NULL && (ppd.traillen < ppd.datalen)) { + if(ppd.datalen > 0 && opt_dup_check && ppd.prevmd5s != NULL && (ppd.traillen < ppd.datalen) && + !(opt_enable_tcpreassembly && (httpportmatrix[htons(ppd.header_tcp->source)] || httpportmatrix[htons(ppd.header_tcp->dest)]))) { MD5_Init(&ppd.ctx); MD5_Update(&ppd.ctx, ppd.data, MAX(0, (unsigned long)ppd.datalen - ppd.traillen)); MD5_Final((unsigned char*)ppd.md5, &ppd.ctx); @@ -2105,6 +2109,7 @@ void PcapQueue_readFromFifo::processPacket(pcap_pkthdr_plus *header_plus, u_char int datalen; int istcp = 0; int was_rtp; + bool useTcpReassembly = false; pcap_pkthdr *header = header_plus->convertToStdHeader(); @@ -2140,14 +2145,19 @@ void PcapQueue_readFromFifo::processPacket(pcap_pkthdr_plus *header_plus, u_char datalen = (int)(header->caplen - ((u_char*)data - packet)); istcp = 0; } else if (header_ip->protocol == IPPROTO_TCP) { - istcp = 1; - // prepare packet pointers header_tcp = (tcphdr*) ((char *) header_ip + sizeof(*header_ip)); - data = (char *) header_tcp + (header_tcp->doff * 4); - datalen = (int)(header->caplen - ((u_char*)data - packet)); - - header_udp->source = header_tcp->source; - header_udp->dest = header_tcp->dest; + if(opt_enable_tcpreassembly && (httpportmatrix[htons(header_tcp->source)] || httpportmatrix[htons(header_tcp->dest)])) { + tcpReassembly->push(header, header_ip, packet, + block_store, block_store_index); + useTcpReassembly = true; + } else { + istcp = 1; + // prepare packet pointers + data = (char *) header_tcp + (header_tcp->doff * 4); + datalen = (int)(header->caplen - ((u_char*)data - packet)); + header_udp->source = header_tcp->source; + header_udp->dest = header_tcp->dest; + } } else { //packet is not UDP and is not TCP, we are not interested, go to the next packet // - interested only for ipaccount @@ -2161,9 +2171,11 @@ void PcapQueue_readFromFifo::processPacket(pcap_pkthdr_plus *header_plus, u_char mirrorip->send((char *)header_ip, (int)(header->caplen - ((u_char*)header_ip - packet))); } int voippacket = 0; - process_packet(header_ip->saddr, htons(header_udp->source), header_ip->daddr, htons(header_udp->dest), - data, datalen, this->getPcapHandle(), header, packet, istcp, 0, 1, &was_rtp, header_ip, &voippacket, 0, - block_store, block_store_index); + if(!useTcpReassembly || opt_enable_tcpreassembly != 2) { + process_packet(header_ip->saddr, htons(header_udp->source), header_ip->daddr, htons(header_udp->dest), + data, datalen, this->getPcapHandle(), header, packet, istcp, 0, 1, &was_rtp, header_ip, &voippacket, 0, + block_store, block_store_index); + } // if packet was VoIP add it to ipaccount if(opt_ipaccount) { @@ -2173,6 +2185,10 @@ void PcapQueue_readFromFifo::processPacket(pcap_pkthdr_plus *header_plus, u_char } void PcapQueue_readFromFifo::cleanupBlockStoreTrash(bool all) { + if(all && opt_enable_tcpreassembly && opt_pb_read_from_file[0]) { + this->cleanupBlockStoreTrash(); + cout << "COUNT REST BLOCKS: " << this->blockStoreTrash.size() << endl; + } for(size_t i = 0; i < this->blockStoreTrash.size(); i++) { if(all || this->blockStoreTrash[i]->enableDestroy()) { this->blockStoreTrash_size -= this->blockStoreTrash[i]->getUseSize(); diff --git a/pcap_queue_block.h b/pcap_queue_block.h index d50c9698d..8372c3313 100644 --- a/pcap_queue_block.h +++ b/pcap_queue_block.h @@ -2,6 +2,7 @@ #define PCAP_QUEUE_BLOCK_H +#include #include #include #include @@ -11,6 +12,8 @@ #define PCAP_BLOCK_STORE_HEADER_STRING_LEN 16 +extern int opt_enable_tcpreassembly; + u_long getTimeMS(); unsigned long long getTimeNS(); @@ -84,19 +87,15 @@ struct pcap_block_store { this->idFileStore = 0; this->filePosition = 0; this->timestampMS = getTimeMS(); - /* this->packet_lock = NULL; - */ this->_sync_packet_lock = 0; } ~pcap_block_store() { this->destroy(); this->destroyRestoreBuffer(); - /* if(this->packet_lock) { free(this->packet_lock); } - */ } inline bool add(pcap_pkthdr *header, u_char *packet, int offset = -1); inline bool add(pcap_pkthdr_plus *header, u_char *packet); @@ -132,48 +131,52 @@ struct pcap_block_store { bool compress(); bool uncompress(); void lock_packet(int index) { - __sync_add_and_fetch(&this->_sync_packet_lock, 1); - /* - this->lock_sync_packet_lock(); - if(!this->packet_lock) { - this->packet_lock = (bool*)calloc(this->count, sizeof(bool)); + if(opt_enable_tcpreassembly) { + this->lock_sync_packet_lock(); + if(!this->packet_lock) { + this->packet_lock = (bool*)calloc(this->count, sizeof(bool)); + } + this->packet_lock[index] = true; + this->unlock_sync_packet_lock(); + } else { + __sync_add_and_fetch(&this->_sync_packet_lock, 1); } - this->packet_lock[index] = true; - this->unlock_sync_packet_lock(); - */ + } void unlock_packet(int index) { - __sync_sub_and_fetch(&this->_sync_packet_lock, 1); - /* - this->lock_sync_packet_lock(); - if(this->packet_lock) { - this->packet_lock[index] = false; + if(opt_enable_tcpreassembly) { + this->lock_sync_packet_lock(); + if(this->packet_lock) { + this->packet_lock[index] = false; + } + this->unlock_sync_packet_lock(); + } else { + __sync_sub_and_fetch(&this->_sync_packet_lock, 1); } - this->unlock_sync_packet_lock(); - */ } bool enableDestroy() { - return(this->_sync_packet_lock == 0); - /* - bool enableDestroy = true; - this->lock_sync_packet_lock(); - bool checkLock = true; - if(this->packet_lock && - memmem(this->packet_lock, this->count * sizeof(bool), &checkLock, sizeof(bool))) { - enableDestroy = false; + if(opt_enable_tcpreassembly) { + bool enableDestroy = true; + this->lock_sync_packet_lock(); + bool checkLock = true; + if(this->packet_lock && + memmem(this->packet_lock, this->count * sizeof(bool), &checkLock, sizeof(bool))) { + enableDestroy = false; + } + this->unlock_sync_packet_lock(); + return(enableDestroy); + } else { + return(this->_sync_packet_lock == 0); } - this->unlock_sync_packet_lock(); - return(enableDestroy); - */ } - /* + void lock_sync_packet_lock() { while(__sync_lock_test_and_set(&this->_sync_packet_lock, 1)); } void unlock_sync_packet_lock() { __sync_lock_release(&this->_sync_packet_lock); } - */ + // uint32_t *offsets; u_char *block; @@ -188,7 +191,7 @@ struct pcap_block_store { u_int idFileStore; u_long filePosition; u_long timestampMS; - //bool *packet_lock; + bool *packet_lock; volatile int _sync_packet_lock; }; diff --git a/skinny.cpp b/skinny.cpp index c007e8b36..fd27b072e 100644 --- a/skinny.cpp +++ b/skinny.cpp @@ -47,8 +47,10 @@ extern int terminating; extern int opt_rtp_firstleg; extern int opt_sip_register; extern int opt_norecord_header; +extern int opt_convert_dlt_sll_to_en10; extern char *sipportmatrix; extern pcap_t *handle; +extern pcap_t *handle_dead_EN10MB; extern read_thread *threads; extern int opt_norecord_dtmf; extern int opt_onlyRTPheader; @@ -1192,6 +1194,10 @@ struct skinny_container { void *data; }; +#define ENABLE_CONVERT_DLT_SLL_TO_EN10 (pcap_dlink ==DLT_LINUX_SLL && opt_convert_dlt_sll_to_en10 && handle_dead_EN10MB) +#define HANDLE_FOR_PCAP_SAVE (ENABLE_CONVERT_DLT_SLL_TO_EN10 ? handle_dead_EN10MB : handle) + + Call *new_skinny_channel(int state, char *data, int datalen, struct pcap_pkthdr *header, char *callidstr, u_int32_t saddr, u_int32_t daddr, int source, char *s, long unsigned int l){ if(opt_callslimit != 0 and opt_callslimit > calls) { if(verbosity > 0) @@ -1269,9 +1275,9 @@ Call *new_skinny_channel(int state, char *data, int datalen, struct pcap_pkthdr } call->sip_pcapfilename = call->dirname() + (opt_newdir ? "/SKINNY" : "") + "/" + call->get_fbasename_safe() + ".pcap"; if(!file_exists(str2)) { - call->set_fsip_pcap(pcap_dump_open(handle, str2)); + call->set_fsip_pcap(pcap_dump_open(HANDLE_FOR_PCAP_SAVE, str2)); if(call->get_fsip_pcap() == NULL) { - syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(handle)); + syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(HANDLE_FOR_PCAP_SAVE)); } if(verbosity > 3) { syslog(LOG_NOTICE,"pcap_filename: [%s]\n", str2); @@ -1291,9 +1297,9 @@ Call *new_skinny_channel(int state, char *data, int datalen, struct pcap_pkthdr } call->rtp_pcapfilename = call->dirname() + (opt_newdir ? "/RTP" : "") + "/" + call->get_fbasename_safe() + ".pcap"; if(!file_exists(str2)) { - call->set_frtp_pcap(pcap_dump_open(handle, str2)); + call->set_frtp_pcap(pcap_dump_open(HANDLE_FOR_PCAP_SAVE, str2)); if(call->get_frtp_pcap() == NULL) { - syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(handle)); + syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(HANDLE_FOR_PCAP_SAVE)); } if(verbosity > 3) { syslog(LOG_NOTICE,"pcap_filename: [%s]\n", str2); @@ -1313,9 +1319,9 @@ Call *new_skinny_channel(int state, char *data, int datalen, struct pcap_pkthdr } call->pcapfilename = call->dirname() + (opt_newdir ? "/ALL/" : "/") + call->get_fbasename_safe() + ".pcap"; if(!file_exists(str2)) { - call->set_f_pcap(pcap_dump_open(handle, str2)); + call->set_f_pcap(pcap_dump_open(HANDLE_FOR_PCAP_SAVE, str2)); if(call->get_f_pcap() == NULL) { - syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(handle)); + syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(HANDLE_FOR_PCAP_SAVE)); } if(verbosity > 3) { syslog(LOG_NOTICE,"pcap_filename: [%s]\n", str2); diff --git a/sniff.cpp b/sniff.cpp index acaca6b8f..d44e39494 100644 --- a/sniff.cpp +++ b/sniff.cpp @@ -56,6 +56,7 @@ and insert them into Call class. #include "sql_db.h" #include "rtp.h" #include "skinny.h" +#include "tcpreassembly.h" extern MirrorIP *mirrorip; @@ -102,8 +103,12 @@ extern int terminating; extern int opt_rtp_firstleg; extern int opt_sip_register; extern int opt_norecord_header; +extern int opt_enable_tcpreassembly; +extern int opt_convert_dlt_sll_to_en10; extern char *sipportmatrix; +extern char *httpportmatrix; extern pcap_t *handle; +extern pcap_t *handle_dead_EN10MB; extern read_thread *threads; extern int opt_norecord_dtmf; extern int opt_onlyRTPheader; @@ -156,6 +161,8 @@ extern vector opt_custom_headers_cdr; extern vector opt_custom_headers_message; extern livesnifferfilter_use_siptypes_s livesnifferfilterUseSipTypes; extern int opt_skipdefault; +extern TcpReassembly *tcpReassembly; +extern char ifname[1024]; #ifdef QUEUE_MUTEX extern sem_t readpacket_thread_semaphore; @@ -229,6 +236,9 @@ extern struct queue_state *qs_readpacket_thread_queue; map usersniffer; +#define ENABLE_CONVERT_DLT_SLL_TO_EN10 (pcap_dlink ==DLT_LINUX_SLL && opt_convert_dlt_sll_to_en10 && handle_dead_EN10MB) +#define HANDLE_FOR_PCAP_SAVE (ENABLE_CONVERT_DLT_SLL_TO_EN10 ? handle_dead_EN10MB : handle) + // return IP from nat_aliases[ip] or 0 if not found in_addr_t match_nat_aliases(in_addr_t ip) { nat_aliases_t::iterator iter; @@ -403,6 +413,22 @@ inline void save_live_packet(Call *call, struct pcap_pkthdr *header, const u_cha */ inline void save_packet(Call *call, struct pcap_pkthdr *header, const u_char *packet, unsigned int saddr, int source, unsigned int daddr, int dest, int istcp, char *data, int datalen, int type) { + bool allocPacket = false; + if(ENABLE_CONVERT_DLT_SLL_TO_EN10) { + const u_char *packet_orig = packet; + pcap_pkthdr *header_orig = header; + packet = (const u_char*) new u_char[header_orig->caplen]; + memcpy((u_char*)packet, (u_char*)packet_orig, 14); + memset((u_char*)packet, 0, 6); + ((ether_header*)packet)->ether_type = ((sll_header*)packet_orig)->sll_protocol; + memcpy((u_char*)packet + 14, (u_char*)packet_orig + 16, header_orig->caplen - 16); + header = new pcap_pkthdr; + memcpy(header, header_orig, sizeof(pcap_pkthdr)); + header->caplen -= 2; + header->len -= 2; + allocPacket = true; + } + // check if it should be stored to mysql if(type == TYPE_SIP and global_livesniffer and (sipportmatrix[source] || sipportmatrix[dest])) { save_live_packet(call, header, packet, saddr, source, daddr, dest, istcp, data, datalen, call->type); @@ -437,6 +463,11 @@ inline void save_packet(Call *call, struct pcap_pkthdr *header, const u_char *pa pcap_dump_flush(call->get_f_pcap()); } } + + if(allocPacket) { + delete [] packet; + delete header; + } } int check_sip20(char *data, unsigned long len){ @@ -1324,9 +1355,9 @@ Call *new_invite_register(int sip_method, char *data, int datalen, struct pcap_p call->fname2 = num + header->ts.tv_usec; call->pcapfilename = call->sip_pcapfilename = call->dirname() + (opt_newdir ? "/REG" : "") + "/" + filenamestr + ".pcap"; if(!file_exists(str2)) { - call->set_fsip_pcap(pcap_dump_open(handle, str2)); + call->set_fsip_pcap(pcap_dump_open(HANDLE_FOR_PCAP_SAVE, str2)); if(call->get_fsip_pcap() == NULL) { - syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(handle)); + syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(HANDLE_FOR_PCAP_SAVE)); } if(verbosity > 3) { syslog(LOG_NOTICE,"pcap_filename: [%s]\n", str2); @@ -1352,9 +1383,9 @@ Call *new_invite_register(int sip_method, char *data, int datalen, struct pcap_p } call->pcapfilename = call->sip_pcapfilename = call->dirname() + (opt_newdir ? "/SIP" : "") + "/" + call->get_fbasename_safe() + ".pcap"; if(!file_exists(str2)) { - call->set_fsip_pcap(pcap_dump_open(handle, str2)); + call->set_fsip_pcap(pcap_dump_open(HANDLE_FOR_PCAP_SAVE, str2)); if(call->get_fsip_pcap() == NULL) { - syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(handle)); + syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(HANDLE_FOR_PCAP_SAVE)); } if(verbosity > 3) { syslog(LOG_NOTICE,"pcap_filename: [%s]\n", str2); @@ -1372,9 +1403,9 @@ Call *new_invite_register(int sip_method, char *data, int datalen, struct pcap_p } call->rtp_pcapfilename = call->dirname() + (opt_newdir ? "/RTP" : "") + "/" + call->get_fbasename_safe() + ".pcap"; if(!file_exists(str2)) { - call->set_frtp_pcap(pcap_dump_open(handle, str2)); + call->set_frtp_pcap(pcap_dump_open(HANDLE_FOR_PCAP_SAVE, str2)); if(call->get_frtp_pcap() == NULL) { - syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(handle)); + syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(HANDLE_FOR_PCAP_SAVE)); } if(verbosity > 3) { syslog(LOG_NOTICE,"pcap_filename: [%s]\n", str2); @@ -1392,9 +1423,9 @@ Call *new_invite_register(int sip_method, char *data, int datalen, struct pcap_p } call->pcapfilename = call->dirname() + (opt_newdir ? "/ALL/" : "/") + call->get_fbasename_safe() + ".pcap"; if(!file_exists(str2)) { - call->set_f_pcap(pcap_dump_open(handle, str2)); + call->set_f_pcap(pcap_dump_open(HANDLE_FOR_PCAP_SAVE, str2)); if(call->get_f_pcap() == NULL) { - syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(handle)); + syslog(LOG_NOTICE,"pcap [%s] cannot be opened: %s\n", str2, pcap_geterr(HANDLE_FOR_PCAP_SAVE)); } if(verbosity > 3) { syslog(LOG_NOTICE,"pcap_filename: [%s]\n", str2); @@ -1547,7 +1578,7 @@ Call *process_packet(unsigned int saddr, int source, unsigned int daddr, int des } return NULL; } - + // check if the packet is SIP ports or SKINNY ports if(sipportmatrix[source] || sipportmatrix[dest]) { @@ -2773,7 +2804,7 @@ Call *process_packet(unsigned int saddr, int source, unsigned int daddr, int des if((call->flags & (FLAG_SAVESIP | FLAG_SAVEREGISTER | FLAG_SAVERTP)) || (call->isfax && opt_saveudptl)) { sprintf(str2, "%s/%s.pcap", call->dirname().c_str(), s); if(!file_exists(str2)) { - call->set_f_pcap(pcap_dump_open(handle, str2)); + call->set_f_pcap(pcap_dump_open(HANDLE_FOR_PCAP_SAVE, str2)); call->pcapfilename = call->dirname() + "/" + call->get_fbasename_safe() + ".pcap"; } else { if(verbosity > 0) { @@ -2970,6 +3001,7 @@ void *pcap_read_thread_func(void *arg) { int res; int was_rtp; unsigned int packets = 0; + bool useTcpReassembly; res = 0; @@ -3027,6 +3059,7 @@ void *pcap_read_thread_func(void *arg) { header_ip = (struct iphdr2 *) ((char*)header_ip + sizeof(iphdr2)); } header_udp = &header_udp_tmp; + useTcpReassembly = false; if (header_ip->protocol == IPPROTO_UDP) { // prepare packet pointers header_udp = (struct udphdr2 *) ((char *) header_ip + sizeof(*header_ip)); @@ -3034,14 +3067,22 @@ void *pcap_read_thread_func(void *arg) { datalen = (int)(pp->header.caplen - ((char*)data - (char*)packet)); istcp = 0; } else if (header_ip->protocol == IPPROTO_TCP) { - istcp = 1; - // prepare packet pointers header_tcp = (struct tcphdr *) ((char *) header_ip + sizeof(*header_ip)); - data = (char *) header_tcp + (header_tcp->doff * 4); - datalen = (int)(pp->header.caplen - ((char*)data - (char*)packet)); - - header_udp->source = header_tcp->source; - header_udp->dest = header_tcp->dest; + // dokončit nezbytné paměťové operace pro udržení obsahu paketu !!!! + // zatím reassemblování v módu bez pb zakázáno + /* + if(opt_enable_tcpreassembly && (httpportmatrix[htons(header_tcp->source)] || httpportmatrix[htons(header_tcp->dest)])) { + tcpReassembly->push(&pp->header, header_ip, packet); + + useTcpReassembly = true; + } else */{ + istcp = 1; + // prepare packet pointers + data = (char *) header_tcp + (header_tcp->doff * 4); + datalen = (int)(pp->header.caplen - ((char*)data - (char*)packet)); + header_udp->source = header_tcp->source; + header_udp->dest = header_tcp->dest; + } } else { //packet is not UDP and is not TCP, we are not interested, go to the next packet // - interested only for ipaccount @@ -3067,9 +3108,11 @@ void *pcap_read_thread_func(void *arg) { mirrorip->send((char *)header_ip, (int)(pp->header.caplen - ((char*)header_ip - (char*)packet))); } int voippacket = 0; - process_packet(header_ip->saddr, htons(header_udp->source), header_ip->daddr, htons(header_udp->dest), - data, datalen, handle, &pp->header, packet, istcp, 0, 1, &was_rtp, header_ip, &voippacket, 0, - NULL, 0); + if(!useTcpReassembly || opt_enable_tcpreassembly != 2) { + process_packet(header_ip->saddr, htons(header_udp->source), header_ip->daddr, htons(header_udp->dest), + data, datalen, handle, &pp->header, packet, istcp, 0, 1, &was_rtp, header_ip, &voippacket, 0, + NULL, 0); + } // if packet was VoIP add it to ipaccount if(opt_ipaccount) { @@ -3513,8 +3556,9 @@ void readdump_libpcap(pcap_t *handle) { data = (char *) header_tcp + (header_tcp->doff * 4); datalen = (int)(header->caplen - ((unsigned long) data - (unsigned long) packet)); //if (datalen == 0 || !(sipportmatrix[htons(header_tcp->source)] || sipportmatrix[htons(header_tcp->dest)])) { - if (!(sipportmatrix[htons(header_tcp->source)] || sipportmatrix[htons(header_tcp->dest)]) - and !(opt_skinny && (htons(header_tcp->source) == 2000 || htons(header_tcp->dest) == 2000))) { + if (!(sipportmatrix[htons(header_tcp->source)] || sipportmatrix[htons(header_tcp->dest)]) && + !(opt_enable_tcpreassembly && (httpportmatrix[htons(header_tcp->source)] || httpportmatrix[htons(header_tcp->dest)])) && + !(opt_skinny && (htons(header_tcp->source) == 2000 || htons(header_tcp->dest) == 2000))) { // not interested in TCP packet other than SIP port if(opt_ipaccount == 0) { if(destroy) { @@ -3547,7 +3591,8 @@ void readdump_libpcap(pcap_t *handle) { } /* check for duplicate packets (md5 is expensive operation - enable only if you really need it */ - if(datalen > 0 and opt_dup_check and prevmd5s != NULL and (traillen < datalen)) { + if(datalen > 0 && opt_dup_check && prevmd5s != NULL && (traillen < datalen) && + !(opt_enable_tcpreassembly && (httpportmatrix[htons(header_tcp->source)] || httpportmatrix[htons(header_tcp->dest)]))) { MD5_Init(&ctx); MD5_Update(&ctx, data, MAX(0, (unsigned long)datalen - traillen)); MD5_Final(md5, &ctx); diff --git a/sql_db.cpp b/sql_db.cpp index 530096c65..8d788be66 100644 --- a/sql_db.cpp +++ b/sql_db.cpp @@ -290,10 +290,10 @@ string SqlDb::_escape(const char *inputString) { return(inputString); } -string SqlDb::insertQuery(string table, SqlDb_row row) { +string SqlDb::insertQuery(string table, SqlDb_row row, bool enableSqlStringInContent) { string query = "INSERT INTO " + table + " ( " + row.implodeFields(this->getFieldSeparator(), this->getFieldBorder()) + - " ) VALUES ( " + row.implodeContent(this->getContentSeparator(), this->getContentBorder(), this->enableSqlStringInContent) + " )"; + " ) VALUES ( " + row.implodeContent(this->getContentSeparator(), this->getContentBorder(), enableSqlStringInContent || this->enableSqlStringInContent) + " )"; return(query); } @@ -859,6 +859,7 @@ void *MySqlStore_process_storing(void *storeProcess_addr) { MySqlStore_process::MySqlStore_process(int id, const char *host, const char *user, const char *password, const char *database) { this->id = id; this->terminated = false; + this->ignoreTerminating = false; this->sqlDb = new SqlDb_mysql(); this->sqlDb->setConnectParameters(host, user, password, database); this->sqlDb->connect(); @@ -923,7 +924,7 @@ void MySqlStore_process::store() { } } } - if(terminating) { + if(terminating && !this->ignoreTerminating) { break; } sleep(1); @@ -939,6 +940,10 @@ void MySqlStore_process::unlock() { pthread_mutex_unlock(&this->lock_mutex); } +void MySqlStore_process::setIgnoreTerminating(bool ignoreTerminating) { + this->ignoreTerminating = ignoreTerminating; +} + MySqlStore::MySqlStore(const char *host, const char *user, const char *password, const char *database) { this->host = host; this->user = user; @@ -968,6 +973,11 @@ void MySqlStore::unlock(int id) { process->unlock(); } +void MySqlStore::setIgnoreTerminating(int id, bool ignoreTerminating) { + MySqlStore_process* process = this->find(id); + process->setIgnoreTerminating(ignoreTerminating); +} + MySqlStore_process *MySqlStore::find(int id) { MySqlStore_process* process = this->processes[id]; if(process) { diff --git a/sql_db.h b/sql_db.h index 93ed0ba8f..3e18052e0 100644 --- a/sql_db.h +++ b/sql_db.h @@ -74,7 +74,7 @@ class SqlDb { virtual bool query(string query) = 0; virtual void prepareQuery(string *query); virtual SqlDb_row fetchRow() = 0; - virtual string insertQuery(string table, SqlDb_row row); + virtual string insertQuery(string table, SqlDb_row row, bool enableSqlStringInContent = false); virtual int insert(string table, SqlDb_row row); virtual int getIdOrInsert(string table, string idField, string uniqueField, SqlDb_row row); virtual int getInsertId() = 0; @@ -268,6 +268,7 @@ class MySqlStore_process { void store(); void lock(); void unlock(); + void setIgnoreTerminating(bool ignoreTerminating); int getId() { return(this->id); } @@ -281,6 +282,7 @@ class MySqlStore_process { SqlDb *sqlDb; queue query_buff; bool terminated; + bool ignoreTerminating; }; class MySqlStore { @@ -290,6 +292,7 @@ class MySqlStore { void query(const char *query_str, int id); void lock(int id); void unlock(int id); + void setIgnoreTerminating(int id, bool ignoreTerminating); MySqlStore_process *find(int id); private: map processes; diff --git a/tcpreassembly.cpp b/tcpreassembly.cpp new file mode 100644 index 000000000..05f41f678 --- /dev/null +++ b/tcpreassembly.cpp @@ -0,0 +1,1652 @@ +#include +#include + +#include "tcpreassembly.h" +#include "sql_db.h" + +using namespace std; + + +#define ENABLE_UNLOCK_PACKET_IN_OK false + +extern char opt_pb_read_from_file[256]; + +bool globalDebug = true; +bool debug_packet = globalDebug && true; +bool debug_rslt = globalDebug && true; +bool debug_data = globalDebug && true; +bool debug_check_ok = globalDebug && true; +bool debug_check_ok_process = globalDebug && true; +u_int16_t debug_counter = 0; +u_int16_t debug_limit_counter = 0; +u_int16_t debug_port = 0; +u_int32_t debug_seq = 0; + + +TcpReassemblyData::~TcpReassemblyData() { + for(size_t i = 0; i < this->request.size(); i++) { + this->request[i].destroy(); + } + for(size_t i = 0; i < this->response.size(); i++) { + this->response[i].destroy(); + } + for(size_t i = 0; i < this->expectContinue.size(); i++) { + this->expectContinue[i].destroy(); + } + for(size_t i = 0; i < this->expectContinueResponse.size(); i++) { + this->expectContinueResponse[i].destroy(); + } + this->forceAppendExpectContinue = false; +} + +bool TcpReassemblyData::isFill() { + return(this->request.size()); +} + + +void TcpReassemblyStream_packet_var::push(TcpReassemblyStream_packet packet) { + map::iterator iter; + iter = this->queue.find(packet.next_seq); + if(iter == this->queue.end()) { + this->queue[packet.next_seq] = packet; + packet.lock_packet(); + } +} + +void TcpReassemblyStream::push(TcpReassemblyStream_packet packet) { + map::iterator iter; + iter = this->queue.find(packet.header_tcp.seq); + if(debug_seq && packet.header_tcp.seq == debug_seq) { + cout << " -- XXX DEBUG SEQ XXX" << endl; + } + this->queue[packet.header_tcp.seq].push(packet); + if(packet.datalen) { + exists_data = true; + } + this->last_packet_at_from_header = packet.time.tv_sec * 1000 + packet.time.tv_usec / 1000; +} + +int TcpReassemblyStream::ok(bool crazySequence, bool enableSimpleCmpMaxNextSeq, u_int32_t maxNextSeq, + bool enableCheckCompleteContent, TcpReassemblyStream *prevHttpStream, bool enableDebug) { + if(this->is_ok) { + return(1); + } + this->cleanPacketsState(); + if(!this->queue.begin()->second.getNextSeqCheck()) { + if(enableDebug) { + cout << " --- ERR - reassembly failed (1)"; + } + return(0); + } + map::iterator iter_var; + int _counter = 0; + bool waitForPsh = this->_only_check_psh ? true : false; + while(true) { + u_int32_t seq = this->ok_packets.size() ? + this->ok_packets.back()[1] : + (crazySequence ? this->min_seq : this->first_seq); + iter_var = this->queue.find(seq); + if(iter_var == this->queue.end()) { + if(!this->ok_packets.size()) { + if(enableDebug) { + cout << " --- ERR - reassembly failed (2)"; + } + return(0); + } else { + this->queue[this->ok_packets.back()[0]].queue[this->ok_packets.back()[1]].state = TcpReassemblyStream_packet::FAIL; + this->ok_packets.pop_back(); + if(enableDebug) { + cout << "<"; + } + } + } else { + u_int32_t next_seq = iter_var->second.getNextSeqCheck(); + if(next_seq) { + this->ok_packets.push_back(d_u_int32_t(iter_var->first, next_seq)); + if(enableCheckCompleteContent) { + this->saveCompleteData(false, true, prevHttpStream); + if(this->http_ok) { + this->is_ok = true; + this->completed = true; + if(ENABLE_UNLOCK_PACKET_IN_OK) { + this->unlockPackets(); + } + this->detect_ok_max_next_seq = next_seq; + return(1); + } else { + this->cleanCompleteData(true); + } + } + this->queue[this->ok_packets.back()[0]].queue[this->ok_packets.back()[1]].state = TcpReassemblyStream_packet::CHECK; + if(enableDebug) { + cout << "-"; + } + if(waitForPsh ? + this->queue[this->ok_packets.back()[0]].queue[this->ok_packets.back()[1]].header_tcp.psh : + ((maxNextSeq && next_seq == maxNextSeq) || + (maxNextSeq && next_seq == maxNextSeq - 1) || + (this->last_seq && next_seq == this->last_seq) || + (this->last_seq && next_seq == this->last_seq - 1) || + (enableSimpleCmpMaxNextSeq && next_seq == this->max_next_seq) || + (!crazySequence && next_seq == this->max_next_seq && next_seq == this->getLastSeqFromNextStream()))) { + if(!this->queue[this->ok_packets.back()[0]].queue[this->ok_packets.back()[1]].header_tcp.psh) { + waitForPsh = true; + } else { + if(!waitForPsh && this->_force_wait_for_next_psh) { + waitForPsh = true; + } else { + this->is_ok = true; + this->saveCompleteData(ENABLE_UNLOCK_PACKET_IN_OK); + if(!this->_force_wait_for_next_psh) { + this->detect_ok_max_next_seq = next_seq; + } + return(1); + } + } + } else if(enableDebug && debug_check_ok_process) { + cout << " " + << "next_seq: " << next_seq << " !== " + << "last_seq: " << (this->last_seq ? this->last_seq : maxNextSeq) + << " "; + } + } else if(this->ok_packets.size()) { + this->queue[this->ok_packets.back()[0]].queue[this->ok_packets.back()[1]].state = TcpReassemblyStream_packet::FAIL; + this->ok_packets.pop_back(); + if(enableDebug) { + cout << "<"; + } + } else { + if(enableDebug) { + cout << " --- ERR - reassembly failed (3)"; + } + return(0); + } + } + if(++_counter > 500) { + break; + } + } + if(enableDebug) { + cout << " --- ERR - reassembly failed (4)"; + } + return(0); +} + +bool TcpReassemblyStream::ok2_ec(u_int32_t nextAck, bool enableDebug) { + map::iterator iter; + iter = this->link->queue_by_ack.find(nextAck); + if(iter == this->link->queue_by_ack.end()) { + return(false); + } + TcpReassemblyStream *nextStream = iter->second; + + if(this->ack == 766596997) { + cout << "-- ***** --"; + } + + nextStream->_only_check_psh = true; + if(!nextStream->ok(true, false, 0, + false, NULL, enableDebug)) { + return(false); + } + this->_force_wait_for_next_psh = true; + if(!this->ok(true, false, this->detect_ok_max_next_seq, + false, NULL, enableDebug)) { + nextStream->is_ok = false; + nextStream->cleanCompleteData(true); + return(false); + } + if(this->checkOkPost(nextStream)) { + this->http_ok_expect_continue_post = true; + nextStream->http_ok_expect_continue_data = true; + return(true); + } else { + nextStream->is_ok = false; + nextStream->cleanCompleteData(true); + this->is_ok = false; + this->cleanCompleteData(true); + return(false); + } + return(false); +} + +u_char *TcpReassemblyStream::complete(u_int32_t *datalen, timeval *time, bool check, bool unlockPackets) { + if(!check && !this->is_ok) { + *datalen = 0; + this->completed = true; + return(NULL); + } + u_char *data = NULL; + *datalen = 0; + time->tv_sec = 0; + time->tv_usec = 0; + u_int32_t databuff_len = 0; + for(size_t i = 0; i < this->ok_packets.size(); i++) { + TcpReassemblyStream_packet packet = this->queue[this->ok_packets[i][0]].queue[this->ok_packets[i][1]]; + if(packet.datalen) { + if(!time->tv_sec) { + *time = packet.time; + } + if(!data) { + databuff_len = max(packet.datalen + 1, 10000u); + data = new u_char[databuff_len]; + + } else if(databuff_len < *datalen + packet.datalen) { + databuff_len = max(*datalen, databuff_len) + max(packet.datalen + 1, 10000u); + u_char* newdata = new u_char[databuff_len]; + memcpy(newdata, data, *datalen); + delete [] data; + data = newdata; + } + memcpy(data + *datalen, packet.data, min(packet.datalen, packet.datacaplen)); + if(packet.datacaplen < packet.datalen) { + memset(data + *datalen + packet.datalen, ' ', packet.datalen - packet.datacaplen); + } + *datalen += packet.datalen; + } + } + if(*datalen) { + data[*datalen] = 0; + } + if(!check) { + this->completed = true; + if(unlockPackets) { + this->unlockPackets(); + } + } + return(data); +} + +bool TcpReassemblyStream::saveCompleteData(bool unlockPackets, bool check, TcpReassemblyStream *prevHttpStream) { + if(this->is_ok || check) { + if(this->complete_data) { + return(true); + } else { + u_char *data; + u_int32_t datalen; + timeval time; + data = this->complete(&datalen, &time, check, unlockPackets); + if(data) { + this->complete_data = new TcpReassemblyDataItem(data, datalen, time); + if(datalen > 5 && !memcmp(data, "POST ", 5)) { + this->http_type = HTTP_TYPE_POST; + } else if(datalen > 4 && !memcmp(data, "GET ", 4)) { + this->http_type = HTTP_TYPE_GET; + } else if(datalen > 5 && !memcmp(data, "HEAD ", 5)) { + this->http_type = HTTP_TYPE_HEAD; + } else if(datalen > 4 && !memcmp(data, "HTTP", 4)) { + this->http_type = HTTP_TYPE_HTTP; + } + this->http_content_length = 0; + this->http_ok = false; + this->http_expect_continue = false; + if(this->http_type) { + char *pointToContentLength = strcasestr((char*)data, "Content-Length:"); + if(pointToContentLength) { + this->http_content_length = atol(pointToContentLength + 15); + } + char *pointToEndHeader = strstr((char*)data, "\r\n\r\n"); + if(pointToEndHeader) { + if(this->http_content_length) { + if(!this->_ignore_expect_continue && + strcasestr((char*)data, "Expect: 100-continue")) { + if(((u_char*)pointToEndHeader - data) + 4 == datalen) { + this->http_ok = true; + } + this->http_expect_continue = true; + } else if(((u_char*)pointToEndHeader - data) + 4 + http_content_length == datalen) { + this->http_ok = true; + } + } else { + if(((u_char*)pointToEndHeader - data) + 4 == datalen) { + this->http_ok = true; + } + } + } + } else if(prevHttpStream && prevHttpStream->http_type == HTTP_TYPE_POST && prevHttpStream->http_expect_continue) { + if(datalen == prevHttpStream->http_content_length) { + this->http_ok = true; + } + } + return(true); + } + } + } + return(false); +} + +void TcpReassemblyStream::cleanCompleteData(bool destroy) { + if(this->complete_data) { + if(destroy) { + this->complete_data->destroy(); + } + delete this->complete_data; + this->complete_data = NULL; + } +} + +TcpReassemblyDataItem TcpReassemblyStream::getCompleteData(bool clean) { + TcpReassemblyDataItem complete_data = *this->complete_data; + if(clean) { + this->cleanCompleteData(); + } + return(complete_data); +} + +void TcpReassemblyStream::unlockPackets() { + map::iterator iter; + for(iter = this->queue.begin(); iter != this->queue.end(); iter++) { + this->queue[iter->first].unlockPackets(); + } +} + +void TcpReassemblyStream::printContent(int level) { + map::iterator iter; + int counter = 0; + for(iter = this->queue.begin(); iter != this->queue.end(); iter++) { + cout << fixed + << setw(level * 5) << "" + << setw(3) << (++counter) << " " + << "ack: " << iter->first + << " items: " << iter->second.queue.size() + << endl; + } +} + +bool TcpReassemblyStream::checkOkPost(TcpReassemblyStream *nextStream) { + if(!(this->complete_data && this->complete_data->data && this->complete_data->datalen)) { + return(false); + } + bool rslt = false; + u_int32_t datalen = this->complete_data->datalen; + bool useNextStream = false; + if(nextStream && + nextStream->complete_data && nextStream->complete_data->data && nextStream->complete_data->datalen) { + datalen += nextStream->complete_data->datalen; + useNextStream = true; + } + char *data = new char[datalen + 1]; + memcpy(data, this->complete_data->data, this->complete_data->datalen); + if(useNextStream) { + memcpy(data + this->complete_data->datalen, nextStream->complete_data->data, nextStream->complete_data->datalen); + } + data[datalen] = 0; + if(datalen > 5 && !memcmp(data, "POST ", 5)) { + this->http_type = HTTP_TYPE_POST; + char *pointToContentLength = strcasestr((char*)data, "Content-Length:"); + this->http_content_length = pointToContentLength ? atol(pointToContentLength + 15) : 0; + char *pointToEndHeader = strstr((char*)data, "\r\n\r\n"); + if(pointToEndHeader && + (pointToEndHeader - data) + 4 + this->http_content_length == datalen) { + this->http_ok = true; + rslt = true; + } + } + delete [] data; + return(rslt); + +} + +/* +bool TcpReassemblyStream::checkCompleteContent() { + if(!this->complete_data) { + return(false); + } + u_char *data = this->complete_data->data; + u_int32_t datalen = this->complete_data->datalen; + bool http = (datalen > 5 && !memcmp(data, "POST ", 5)) || + (datalen > 4 && !memcmp(data, "GET ", 4)) || + (datalen > 5 && !memcmp(data, "HEAD ", 5)); + if(http) { + if(!memcmp(data + datalen - 4, "\r\n\r\n", 4)) { + return(true); + } + } + return(false); +} + +bool TcpReassemblyStream::checkContentIsHttpRequest() { + if(!this->complete_data) { + return(false); + } + u_char *data = this->complete_data->data; + u_int32_t datalen = this->complete_data->datalen; + bool http = (datalen > 5 && !memcmp(data, "POST ", 5)) || + (datalen > 4 && !memcmp(data, "GET ", 4)) || + (datalen > 5 && !memcmp(data, "HEAD ", 5)); + if(http) { + return(true); + } + return(false); +} +*/ + +u_int32_t TcpReassemblyStream::getLastSeqFromNextStream() { + TcpReassemblyStream *stream = this->link->findStreamBySeq(this->ack); + if(stream) { + return(stream->ack); + } + return(0); +} + + +bool TcpReassemblyLink::streamIterator::init() { + this->stream = NULL; + this->state = STATE_NA; + if(this->findSynSent()) { + return(true); + } + return(this->findFirstDataToDest()); +} + +bool TcpReassemblyLink::streamIterator::next() { + map::iterator iter; + TcpReassemblyStream *stream; + switch(this->state) { + case STATE_SYN_SENT: + iter = link->queue_flags_by_ack.find(this->stream->min_seq + 1); + if(iter != link->queue_flags_by_ack.end()) { + this->stream = iter->second; + this->state = STATE_SYN_RECV; + return(true); + } else { + return(this->findFirstDataToDest()); + } + break; + case STATE_SYN_RECV: + iter = link->queue_by_ack.find(this->stream->min_seq + 1); + if(iter != link->queue_by_ack.end()) { + this->stream = iter->second; + this->state = STATE_SYN_OK; + return(true); + } else { + return(this->findFirstDataToDest()); + } + break; + case STATE_SYN_OK: + case STATE_SYN_FORCE_OK: + stream = link->findStreamByMinSeq(this->stream->ack); + if(stream && + stream->ack != this->stream->min_seq) { + this->stream = stream; + this->state = STATE_SYN_OK; + return(true); + } + break; + default: + break; + } + return(false); +} + +bool TcpReassemblyLink::streamIterator::nextAckInDirection() { + map::iterator iter; + for(iter = link->queue_by_ack.begin(); iter != link->queue_by_ack.end(); iter++) { + if(iter->second->direction == this->stream->direction && + iter->second->ack > this->stream->ack) { + this->stream = iter->second; + return(true); + } + } + return(false); +} + +bool TcpReassemblyLink::streamIterator::nextAckInReverseDirection() { + map::iterator iter; + for(iter = link->queue_by_ack.begin(); iter != link->queue_by_ack.end(); iter++) { + if(iter->second->direction != this->stream->direction && + iter->second->ack > this->stream->max_next_seq) { + this->stream = iter->second; + return(true); + } + } + return(false); +} + +bool TcpReassemblyLink::streamIterator::nextSeqInDirection() { + TcpReassemblyStream *stream = this->link->findStreamByMinSeq(this->stream->max_next_seq, true); + if(stream && + stream->direction == this->stream->direction) { + this->stream = stream; + return(true); + } + return(false); +} + +void TcpReassemblyLink::streamIterator::print() { + cout << "iterator"; + if(this->stream) { + cout << " ack: " << this->stream->ack + << " state: " << this->state; + } else { + cout << " - no stream"; + } +} + +u_int32_t TcpReassemblyLink::streamIterator::getMaxNextSeq() { + TcpReassemblyStream *stream = link->findStreamByMinSeq(this->stream->ack); + if(stream) { + return(stream->ack); + } + stream = link->findStreamByMinSeq(this->stream->max_next_seq, true, this->stream->ack, this->stream->direction); + if(stream) { + return(stream->min_seq); + } + stream = link->findFlagStreamByAck(this->stream->ack); + if(stream) { + return(stream->min_seq); + } + stream = link->findFinalFlagStreamByAck(this->stream->max_next_seq, + this->stream->direction == TcpReassemblyStream::DIRECTION_TO_DEST ? + TcpReassemblyStream::DIRECTION_TO_SOURCE : + TcpReassemblyStream::DIRECTION_TO_DEST); + if(stream) { + return(this->stream->max_next_seq); + } + stream = link->findFinalFlagStreamBySeq(this->stream->min_seq, this->stream->direction); + if(stream) { + return(stream->min_seq); + } + return(0); +} + +bool TcpReassemblyLink::streamIterator::findSynSent() { + map::iterator iter; + for(iter = link->queue_flags_by_ack.begin(); iter != link->queue_flags_by_ack.end(); iter++) { + if(iter->second->type == TcpReassemblyStream::TYPE_SYN_SENT) { + this->stream = iter->second; + this->state = STATE_SYN_SENT; + return(true); + } + } + return(false); +} + +bool TcpReassemblyLink::streamIterator::findFirstDataToDest() { + map::iterator iter; + for(iter = link->queue_by_ack.begin(); iter != link->queue_by_ack.end(); iter++) { + if(iter->second->direction == TcpReassemblyStream::DIRECTION_TO_DEST && + iter->second->type == TcpReassemblyStream::TYPE_DATA) { + this->stream = iter->second; + this->state = STATE_SYN_FORCE_OK; + return(true); + } + } + return(false); +} + + +TcpReassemblyLink::~TcpReassemblyLink() { + this->lock_queue(); + while(this->queue.size()) { + TcpReassemblyStream *stream = this->queue.front(); + this->queue.pop_front(); + this->queue_by_ack.erase(stream->ack); + if(debug_packet) { + cout << " destroy (" << stream->ack << ")" << endl; + } + delete stream; + } + map::iterator iter; + for(iter = this->queue_by_ack.begin(); iter != this->queue_by_ack.end(); ) { + delete iter->second; + this->queue_by_ack.erase(iter++); + } + for(iter = this->queue_flags_by_ack.begin(); iter != this->queue_flags_by_ack.end(); ) { + delete iter->second; + this->queue_flags_by_ack.erase(iter++); + } + for(iter = this->queue_nul_by_ack.begin(); iter != this->queue_nul_by_ack.end(); ) { + delete iter->second; + this->queue_nul_by_ack.erase(iter++); + } + this->unlock_queue(); +} + +bool TcpReassemblyLink::push_normal( + TcpReassemblyStream::eDirection direction, + timeval time, tcphdr header_tcp, + u_char *data, u_int32_t datalen, u_int32_t datacaplen, + pcap_block_store *block_store, int block_store_index) { + bool rslt = false; + switch(this->state) { + case STATE_NA: + if(direction == TcpReassemblyStream::DIRECTION_TO_DEST && + header_tcp.syn && !header_tcp.ack) { + this->first_seq_to_dest = header_tcp.seq + 1; + this->state = STATE_SYN_SENT; + rslt = true; + } + break; + case STATE_SYN_SENT: + if(direction == TcpReassemblyStream::DIRECTION_TO_SOURCE && + header_tcp.syn && header_tcp.ack) { + this->first_seq_to_source = header_tcp.seq + 1; + this->state = STATE_SYN_RECV; + rslt = true; + } + break; + case STATE_SYN_RECV: + if(direction == TcpReassemblyStream::DIRECTION_TO_DEST && + !header_tcp.syn && header_tcp.ack) { + this->state = STATE_SYN_OK; + rslt = true; + } + break; + case STATE_SYN_OK: + case STATE_SYN_FORCE_OK: + if(header_tcp.rst) { + this->rst = true; + this->state = STATE_RESET; + rslt = true; + } + case STATE_RESET: + if(header_tcp.fin) { + if(direction == TcpReassemblyStream::DIRECTION_TO_DEST) { + this->fin_to_dest = true; + this->setLastSeq(TcpReassemblyStream::DIRECTION_TO_SOURCE, + header_tcp.ack_seq); + } else { + this->fin_to_source = true; + this->setLastSeq(TcpReassemblyStream::DIRECTION_TO_SOURCE, + header_tcp.seq); + } + if(this->fin_to_dest && this->fin_to_source) { + this->state = STATE_CLOSE; + } + rslt = true; + } + break; + case STATE_CLOSE: + case STATE_CLOSED: + if(this->rst && header_tcp.fin && + direction == TcpReassemblyStream::DIRECTION_TO_SOURCE) { + this->setLastSeq(TcpReassemblyStream::DIRECTION_TO_SOURCE, + header_tcp.seq); + } + rslt = true; + break; + case STATE_CRAZY: + return(false); + } + if(state == STATE_SYN_OK || + state == STATE_SYN_FORCE_OK || + (state >= STATE_RESET && + !header_tcp.fin && !header_tcp.rst)) { + if(datalen > 0) { + TcpReassemblyStream_packet packet; + packet.time = time; + packet.header_tcp = header_tcp; + packet.next_seq = packet.header_tcp.seq + datalen; + packet.data = data; + packet.datalen = datalen; + packet.datacaplen = datacaplen; + packet.block_store = block_store; + packet.block_store_index = block_store_index; + this->pushpacket(direction, packet); + if(debug_packet) { + cout << " -- DATA" << endl; + } + } else { + if(this->last_ack && header_tcp.ack != this->last_ack) { + TcpReassemblyStream *prevStreamByLastAck = this->queue_by_ack[this->last_ack]; + if(prevStreamByLastAck && !prevStreamByLastAck->last_seq && + prevStreamByLastAck->direction == direction) { + prevStreamByLastAck->last_seq = header_tcp.seq; + } + } + } + rslt = true; + } + if(this->state == STATE_RESET || this->state == STATE_CLOSE) { + if(debug_check_ok && this->queue.size()) { + cout << " "; + } + int rslt_check_ok = this->okQueue(false, debug_check_ok); + if(debug_check_ok && this->queue.size()) { + cout << endl; + } + if(debug_rslt) { + cout << " -- RSLT: "; + if(rslt_check_ok <= 0) { + if(!this->queue.size()) { + cout << "EMPTY"; + } else { + cout << "ERRRRRRRRRRRRRRRROOOOOORRRRRRRRRR"; + if(this->rst) { + cout << " - RST"; + } + } + } else { + cout << "OK"; + } + cout << " " << this->port_src << " / " << this->port_dst; + cout << endl; + } + if(rslt_check_ok > 0) { + this->complete(); + this->state = STATE_CLOSED; + } + } + return(rslt); +} + +bool TcpReassemblyLink::push_crazy( + TcpReassemblyStream::eDirection direction, + timeval time, tcphdr header_tcp, + u_char *data, u_int32_t datalen, u_int32_t datacaplen, + pcap_block_store *block_store, int block_store_index) { + /*if(!(datalen > 0 || + header_tcp.syn || header_tcp.fin || header_tcp.rst)) { + return(false); + }*/ + direction = header_tcp.dest == this->port_dst ? + TcpReassemblyStream::DIRECTION_TO_DEST : + TcpReassemblyStream::DIRECTION_TO_SOURCE; + if(this->direction_confirm < 2) { + TcpReassemblyStream::eDirection checked_direction = direction; + if(this->direction_confirm < 2 && header_tcp.syn) { + if(header_tcp.ack) { + checked_direction = TcpReassemblyStream::DIRECTION_TO_SOURCE; + } else { + checked_direction = TcpReassemblyStream::DIRECTION_TO_DEST; + } + this->direction_confirm = 2; + } + if(!this->direction_confirm && + ((datalen > 5 && !memcmp(data, "POST ", 5)) || + (datalen > 4 && !memcmp(data, "GET ", 4)) || + (datalen > 5 && !memcmp(data, "HEAD ", 5)))) { + checked_direction = TcpReassemblyStream::DIRECTION_TO_DEST; + this->direction_confirm = 1; + } + if(checked_direction != direction) { + direction = checked_direction; + this->switchDirection(); + } + } + TcpReassemblyStream_packet packet; + packet.time = time; + packet.header_tcp = header_tcp; + packet.next_seq = packet.header_tcp.seq + datalen; + packet.data = data; + packet.datalen = datalen; + packet.datacaplen = datacaplen; + packet.block_store = block_store; + packet.block_store_index = block_store_index; + TcpReassemblyStream *stream; + map::iterator iter; + this->lock_queue(); + for(int i = 0; i < 3; i++) { + if(i == 0 ? datalen > 0 : + i == 1 ? header_tcp.syn || header_tcp.fin || header_tcp.rst : + datalen == 0 && !(header_tcp.syn || header_tcp.fin || header_tcp.rst)) { + map *queue = i == 0 ? &this->queue_by_ack : + i == 1 ? &this->queue_flags_by_ack : + &this->queue_nul_by_ack; + iter = queue->find(packet.header_tcp.ack_seq); + if(iter == queue->end()) { + stream = new TcpReassemblyStream(this); + stream->direction = direction; + stream->ack = packet.header_tcp.ack_seq; + if(i == 1) { + stream->type = header_tcp.syn ? (header_tcp.ack ? + TcpReassemblyStream::TYPE_SYN_RECV : + TcpReassemblyStream::TYPE_SYN_SENT) : + header_tcp.fin ? TcpReassemblyStream::TYPE_FIN : + TcpReassemblyStream::TYPE_RST; + } + (*queue)[stream->ack] = stream; + if(header_tcp.rst) { + this->rst = true; + } + if(header_tcp.fin) { + if(direction == TcpReassemblyStream::DIRECTION_TO_DEST) { + this->fin_to_dest = true; + } else { + this->fin_to_source = true; + } + } + } else { + stream = iter->second; + } + stream->push(packet); + if(!stream->min_seq || + packet.header_tcp.seq < stream->min_seq) { + stream->min_seq = packet.header_tcp.seq; + } + if(packet.next_seq > stream->max_next_seq) { + stream->max_next_seq = packet.next_seq; + } + } + } + this->unlock_queue(); + this->last_packet_at = getTimeMS(); + this->last_packet_at_from_header = time.tv_sec * 1000 + time.tv_usec / 1000; + if(!this->created_at_from_header) { + this->created_at_from_header = this->last_packet_at_from_header; + } + if((this->rst || this->fin_to_dest || this->fin_to_source) && + !this->link_is_ok) { + bool _cout = false; + if(this->exists_data) { + int countDataStream = this->okQueue(false, debug_check_ok); + if(countDataStream > 1) { + this->complete(false, true); + if(debug_rslt) { + cout << "RSLT: OK (" << countDataStream << ")"; + _cout = true; + } + this->link_is_ok = 1; + // - 1 - prošlo tímto + // - 2 - není už co k vyřízení - zatím se nastavuje jen po complete all + } + } + if(_cout) { + if(debug_packet) { + in_addr ip; + ip.s_addr = this->ip_src; + string ip_src = inet_ntoa(ip); + ip.s_addr = this->ip_dst; + string ip_dst = inet_ntoa(ip); + cout << " / " + << ip_src << " / " << this->port_src + << " -> " + << ip_dst << " / " << this->port_dst; + } + cout << endl; + } + } + return(true); +} + +void TcpReassemblyLink::pushpacket(TcpReassemblyStream::eDirection direction, + TcpReassemblyStream_packet packet) { + TcpReassemblyStream *stream; + map::iterator iter; + this->lock_queue(); + iter = this->queue_by_ack.find(packet.header_tcp.ack_seq); + if(iter == this->queue_by_ack.end()) { + TcpReassemblyStream *prevStreamByLastAck = NULL; + if(this->queue.size()) { + prevStreamByLastAck = this->queue_by_ack[this->last_ack]; + } + stream = new TcpReassemblyStream(this); + stream->direction = direction; + stream->ack = packet.header_tcp.ack_seq; + if(prevStreamByLastAck && direction == prevStreamByLastAck->direction) { + prevStreamByLastAck->last_seq = packet.header_tcp.seq; + stream->first_seq = prevStreamByLastAck->last_seq; + } else { + stream->first_seq = this->queue.size() ? + prevStreamByLastAck->ack : + (direction == TcpReassemblyStream::DIRECTION_TO_DEST ? + this->first_seq_to_dest : + this->first_seq_to_source); + this->setLastSeq(direction == TcpReassemblyStream::DIRECTION_TO_DEST ? + TcpReassemblyStream::DIRECTION_TO_SOURCE : + TcpReassemblyStream::DIRECTION_TO_DEST, + packet.header_tcp.ack_seq); + } + this->queue_by_ack[stream->ack] = stream; + this->queue.push_back(stream); + if(debug_packet) { + cout << " -- NEW STREAM (" << stream->ack << ")" + << " - first_seq: " << stream->first_seq + << endl; + } + } else { + stream = iter->second; + } + stream->push(packet); + if(!stream->min_seq || + packet.header_tcp.seq < stream->min_seq) { + stream->min_seq = packet.header_tcp.seq; + } + if(packet.next_seq > stream->max_next_seq) { + stream->max_next_seq = packet.next_seq; + } + this->last_ack = stream->ack; + this->unlock_queue(); + this->last_packet_at = getTimeMS(); +} + +void TcpReassemblyLink::printContent(int level) { + map::iterator iter; + int counter = 0; + for(iter = this->queue_by_ack.begin(); iter != this->queue_by_ack.end(); iter++) { + in_addr ip; + ip.s_addr = this->ip_src; + string ip_src = inet_ntoa(ip); + ip.s_addr = this->ip_dst; + string ip_dst = inet_ntoa(ip); + cout << fixed + << setw(level * 5) << "" + << setw(3) << (++counter) << " " + << setw(15) << ip_src << "/" << setw(6) << this->port_src + << " -> " + << setw(15) << ip_dst << "/" << setw(6) << this->port_dst + << endl; + iter->second->printContent(level + 1); + } +} + +void TcpReassemblyLink::cleanup(u_int64_t act_time_from_header) { + /* + map::iterator iter; + for(iter = this->queue_by_ack.begin(); iter != this->queue_by_ack.end(); ) { + if(iter->second->last_packet_at_from_header && + act_time_from_header > iter->second->last_packet_at_from_header + 10 * 1000) { + delete iter->second; + this->queue_by_ack.erase(iter++); + } else { + ++iter; + } + } + */ +} + +void TcpReassemblyLink::setLastSeq(TcpReassemblyStream::eDirection direction, + u_int32_t lastSeq) { + int index = this->queue.size(); + if(index > 0 && this->queue[index - 1]->direction == direction) { + index = index - 1; + } else if(index > 1 && this->queue[index - 2]->direction == direction) { + index = index - 2; + } else { + return; + } + this->queue[index]->last_seq = lastSeq; + if(debug_packet) { + cout << " -- set last seq: " << lastSeq << endl; + } +} + +/* +int TcpReassemblyLink::okQueue_normal(bool final, bool enableDebug) { + bool rslt; + size_t size = this->queue.size(); + for(size_t i = 0; i < size; i++) { + if(enableDebug) { + cout << "|"; + } + rslt = this->queue[i]->ok(false, i == size - 1 && (this->rst || this->fin_to_dest || this->fin_to_source), 0, + false, enableDebug); + if(rslt <= 0) { + return(rslt); + } + } + return(rslt); +} +*/ + +int TcpReassemblyLink::okQueue_crazy(bool final, bool enableDebug) { + streamIterator iter = this->createIterator(); + if(!this->direction_confirm) { + return(-2); + } + if(!iter.stream) { + return(-10); + } + this->ok_streams.clear(); + int countDataStream = 0; + for(int pass = 0; pass < (final ? 3 : 1) && !countDataStream; pass++) { + if(pass > 0) { + iter.init(); + } + TcpReassemblyStream *lastHttpStream = NULL; + while(true) { + if(pass == 1 && + iter.state == STATE_SYN_FORCE_OK) { + if(!iter.nextAckInDirection()) { + break; + } + } + + if(iter.stream->ack == 784212552) { + cout << " -- ***** -- "; + } + + if(enableDebug && debug_check_ok_process) { + iter.print(); + cout << " "; + } + if(iter.state >= STATE_SYN_OK) { + u_int32_t maxNextSeq = iter.getMaxNextSeq(); + if((maxNextSeq || true/*pass == 2*/) && + iter.stream->exists_data) { + if(enableDebug) { + cout << "|"; + } + if(iter.stream->ok(true, maxNextSeq == 0, maxNextSeq, + true/*pass == 2*/, lastHttpStream, enableDebug)) { + bool existsAckInStream = false; + for(size_t i = 0; i < this->ok_streams.size(); i++) { + if(this->ok_streams[i]->ack == iter.stream->ack) { + existsAckInStream = true; + break; + } + } + if(!existsAckInStream) { + this->ok_streams.push_back(iter.stream); + ++countDataStream; + if(iter.stream->http_ok) { + lastHttpStream = iter.stream; + } + } + } else if(pass == 2) { + if(iter.nextSeqInDirection()) { + continue; + } + } + } + } + if(enableDebug && debug_check_ok_process) { + cout << endl; + } + if(!iter.next()) { + bool completeExpectContinue = false; + cout << "**" << endl; + cout << "**" << iter.stream->complete_data << endl; + if(iter.stream->direction == TcpReassemblyStream::DIRECTION_TO_SOURCE && + iter.stream->complete_data && iter.stream->complete_data->data && + iter.stream->complete_data->datalen == 25 && + !memcmp(iter.stream->complete_data->data, "HTTP/1.1 100 Continue\r\n\r\n", 25) && + this->ok_streams.size() > 1 && + this->ok_streams[this->ok_streams.size() - 2]->http_expect_continue && + this->ok_streams[this->ok_streams.size() - 2]->http_content_length && + iter.stream->ack > this->ok_streams[this->ok_streams.size() - 2]->min_seq && + iter.stream->ack < this->ok_streams[this->ok_streams.size() - 2]->max_next_seq) { + TcpReassemblyDataItem dataItem = this->ok_streams[this->ok_streams.size() - 2]->getCompleteData(true); + this->ok_streams[this->ok_streams.size() - 2]->is_ok = false; + this->ok_streams[this->ok_streams.size() - 2]->_ignore_expect_continue = true; + if(this->ok_streams[this->ok_streams.size() - 2]->ok(true, false, 0, + true, NULL, false)) { + completeExpectContinue = true; + dataItem.destroy(); + iter.stream = this->ok_streams[this->ok_streams.size() - 2]; + if(!iter.nextAckInDirection()) { + break; + } + } + if(!completeExpectContinue && + this->ok_streams[this->ok_streams.size() - 2]->detect_ok_max_next_seq) { + if(this->ok_streams[this->ok_streams.size() - 2]->ok2_ec(iter.stream->max_next_seq)) { + this->ok_streams.push_back(this->queue_by_ack[iter.stream->max_next_seq]); + completeExpectContinue = true; + dataItem.destroy(); + iter.stream = this->ok_streams[this->ok_streams.size() - 1]; + iter.next(); + } + } + if(!completeExpectContinue) { + this->ok_streams[this->ok_streams.size() - 2]->is_ok = true; + this->ok_streams[this->ok_streams.size() - 2]->_ignore_expect_continue = false; + this->ok_streams[this->ok_streams.size() - 2]->complete_data = new TcpReassemblyDataItem(); + this->ok_streams[this->ok_streams.size() - 2]->complete_data->setFrom(dataItem); + } + } + if(!completeExpectContinue) { + if(iter.stream->direction == TcpReassemblyStream::DIRECTION_TO_DEST) { + if(!iter.nextAckInDirection()) { + break; + } + } else if(iter.stream->direction == TcpReassemblyStream::DIRECTION_TO_SOURCE) { + if(!iter.nextAckInReverseDirection()) { + break; + } + } else { + break; + } + } + } + } + } + return(iter.state < STATE_SYN_OK ? -1 : countDataStream); +} + +/* +void TcpReassemblyLink::complete_normal() { + this->lock_queue(); + size_t size = this->queue.size(); + while(true) { + size_t countRequest = 0; + size_t countRslt = 0; + bool ok = true; + while(this->completed_offset + countRequest < size && + this->queue[this->completed_offset + countRequest]->direction == TcpReassemblyStream::DIRECTION_TO_DEST) { + if(!this->queue[this->completed_offset + countRequest]->ok()) { + ok = false; + break; + } + ++countRequest; + } + if(!countRequest || !ok) { + break; + } + while(this->completed_offset + countRequest + countRslt < size && + this->queue[this->completed_offset + countRequest + countRslt]->direction == TcpReassemblyStream::DIRECTION_TO_SOURCE) { + if(!this->queue[this->completed_offset + countRequest + countRslt]->ok()) { + ok = false; + break; + } + ++countRslt; + } + if(!countRslt || !ok) { + break; + } + for(size_t i = 0; i < countRequest + countRslt; i++) { + TcpReassemblyStream *stream = this->queue[this->completed_offset + i]; + u_char *data; + u_int32_t datalen; + timeval time; + data = stream->complete(&datalen, &time); + if(data) { + if(debug_data) { + cout << endl; + if(i == 0) { + cout << "** REQUEST **" << endl << endl; + } else if (i == countRequest) { + cout << "** RSLT **" << endl << endl; + } + cout << data << endl << endl; + } + delete [] data; + } + } + this->completed_offset += countRequest + countRslt; + } + this->unlock_queue(); +} +*/ + +void TcpReassemblyLink::complete_crazy(bool final, bool eraseCompletedStreams) { + this->lock_queue(); + while(true) { + size_t size_ok_streams = this->ok_streams.size(); + TcpReassemblyData *reassemblyData = NULL; + size_t skip_offset = 0; + while(skip_offset < size_ok_streams && + this->ok_streams[skip_offset]->direction != TcpReassemblyStream::DIRECTION_TO_DEST) { + this->ok_streams[skip_offset + completed_offset]->completed_finally = true; + ++skip_offset; + } + size_t old_skip_offset; + do { + old_skip_offset = skip_offset; + while(skip_offset < size_ok_streams && this->ok_streams[skip_offset + completed_offset]->completed_finally) { + ++skip_offset; + } + while(skip_offset < size_ok_streams && + this->ok_streams[skip_offset]->direction != TcpReassemblyStream::DIRECTION_TO_DEST) { + this->ok_streams[skip_offset + completed_offset]->completed_finally = true; + ++skip_offset; + } + while(skip_offset < size_ok_streams && + !this->ok_streams[skip_offset]->http_type) { + this->ok_streams[skip_offset + completed_offset]->completed_finally = true; + ++skip_offset; + } + } while(skip_offset > old_skip_offset); + size_t countRequest = 0; + size_t countRslt = 0; + bool ok = true; + bool postExpectContinueInFirstRequest = false; + bool forceExpectContinue = false; + while(skip_offset + countRequest < size_ok_streams && + this->ok_streams[skip_offset + countRequest]->direction == TcpReassemblyStream::DIRECTION_TO_DEST) { + + if(this->ok_streams[skip_offset + countRequest]->ack == 766596997) { + cout << "-- ***** --" << endl; + } + + ++countRequest; + if(countRequest == 1) { + TcpReassemblyDataItem dataItem = this->ok_streams[skip_offset]->getCompleteData(); + u_char *data = dataItem.data; + u_int32_t datalen = dataItem.datalen; + if(data && datalen > 24 && + !memcmp(data, "POST ", 5) && + strcasestr((char*)data, "Expect: 100-continue")) { + postExpectContinueInFirstRequest = true; + } else { + break; + } + } + if(countRequest == 2 && postExpectContinueInFirstRequest) { + TcpReassemblyDataItem dataItem = this->ok_streams[skip_offset + 1]->getCompleteData(); + u_char *data = dataItem.data; + u_int32_t datalen = dataItem.datalen; + if(data && datalen > 0 && data[0] == '{') { + forceExpectContinue = true; + break; + } else { + --countRequest; + break; + } + } + } + if(!countRequest || !ok) { + break; + } + while(skip_offset + countRequest + countRslt < size_ok_streams && + this->ok_streams[skip_offset + countRequest + countRslt]->direction == TcpReassemblyStream::DIRECTION_TO_SOURCE) { + ++countRslt; + } + if(!(final || forceExpectContinue || countRslt) || !ok) { + break; + } + if(postExpectContinueInFirstRequest && !forceExpectContinue) { + if(skip_offset + countRequest + countRslt + 1 <= size_ok_streams) { + // OK + } else { + break; + } + } + reassemblyData = new TcpReassemblyData; + bool existsSeparateExpectContinueData = false; + for(size_t i = 0; i < countRequest + countRslt; i++) { + TcpReassemblyStream *stream = this->ok_streams[skip_offset + i]; + TcpReassemblyDataItem dataItem = stream->getCompleteData(true); + u_char *data = dataItem.data; + u_int32_t datalen = dataItem.datalen; + timeval time = dataItem.time; + if(data) { + + if(this->ok_streams[skip_offset + i]->ack == 356712669) { + cout << "-- ***** --" << endl; + } + + if(i == countRequest - 1 && + datalen > 24 && + !memcmp(data, "POST ", 5) && + strcasestr((char*)data, "Expect: 100-continue")) { + if(skip_offset + countRequest + countRslt + 1 <= size_ok_streams) { + if(this->ok_streams[skip_offset + countRequest + countRslt]->http_ok_expect_continue_data) { + existsSeparateExpectContinueData = true; + reassemblyData->forceAppendExpectContinue = true; + } else { + TcpReassemblyDataItem dataItem = this->ok_streams[skip_offset + countRequest + countRslt]->getCompleteData(); + if(dataItem.data && dataItem.data[0] == '{') { + existsSeparateExpectContinueData = true; + } + } + } + } + if(debug_data) { + cout << endl; + if(i == 0) { + cout << "** REQUEST **"; + } else if (i == countRequest) { + cout << "** RSLT **"; + } + if(i == 0 || i == countRequest) { + cout << endl << endl; + } + cout << " ack: " << this->ok_streams[skip_offset + i]->ack << endl << endl; + cout << data << endl << endl; + } + if(i < countRequest) { + reassemblyData->addRequest(data, datalen, time); + } else { + reassemblyData->addResponse(data, datalen, time); + } + this->ok_streams[skip_offset + i]->completed_finally = true; + } + } + if(existsSeparateExpectContinueData && + skip_offset + countRequest + countRslt + 1 <= size_ok_streams && + this->ok_streams[skip_offset + countRequest + countRslt]->direction == TcpReassemblyStream::DIRECTION_TO_DEST) { + TcpReassemblyDataItem dataItem = this->ok_streams[skip_offset + countRequest + countRslt]->getCompleteData(true); + /* + if(!ENABLE_UNLOCK_PACKET_IN_OK && + countRequest == 1 && this->ok_streams[skip_offset]->http_ok && + this->ok_streams[skip_offset]->http_expect_continue && + this->ok_streams[skip_offset]->http_content_length && + (dataItem.datalen > this->ok_streams[skip_offset]->http_content_length + 1 || + dataItem.datalen < this->ok_streams[skip_offset]->http_content_length -1)) { + this->ok_streams[skip_offset + countRequest + countRslt]->is_ok = false; + this->ok_streams[skip_offset + countRequest + countRslt]->complete_data = NULL; + if(this->ok_streams[skip_offset + countRequest + countRslt]->ok(true, false, 0, + true, this->ok_streams[skip_offset], false)) { + cout << "-- REPAIR STREAM --" << endl; + dataItem.destroy(); + dataItem = this->ok_streams[skip_offset + countRequest + countRslt]->getCompleteData(true); + } + } + */ + u_char *data = dataItem.data; + u_int32_t datalen = dataItem.datalen; + timeval time = dataItem.time; + if(data) { + if(debug_data) { + cout << endl; + cout << "** EXPECT CONTINUE **"; + cout << endl << endl; + cout << " ack: " << this->ok_streams[skip_offset + countRequest + countRslt]->ack << endl << endl; + cout << data << endl << endl; + } + reassemblyData->addExpectContinue(data, datalen, time); + this->ok_streams[skip_offset + countRequest + countRslt]->completed_finally = true; + } + if(skip_offset + countRequest + countRslt + 2 <= size_ok_streams && + this->ok_streams[skip_offset + countRequest + countRslt + 1]->direction == TcpReassemblyStream::DIRECTION_TO_SOURCE) { + dataItem = this->ok_streams[skip_offset + countRequest + countRslt + 1]->getCompleteData(true); + data = dataItem.data; + datalen = dataItem.datalen; + time = dataItem.time; + if(debug_data) { + cout << endl; + cout << "** EXPECT CONTINUE RSLT **"; + cout << endl << endl; + cout << " ack: " << this->ok_streams[skip_offset + countRequest + countRslt + 1]->ack << endl << endl; + cout << data << endl << endl; + } + reassemblyData->addExpectContinueResponse(data, datalen, time); + this->ok_streams[skip_offset + countRequest + countRslt + 1]->completed_finally = true; + } + } + if(reassemblyData->isFill()) { + if(reassembly->dataCallback) { + reassembly->dataCallback->processData( + this->ip_src, this->ip_dst, + this->port_src, this->port_dst, + reassemblyData); + reassemblyData = NULL; + } + if(eraseCompletedStreams) { + while(this->ok_streams.size() && this->ok_streams[0]->completed_finally) { + this->ok_streams[0]->is_ok = false; + this->ok_streams[0]->completed = false; + this->ok_streams.erase(this->ok_streams.begin()); + } + } + skip_offset = 0; + } + if(reassemblyData) { + delete reassemblyData; + } + } + this->unlock_queue(); +} + +TcpReassemblyLink::streamIterator TcpReassemblyLink::createIterator() { + streamIterator iterator(this); + return(iterator); +} + +void TcpReassemblyLink::switchDirection() { + this->lock_queue(); + u_int32_t tmp = this->ip_src; + this->ip_src = this->ip_dst; + this->ip_dst = tmp; + tmp = this->port_src; + this->port_src = this->port_dst; + this->port_dst = tmp; + tmp = this->fin_to_source; + this->fin_to_source = this->fin_to_dest; + this->fin_to_dest = tmp; + map::iterator iter; + for(iter = this->queue_by_ack.begin(); iter != this->queue_by_ack.end(); iter++) { + iter->second->direction = iter->second->direction == TcpReassemblyStream::DIRECTION_TO_DEST ? + TcpReassemblyStream::DIRECTION_TO_SOURCE : + TcpReassemblyStream::DIRECTION_TO_DEST; + } + for(iter = this->queue_flags_by_ack.begin(); iter != this->queue_flags_by_ack.end(); iter++) { + iter->second->direction = iter->second->direction == TcpReassemblyStream::DIRECTION_TO_DEST ? + TcpReassemblyStream::DIRECTION_TO_SOURCE : + TcpReassemblyStream::DIRECTION_TO_DEST; + } + for(iter = this->queue_nul_by_ack.begin(); iter != this->queue_nul_by_ack.end(); iter++) { + iter->second->direction = iter->second->direction == TcpReassemblyStream::DIRECTION_TO_DEST ? + TcpReassemblyStream::DIRECTION_TO_SOURCE : + TcpReassemblyStream::DIRECTION_TO_DEST; + } + this->unlock_queue();; +} + + +TcpReassembly::~TcpReassembly() { + this->cleanup(true); +} + +void TcpReassembly::push(pcap_pkthdr *header, iphdr2 *header_ip, u_char *packet, + pcap_block_store *block_store, int block_store_index) { + if(debug_limit_counter && debug_counter > debug_limit_counter) { + return; + } + + tcphdr *header_tcp_pointer; + tcphdr header_tcp; + u_char *data; + u_int32_t datalen; + u_int32_t datacaplen; + + header_tcp_pointer = (tcphdr*)((u_char*)header_ip + sizeof(*header_ip)); + data = (u_char*)header_tcp_pointer + (header_tcp_pointer->doff << 2); + datalen = htons(header_ip->tot_len) - sizeof(*header_ip) - (header_tcp_pointer->doff << 2); + datacaplen = header->caplen - ((u_char*)data - packet); + header_tcp = *header_tcp_pointer; + header_tcp.source = htons(header_tcp.source); + header_tcp.dest = htons(header_tcp.dest); + header_tcp.seq = htonl(header_tcp.seq); + header_tcp.ack_seq = htonl(header_tcp.ack_seq); + u_int32_t next_seq = header_tcp.seq + datalen; + + if(debug_port) { + if(header_tcp.source != debug_port && header_tcp.dest != debug_port) { + return; + } + } + + this->last_time = getTimeMS(); + + if(debug_seq && header_tcp.seq == debug_seq) { + cout << " -- XXX DEBUG SEQ XXX" << endl; + } + + this->act_time_from_header = header->ts.tv_sec * 1000 + header->ts.tv_usec / 1000; + + TcpReassemblyLink *link = NULL; + map::iterator iter; + TcpReassemblyStream::eDirection direction = TcpReassemblyStream::DIRECTION_TO_DEST; + TcpReassemblyLink_id id(header_ip->saddr, header_ip->daddr, header_tcp.source, header_tcp.dest); + this->lock_links(); + iter = this->links.find(id); + if(iter != this->links.end()) { + link = iter->second; + if(!this->enableCrazySequence && + link->state == TcpReassemblyLink::STATE_SYN_SENT && + this->enableHttpForceInit && + ((datalen > 5 && !memcmp(data, "POST ", 5)) || + (datalen > 4 && !memcmp(data, "GET ", 4)))) { + link->state = TcpReassemblyLink::STATE_SYN_FORCE_OK; + } + } else { + id.reverse(); + iter = this->links.find(id); + if(iter != this->links.end()) { + link = iter->second; + direction = TcpReassemblyStream::DIRECTION_TO_SOURCE; + } else if(!this->enableCrazySequence && + header_tcp.syn && !header_tcp.ack) { + id.reverse(); + link = new TcpReassemblyLink(this, header_ip->saddr, header_ip->daddr, header_tcp.source, header_tcp.dest); + this->links[id] = link; + } else if(this->enableCrazySequence || + (this->enableHttpForceInit && + ((datalen > 5 && !memcmp(data, "POST ", 5)) || + (datalen > 4 && !memcmp(data, "GET ", 4))))) { + id.reverse(); + link = new TcpReassemblyLink(this, header_ip->saddr, header_ip->daddr, header_tcp.source, header_tcp.dest); + this->links[id] = link; + if(this->enableCrazySequence) { + link->state = TcpReassemblyLink::STATE_CRAZY; + } else { + link->state = TcpReassemblyLink::STATE_SYN_FORCE_OK; + link->first_seq_to_dest = header_tcp.seq; + } + } + } + if(link) { + link->push(direction, header->ts, header_tcp, + data, datalen, datacaplen, + block_store, block_store_index); + } + this->unlock_links(); + + if(debug_packet) { + in_addr ip; + ip.s_addr = header_ip->saddr; + string ip_src = inet_ntoa(ip); + ip.s_addr = header_ip->daddr; + string ip_dst = inet_ntoa(ip); + string _data; + if(datalen) { + char *__data = new char[datalen + 1]; + memcpy(__data, data, datalen); + __data[datalen] = 0; + _data = __data; + delete [] __data; + _data = _data.substr(0, 1000); + for(size_t i = 0; i < _data.length(); i++) { + if(_data[i] == 13 || _data[i] == 10) { + _data[i] = '\\'; + } + if(_data[i] < 32) { + _data.resize(i); + } + } + } + cout << fixed + << sqlDateTimeString(header->ts.tv_sec) << "." << setw(6) << header->ts.tv_usec + << " : " + << setw(15) << ip_src << "/" << setw(6) << header_tcp.source + << " -> " + << setw(15) << ip_dst << "/" << setw(6) << header_tcp.dest + << " " + << (header_tcp.fin ? 'F' : '-') + << (header_tcp.syn ? 'S' : '-') + << (header_tcp.rst ? 'R' : '-') + << (header_tcp.psh ? 'P' : '-') + << (header_tcp.ack ? 'A' : '-') + << (header_tcp.urg ? 'U' : '-') + << " " + << " len: " << setw(5) << datalen + << " seq: " << setw(12) << header_tcp.seq + << " next seq: " << setw(12) << next_seq + << " ack: " << setw(12) << header_tcp.ack_seq + << " data: " << _data + << endl; + ++debug_counter; + + /* + if(strstr((char*)data, "CHANNEL_CREATE")) { + cout << "-- ***** --" << endl; + } + */ + + } + + static u_int32_t _counter; + if(!((_counter++) % 100)) { + this->cleanup(); + } +} + +void TcpReassembly::cleanup(bool all) { + if(all) { + cout << "cleanup all" << endl; + } + map::iterator iter; + //u_int64_t act_time = getTimeMS(); + this->lock_links(); + if(all && opt_pb_read_from_file[0]) { + cout << "COUNT REST LINKS: " << this->links.size() << endl; + } + for(iter = this->links.begin(); iter != this->links.end(); ) { + if(all || + //act_time > max(iter->second->created_at, iter->second->last_packet_at) + 10 * 60000 || + (iter->second->last_packet_at_from_header && + this->act_time_from_header > iter->second->last_packet_at_from_header + 2 * 1000)) { + if(!iter->second->link_is_ok < 2) { + bool _cout = false; + if(!iter->second->exists_data) { + if(debug_rslt) { + cout << "RSLT: EMPTY"; + _cout = true; + } + } else { + int countDataStream = iter->second->okQueue(true, debug_check_ok); + if(countDataStream > 0) { + iter->second->complete(true, true); + iter->second->link_is_ok = 2; + } + if(debug_rslt) { + if(countDataStream < 0) { + cout << (countDataStream == -1 ? "RSLT: MISSING REQUEST" : + (countDataStream == -2 ? "RSLT: DIRECTION NOT CONFIRMED" : + "RSLT: EMPTY OR OTHER ERROR")); + } + else if(countDataStream > 1) { + cout << "RSLT: OK (" << countDataStream << ")"; + } else if(countDataStream > 0) { + cout << "RSLT: ONLY REQUEST (" << countDataStream << ")"; + } else { + cout << "RSLT: ERRRRRRRROR"; + } + _cout = true; + } + } + if(_cout) { + if(debug_packet) { + in_addr ip; + ip.s_addr = iter->second->ip_src; + string ip_src = inet_ntoa(ip); + ip.s_addr = iter->second->ip_dst; + string ip_dst = inet_ntoa(ip); + cout << " clean " + << ip_src << " / " << iter->second->port_src + << " -> " + << ip_dst << " / " << iter->second->port_dst; + } + cout << endl; + } + } + if(all || + !iter->second->existsUncompletedDataStream() || + (iter->second->last_packet_at_from_header && + this->act_time_from_header > iter->second->last_packet_at_from_header + 20 * 1000)) { + delete iter->second; + this->links.erase(iter++); + } else { + ++iter; + } + } else { + iter->second->cleanup(this->act_time_from_header); + ++iter; + } + } + + if(this->doPrintContent) { + this->printContent(); + this->doPrintContent = false; + } + this->unlock_links(); +} + +/* +bool TcpReassembly::enableStop() { + return(getTimeMS() - this->last_time > 20 * 1000); +} +*/ + +void TcpReassembly::printContent() { + map::iterator iter; + int counter = 0; + for(iter = this->links.begin(); iter != this->links.end(); iter++) { + cout << fixed << setw(3) << (++counter) << " " + << endl; + iter->second->printContent(1); + } +} diff --git a/tcpreassembly.h b/tcpreassembly.h new file mode 100644 index 000000000..995b23a9e --- /dev/null +++ b/tcpreassembly.h @@ -0,0 +1,567 @@ +#ifndef TCP_REASSEMBLY_H +#define TCP_REASSEMBLY_H + +#include + +#include "sniff.h" +#include "pcap_queue_block.h" + +class TcpReassemblyDataItem { +public: + TcpReassemblyDataItem() { + this->data = NULL; + this->datalen = 0; + this->time.tv_sec = 0; + this->time.tv_usec = 0; + } + TcpReassemblyDataItem(u_char *data, u_int32_t datalen, timeval time) { + this->data = data; + this->datalen = datalen; + this->time = time; + } + void setFrom(TcpReassemblyDataItem &dataItem) { + this->data = dataItem.data; + this->datalen = dataItem.datalen; + this->time = dataItem.time; + } + void destroy() { + if(this->data) { + delete [] this->data; + this->data = NULL; + } + } +public: + u_char *data; + u_int32_t datalen; + timeval time; +}; + +class TcpReassemblyData { +public: + TcpReassemblyData() { + this->forceAppendExpectContinue = false; + } + ~TcpReassemblyData(); + void addRequest(u_char *data, u_int32_t datalen, timeval time) { + request.push_back(TcpReassemblyDataItem(data, datalen, time)); + } + void addResponse(u_char *data, u_int32_t datalen, timeval time) { + response.push_back(TcpReassemblyDataItem(data, datalen, time)); + } + void addExpectContinue(u_char *data, u_int32_t datalen, timeval time) { + expectContinue.push_back(TcpReassemblyDataItem(data, datalen, time)); + } + void addExpectContinueResponse(u_char *data, u_int32_t datalen, timeval time) { + expectContinueResponse.push_back(TcpReassemblyDataItem(data, datalen, time)); + } + bool isFill(); +public: + vector request; + vector response; + vector expectContinue; + vector expectContinueResponse; + bool forceAppendExpectContinue; +}; + +class TcpReassemblyProcessData { +public: + virtual void processData(u_int32_t ip_src, u_int32_t ip_dst, + u_int16_t port_src, u_int16_t port_dst, + TcpReassemblyData *data) = 0; +}; + +struct TcpReassemblyLink_id { + TcpReassemblyLink_id(u_int32_t ip_src = 0, u_int32_t ip_dst = 0, + u_int16_t port_src = 0, u_int16_t port_dst = 0) { + this->ip_src = ip_src; + this->ip_dst = ip_dst; + this->port_src = port_src; + this->port_dst = port_dst; + } + void reverse() { + u_int32_t tmp = this->ip_src; + this->ip_src = this->ip_dst; + this->ip_dst = tmp; + tmp = this->port_src; + this->port_src = this->port_dst; + this->port_dst = tmp; + } + u_int32_t ip_src; + u_int32_t ip_dst; + u_int16_t port_src; + u_int16_t port_dst; + bool operator < (const TcpReassemblyLink_id& other) const { + return((this->ip_src < other.ip_src) ? 1 : (this->ip_src > other.ip_src) ? 0 : + (this->ip_dst < other.ip_dst) ? 1 : (this->ip_dst > other.ip_dst) ? 0 : + (this->port_src < other.port_src) ? 1 : (this->port_src > other.port_src) ? 0 : + (this->port_dst < other.port_dst)); + } +}; + +class TcpReassemblyStream_packet { +public: + enum eState { + NA = 0, + CHECK, + FAIL + }; + TcpReassemblyStream_packet() { + state = NA; + //locked_packet = false; + } +private: + void lock_packet() { + if(this->block_store/* && !locked_packet*/) { + //static int countLock; + //cout << "COUNT LOCK: " << (++countLock) << endl; + this->block_store->lock_packet(this->block_store_index); + //locked_packet = true; + } + } + void unlock_packet() { + if(this->block_store/* && locked_packet*/) { + //static int countUnlock; + //cout << "COUNT UNLOCK: " << (++countUnlock) << endl; + this->block_store->unlock_packet(this->block_store_index); + //locked_packet = false; + } + } + void cleanState() { + this->state = NA; + } +private: + timeval time; + tcphdr header_tcp; + u_int32_t next_seq; + u_char *data; + u_int32_t datalen; + u_int32_t datacaplen; + pcap_block_store *block_store; + int block_store_index; + eState state; + //bool locked_packet; +friend class TcpReassemblyStream_packet_var; +friend class TcpReassemblyStream; +friend class TcpReassemblyLink; +}; + +class TcpReassemblyStream_packet_var { +public: + ~TcpReassemblyStream_packet_var() { + map::iterator iter; + for(iter = this->queue.begin(); iter != this->queue.end(); iter++) { + iter->second.unlock_packet(); + } + } + void push(TcpReassemblyStream_packet packet); + u_int32_t getNextSeqCheck() { + map::iterator iter; + for(iter = this->queue.begin(); iter != this->queue.end(); iter++) { + if(iter->second.datalen && + (iter->second.state == TcpReassemblyStream_packet::NA || + iter->second.state == TcpReassemblyStream_packet::CHECK)) { + return(iter->second.next_seq); + } + } + return(0); + } +private: + void unlockPackets() { + map::iterator iter; + for(iter = this->queue.begin(); iter != this->queue.end(); iter++) { + iter->second.unlock_packet(); + } + } + void cleanState() { + map::iterator iter; + for(iter = this->queue.begin(); iter != this->queue.end(); iter++) { + iter->second.cleanState(); + } + } +private: + map queue; +friend class TcpReassemblyStream; +friend class TcpReassemblyLink; +}; + +class TcpReassemblyStream { +public: + enum eDirection { + DIRECTION_NA = 0, + DIRECTION_TO_DEST, + DIRECTION_TO_SOURCE + }; + enum eType { + TYPE_DATA, + TYPE_SYN_SENT, + TYPE_SYN_RECV, + TYPE_FIN, + TYPE_RST + }; + enum eHttpType { + HTTP_TYPE_NA = 0, + HTTP_TYPE_POST, + HTTP_TYPE_GET, + HTTP_TYPE_HEAD, + HTTP_TYPE_HTTP + }; + TcpReassemblyStream(class TcpReassemblyLink *link) { + direction = DIRECTION_TO_DEST; + type = TYPE_DATA; + ack = 0; + first_seq = 0; + last_seq = 0; + min_seq = 0; + max_next_seq = 0; + is_ok = false; + completed = false; + completed_finally = false; + exists_data = false; + complete_data = NULL; + http_type = HTTP_TYPE_NA; + http_content_length = 0; + http_ok = false; + http_expect_continue = false; + http_ok_expect_continue_post = false; + http_ok_expect_continue_data = false; + detect_ok_max_next_seq = 0; + _ignore_expect_continue = false; + _only_check_psh = false; + _force_wait_for_next_psh = false; + last_packet_at_from_header = 0; + this->link = link; + } + ~TcpReassemblyStream() { + this->cleanCompleteData(true); + } + void push(TcpReassemblyStream_packet packet); + int ok(bool crazySequence = false, bool enableSimpleCmpMaxNextSeq = false, u_int32_t maxNextSeq = 0, + bool enableCheckCompleteContent = false, TcpReassemblyStream *prevHttpStream = NULL, bool enableDebug = false); + bool ok2_ec(u_int32_t nextAck, bool enableDebug = false); + u_char *complete(u_int32_t *datalen, timeval *time, bool check = false, bool unlockPackets = true); + bool saveCompleteData(bool unlockPackets = true, bool check = false, TcpReassemblyStream *prevHttpStream = NULL); + void cleanCompleteData(bool destroy = false); + TcpReassemblyDataItem getCompleteData(bool clean = false); + void unlockPackets(); + void printContent(int level = 0); + bool checkOkPost(TcpReassemblyStream *nextStream = NULL); +private: + bool checkCompleteContent(); + bool checkContentIsHttpRequest(); + void cleanPacketsState() { + map::iterator iter; + for(iter = this->queue.begin(); iter != this->queue.end(); iter++) { + iter->second.cleanState(); + } + this->ok_packets.clear(); + } + u_int32_t getLastSeqFromNextStream(); + eDirection direction; + eType type; + u_int32_t ack; + u_int32_t first_seq; + u_int32_t last_seq; + u_int32_t min_seq; + u_int32_t max_next_seq; + map queue; + deque ok_packets; + bool is_ok; + bool completed; + bool completed_finally; + bool exists_data; + TcpReassemblyDataItem *complete_data; + eHttpType http_type; + u_int32_t http_content_length; + bool http_ok; + bool http_expect_continue; + bool http_ok_expect_continue_post; + bool http_ok_expect_continue_data; + u_int32_t detect_ok_max_next_seq; + bool _ignore_expect_continue; + bool _only_check_psh; + bool _force_wait_for_next_psh; + u_int64_t last_packet_at_from_header; + TcpReassemblyLink *link; +friend class TcpReassemblyLink; +}; + +class TcpReassemblyLink { +public: + enum eState { + STATE_NA = 0, + STATE_SYN_SENT, + STATE_SYN_RECV, + STATE_SYN_OK, + STATE_SYN_FORCE_OK, + STATE_RESET, + STATE_CLOSE, + STATE_CLOSED, + STATE_CRAZY + }; + class streamIterator { + public: + streamIterator(TcpReassemblyLink *link) { + this->link = link; + this->init(); + } + bool init(); + bool next(); + bool nextAckInDirection(); + bool nextAckInReverseDirection(); + bool nextSeqInDirection(); + void print(); + u_int32_t getMaxNextSeq(); + private: + bool findSynSent(); + bool findFirstDataToDest(); + public: + TcpReassemblyStream *stream; + eState state; + private: + TcpReassemblyLink *link; + }; + TcpReassemblyLink(class TcpReassembly *reassembly, + u_int32_t ip_src = 0, u_int32_t ip_dst = 0, + u_int16_t port_src = 0, u_int16_t port_dst = 0) { + this->reassembly = reassembly; + this->ip_src = ip_src; + this->ip_dst = ip_dst; + this->port_src = port_src; + this->port_dst = port_dst; + this->state = STATE_NA; + this->first_seq_to_dest = 0; + this->first_seq_to_source = 0; + this->rst = false; + this->fin_to_dest = false; + this->fin_to_source = false; + this->_sync_queue = 0; + this->created_at = getTimeMS(); + this->last_packet_at = 0; + this->created_at_from_header = 0; + this->last_packet_at_from_header = 0; + this->last_ack = 0; + this->exists_data = false; + this->link_is_ok = 0; + this->completed_offset = 0; + this->direction_confirm = 0; + } + ~TcpReassemblyLink(); + bool push(TcpReassemblyStream::eDirection direction, + timeval time, tcphdr header_tcp, + u_char *data, u_int32_t datalen, u_int32_t datacaplen, + pcap_block_store *block_store, int block_store_index) { + if(datalen) { + this->exists_data = true; + } + if(this->state == STATE_CRAZY) { + return(this->push_crazy( + direction, time, header_tcp, + data, datalen, datacaplen, + block_store, block_store_index)); + } else { + return(this->push_normal( + direction, time, header_tcp, + data, datalen, datacaplen, + block_store, block_store_index)); + } + } + bool push_normal( + TcpReassemblyStream::eDirection direction, + timeval time, tcphdr header_tcp, + u_char *data, u_int32_t datalen, u_int32_t datacaplen, + pcap_block_store *block_store, int block_store_index); + bool push_crazy( + TcpReassemblyStream::eDirection direction, + timeval time, tcphdr header_tcp, + u_char *data, u_int32_t datalen, u_int32_t datacaplen, + pcap_block_store *block_store, int block_store_index); + int okQueue(bool final = false, bool enableDebug = false) { + if(this->state == STATE_CRAZY) { + return(this->okQueue_crazy(final, enableDebug)); + } else { + /* + return(this->okQueue_normal(final, enableDebug)); + */ + } + return(0); + } + int okQueue_normal(bool final = false, bool enableDebug = false); + int okQueue_crazy(bool final = false, bool enableDebug = false); + void complete(bool final = false, bool eraseCompletedStreams = false) { + if(this->state == STATE_CRAZY) { + this->complete_crazy(final, eraseCompletedStreams); + } else { + /* + this->complete_normal(); + */ + } + } + void complete_normal(); + void complete_crazy(bool final = false, bool eraseCompletedStreams = false); + streamIterator createIterator(); + TcpReassemblyStream *findStreamBySeq(u_int32_t seq) { + for(size_t i = 0; i < this->queue.size(); i++) { + map::iterator iter; + iter = this->queue[i]->queue.find(seq); + if(iter != this->queue[i]->queue.end()) { + return(this->queue[i]); + } + } + return(NULL); + } + TcpReassemblyStream *findStreamByMinSeq(u_int32_t seq, bool dataOnly = false, + u_int32_t not_ack = 0, TcpReassemblyStream::eDirection direction = TcpReassemblyStream::DIRECTION_NA) { + map::iterator iter; + for(iter = this->queue_by_ack.begin(); iter != this->queue_by_ack.end(); iter++) { + if(iter->second->min_seq == seq && + (!not_ack || iter->second->ack != not_ack) && + (direction == TcpReassemblyStream::DIRECTION_NA || iter->second->direction == direction)) { + return(iter->second); + } + } + if(!dataOnly && this->queue_nul_by_ack.size()) { + iter = this->queue_nul_by_ack.end(); + do { + --iter; + if(iter->second->min_seq == seq && + (!not_ack || iter->second->ack != not_ack) && + (direction == TcpReassemblyStream::DIRECTION_NA || iter->second->direction == direction)) { + return(iter->second); + } + } while(iter != this->queue_nul_by_ack.begin()); + } + return(NULL); + } + TcpReassemblyStream *findFlagStreamByAck(u_int32_t ack) { + map::iterator iter; + iter = this->queue_flags_by_ack.find(ack); + if(iter != this->queue_flags_by_ack.end()) { + return(iter->second); + } + return(NULL); + } + TcpReassemblyStream *findFinalFlagStreamByAck(u_int32_t ack, TcpReassemblyStream::eDirection direction) { + map::iterator iter; + iter = this->queue_flags_by_ack.find(ack); + if(iter != this->queue_flags_by_ack.end() && + iter->second->direction == direction) { + return(iter->second); + } + return(NULL); + } + TcpReassemblyStream *findFinalFlagStreamBySeq(u_int32_t seq, TcpReassemblyStream::eDirection direction) { + map::iterator iter; + for(iter = this->queue_flags_by_ack.begin(); iter != this->queue_flags_by_ack.end(); iter++) { + if(iter->second->direction == direction && + iter->second->min_seq >= seq) { + return(iter->second); + } + } + return(NULL); + } + bool existsUncompletedDataStream() { + map::iterator iter; + for(iter = this->queue_by_ack.begin(); iter != this->queue_by_ack.end(); iter++) { + if(iter->second->exists_data && + !iter->second->completed) { + return(true); + } + } + return(false); + } + void cleanup(u_int64_t act_time_from_header); + void printContent(int level = 0); +private: + void lock_queue() { + while(__sync_lock_test_and_set(&this->_sync_queue, 1)); + } + void unlock_queue() { + __sync_lock_release(&this->_sync_queue); + } + void pushpacket(TcpReassemblyStream::eDirection direction, + TcpReassemblyStream_packet packet); + void setLastSeq(TcpReassemblyStream::eDirection direction, + u_int32_t lastSeq); + void switchDirection(); +private: + TcpReassembly *reassembly; + u_int32_t ip_src; + u_int32_t ip_dst; + u_int16_t port_src; + u_int16_t port_dst; + eState state; + u_int32_t first_seq_to_dest; + u_int32_t first_seq_to_source; + bool rst; + bool fin_to_dest; + bool fin_to_source; + map queue_by_ack; + map queue_flags_by_ack; + map queue_nul_by_ack; + deque queue; + volatile int _sync_queue; + u_int64_t created_at; + u_int64_t last_packet_at; + u_int64_t created_at_from_header; + u_int64_t last_packet_at_from_header; + u_int32_t last_ack; + bool exists_data; + int link_is_ok; + size_t completed_offset; + int direction_confirm; + vector ok_streams; +friend class TcpReassembly; +friend class TcpReassemblyStream; +}; + +class TcpReassembly { +public: + TcpReassembly() { + this->_sync_links = 0; + this->enableHttpForceInit = false; + this->enableCrazySequence = false; + this->dataCallback = NULL; + this->act_time_from_header = 0; + this->last_time = 0; + this->doPrintContent = false; + } + ~TcpReassembly(); + void push(pcap_pkthdr *header, iphdr2 *header_ip, u_char *packet, + pcap_block_store *block_store = NULL, int block_store_index = 0); + void cleanup(bool all = false); + void setEnableHttpForceInit(bool enableHttpForceInit = true) { + this->enableHttpForceInit = enableHttpForceInit; + } + void setEnableCrazySequence(bool enableCrazySequence = true) { + this->enableCrazySequence = enableCrazySequence; + } + void setDataCallback(TcpReassemblyProcessData *dataCallback) { + this->dataCallback = dataCallback; + } + /* + bool enableStop(); + */ + void printContent(); + void setDoPrintContent() { + this->doPrintContent = true; + } +private: + void lock_links() { + while(__sync_lock_test_and_set(&this->_sync_links, 1)); + } + void unlock_links() { + __sync_lock_release(&this->_sync_links); + } +private: + map links; + volatile int _sync_links; + bool enableHttpForceInit; + bool enableCrazySequence; + TcpReassemblyProcessData *dataCallback; + u_int64_t act_time_from_header; + u_int64_t last_time; + bool doPrintContent; +friend class TcpReassemblyLink; +}; + +#endif diff --git a/tools.h b/tools.h index 5ced18a82..71a62a10e 100644 --- a/tools.h +++ b/tools.h @@ -47,4 +47,18 @@ struct dstring std::string str[2]; }; +struct d_u_int32_t +{ + d_u_int32_t() { + } + d_u_int32_t(u_int32_t val1, u_int32_t val2) { + val[0] = val1; + val[1] = val2; + } + u_int32_t operator [] (int indexVal) { + return(val[indexVal]); + } + u_int32_t val[2]; +}; + #endif diff --git a/voipmonitor.cpp b/voipmonitor.cpp index c85714070..d34c27545 100644 --- a/voipmonitor.cpp +++ b/voipmonitor.cpp @@ -56,6 +56,8 @@ #include "ipaccount.h" #include "pcap_queue.h" #include "generator.h" +#include "tcpreassembly.h" +#include "http.h" #if defined(QUEUE_MUTEX) || defined(QUEUE_NONBLOCK) extern "C" { @@ -211,6 +213,7 @@ char opt_keycheck[1024] = ""; char opt_convert_char[64] = ""; int opt_skinny = 0; int opt_read_from_file = 0; +char opt_pb_read_from_file[256] = ""; int opt_dscp = 0; int opt_cdrproxy = 1; int opt_enable_lua_tables = 0; @@ -218,7 +221,9 @@ int opt_generator = 0; int opt_generator_channels = 1; int opt_skipdefault = 0; int opt_filesclean = 1; +int opt_enable_tcpreassembly = 0; int opt_allow_zerossrc = 0; +int opt_convert_dlt_sll_to_en10 = 0; unsigned int opt_maxpoolsize = 0; unsigned int opt_maxpooldays = 0; @@ -346,6 +351,7 @@ pthread_t cleanspool_thread; // ID of worker clean thread int terminating; // if set to 1, worker thread will terminate int terminating2; // if set to 1, worker thread will terminate char *sipportmatrix; // matrix of sip ports to monitor +char *httpportmatrix; // matrix of http ports to monitor char *ipaccountportmatrix; queue mysqlquery; @@ -360,6 +366,7 @@ pcap_packet *qring; #endif pcap_t *handle = NULL; // pcap handler +pcap_t *handle_dead_EN10MB = NULL; read_thread *threads; @@ -384,6 +391,10 @@ MySqlStore *sqlStore = NULL; char mac[32] = ""; +TcpReassembly *tcpReassembly; +HttpData *httpData; + + void mysqlquerypush(string q) { pthread_mutex_lock(&mysqlquery_lock); mysqlquery.push(q); @@ -560,7 +571,7 @@ void clean_maxpoolsize() { if(opt_maxpoolsize == 0) { return; } - + if(debugclean) cout << "clean_maxpoolsize\n"; // check total size @@ -1774,6 +1785,16 @@ int load_config(char *fname) { } } + // http ports + if (ini.GetAllValues("general", "httpport", values)) { + CSimpleIni::TNamesDepend::const_iterator i = values.begin(); + // reset default port + httpportmatrix[5060] = 0; + for (; i != values.end(); ++i) { + httpportmatrix[atoi(i->pItem)] = 1; + } + } + // ipacc ports if (ini.GetAllValues("general", "ipaccountport", values)) { CSimpleIni::TNamesDepend::const_iterator i = values.begin(); @@ -2355,6 +2376,14 @@ int load_config(char *fname) { if((value = ini.GetValue("general", "mirror_bind_dlt", NULL))) { opt_pcap_queue_receive_dlt = atoi(value); } + + if((value = ini.GetValue("general", "tcpreassembly", NULL))) { + opt_enable_tcpreassembly = strcmp(value, "only") ? yesno(value) : 2; + } + + if((value = ini.GetValue("general", "convert_dlt_sll_to_en10", NULL))) { + opt_convert_dlt_sll_to_en10 = yesno(value); + } /* @@ -2562,6 +2591,7 @@ int main(int argc, char *argv[]) { sipportmatrix = (char*)calloc(1, sizeof(char) * 65537); // set default SIP port to 5060 sipportmatrix[5060] = 1; + httpportmatrix = (char*)calloc(1, sizeof(char) * 65537); pthread_mutex_init(&mysqlquery_lock, NULL); @@ -2758,11 +2788,15 @@ int main(int argc, char *argv[]) { verbosity = atoi(optarg); break; case 'r': - strcpy(fname, optarg); - opt_read_from_file = 1; - opt_scanpcapdir[0] = '\0'; - //opt_cachedir[0] = '\0'; - opt_pcap_queue = 0; + if(!strncmp(optarg, "pb:", 3)) { + strcpy(opt_pb_read_from_file, optarg + 3); + } else { + strcpy(fname, optarg); + opt_read_from_file = 1; + opt_scanpcapdir[0] = '\0'; + //opt_cachedir[0] = '\0'; + opt_pcap_queue = 0; + } break; case 'c': opt_nocdr = 1; @@ -3083,6 +3117,22 @@ int main(int argc, char *argv[]) { signal(SIGTERM,sigterm_handler); calltable = new Calltable; + + if(opt_enable_tcpreassembly) { + bool setHttpPorts = false; + for(int i = 0; i < 65537; i++) { + if(httpportmatrix[i]) { + setHttpPorts = true; + } + } + if(setHttpPorts) { + tcpReassembly = new TcpReassembly; + tcpReassembly->setEnableHttpForceInit(); + tcpReassembly->setEnableCrazySequence(); + httpData = new HttpData; + tcpReassembly->setDataCallback(httpData); + } + } // preparing pcap reading and pcap filters @@ -3158,6 +3208,9 @@ int main(int argc, char *argv[]) { return(2); } } + if(opt_convert_dlt_sll_to_en10) { + handle_dead_EN10MB = pcap_open_dead(DLT_EN10MB, 65535); + } } else { // if reading file rtp_threaded = 0; @@ -3265,7 +3318,7 @@ int main(int argc, char *argv[]) { if (opt_fork){ daemonize(); } - + pthread_create(&cleanspool_thread, NULL, clean_spooldir, NULL); // start thread processing queued cdr - supressed if run as sender @@ -3444,6 +3497,10 @@ int main(int argc, char *argv[]) { delete pcapQueueR; } else { + + if(opt_pb_read_from_file[0] && opt_enable_tcpreassembly) { + sqlStore->setIgnoreTerminating(STORE_PROC_ID_HTTP, true); + } PcapQueue_readFromInterface *pcapQueueI = new PcapQueue_readFromInterface("interface"); pcapQueueI->setInterfaceName(ifname); @@ -3463,18 +3520,36 @@ int main(int argc, char *argv[]) { while(!terminating) { if(_counter && !(_counter % 10)) { pcapQueueQ->pcapStat(10); + if(tcpReassembly) { + tcpReassembly->setDoPrintContent(); + } } sleep(1); ++_counter; } pcapQueueI->terminate(); - sleep(1); + sleep(opt_pb_read_from_file[0] && opt_enable_tcpreassembly ? 30 : 1); pcapQueueQ->terminate(); sleep(1); + if(tcpReassembly) { + delete tcpReassembly; + tcpReassembly = NULL; + } + if(httpData) { + delete httpData; + httpData = NULL; + } + delete pcapQueueI; delete pcapQueueQ; + + if(opt_pb_read_from_file[0] && opt_enable_tcpreassembly) { + sqlStore->setIgnoreTerminating(STORE_PROC_ID_HTTP, false); + sleep(2); + } + } } else { @@ -3527,8 +3602,13 @@ int main(int argc, char *argv[]) { } // close handler - if(opt_scanpcapdir[0] == '\0' && !opt_pcap_queue) { - pcap_close(handle); + if(opt_scanpcapdir[0] == '\0') { + if(!opt_pcap_queue) { + pcap_close(handle); + } + if(handle_dead_EN10MB) { + pcap_close(handle_dead_EN10MB); + } } // flush all queues @@ -3552,6 +3632,13 @@ int main(int argc, char *argv[]) { delete call; calls--; } + + if(tcpReassembly) { + delete tcpReassembly; + } + if(httpData) { + delete httpData; + } if(!opt_nocdr) { pthread_mutex_lock(&mysqlquery_lock); @@ -3564,6 +3651,7 @@ int main(int argc, char *argv[]) { } free(sipportmatrix); + free(httpportmatrix); if(opt_ipaccount) { free(ipaccountportmatrix); } diff --git a/voipmonitor.h b/voipmonitor.h index 428cfd478..2795c7d33 100644 --- a/voipmonitor.h +++ b/voipmonitor.h @@ -18,6 +18,7 @@ #define TYPE_RTCP 3 #define TYPE_SKINNY 4 +#define STORE_PROC_ID_HTTP 71 #define STORE_PROC_ID_IPACC_1 81 #define STORE_PROC_ID_IPACC_2 82 #define STORE_PROC_ID_IPACC_3 83