Skip to content

Commit

Permalink
tracking bytes sent and bytes received
Browse files Browse the repository at this point in the history
  • Loading branch information
Niall Ryan authored and Niall Ryan committed Jul 5, 2010
1 parent 8934de6 commit a4a3402
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
12 changes: 12 additions & 0 deletions Cpp/ProtoBufRemote/SocketRpcChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ void SocketRpcChannel::CloseAndJoin()
m_thread.join();
}

unsigned int SocketRpcChannel::GetAndClearBytesRead()
{
return _InterlockedExchange(reinterpret_cast<LONG*>(&m_bytesRead), 0);
}

unsigned int SocketRpcChannel::GetAndClearBytesWritten()
{
return _InterlockedExchange(reinterpret_cast<LONG*>(&m_bytesWritten), 0);
}

void SocketRpcChannel::Send(const RpcMessage& message)
{
QueuedMessageData data;
Expand Down Expand Up @@ -107,6 +117,7 @@ void SocketRpcChannel::Run()
else
{
sendPos += bytesSent;
_InterlockedExchangeAdd(reinterpret_cast<LONG*>(&m_bytesWritten), bytesSent);

if (sendPos >= currentSendMessage.m_size)
{
Expand Down Expand Up @@ -146,6 +157,7 @@ void SocketRpcChannel::Run()
else
{
receivePos += bytesReceived;
_InterlockedExchangeAdd(reinterpret_cast<LONG*>(&m_bytesRead), bytesReceived);

if (receivePos >= receiveSize)
{
Expand Down
7 changes: 7 additions & 0 deletions Cpp/ProtoBufRemote/SocketRpcChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ class SocketRpcChannel : RpcChannel

virtual void Send(const RpcMessage& message);

unsigned int GetAndClearBytesRead();

unsigned int GetAndClearBytesWritten();

private:
void Run();

Expand All @@ -36,6 +40,9 @@ class SocketRpcChannel : RpcChannel
char* m_data;
};
std::queue<QueuedMessageData> m_sendMessages;

unsigned int m_bytesRead;
unsigned int m_bytesWritten;
};

}
Expand Down
14 changes: 12 additions & 2 deletions DotNet/ProtoBufRemote/StreamRpcChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public class StreamRpcChannel : RpcChannel
private readonly Thread writeThread;
private readonly Stream readStream;
private readonly Stream writeStream;

private long totalBytesRead;
private long totalBytesWritten;

public StreamRpcChannel(RpcController controller, Stream readStream, Stream writeStream)
: base(controller)
Expand All @@ -31,6 +32,10 @@ public StreamRpcChannel(RpcController controller, Stream readStream, Stream writ
writeThread = new Thread(WriteRun);
}

public long TotalBytesRead { get { return Interlocked.Read(ref totalBytesRead); } }

public long TotalBytesWritten { get { return Interlocked.Read(ref totalBytesWritten); } }

public override void Start()
{
readThread.Start();
Expand Down Expand Up @@ -76,6 +81,7 @@ private void ReadRun()
try
{
bytesRead = readStream.Read(buffer, bufferPos, bytesRemaining);
Interlocked.Add(ref totalBytesRead, bytesRead);
if (bytesRead == 0)
break;
}
Expand Down Expand Up @@ -120,6 +126,7 @@ private void WriteRun()
{
var waitHandles = new WaitHandle[] { queueEvent, closeEvent };
bool isTerminated = false;
var memStream = new MemoryStream();
while (!isTerminated)
{
int waitIndex = WaitHandle.WaitAny(waitHandles);
Expand All @@ -130,7 +137,10 @@ private void WriteRun()
RpcMessage message = queuedMessages.Dequeue();
try
{
Serializer.SerializeWithLengthPrefix(writeStream, message, PrefixStyle.Fixed32);
memStream.Position = 0;
Serializer.SerializeWithLengthPrefix(memStream, message, PrefixStyle.Fixed32);
writeStream.Write(memStream.GetBuffer(), 0, (int)memStream.Position);
Interlocked.Add(ref totalBytesWritten, memStream.Position);
}
catch (InvalidOperationException)
{
Expand Down

0 comments on commit a4a3402

Please sign in to comment.