
🐘 Elasticsearch real-time search and analytics natively integrated with Hadoop

allenai/elasticsearch-hadoop

S2 customizations

Added ConfigurationOptions.ES_BATCH_WRITE_STATUS_IGNORE = "es.batch.write.status.ignore"

Set it to a comma-separated list of HTTP status codes that bulk writes should ignore, e.g. "404".

This was added so that elasticsearch-spark jobs could use EsSpark.saveToEs to write updates (including partial updates) without aborting the entire job when a target document no longer exists.
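The fork's own implementation is not reproduced here. As a rough illustration of the intended behavior, the following is a minimal plain-Java sketch, assuming the setting holds a comma-separated list of HTTP status codes and that each item of a bulk response reports a single status code. BatchStatusIgnoreSketch, parseIgnoredStatuses and failuresToReport are hypothetical names, not part of ConfigurationOptions or any elasticsearch-hadoop API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of the "es.batch.write.status.ignore" behavior;
// not the fork's actual code and not an elasticsearch-hadoop API.
final class BatchStatusIgnoreSketch {

    /** Parses a setting such as "404" or "404, 409" into a set of status codes. */
    static Set<Integer> parseIgnoredStatuses(String setting) {
        if (setting == null || setting.trim().isEmpty()) {
            return Collections.emptySet();
        }
        return Arrays.stream(setting.split(","))
                .map(String::trim)
                .filter(code -> !code.isEmpty())
                .map(Integer::parseInt)
                .collect(Collectors.toSet());
    }

    /** Keeps only the non-2xx bulk item statuses that are not in the ignore set. */
    static List<Integer> failuresToReport(List<Integer> itemStatuses, Set<Integer> ignored) {
        return itemStatuses.stream()
                .filter(status -> status < 200 || status >= 300)
                .filter(status -> !ignored.contains(status))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Set<Integer> ignored = parseIgnoredStatuses("404");
        // 200 = updated, 404 = target document missing, 409 = version conflict
        List<Integer> statuses = Arrays.asList(200, 404, 409);
        System.out.println(failuresToReport(statuses, ignored)); // prints [409]
    }
}
```

With "404" ignored, only the 409 is still treated as a failure, so an update whose target document has disappeared would no longer abort the job in this sketch.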

Note that 5.5.0 is fairly old, so these changes are probably not useful beyond S2's unique (and dated) dependency needs.

The changes between the original 5.5.0 release and this fork are listed at https://github.com/elastic/elasticsearch-hadoop/compare/v5.5.0...allenai:v5.5.0-ai2

Quick dev instructions

This project should import into IntelliJ IDEA without trouble through its Gradle integration.

To run the unit tests:

./gradlew test

There are integration tests, but their environment requirements are unknown (they are meant to run on the upstream build server), and they will not run locally. Getting them to work may be worth pursuing on a modern version of this library, but not on this one. Do not rely on them; instead, rely on the integration tests in the applications that depend on this fork.

To build the elasticsearch-spark dependency for Scala 2.11 and install it into your local repository:

./gradlew -P scala=211 'elasticsearch-spark-20:install'

Only Scala 2.10 and 2.11 are supported. To build for other Scala major versions, you will have to modify the Gradle build config, and it is completely unknown whether there will be other incompatibilities.

Once the files are generated locally, you will have to publish them manually to whichever dependency repository we are currently using. At the time of this writing that was in flux, but it was this Bintray repo. Whatever the repository, its directory structure should mirror your local Maven repository.

The files are installed locally in a path like this:

$ tree ~/.m2/repository/org/elasticsearch/elasticsearch-spark-20_2.11/5.5.0-ai2-SNAPSHOT/
/Users/jasond/.m2/repository/org/elasticsearch/elasticsearch-spark-20_2.11/5.5.0-ai2-SNAPSHOT/
├── elasticsearch-spark-20_2.11-5.5.0-ai2-SNAPSHOT-javadoc.jar
├── elasticsearch-spark-20_2.11-5.5.0-ai2-SNAPSHOT-sources.jar
├── elasticsearch-spark-20_2.11-5.5.0-ai2-SNAPSHOT.jar
├── elasticsearch-spark-20_2.11-5.5.0-ai2-SNAPSHOT.pom
└── maven-metadata-local.xml
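The layout above follows Maven's default repository convention: the groupId's dots become directories, followed by the artifactId and the version, and each file is named artifactId-version[-classifier].extension. A minimal sketch of that mapping, which may help when copying the files into another repository by hand; MavenRepoLayout and its methods are hypothetical helpers, not part of any Maven API.

```java
// Hypothetical helper: computes relative paths in Maven's default repository layout.
final class MavenRepoLayout {

    /** Relative directory, e.g. org/elasticsearch/<artifactId>/<version>/ */
    static String artifactDir(String groupId, String artifactId, String version) {
        // Only the groupId is split on dots; artifactId and version stay intact
        return groupId.replace('.', '/') + "/" + artifactId + "/" + version + "/";
    }

    /** Relative file path; pass "" as classifier for the main artifact. */
    static String artifactPath(String groupId, String artifactId, String version,
                               String classifier, String extension) {
        String suffix = classifier.isEmpty() ? "" : "-" + classifier;
        return artifactDir(groupId, artifactId, version)
                + artifactId + "-" + version + suffix + "." + extension;
    }

    public static void main(String[] args) {
        String group = "org.elasticsearch";
        String artifact = "elasticsearch-spark-20_2.11";
        String version = "5.5.0-ai2-SNAPSHOT";
        System.out.println(artifactPath(group, artifact, version, "", "jar"));
        System.out.println(artifactPath(group, artifact, version, "sources", "jar"));
        System.out.println(artifactPath(group, artifact, version, "", "pom"));
    }
}
```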

If you rely on the snapshot semantics illustrated above, it is a good idea to declare the dependency as changing in your build config so that updated snapshots get pulled down. In sbt it looks something like this:

libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.5.0-ai2-SNAPSHOT" changing()

Elasticsearch Hadoop

Elasticsearch real-time search and analytics natively integrated with Hadoop.
Supports Map/Reduce, Cascading, Apache Hive, Apache Pig, Apache Spark and Apache Storm.

See the project page and documentation for detailed information.

Requirements

An Elasticsearch cluster (1.x or higher; 5.x highly recommended) accessible through REST. That's it! Significant effort has been invested to create a small, self-contained jar that can be downloaded and put to use without any dependencies. Simply make it available to your job classpath and you're set. For requirements specific to a given library, see its dedicated chapter.

ES-Hadoop 5.x and higher are compatible with Elasticsearch 1.x, 2.x and 5.x

ES-Hadoop 2.2.x and higher are compatible with Elasticsearch 1.x and 2.x

ES-Hadoop 2.0.x and 2.1.x are compatible with Elasticsearch 1.x only
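To make the pairs above explicit, here is a minimal sketch that encodes exactly those three lines. EsHadoopCompat is a hypothetical helper, not part of ES-Hadoop. Since this README describes the 5.5.0 release, the sketch reads "5.x and higher" as 5.x only and returns no matches for versions the list does not cover.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: encodes only the ES-Hadoop/Elasticsearch pairs listed above.
final class EsHadoopCompat {

    /** Elasticsearch major versions compatible with ES-Hadoop major.minor, per the list above. */
    static List<Integer> compatibleEsMajors(int major, int minor) {
        if (major == 5) {
            return Arrays.asList(1, 2, 5);
        }
        if (major == 2 && minor >= 2) {
            return Arrays.asList(1, 2);
        }
        if (major == 2) {               // 2.0.x and 2.1.x
            return Collections.singletonList(1);
        }
        return Collections.emptyList(); // not covered by the list above
    }

    /** Both arguments are dotted versions such as "5.5.0" and "2.4.6". */
    static boolean isCompatible(String esHadoopVersion, String esVersion) {
        String[] hadoop = esHadoopVersion.split("\\.");
        int esMajor = Integer.parseInt(esVersion.split("\\.")[0]);
        return compatibleEsMajors(Integer.parseInt(hadoop[0]), Integer.parseInt(hadoop[1]))
                .contains(esMajor);
    }

    public static void main(String[] args) {
        System.out.println(isCompatible("5.5.0", "1.7.6")); // prints true
        System.out.println(isCompatible("2.1.2", "2.4.6")); // prints false
    }
}
```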

Installation

Stable Release (currently 5.5.0)

Available through any Maven-compatible tool:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>5.5.0</version>
</dependency>

or as a stand-alone ZIP.

Development Snapshot

Grab the latest nightly build from the repository again through Maven:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>5.5.1.BUILD-SNAPSHOT</version>
</dependency>
<repositories>
  <repository>
    <id>sonatype-oss</id>
    <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>

or build the project yourself.

We do build and test the code on each commit.

Supported Hadoop Version

Hadoop 1.x as well as the "old" api (mapred) are deprecated in 5.5 and will be removed in 6.0. More information in this section.

Feedback / Q&A

We're interested in your feedback! You can find us on the User mailing list - please append [Hadoop] to the post subject to filter it out. For more details, see the community page.

Online Documentation

The latest reference documentation is available online on the project home page. The rest of this README contains basic usage instructions at a glance.

Usage

Configuration Properties

All configuration properties start with the es prefix. Note that the es.internal namespace is reserved for the library's internal use and should never be set by the user. The properties are read mainly from the Hadoop configuration, but depending on the library used, some of them can also be specified directly.

Required

es.resource=<ES resource location (index/type), relative to the host/port given by es.nodes and es.port>

Essential

es.query=<URI or Query DSL query>              # defaults to {"query":{"match_all":{}}}
es.nodes=<ES host address>                     # defaults to localhost
es.port=<ES REST port>                         # defaults to 9200

The full list is available here
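The rules above (the reserved es.internal namespace, the required es.resource, and the documented defaults for es.query, es.nodes and es.port) can be sketched in plain Java as follows. EsSettingsSketch and withDefaults are hypothetical names; ES-Hadoop applies its own defaults internally, and this only illustrates how user-supplied values override them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the documented rules; not an ES-Hadoop API.
final class EsSettingsSketch {

    static Map<String, String> withDefaults(Map<String, String> userSettings) {
        for (String key : userSettings.keySet()) {
            // es.internal.* is reserved for the library itself
            if (key.startsWith("es.internal")) {
                throw new IllegalArgumentException("Reserved for internal use: " + key);
            }
        }
        if (!userSettings.containsKey("es.resource")) {
            throw new IllegalArgumentException("es.resource is required");
        }
        Map<String, String> settings = new HashMap<>();
        // Documented defaults for the essential properties
        settings.put("es.query", "{\"query\":{\"match_all\":{}}}");
        settings.put("es.nodes", "localhost");
        settings.put("es.port", "9200");
        // User-supplied values override the defaults
        settings.putAll(userSettings);
        return settings;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put("es.resource", "radio/artists");
        user.put("es.query", "?q=me*");
        Map<String, String> settings = withDefaults(user);
        System.out.println(settings.get("es.nodes") + ":" + settings.get("es.port")); // prints localhost:9200
        System.out.println(settings.get("es.query"));                                 // prints ?q=me*
    }
}
```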

Map/Reduce

For basic, low-level or performance-sensitive environments, ES-Hadoop provides dedicated InputFormat and OutputFormat implementations that read data from and write data to Elasticsearch. To use them, add the es-hadoop jar to your job classpath, either by bundling the library with your job (it is ~300kB and has no dependencies), by using the DistributedCache, or by provisioning the cluster manually. See the documentation for more information.

Note that support of the (old) 'mapred' API is deprecated as of 5.5 and will be removed in 6.0.

'Old' (org.apache.hadoop.mapred) API

!!! DEPRECATED IN 5.5 !!!


Reading

To read data from ES, configure the EsInputFormat on your job configuration along with the relevant properties:

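import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.elasticsearch.hadoop.mr.EsInputFormat;

JobConf conf = new JobConf();
conf.setInputFormat(EsInputFormat.class);
conf.set("es.resource", "radio/artists");   // index/type to read from
conf.set("es.query", "?q=me*");             // replace this with the relevant query
...
JobClient.runJob(conf);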

Writing

The same configuration template can be used for writing, but with EsOutputFormat:

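import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

JobConf conf = new JobConf();
conf.setOutputFormat(EsOutputFormat.class);
conf.set("es.resource", "radio/artists"); // index or indices used for storing data
...
JobClient.runJob(conf);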

'New' (org.apache.hadoop.mapreduce) API

Reading

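import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.elasticsearch.hadoop.mr.EsInputFormat;

Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists");
conf.set("es.query", "?q=me*");             // replace this with the relevant query
Job job = new Job(conf);
job.setInputFormatClass(EsInputFormat.class);
...
job.waitForCompletion(true);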

Writing

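import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists"); // index or indices used for storing data
Job job = new Job(conf);
job.setOutputFormatClass(EsOutputFormat.class);
...
job.waitForCompletion(true);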

Apache Hive

ES-Hadoop provides a Hive storage handler for Elasticsearch, meaning one can define an external table on top of ES.

Add es-hadoop-<version>.jar to hive.aux.jars.path or register it manually in your Hive script (recommended):

ADD JAR /path_to_jar/es-hadoop-<version>.jar;

Reading

To read data from ES, define a table backed by the desired index:

CREATE EXTERNAL TABLE artists (
    id      BIGINT,
    name    STRING,
    links   STRUCT<url:STRING, picture:STRING>)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'radio/artists', 'es.query' = '?q=me*');

The fields defined in the table are mapped to the JSON when communicating with Elasticsearch. Notice the use of TBLPROPERTIES to define the resource location and the query used for reading from this table.

Once defined, the table can be used just like any other:

SELECT * FROM artists;

Writing

To write data, a similar definition is used, this time specifying only es.resource (the index to write to):

CREATE EXTERNAL TABLE artists (
    id      BIGINT,
    name    STRING,
    links   STRUCT<url:STRING, picture:STRING>)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'radio/artists');

Any data passed to the table is then passed down to Elasticsearch. For example, given a table source (aliased as s) mapped to a TSV/CSV file, one can index it to Elasticsearch like this:

INSERT OVERWRITE TABLE artists 
    SELECT NULL, s.name, named_struct('url', s.url, 'picture', s.picture) FROM source s;

Note that reading and writing are currently treated separately, but we're working on unifying the two and on automatically translating HiveQL to Elasticsearch queries.

Apache Pig

ES-Hadoop provides both read and write functions for Pig so you can access Elasticsearch from Pig scripts.

Register ES-Hadoop jar into your script or add it to your Pig classpath:

REGISTER /path_to_jar/es-hadoop-<version>.jar;

Additionally, one can define an alias to save some typing:

DEFINE ESSTORAGE org.elasticsearch.hadoop.pig.EsStorage();

and use ESSTORAGE in LOAD and STORE statements.

Reading

To read data from ES, use EsStorage and specify the query through the LOAD function:

A = LOAD 'radio/artists' USING org.elasticsearch.hadoop.pig.EsStorage('es.query=?q=me*');
DUMP A;

Writing

Use the same Storage to write data to Elasticsearch:

A = LOAD 'src/artists.dat' USING PigStorage() AS (id:long, name, url:chararray, picture: chararray);
B = FOREACH A GENERATE name, TOTUPLE(url, picture) AS links;
STORE B INTO 'radio/artists' USING org.elasticsearch.hadoop.pig.EsStorage();

Apache Spark

ES-Hadoop provides native (Java and Scala) integration with Spark: a dedicated RDD for reading, and methods that work on any RDD for writing. Spark SQL is also supported.

Scala

Reading

To read data from ES, create a dedicated RDD and specify the query as an argument:

import org.apache.spark.SparkContext
import org.elasticsearch.spark._

val conf = ...
val sc = new SparkContext(conf)
sc.esRDD("radio/artists", "?q=me*")

Spark SQL

import org.elasticsearch.spark.sql._

// DataFrame schema automatically inferred
val df = sqlContext.read.format("es").load("buckethead/albums")

// operations get pushed down and translated at runtime to Elasticsearch QueryDSL
val playlist = df.filter(df("category").equalTo("pikes").and(df("year").geq(2016)))

Writing

Import the org.elasticsearch.spark._ package to gain saveToEs methods on your RDDs:

import org.apache.spark.SparkContext
import org.elasticsearch.spark._

val conf = ...
val sc = new SparkContext(conf)

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")

sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")

Spark SQL

import org.elasticsearch.spark.sql._

val df = sqlContext.read.json("examples/people.json")
df.saveToEs("spark/people")

Java

In a Java environment, use the org.elasticsearch.spark.rdd.api.java package, in particular the JavaEsSpark class.

Reading

To read data from ES, create a dedicated RDD and specify the query as an argument.

import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);

JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "radio/artists", "?q=me*");

Spark SQL

SQLContext sql = new SQLContext(jsc);
DataFrame df = sql.read().format("es").load("buckethead/albums");
DataFrame playlist = df.filter(df.col("category").equalTo("pikes").and(df.col("year").geq(2016)));

Writing

Use JavaEsSpark to index any RDD to Elasticsearch:

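import java.util.Map;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
JavaEsSpark.saveToEs(javaRDD, "spark/docs");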

Spark SQL

import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

DataFrame df = sqlContext.read().json("examples/people.json");
JavaEsSparkSQL.saveToEs(df, "spark/docs");

Cascading

ES-Hadoop offers a dedicated Elasticsearch Tap, EsTap, that can be used as both a sink and a source. Note that EsTap can be used in both local (LocalFlowConnector) and Hadoop (HadoopFlowConnector) flows:

Reading

Tap in = new EsTap("radio/artists", "?q=me*");
Tap out = new StdOut(new TextLine());
new LocalFlowConnector().connect(in, out, new Pipe("read-from-ES")).complete();

Writing

Tap in = new Lfs(new TextDelimited(new Fields("id", "name", "url", "picture")), "src/test/resources/artists.dat");
Tap out = new EsTap("radio/artists", new Fields("name", "url", "picture"));
new HadoopFlowConnector().connect(in, out, new Pipe("write-to-ES")).complete();

Apache Storm

ES-Hadoop provides native integration with Storm: a dedicated Spout for reading and a specialized Bolt for writing.

Reading

To read data from ES, use EsSpout:

import org.elasticsearch.storm.EsSpout; 

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("es-spout", new EsSpout("storm/docs", "?q=me*"), 5);
builder.setBolt("bolt", new PrinterBolt()).shuffleGrouping("es-spout");

Writing

To index data to ES, use EsBolt:

import org.elasticsearch.storm.EsBolt; 

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 10);
builder.setBolt("es-bolt", new EsBolt("storm/docs"), 5).shuffleGrouping("spout");

Building the source

Elasticsearch Hadoop uses Gradle as its build system, but you do not need Gradle installed on your machine: the bundled wrapper (gradlew) takes care of it. By default, gradlew builds the package and runs the unit tests. For integration testing, use the integrationTests task. Run gradlew tasks for more information.

To create a distributable zip, run gradlew distZip from the command line; once completed you will find the jar in build/libs.

Building the project requires JVM 8 or higher (Oracle's JVM is recommended).

License

This project is released under version 2.0 of the Apache License.

Licensed to Elasticsearch under one or more contributor
license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright
ownership. Elasticsearch licenses this file to you under
the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
