Skip to content

Commit

Permalink
added SocketRpcChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
Niall Ryan authored and Niall Ryan committed Jun 4, 2010
1 parent 25b877a commit d359a04
Show file tree
Hide file tree
Showing 11 changed files with 351 additions and 16 deletions.
2 changes: 2 additions & 0 deletions Cpp/ProtoBufRemote/ProtoBufRemote.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
<ClInclude Include="RpcMessage.pb.h" />
<ClInclude Include="RpcServer.h" />
<ClInclude Include="RpcService.h" />
<ClInclude Include="SocketRpcChannel.h" />
</ItemGroup>
<ItemGroup>
<CustomBuild Include="RpcMessage.proto">
Expand All @@ -111,6 +112,7 @@
<ClCompile Include="RpcMessage.pb.cc" />
<ClCompile Include="RpcServer.cpp" />
<ClCompile Include="RpcService.cpp" />
<ClCompile Include="SocketRpcChannel.cpp" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
Expand Down
6 changes: 6 additions & 0 deletions Cpp/ProtoBufRemote/ProtoBufRemote.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
<ClInclude Include="Generators.h">
<Filter>Source Files</Filter>
</ClInclude>
<ClInclude Include="SocketRpcChannel.h">
<Filter>Source Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<CustomBuild Include="RpcMessage.proto">
Expand Down Expand Up @@ -89,5 +92,8 @@
<ClCompile Include="MutableParameterList.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="SocketRpcChannel.cpp">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
</Project>
12 changes: 4 additions & 8 deletions Cpp/ProtoBufRemote/RpcChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,11 @@

namespace ProtoBufRemote {

RpcChannel::RpcChannel()
: m_controller(NULL)
RpcChannel::RpcChannel(RpcController* controller)
: m_controller(controller)
{
}

RpcChannel::RpcChannel(RpcController& controller)
: m_controller(&controller)
{
m_controller->SetChannel(this);
if (m_controller)
m_controller->SetChannel(this);
}

}
5 changes: 2 additions & 3 deletions Cpp/ProtoBufRemote/RpcChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ class RpcMessage;
class RpcChannel
{
public:
RpcChannel();

RpcChannel(RpcController& controller);
RpcChannel(RpcController* controller);
virtual ~RpcChannel() { }

virtual void Send(const RpcMessage& message) = 0;

Expand Down
2 changes: 1 addition & 1 deletion Cpp/ProtoBufRemote/RpcController.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class RpcController

virtual void Send(const RpcMessage& message);

void Receive(const RpcMessage& message);
virtual void Receive(const RpcMessage& message);

void SetChannel(RpcChannel* channel) { m_channel = channel; }
void SetClient(RpcClient* client) { m_client = client; }
Expand Down
167 changes: 167 additions & 0 deletions Cpp/ProtoBufRemote/SocketRpcChannel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@

#include "ProtoBufRemote/SocketRpcChannel.h"

#include "ProtoBufRemote/RpcController.h"
#include "ProtoBufRemote/RpcMessage.pb.h"


namespace ProtoBufRemote {

SocketRpcChannel::SocketRpcChannel(RpcController* controller, SOCKET socket)
: RpcChannel(controller), m_socket(socket)
{
m_sendEvent = WSACreateEvent();
m_terminateEvent = WSACreateEvent();
}

SocketRpcChannel::~SocketRpcChannel()
{
CloseAndJoin();
WSACloseEvent(m_sendEvent);
WSACloseEvent(m_terminateEvent);
}

void SocketRpcChannel::Start()
{
m_thread = boost::thread(&SocketRpcChannel::Run, this);
}

void SocketRpcChannel::CloseAndJoin()
{
WSASetEvent(m_terminateEvent);
m_thread.join();
}

void SocketRpcChannel::Send(const RpcMessage& message)
{
QueuedMessageData data;
unsigned int messageSize = message.ByteSize();
data.m_size = messageSize + sizeof(int);
data.m_data = new char[data.m_size];
*reinterpret_cast<unsigned int*>(data.m_data) = messageSize;
message.SerializeToArray(data.m_data+sizeof(int), messageSize);

boost::lock_guard<boost::mutex> lock(m_sendMutex);
m_sendMessages.push(data);
WSASetEvent(m_sendEvent);
}

void SocketRpcChannel::Run()
{
WSAEVENT selectEvent = WSACreateEvent();
WSAEventSelect(m_socket, selectEvent, FD_READ|FD_WRITE);

bool isSending = false;
unsigned int sendPos = 0;
QueuedMessageData currentSendMessage;

bool isReceiving = false;
unsigned int receivePos = 0;
unsigned int receiveSize = sizeof(int);
std::vector<char> receiveBuffer;
receiveBuffer.resize(1024);
const unsigned int maxAllowedMessageSize = 1024*1024;

WSAEVENT events[3] = { m_terminateEvent, selectEvent, m_sendEvent };
for ( ;; )
{
int waitResult = WSAWaitForMultipleEvents(isSending ? 2 : 3, events, FALSE, WSA_INFINITE, FALSE);
if (waitResult == 0)
break;

bool isSendReady = false;
bool isReceiveReady = false;
if (waitResult == 2)
{
assert(!isSending);
boost::lock_guard<boost::mutex> lock(m_sendMutex);
if (!m_sendMessages.empty())
{
currentSendMessage = m_sendMessages.front();
m_sendMessages.pop();
isSending = true;
isSendReady = true; //send immediately
sendPos = 0;
}
}
else
{
WSANETWORKEVENTS networkEvents;
int result = WSAEnumNetworkEvents(m_socket, selectEvent, &networkEvents);
isSendReady = (networkEvents.lNetworkEvents & FD_WRITE) ? true : false;
isReceiveReady = (networkEvents.lNetworkEvents & FD_READ) ? true : false;
}

if (isSending && isSendReady)
{
int bytesSent = send(m_socket, &currentSendMessage.m_data[sendPos], currentSendMessage.m_size-sendPos, 0);
if (bytesSent != SOCKET_ERROR)
{
sendPos += bytesSent;

if (sendPos >= currentSendMessage.m_size)
{
delete[] currentSendMessage.m_data;

boost::lock_guard<boost::mutex> lock(m_sendMutex);
if (m_sendMessages.empty())
{
WSAResetEvent(m_sendEvent);
isSending = false;
}
else
{
currentSendMessage = m_sendMessages.front();
m_sendMessages.pop();
sendPos = 0;
}
}
}
}

if (isReceiveReady)
{
int bytesReceived = recv(m_socket, &receiveBuffer[receivePos], receiveSize-receivePos, 0);
if (bytesReceived == 0)
break;
if (bytesReceived != SOCKET_ERROR)
{
receivePos += bytesReceived;

if (receivePos >= receiveSize)
{
if (isReceiving)
{
RpcMessage message;
message.ParseFromArray(&receiveBuffer[0], receiveSize);
m_controller->Receive(message);

isReceiving = false;
receiveSize = sizeof(int);
receivePos = 0;
}
else
{
isReceiving = true;
receiveSize = *reinterpret_cast<unsigned int*>(&receiveBuffer[0]);
receivePos = 0;

if (receiveSize > maxAllowedMessageSize)
{
assert(false);
break; //better to abort rather than allocate based on bad data
}
if (receiveBuffer.size() < receiveSize)
receiveBuffer.resize(receiveSize);
}
}
}
}
}

closesocket(m_socket);
m_socket = INVALID_SOCKET;
WSACloseEvent(selectEvent);
}

}
43 changes: 43 additions & 0 deletions Cpp/ProtoBufRemote/SocketRpcChannel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef PROTOBUFREMOTE_SOCKETRPCCHANNEL_H
#define PROTOBUFREMOTE_SOCKETRPCCHANNEL_H 1

#include <winsock2.h>
#include <queue>
#include <boost/thread.hpp>
#include "ProtoBufRemote/RpcChannel.h"

namespace ProtoBufRemote {

class SocketRpcChannel : RpcChannel
{
public:
SocketRpcChannel(RpcController* controller, SOCKET socket);
virtual ~SocketRpcChannel();

void Start();

void CloseAndJoin();

virtual void Send(const RpcMessage& message);

private:
void Run();

SOCKET m_socket;
boost::thread m_thread;
WSAEVENT m_sendEvent;
WSAEVENT m_terminateEvent;

boost::mutex m_sendMutex;

struct QueuedMessageData
{
unsigned int m_size;
char* m_data;
};
std::queue<QueuedMessageData> m_sendMessages;
};

}

#endif
2 changes: 2 additions & 0 deletions Cpp/ProtoBufRemoteTest/MockRpcChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
class MockRpcChannel : public ProtoBufRemote::RpcChannel
{
public:
MockRpcChannel() : ProtoBufRemote::RpcChannel(NULL) { }

MOCK_METHOD1(Send, void(const ProtoBufRemote::RpcMessage&));
};

Expand Down
9 changes: 5 additions & 4 deletions Cpp/ProtoBufRemoteTest/ProtoBufRemoteTest.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<IntDir>obj\$(Configuration)\</IntDir>
<LibraryPath>$(GmockDebugLibPath);$(ProtoBufDebugLibPath);C:\Program Files\Microsoft DirectX SDK %28February 2010%29\Lib\x86;$(LibraryPath)</LibraryPath>
<LibraryPath>$(GmockDebugLibPath);$(ProtoBufDebugLibPath);$(BoostLibPath);$(LibraryPath)</LibraryPath>
<IncludePath>$(GtestIncludePath);$(GmockIncludePath);$(ProtoBufIncludePath);$(BoostIncludePath);C:\Program Files\Microsoft DirectX SDK %28February 2010%29\Include;$(IncludePath)</IncludePath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<OutDir>$(ProjectDir)obj\$(Configuration)\</OutDir>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<IntDir>obj\$(Configuration)\</IntDir>
<LibraryPath>$(GmockReleaseLibPath);$(ProtoBufReleaseLibPath);C:\Program Files\Microsoft DirectX SDK %28February 2010%29\Lib\x86;$(LibraryPath)</LibraryPath>
<LibraryPath>$(GmockReleaseLibPath);$(ProtoBufReleaseLibPath);$(BoostLibPath);$(LibraryPath)</LibraryPath>
<IncludePath>$(GtestIncludePath);$(GmockIncludePath);$(ProtoBufIncludePath);$(BoostIncludePath);C:\Program Files\Microsoft DirectX SDK %28February 2010%29\Include;$(IncludePath)</IncludePath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
Expand All @@ -61,7 +61,7 @@
</ClCompile>
<Link>
<GenerateDebugInformation>true</GenerateDebugInformation>
<AdditionalDependencies>gmock.lib;libprotobuf.lib;kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;%(AdditionalDependencies)</AdditionalDependencies>
<AdditionalDependencies>gmock.lib;libprotobuf.lib;ws2_32.lib;kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;%(AdditionalDependencies)</AdditionalDependencies>
<SubSystem>Console</SubSystem>
</Link>
</ItemDefinitionGroup>
Expand All @@ -78,7 +78,7 @@
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
<AdditionalDependencies>gmock.lib;libprotobuf.lib;kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;%(AdditionalDependencies)</AdditionalDependencies>
<AdditionalDependencies>gmock.lib;libprotobuf.lib;ws2_32.lib;kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;%(AdditionalDependencies)</AdditionalDependencies>
<SubSystem>Console</SubSystem>
</Link>
</ItemDefinitionGroup>
Expand All @@ -89,6 +89,7 @@
<ClCompile Include="RpcClientTest.cpp" />
<ClCompile Include="RpcControllerTest.cpp" />
<ClCompile Include="RpcServerTest.cpp" />
<ClCompile Include="SocketRpcChannelTest.cpp" />
<ClCompile Include="StubGeneratorTest.cpp" />
</ItemGroup>
<ItemGroup>
Expand Down
3 changes: 3 additions & 0 deletions Cpp/ProtoBufRemoteTest/ProtoBufRemoteTest.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
<ClCompile Include="StubGeneratorTest.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="SocketRpcChannelTest.cpp">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="MockRpcController.h">
Expand Down
Loading

0 comments on commit d359a04

Please sign in to comment.