Skip to content

Commit

Permalink
Fix a regression which could lead to GenericFutureListeners never bee…
Browse files Browse the repository at this point in the history
…n notifed. Part of [netty#2186].

This regression was introduced by commit c97f2d2
  • Loading branch information
Norman Maurer committed Feb 20, 2014
1 parent 97662a6 commit ecc8fb1
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,9 @@ private void notifyListeners() {
final GenericFutureListener<? extends Future<V>> l =
(GenericFutureListener<? extends Future<V>>) listeners;
notifyListener0(this, l);
this.listeners = null;
}
} finally {
this.listeners = null;
LISTENER_STACK_DEPTH.set(stackDepth);
}
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import org.junit.Test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
Expand Down Expand Up @@ -81,26 +83,9 @@ public void operationComplete(Future<Void> future) throws Exception {

@Test
public void testListenerNotifyOrder() throws Exception {
SingleThreadEventExecutor executor =
new SingleThreadEventExecutor(null, Executors.defaultThreadFactory(), true) {
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}

if (confirmShutdown()) {
break;
}
}
}
};

EventExecutor executor = new TestEventExecutor();
final BlockingQueue<FutureListener<Void>> listeners = new LinkedBlockingQueue<FutureListener<Void>>();
int runs = 20000;
int runs = 100000;

for (int i = 0; i < runs; i++) {
final Promise<Void> promise = new DefaultPromise<Void>(executor);
Expand Down Expand Up @@ -148,4 +133,67 @@ public void run() {
}
executor.shutdownGracefully().sync();
}

@Test
public void testListenerNotifyLater() throws Exception {
// Testing first execution path in DefaultPromise
testListenerNotifyLater(1);

// Testing second execution path in DefaultPromise
testListenerNotifyLater(2);
}

private static void testListenerNotifyLater(final int numListenersBefore) throws Exception {
EventExecutor executor = new TestEventExecutor();
int expectedCount = numListenersBefore + 2;
final CountDownLatch latch = new CountDownLatch(expectedCount);
final FutureListener<Void> listener = new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
latch.countDown();
}
};
final Promise<Void> promise = new DefaultPromise<Void>(executor);
executor.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < numListenersBefore; i++) {
promise.addListener(listener);
}
promise.setSuccess(null);

GlobalEventExecutor.INSTANCE.execute(new Runnable() {
@Override
public void run() {
promise.addListener(listener);
}
});
promise.addListener(listener);
}
});

assertTrue("Should have notifed " + expectedCount + " listeners", latch.await(5, TimeUnit.SECONDS));
executor.shutdownGracefully().sync();
}

private static final class TestEventExecutor extends SingleThreadEventExecutor {
TestEventExecutor() {
super(null, Executors.defaultThreadFactory(), true);
}

@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}

if (confirmShutdown()) {
break;
}
}
}
}
}

0 comments on commit ecc8fb1

Please sign in to comment.