Skip to content

Commit

Permalink
Pass in initial events to EventableSocketChannel constructor and upda…
Browse files Browse the repository at this point in the history
…te events when state changes
  • Loading branch information
tmm1 committed Aug 4, 2009
1 parent b080730 commit f5d3f07
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 40 deletions.
12 changes: 8 additions & 4 deletions java/src/com/rubyeventmachine/EmReactor.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void isAcceptable (SelectionKey k) throws IOException {
while ((sn = ss.accept()) != null) {
sn.configureBlocking(false);
long b = createBinding();
EventableSocketChannel ec = new EventableSocketChannel (sn, b, mySelector);
EventableSocketChannel ec = new EventableSocketChannel (sn, b, mySelector, SelectionKey.OP_READ);
Connections.put(b, ec);
eventCallback (((Long)k.attachment()).longValue(), EM_CONNECTION_ACCEPTED, null, b);
}
Expand Down Expand Up @@ -330,7 +330,7 @@ public long connectTcpServer (String bindAddr, int bindPort, String address, int
if (bindAddr != null)
sc.socket().bind(new InetSocketAddress (bindAddr, bindPort));

EventableSocketChannel ec = new EventableSocketChannel (sc, b, mySelector);
EventableSocketChannel ec = new EventableSocketChannel (sc, b, mySelector, 0);

if (sc.connect (new InetSocketAddress (address, port))) {
// Connection returned immediately. Can happen with localhost connections.
Expand Down Expand Up @@ -389,10 +389,14 @@ public Object[] getPeerName (long sig) {

public long attachChannel (SocketChannel sc, boolean watch_mode) throws ClosedChannelException {
long b = createBinding();
EventableSocketChannel ec = new EventableSocketChannel (sc, b, mySelector);
EventableSocketChannel ec;

if (watch_mode)
if (watch_mode) {
ec = new EventableSocketChannel (sc, b, mySelector, 0);
ec.setWatchOnly();
} else {
ec = new EventableSocketChannel (sc, b, mySelector, SelectionKey.OP_READ);
}

Connections.put (b, ec);
return b;
Expand Down
65 changes: 29 additions & 36 deletions java/src/com/rubyeventmachine/EventableSocketChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@
import java.security.*;

public class EventableSocketChannel implements EventableChannel {
// TODO, must refactor this to permit channels that aren't sockets.
Selector selector;
SelectionKey channelKey;
SocketChannel channel;

long binding;
Selector selector;
LinkedList<ByteBuffer> outboundQ;

boolean bCloseScheduled;
boolean bConnectPending;
boolean bWatchOnly;
Expand All @@ -61,7 +63,7 @@ public class EventableSocketChannel implements EventableChannel {
SSLEngine sslEngine;
SSLContext sslContext;

public EventableSocketChannel (SocketChannel sc, long _binding, Selector sel) throws ClosedChannelException {
public EventableSocketChannel (SocketChannel sc, long _binding, Selector sel, int ops) throws ClosedChannelException {
channel = sc;
binding = _binding;
selector = sel;
Expand All @@ -72,7 +74,7 @@ public EventableSocketChannel (SocketChannel sc, long _binding, Selector sel) th
bNotifyWritable = false;
outboundQ = new LinkedList<ByteBuffer>();

updateEvents();
channelKey = sc.register(selector, ops, this);
}

public long getBinding() {
Expand All @@ -98,23 +100,22 @@ public void close() {
}

public void scheduleOutboundData (ByteBuffer bb) {
try {
if ((!bCloseScheduled) && (bb.remaining() > 0)) {
if (sslEngine != null) {
if ((!bCloseScheduled) && (bb.remaining() > 0)) {
if (sslEngine != null) {
try {
ByteBuffer b = ByteBuffer.allocate(32*1024); // TODO, preallocate this buffer.
sslEngine.wrap(bb, b);
b.flip();
outboundQ.addLast(b);
} catch (SSLException e) {
throw new RuntimeException ("ssl error");
}
else {
outboundQ.addLast(bb);
}
channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ | (bConnectPending ? SelectionKey.OP_CONNECT : 0), this);
}
} catch (ClosedChannelException e) {
throw new RuntimeException ("no outbound data");
} catch (SSLException e) {
throw new RuntimeException ("no outbound data");
else {
outboundQ.addLast(bb);
}

updateEvents();
}
}

Expand Down Expand Up @@ -166,10 +167,7 @@ public boolean writeOutboundData(){
}

if (outboundQ.isEmpty()) {
try {
channel.register(selector, SelectionKey.OP_READ, this);
} catch (ClosedChannelException e) {
}
updateEvents();
}

// ALWAYS drain the outbound queue before triggering a connection close.
Expand All @@ -179,8 +177,8 @@ public boolean writeOutboundData(){
}

public void setConnectPending() throws ClosedChannelException {
channel.register(selector, SelectionKey.OP_CONNECT, this);
bConnectPending = true;
updateEvents();
}

/**
Expand All @@ -191,24 +189,20 @@ public void setConnectPending() throws ClosedChannelException {
public boolean finishConnecting() throws ClosedChannelException {
try {
channel.finishConnect();
}
catch (IOException e) {
} catch (IOException e) {
return false;
}
bConnectPending = false;
channel.register(selector, SelectionKey.OP_READ | (outboundQ.isEmpty() ? 0 : SelectionKey.OP_WRITE), this);
updateEvents();
return true;
}

public void scheduleClose (boolean afterWriting) {
// TODO: What the hell happens here if bConnectPending is set?
if (!afterWriting)
outboundQ.clear();
try {
channel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, this);
} catch (ClosedChannelException e) {
throw new RuntimeException ("unable to schedule close"); // TODO, get rid of this.
}

updateEvents();
bCloseScheduled = true;
}
public void startTls() {
Expand Down Expand Up @@ -278,13 +272,16 @@ public void setNotifyWritable (boolean mode) {
private void updateEvents() {
int events = 0;

if (bWatchOnly) {
if (bWatchOnly)
{
if (bNotifyReadable)
events |= SelectionKey.OP_READ;

if (bNotifyWritable)
events |= SelectionKey.OP_WRITE;
} else {
}
else
{
events |= SelectionKey.OP_READ;

if (bConnectPending)
Expand All @@ -294,11 +291,7 @@ private void updateEvents() {
events |= SelectionKey.OP_WRITE;
}

try {
channel.register(selector, events, this);
} catch (ClosedChannelException e) {
// what should this do? scheduleClose/detach?
System.err.println("ERROR: ClosedChannelException");
}
if (channelKey.interestOps() != events)
channelKey.interestOps(events);
}
}

0 comments on commit f5d3f07

Please sign in to comment.