Skip to content

Commit

Permalink
add EM::Connection#proxied_bytes for byte transfer stats with #proxy_…
Browse files Browse the repository at this point in the history
…incoming_to
  • Loading branch information
tmm1 committed Mar 5, 2012
1 parent 6ae0e1c commit df92251
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 1 deletion.
14 changes: 14 additions & 0 deletions ext/cmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,20 @@ extern "C" void evma_stop_proxy (const unsigned long from)
ed->StopProxy();
}

/******************
evma_proxied_bytes
*******************/

extern "C" unsigned long evma_proxied_bytes (const unsigned long from)
{
ensure_eventmachine("evma_proxied_bytes");
EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (from));
if (ed)
return ed->GetProxiedBytes();
else
return 0;
}


/***************************
evma_get_heartbeat_interval
Expand Down
4 changes: 4 additions & 0 deletions ext/ed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em):
UnbindReasonCode (0),
ProxyTarget(NULL),
ProxiedFrom(NULL),
ProxiedBytes(0),
MaxOutboundBufSize(0),
MyEventMachine (em),
PendingConnectTimeout(20000000),
Expand Down Expand Up @@ -247,6 +248,7 @@ void EventableDescriptor::StartProxy(const unsigned long to, const unsigned long
StopProxy();
ProxyTarget = ed;
BytesToProxy = length;
ProxiedBytes = 0;
ed->SetProxiedFrom(this, bufsize);
return;
}
Expand Down Expand Up @@ -293,6 +295,7 @@ void EventableDescriptor::_GenericInboundDispatch(const char *buf, int size)
if (BytesToProxy > 0) {
unsigned long proxied = min(BytesToProxy, (unsigned long) size);
ProxyTarget->SendOutboundData(buf, proxied);
ProxiedBytes += (unsigned long) proxied;
BytesToProxy -= proxied;
if (BytesToProxy == 0) {
StopProxy();
Expand All @@ -303,6 +306,7 @@ void EventableDescriptor::_GenericInboundDispatch(const char *buf, int size)
}
} else {
ProxyTarget->SendOutboundData(buf, size);
ProxiedBytes += (unsigned long) size;
}
} else {
(*EventCallback)(GetBinding(), EM_CONNECTION_READ, buf, size);
Expand Down
2 changes: 2 additions & 0 deletions ext/ed.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class EventableDescriptor: public Bindable_t

virtual void StartProxy(const unsigned long, const unsigned long, const unsigned long);
virtual void StopProxy();
virtual unsigned long GetProxiedBytes(){ return ProxiedBytes; };
virtual void SetProxiedFrom(EventableDescriptor*, const unsigned long);
virtual int SendOutboundData(const char*,int){ return -1; }
virtual bool IsPaused(){ return bPaused; }
Expand Down Expand Up @@ -116,6 +117,7 @@ class EventableDescriptor: public Bindable_t
unsigned long BytesToProxy;
EventableDescriptor *ProxyTarget;
EventableDescriptor *ProxiedFrom;
unsigned long ProxiedBytes;

unsigned long MaxOutboundBufSize;

Expand Down
1 change: 1 addition & 0 deletions ext/eventmachine.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ extern "C" {

void evma_start_proxy(const unsigned long, const unsigned long, const unsigned long, const unsigned long);
void evma_stop_proxy(const unsigned long);
unsigned long evma_proxied_bytes(const unsigned long);

int evma_set_rlimit_nofile (int n_files);

Expand Down
15 changes: 15 additions & 0 deletions ext/rubymain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,20 @@ static VALUE t_stop_proxy (VALUE self, VALUE from)
return Qnil;
}

/***************
t_proxied_bytes
****************/

static VALUE t_proxied_bytes (VALUE self, VALUE from)
{
try{
return ULONG2NUM(evma_proxied_bytes(NUM2ULONG (from)));
} catch (std::runtime_error e) {
rb_raise (EM_eConnectionError, e.what());
}
return Qnil;
}


/************************
t_get_heartbeat_interval
Expand Down Expand Up @@ -1193,6 +1207,7 @@ extern "C" void Init_rubyeventmachine()

rb_define_module_function (EmModule, "start_proxy", (VALUE (*)(...))t_start_proxy, 4);
rb_define_module_function (EmModule, "stop_proxy", (VALUE (*)(...))t_stop_proxy, 1);
rb_define_module_function (EmModule, "get_proxied_bytes", (VALUE (*)(...))t_proxied_bytes, 1);

rb_define_module_function (EmModule, "watch_filename", (VALUE (*)(...))t_watch_filename, 1);
rb_define_module_function (EmModule, "unwatch_filename", (VALUE (*)(...))t_unwatch_filename, 1);
Expand Down
6 changes: 6 additions & 0 deletions lib/em/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ def stop_proxying
EventMachine::disable_proxy(self)
end

# The number of bytes proxied to another connection. Reset to zero when
# EventMachine::Connection#proxy_incoming_to is called, and incremented whenever data is proxied.
def proxied_bytes
EventMachine::get_proxied_bytes(self.signature)
end

# EventMachine::Connection#close_connection is called only by user code, and never
# by the event loop. You may call this method against a connection object in any
# callback handler, whether or not the callback was made against the connection
Expand Down
14 changes: 13 additions & 1 deletion tests/test_proxy_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def proxy_target_unbound
end

def unbind
$proxied_bytes = proxied_bytes
@client.close_connection_after_writing
end
end
Expand Down Expand Up @@ -94,7 +95,7 @@ def initialize port
end

def receive_data(data)
EM.connect("127.0.0.1", @port, ProxyConnection, self, data)
@proxy = EM.connect("127.0.0.1", @port, ProxyConnection, self, data)
end
end

Expand Down Expand Up @@ -134,6 +135,17 @@ def test_proxy_connection
assert_equal("I know!", $client_data)
end

def test_proxied_bytes
EM.run {
EM.start_server("127.0.0.1", @port, Server)
EM.start_server("127.0.0.1", @proxy_port, ProxyServer, @port)
EM.connect("127.0.0.1", @proxy_port, Client)
}

assert_equal("I know!", $client_data)
assert_equal("I know!".bytesize, $proxied_bytes)
end

def test_partial_proxy_connection
EM.run {
EM.start_server("127.0.0.1", @port, Server)
Expand Down

0 comments on commit df92251

Please sign in to comment.