Skip to content

Commit

Permalink
[netty#3218] Add ChannelPool / ChannelPoolMap abstraction and impleme…
Browse files Browse the repository at this point in the history
…ntations

Motivation:

Many projects need some kind a Channel/Connection pool implementation. While the protocols are different many things can be shared, so we should provide a generic API and implementation.

Modifications:

Add ChannelPool / ChannelPoolMap API and implementations.

Result:

Reusable / Generic pool implementation that users can use.
  • Loading branch information
normanmaurer committed Apr 30, 2015
1 parent 6c025b2 commit 56c9883
Show file tree
Hide file tree
Showing 17 changed files with 1,538 additions and 1 deletion.
14 changes: 14 additions & 0 deletions common/src/main/java/io/netty/util/internal/PlatformDependent.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
Expand Down Expand Up @@ -447,6 +450,17 @@ public static ClassLoader getSystemClassLoader() {
return PlatformDependent0.getSystemClassLoader();
}

/**
* Returns a new concurrent {@link Deque}.
*/
public static <C> Deque<C> newConcurrentDeque() {
if (javaVersion() < 7) {
return new LinkedBlockingDeque<C>();
} else {
return new ConcurrentLinkedDeque<C>();
}
}

private static boolean isAndroid0() {
boolean android;
try {
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,8 @@
<!-- SSLSession implementation -->
<ignore>javax.net.ssl.SSLEngine</ignore>
<ignore>javax.net.ssl.X509ExtendedTrustManager</ignore>

<ignore>java.util.concurrent.ConcurrentLinkedDeque</ignore>
</ignores>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
*/
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

private volatile EventLoopGroup group;
volatile EventLoopGroup group;
@SuppressWarnings("deprecation")
private volatile ChannelFactory<? extends C> channelFactory;
private volatile SocketAddress localAddress;
Expand Down
12 changes: 12 additions & 0 deletions transport/src/main/java/io/netty/bootstrap/Bootstrap.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.DefaultNameResolverGroup;
import io.netty.resolver.NameResolver;
import io.netty.resolver.NameResolverGroup;
Expand Down Expand Up @@ -281,6 +282,17 @@ public Bootstrap clone() {
return new Bootstrap(this);
}

/**
* Returns a deep clone of this bootstrap which has the identical configuration except that it uses
* the given {@link EventLoopGroup}. This method is useful when making multiple {@link Channel}s with similar
* settings.
*/
public Bootstrap clone(EventLoopGroup group) {
Bootstrap bs = new Bootstrap(this);
bs.group = group;
return bs;
}

@Override
public String toString() {
if (remoteAddress == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;

import io.netty.channel.Channel;

/**
* A skeletal {@link ChannelPoolHandler} implementation.
*/
public abstract class AbstractChannelPoolHandler implements ChannelPoolHandler {

/**
* NOOP implementation, sub-classes may override this.
*
* {@inheritDoc}
*/
@Override
public void channelAcquired(@SuppressWarnings("unused") Channel ch) throws Exception {
// NOOP
}

/**
* NOOP implementation, sub-classes may override this.
*
* {@inheritDoc}
*/
@Override
public void channelReleased(@SuppressWarnings("unused") Channel ch) throws Exception {
// NOOP
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;

import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReadOnlyIterator;

import java.io.Closeable;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;

import static io.netty.util.internal.ObjectUtil.checkNotNull;

/**
* A skeletal {@link ChannelPoolMap} implementation. To find the right {@link ChannelPool}
* the {@link Object#hashCode()} and {@link Object#equals(Object)} is used.
*/
public abstract class AbstractChannelPoolMap<K, P extends ChannelPool>
implements ChannelPoolMap<K, P>, Iterable<Entry<K, P>>, Closeable {
private final ConcurrentMap<K, P> map = PlatformDependent.newConcurrentHashMap();

@Override
public final P get(K key) {
P pool = map.get(checkNotNull(key, "key"));
if (pool == null) {
pool = newPool(key);
P old = map.putIfAbsent(key, pool);
if (old != null) {
// We need to destroy the newly created pool as we not use it.
pool.close();
pool = old;
}
}
return pool;
}
/**
* Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns {@code true} if removed,
* {@code false} otherwise.
*
* Please note that {@code null} keys are not allowed.
*/
public final boolean remove(K key) {
P pool = map.remove(checkNotNull(key, "key"));
if (pool != null) {
pool.close();
return true;
}
return false;
}

@Override
public final Iterator<Entry<K, P>> iterator() {
return new ReadOnlyIterator<Entry<K, P>>(map.entrySet().iterator());
}

/**
* Returns the number of {@link ChannelPool}s currently in this {@link AbstractChannelPoolMap}.
*/
public final int size() {
return map.size();
}

/**
* Returns {@code true} if the {@link AbstractChannelPoolMap} is empty, otherwise {@code false}.
*/
public final boolean isEmpty() {
return map.isEmpty();
}

@Override
public final boolean contains(K key) {
return map.containsKey(checkNotNull(key, "key"));
}

/**
* Called once a new {@link ChannelPool} needs to be created as non exists yet for the {@code key}.
*/
protected abstract P newPool(K key);

@Override
public final void close() {
for (K key: map.keySet()) {
remove(key);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;

/**
* Called before a {@link Channel} will be returned via {@link ChannelPool#acquire()} or
* {@link ChannelPool#acquire(Promise)}.
*/
public interface ChannelHealthChecker {

/**
* {@link ChannelHealthChecker} implementation that checks if {@link Channel#isActive()} returns {@code true}.
*/
ChannelHealthChecker ACTIVE = new ChannelHealthChecker() {
@Override
public Future<Boolean> isHealthy(Channel channel) {
EventLoop loop = channel.eventLoop();
return channel.isActive()? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE);
}
};

/**
* Check if the given channel is healthy which means it can be used. The returned {@link Future} is notified once
* the check is complete. If notified with {@link Boolean#TRUE} it can be used {@link Boolean#FALSE} otherwise.
*
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
Future<Boolean> isHealthy(Channel channel);
}
56 changes: 56 additions & 0 deletions transport/src/main/java/io/netty/channel/pool/ChannelPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;

import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;

import java.io.Closeable;
import java.io.IOException;

/**
* Allows to acquire and release {@link Channel} and so act as a pool of these.
*/
public interface ChannelPool extends Closeable {

/**
* Acquire a {@link Channel} from this {@link ChannelPool}. The returned {@link Future} is notified once
* the acquire is successful and failed otherwise.
*/
Future<Channel> acquire();

/**
* Acquire a {@link Channel} from this {@link ChannelPool}. The given {@link Promise} is notified once
* the acquire is successful and failed otherwise.
*/
Future<Channel> acquire(Promise<Channel> promise);

/**
* Release a {@link Channel} back to this {@link ChannelPool}. The returned {@link Future} is notified once
* the release is successful and failed otherwise. When failed the {@link Channel} will automatically closed.
*/
Future<Void> release(Channel channel);

/**
* Release a {@link Channel} back to this {@link ChannelPool}. The given {@link Promise} is notified once
* the release is successful and failed otherwise. When failed the {@link Channel} will automatically closed.
*/
Future<Void> release(Channel channel, Promise<Void> promise);

@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Promise;

/**
* Handler which is called for various actions done by the {@link ChannelPool}.
*/
public interface ChannelPoolHandler {
/**
* Called once a {@link Channel} was released by calling {@link ChannelPool#release(Channel)} or
* {@link ChannelPool#release(Channel, Promise)}.
*
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
void channelReleased(Channel ch) throws Exception;

/**
* Called once a {@link Channel} was acquired by calling {@link ChannelPool#acquire()} or
* {@link ChannelPool#acquire(Promise)}.
*
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
void channelAcquired(Channel ch) throws Exception;

/**
* Called once a new {@link Channel} is created in the {@link ChannelPool}.
*
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
void channelCreated(Channel ch) throws Exception;
}
Loading

0 comments on commit 56c9883

Please sign in to comment.