diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 7bbfdb96fb..0c0d9bbe0c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -816,7 +816,8 @@ public String toString() {
}
/**
- * "Errback" used to delayed-retry a RPC if it fails due to no leader master being found.
+ * "Errback" used to delayed-retry a RPC if a recoverable exception is thrown in the callback
+ * chain.
* Other exceptions are used to notify request RPC error, and passed through to be handled
* by the caller.
*
@@ -835,14 +836,9 @@ public RetryRpcErrback(KuduRpc request) {
@Override
public Deferred call(Exception arg) {
- if (arg instanceof NoLeaderMasterFoundException) {
- // If we could not find the leader master, try looking up the leader master
- // again.
- // TODO: Handle the situation when multiple in-flight RPCs are queued waiting
- // for the leader master to be determine (either after a failure or at initialization
- // time). This could re-use some of the existing piping in place for non-master tablets.
+ if (arg instanceof RecoverableException) {
Deferred d = request.getDeferred();
- delayedSendRpcToTablet(request, (NoLeaderMasterFoundException) arg);
+ delayedSendRpcToTablet(request, (KuduException) arg);
return d;
}
if (LOG.isDebugEnabled()) {
@@ -1318,7 +1314,7 @@ public Object call(final GetTableLocationsResponsePB response) {
partitionKey,
response.getTabletLocationsList(),
response.getTtlMillis());
- } catch (NonRecoverableException e) {
+ } catch (KuduException e) {
return e;
}
}
@@ -1359,7 +1355,7 @@ void releaseMasterLookupPermit() {
void discoverTablets(KuduTable table,
byte[] requestPartitionKey,
List locations,
- long ttl) throws NonRecoverableException {
+ long ttl) throws KuduException {
String tableId = table.getTableId();
String tableName = table.getName();
@@ -1408,6 +1404,15 @@ void discoverTablets(KuduTable table,
// Give the locations to the tablet location cache for the table, so that it
// can cache them and discover non-covered ranges.
locationsCache.cacheTabletLocations(tablets, requestPartitionKey, ttl);
+
+ // Now test if we found the tablet we were looking for. If so, RetryRpcCB will retry the RPC
+ // right away. If not, we throw an exception that RetryRpcErrback will understand as needing to
+ // sleep before retrying.
+ TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey);
+ if (!entry.isNonCoveredRange() && clientFor(entry.getTablet()) == null) {
+ throw new NoSuitableReplicaException(
+ Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader"));
+ }
}
RemoteTablet createTabletFromPb(String tableId, Master.TabletLocationsPB tabletPb) {
@@ -1490,6 +1495,8 @@ public Deferred call(List tablets) {
* @return A Deferred object for the master replica's current registration.
*/
Deferred getMasterRegistration(TabletClient masterClient) {
+ // TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
+ // basically reuse in some way the master permits.
GetMasterRegistrationRequest rpc = new GetMasterRegistrationRequest(masterTable);
rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
Deferred d = rpc.getDeferred();
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/NoSuitableReplicaException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/NoSuitableReplicaException.java
new file mode 100644
index 0000000000..e51a346394
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/NoSuitableReplicaException.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+/**
+ * Indicates that the master lookup failed because no suitable replicas were found for
+ * the given RPC.
+ */
+final class NoSuitableReplicaException extends RecoverableException {
+
+ NoSuitableReplicaException(Status status) {
+ super(status);
+ }
+
+ NoSuitableReplicaException(Status status, Exception cause) {
+ super(status, cause);
+ }
+}
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index ad6b5d8619..15112217bc 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -38,20 +38,20 @@
public class TestAsyncKuduClient extends BaseKuduTest {
private static final Logger LOG = LoggerFactory.getLogger(TestAsyncKuduClient.class);
- private static final String TABLE_NAME =
- TestAsyncKuduClient.class.getName() + "-" + System.currentTimeMillis();
- private static KuduTable table;
-
@BeforeClass
public static void setUpBeforeClass() throws Exception {
BaseKuduTest.setUpBeforeClass();
- // Set to 1 for testDisconnect to always test disconnecting the right server.
- CreateTableOptions options = getBasicCreateTableOptions().setNumReplicas(1);
- table = createTable(TABLE_NAME, basicSchema, options);
}
@Test(timeout = 100000)
public void testDisconnect() throws Exception {
+ // Set to 1 to always test disconnecting the right server.
+ CreateTableOptions options = getBasicCreateTableOptions().setNumReplicas(1);
+ KuduTable table = createTable(
+ "testDisconnect-" + System.currentTimeMillis(),
+ basicSchema,
+ options);
+
// Test that we can reconnect to a TS after a disconnection.
// 1. Warm up the cache.
assertEquals(0, countRowsInScan(client.newScannerBuilder(table).build()));
@@ -149,9 +149,51 @@ public void testBadHostnames() throws Exception {
"Invalid table ID", null, null);
client.discoverTablets(badTable, null, tabletLocations, 1000);
fail("This should have failed quickly");
- } catch (Exception ex) {
- assertTrue(ex instanceof NonRecoverableException);
+ } catch (NonRecoverableException ex) {
assertTrue(ex.getMessage().contains(badHostname));
}
}
+
+ @Test
+ public void testNoLeader() throws Exception {
+ CreateTableOptions options = getBasicCreateTableOptions();
+ KuduTable table = createTable(
+ "testNoLeader-" + System.currentTimeMillis(),
+ basicSchema,
+ options);
+
+ // Lookup the current locations so that we can pass some valid information to discoverTablets.
+ List tablets =
+ client.locateTable(table, null, null, DEFAULT_SLEEP).join(DEFAULT_SLEEP);
+ LocatedTablet tablet = tablets.get(0);
+ LocatedTablet.Replica leader = tablet.getLeaderReplica();
+
+ // Fake a master lookup that only returns one follower for the tablet.
+ List tabletLocations = new ArrayList<>();
+ Master.TabletLocationsPB.Builder tabletPb = Master.TabletLocationsPB.newBuilder();
+ Common.PartitionPB.Builder partition = Common.PartitionPB.newBuilder();
+ partition.setPartitionKeyStart(ByteString.EMPTY);
+ partition.setPartitionKeyEnd(ByteString.EMPTY);
+ tabletPb.setPartition(partition);
+ tabletPb.setTabletId(ByteString.copyFrom(tablet.getTabletId()));
+ Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder();
+ Common.HostPortPB.Builder hostBuilder = Common.HostPortPB.newBuilder();
+ hostBuilder.setHost(leader.getRpcHost());
+ hostBuilder.setPort(leader.getRpcPort());
+ tsInfoBuilder.addRpcAddresses(hostBuilder);
+ tsInfoBuilder.setPermanentUuid(ByteString.copyFromUtf8("some uuid"));
+ Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder =
+ Master.TabletLocationsPB.ReplicaPB.newBuilder();
+ replicaBuilder.setTsInfo(tsInfoBuilder);
+ replicaBuilder.setRole(Metadata.RaftPeerPB.Role.FOLLOWER); // This is a lie
+ tabletPb.addReplicas(replicaBuilder);
+ tabletLocations.add(tabletPb.build());
+
+ try {
+ client.discoverTablets(table, new byte[0], tabletLocations, 1000);
+ fail("discoverTablets should throw an exception if there's no leader");
+ } catch (NoSuitableReplicaException ex) {
+ // Expected.
+ }
+ }
}