Skip to content

Commit

Permalink
Add client-only routing
Browse files Browse the repository at this point in the history
relates elastic#375
  • Loading branch information
costin committed Feb 8, 2015
1 parent e7067e6 commit 07c3474
Show file tree
Hide file tree
Showing 17 changed files with 259 additions and 83 deletions.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ allprojects { project ->
}
}
}


jar {
manifest {
Expand Down Expand Up @@ -500,6 +499,9 @@ project(":elasticsearch-storm") {
itestCompile "com.google.guava:guava:16.0.1"
itestRuntime "com.twitter:carbonite:1.4.0"
}

// add itest to Eclipse
eclipse.classpath.plusConfigurations += configurations.itestCompile
}

configure(rootProject) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ static Settings addDefaultsToSettings(Properties flowProperties, Properties tapP
Settings settings = HadoopSettingsManager.loadFrom(CascadingUtils.extractOriginalProperties(flowProperties)).merge(tapProperties);

InitializationUtils.discoverNodesIfNeeded(settings, log);
InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
InitializationUtils.discoverEsVersion(settings, log);

InitializationUtils.setValueWriterIfNotSet(settings, CascadingValueWriter.class, log);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public interface ConfigurationOptions {

String ES_QUERY = "es.query";

/** Clients only */
String ES_NODES_CLIENT_ONLY = "es.nodes.client.only";
String ES_NODES_CLIENT_ONLY_DEFAULT = "false";

/** Elasticsearch batch size given in bytes */
String ES_BATCH_SIZE_BYTES = "es.batch.size.bytes";
String ES_BATCH_SIZE_BYTES_DEFAULT = "1mb";
Expand Down Expand Up @@ -93,7 +97,7 @@ public interface ConfigurationOptions {
/** Scroll fields */
String ES_SCROLL_FIELDS = "es.scroll.fields";

String ES_SCROLL_ESCAPE_QUERY_URI = "es.scroll.escape.query.uri";
String ES_SCROLL_ESCAPE_QUERY_URI = "es.scroll.escape.query.uri";
String ES_SCROLL_ESCAPE_QUERY_URI_DEFAULT = "true";

String ES_HEART_BEAT_LEAD = "es.action.heart.beat.lead";
Expand Down Expand Up @@ -149,20 +153,20 @@ public interface ConfigurationOptions {
String ES_MAPPING_DEFAULT_INDEX_FORMATTER_CLASS = DateIndexFormatter.class.getName();
String ES_MAPPING_PARAMS_EXTRACTOR_CLASS = "es.mapping.params.extractor.class";
String ES_MAPPING_PARAMS_DEFAULT_EXTRACTOR_CLASS = DefaultParamsExtractor.class.getName();
String ES_MAPPING_CONSTANT_AUTO_QUOTE = "es.mapping.constant.auto.quote";
String ES_MAPPING_CONSTANT_AUTO_QUOTE_DEFAULT = "true";
String ES_MAPPING_CONSTANT_AUTO_QUOTE = "es.mapping.constant.auto.quote";
String ES_MAPPING_CONSTANT_AUTO_QUOTE_DEFAULT = "true";

String ES_MAPPING_VERSION_TYPE = "es.mapping.version.type";
String ES_MAPPING_VERSION_TYPE_INTERNAL = "internal";
String ES_MAPPING_VERSION_TYPE_EXTERNAL = "external";
String ES_MAPPING_VERSION_TYPE_EXTERNAL_GT = "external_gt";
String ES_MAPPING_VERSION_TYPE_EXTERNAL_GTE = "external_gte";
String ES_MAPPING_VERSION_TYPE_FORCE = "force";
String ES_MAPPING_VERSION_TYPE = "es.mapping.version.type";
String ES_MAPPING_VERSION_TYPE_INTERNAL = "internal";
String ES_MAPPING_VERSION_TYPE_EXTERNAL = "external";
String ES_MAPPING_VERSION_TYPE_EXTERNAL_GT = "external_gt";
String ES_MAPPING_VERSION_TYPE_EXTERNAL_GTE = "external_gte";
String ES_MAPPING_VERSION_TYPE_FORCE = "force";

String ES_MAPPING_INCLUDE = "es.mapping.include";
String ES_MAPPING_INCLUDE_DEFAULT = "";
String ES_MAPPING_EXCLUDE = "es.mapping.exclude";
String ES_MAPPING_EXCLUDE_DEFAULT = "";
String ES_MAPPING_INCLUDE = "es.mapping.include";
String ES_MAPPING_INCLUDE_DEFAULT = "";
String ES_MAPPING_EXCLUDE = "es.mapping.exclude";
String ES_MAPPING_EXCLUDE_DEFAULT = "";


/** Read settings */
Expand Down Expand Up @@ -190,28 +194,28 @@ public interface ConfigurationOptions {
String ES_UPDATE_SCRIPT_PARAMS = "es.update.script.params";
String ES_UPDATE_SCRIPT_PARAMS_JSON = "es.update.script.params.json";

/** Output options **/
String ES_OUTPUT_JSON = "es.output.json";
String ES_OUTPUT_JSON_DEFAULT = "no";
/** Output options **/
String ES_OUTPUT_JSON = "es.output.json";
String ES_OUTPUT_JSON_DEFAULT = "no";

/** Network options */
// SSL
String ES_NET_USE_SSL = "es.net.ssl";
String ES_NET_USE_SSL_DEFAULT = "false";

String ES_NET_SSL_PROTOCOL = "es.net.ssl.protocol";
String ES_NET_SSL_PROTOCOL_DEFAULT = "TLS"; // SSL as an alternative
String ES_NET_SSL_PROTOCOL = "es.net.ssl.protocol";
String ES_NET_SSL_PROTOCOL_DEFAULT = "TLS"; // SSL as an alternative

String ES_NET_SSL_KEYSTORE_LOCATION = "es.net.ssl.keystore.location";
String ES_NET_SSL_KEYSTORE_TYPE = "es.net.ssl.keystore.type";
String ES_NET_SSL_KEYSTORE_TYPE_DEFAULT = "JKS"; // PKCS12 could also be used
String ES_NET_SSL_KEYSTORE_PASS = "es.net.ssl.keystore.pass";
String ES_NET_SSL_KEYSTORE_LOCATION = "es.net.ssl.keystore.location";
String ES_NET_SSL_KEYSTORE_TYPE = "es.net.ssl.keystore.type";
String ES_NET_SSL_KEYSTORE_TYPE_DEFAULT = "JKS"; // PKCS12 could also be used
String ES_NET_SSL_KEYSTORE_PASS = "es.net.ssl.keystore.pass";

String ES_NET_SSL_TRUST_STORE_LOCATION = "es.net.ssl.truststore.location";
String ES_NET_SSL_TRUST_STORE_PASS = "es.net.ssl.truststore.pass";
String ES_NET_SSL_TRUST_STORE_LOCATION = "es.net.ssl.truststore.location";
String ES_NET_SSL_TRUST_STORE_PASS = "es.net.ssl.truststore.pass";

String ES_NET_SSL_CERT_ALLOW_SELF_SIGNED = "es.net.ssl.cert.allow.self.signed";
String ES_NET_SSL_CERT_ALLOW_SELF_SIGNED_DEFAULT = "false";
String ES_NET_SSL_CERT_ALLOW_SELF_SIGNED = "es.net.ssl.cert.allow.self.signed";
String ES_NET_SSL_CERT_ALLOW_SELF_SIGNED_DEFAULT = "false";

String ES_NET_HTTP_AUTH_USER = "es.net.http.auth.user";
String ES_NET_HTTP_AUTH_PASS = "es.net.http.auth.pass";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,4 @@ public interface InternalConfigurationOptions extends ConfigurationOptions {
String INTERNAL_ES_PINNED_NODE = "es.internal.pinned.node";

String INTERNAL_ES_VERSION = "es.internal.es.version";
// selector used when multiple indices are specified
String INTERNAL_ES_SELECTED_INDEX = "es.internal.selected.index";
}
26 changes: 15 additions & 11 deletions mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public boolean getNodesDiscovery() {
return Booleans.parseBoolean(getProperty(ES_NODES_DISCOVERY, ES_NODES_DISCOVERY_DEFAULT));
}

public boolean getNodesClientOnly() {
return Booleans.parseBoolean(getProperty(ES_NODES_CLIENT_ONLY, ES_NODES_CLIENT_ONLY_DEFAULT));
}

public long getHttpTimeout() {
return TimeValue.parseTimeValue(getProperty(ES_HTTP_TIMEOUT, ES_HTTP_TIMEOUT_DEFAULT)).getMillis();
}
Expand Down Expand Up @@ -154,12 +158,12 @@ public String getMappingVersion() {
return getProperty(ES_MAPPING_VERSION);
}

public boolean hasMappingVersionType() {
String versionType = getMappingVersionType();
return (StringUtils.hasText(getMappingVersion()) && StringUtils.hasText(versionType) && !versionType.equals(ES_MAPPING_VERSION_TYPE_INTERNAL));
}
public boolean hasMappingVersionType() {
String versionType = getMappingVersionType();
return (StringUtils.hasText(getMappingVersion()) && StringUtils.hasText(versionType) && !versionType.equals(ES_MAPPING_VERSION_TYPE_INTERNAL));
}

public String getMappingVersionType() {
public String getMappingVersionType() {
return getProperty(ES_MAPPING_VERSION_TYPE, ES_MAPPING_VERSION_TYPE_EXTERNAL);
}

Expand Down Expand Up @@ -219,13 +223,13 @@ public boolean getMappingConstantAutoQuote() {
return Booleans.parseBoolean(getProperty(ES_MAPPING_CONSTANT_AUTO_QUOTE, ES_MAPPING_CONSTANT_AUTO_QUOTE_DEFAULT));
}

public String getMappingIncludes() {
return getProperty(ES_MAPPING_INCLUDE, ES_MAPPING_INCLUDE_DEFAULT);
}
public String getMappingIncludes() {
return getProperty(ES_MAPPING_INCLUDE, ES_MAPPING_INCLUDE_DEFAULT);
}

public String getMappingExcludes() {
return getProperty(ES_MAPPING_EXCLUDE, ES_MAPPING_EXCLUDE_DEFAULT);
}
public String getMappingExcludes() {
return getProperty(ES_MAPPING_EXCLUDE, ES_MAPPING_EXCLUDE_DEFAULT);
}

public int getUpdateRetryOnConflict() {
return Integer.parseInt(getProperty(ES_UPDATE_RETRY_ON_CONFLICT, ES_UPDATE_RETRY_ON_CONFLICT_DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,49 @@ public static boolean discoverNodesIfNeeded(Settings settings, Log log) {
if (settings.getNodesDiscovery()) {
RestClient bootstrap = new RestClient(settings);

List<String> discoveredNodes = bootstrap.discoverNodes();
if (log.isDebugEnabled()) {
log.debug(String.format("Nodes discovery enabled - found %s", discoveredNodes));
}

SettingsUtils.addDiscoveredNodes(settings, discoveredNodes);
bootstrap.close();
try {
List<String> discoveredNodes = bootstrap.discoverNodes();
if (log.isDebugEnabled()) {
log.debug(String.format("Nodes discovery enabled - found %s", discoveredNodes));
}

SettingsUtils.addDiscoveredNodes(settings, discoveredNodes);
} finally {
bootstrap.close();
}
return true;
}

return false;
}

public static void filterNonClientNodesIfNeeded(Settings settings, Log log) {
if (!settings.getNodesClientOnly()) {
return;
}

RestClient bootstrap = new RestClient(settings);
try {
List<String> clientNodes = bootstrap.getClientNodes();
if (clientNodes.isEmpty()) {
throw new EsHadoopIllegalArgumentException("Client-only routing specified but not client nodes were found in the cluster...");
}
if (log.isDebugEnabled()) {
log.debug(String.format("Found client nodes %s", clientNodes));
}

List<String> ddNodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
// remove non-client nodes
ddNodes.retainAll(clientNodes);
if (log.isDebugEnabled()) {
log.debug(String.format("Filtered discovered only nodes %s to client-only %s", SettingsUtils.discoveredOrDeclaredNodes(settings), ddNodes));
}
SettingsUtils.setDiscoveredNodes(settings, ddNodes);
} finally {
bootstrap.close();
}
}

public static String discoverEsVersion(Settings settings, Log log) {
String version = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_VERSION);
if (StringUtils.hasText(version)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,8 @@ public Stats stats() {
Stats transportStats() {
return currentTransport.stats();
}

public String currentNode() {
return currentNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public String toString() {
return "QueryBuilder [" + assemble() + "]";
}

public QueryBuilder onlyNode(boolean onlyNode) {
public QueryBuilder restrictToNode(boolean onlyNode) {
this.onlyNode = onlyNode;
return this;
}
Expand Down
17 changes: 17 additions & 0 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,19 @@ public Map<String, Node> getNodes() {
return nodes;
}

public List<String> getClientNodes() {
Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes");
List<String> nodes = new ArrayList<String>();

for (Entry<String, Map<String, Object>> entry : nodesData.entrySet()) {
Node node = new Node(entry.getKey(), entry.getValue());
if (node.isClient() && node.hasHttp()) {
nodes.add(node.getInet());
}
}
return nodes;
}

@SuppressWarnings("unchecked")
public Map<String, Object> getMapping(String query) {
return (Map<String, Object>) get(query, null);
Expand Down Expand Up @@ -398,4 +411,8 @@ private void countStreamStats(InputStream content) {
stats.aggregate(((StatsAware) content).stats());
}
}

public String getCurrentNode() {
return network.currentNode();
}
}
38 changes: 35 additions & 3 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public static List<PartitionDefinition> findPartitions(Settings settings, Log lo
Map<Shard, Node> targetShards = null;

InitializationUtils.discoverNodesIfNeeded(settings, log);
InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
InitializationUtils.discoverEsVersion(settings, log);

String savedSettings = settings.save();
Expand Down Expand Up @@ -284,7 +285,16 @@ public static List<PartitionDefinition> findPartitions(Settings settings, Log lo
public static PartitionReader createReader(Settings settings, PartitionDefinition partition, Log log) {

if (!SettingsUtils.hasPinnedNode(settings)) {
SettingsUtils.pinNode(settings, partition.nodeIp, partition.nodePort);
// pin node only if client-routing is disabled; otherwise simply go through them...
if (!settings.getNodesClientOnly()) {
if (log.isDebugEnabled()) {
log.debug(String.format("Partition reader instance [%s] assigned to [%s]:[%s]",
partition, partition.nodeId, partition.nodePort));
}

SettingsUtils.pinNode(settings, partition.nodeIp, partition.nodePort);
}

}

ValueReader reader = ObjectUtils.instantiate(settings.getSerializerValueReaderClassName(), settings);
Expand All @@ -303,8 +313,15 @@ public static PartitionReader createReader(Settings settings, PartitionDefinitio
// initialize REST client
RestRepository client = new RestRepository(settings);

QueryBuilder queryBuilder = QueryBuilder.query(settings).shard(partition.shardId).node(partition.nodeId).onlyNode(partition.onlyNode);
if (settings.getNodesClientOnly()) {
if (log.isDebugEnabled()) {
log.debug(String.format("Client-node routing detected; partition reader instance [%s] assigned to [%s]",
partition, client.getRestClient().getCurrentNode()));
}
}

// take into account client node routing
QueryBuilder queryBuilder = QueryBuilder.query(settings).shard(partition.shardId).node(partition.nodeId).restrictToNode(partition.onlyNode && !settings.getNodesClientOnly());
queryBuilder.fields(settings.getScrollFields());

return new PartitionReader(scrollReader, client, queryBuilder);
Expand Down Expand Up @@ -354,6 +371,7 @@ public static MultiReaderIterator multiReader(Settings settings, List<PartitionD
public static PartitionWriter createWriter(Settings settings, int currentSplit, int totalSplits, Log log) {

InitializationUtils.discoverNodesIfNeeded(settings, log);
InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
InitializationUtils.discoverEsVersion(settings, log);

List<String> nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
Expand Down Expand Up @@ -388,12 +406,26 @@ private static RestRepository initSingleIndex(Settings settings, int currentInst
}
}

Map<Shard, Node> targetShards = repository.getWriteTargetPrimaryShards();
// if client-nodes are used, simply use the underlying
if (settings.getNodesClientOnly()) {
if (log.isDebugEnabled()) {
log.debug(String.format("Client-node routing detected; partition writer instance [%s] assigned to [%s]",
currentInstance, repository.getRestClient().getCurrentNode()));
}

return repository;
}

// no routing necessary; select the relevant target shard/node
Map<Shard, Node> targetShards = Collections.emptyMap();

targetShards = repository.getWriteTargetPrimaryShards();
repository.close();

Assert.isTrue(!targetShards.isEmpty(),
String.format("Cannot determine write shards for [%s]; likely its format is incorrect (maybe it contains illegal characters?)", resource));


List<Shard> orderedShards = new ArrayList<Shard>(targetShards.keySet());
// make sure the order is strict
Collections.sort(orderedShards);
Expand Down
Loading

0 comments on commit 07c3474

Please sign in to comment.