Skip to content

Commit

Permalink
Make ZkBookieRackAffinityMapping work as expected (apache#6917)
Browse files Browse the repository at this point in the history
### Motivation

The current bookie rack affinity logic is problematic. The rack map is expecting a `host:port` pair but the bookkeeper DNS resolver is expecting a hostname or a host address. 

### Modification

- Introduce a new HashMap to keep a mapping between hostname and bookie info. It maintains the mapping for bookkeeper DNS resolver to lookup the network location.
  • Loading branch information
sijie authored May 19, 2020
1 parent 33df120 commit 5d0e3b3
Showing 1 changed file with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -58,6 +60,7 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
private static final ObjectMapper jsonMapper = ObjectMapperFactory.create();

private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
private volatile Map<String, BookieInfo> bookieInfoMap = new HashMap<>();

@Override
public void setConf(Configuration conf) {
Expand All @@ -77,17 +80,32 @@ private void updateRacksWithHost(BookiesRackConfiguration racks) {
// for just the IP/hostname when trying to get the rack for a bookie.
// To work around this issue, we insert in the map the bookie ip/hostname with same rack-info
BookiesRackConfiguration newRacksWithHost = new BookiesRackConfiguration();
Map<String, BookieInfo> newBookieInfoMap = new HashMap<>();
racks.forEach((group, bookies) ->
bookies.forEach((addr, bi) -> {
try {
BookieSocketAddress bsa = new BookieSocketAddress(addr);
newRacksWithHost.updateBookie(group, bsa.getHostName(), bi);
newRacksWithHost.updateBookie(group, bsa.toString(), bi);

String hostname = bsa.getSocketAddress().getHostName();
newBookieInfoMap.put(hostname, bi);

InetAddress address = bsa.getSocketAddress().getAddress();
if (null != address) {
String hostIp = address.getHostAddress();
if (null != hostIp) {
newBookieInfoMap.put(hostIp, bi);
}
} else {
LOG.info("Network address for {} is unresolvable yet.", addr);
}
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
})
);
racksWithHost = newRacksWithHost;
bookieInfoMap = newBookieInfoMap;
}

private ZooKeeperDataCache<BookiesRackConfiguration> getAndSetZkCache(Configuration conf) {
Expand Down Expand Up @@ -162,10 +180,19 @@ private String getRack(String bookieAddress) {
throw new RuntimeException(e);
}

BookieInfo bi = bookieInfoMap.get(bookieAddress);
if (bi == null) {
Optional<BookieInfo> biOpt = racksWithHost.getBookie(bookieAddress);
if (biOpt.isPresent()) {
bi = biOpt.get();
} else {
updateRacksWithHost(racksWithHost);
bi = bookieInfoMap.get(bookieAddress);
}
}

Optional<BookieInfo> bi = racksWithHost.getBookie(bookieAddress);
if (bi.isPresent()) {
String rack = bi.get().getRack();
if (bi != null) {
String rack = bi.getRack();
if (!rack.startsWith("/")) {
rack = "/" + rack;
}
Expand Down

0 comments on commit 5d0e3b3

Please sign in to comment.