Skip to content

Commit

Permalink
[java client] Tight-ish loop in master lookups if a tablet doesn't ha…
Browse files Browse the repository at this point in the history
…ve a leader

There's currently the possibility of a situation like this:
1. sendRpcToTablet finds that the tablet the RPC is going to doesn't have a leader
2. a master lookup is sent
3. discoverTablets is called back and still doesn't find a leader
4. RetryRpcCallback is invoked right away, going back to step 1

This quickly leads to the following exception:

Too many attempts: KuduRpc(method=Write, tablet=redacted, attempt=101, DeadlineTracker(timeout=30000, elapsed=4747)

Notice how it retried 101 times in less than 5 seconds.

This patch changes step 3 so that an exception is thrown, which causes RetryRpcErrback
to be invoked instead; RetryRpcErrback adds a delay before retrying the RPC.

This bug was found by ITClient. It's not _just_ a flaky test!

Change-Id: Ibf2bd53b03551642e4d036d322e1e592b7c2cfec
Reviewed-on: http://gerrit.cloudera.org:8080/4570
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Jean-Daniel Cryans <[email protected]>
  • Loading branch information
jdcryans committed Oct 4, 2016
1 parent 1a60ed7 commit d87486c
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,8 @@ public String toString() {
}

/**
* "Errback" used to delayed-retry a RPC if it fails due to no leader master being found.
* "Errback" used to delayed-retry a RPC if a recoverable exception is thrown in the callback
* chain.
* Other exceptions are used to notify request RPC error, and passed through to be handled
* by the caller.
* <p>
Expand All @@ -835,14 +836,9 @@ public RetryRpcErrback(KuduRpc<R> request) {

@Override
public Deferred<R> call(Exception arg) {
if (arg instanceof NoLeaderMasterFoundException) {
// If we could not find the leader master, try looking up the leader master
// again.
// TODO: Handle the situation when multiple in-flight RPCs are queued waiting
// for the leader master to be determine (either after a failure or at initialization
// time). This could re-use some of the existing piping in place for non-master tablets.
if (arg instanceof RecoverableException) {
Deferred<R> d = request.getDeferred();
delayedSendRpcToTablet(request, (NoLeaderMasterFoundException) arg);
delayedSendRpcToTablet(request, (KuduException) arg);
return d;
}
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -1318,7 +1314,7 @@ public Object call(final GetTableLocationsResponsePB response) {
partitionKey,
response.getTabletLocationsList(),
response.getTtlMillis());
} catch (NonRecoverableException e) {
} catch (KuduException e) {
return e;
}
}
Expand Down Expand Up @@ -1359,7 +1355,7 @@ void releaseMasterLookupPermit() {
void discoverTablets(KuduTable table,
byte[] requestPartitionKey,
List<Master.TabletLocationsPB> locations,
long ttl) throws NonRecoverableException {
long ttl) throws KuduException {
String tableId = table.getTableId();
String tableName = table.getName();

Expand Down Expand Up @@ -1408,6 +1404,15 @@ void discoverTablets(KuduTable table,
// Give the locations to the tablet location cache for the table, so that it
// can cache them and discover non-covered ranges.
locationsCache.cacheTabletLocations(tablets, requestPartitionKey, ttl);

// Now test if we found the tablet we were looking for. If so, RetryRpcCB will retry the RPC
// right away. If not, we throw an exception that RetryRpcErrback will understand as needing to
// sleep before retrying.
TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey);
if (!entry.isNonCoveredRange() && clientFor(entry.getTablet()) == null) {
throw new NoSuitableReplicaException(
Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader"));
}
}

RemoteTablet createTabletFromPb(String tableId, Master.TabletLocationsPB tabletPb) {
Expand Down Expand Up @@ -1490,6 +1495,8 @@ public Deferred<LocatedTablet> call(List<LocatedTablet> tablets) {
* @return A Deferred object for the master replica's current registration.
*/
Deferred<GetMasterRegistrationResponse> getMasterRegistration(TabletClient masterClient) {
// TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
// basically reuse in some way the master permits.
GetMasterRegistrationRequest rpc = new GetMasterRegistrationRequest(masterTable);
rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
Deferred<GetMasterRegistrationResponse> d = rpc.getDeferred();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.client;

/**
 * Recoverable failure signalling that a master lookup did not yield any suitable replica
 * for the given RPC.
 */
final class NoSuitableReplicaException extends RecoverableException {

  /**
   * Creates the exception from a status and the exception that caused it.
   *
   * @param status status describing the failure
   * @param cause underlying exception, forwarded to the parent class
   */
  NoSuitableReplicaException(Status status, Exception cause) {
    super(status, cause);
  }

  /**
   * Creates the exception from a status alone.
   *
   * @param status status describing the failure
   */
  NoSuitableReplicaException(Status status) {
    super(status);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,20 @@
public class TestAsyncKuduClient extends BaseKuduTest {
private static final Logger LOG = LoggerFactory.getLogger(TestAsyncKuduClient.class);

private static final String TABLE_NAME =
TestAsyncKuduClient.class.getName() + "-" + System.currentTimeMillis();
private static KuduTable table;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
BaseKuduTest.setUpBeforeClass();
// Set to 1 for testDisconnect to always test disconnecting the right server.
CreateTableOptions options = getBasicCreateTableOptions().setNumReplicas(1);
table = createTable(TABLE_NAME, basicSchema, options);
}

@Test(timeout = 100000)
public void testDisconnect() throws Exception {
// Set to 1 to always test disconnecting the right server.
CreateTableOptions options = getBasicCreateTableOptions().setNumReplicas(1);
KuduTable table = createTable(
"testDisconnect-" + System.currentTimeMillis(),
basicSchema,
options);

// Test that we can reconnect to a TS after a disconnection.
// 1. Warm up the cache.
assertEquals(0, countRowsInScan(client.newScannerBuilder(table).build()));
Expand Down Expand Up @@ -149,9 +149,51 @@ public void testBadHostnames() throws Exception {
"Invalid table ID", null, null);
client.discoverTablets(badTable, null, tabletLocations, 1000);
fail("This should have failed quickly");
} catch (Exception ex) {
assertTrue(ex instanceof NonRecoverableException);
} catch (NonRecoverableException ex) {
assertTrue(ex.getMessage().contains(badHostname));
}
}

/**
 * Checks that discoverTablets throws a NoSuitableReplicaException when the tablet locations
 * it is given contain no leader replica for the requested partition key.
 */
// Same timeout as testDisconnect so that a hung cluster call cannot stall the whole suite.
@Test(timeout = 100000)
public void testNoLeader() throws Exception {
  CreateTableOptions options = getBasicCreateTableOptions();
  KuduTable table = createTable(
      "testNoLeader-" + System.currentTimeMillis(),
      basicSchema,
      options);

  // Lookup the current locations so that we can pass some valid information to discoverTablets.
  List<LocatedTablet> tablets =
      client.locateTable(table, null, null, DEFAULT_SLEEP).join(DEFAULT_SLEEP);
  LocatedTablet tablet = tablets.get(0);
  LocatedTablet.Replica leader = tablet.getLeaderReplica();

  // Fake a master lookup that only returns one follower for the tablet.
  List<Master.TabletLocationsPB> tabletLocations = new ArrayList<>();
  Master.TabletLocationsPB.Builder tabletPb = Master.TabletLocationsPB.newBuilder();

  // Empty start and end keys are used for the faked tablet's partition.
  Common.PartitionPB.Builder partition = Common.PartitionPB.newBuilder();
  partition.setPartitionKeyStart(ByteString.EMPTY);
  partition.setPartitionKeyEnd(ByteString.EMPTY);
  tabletPb.setPartition(partition);
  tabletPb.setTabletId(ByteString.copyFrom(tablet.getTabletId()));

  // Reuse the real leader's host and port, but under a made-up server UUID.
  Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder();
  Common.HostPortPB.Builder hostBuilder = Common.HostPortPB.newBuilder();
  hostBuilder.setHost(leader.getRpcHost());
  hostBuilder.setPort(leader.getRpcPort());
  tsInfoBuilder.addRpcAddresses(hostBuilder);
  tsInfoBuilder.setPermanentUuid(ByteString.copyFromUtf8("some uuid"));

  Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder =
      Master.TabletLocationsPB.ReplicaPB.newBuilder();
  replicaBuilder.setTsInfo(tsInfoBuilder);
  replicaBuilder.setRole(Metadata.RaftPeerPB.Role.FOLLOWER); // This is a lie
  tabletPb.addReplicas(replicaBuilder);
  tabletLocations.add(tabletPb.build());

  try {
    client.discoverTablets(table, new byte[0], tabletLocations, 1000);
    fail("discoverTablets should throw an exception if there's no leader");
  } catch (NoSuitableReplicaException ex) {
    // Expected.
  }
}
}

0 comments on commit d87486c

Please sign in to comment.