Skip to content

Commit

Permalink
KUDU-1715. Add a way to set ReplicaSelection in the Java client
Browse files Browse the repository at this point in the history
This patch adds a ReplicaSelection option for all the RPCs, which always defaults
to LEADER_ONLY, as well as a way to change it for scanners.

Change-Id: I3bb08c9c78271a0065c8aa8fb9b0f3301f84e828
Reviewed-on: http://gerrit.cloudera.org:8080/4837
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <[email protected]>
  • Loading branch information
jdcryans committed Oct 27, 2016
1 parent a326fc9 commit a7fd4fc
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public abstract class AbstractKuduScannerBuilder
List<String> projectedColumnNames = null;
List<Integer> projectedColumnIndexes = null;
long scanRequestTimeout;
ReplicaSelection replicaSelection = ReplicaSelection.LEADER_ONLY;

AbstractKuduScannerBuilder(AsyncKuduClient client, KuduTable table) {
this.client = client;
Expand Down Expand Up @@ -312,6 +313,17 @@ public S exclusiveUpperBoundRaw(byte[] endPrimaryKey) {
return (S) this;
}

/**
* Sets the replica selection mechanism for this scanner. The default is to read from the
* currently known leader ({@link ReplicaSelection#LEADER_ONLY}).
* @param replicaSelection replica selection mechanism to use; the value is stored as-is,
*                         no validation is performed here
* @return this instance
*/
public S replicaSelection(ReplicaSelection replicaSelection) {
this.replicaSelection = replicaSelection;
// Returned as S so that calls can be chained on the builder.
return (S) this;
}

/**
* Set an encoded (inclusive) start partition key for the scan.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,9 @@ void removeSession(AsyncKuduSession session) {
Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
RemoteTablet tablet = scanner.currentTablet();
assert (tablet != null);
TabletClient client = connectionCache.getClient(tablet.getLeaderUUID());
KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
TabletClient client =
connectionCache.getClient(tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection()));
Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
// Important to increment the attempts before the next if statement since
// getSleepTimeForRpc() relies on it if the client is null or dead.
Expand Down Expand Up @@ -615,18 +616,20 @@ Deferred<AsyncKuduScanner.Response> closeScanner(final AsyncKuduScanner scanner)
return Deferred.fromResult(null);
}

final TabletClient client = connectionCache.getClient(tablet.getLeaderUUID());
final KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest();
final TabletClient client = connectionCache.getClient(
tablet.getReplicaSelectedUUID(closeRequest.getReplicaSelection()));
if (client == null || !client.isAlive()) {
// Oops, we couldn't find a tablet server that hosts this tablet. Our
// cache was probably invalidated while the client was scanning. So
// we can't close this scanner properly.
LOG.warn("Cannot close {} properly, no connection open for {}", scanner, tablet);
return Deferred.fromResult(null);
}
final KuduRpc<AsyncKuduScanner.Response> close_request = scanner.getCloseRequest();
final Deferred<AsyncKuduScanner.Response> d = close_request.getDeferred();
close_request.attempt++;
client.sendRpc(close_request);

final Deferred<AsyncKuduScanner.Response> d = closeRequest.getDeferred();
closeRequest.attempt++;
client.sendRpc(closeRequest);
return d;
}

Expand Down Expand Up @@ -670,7 +673,7 @@ <R> Deferred<R> sendRpcToTablet(final KuduRpc<R> request) {
// If we found a tablet, we'll try to find the TS to talk to.
if (entry != null) {
RemoteTablet tablet = entry.getTablet();
String uuid = tablet.getLeaderUUID();
String uuid = tablet.getReplicaSelectedUUID(request.getReplicaSelection());
if (uuid != null) {
Deferred<R> d = request.getDeferred();
request.setTablet(tablet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ public Common.ReadMode pbVersion() {

private final long htTimestamp;

private final ReplicaSelection replicaSelection;

/////////////////////
// Runtime variables.
/////////////////////
Expand Down Expand Up @@ -212,8 +214,6 @@ public Common.ReadMode pbVersion() {

private Deferred<RowResultIterator> prefetcherDeferred;

private boolean inFirstTablet = true;

final long scanRequestTimeout;

AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String> projectedNames,
Expand All @@ -222,7 +222,8 @@ public Common.ReadMode pbVersion() {
Map<String, KuduPredicate> predicates, long limit,
boolean cacheBlocks, boolean prefetching,
byte[] startPrimaryKey, byte[] endPrimaryKey,
long htTimestamp, int batchSizeBytes, PartitionPruner pruner) {
long htTimestamp, int batchSizeBytes, PartitionPruner pruner,
ReplicaSelection replicaSelection) {
checkArgument(batchSizeBytes > 0, "Need a strictly positive number of bytes, " +
"got %s", batchSizeBytes);
checkArgument(limit > 0, "Need a strictly positive number for the limit, " +
Expand Down Expand Up @@ -280,6 +281,8 @@ public Common.ReadMode pbVersion() {
this.hasMore = false;
this.closed = true;
}

this.replicaSelection = replicaSelection;
}

/**
Expand Down Expand Up @@ -586,11 +589,6 @@ RemoteTablet currentTablet() {
*/
KuduRpc<Response> getOpenRequest() {
checkScanningNotStarted();
// This is the only point where we know we haven't started scanning and where the scanner
// should be fully configured, so the first-tablet flag is cleared here if it is still set.
if (this.inFirstTablet) {
this.inFirstTablet = false;
}
// Every call builds a new scan request for the table in the OPENING state.
return new ScanRequest(table, State.OPENING);
}

Expand Down Expand Up @@ -684,6 +682,11 @@ Collection<Integer> getRequiredFeatures() {
}
}

/**
* Returns the value of the {@code replicaSelection} field instead of the inherited
* implementation's result.
*/
@Override
ReplicaSelection getReplicaSelection() {
return replicaSelection;
}

/** Serializes this request. */
ChannelBuffer serialize(Message header) {
final ScanRequestPB.Builder builder = ScanRequestPB.newBuilder();
Expand Down Expand Up @@ -818,7 +821,7 @@ public AsyncKuduScanner build() {
client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
scanRequestTimeout, predicates, limit, cacheBlocks,
prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
htTimestamp, batchSizeBytes, PartitionPruner.create(this));
htTimestamp, batchSizeBytes, PartitionPruner.create(this), replicaSelection);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ long getSequenceId() {
return sequenceId;
}

/**
* Returns the replica selection mechanism associated with this RPC.
* This implementation always returns {@link ReplicaSelection#LEADER_ONLY}.
* @return the replica selection mechanism
*/
ReplicaSelection getReplicaSelection() {
return ReplicaSelection.LEADER_ONLY;
}

void setSequenceId(long sequenceId) {
assert (this.sequenceId == RequestTracker.NO_SEQ_NO);
this.sequenceId = sequenceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public KuduScanner build() {
client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
scanRequestTimeout, predicates, limit, cacheBlocks,
prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
htTimestamp, batchSizeBytes, PartitionPruner.create(this)));
htTimestamp, batchSizeBytes, PartitionPruner.create(this), replicaSelection));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,23 @@ String getClosestUUID() {
}
}

/**
 * Single entry point that maps a replica selection mechanism to the matching UUID lookup
 * on this tablet.
 * @param replicaSelection replica selection mechanism to use
 * @return the UUID produced by the lookup for that mechanism, can be null
 */
String getReplicaSelectedUUID(ReplicaSelection replicaSelection) {
  final String selectedUuid;
  switch (replicaSelection) {
    case LEADER_ONLY:
      selectedUuid = getLeaderUUID();
      break;
    case CLOSEST_REPLICA:
      selectedUuid = getClosestUUID();
      break;
    default:
      // Reached only if a new enum constant is added without a matching case above.
      throw new RuntimeException("Unknown replica selection mechanism " + replicaSelection);
  }
  return selectedUuid;
}

/**
* Gets the replicas of this tablet. The returned list may not be mutated.
* @return the replicas of the tablet
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.client;

import org.apache.kudu.annotations.InterfaceAudience;
import org.apache.kudu.annotations.InterfaceStability;

/**
* Policy with which to choose amongst multiple replicas of a tablet when deciding which
* server to contact.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public enum ReplicaSelection {
/**
* Select the LEADER replica.
*/
LEADER_ONLY,
/**
* Select the closest replica to the client, or a random one if all replicas are equidistant.
*/
CLOSEST_REPLICA
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ public void testNoLocalReplica() {
assertNotNull(tablet.getClosestUUID());
}

@Test
public void testReplicaSelection() {
  // First argument is the leader index; the second presumably designates the replica that is
  // closest to the client (TODO confirm against the two-argument getTablet).
  final RemoteTablet remoteTablet = getTablet(0, 1);

  final String leaderUuid = remoteTablet.getReplicaSelectedUUID(ReplicaSelection.LEADER_ONLY);
  assertEquals("0", leaderUuid);

  final String closestUuid =
      remoteTablet.getReplicaSelectedUUID(ReplicaSelection.CLOSEST_REPLICA);
  assertEquals("1", closestUuid);
}

// Convenience overload: delegates to the two-argument getTablet, passing -1 as the second
// argument (presumably meaning "no local replica" -- TODO confirm).
private RemoteTablet getTablet(int leaderIndex) {
return getTablet(leaderIndex, -1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,21 @@ public void testProjections() throws Exception {
buildScannerAndCheckColumnsCount(builder, 2);
}

@Test(timeout = 100000)
public void testReplicaSelections() throws Exception {
  // Build one scanner per selection policy; each scan is expected to count 9 rows.
  final ReplicaSelection[] policies = {
      ReplicaSelection.LEADER_ONLY,
      ReplicaSelection.CLOSEST_REPLICA
  };

  for (ReplicaSelection policy : policies) {
    AsyncKuduScanner scanner = client.newScannerBuilder(table)
        .replicaSelection(policy)
        .build();

    assertEquals(9, countRowsInScan(scanner));
  }
}

private AsyncKuduScanner getScanner(String lowerBoundKeyOne,
String lowerBoundKeyTwo,
String exclusiveUpperBoundKeyOne,
Expand Down

0 comments on commit a7fd4fc

Please sign in to comment.