Skip to content

Commit

Permalink
update to pcap2mgen
Browse files Browse the repository at this point in the history
  • Loading branch information
bebopagogo committed Dec 13, 2019
1 parent 994f090 commit f90b51c
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 62 deletions.
11 changes: 0 additions & 11 deletions src/common/mgen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1080,17 +1080,6 @@ void Mgen::UpdateRecvAnalytics(const ProtoTime& theTime, MgenMsg* theMsg, Protoc
if (NULL != controller)
controller->OnUpdateReport(theTime, report);
report.Log(log_file, theTime, theTime, local_time);
/*// locally print updated report info (TBD - do REPORT log event for received flows)
time_t tvSec = theTime.sec();
struct tm* timePtr = gmtime(&tvSec);
fprintf(stdout, "%02d:%02d:%02d.%06lu ", timePtr->tm_hour,
timePtr->tm_min, timePtr->tm_sec, theTime.usec());
fprintf(stdout, "%s/%hu->", theMsg->GetSrcAddr().GetHostString(), theMsg->GetSrcAddr().GetPort());
fprintf(stdout, "%s/%hu,%lu ", theMsg->GetDstAddr().GetHostString(), theMsg->GetDstAddr().GetPort(), (unsigned long)theMsg->GetFlowId());
fprintf(stdout, "rate>%lg kbps ", analytic->GetReportRateAverage()*8.0e-03);
fprintf(stdout, "loss>%lg ", analytic->GetReportLossFraction()*100.0);
fprintf(stdout, "latency>%lg,%lg,%lg\n", analytic->GetReportLatencyAverage(),
analytic->GetReportLatencyMin(), analytic->GetReportLatencyMax());*/
}

} // end Mgen::UpdateRecvAnalytics()
Expand Down
103 changes: 59 additions & 44 deletions src/common/mgenFlow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,19 @@ void MgenFlow::Reset()

bool MgenFlow::OnTxTimeout(ProtoTimer& /*theTimer*/)
{

// This code (and MgenTransport) needs to be redone because
// the code is way too complicated for the simple jobs it
// should be doing, The MgenFlow tx_timer should only be used
// to queue messages to the underlying transport and the flow
// sequence number should only be incremented when a message
// is successfully enqueued. We could pause the timer upon
// enqueue failure and the underlying MgenTransport would reactivate
// the timer when there is space in the transport queue available.
// It looks like transport handoff of some sort has been implemented that
// is perhaps when a flow attribute change gets the flow assigned to
// a different socket? E.g., a different source port, TOS, etc?


if (!flow_transport || ((message_limit >= 0) && (messages_sent >= message_limit)))
{
if (tx_timer.IsActive()) tx_timer.Deactivate();
Expand Down Expand Up @@ -1274,9 +1286,9 @@ bool MgenFlow::OnTxTimeout(ProtoTimer& /*theTimer*/)
}
else
{
// Don't turn on timer if we have an unlimited rate
if (!pattern.UnlimitedRate())
return GetNextInterval();
// Don't turn on timer if we have an unlimited rate
if (!pattern.UnlimitedRate())
return GetNextInterval();
}
}

Expand All @@ -1285,34 +1297,36 @@ bool MgenFlow::OnTxTimeout(ProtoTimer& /*theTimer*/)
// readiness. Otherwise service any pending flows
// the transport might have first using socket output
// notification to send as fast as possible
if ((flow_transport && flow_transport->HasPendingFlows())
&& (!pattern.UnlimitedRate() && !socket_error))

{
// If we have an unlimited transmission rate and have had
// a socket error don't start output notification in this
// case we will use a tx_timer to prevent thrashing until
// the condition is resolved
flow_transport->StartOutputNotification();

if (queue_limit > 0)
{
pending_messages++;
flow_transport->AppendFlow(this);
}
// If we've exceeded our queue limit, turn off the timer
// we'll restart it when the queue gets below the limit,
// unless we have an unlimited queue size (queue_limit -1)
// or we're sending packets as fast as possible
if ((queue_limit > 0 && pending_messages >= queue_limit)
&& (!pattern.UnlimitedRate())) // ljt should we allow queue limits for unlimited rates?
{
if (tx_timer.IsActive()) tx_timer.Deactivate();
return false; // don't want to fail twice!
}
else
return GetNextInterval();
}
if ((flow_transport && flow_transport->HasPendingFlows())
&& (!pattern.UnlimitedRate() && !socket_error))

{
// If we have an unlimited transmission rate and have had
// a socket error don't start output notification in this
// case we will use a tx_timer to prevent thrashing until
// the condition is resolved
flow_transport->StartOutputNotification();

if (queue_limit > 0)
{
pending_messages++;
flow_transport->AppendFlow(this);
}
// If we've exceeded our queue limit, turn off the timer
// we'll restart it when the queue gets below the limit,
// unless we have an unlimited queue size (queue_limit -1)
// or we're sending packets as fast as possible
if ((queue_limit > 0 && pending_messages >= queue_limit)
&& (!pattern.UnlimitedRate())) // ljt should we allow queue limits for unlimited rates?
{
if (tx_timer.IsActive()) tx_timer.Deactivate();
return false; // don't want to fail twice!
}
else
{
return GetNextInterval();
}
}

if (!flow_transport)
{
Expand All @@ -1331,19 +1345,20 @@ bool MgenFlow::OnTxTimeout(ProtoTimer& /*theTimer*/)
if (tx_timer.IsActive()) tx_timer.Deactivate();
return false;
}
else
// else if we have an unlimited rate and a socket_error
// schedule a transmission timer at 100 milliseconds
// to prevent thrashing
if (pattern.UnlimitedRate() && socket_error)
{
tx_timer.SetInterval(0.001);
if (!tx_timer.IsActive())
timer_mgr.ActivateTimer(tx_timer);
return true;
else if (pattern.UnlimitedRate() && socket_error)
{
// we have an unlimited rate and a socket_error
// schedule a transmission timer at 100 milliseconds
// to prevent thrashing
tx_timer.SetInterval(0.001);
if (!tx_timer.IsActive())
timer_mgr.ActivateTimer(tx_timer);
return true;
}
else
return GetNextInterval();
else
{
return GetNextInterval();
}

} // end MgenFlow::OnTxTimeout()

Expand Down
17 changes: 13 additions & 4 deletions src/common/mgenTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ bool MgenTransport::SendPendingMessage()
{
// Send pending messages until we hit congestion
// or clear the queue...

while (IsOpen() && !IsTransmitting() && pending_current)
{
int msgLimit = pending_current->GetMessageLimit();
Expand All @@ -218,7 +217,8 @@ bool MgenTransport::SendPendingMessage()
if ((pending_current->GetPending() > 0) ||
(pending_current->UnlimitedRate() && sendMore))
{
if (!pending_current->SendMessage()) return false;
if (!pending_current->SendMessage())
return false;
}

// Restart flow timer if we're below the queue limit
Expand Down Expand Up @@ -253,8 +253,9 @@ bool MgenTransport::SendPendingMessage()

// If we've sent all pending messages,
// remove flow from pending list.

if (!pending_current->GetPending() &&
(!pending_current->UnlimitedRate() || !sendMore))
(!pending_current->UnlimitedRate() || !sendMore))
{
RemoveFromPendingList(); //ljt remove this function
// or replace remove flow
Expand Down Expand Up @@ -1015,11 +1016,19 @@ MessageStatus MgenUdpTransport::SendMessage(MgenMsg& theMsg, const ProtoAddress&
theMsg.WriteChecksum(txChecksum,(unsigned char*)txBuffer,(UINT32)len);

bool result = socket.SendTo((char*)txBuffer,len,dstAddr);

// Note on BSD systems (incl. Mac OSX) UDP sockets don't really block.
// On some BSD systems, an ENOBUFS will occur but OSX always acts like the
// packet was sent ... so somewhere in the MGEN code we need to handle
// OSX differently ... Probably the best strategy would be to always go
// back to select() call ... (i.e. ProtoDispatcher::Wait())

// If result is true but numBytes == 0
// we had an EWOULDBLOCK condition
if (result && len == 0)
{
return MSG_SEND_BLOCKED;
}

// We had some other socket failure
if (!result)
Expand All @@ -1030,7 +1039,7 @@ MessageStatus MgenUdpTransport::SendMessage(MgenMsg& theMsg, const ProtoAddress&
else
#endif // !_WIN32_WCE
DMSG(PL_WARN,"MgenUdpTransport::SendMessage() socket.SendTo() error: %s\n", GetErrorString());
return MSG_SEND_FAILED;
return MSG_SEND_FAILED;
}

LogEvent(SEND_EVENT, &theMsg,theMsg.GetTxTime(), txBuffer);
Expand Down
6 changes: 3 additions & 3 deletions src/common/pcap2mgen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ int main(int argc, char* argv[])
if (!udpPkt.InitFromPacket(ipPkt)) continue; // not a UDP packet

MgenMsg msg;
if (!msg.Unpack((char*)udpPkt.GetPayload(), udpPkt.GetPayloadLength(), false, false))
if (!msg.Unpack(udpPkt.AccessPayload(), udpPkt.GetPayloadLength(), false, false))
{
fprintf(stderr, "pcap2mgen warning: UDP packet not an MGEN packet?\n");
continue;
Expand All @@ -163,7 +163,7 @@ int main(int argc, char* argv[])

if (trace)
{
fprintf(outfile, "%lu.%lu ", hdr.ts.tv_sec, hdr.ts.tv_usec);
fprintf(outfile, "%lu.%lu ", (unsigned long)hdr.ts.tv_sec, (unsigned long)hdr.ts.tv_usec);
ProtoAddress ethAddr;
ethPkt.GetSrcAddr(ethAddr);
fprintf(outfile, "esrc>%s ", ethAddr.GetHostString());
Expand All @@ -172,7 +172,7 @@ int main(int argc, char* argv[])
}
// TBD - Add option to log REPORT events only? Embed MGEN analytic, too?
// Should we make "flush" true by default?
msg.LogRecvEvent(outfile, false, false, true, false, true, (char*)udpPkt.AccessPayload(), false, hdr.ts);
msg.LogRecvEvent(outfile, false, false, true, false, true, udpPkt.AccessPayload(), false, hdr.ts);
} // end while (pcap_next())

if (stdin != infile) fclose(infile);
Expand Down

0 comments on commit f90b51c

Please sign in to comment.