Skip to content

Commit

Permalink
[SPARK-20641][CORE] Add key-value store abstraction and LevelDB implementation.
Browse files Browse the repository at this point in the history

This change adds an abstraction and LevelDB implementation for a key-value
store that will be used to store UI and SHS data.

The interface is described in KVStore.java (see javadoc). Specifics
of the LevelDB implementation are discussed in the javadocs of both
LevelDB.java and LevelDBTypeInfo.java.

Included also are a few small benchmarks just to get some idea of
latency. Because they're too slow for regular unit test runs, they're
disabled by default.

Tested with the included unit tests, and also as part of the overall feature
implementation (including running SHS with hundreds of apps).

Author: Marcelo Vanzin <[email protected]>

Closes apache#17902 from vanzin/shs-ng/M1.
  • Loading branch information
Marcelo Vanzin authored and squito committed Jun 6, 2017
1 parent b61a401 commit 0cba495
Show file tree
Hide file tree
Showing 20 changed files with 3,313 additions and 3 deletions.
101 changes: 101 additions & 0 deletions common/kvstore/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Parent POM, resolved from the repository root two directories above this module. -->
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.3.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<!-- Coordinates of this module; it is packaged as a plain jar. -->
<artifactId>spark-kvstore_2.11</artifactId>
<packaging>jar</packaging>
<name>Spark Project Local DB</name>
<url>http://spark.apache.org/</url>
<properties>
<!-- NOTE(review): presumably the project name the sbt build uses for this module; confirm against the sbt build definition. -->
<sbt.project.name>kvstore</sbt.project.name>
</properties>

<dependencies>
<!--
Dependencies with no explicit scope (Maven's default scope is "compile").
No versions are declared in this file; presumably they are managed by the parent POM (confirm there).
-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>

<!-- Test-scoped dependencies: available on the test classpath only. -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<scope>test</scope>
</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
them will yield errors.
-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<!--
Main and test classes are written under target/scala-<binary version>/ rather than Maven's
default target/classes and target/test-classes directories.
-->
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>
82 changes: 82 additions & 0 deletions common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.kvstore;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marks a field or "getter" method whose value should be indexed when an object is stored.
 *
 * <p>
 * Every stored type must declare a natural index, which uniquely identifies instances of that
 * type in the store. Using the annotation with its default value declares the natural index.
 * </p>
 *
 * <p>
 * An index makes sorting data read from the store more efficient: annotating a field or
 * "getter" method creates an index that sorts by the string value of that member.
 * </p>
 *
 * <p>
 * Each additional index costs extra storage space and makes maintenance operations, such as
 * updating or deleting a value, more expensive.
 * </p>
 *
 * <p>
 * Only String, byte, short, int, long and boolean values, and arrays of those, may be indexed.
 * </p>
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD})
public @interface KVIndex {

  /** Name reserved for the natural index of a type. */
  String NATURAL_INDEX_NAME = "__main__";

  /**
   * Name of the index created for the annotated member. It has to be unique within the class and
   * may not begin with an underscore, since such names are reserved for internal use. When left
   * unset, the natural index name is used; the natural index is always a copy index, whatever
   * the other values of the annotation say.
   */
  String value() default NATURAL_INDEX_NAME;

  /**
   * Name of this index's parent index. There is no parent unless one is given, in which case the
   * generated data can be retrieved without supplying a parent value.
   *
   * <p>
   * When a parent index is set, iterating over the data through this index requires a single
   * value for the parent index to be provided. This is a rudimentary way of expressing
   * relationships between entities in the store.
   * </p>
   */
  String parent() default "";

  /**
   * Whether the instance's data is copied into the index rather than the index holding only a
   * pointer to the data. By default only a reference is stored, which uses less disk space but
   * is slower to read because of the extra level of indirection.
   */
  boolean copy() default false;

}
129 changes: 129 additions & 0 deletions common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.kvstore;

import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

/**
 * A local key/value store used to persist application data.
 *
 * <p>
 * Implementations of this interface offer two main facilities:
 * </p>
 *
 * <h3>Serialization</h3>
 *
 * <p>
 * When the backing data store needs serialized data, objects are converted to and from their
 * stored form by a {@link KVStoreSerializer}, which the application may customize. Because the
 * serializer is built on Jackson, all Jackson annotations can be used to control how
 * app-defined types are serialized.
 * </p>
 *
 * <p>
 * Stored data is also compressed automatically to save disk space.
 * </p>
 *
 * <h3>Automatic Key Management</h3>
 *
 * <p>
 * With the built-in key management, the implementation generates unique keys for every type
 * written to the store. These keys are derived from the type name and always begin with the
 * "+" prefix character, so the manual and automatic key management APIs can be mixed without
 * conflicts.
 * </p>
 *
 * <p>
 * Automatic key management also provides indexing: fields or methods of stored objects that
 * are annotated with {@link KVIndex} get indices that sort the data by the values of those
 * properties. Sorted access is therefore possible without loading every instance of a type
 * from the store.
 * </p>
 *
 * <p>
 * KVStore instances are thread-safe for both reads and writes.
 * </p>
 */
public interface KVStore extends Closeable {

  /**
   * Fetches the app-specific metadata kept in the store.
   *
   * <p>
   * The type of the metadata is defined by the application. This method exists for convenience,
   * so that applications do not have to define keys of their own for this information.
   * </p>
   *
   * @param klass The application's metadata type.
   * @return The stored metadata, or null if it's not currently set.
   */
  <T> T getMetadata(Class<T> klass) throws Exception;

  /**
   * Stores the given value under the store's metadata key.
   *
   * @param value The metadata to write.
   */
  void setMetadata(Object value) throws Exception;

  /**
   * Reads one specific instance of an object.
   *
   * @param klass The type of the object to read.
   * @param naturalKey The "natural key" that uniquely identifies the object. Must not be null.
   * @throws NoSuchElementException If no element with the given key exists.
   */
  <T> T read(Class<T> klass, Object naturalKey) throws Exception;

  /**
   * Writes an object, together with its indexed fields, to the store. Indices are updated
   * according to the annotated fields of the object's class.
   *
   * <p>
   * A write may be slower if the object is already in the store, because the existing indices
   * then have to be updated.
   * </p>
   *
   * @param value The object to write.
   */
  void write(Object value) throws Exception;

  /**
   * Deletes an object from the store, along with all data related to it, such as its index
   * entries.
   *
   * @param type The object's type.
   * @param naturalKey The "natural key" that uniquely identifies the object. Must not be null.
   * @throws NoSuchElementException If no element with the given key exists.
   */
  void delete(Class<?> type, Object naturalKey) throws Exception;

  /**
   * Creates a configurable view for iterating over the entities of the given type.
   *
   * @param type The type of the entities to iterate over.
   */
  <T> KVStoreView<T> view(Class<T> type) throws Exception;

  /**
   * Counts the items of the given type that are currently in the store.
   *
   * @param type The type of the items to count.
   */
  long count(Class<?> type) throws Exception;

  /**
   * Counts the items of the given type that match the given indexed value.
   *
   * @param type The type of the items to count.
   * @param index The index to match against (presumably the name of an index declared through
   *              {@link KVIndex}; confirm against the implementation).
   * @param indexedValue The indexed value that counted items must match.
   */
  long count(Class<?> type, String index, Object indexedValue) throws Exception;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.kvstore;

import java.util.Iterator;
import java.util.List;

/**
 * Iterator over values held in a KVStore.
 *
 * <p>
 * An iterator may hold on to resources that have to be closed, so users are advised to close
 * iterators explicitly once they are done with them.
 * </p>
 */
public interface KVStoreIterator<T> extends Iterator<T>, AutoCloseable {

  /**
   * Fetches several elements from the store in one call.
   *
   * @param max Upper bound on the number of elements retrieved.
   */
  List<T> next(int max);

  /**
   * Skips ahead in the iterator.
   *
   * @param n How far to skip (presumably a number of elements; confirm against implementations).
   * @return Whether any items remain after skipping.
   */
  boolean skip(long n);

}
Loading

0 comments on commit 0cba495

Please sign in to comment.