Skip to content

Commit

Permalink
[java client] Refactor all server info into a single class, add locality
Browse files Browse the repository at this point in the history
Having this helps propagate information around the client. RemoteTablet can now
answer requests for local servers.

This patch also tries to simplify how RemoteTablets are created, as it's getting
a bit messy and redundant with the passing of ServerInfos.

Change-Id: I33984a437d8c8d07d5db4d16f8da723b3e904189
Reviewed-on: http://gerrit.cloudera.org:8080/4836
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <[email protected]>
  • Loading branch information
jdcryans committed Oct 27, 2016
1 parent 96c2e2a commit a326fc9
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,7 @@ <R> void handleTabletNotFound(final KuduRpc<R> rpc, KuduException ex, TabletClie
* a RPC, so we need to demote it and retry.
*/
<R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
rpc.getTablet().demoteLeader(server.getUuid());
rpc.getTablet().demoteLeader(server.getServerInfo().getUuid());
handleRetryableError(rpc, ex);
}

Expand Down Expand Up @@ -1182,9 +1182,9 @@ public void run(final Timeout timeout) {
* the tablet itself from the caches.
*/
private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) {
LOG.info("Removing server " + server.getUuid() + " from this tablet's cache " +
tablet.getTabletId());
tablet.removeTabletClient(server.getUuid());
String uuid = server.getServerInfo().getUuid();
LOG.info("Removing server {} from this tablet's cache {}", uuid, tablet.getTabletId());
tablet.removeTabletClient(uuid);
}

/** Callback executed when a master lookup completes. */
Expand Down Expand Up @@ -1275,12 +1275,14 @@ void discoverTablets(KuduTable table,
List<RemoteTablet> tablets = new ArrayList<>(locations.size());
for (Master.TabletLocationsPB tabletPb : locations) {

String tabletId = tabletPb.getTabletId().toStringUtf8();

List<UnknownHostException> lookupExceptions = new ArrayList<>(tabletPb.getReplicasCount());
List<ServerInfo> servers = new ArrayList<>(tabletPb.getReplicasCount());
for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) {
try {
connectionCache.connectTS(replica.getTsInfo());
ServerInfo serverInfo = connectionCache.connectTS(replica.getTsInfo());
if (serverInfo != null) {
servers.add(serverInfo);
}
} catch (UnknownHostException ex) {
lookupExceptions.add(ex);
}
Expand All @@ -1293,8 +1295,7 @@ void discoverTablets(KuduTable table,
throw new NonRecoverableException(statusIOE);
}

Partition partition = ProtobufHelper.pbToPartition(tabletPb.getPartition());
RemoteTablet rt = new RemoteTablet(tableId, tabletId, partition, tabletPb);
RemoteTablet rt = new RemoteTablet(tableId, tabletPb, servers);

LOG.info("Learned about tablet {} for table '{}' with partition {}",
rt.getTabletId(), tableName, rt.getPartition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,15 @@ class ConnectionCache {
/**
* Create a connection to a tablet server based on information provided by the master.
* @param tsInfoPB master-provided information for the tablet server
* @return an object that contains all the server's information
* @throws UnknownHostException if we cannot resolve the tablet server's IP address
*/
void connectTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
ServerInfo connectTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
List<Common.HostPortPB> addresses = tsInfoPB.getRpcAddressesList();
String uuid = tsInfoPB.getPermanentUuid().toStringUtf8();
if (addresses.isEmpty()) {
LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid);
return;
return null;
}

// from meta_cache.cc
Expand All @@ -110,29 +111,30 @@ void connectTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
throw new UnknownHostException(
"Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
}
newClient(uuid, inetAddress, addresses.get(0).getPort());
return newClient(uuid, inetAddress, addresses.get(0).getPort()).getServerInfo();
}

TabletClient newClient(String uuid, InetAddress inetAddress, int port) {
String host = inetAddress.getHostAddress();
boolean isLocal = NetUtil.isLocalAddress(inetAddress);
return newClient(uuid, host, port, isLocal);
ServerInfo serverInfo = new ServerInfo(uuid, host, port, isLocal);
return newClient(serverInfo);
}

TabletClient newClient(String uuid, String host, int port, boolean isLocal) {
TabletClient newClient(ServerInfo serverInfo) {
TabletClient client;
SocketChannel chan;

writeLock.lock();
try {
client = uuid2client.get(uuid);
client = uuid2client.get(serverInfo.getUuid());
if (client != null && client.isAlive()) {
return client;
}
final TabletClientPipeline pipeline = new TabletClientPipeline();
client = pipeline.init(uuid, host, port, isLocal);
client = pipeline.init(serverInfo);
chan = this.kuduClient.getChannelFactory().newChannel(pipeline);
uuid2client.put(uuid, client);
uuid2client.put(serverInfo.getUuid(), client);
} finally {
writeLock.unlock();
}
Expand All @@ -144,7 +146,8 @@ TabletClient newClient(String uuid, String host, int port, boolean isLocal) {
// Java since the JRE doesn't expose any way to call setsockopt() with
// TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
config.setKeepAlive(true);
chan.connect(new InetSocketAddress(host, port)); // Won't block.
chan.connect(
new InetSocketAddress(serverInfo.getHostname(), serverInfo.getPort())); // Won't block.
return client;
}

Expand Down Expand Up @@ -178,7 +181,7 @@ TabletClient getLiveClient(String uuid) {
} else if (client.isAlive()) {
return client;
} else {
return newClient(uuid, client.getHost(), client.getPort(), client.isLocal());
return newClient(client.getServerInfo());
}
}

Expand Down Expand Up @@ -235,9 +238,9 @@ boolean allConnectionsAreDead() {

private final class TabletClientPipeline extends DefaultChannelPipeline {

TabletClient init(String uuid, String host, int port, boolean isLocal) {
TabletClient init(ServerInfo serverInfo) {
AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
final TabletClient client = new TabletClient(kuduClient, uuid, host, port, isLocal);
final TabletClient client = new TabletClient(kuduClient, serverInfo);
if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {
super.addLast("timeout-handler",
new ReadTimeoutHandler(kuduClient.getTimer(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
* This class encapsulates the information regarding a tablet and its locations.
* <p>
* RemoteTablet's main function, once it is init()'d, is to keep track of where the leader for this
* RemoteTablet's main function is to keep track of where the leader for this
* tablet is. For example, an RPC might call {@link #getLeaderUUID()}, contact that TS, find
* it's not the leader anymore, and then call {@link #demoteLeader(String)}.
* <p>
Expand All @@ -53,23 +52,30 @@ class RemoteTablet implements Comparable<RemoteTablet> {
private final String tableId;
private final String tabletId;
@GuardedBy("tabletServers")
private final Set<String> tabletServers = new HashSet<>();
private final Map<String, ServerInfo> tabletServers;
private final AtomicReference<List<LocatedTablet.Replica>> replicas =
new AtomicReference(ImmutableList.of());
private final Partition partition;

@GuardedBy("tabletServers")
private String leaderUuid;

RemoteTablet(String tableId, String tabletId,
Partition partition, Master.TabletLocationsPB tabletLocations) {
this.tabletId = tabletId;
RemoteTablet(String tableId,
Master.TabletLocationsPB tabletLocations,
List<ServerInfo> serverInfos) {
this.tabletId = tabletLocations.getTabletId().toStringUtf8();
this.tableId = tableId;
this.partition = partition;
this.partition = ProtobufHelper.pbToPartition(tabletLocations.getPartition());
this.tabletServers = new HashMap<>(serverInfos.size());

for (ServerInfo serverInfo : serverInfos) {
this.tabletServers.put(serverInfo.getUuid(), serverInfo);
}

ImmutableList.Builder<LocatedTablet.Replica> replicasBuilder = new ImmutableList.Builder<>();
for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) {
String uuid = replica.getTsInfo().getPermanentUuid().toStringUtf8();
tabletServers.add(uuid);
replicasBuilder.add(new LocatedTablet.Replica(replica));
if (replica.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) {
leaderUuid = uuid;
}
Expand All @@ -78,11 +84,6 @@ class RemoteTablet implements Comparable<RemoteTablet> {
if (leaderUuid == null) {
LOG.warn("No leader provided for tablet {}", getTabletId());
}

ImmutableList.Builder<LocatedTablet.Replica> replicasBuilder = new ImmutableList.Builder<>();
for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) {
replicasBuilder.add(new LocatedTablet.Replica(replica));
}
replicas.set(replicasBuilder.build());
}

Expand All @@ -101,7 +102,7 @@ boolean removeTabletClient(String uuid) {
if (leaderUuid != null && leaderUuid.equals(uuid)) {
leaderUuid = null;
}
if (tabletServers.remove(uuid)) {
if (tabletServers.remove(uuid) != null) {
return true;
}
LOG.debug("tablet {} already removed ts {}, size left is {}",
Expand Down Expand Up @@ -135,7 +136,7 @@ void demoteLeader(String uuid) {
}

/**
* Get the UUID of the tablet server that we think holds the leader replica for this tablet.
* Gets the UUID of the tablet server that we think holds the leader replica for this tablet.
* @return a UUID of a tablet server that we think has the leader, else null
*/
String getLeaderUUID() {
Expand All @@ -144,6 +145,25 @@ String getLeaderUUID() {
}
}

/**
* Gets the UUID of the closest server. If none is closer than the others, returns a random
* server UUID.
* @return the UUID of the closest server, which might be any if none is closer, or null if this
* cache doesn't know of any servers
*/
String getClosestUUID() {
synchronized (tabletServers) {
String lastUuid = null;
for (ServerInfo serverInfo : tabletServers.values()) {
lastUuid = serverInfo.getUuid();
if (serverInfo.isLocal()) {
return serverInfo.getUuid();
}
}
return lastUuid;
}
}

/**
* Gets the replicas of this tablet. The returned list may not be mutated.
* @return the replicas of the tablet
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.client;

import org.apache.kudu.annotations.InterfaceAudience;

/**
* Container class for server information that never changes, like UUID and hostname.
*/
@InterfaceAudience.Private
public class ServerInfo {
private final String uuid;
private final String hostname;
private final int port;
private final boolean local;

/**
* Constructor for all the fields. The intent is that there should only be one ServerInfo
* instance per UUID the client is connected to.
* @param uuid server's UUID
* @param hostname server's hostname, only one of them
* @param port server's port
* @param local if the server is hosted on the same machine where this client is running
*/
public ServerInfo(String uuid, String hostname, int port, boolean local) {
this.uuid = uuid;
this.hostname = hostname;
this.port = port;
this.local = local;
}

/**
* Returns this server's uuid.
* @return a string that contains this server's uuid
*/
public String getUuid() {
return uuid;
}

/**
* Returns this server's hostname. We might get many hostnames from the master for a single
* TS, and this is the one we picked to connect to originally.
* @return a string that contains this server's hostname
*/
public String getHostname() {
return hostname;
}

/**
* Returns this server's port.
* @return a port number that this server is bound to
*/
public int getPort() {
return port;
}

/**
* Returns if this server is on this client's host.
* @return true if the server is local, else false
*/
public boolean isLocal() {
return local;
}
}
Loading

0 comments on commit a326fc9

Please sign in to comment.