Skip to content

Commit

Permalink
Merge branch '3.4-dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
spmallette committed Apr 23, 2021
2 parents ea085b1 + 11a963d commit a0b82c6
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Exposed barrier size with getter for `NoOpBarrierStep`.
* Bumped to Netty 4.1.61.
* Added `max_content_length` as a Python driver setting.
* Fixed bug in Java `Client` initialization where certain configurations might produce a deadlock.
* Ensured that `barrier()` additions by strategies were controlled solely by `LazyBarrierStrategy`.
* Fixed `NullPointerException` in `ResponseMessage` deserialization for GraphSON.
* Enabled the Gremlin.Net driver to repair its connection pool after the server was temporarily unavailable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.tinkerpop.gremlin.driver;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
Expand All @@ -41,6 +43,8 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -511,19 +515,30 @@ protected Connection chooseConnection(final RequestMessage msg) throws TimeoutEx
*/
@Override
protected void initializeImplementation() {
// use a special executor here to initialize the Host instances as the worker thread pool may be
// insufficiently sized for this task and the parallel initialization of the ConnectionPool. if too small
// tasks may be scheduled in such a way as to produce a deadlock: TINKERPOP-2550
//
// the cost of this single threaded executor here should be fairly small because it is only used once at
// initialization and shutdown. since users will typically construct a Client once for the life of their
// application there shouldn't be tons of thread pools being created and destroyed.
final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("gremlin-driver-init-%d").build();
final ExecutorService hostExecutor = Executors.newSingleThreadExecutor(threadFactory);
try {
CompletableFuture.allOf(cluster.allHosts().stream()
// NOTE(review): the next two .map lines are the removed/added pair from a rendered diff;
// only the hostExecutor variant is the current code — reconcile before compiling
.map(host -> CompletableFuture.runAsync(() -> initializeConnectionSetupForHost.accept(host), cluster.executor()))
.map(host -> CompletableFuture.runAsync(() -> initializeConnectionSetupForHost.accept(host), hostExecutor))
.toArray(CompletableFuture[]::new))
.join();
} catch (CompletionException ex) {
// NOTE(review): the next two declarations are the removed/added pair from a rendered diff;
// only the uninitialized "Throwable cause;" form is the current code — reconcile before compiling
Throwable cause = null;
Throwable cause;
// log the root cause rather than the wrapping CompletionException when one is present
Throwable result = ex;
if (null != (cause = ex.getCause())) {
result = cause;
}

logger.error("", result);
} finally {
// single-use executor: release its thread once host initialization completes (or fails)
hostExecutor.shutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.driver;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
Expand Down Expand Up @@ -95,9 +96,9 @@ public ConnectionPool(final Host host, final Client client, final Optional<Integ
this.connections = new CopyOnWriteArrayList<>();

try {
final List<CompletableFuture<Void>> connCreationFutures = new ArrayList<>();
final List<CompletableFuture<Void>> connectionCreationFutures = new ArrayList<>();
for (int i = 0; i < minPoolSize; i++) {
connCreationFutures.add(CompletableFuture.runAsync(() -> {
connectionCreationFutures.add(CompletableFuture.runAsync(() -> {
try {
this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
} catch (ConnectionException e) {
Expand All @@ -106,7 +107,7 @@ public ConnectionPool(final Host host, final Client client, final Optional<Integ
}, cluster.executor()));
}

CompletableFuture.allOf(connCreationFutures.toArray(new CompletableFuture[0])).join();
CompletableFuture.allOf(connectionCreationFutures.toArray(new CompletableFuture[0])).join();
} catch (CancellationException ce) {
logger.warn("Initialization of connections cancelled for {}", getPoolInfo(), ce);
throw ce;
Expand All @@ -118,8 +119,7 @@ public ConnectionPool(final Host host, final Client client, final Optional<Integ
" Successful connections=" + this.connections.size() +
". Closing the connection pool.";


Throwable cause = null;
Throwable cause;
Throwable result = ce;

if (null != (cause = result.getCause())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class SimpleSocketServer {
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;

public Channel start(ChannelInitializer<SocketChannel> channelInitializer) throws InterruptedException {
public Channel start(final ChannelInitializer<SocketChannel> channelInitializer) throws InterruptedException {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
final ServerBootstrap b = new ServerBootstrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.core.Is.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -84,6 +88,58 @@ public void shutdown() {
rootLogger.removeAppender(recordingAppender);
}

/**
 * Constructs a deadlock situation when initializing a {@link Client} object in sessionless form that leads to
 * hanging behavior in low resource environments (TINKERPOP-2504) and for certain configurations of the
 * {@link Cluster} object where there are simply not enough threads to properly allow the {@link Host} and its
 * related {@link ConnectionPool} objects to spin up properly - see TINKERPOP-2550.
 */
@Test
public void shouldNotDeadlockOnInitialization() throws Exception {
// it seems you can add the same host more than once so while kinda weird it is helpful in faithfully
// recreating the deadlock situation, though it can/will happen with just one host. workerPoolSize at
// "1" also helps faithfully reproduce the problem though it can happen at larger pool sizes depending
// on the timing/interleaving of tasks. the larger connection pool sizes may not be required given the
// other settings at play but again, just trying to make sure the deadlock state is consistently produced
// and a larger pool size will mean more time to elapse scheduling connection creation tasks which may
// further improve chances of scheduling conflicts that produce the deadlock.
//
// to force this test to a fail state, change ClusteredClient.initializeImplementation() to use the
// standard Cluster.executor rather than the hostExecutor (which is a single threaded independent thread
// pool used just for the purpose of initializing the hosts).
final Cluster cluster = Cluster.build("localhost").
addContactPoint("localhost").
addContactPoint("localhost").port(SimpleSocketServer.PORT).
workerPoolSize(1).
minConnectionPoolSize(32).maxConnectionPoolSize(32).create();

final AtomicBoolean failed = new AtomicBoolean(false);
// run client initialization on a separate thread so the test thread can time-bound it below
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
try {
final Client client = cluster.connect();

// test will hang in init() where the Host and ConnectionPool are started up
client.init();
} catch (Exception ex) {
// should not "fail" - just hang and then timeout during the executor shutdown as there is
// a deadlock state, but we have this here just in case. a failed assertion of this value
// below could be interesting
logger.error("Client initialization failed with exception which was unexpected", ex);
failed.set(true);
} finally {
cluster.close();
}
});

executor.shutdown();

// 30 seconds should be ample time, even for travis. the deadlock state happens quite immediately in
// testing and in most situations this test should zip by in subsecond pace
assertThat(executor.awaitTermination(30, TimeUnit.SECONDS), is(true));
assertThat(failed.get(), is(false));
}

/**
* Test a scenario when server closes a connection which does not have any active requests. Such connection
* should be destroyed and replaced by another connection on next request.
Expand Down

0 comments on commit a0b82c6

Please sign in to comment.