Skip to content

Commit

Permalink
Support UDP tracker
Browse files Browse the repository at this point in the history
It shares UDP listening port with IPv4 DHT. At the moment, in order to
enable UDP tracker support, enable IPv4 DHT.
  • Loading branch information
tatsuhiro-t committed Feb 24, 2013
1 parent b782a56 commit d687416
Show file tree
Hide file tree
Showing 29 changed files with 2,271 additions and 102 deletions.
10 changes: 10 additions & 0 deletions src/BtAnnounce.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@
#include <string>

#include "a2time.h"
#include "SharedHandle.h"

namespace aria2 {

class UDPTrackerRequest;

class BtAnnounce {
public:
virtual ~BtAnnounce() {}
Expand All @@ -65,6 +68,10 @@ class BtAnnounce {
*/
virtual std::string getAnnounceUrl() = 0;

virtual SharedHandle<UDPTrackerRequest>
createUDPTrackerRequest(const std::string& remoteAddr, uint16_t remotePort,
uint16_t localPort) = 0;

/**
* Tells that the announce process has just started.
*/
Expand Down Expand Up @@ -96,6 +103,9 @@ class BtAnnounce {
virtual void processAnnounceResponse(const unsigned char* trackerResponse,
size_t trackerResponseLength) = 0;

virtual void processUDPTrackerResponse
(const SharedHandle<UDPTrackerRequest>& req) = 0;

/**
* Returns true if no more announce is needed.
*/
Expand Down
10 changes: 9 additions & 1 deletion src/BtRegistry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@
#include "BtProgressInfoFile.h"
#include "bittorrent_helper.h"
#include "LpdMessageReceiver.h"
#include "UDPTrackerClient.h"
#include "NullHandle.h"

namespace aria2 {

BtRegistry::BtRegistry()
: tcpPort_(0)
: tcpPort_(0),
udpPort_(0)
{}

BtRegistry::~BtRegistry() {}
Expand Down Expand Up @@ -107,6 +109,12 @@ void BtRegistry::setLpdMessageReceiver
lpdMessageReceiver_ = receiver;
}

void BtRegistry::setUDPTrackerClient
(const SharedHandle<UDPTrackerClient>& tracker)
{
udpTrackerClient_ = tracker;
}

BtObject::BtObject
(const SharedHandle<DownloadContext>& downloadContext,
const SharedHandle<PieceStorage>& pieceStorage,
Expand Down
19 changes: 19 additions & 0 deletions src/BtRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class BtRuntime;
class BtProgressInfoFile;
class DownloadContext;
class LpdMessageReceiver;
class UDPTrackerClient;

struct BtObject {
SharedHandle<DownloadContext> downloadContext;
Expand Down Expand Up @@ -80,7 +81,10 @@ class BtRegistry {
private:
std::map<a2_gid_t, SharedHandle<BtObject> > pool_;
uint16_t tcpPort_;
// This is IPv4 port for DHT and UDP tracker. No IPv6 udpPort atm.
uint16_t udpPort_;
SharedHandle<LpdMessageReceiver> lpdMessageReceiver_;
SharedHandle<UDPTrackerClient> udpTrackerClient_;
public:
BtRegistry();
~BtRegistry();
Expand Down Expand Up @@ -118,11 +122,26 @@ class BtRegistry {
return tcpPort_;
}

void setUdpPort(uint16_t port)
{
udpPort_ = port;
}
uint16_t getUdpPort() const
{
return udpPort_;
}

void setLpdMessageReceiver(const SharedHandle<LpdMessageReceiver>& receiver);
const SharedHandle<LpdMessageReceiver>& getLpdMessageReceiver() const
{
return lpdMessageReceiver_;
}

void setUDPTrackerClient(const SharedHandle<UDPTrackerClient>& tracker);
const SharedHandle<UDPTrackerClient>& getUDPTrackerClient() const
{
return udpTrackerClient_;
}
};

} // namespace aria2
Expand Down
1 change: 1 addition & 0 deletions src/BtSetup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
#include "DHTMessageReceiver.h"
#include "DHTMessageFactory.h"
#include "DHTMessageCallback.h"
#include "UDPTrackerClient.h"
#include "BtProgressInfoFile.h"
#include "BtAnnounce.h"
#include "BtRuntime.h"
Expand Down
73 changes: 64 additions & 9 deletions src/DHTInteractionCommand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,16 @@
#include "LogFactory.h"
#include "DHTMessageCallback.h"
#include "DHTNode.h"
#include "DHTConnection.h"
#include "UDPTrackerClient.h"
#include "UDPTrackerRequest.h"
#include "fmt.h"
#include "wallclock.h"

namespace aria2 {

// TODO This name of this command is misleading, because now it also
// handles UDP trackers as well as DHT.
DHTInteractionCommand::DHTInteractionCommand(cuid_t cuid, DownloadEngine* e)
: Command(cuid),
e_(e)
Expand Down Expand Up @@ -77,23 +83,60 @@ void DHTInteractionCommand::disableReadCheckSocket(const SharedHandle<SocketCore

bool DHTInteractionCommand::execute()
{
if(e_->getRequestGroupMan()->downloadFinished() || e_->isHaltRequested()) {
// We need to keep this command alive while TrackerWatcherCommand
// needs this.
if(e_->getRequestGroupMan()->downloadFinished() ||
(e_->isHaltRequested() && udpTrackerClient_->getNumWatchers() == 0)) {
return true;
} else if(e_->isForceHaltRequested()) {
udpTrackerClient_->failAll();
return true;
}

taskQueue_->executeTask();

while(1) {
SharedHandle<DHTMessage> m = receiver_->receiveMessage();
if(!m) {
break;
std::string remoteAddr;
uint16_t remotePort;
unsigned char data[64*1024];
try {
while(1) {
ssize_t length = connection_->receiveMessage(data, sizeof(data),
remoteAddr, remotePort);
if(length <= 0) {
break;
}
if(data[0] == 'd') {
// udp tracker response does not start with 'd', so assume
// this message belongs to DHT. nothrow.
receiver_->receiveMessage(remoteAddr, remotePort, data, length);
} else {
// this may be udp tracker response. nothrow.
udpTrackerClient_->receiveReply(data, length, remoteAddr, remotePort,
global::wallclock());
}
}
} catch(RecoverableException& e) {
A2_LOG_INFO_EX("Exception thrown while receiving UDP message.", e);
}
receiver_->handleTimeout();
try {
dispatcher_->sendMessages();
} catch(RecoverableException& e) {
A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, e);
udpTrackerClient_->handleTimeout(global::wallclock());
dispatcher_->sendMessages();
while(!udpTrackerClient_->getPendingRequests().empty()) {
// no throw
ssize_t length = udpTrackerClient_->createRequest(data, sizeof(data),
remoteAddr, remotePort,
global::wallclock());
if(length == -1) {
break;
}
try {
// throw
connection_->sendMessage(data, length, remoteAddr, remotePort);
udpTrackerClient_->requestSent(global::wallclock());
} catch(RecoverableException& e) {
A2_LOG_INFO_EX("Exception thrown while sending UDP tracker request.", e);
udpTrackerClient_->requestFail(UDPT_ERR_NETWORK);
}
}
e_->addCommand(this);
return false;
Expand All @@ -114,4 +157,16 @@ void DHTInteractionCommand::setTaskQueue(const SharedHandle<DHTTaskQueue>& taskQ
taskQueue_ = taskQueue;
}

void DHTInteractionCommand::setConnection
(const SharedHandle<DHTConnection>& connection)
{
connection_ = connection;
}

void DHTInteractionCommand::setUDPTrackerClient
(const SharedHandle<UDPTrackerClient>& udpTrackerClient)
{
udpTrackerClient_ = udpTrackerClient;
}

} // namespace aria2
9 changes: 9 additions & 0 deletions src/DHTInteractionCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class DHTMessageReceiver;
class DHTTaskQueue;
class DownloadEngine;
class SocketCore;
class DHTConnection;
class UDPTrackerClient;

class DHTInteractionCommand:public Command {
private:
Expand All @@ -53,6 +55,8 @@ class DHTInteractionCommand:public Command {
SharedHandle<DHTMessageReceiver> receiver_;
SharedHandle<DHTTaskQueue> taskQueue_;
SharedHandle<SocketCore> readCheckSocket_;
SharedHandle<DHTConnection> connection_;
SharedHandle<UDPTrackerClient> udpTrackerClient_;
public:
DHTInteractionCommand(cuid_t cuid, DownloadEngine* e);

Expand All @@ -69,6 +73,11 @@ class DHTInteractionCommand:public Command {
void setMessageReceiver(const SharedHandle<DHTMessageReceiver>& receiver);

void setTaskQueue(const SharedHandle<DHTTaskQueue>& taskQueue);

void setConnection(const SharedHandle<DHTConnection>& connection);

void setUDPTrackerClient
(const SharedHandle<UDPTrackerClient>& udpTrackerClient);
};

} // namespace aria2
Expand Down
23 changes: 8 additions & 15 deletions src/DHTMessageReceiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,11 @@ DHTMessageReceiver::DHTMessageReceiver

DHTMessageReceiver::~DHTMessageReceiver() {}

SharedHandle<DHTMessage> DHTMessageReceiver::receiveMessage()
SharedHandle<DHTMessage> DHTMessageReceiver::receiveMessage
(const std::string& remoteAddr, uint16_t remotePort, unsigned char *data,
size_t length)
{
std::string remoteAddr;
uint16_t remotePort;
unsigned char data[64*1024];
try {
ssize_t length = connection_->receiveMessage(data, sizeof(data),
remoteAddr,
remotePort);
if(length <= 0) {
return SharedHandle<DHTMessage>();
}
bool isReply = false;
SharedHandle<ValueBase> decoded = bencode2::decode(data, length);
const Dict* dict = downcast<Dict>(decoded);
Expand All @@ -87,21 +80,21 @@ SharedHandle<DHTMessage> DHTMessageReceiver::receiveMessage()
} else {
A2_LOG_INFO(fmt("Malformed DHT message. Missing 'y' key. From:%s:%u",
remoteAddr.c_str(), remotePort));
return handleUnknownMessage(data, sizeof(data), remoteAddr, remotePort);
return handleUnknownMessage(data, length, remoteAddr, remotePort);
}
} else {
A2_LOG_INFO(fmt("Malformed DHT message. This is not a bencoded directory."
" From:%s:%u",
remoteAddr.c_str(), remotePort));
return handleUnknownMessage(data, sizeof(data), remoteAddr, remotePort);
return handleUnknownMessage(data, length, remoteAddr, remotePort);
}
if(isReply) {
std::pair<SharedHandle<DHTResponseMessage>,
SharedHandle<DHTMessageCallback> > p =
tracker_->messageArrived(dict, remoteAddr, remotePort);
if(!p.first) {
// timeout or malicious? message
return handleUnknownMessage(data, sizeof(data), remoteAddr, remotePort);
return handleUnknownMessage(data, length, remoteAddr, remotePort);
}
onMessageReceived(p.first);
if(p.second) {
Expand All @@ -114,14 +107,14 @@ SharedHandle<DHTMessage> DHTMessageReceiver::receiveMessage()
if(*message->getLocalNode() == *message->getRemoteNode()) {
// drop message from localnode
A2_LOG_INFO("Received DHT message from localnode.");
return handleUnknownMessage(data, sizeof(data), remoteAddr, remotePort);
return handleUnknownMessage(data, length, remoteAddr, remotePort);
}
onMessageReceived(message);
return message;
}
} catch(RecoverableException& e) {
A2_LOG_INFO_EX("Exception thrown while receiving DHT message.", e);
return handleUnknownMessage(data, sizeof(data), remoteAddr, remotePort);
return handleUnknownMessage(data, length, remoteAddr, remotePort);
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/DHTMessageReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ class DHTMessageReceiver {

~DHTMessageReceiver();

SharedHandle<DHTMessage> receiveMessage();
SharedHandle<DHTMessage> receiveMessage
(const std::string& remoteAddr, uint16_t remotePort, unsigned char *data,
size_t length);

void handleTimeout();

Expand Down
15 changes: 13 additions & 2 deletions src/DHTSetup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
#include "DHTRegistry.h"
#include "DHTBucketRefreshTask.h"
#include "DHTMessageCallback.h"
#include "UDPTrackerClient.h"
#include "BtRegistry.h"
#include "prefs.h"
#include "Option.h"
#include "SocketCore.h"
Expand Down Expand Up @@ -176,6 +178,8 @@ void DHTSetup::setup
factory->setTokenTracker(tokenTracker.get());
factory->setLocalNode(localNode);

// For now, UDPTrackerClient was enabled along with DHT
SharedHandle<UDPTrackerClient> udpTrackerClient(new UDPTrackerClient());
// assign them into DHTRegistry
if(family == AF_INET) {
DHTRegistry::getMutableData().localNode = localNode;
Expand All @@ -187,6 +191,8 @@ void DHTSetup::setup
DHTRegistry::getMutableData().messageDispatcher = dispatcher;
DHTRegistry::getMutableData().messageReceiver = receiver;
DHTRegistry::getMutableData().messageFactory = factory;
e->getBtRegistry()->setUDPTrackerClient(udpTrackerClient);
e->getBtRegistry()->setUdpPort(localNode->getPort());
} else {
DHTRegistry::getMutableData6().localNode = localNode;
DHTRegistry::getMutableData6().routingTable = routingTable;
Expand Down Expand Up @@ -244,6 +250,8 @@ void DHTSetup::setup
command->setMessageReceiver(receiver);
command->setTaskQueue(taskQueue);
command->setReadCheckSocket(connection->getSocket());
command->setConnection(connection);
command->setUDPTrackerClient(udpTrackerClient);
tempCommands->push_back(command);
}
{
Expand Down Expand Up @@ -282,12 +290,15 @@ void DHTSetup::setup
}
commands.insert(commands.end(), tempCommands->begin(), tempCommands->end());
tempCommands->clear();
} catch(RecoverableException& e) {
} catch(RecoverableException& ex) {
A2_LOG_ERROR_EX(fmt("Exception caught while initializing DHT functionality."
" DHT is disabled."),
e);
ex);
if(family == AF_INET) {
DHTRegistry::clearData();
e->getBtRegistry()->setUDPTrackerClient
(SharedHandle<UDPTrackerClient>());
e->getBtRegistry()->setUdpPort(0);
} else {
DHTRegistry::clearData6();
}
Expand Down
Loading

0 comments on commit d687416

Please sign in to comment.