Skip to content

Commit

Permalink
Carefully detach and clean up attached channels
Browse files Browse the repository at this point in the history
  • Loading branch information
tmm1 committed Aug 7, 2009
1 parent 9059fe6 commit e8df7f2
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 23 deletions.
32 changes: 18 additions & 14 deletions java/src/com/rubyeventmachine/EmReactor.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class EmReactor {
private HashMap<Long, ServerSocketChannel> Acceptors;
private ArrayList<Long> NewConnections;
private ArrayList<Long> UnboundConnections;
private ArrayList<EventableSocketChannel> DetachedConnections;

private boolean bRunReactor;
private long BindingIndex;
Expand All @@ -68,6 +69,7 @@ public EmReactor() {
Acceptors = new HashMap<Long, ServerSocketChannel>();
NewConnections = new ArrayList<Long>();
UnboundConnections = new ArrayList<Long>();
DetachedConnections = new ArrayList<EventableSocketChannel>();

BindingIndex = 0;
loopBreaker = new AtomicBoolean();
Expand Down Expand Up @@ -112,6 +114,13 @@ public void run() {
}

void addNewConnections() {
ListIterator<EventableSocketChannel> iter = DetachedConnections.listIterator(0);
while (iter.hasNext()) {
EventableSocketChannel ec = iter.next();
ec.cleanup();
}
DetachedConnections.clear();

ListIterator<Long> iter2 = NewConnections.listIterator(0);
while (iter2.hasNext()) {
long b = iter2.next();
Expand Down Expand Up @@ -485,15 +494,12 @@ public Object[] getPeerName (long sig) {

public long attachChannel (SocketChannel sc, boolean watch_mode) {
long b = createBinding();
EventableSocketChannel ec;

if (watch_mode) {
ec = new EventableSocketChannel (sc, b, mySelector, 0);
ec.setWatchOnly();
} else {
ec = new EventableSocketChannel (sc, b, mySelector, SelectionKey.OP_READ);
}
EventableSocketChannel ec = new EventableSocketChannel (sc, b, mySelector);

ec.setAttached();
if (watch_mode)
ec.setWatchOnly();

Connections.put (b, ec);
NewConnections.add (b);
Expand All @@ -503,14 +509,12 @@ public long attachChannel (SocketChannel sc, boolean watch_mode) {

public SocketChannel detachChannel (long sig) {
EventableSocketChannel ec = (EventableSocketChannel) Connections.get (sig);
UnboundConnections.add (sig);

SocketChannel sc = ec.getChannel();
try {
sc.register(mySelector, 0, null);
} catch (ClosedChannelException e) {
if (ec != null) {
UnboundConnections.add (sig);
return ec.getChannel();
} else {
return null;
}
return sc;
}

public void setNotifyReadable (long sig, boolean mode) {
Expand Down
47 changes: 46 additions & 1 deletion java/src/com/rubyeventmachine/EventableSocketChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.net.Socket;
import javax.net.ssl.*;
import javax.net.ssl.SSLEngineResult.*;
import java.lang.reflect.Field;

import java.security.*;

Expand Down Expand Up @@ -102,14 +103,58 @@ public void close() {
channelKey = null;
}

if (bWatchOnly)
if (bAttached) {
// attached channels are copies, so reset the file descriptor to prevent java from close()ing it
Field f;
FileDescriptor fd;

try {
/* do _NOT_ clobber fdVal here, it will break epoll/kqueue on jdk6!
* channelKey.cancel() above does not occur until the next call to select
* and if fdVal is gone, we will continue to get events for this fd.
*
* instead, remove fdVal in cleanup(), which is processed via DetachedConnections,
* after UnboundConnections but before NewConnections.
*/

f = channel.getClass().getDeclaredField("fd");
f.setAccessible(true);
fd = (FileDescriptor) f.get(channel);

f = fd.getClass().getDeclaredField("fd");
f.setAccessible(true);
f.set(fd, -1);
} catch (java.lang.NoSuchFieldException e) {
e.printStackTrace();
} catch (java.lang.IllegalAccessException e) {
e.printStackTrace();
}

return;
}

try {
channel.close();
} catch (IOException e) {
}
}

public void cleanup() {
if (bAttached) {
Field f;
try {
f = channel.getClass().getDeclaredField("fdVal");
f.setAccessible(true);
f.set(channel, -1);
} catch (java.lang.NoSuchFieldException e) {
e.printStackTrace();
} catch (java.lang.IllegalAccessException e) {
e.printStackTrace();
}
}

channel = null;
}

public void scheduleOutboundData (ByteBuffer bb) {
if (!bCloseScheduled && bb.remaining() > 0) {
Expand Down
11 changes: 3 additions & 8 deletions lib/jeventmachine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,9 @@ def self.attach_fd fileno, watch_mode
@em.attachChannel(ch,watch_mode)
end
def self.detach_fd sig
ch = @em.detachChannel(sig)
fileno = ch.get_field 'fdVal'

fd = ch.get_field 'fd'
fd.set_field 'fd', -1
ch.set_field 'fdVal', -1

fileno
if ch = @em.detachChannel(sig)
ch.get_field 'fdVal'
end
end

def self.set_notify_readable sig, mode
Expand Down

0 comments on commit e8df7f2

Please sign in to comment.