Skip to content

Commit

Permalink
[java client] Decouple TabletClient from RemoteTablet
Browse files Browse the repository at this point in the history
RemoteTablet was caching TabletClients, which made it hard to test in
isolation since doing so required real connections to servers.

This patch makes it so that only UUIDs are passed around; the client now
queries a RemoteTablet to know which UUID to get from ConnectionCache. A
lot of new unit tests were written that weren't possible before without
a lot of mocking.

Doing this brings subtle behavior changes. ConnectionCache is now the only
component responsible for handling TabletClients. The whole concept of "reconnection"
could be brought into ConnectionCache, simplifying sendRpcToTablet. It also
means that using an object monitor would likely create a bottleneck in ConnectionCache,
so a read-write lock (RWL) is used instead.

Since I was in that code, I also changed the tablet ID from Slice to String in
RemoteTablet. It was an old mistake I made years ago; we didn't need the Slice.

This still needs to be benchmarked against the 1.0.x client jar.

Change-Id: If3ad2190c7e2c7f51cb9ffe6ed3348b62488e675
Reviewed-on: http://gerrit.cloudera.org:8080/4757
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <[email protected]>
Reviewed-by: Adar Dembo <[email protected]>
  • Loading branch information
jdcryans authored and adembo committed Oct 20, 2016
1 parent cd40d1c commit 65cb2ed
Show file tree
Hide file tree
Showing 17 changed files with 623 additions and 520 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -583,21 +583,21 @@ void removeSession(AsyncKuduSession session) {
* @return A deferred row.
*/
Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
final RemoteTablet tablet = scanner.currentTablet();
RemoteTablet tablet = scanner.currentTablet();
assert (tablet != null);
final TabletClient client = tablet.getLeaderConnection();
final KuduRpc<AsyncKuduScanner.Response> next_request = scanner.getNextRowsRequest();
final Deferred<AsyncKuduScanner.Response> d = next_request.getDeferred();
TabletClient client = connectionCache.getClient(tablet.getLeaderUUID());
KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
// Important to increment the attempts before the next if statement since
// getSleepTimeForRpc() relies on it if the client is null or dead.
next_request.attempt++;
nextRequest.attempt++;
if (client == null || !client.isAlive()) {
// A null client means we either don't know about this tablet anymore (unlikely) or we
// couldn't find a leader (which could be triggered by a read timeout).
// We'll first delay the RPC in case things take some time to settle down, then retry.
return delayedSendRpcToTablet(next_request, null);
return delayedSendRpcToTablet(nextRequest, null);
}
client.sendRpc(next_request);
client.sendRpc(nextRequest);
return d;
}

Expand All @@ -614,7 +614,7 @@ Deferred<AsyncKuduScanner.Response> closeScanner(final AsyncKuduScanner scanner)
return Deferred.fromResult(null);
}

final TabletClient client = tablet.getLeaderConnection();
final TabletClient client = connectionCache.getClient(tablet.getLeaderUUID());
if (client == null || !client.isAlive()) {
// Oops, we couldn't find a tablet server that hosts this tablet. Our
// cache was probably invalidated while the client was scanning. So
Expand Down Expand Up @@ -666,43 +666,16 @@ <R> Deferred<R> sendRpcToTablet(final KuduRpc<R> request) {
request.setPropagatedTimestamp(lastPropagatedTs);
}

// If we found a tablet, we'll try to find the TS to talk to. If that TS was previously
// disconnected, say because we didn't query that tablet for some seconds, then we'll try to
// reconnect based on the old information. If that fails, we'll instead continue with the next
// block that queries the master.
// If we found a tablet, we'll try to find the TS to talk to.
if (entry != null) {
RemoteTablet tablet = entry.getTablet();
TabletClient tabletClient = tablet.getLeaderConnection();
if (tabletClient != null) {
final Deferred<R> d = request.getDeferred();
if (tabletClient.isAlive()) {
request.setTablet(tablet);
tabletClient.sendRpc(request);
return d;
}
try {
tablet.reconnectTabletClient(tabletClient);
} catch (UnknownHostException e) {
LOG.error("Cached tablet server {}'s host cannot be resolved, will query the master",
tabletClient.getUuid(), e);
// Because of this exception, getLeaderConnection() below won't be able to find a newTabletClient
// and we'll delay the RPC.
}
TabletClient newTabletClient = tablet.getLeaderConnection();
assert (tabletClient != newTabletClient);

if (newTabletClient == null) {
// Wait a little bit before hitting the master.
return delayedSendRpcToTablet(request, null);
}

if (!newTabletClient.isAlive()) {
LOG.debug("Tried reconnecting to tablet server {} but failed, " +
"will query the master", tabletClient.getUuid());
// Let fall through.
} else {
request.setTablet(tablet);
newTabletClient.sendRpc(request);
String uuid = tablet.getLeaderUUID();
if (uuid != null) {
Deferred<R> d = request.getDeferred();
request.setTablet(tablet);
TabletClient client = connectionCache.getLiveClient(uuid);
if (client != null) {
client.sendRpc(request);
return d;
}
}
Expand Down Expand Up @@ -924,7 +897,12 @@ long getSleepTimeForRpc(KuduRpc<?> rpc) {
*/
@VisibleForTesting
List<TabletClient> getTabletClients() {
return connectionCache.getTabletClients();
return connectionCache.getImmutableTabletClientsList();
}

@VisibleForTesting
TabletClient getTabletClient(String uuid) {
return connectionCache.getClient(uuid);
}

/**
Expand Down Expand Up @@ -988,7 +966,7 @@ private Deferred<Master.GetTableLocationsResponsePB> locateTablet(KuduTable tabl
// this will save us a Master lookup.
TableLocationsCache.Entry entry = getTableLocationEntry(tableId, partitionKey);
if (entry != null && !entry.isNonCoveredRange()
&& entry.getTablet().getLeaderConnection() != null) {
&& entry.getTablet().getLeaderUUID() != null) {
return Deferred.fromResult(null); // Looks like no lookup needed.
}
}
Expand Down Expand Up @@ -1158,7 +1136,7 @@ <R> void handleTabletNotFound(final KuduRpc<R> rpc, KuduException ex, TabletClie
* a RPC, so we need to demote it and retry.
*/
<R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
rpc.getTablet().demoteLeader(server);
rpc.getTablet().demoteLeader(server.getUuid());
handleRetryableError(rpc, ex);
}

Expand Down Expand Up @@ -1204,8 +1182,8 @@ public void run(final Timeout timeout) {
*/
private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) {
LOG.info("Removing server " + server.getUuid() + " from this tablet's cache " +
tablet.getTabletIdAsString());
tablet.removeTabletClient(server);
tablet.getTabletId());
tablet.removeTabletClient(server.getUuid());
}

/** Callback executed when a master lookup completes. */
Expand Down Expand Up @@ -1295,10 +1273,30 @@ void discoverTablets(KuduTable table,
// already discovered the tablet, its locations are refreshed.
List<RemoteTablet> tablets = new ArrayList<>(locations.size());
for (Master.TabletLocationsPB tabletPb : locations) {
RemoteTablet rt = RemoteTablet.createTabletFromPb(tableId, tabletPb, connectionCache);

String tabletId = tabletPb.getTabletId().toStringUtf8();

List<UnknownHostException> lookupExceptions = new ArrayList<>(tabletPb.getReplicasCount());
for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) {
try {
connectionCache.connectTS(replica.getTsInfo());
} catch (UnknownHostException ex) {
lookupExceptions.add(ex);
}
}

if (!lookupExceptions.isEmpty() &&
lookupExceptions.size() == tabletPb.getReplicasCount()) {
Status statusIOE = Status.IOError("Couldn't find any valid locations, exceptions: " +
lookupExceptions);
throw new NonRecoverableException(statusIOE);
}

Partition partition = ProtobufHelper.pbToPartition(tabletPb.getPartition());
RemoteTablet rt = new RemoteTablet(tableId, tabletId, partition, tabletPb);

LOG.info("Learned about tablet {} for table '{}' with partition {}",
rt.getTabletIdAsString(), tableName, rt.getPartition());
rt.getTabletId(), tableName, rt.getPartition());
tablets.add(rt);
}

Expand All @@ -1310,7 +1308,7 @@ void discoverTablets(KuduTable table,
// right away. If not, we throw an exception that RetryRpcErrback will understand as needing to
// sleep before retrying.
TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey);
if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderConnection() == null) {
if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderUUID() == null) {
throw new NoLeaderFoundException(
Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ void scanFinished() {
}
if (LOG.isDebugEnabled()) {
LOG.debug("Done scanning tablet {} for partition {} with scanner id {}",
tablet.getTabletIdAsString(), tablet.getPartition(), Bytes.pretty(scannerId));
tablet.getTabletId(), tablet.getPartition(), Bytes.pretty(scannerId));
}
scannerId = null;
sequenceId = 0;
Expand Down Expand Up @@ -537,7 +537,7 @@ public String toString() {
}

public String toString() {
final String tablet = this.tablet == null ? "null" : this.tablet.getTabletIdAsString();
final String tablet = this.tablet == null ? "null" : this.tablet.getTabletId();
final StringBuilder buf = new StringBuilder();
buf.append("KuduScanner(table=");
buf.append(table.getName());
Expand Down Expand Up @@ -785,7 +785,7 @@ Pair<Response, Object> deserialize(final CallResponse callResponse,

public String toString() {
return "ScanRequest(scannerId=" + Bytes.pretty(scannerId)
+ (tablet != null? ", tabletSlice=" + tablet.getTabletIdAsString() : "")
+ (tablet != null? ", tabletSlice=" + tablet.getTabletId() : "")
+ ", attempt=" + attempt + ')';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ boolean isRequestTracked() {

@Override
void updateStatistics(Statistics statistics, BatchResponse response) {
Slice tabletId = this.getTablet().getTabletId();
String tabletId = this.getTablet().getTabletId();
String tableName = this.getTable().getName();
TabletStatistics tabletStatistics = statistics.getTabletStatistics(tableName, tabletId);
if (response == null) {
Expand Down
Loading

0 comments on commit 65cb2ed

Please sign in to comment.