Skip to content

Commit

Permalink
Correctly handling EsHadoopException in TransportPool.validate() (ela…
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke authored Nov 6, 2023
1 parent 35982db commit 201c83e
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.hadoop.rest.pooling;

import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.mr.security.HadoopUserProvider;
import org.elasticsearch.hadoop.rest.Transport;
import org.elasticsearch.hadoop.security.SecureSettings;
import org.elasticsearch.hadoop.util.SettingsUtils;
Expand All @@ -29,6 +30,7 @@
import java.util.UUID;

import static org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_NET_TRANSPORT_POOLING_EXPIRATION_TIMEOUT;
import static org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_SECURITY_USER_PROVIDER_CLASS;

public class AbstractTransportPoolTest {

Expand Down Expand Up @@ -77,4 +79,78 @@ public void removeOldConnections() throws Exception {
}
}


@Test
public void testValidationWithNoUserProvider() throws Exception {
/*
* In this test, we make sure that validating PooledTransports does not throw an exception if the resource is bad. Instead, the
* PooledTransport is replaced.
*/
Settings settings = new TestSettings();
settings.setProperty(ES_NET_TRANSPORT_POOLING_EXPIRATION_TIMEOUT, "1s");
String host = SettingsUtils.discoveredOrDeclaredNodes(settings).get(0);

TransportPool pool = new TransportPool(UUID.randomUUID().toString(), host, settings, new SecureSettings(settings));

Transport transport1 = null;

try {
transport1 = pool.borrowTransport();
transport1.close();

// Wait the amount of time to close.
Thread.sleep(settings.getTransportPoolingExpirationTimeout().millis() + 1000L);

transport1 = pool.borrowTransport();
transport1.close();

// Will need to remove 2 connections at this point
pool.removeOldConnections();

} finally {
// Close everything
if (transport1 != null) {
transport1.close();
}
}

}

@Test
public void testValidationWithUserProvider() throws Exception {
/*
* In this test, we make sure that validating PooledTransports does not throw an exception if the resource is bad when we have set
* a ES_SECURITY_USER_PROVIDER_CLASS. Instead, the PooledTransport is replaced. Previously this would incorrectly throw an
* exception.
*/
Settings settings = new TestSettings();
settings.setProperty(ES_NET_TRANSPORT_POOLING_EXPIRATION_TIMEOUT, "1s");
settings.setProperty(ES_SECURITY_USER_PROVIDER_CLASS, HadoopUserProvider.class.getName());
String badHost = "127.0.0.1:11111"; //intentionally not a real host so that validation fails

TransportPool pool = new TransportPool(UUID.randomUUID().toString(), badHost, settings, new SecureSettings(settings));

Transport transport1 = null;

try {
transport1 = pool.borrowTransport();
transport1.close();

// Wait the amount of time to close.
Thread.sleep(settings.getTransportPoolingExpirationTimeout().millis() + 1000L);

// The following will throw an exception reported in https://github.com/elastic/elasticsearch-hadoop/issues/1362 without the fix
transport1 = pool.borrowTransport();
transport1.close();

// Will need to remove 2 connections at this point
pool.removeOldConnections();

} finally {
// Close everything
if (transport1 != null) {
transport1.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.EsHadoopException;
import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.Request;
Expand Down Expand Up @@ -101,6 +102,13 @@ private boolean validate(PooledTransport transport) {
try {
Response response = transport.execute(validationRequest);
return response.hasSucceeded();
} catch (EsHadoopException e) {
if (e.getCause() instanceof IOException) {
log.warn("Could not validate pooled connection on lease. Releasing pooled connection and trying again...", e.getCause());
return false;
} else {
throw e;
}
} catch (IOException ioe) {
log.warn("Could not validate pooled connection on lease. Releasing pooled connection and trying again...", ioe);
return false;
Expand Down

0 comments on commit 201c83e

Please sign in to comment.