Skip to content

Commit

Permalink
face: correctly handle removed fragments in LpReliability
Browse files Browse the repository at this point in the history
refs #4479

Change-Id: Id5d1aa231ddfc10a14859ef819f6dde0a4111501
  • Loading branch information
eric135 committed Apr 3, 2018
1 parent 3406dc9 commit 971d962
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 48 deletions.
40 changes: 29 additions & 11 deletions daemon/face/lp-reliability.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright (c) 2014-2017, Regents of the University of California,
* Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
Expand Down Expand Up @@ -82,7 +82,7 @@ LpReliability::handleOutgoing(std::vector<lp::Packet>& frags, lp::Packet&& pkt,
std::forward_as_tuple(frag));
unackedFragsIt->second.sendTime = sendTime;
unackedFragsIt->second.rtoTimer =
scheduler::schedule(m_rto.computeRto(), bind(&LpReliability::onLpPacketLost, this, unackedFragsIt));
scheduler::schedule(m_rto.computeRto(), bind(&LpReliability::onLpPacketLost, this, txSeq));
unackedFragsIt->second.netPkt = netPkt;

if (m_unackedFrags.size() == 1) {
Expand Down Expand Up @@ -127,9 +127,20 @@ LpReliability::processIncomingPacket(const lp::Packet& pkt)
// packet. Potentially increment the start of the window.
onLpPacketAcknowledged(fragIt);

// This set contains TxSequences that have been removed by onLpPacketLost below because they
// were part of a network packet that was removed due to a fragment exceeding retx, as well as
// any other TxSequences removed by onLpPacketLost. This prevents onLpPacketLost from being
// called later for an invalid iterator.
std::set<lp::Sequence> removedLpPackets;

// Resend or fail fragments considered lost. Potentially increment the start of the window.
for (UnackedFrags::iterator txSeqIt : lostLpPackets) {
this->onLpPacketLost(txSeqIt);
for (lp::Sequence txSeq : lostLpPackets) {
if (removedLpPackets.find(txSeq) == removedLpPackets.end()) {
auto removedThisTxSeq = this->onLpPacketLost(txSeq);
for (auto removedTxSeq : removedThisTxSeq) {
removedLpPackets.insert(removedTxSeq);
}
}
}
}

Expand Down Expand Up @@ -206,10 +217,10 @@ LpReliability::stopIdleAckTimer()
m_isIdleAckTimerRunning = false;
}

std::vector<LpReliability::UnackedFrags::iterator>
std::vector<lp::Sequence>
LpReliability::findLostLpPackets(LpReliability::UnackedFrags::iterator ackIt)
{
std::vector<UnackedFrags::iterator> lostLpPackets;
std::vector<lp::Sequence> lostLpPackets;

for (auto it = m_firstUnackedFrag; ; ++it) {
if (it == m_unackedFrags.end()) {
Expand All @@ -224,27 +235,30 @@ LpReliability::findLostLpPackets(LpReliability::UnackedFrags::iterator ackIt)
unackedFrag.nGreaterSeqAcks++;

if (unackedFrag.nGreaterSeqAcks >= m_options.seqNumLossThreshold) {
lostLpPackets.push_back(it);
lostLpPackets.push_back(it->first);
}
}

return lostLpPackets;
}

void
LpReliability::onLpPacketLost(UnackedFrags::iterator txSeqIt)
std::vector<lp::Sequence>
LpReliability::onLpPacketLost(lp::Sequence txSeq)
{
BOOST_ASSERT(m_unackedFrags.count(txSeqIt->first) > 0);
BOOST_ASSERT(m_unackedFrags.count(txSeq) > 0);
auto txSeqIt = m_unackedFrags.find(txSeq);

auto& txFrag = txSeqIt->second;
txFrag.rtoTimer.cancel();
auto netPkt = txFrag.netPkt;
std::vector<lp::Sequence> removedThisTxSeq;

// Check if maximum number of retransmissions exceeded
if (txFrag.retxCount >= m_options.maxRetx) {
// Delete all LpPackets of NetPkt from m_unackedFrags (except this one)
for (size_t i = 0; i < netPkt->unackedFrags.size(); i++) {
if (netPkt->unackedFrags[i] != txSeqIt) {
removedThisTxSeq.push_back(netPkt->unackedFrags[i]->first);
deleteUnackedFrag(netPkt->unackedFrags[i]);
}
}
Expand All @@ -260,6 +274,7 @@ LpReliability::onLpPacketLost(UnackedFrags::iterator txSeqIt)
onDroppedInterest(Interest(frag));
}

removedThisTxSeq.push_back(txSeqIt->first);
deleteUnackedFrag(txSeqIt);
}
else {
Expand All @@ -284,15 +299,18 @@ LpReliability::onLpPacketLost(UnackedFrags::iterator txSeqIt)
BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
*fragInNetPkt = newTxFragIt;

removedThisTxSeq.push_back(txSeqIt->first);
deleteUnackedFrag(txSeqIt);

// Retransmit fragment
m_linkService->sendLpPacket(lp::Packet(newTxFrag.pkt));

// Start RTO timer for this sequence
newTxFrag.rtoTimer = scheduler::schedule(m_rto.computeRto(),
bind(&LpReliability::onLpPacketLost, this, newTxFragIt));
bind(&LpReliability::onLpPacketLost, this, newTxSeq));
}

return removedThisTxSeq;
}

void
Expand Down
11 changes: 6 additions & 5 deletions daemon/face/lp-reliability.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright (c) 2014-2017, Regents of the University of California,
* Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
Expand Down Expand Up @@ -135,15 +135,16 @@ class LpReliability : noncopyable
/** \brief find and mark as lost fragments where a configurable number of Acks
* (\p m_options.seqNumLossThreshold) have been received for greater TxSequence numbers
* \param ackIt iterator pointing to acknowledged fragment
* \return vector containing iterators to fragments marked lost by this mechanism
* \return vector containing TxSequences of fragments marked lost by this mechanism
*/
std::vector<UnackedFrags::iterator>
std::vector<lp::Sequence>
findLostLpPackets(UnackedFrags::iterator ackIt);

/** \brief resend (or give up on) a lost fragment
* \return vector of the TxSequences of fragments removed due to a network packet being removed
*/
void
onLpPacketLost(UnackedFrags::iterator txSeqIt);
std::vector<lp::Sequence>
onLpPacketLost(lp::Sequence txSeq);

/** \brief remove the fragment with the given sequence number from the map of unacknowledged
* fragments, as well as its associated network packet (if any)
Expand Down
92 changes: 60 additions & 32 deletions tests/daemon/face/lp-reliability.t.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright (c) 2014-2017, Regents of the University of California,
* Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
Expand Down Expand Up @@ -67,25 +67,25 @@ class DummyLpReliabilityLinkService : public GenericLinkService

private:
void
doSendInterest(const Interest& interest) override
doSendInterest(const Interest&) final
{
BOOST_ASSERT(false);
}

void
doSendData(const Data& data) override
doSendData(const Data&) final
{
BOOST_ASSERT(false);
}

void
doSendNack(const lp::Nack& nack) override
doSendNack(const lp::Nack&) final
{
BOOST_ASSERT(false);
}

void
doReceivePacket(Transport::Packet&& packet) override
doReceivePacket(Transport::Packet&&) final
{
BOOST_ASSERT(false);
}
Expand Down Expand Up @@ -119,12 +119,6 @@ class LpReliabilityFixture : public UnitTestTimeFixture
});
}

LpReliability::UnackedFrags::iterator
getIteratorFromTxSeq(lp::Sequence txSeq)
{
return reliability->m_unackedFrags.find(txSeq);
}

/** \brief make an LpPacket with fragment of specified size
* \param pktNo packet identifier, which can be extracted with \p getPktNo
* \param payloadSize total payload size; if this is less than 4, 4 will be used
Expand Down Expand Up @@ -161,7 +155,7 @@ class LpReliabilityFixture : public UnitTestTimeFixture
return value;
}

public:
protected:
unique_ptr<DummyLpReliabilityLinkService> linkService;
unique_ptr<DummyTransport> transport;
unique_ptr<DummyFace> face;
Expand All @@ -170,8 +164,6 @@ class LpReliabilityFixture : public UnitTestTimeFixture

BOOST_FIXTURE_TEST_SUITE(TestLpReliability, LpReliabilityFixture)

BOOST_AUTO_TEST_SUITE(Sender)

BOOST_AUTO_TEST_CASE(SendNoFragmentField)
{
lp::Packet pkt;
Expand All @@ -191,11 +183,12 @@ BOOST_AUTO_TEST_CASE(SendUnfragmentedRetx)
lp::Packet pkt2 = makeFrag(3000, 30);

linkService->sendLpPackets({pkt1});
BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 1);
lp::Packet cached1(transport->sentPackets.front().packet);
BOOST_REQUIRE(cached1.has<lp::TxSequenceField>());
BOOST_CHECK_EQUAL(cached1.get<lp::TxSequenceField>(), 2);
BOOST_CHECK(!cached1.has<lp::SequenceField>());
lp::Sequence firstTxSeq = cached1.get<lp::TxSequenceField>();
BOOST_CHECK_EQUAL(firstTxSeq, 2);
BOOST_CHECK_EQUAL(getPktNo(cached1), 1024);
BOOST_CHECK_EQUAL(linkService->getCounters().nAcknowledged, 0);
BOOST_CHECK_EQUAL(linkService->getCounters().nRetransmitted, 0);
Expand All @@ -214,7 +207,7 @@ BOOST_AUTO_TEST_CASE(SendUnfragmentedRetx)
BOOST_CHECK(reliability->m_unackedFrags.at(firstTxSeq).netPkt);
BOOST_CHECK(reliability->m_unackedFrags.at(firstTxSeq + 1).netPkt);
BOOST_CHECK_NE(reliability->m_unackedFrags.at(firstTxSeq).netPkt,
reliability->m_unackedFrags.at(firstTxSeq + 1).netPkt);
reliability->m_unackedFrags.at(firstTxSeq + 1).netPkt);
BOOST_CHECK_EQUAL(reliability->m_unackedFrags.at(firstTxSeq).retxCount, 0);
BOOST_CHECK_EQUAL(reliability->m_unackedFrags.at(firstTxSeq + 1).retxCount, 0);
BOOST_CHECK_EQUAL(reliability->m_firstUnackedFrag->first, firstTxSeq);
Expand Down Expand Up @@ -374,7 +367,7 @@ BOOST_AUTO_TEST_CASE(SendFragmentedRetx)
// 2049 rto: 1000ms, txSeq: 5, started T+250ms, retx 1
// 2050 rto: 1000ms, txSeq: 4, started T+0ms, retx 0
advanceClocks(time::milliseconds(1), 250);
reliability->onLpPacketLost(getIteratorFromTxSeq(3));
reliability->onLpPacketLost(3);

BOOST_CHECK_EQUAL(reliability->m_unackedFrags.count(2), 1);
BOOST_CHECK_EQUAL(reliability->m_unackedFrags.count(3), 0);
Expand Down Expand Up @@ -408,7 +401,7 @@ BOOST_AUTO_TEST_CASE(SendFragmentedRetx)
// 2049 rto: 1000ms, txSeq: 6, started T+500ms, retx 2
// 2050 rto: 1000ms, txSeq: 4, started T+0ms, retx 0
advanceClocks(time::milliseconds(1), 250);
reliability->onLpPacketLost(getIteratorFromTxSeq(5));
reliability->onLpPacketLost(5);

BOOST_CHECK_EQUAL(reliability->m_unackedFrags.count(2), 1);
BOOST_CHECK_EQUAL(reliability->m_unackedFrags.count(5), 0);
Expand Down Expand Up @@ -442,7 +435,7 @@ BOOST_AUTO_TEST_CASE(SendFragmentedRetx)
// 2049 rto: 1000ms, txSeq: 7, started T+750ms, retx 3
// 2050 rto: 1000ms, txSeq: 4, started T+0ms, retx 0
advanceClocks(time::milliseconds(1), 250);
reliability->onLpPacketLost(getIteratorFromTxSeq(6));
reliability->onLpPacketLost(6);

BOOST_REQUIRE_EQUAL(reliability->m_unackedFrags.count(2), 1);
BOOST_CHECK_EQUAL(reliability->m_unackedFrags.count(6), 0);
Expand Down Expand Up @@ -476,7 +469,7 @@ BOOST_AUTO_TEST_CASE(SendFragmentedRetx)
// 2049 rto: expired, removed
// 2050 rto: expired, removed
advanceClocks(time::milliseconds(1), 100);
reliability->onLpPacketLost(getIteratorFromTxSeq(7));
reliability->onLpPacketLost(7);

BOOST_CHECK_EQUAL(reliability->m_unackedFrags.size(), 0);
BOOST_CHECK_EQUAL(reliability->m_ackQueue.size(), 0);
Expand Down Expand Up @@ -515,11 +508,14 @@ BOOST_AUTO_TEST_CASE(AckUnknownTxSeq)
BOOST_CHECK_EQUAL(linkService->getCounters().nDroppedInterests, 0);
}

BOOST_AUTO_TEST_CASE(LossByGreaterAcks) // detect loss by 3x greater Acks, also tests wraparound
BOOST_AUTO_TEST_CASE(LossByGreaterAcks)
{
// Detect loss by 3x greater Acks, also tests wraparound

reliability->m_lastTxSeqNo = 0xFFFFFFFFFFFFFFFE;

// Passed to sendLpPackets individually since they are from separate, non-fragmented network packets
// Passed to sendLpPackets individually since they are
// from separate, non-fragmented network packets
linkService->sendLpPackets({makeFrag(1, 50)});
linkService->sendLpPackets({makeFrag(2, 50)});
linkService->sendLpPackets({makeFrag(3, 50)});
Expand Down Expand Up @@ -653,6 +649,44 @@ BOOST_AUTO_TEST_CASE(LossByGreaterAcks) // detect loss by 3x greater Acks, also
BOOST_CHECK_EQUAL(linkService->getCounters().nDroppedInterests, 0);
}

BOOST_AUTO_TEST_CASE(SkipFragmentsRemovedInRtt)
{
auto opts = linkService->getOptions();
opts.reliabilityOptions.maxRetx = 0; // just to make the test case shorter
opts.reliabilityOptions.seqNumLossThreshold = 3;
linkService->setOptions(opts);

lp::Packet frag1 = makeFrag(5001);
lp::Packet frag2 = makeFrag(5002);
linkService->sendLpPackets({frag1, frag2}); // First packet has 2 fragments
linkService->sendLpPackets({makeFrag(5003)});
linkService->sendLpPackets({makeFrag(5004)});
linkService->sendLpPackets({makeFrag(5005)});

BOOST_CHECK_EQUAL(transport->sentPackets.size(), 5);
BOOST_CHECK_EQUAL(reliability->m_unackedFrags.size(), 5);

lp::Sequence firstTxSeq = reliability->m_firstUnackedFrag->first;

// Ack the last 2 packets
lp::Packet ackPkt1;
ackPkt1.add<lp::AckField>(firstTxSeq + 4);
ackPkt1.add<lp::AckField>(firstTxSeq + 3);
reliability->processIncomingPacket(ackPkt1);

BOOST_CHECK_EQUAL(reliability->m_unackedFrags.size(), 3);
BOOST_CHECK_EQUAL(reliability->m_unackedFrags.at(firstTxSeq).nGreaterSeqAcks, 2);
BOOST_CHECK_EQUAL(reliability->m_unackedFrags.at(firstTxSeq + 1).nGreaterSeqAcks, 2);

// Ack the third packet (5003)
// This triggers a "loss by greater Acks" for packets 5001 and 5002
lp::Packet ackPkt2;
ackPkt2.add<lp::AckField>(firstTxSeq + 2);
reliability->processIncomingPacket(ackPkt2); // tests crash/assert reported in bug #4479

BOOST_CHECK_EQUAL(reliability->m_unackedFrags.size(), 0);
}

BOOST_AUTO_TEST_CASE(CancelLossNotificationOnAck)
{
reliability->onDroppedInterest.connect([] (const Interest&) {
Expand All @@ -677,10 +711,6 @@ BOOST_AUTO_TEST_CASE(CancelLossNotificationOnAck)
BOOST_CHECK_EQUAL(linkService->getCounters().nDroppedInterests, 0);
}

BOOST_AUTO_TEST_SUITE_END() // Sender

BOOST_AUTO_TEST_SUITE(Receiver)

BOOST_AUTO_TEST_CASE(ProcessIncomingPacket)
{
BOOST_CHECK(!reliability->m_isIdleAckTimerRunning);
Expand Down Expand Up @@ -743,7 +773,7 @@ BOOST_AUTO_TEST_CASE(PiggybackAcksMtu)

BOOST_CHECK(!reliability->m_ackQueue.empty());

for (int i = 0; i < 5; i++) {
for (uint32_t i = 0; i < 5; i++) {
lp::Packet pkt = makeFrag(i, 60);
linkService->sendLpPackets({pkt});

Expand Down Expand Up @@ -923,7 +953,7 @@ BOOST_AUTO_TEST_CASE(IdleAckTimerMtu)

// given Ack of size 6 and MTU of 1500, 249 Acks/IDLE packet
BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 5);
for (int i = 0; i < 4; i++) {
for (size_t i = 0; i < 4; i++) {
lp::Packet sentPkt(transport->sentPackets[i].packet);
BOOST_CHECK(!sentPkt.has<lp::TxSequenceField>());
BOOST_CHECK_EQUAL(sentPkt.count<lp::AckField>(), 249);
Expand Down Expand Up @@ -956,7 +986,7 @@ BOOST_AUTO_TEST_CASE(IdleAckTimerMtu)

// given Ack of size 6 and MTU of 1500, 249 Acks/IDLE packet
BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 10);
for (int i = 5; i < 9; i++) {
for (size_t i = 5; i < 9; i++) {
lp::Packet sentPkt(transport->sentPackets[i].packet);
BOOST_CHECK(!sentPkt.has<lp::TxSequenceField>());
BOOST_CHECK_EQUAL(sentPkt.count<lp::AckField>(), 249);
Expand All @@ -982,7 +1012,7 @@ BOOST_AUTO_TEST_CASE(IdleAckTimerMtu)
// given Ack of size 8 and MTU of 1500, approx 187 Acks/IDLE packet
BOOST_CHECK(!reliability->m_isIdleAckTimerRunning);
BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 16);
for (int i = 10; i < 15; i++) {
for (size_t i = 10; i < 15; i++) {
lp::Packet sentPkt(transport->sentPackets[i].packet);
BOOST_CHECK(!sentPkt.has<lp::TxSequenceField>());
BOOST_CHECK_EQUAL(sentPkt.count<lp::AckField>(), 187);
Expand All @@ -1002,8 +1032,6 @@ BOOST_AUTO_TEST_CASE(IdleAckTimerMtu)
BOOST_CHECK_EQUAL(reliability->m_ackQueue.size(), 0);
}

BOOST_AUTO_TEST_SUITE_END() // Receiver

BOOST_AUTO_TEST_SUITE_END() // TestLpReliability
BOOST_AUTO_TEST_SUITE_END() // Face

Expand Down

0 comments on commit 971d962

Please sign in to comment.