Skip to content

Commit

Permalink
Added a mechanism to listen for put completions.
Browse files Browse the repository at this point in the history
  • Loading branch information
Randgalt committed Jan 30, 2012
1 parent 0f0723b commit 8670ecd
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.google.common.base.Preconditions;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.listen.Listenable;
import com.netflix.curator.framework.listen.ListenerContainer;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -110,6 +112,27 @@ public void putMulti(MultiItem<T> items, int priority) throws Exception
queue.internalPut(null, items, queue.makeItemPath() + priorityHex);
}

/**
* Return the manager for put listeners
*
* @return put listener container
*/
public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
{
return queue.getPutListenerContainer();
}

/**
* Return the most recent message count from the queue. This is useful for debugging/information
* purposes only.
*
* @return count (can be 0)
*/
public int getLastMessageCount()
{
return queue.getLastMessageCount();
}

/**
* The default method of converting a priority into a sortable string
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
*/
package com.netflix.curator.framework.recipes.queue;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.api.BackgroundCallback;
import com.netflix.curator.framework.api.CuratorEvent;
import com.netflix.curator.framework.api.CuratorEventType;
import com.netflix.curator.framework.api.CuratorListener;
import com.netflix.curator.framework.listen.Listenable;
import com.netflix.curator.framework.listen.ListenerContainer;
import com.netflix.curator.framework.recipes.leader.LeaderSelector;
import com.netflix.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
Expand Down Expand Up @@ -73,6 +77,8 @@ public class DistributedQueue<T> implements Closeable
private final AtomicBoolean refreshOnWatchSignaled = new AtomicBoolean(false);
private final String lockPath;
private final AtomicReference<ErrorMode> errorMode = new AtomicReference<ErrorMode>(ErrorMode.REQUEUE);
private final ListenerContainer<QueuePutListener<T>> putListenerContainer = new ListenerContainer<QueuePutListener<T>>();
private final AtomicInteger lastChildCount = new AtomicInteger(0);

private final AtomicInteger putCount = new AtomicInteger(0);
private final CuratorListener listener = new CuratorListener()
Expand Down Expand Up @@ -189,6 +195,29 @@ public Object call()
}
}

@Override
public void close() throws IOException
{
if ( !state.compareAndSet(State.STARTED, State.STOPPED) )
{
throw new IllegalStateException();
}

putListenerContainer.clear();
client.getCuratorListenable().removeListener(listener);
service.shutdownNow();
}

/**
* Return the manager for put listeners
*
* @return put listener container
*/
public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
{
return putListenerContainer;
}

/**
* Used when the queue is created with a {@link QueueBuilder#lockPath(String)}. Determines
* the behavior when the queue consumer throws an exception
Expand Down Expand Up @@ -233,18 +262,6 @@ public boolean flushPuts(long waitTime, TimeUnit timeUnit) throws InterruptedExc
return true;
}

@Override
public void close() throws IOException
{
if ( !state.compareAndSet(State.STARTED, State.STOPPED) )
{
throw new IllegalStateException();
}

client.getCuratorListenable().removeListener(listener);
service.shutdownNow();
}

/**
* Add an item into the queue. Adding is done in the background - thus, this method will
* return quickly.
Expand Down Expand Up @@ -275,8 +292,20 @@ public void putMulti(MultiItem<T> items) throws Exception
internalPut(null, items, path);
}

void internalPut(T item, MultiItem<T> multiItem, String path) throws Exception
/**
* Return the most recent message count from the queue. This is useful for debugging/information
* purposes only.
*
* @return count (can be 0)
*/
public int getLastMessageCount()
{
return lastChildCount.get();
}

void internalPut(final T item, MultiItem<T> multiItem, String path) throws Exception
{
final MultiItem<T> givenMultiItem = multiItem;
if ( item != null )
{
final AtomicReference<T> ref = new AtomicReference<T>(item);
Expand All @@ -291,8 +320,34 @@ public T nextItem() throws Exception
}

putCount.incrementAndGet();
byte[] bytes = ItemSerializer.serialize(multiItem, serializer);
client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).inBackground().forPath(path, bytes);
byte[] bytes = ItemSerializer.serialize(multiItem, serializer);
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
putListenerContainer.forEach
(
new Function<QueuePutListener<T>, Void>()
{
@Override
public Void apply(QueuePutListener<T> listener)
{
if ( item != null )
{
listener.putCompleted(item);
}
else
{
listener.putMultiCompleted(givenMultiItem);
}
return null;
}
}
);
}
};
client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).inBackground(callback).forPath(path, bytes);
}

void checkState() throws Exception
Expand Down Expand Up @@ -329,6 +384,7 @@ private void runLoop()
do
{
children = client.getChildren().watched().forPath(queuePath);
lastChildCount.set(children.size());
if ( children.size() == 0 )
{
wait();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
*
* Copyright 2011 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.netflix.curator.framework.recipes.queue;

/**
* Queue puts are done in the background. Use this listener to
* be notified when the put completes
*/
public interface QueuePutListener<T>
{
/**
* Notification that a single item put has completed
*
* @param item the item
*/
public void putCompleted(T item);

/**
* Notification that a multi item put has completed
*
* @param items the items
*/
public void putMultiCompleted(MultiItem<T> items);
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,64 @@ public class TestDistributedQueue extends BaseClassForTests

private static final QueueSerializer<TestQueueItem> serializer = new QueueItemSerializer();

@Test
public void testPutListener() throws Exception
{
final int itemQty = 10;

DistributedQueue<TestQueueItem> queue = null;
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
BlockingQueueConsumer<TestQueueItem> consumer = new BlockingQueueConsumer<TestQueueItem>(Mockito.mock(ConnectionStateListener.class));

queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH).buildQueue();
queue.start();

QueueTestProducer producer = new QueueTestProducer(queue, itemQty, 0);

final AtomicInteger listenerCalls = new AtomicInteger(0);
QueuePutListener<TestQueueItem> listener = new QueuePutListener<TestQueueItem>()
{
@Override
public void putCompleted(TestQueueItem item)
{
listenerCalls.incrementAndGet();
}

@Override
public void putMultiCompleted(MultiItem<TestQueueItem> items)
{
}
};
queue.getPutListenerContainer().addListener(listener);

ExecutorService service = Executors.newCachedThreadPool();
service.submit(producer);

int iteration = 0;
while ( consumer.size() < itemQty )
{
Assert.assertTrue(++iteration < 10);
Thread.sleep(1000);
}

int i = 0;
for ( TestQueueItem item : consumer.getItems() )
{
Assert.assertEquals(item.str, Integer.toString(i++));
}

Assert.assertEquals(listenerCalls.get(), itemQty);
}
finally
{
Closeables.closeQuietly(queue);
Closeables.closeQuietly(client);
}
}

@Test
public void testErrorMode() throws Exception
{
Expand Down Expand Up @@ -552,5 +610,4 @@ public void testSimple() throws Exception
Closeables.closeQuietly(client);
}
}

}

0 comments on commit 8670ecd

Please sign in to comment.