Skip to content

Commit

Permalink
[java client] Implement RPC tracing, part 1
Browse files Browse the repository at this point in the history
First part of this work is adding the tracing objects and doing the tracing. A second
patch will make this information available to users.

This patch is using a pretty simple method of just
shoving container objects into a list, per RPC. The traces are lightweight
and don't try anything fancy. We also introduce the concept of "parent RPC", so that say
a Write RPC spawns a GetTableLocations, and the latter will be added to the former
so that the call to the master adds traces to both RPCs.

This patch isn't adding a nice way to present the traces (like JSON) but here's a simple
toString example:

RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973547, action=SEND_TO_SERVER, server=3926a6a73e994152be1336beb434154e},
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973548, action=RECEIVE_FROM_SERVER, server=3926a6a73e994152be1336beb434154e, callStatus=Network error: [Peer 3926a6a73e994152be1336beb434154e] Connection reset on [id: 0xc83743df]}
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973548, action=SLEEP_THEN_RETRY, callStatus=Network error: [Peer 3926a6a73e994152be1336beb434154e] Connection reset on [id: 0xc83743df]},
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973574, action=QUERY_MASTER},
RpcTraceFrame{rpcMethod='GetTableLocations', timestampMs=1477079973574, action=SEND_TO_SERVER, server=c0d4588690d241c69821ee773eebd185},
RpcTraceFrame{rpcMethod='GetTableLocations', timestampMs=1477079973576, action=RECEIVE_FROM_SERVER, server=c0d4588690d241c69821ee773eebd185, callStatus=OK},
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973579, action=PICKED_REPLICA},
RpcTraceFrame{rpcMethod='Write', timestampMs=1477079973579, action=SEND_TO_SERVER, server=0353a6d97d6c49f9a727bc1ee6c3393e},

This patch also fixes up some paths where we weren't passing a timeout correctly to an
RPC that was created in relation to another RPC (basically paths where the parent RPC
had to be set).

Change-Id: I69ef56acc071b9f80b34e38c1821df4096f54907
Reviewed-on: http://gerrit.cloudera.org:8080/4781
Reviewed-by: Dan Burkert <[email protected]>
Tested-by: Kudu Jenkins
  • Loading branch information
jdcryans committed Nov 4, 2016
1 parent ce80f4e commit bdbee44
Show file tree
Hide file tree
Showing 8 changed files with 387 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,8 @@ Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner)
RemoteTablet tablet = scanner.currentTablet();
assert (tablet != null);
KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
TabletClient client =
connectionCache.getClient(tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection()));
String uuid = tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection());
TabletClient client = connectionCache.getClient(uuid);
Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
// Important to increment the attempts before the next if statement since
// getSleepTimeForRpc() relies on it if the client is null or dead.
Expand All @@ -597,7 +597,9 @@ Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner)
// A null client means we either don't know about this tablet anymore (unlikely) or we
// couldn't find a leader (which could be triggered by a read timeout).
// We'll first delay the RPC in case things take some time to settle down, then retry.
return delayedSendRpcToTablet(nextRequest, null);
Status statusRemoteError = Status.RemoteError("Not connected to server " + uuid
+ " will retry after a delay");
return delayedSendRpcToTablet(nextRequest, new RecoverableException(statusRemoteError));
}
client.sendRpc(nextRequest);
return d;
Expand Down Expand Up @@ -685,6 +687,12 @@ <R> Deferred<R> sendRpcToTablet(final KuduRpc<R> request) {
}
}

request.addTrace(
new RpcTraceFrame.RpcTraceFrameBuilder(
request.method(),
RpcTraceFrame.Action.QUERY_MASTER)
.build());

// We fall through to here in two cases:
//
// 1) This client has not yet discovered the tablet which is responsible for
Expand All @@ -702,7 +710,7 @@ <R> Deferred<R> sendRpcToTablet(final KuduRpc<R> request) {
Callback<Deferred<R>, Master.GetTableLocationsResponsePB> cb = new RetryRpcCB<>(request);
Callback<Deferred<R>, Exception> eb = new RetryRpcErrback<>(request);
Deferred<Master.GetTableLocationsResponsePB> returnedD =
locateTablet(request.getTable(), partitionKey);
locateTablet(request.getTable(), partitionKey, request);
return AsyncUtil.addCallbacksDeferring(returnedD, cb, eb);
}

Expand All @@ -712,7 +720,7 @@ <R> Deferred<R> sendRpcToTablet(final KuduRpc<R> request) {
* <p>
* Use {@code AsyncUtil.addCallbacksDeferring} to add this as the callback and
* {@link AsyncKuduClient.RetryRpcErrback} as the "errback" to the {@code Deferred}
* returned by {@link #locateTablet(KuduTable, byte[])}.
* returned by {@link #locateTablet(KuduTable, byte[], KuduRpc)}.
* @param <R> RPC's return type.
* @param <D> Previous query's return type, which we don't use, but need to specify in order to
* tie it all together.
Expand All @@ -739,7 +747,7 @@ public String toString() {
* <p>
* Use {@code AsyncUtil.addCallbacksDeferring} to add this as the "errback" and
* {@link RetryRpcCB} as the callback to the {@code Deferred} returned by
* {@link #locateTablet(KuduTable, byte[])}.
* {@link #locateTablet(KuduTable, byte[], KuduRpc)}.
* @see #delayedSendRpcToTablet(KuduRpc, KuduException)
* @param <R> The type of the original RPC.
*/
Expand Down Expand Up @@ -818,10 +826,12 @@ public void run(final Timeout timeout) {
}
}
}
IsCreateTableDoneRequest rpc = new IsCreateTableDoneRequest(masterTable, tableId);
rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
IsCreateTableDoneRequest isCreateTableDoneRequest =
new IsCreateTableDoneRequest(masterTable, tableId);
isCreateTableDoneRequest.setTimeoutMillis(defaultAdminOperationTimeoutMs);
isCreateTableDoneRequest.setParentRpc(rpc);
final Deferred<Master.IsCreateTableDoneResponsePB> d =
sendRpcToTablet(rpc).addCallback(new IsCreateTableDoneCB(tableId));
sendRpcToTablet(isCreateTableDoneRequest).addCallback(new IsCreateTableDoneCB(tableId));
if (has_permit) {
// The errback is needed here to release the lookup permit
d.addCallbacks(new ReleaseMasterLookupPermit<Master.IsCreateTableDoneResponsePB>(),
Expand Down Expand Up @@ -958,13 +968,15 @@ static <R> Deferred<R> tooManyAttemptsOrTimeout(final KuduRpc<R> request,
* Sends a getTableLocations RPC to the master to find the table's tablets.
* @param table table to lookup
* @param partitionKey can be null, if not we'll find the exact tablet that contains it
* @param parentRpc RPC that prompted a master lookup, can be null
* @return Deferred to track the progress
*/
private Deferred<Master.GetTableLocationsResponsePB> locateTablet(KuduTable table,
byte[] partitionKey) {
final boolean has_permit = acquireMasterLookupPermit();
byte[] partitionKey,
KuduRpc<?> parentRpc) {
boolean hasPermit = acquireMasterLookupPermit();
String tableId = table.getTableId();
if (!has_permit) {
if (!hasPermit) {
// If we failed to acquire a permit, it's worth checking if someone
// looked up the tablet we're interested in. Every once in a while
// this will save us a Master lookup.
Expand All @@ -974,22 +986,27 @@ private Deferred<Master.GetTableLocationsResponsePB> locateTablet(KuduTable tabl
return Deferred.fromResult(null); // Looks like no lookup needed.
}
}
// Leave the end of the partition key range empty in order to pre-fetch tablet locations.
GetTableLocationsRequest rpc =
new GetTableLocationsRequest(masterTable, partitionKey, null, tableId);
rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
final Deferred<Master.GetTableLocationsResponsePB> d;

// If we know this is going to the master, check the master consensus
// configuration (as specified by 'masterAddresses' field) to determine and
// cache the current leader.
Deferred<Master.GetTableLocationsResponsePB> d;
if (isMasterTable(tableId)) {
d = getMasterTableLocationsPB();
d = getMasterTableLocationsPB(parentRpc);
} else {
// Leave the end of the partition key range empty in order to pre-fetch tablet locations.
GetTableLocationsRequest rpc =
new GetTableLocationsRequest(masterTable, partitionKey, null, tableId);
if (parentRpc != null) {
rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
rpc.setParentRpc(parentRpc);
} else {
rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
}
d = sendRpcToTablet(rpc);
}
d.addCallback(new MasterLookupCB(table, partitionKey));
if (has_permit) {
if (hasPermit) {
d.addBoth(new ReleaseMasterLookupPermit<Master.GetTableLocationsResponsePB>());
}
return d;
Expand All @@ -1000,7 +1017,7 @@ private Deferred<Master.GetTableLocationsResponsePB> locateTablet(KuduTable tabl
* fill a {@link Master.GetTabletLocationsResponsePB} object.
* @return An initialized Deferred object to hold the response.
*/
Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB() {
Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
final Deferred<Master.GetTableLocationsResponsePB> responseD = new Deferred<>();
final GetMasterRegistrationReceived received =
new GetMasterRegistrationReceived(masterAddresses, responseD);
Expand All @@ -1017,7 +1034,7 @@ Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB() {
Status statusIOE = Status.IOError(message);
d = Deferred.fromError(new NonRecoverableException(statusIOE));
} else {
d = getMasterRegistration(clientForHostAndPort);
d = getMasterRegistration(clientForHostAndPort, parentRpc);
}
d.addCallbacks(received.callbackForNode(hostAndPort), received.errbackForNode(hostAndPort));
}
Expand Down Expand Up @@ -1088,7 +1105,7 @@ private Deferred<List<LocatedTablet>> loopLocateTable(final KuduTable table,
// When lookup completes, the tablet (or non-covered range) for the next
// partition key will be located and added to the client's cache.
final byte[] lookupKey = partitionKey;
return locateTablet(table, key).addCallbackDeferring(
return locateTablet(table, key, null).addCallbackDeferring(
new Callback<Deferred<List<LocatedTablet>>, GetTableLocationsResponsePB>() {
@Override
public Deferred<List<LocatedTablet>> call(GetTableLocationsResponsePB resp) {
Expand Down Expand Up @@ -1156,7 +1173,7 @@ <R> void handleRetryableError(final KuduRpc<R> rpc, KuduException ex) {
* {@link #getSleepTimeForRpc(KuduRpc)}. If the RPC is out of time/retries, its errback will
* be immediately called.
* @param rpc the RPC to retry later
* @param ex the reason why we need to retry, might be null
* @param ex the reason why we need to retry
* @return a Deferred object to use if this method is called inline with the user's original
* attempt to send the RPC. Can be ignored in any other context that doesn't need to return a
* Deferred back to the user.
Expand All @@ -1171,6 +1188,15 @@ public void run(final Timeout timeout) {
sendRpcToTablet(rpc);
}
}
assert (ex != null);
Status reasonForRetry = ex.getStatus();
rpc.addTrace(
new RpcTraceFrame.RpcTraceFrameBuilder(
rpc.method(),
RpcTraceFrame.Action.SLEEP_THEN_RETRY)
.callStatus(reasonForRetry)
.build());

long sleepTime = getSleepTimeForRpc(rpc);
if (cannotRetryRequest(rpc) || rpc.deadlineTracker.wouldSleepingTimeout(sleepTime)) {
// Don't let it retry.
Expand Down Expand Up @@ -1389,14 +1415,21 @@ public Deferred<LocatedTablet> call(List<LocatedTablet> tablets) {
/**
* Retrieve the master registration (see {@link GetMasterRegistrationResponse}
* for a replica.
* @param masterClient An initialized client for the master replica.
* @return A Deferred object for the master replica's current registration.
* @param masterClient an initialized client for the master replica
* @param parentRpc RPC that prompted a master lookup, can be null
* @return a Deferred object for the master replica's current registration
*/
Deferred<GetMasterRegistrationResponse> getMasterRegistration(TabletClient masterClient) {
Deferred<GetMasterRegistrationResponse> getMasterRegistration(
TabletClient masterClient, KuduRpc<?> parentRpc) {
// TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
// basically reuse in some way the master permits.
GetMasterRegistrationRequest rpc = new GetMasterRegistrationRequest(masterTable);
rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
if (parentRpc != null) {
rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
rpc.setParentRpc(parentRpc);
} else {
rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
}
Deferred<GetMasterRegistrationResponse> d = rpc.getDeferred();
rpc.attempt++;
masterClient.sendRpc(rpc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ Pair<Response, Object> deserialize(final CallResponse callResponse,
public String toString() {
return "ScanRequest(scannerId=" + Bytes.pretty(scannerId)
+ (tablet != null? ", tabletSlice=" + tablet.getTabletId() : "")
+ ", attempt=" + attempt + ')';
+ ", attempt=" + attempt + ", " + super.toString() + ")";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public String toString() {
.add("operations", operations.size())
.add("tablet", tablet)
.add("ignoreAllDuplicateRows", ignoreAllDuplicateRows)
.add("rpc", super.toString())
.toString();
}

Expand Down
57 changes: 57 additions & 0 deletions java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/
package org.apache.kudu.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.InvalidProtocolBufferException;
Expand All @@ -39,7 +40,10 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;

Expand All @@ -61,12 +65,20 @@
@InterfaceAudience.Private
public abstract class KuduRpc<R> {

@VisibleForTesting
public static final int MAX_TRACES_SIZE = 100;

// Service names.
protected static final String MASTER_SERVICE_NAME = "kudu.master.MasterService";
protected static final String TABLET_SERVER_SERVICE_NAME = "kudu.tserver.TabletServerService";

private static final Logger LOG = LoggerFactory.getLogger(KuduRpc.class);

private final List<RpcTraceFrame> traces =
Collections.synchronizedList(new ArrayList<RpcTraceFrame>());

private KuduRpc<?> parentRpc;

/**
* Returns the partition key this RPC is for, or {@code null} if the RPC is
* not tablet specific.
Expand Down Expand Up @@ -203,9 +215,45 @@ private void handleCallback(final Object result) {
sequenceId = RequestTracker.NO_SEQ_NO;
}
deadlineTracker.reset();
traces.clear();
parentRpc = null;
d.callback(result);
}

/**
* Add the provided trace to this RPC's collection of traces. If this RPC has a parent RPC, it
* will also receive that trace. If this RPC has reached the limit of traces it can track then
* the trace will just be discarded.
* @param rpcTraceFrame trace to add
*/
void addTrace(RpcTraceFrame rpcTraceFrame) {
if (parentRpc != null) {
parentRpc.addTrace(rpcTraceFrame);
}

if (traces.size() == MAX_TRACES_SIZE) {
// Add a last trace that indicates that we've reached the max size.
traces.add(
new RpcTraceFrame.RpcTraceFrameBuilder(
this.method(),
RpcTraceFrame.Action.TRACE_TRUNCATED)
.build());
} else if (traces.size() < MAX_TRACES_SIZE) {
traces.add(rpcTraceFrame);
}
}

/**
* Sets this RPC to receive traces from the provided parent RPC. An RPC can only have one and
* only one parent RPC.
* @param parentRpc RPC that will also receive traces from this RPC
*/
void setParentRpc(KuduRpc<?> parentRpc) {
assert (this.parentRpc == null);
assert (this.parentRpc != this);
this.parentRpc = parentRpc;
}

/**
* Package private way of making an RPC complete by giving it its result.
* If this RPC has no {@link Deferred} associated to it, nothing will
Expand Down Expand Up @@ -267,6 +315,14 @@ ReplicaSelection getReplicaSelection() {
return ReplicaSelection.LEADER_ONLY;
}

/**
* Get an immutable copy of the traces.
* @return list of traces
*/
List<RpcTraceFrame> getImmutableTraces() {
return ImmutableList.copyOf(traces);
}

void setSequenceId(long sequenceId) {
assert (this.sequenceId == RequestTracker.NO_SEQ_NO);
this.sequenceId = sequenceId;
Expand All @@ -289,6 +345,7 @@ public String toString() {
// this method if DEBUG is enabled.
if (LOG.isDebugEnabled()) {
buf.append(", ").append(deferred);
buf.append(", ").append(traces);
}
buf.append(')');
return buf.toString();
Expand Down
Loading

0 comments on commit bdbee44

Please sign in to comment.