Add integration with Apache Storm
Provide dedicated Bolt and Spout implementations
Related elastic#267
costin committed Sep 10, 2014
1 parent fe82470 commit 92e98c6
Showing 32 changed files with 1,874 additions and 37 deletions.
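The excerpt below covers the build plumbing and the shared REST-layer changes; the dedicated Bolt and Spout classes are among the remaining changed files. As a rough sketch of the usage this commit enables (the EsBolt class/package name, the "storm/docs" index/type resource and the es.nodes key are assumptions based on the commit description, not taken from the excerpt):

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;

import org.elasticsearch.storm.EsBolt; // assumed class, provided by elasticsearch-storm

public class EsStormDemo {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // stock test spout shipped with storm-core, emitting single-field tuples
        builder.setSpout("docs", new TestWordSpout());
        // index every incoming tuple as a JSON document under storm/docs
        builder.setBolt("es", new EsBolt("storm/docs"), 2).shuffleGrouping("docs");

        Config conf = new Config();
        conf.put("es.nodes", "localhost:9200"); // assumed option key

        new LocalCluster().submitTopology("es-demo", conf, builder.createTopology());
    }
}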
2 changes: 1 addition & 1 deletion README.md
@@ -1,4 +1,4 @@
# Elasticsearch Hadoop [![Build Status](https://travis-ci.org/elasticsearch/elasticsearch-hadoop.png)](https://travis-ci.org/elasticsearch/elasticsearch-hadoop)
# Elasticsearch Hadoop [![Build Status](https://travis-ci.org/elasticsearch/elasticsearch-hadoop.png)](https://travis-ci.org/elasticsearch/elasticsearch-hadoop) [![Build Status](http://build-us-1.elasticsearch.org/buildStatus/icon?job=es-hadoop-quick)](http://build-us-1.elasticsearch.org/view/Hadoop/job/es-hadoop-quick/)
Elasticsearch real-time search and analytics natively integrated with Hadoop.
Supports [Map/Reduce](#mapreduce), [Cascading](#cascading), [Apache Hive](#apache-hive), [Apache Pig](#apache-pig) and [Apache Spark](#apache-spark).

25 changes: 25 additions & 0 deletions build.gradle
@@ -17,6 +17,8 @@ allprojects {
mavenCentral()
// cascading
maven { url "http://conjars.org/repo" }
// storm dependencies
maven { url "http://clojars.org/repo" }
maven { url 'http://repo.spring.io/plugins-release' }

// Hive depends on JDO ec2 missing from Maven Central
@@ -187,6 +189,17 @@ allprojects { project ->
itestRuntime configurations.testRuntime
}

// adding the M/R project creates duplicates in the Eclipse CP so here we filter them out
// the lib entries with sources seem to be placed first so they 'win' over those w/o sources
eclipse.classpath.file {
whenMerged { cp ->
cp.entries.unique { a, b ->
return a.path.compareTo(b.path)
}
}
}


jar {
manifest.attributes["Created-By"] = "${System.getProperty("java.version")} (${System.getProperty("java.specification.vendor")})"
manifest.attributes['Implementation-Title'] = project.name
@@ -443,6 +456,18 @@ project(":elasticsearch-spark") {
}
}

project(":elasticsearch-storm") {
description = "Elasticsearch Storm"
nestMRProject(project)

dependencies {
provided("org.apache.storm:storm-core:$stormVersion")

//testRuntime "com.esotericsoftware.kryo:kryo:2.21"
itestRuntime "com.twitter:carbonite:1.4.0"
}
}

configure(rootProject) {
def eshModules = subprojects - project(":elasticsearch-repository-hdfs")

1 change: 1 addition & 0 deletions gradle.properties
@@ -21,6 +21,7 @@ sparkVersion = 1.0.2
# same as Spark's
scalaVersion = 2.10.4
scalaMajorVersion = 2.10
stormVersion = 0.9.2-incubating

# Testing
junitVersion = 4.11
mr/src/main/java/org/elasticsearch/hadoop/cfg/ConfigurationOptions.java
@@ -55,6 +55,10 @@ public interface ConfigurationOptions {
String ES_BATCH_SIZE_ENTRIES = "es.batch.size.entries";
String ES_BATCH_SIZE_ENTRIES_DEFAULT = "1000";

/** Whether the bulk buffer is flushed manually (through an explicit flush call) instead of automatically when it fills up */
String ES_BATCH_FLUSH_MANUAL = "es.batch.flush.manual";
String ES_BATCH_FLUSH_MANUAL_DEFAULT = "false";

/** Whether to trigger an index refresh after doing batch writing */
String ES_BATCH_WRITE_REFRESH = "es.batch.write.refresh";
String ES_BATCH_WRITE_REFRESH_DEFAULT = "true";
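The new pair lets an integration take over flushing entirely: with es.batch.flush.manual set to true, the buffered batch is only sent on an explicit flush, and overflowing the buffer becomes an error rather than an implicit flush. A minimal sketch of setting it through plain properties (the surrounding bootstrap is integration-specific and assumed here):

import java.util.Properties;

public class ManualFlushSettings {
    public static Properties esWriteProperties() {
        Properties props = new Properties();
        // hand flushing over to the caller (e.g. a Storm bolt acking tuples)
        props.setProperty("es.batch.flush.manual", "true");
        // the existing batch limits still cap what may accumulate between flushes
        props.setProperty("es.batch.size.entries", "1000");
        return props;
    }
}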
9 changes: 8 additions & 1 deletion mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
@@ -32,10 +32,13 @@
import org.elasticsearch.hadoop.util.unit.ByteSizeValue;
import org.elasticsearch.hadoop.util.unit.TimeValue;

import static org.elasticsearch.hadoop.cfg.ConfigurationOptions.*;
import static org.elasticsearch.hadoop.cfg.InternalConfigurationOptions.*;

/**
* Holder class containing the various configuration bits used by Elasticsearch Hadoop. Internally handles the fallback to defaults when looking up undefined, optional settings.
*/
public abstract class Settings implements InternalConfigurationOptions {
public abstract class Settings {

public String getNodes() {
String host = getProperty(ES_HOST);
@@ -86,6 +89,10 @@ public boolean getBatchRefreshAfterWrite() {
return Booleans.parseBoolean(getProperty(ES_BATCH_WRITE_REFRESH, ES_BATCH_WRITE_REFRESH_DEFAULT));
}

public boolean getBatchFlushManual() {
return Booleans.parseBoolean(getProperty(ES_BATCH_FLUSH_MANUAL, ES_BATCH_FLUSH_MANUAL_DEFAULT));
}

public long getScrollKeepAlive() {
return TimeValue.parseTimeValue(getProperty(ES_SCROLL_KEEPALIVE, ES_SCROLL_KEEPALIVE_DEFAULT)).getMillis();
}
mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -136,7 +137,7 @@ private <T> T parseContent(InputStream content, String string) {
return (T) (string != null ? map.get(string) : map);
}

public void bulk(Resource resource, TrackingBytesArray data) {
public BitSet bulk(Resource resource, TrackingBytesArray data) {
Retry retry = retryPolicy.init();
int httpStatus = 0;

@@ -164,6 +165,8 @@ public void bulk(Resource resource, TrackingBytesArray data) {

httpStatus = (retryFailedEntries(response.body(), data) ? HttpStatus.SERVICE_UNAVAILABLE : HttpStatus.OK);
} while (data.length() > 0 && retry.retry(httpStatus));

return data.leftoversPosition();
}

@SuppressWarnings("rawtypes")
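bulk() now surfaces partial failures instead of swallowing them: judging by leftoversPosition(), each set bit in the returned BitSet is the position, within the submitted batch, of an entry that was still unwritten when the retry policy gave up; an empty set means full success. A small sketch of consuming that result (the helper name is illustrative):

import java.util.BitSet;

public class BulkResults {
    // report every entry left over after the retries
    public static void logLeftovers(BitSet leftovers) {
        for (int i = leftovers.nextSetBit(0); i >= 0; i = leftovers.nextSetBit(i + 1)) {
            System.err.println("entry at batch position " + i + " was not indexed");
        }
    }
}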
47 changes: 38 additions & 9 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java
@@ -21,13 +21,15 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.EsHadoopException;
import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.stats.Stats;
import org.elasticsearch.hadoop.rest.stats.StatsAware;
@@ -61,6 +63,7 @@ public class RestRepository implements Closeable, StatsAware {
private boolean executedBulkWrite = false;
private BytesRef trivialBytesRef;
private boolean writeInitialized = false;
private boolean autoFlush = true;

// indicates whether there were write errors or not
// flag indicating whether to flush the batch at close-time or not
@@ -95,6 +98,7 @@ private void lazyInitWriting() {
if (!writeInitialized) {
writeInitialized = true;

autoFlush = !settings.getBatchFlushManual();
ba.bytes(new byte[settings.getBatchSizeInBytes()], 0);
trivialBytesRef = new BytesRef();
bufferEntriesThreshold = settings.getBatchSizeInEntries();
@@ -123,7 +127,7 @@ ScrollQuery scan(String query, BytesArray body, ScrollReader reader) {
*
* @param object object to add to the index
*/
public void writeToIndex(Object object) throws IOException {
public void writeToIndex(Object object) {
Assert.notNull(object, "no object data given");

lazyInitWriting();
@@ -149,33 +153,58 @@ public void writeProcessedToIndex(BytesArray ba) {
private void doWriteToIndex(BytesRef payload) {
// check space first
if (payload.length() > ba.available()) {
sendBatch();
if (autoFlush) {
flush();
}
else {
throw new EsHadoopIllegalStateException(
String.format("Auto flush disabled and bulk buffer full; disable manual flush or increase capacity [current size %s]; bailing out", ba.capacity()));
}
}

data.copyFrom(payload);
payload.reset();

dataEntries++;
if (bufferEntriesThreshold > 0 && dataEntries >= bufferEntriesThreshold) {
sendBatch();
if (autoFlush && bufferEntriesThreshold > 0 && dataEntries >= bufferEntriesThreshold) {
flush();
}
}

private void sendBatch() {
public BitSet tryFlush() {
if (log.isDebugEnabled()) {
log.debug(String.format("Sending batch of [%d] bytes/[%s] entries", data.length(), dataEntries));
}

BitSet bulk;

try {
client.bulk(resourceW, data);
bulk = client.bulk(resourceW, data);
} catch (EsHadoopException ex) {
hadWriteErrors = true;
throw ex;
}

executedBulkWrite = true;

// discard the buffer only when everything went through;
// leftover entries are still in the pipeline, so keep them
if (bulk.isEmpty()) {
discard();
}

return bulk;
}

public void discard() {
data.reset();
dataEntries = 0;
executedBulkWrite = true;
}

public void flush() {
BitSet bulk = tryFlush();
if (!bulk.isEmpty()) {
throw new EsHadoopException(String.format("Could not write all entries [%s/%s] (maybe ES was overloaded?). Bailing out...", bulk.cardinality(), dataEntries));
}
}
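Together the three methods give callers explicit control of the batch lifecycle: tryFlush() reports rejected positions while keeping them buffered, discard() drops the buffer, and flush() is the fail-fast variant. A hedged sketch of the cycle a Storm bolt could drive (the FailureHandler callback and the tuple-to-position bookkeeping are illustrative, not the actual bolt code):

import java.util.BitSet;
import java.util.List;

import org.elasticsearch.hadoop.rest.RestRepository;

public class ManualFlushCycle {
    interface FailureHandler<T> {
        void ack(T item);     // e.g. ack the Storm tuple
        void replay(T item);  // e.g. fail the Storm tuple so it is re-emitted
    }

    public static <T> void flushAndSettle(RestRepository repo, List<T> inflight,
                                          FailureHandler<T> handler) {
        BitSet rejected = repo.tryFlush(); // positions of entries Elasticsearch rejected
        for (int i = 0; i < inflight.size(); i++) {
            if (rejected.get(i)) {
                handler.replay(inflight.get(i));
            } else {
                handler.ack(inflight.get(i));
            }
        }
        if (!rejected.isEmpty()) {
            repo.discard(); // the items will be replayed, so drop the leftover bytes
        }
        inflight.clear();
    }
}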

@Override
@@ -186,7 +215,7 @@ public void close() {

if (data.length() > 0) {
if (!hadWriteErrors) {
sendBatch();
flush();
}
else {
if (log.isDebugEnabled()) {
@@ -258,7 +287,7 @@ public Map<Shard, Node> getWriteTargetPrimaryShards() {
return shards;
}

public Field getMapping() throws IOException {
public Field getMapping() {
return Field.parseField((Map<String, Object>) client.getMapping(resourceR.mapping()));
}
