Skip to content

Commit

Permalink
Update TCP testing to Run in Parallel (microsoft#4010)
Browse files Browse the repository at this point in the history
  • Loading branch information
nibanks authored Dec 16, 2023
1 parent 8f2a8f2 commit 80b4211
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 33 deletions.
16 changes: 13 additions & 3 deletions src/inc/msquic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ struct CxPlatLockDispatch {
void Acquire() noexcept { CxPlatDispatchLockAcquire(&Handle); }
void Release() noexcept { CxPlatDispatchLockRelease(&Handle); }
};

struct CxPlatRwLockDispatch {
CXPLAT_DISPATCH_RW_LOCK Handle;
CxPlatRwLockDispatch() noexcept { CxPlatDispatchRwLockInitialize(&Handle); }
~CxPlatRwLockDispatch() noexcept { CxPlatDispatchRwLockUninitialize(&Handle); }
void AcquireShared() noexcept { CxPlatDispatchRwLockAcquireShared(&Handle); }
void AcquireExclusive() noexcept { CxPlatDispatchRwLockAcquireExclusive(&Handle); }
void ReleaseShared() noexcept { CxPlatDispatchRwLockReleaseShared(&Handle); }
void ReleaseExclusive() noexcept { CxPlatDispatchRwLockReleaseExclusive(&Handle); }
};
#pragma warning(pop)

struct CxPlatPool {
Expand Down Expand Up @@ -134,11 +144,11 @@ class CxPlatPoolT {

#ifdef CXPLAT_HASH_MIN_SIZE

struct HashTable {
struct CxPlatHashTable {
bool Initialized;
CXPLAT_HASHTABLE Table;
HashTable() noexcept { Initialized = CxPlatHashtableInitializeEx(&Table, CXPLAT_HASH_MIN_SIZE); }
~HashTable() noexcept { if (Initialized) { CxPlatHashtableUninitialize(&Table); } }
CxPlatHashTable() noexcept { Initialized = CxPlatHashtableInitializeEx(&Table, CXPLAT_HASH_MIN_SIZE); }
~CxPlatHashTable() noexcept { if (Initialized) { CxPlatHashtableUninitialize(&Table); } }
void Insert(CXPLAT_HASHTABLE_ENTRY* Entry) noexcept { CxPlatHashtableInsert(&Table, Entry, Entry->Signature, nullptr); }
void Remove(CXPLAT_HASHTABLE_ENTRY* Entry) noexcept { CxPlatHashtableRemove(&Table, Entry, nullptr); }
CXPLAT_HASHTABLE_ENTRY* Lookup(uint64_t Signature) noexcept {
Expand Down
2 changes: 1 addition & 1 deletion src/perf/lib/PerfClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct PerfClientConnection {
HQUIC Handle {nullptr};
TcpConnection* TcpConn;
};
HashTable StreamTable;
CxPlatHashTable StreamTable;
uint64_t StreamsCreated {0};
uint64_t StreamsActive {0};
bool WorkerConnComplete {false}; // Indicated completion to worker
Expand Down
48 changes: 23 additions & 25 deletions src/perf/lib/PerfServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ QUIC_STATUS
PerfServer::Start(
_In_ CXPLAT_EVENT* _StopEvent
) {
if (!Server.Start(&LocalAddr)) { // TCP
//printf("TCP Server failed to start!\n");
if (!Server.Start(&LocalAddr)) {
WriteOutput("Warning: TCP Server failed to start!\n");
}

StopEvent = _StopEvent;
Expand Down Expand Up @@ -158,17 +158,10 @@ PerfServer::ListenerCallback(
switch (Event->Type) {
case QUIC_LISTENER_EVENT_NEW_CONNECTION: {
BOOLEAN value = TRUE;
MsQuic->SetParam(
Event->NEW_CONNECTION.Connection,
QUIC_PARAM_CONN_DISABLE_1RTT_ENCRYPTION,
sizeof(value),
&value);
MsQuic->SetParam(Event->NEW_CONNECTION.Connection, QUIC_PARAM_CONN_DISABLE_1RTT_ENCRYPTION, sizeof(value), &value);
QUIC_CONNECTION_CALLBACK_HANDLER Handler =
[](HQUIC Conn, void* Context, QUIC_CONNECTION_EVENT* Event) -> QUIC_STATUS {
return ((PerfServer*)Context)->
ConnectionCallback(
Conn,
Event);
return ((PerfServer*)Context)->ConnectionCallback(Conn, Event);
};
MsQuic->SetCallbackHandler(Event->NEW_CONNECTION.Connection, (void*)Handler, this);
Status = MsQuic->ConnectionSetConfiguration(Event->NEW_CONNECTION.Connection, Configuration);
Expand Down Expand Up @@ -324,7 +317,7 @@ PerfServer::SendTcpResponse(

uint64_t BytesLeftToSend = Context->ResponseSize - Context->BytesSent;

auto SendData = new(std::nothrow) TcpSendData();
auto SendData = TcpSendDataAllocator.Alloc();
SendData->StreamId = (uint32_t)Context->Entry.Signature;
SendData->Open = Context->BytesSent == 0 ? 1 : 0;
SendData->Buffer = DataBuffer->Buffer;
Expand Down Expand Up @@ -352,7 +345,7 @@ PerfServer::TcpAcceptCallback(
)
{
auto This = (PerfServer*)Server->Context;
Connection->Context = This;
Connection->Context = This->TcpConnectionContextAllocator.Alloc(This);
}

_IRQL_requires_max_(DISPATCH_LEVEL)
Expand All @@ -365,6 +358,9 @@ PerfServer::TcpConnectCallback(
{
if (!IsConnected) {
Connection->Close();
auto This = (TcpConnectionContext*)Connection->Context;
auto Server = This->Server;
Server->TcpConnectionContextAllocator.Free(This);
}
}

Expand All @@ -381,10 +377,11 @@ PerfServer::TcpReceiveCallback(
uint8_t* Buffer
)
{
auto This = (PerfServer*)Connection->Context;
auto This = (TcpConnectionContext*)Connection->Context;
auto Server = This->Server;
StreamContext* Stream;
if (Open) {
if ((Stream = This->StreamContextAllocator.Alloc(This, false, false)) != nullptr) {
if ((Stream = Server->StreamContextAllocator.Alloc(Server, false, false)) != nullptr) {
Stream->Entry.Signature = StreamID;
Stream->IdealSendBuffer = 1; // TCP uses send buffering, so just set to 1.
This->StreamTable.Insert(&Stream->Entry);
Expand All @@ -402,30 +399,30 @@ PerfServer::TcpReceiveCallback(
}
if (Abort) {
Stream->ResponseSize = 0; // Reset to make sure we stop sending more
auto SendData = new(std::nothrow) TcpSendData();
auto SendData = Server->TcpSendDataAllocator.Alloc();
SendData->StreamId = StreamID;
SendData->Open = Open ? TRUE : FALSE;
SendData->Abort = TRUE;
SendData->Buffer = This->DataBuffer->Buffer;
SendData->Buffer = Server->DataBuffer->Buffer;
SendData->Length = 0;
Connection->Send(SendData);

} else if (Fin) {
if (Stream->ResponseSizeSet && Stream->ResponseSize != 0) {
This->SendTcpResponse(Stream, Connection);
Server->SendTcpResponse(Stream, Connection);
} else {
auto SendData = new(std::nothrow) TcpSendData();
auto SendData = Server->TcpSendDataAllocator.Alloc();
SendData->StreamId = StreamID;
SendData->Open = TRUE;
SendData->Fin = TRUE;
SendData->Buffer = This->DataBuffer->Buffer;
SendData->Buffer = Server->DataBuffer->Buffer;
SendData->Length = 0;
Connection->Send(SendData);
}
Stream->RecvShutdown = true;
if (Stream->SendShutdown) {
This->StreamTable.Remove(&Stream->Entry);
This->StreamContextAllocator.Free(Stream);
Server->StreamContextAllocator.Free(Stream);
}
}
}
Expand All @@ -438,23 +435,24 @@ PerfServer::TcpSendCompleteCallback(
TcpSendData* SendDataChain
)
{
auto This = (PerfServer*)Connection->Context;
auto This = (TcpConnectionContext*)Connection->Context;
auto Server = This->Server;
while (SendDataChain) {
auto Data = SendDataChain;
auto Entry = This->StreamTable.Lookup(Data->StreamId);
if (Entry) {
auto Stream = CXPLAT_CONTAINING_RECORD(Entry, StreamContext, Entry);
Stream->OutstandingBytes -= Data->Length;
This->SendTcpResponse(Stream, Connection);
Server->SendTcpResponse(Stream, Connection);
if ((Data->Fin || Data->Abort) && !Stream->SendShutdown) {
Stream->SendShutdown = true;
if (Stream->RecvShutdown) {
This->StreamTable.Remove(&Stream->Entry);
This->StreamContextAllocator.Free(Stream);
Server->StreamContextAllocator.Free(Stream);
}
}
}
SendDataChain = SendDataChain->Next;
delete Data;
Server->TcpSendDataAllocator.Free(Data);
}
}
13 changes: 11 additions & 2 deletions src/perf/lib/PerfServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ class PerfServer {

private:

struct TcpConnectionContext {
PerfServer* Server;
CxPlatHashTable StreamTable;
TcpConnectionContext(PerfServer* Server) : Server(Server) { }
};

CxPlatPoolT<TcpConnectionContext> TcpConnectionContextAllocator;

struct StreamContext {
StreamContext(
PerfServer* Server, bool Unidirectional, bool BufferedIo) :
Expand All @@ -76,6 +84,9 @@ class PerfServer {
QUIC_BUFFER LastBuffer;
};

CxPlatPoolT<StreamContext> StreamContextAllocator;
CxPlatPoolT<TcpSendData> TcpSendDataAllocator;

QUIC_STATUS
ListenerCallback(
_In_ MsQuicListener* Listener,
Expand Down Expand Up @@ -137,11 +148,9 @@ class PerfServer {
CXPLAT_EVENT* StopEvent {nullptr};
QUIC_BUFFER* DataBuffer {nullptr};
uint8_t PrintStats {FALSE};
CxPlatPoolT<StreamContext> StreamContextAllocator;

TcpEngine Engine;
TcpServer Server;
HashTable StreamTable;

uint32_t CibirIdLength {0};
uint8_t CibirId[7]; // {offset, values}
Expand Down
4 changes: 2 additions & 2 deletions src/perf/lib/Tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ TcpConnection::TcpConnection(
}
}
QuicAddrSetPort(&Route.RemoteAddress, ServerPort);
Engine->AddConnection(this, 0); // TODO - Correct index
Engine->AddConnection(this, (uint16_t)CxPlatProcCurrentNumber());
Initialized = true;
if (QUIC_FAILED(
CxPlatSocketCreateTcp(
Expand Down Expand Up @@ -389,7 +389,7 @@ TcpConnection::TcpConnection(
this);
Initialized = true;
IndicateAccept = true;
Engine->AddConnection(this, 0); // TODO - Correct index
Engine->AddConnection(this, (uint16_t)CxPlatProcCurrentNumber());
Queue();
}

Expand Down

0 comments on commit 80b4211

Please sign in to comment.