Skip to content

Commit

Permalink
[FLINK-6695] Activate strict checkstyle for flink-statebackend-rocksDB
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jun 1, 2017
1 parent a84ce0b commit 60721e0
Show file tree
Hide file tree
Showing 25 changed files with 205 additions and 163 deletions.
39 changes: 39 additions & 0 deletions flink-contrib/flink-statebackend-rocksdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,43 @@ under the License.
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>6.19</version>
</dependency>
</dependencies>
<configuration>
<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
</configuration>
<executions>
<!--
Execute checkstyle after compilation but before tests.
This ensures that any parsing or type checking errors are from
javac, so they look as expected. Beyond that, we want to
fail as early as possible.
-->
<execution>
<phase>test-compile</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.Preconditions;

import org.rocksdb.ColumnFamilyHandle;
Expand All @@ -52,19 +52,19 @@
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V>
implements InternalKvState<N>, State {

/** Serializer for the namespace */
/** Serializer for the namespace. */
final TypeSerializer<N> namespaceSerializer;

/** The current namespace, which the next value methods will refer to */
/** The current namespace, which the next value methods will refer to. */
private N currentNamespace;

/** Backend that holds the actual RocksDB instance where we store state */
/** Backend that holds the actual RocksDB instance where we store state. */
protected RocksDBKeyedStateBackend<K> backend;

/** The column family of this particular instance of state */
/** The column family of this particular instance of state. */
protected ColumnFamilyHandle columnFamily;

/** State descriptor from which to create this state instance */
/** State descriptor from which to create this state instance. */
protected final SD stateDesc;

/**
Expand Down Expand Up @@ -110,7 +110,7 @@ public void clear() {
writeCurrentKeyWithGroupAndNamespace();
byte[] key = keySerializationStream.toByteArray();
backend.db.remove(columnFamily, writeOptions, key);
} catch (IOException|RocksDBException e) {
} catch (IOException | RocksDBException e) {
throw new RuntimeException("Error while removing entry from RocksDB", e);
}
}
Expand Down Expand Up @@ -220,7 +220,7 @@ private static void writeVariableIntBytes(
value >>>= 8;
} while (value != 0);
}

protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
int keyGroup = readKeyGroup(inputView);
K key = readKey(inputStream, inputView);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@
* A factory for {@link DBOptions} to be passed to the {@link RocksDBStateBackend}.
* Options have to be created lazily by this factory, because the {@code Options}
* class is not serializable and holds pointers to native code.
*
*
* <p>A typical pattern to use this OptionsFactory is as follows:
*
*
* <h3>Java 8:</h3>
* <pre>{@code
* rocksDbBackend.setOptions( (currentOptions) -> currentOptions.setMaxOpenFiles(1024) );
* }</pre>
*
*
* <h3>Java 7:</h3>
* <pre>{@code
* rocksDbBackend.setOptions(new OptionsFactory() {
*
*
* public Options setOptions(Options currentOptions) {
* return currentOptions.setMaxOpenFiles(1024);
* }
Expand All @@ -49,11 +49,11 @@ public interface OptionsFactory extends java.io.Serializable {
* This method should set the additional options on top of the current options object.
* The current options object may contain pre-defined options based on flags that have
* been configured on the state backend.
*
*
* <p>It is important to set the options on the current object and return the result from
* the setter methods, otherwise the pre-defined options may get lost.
*
* @param currentOptions The options object with the pre-defined options.
*
* @param currentOptions The options object with the pre-defined options.
* @return The options object on which the additional options are set.
*/
DBOptions createDBOptions(DBOptions currentOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.rocksdb.StringAppendOperator;

/**
* The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}.
* The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}.
* The various pre-defined choices are configurations that have been empirically
* determined to be beneficial for performance under different settings.
*
*
* <p>Some of these settings are based on experiments by the Flink community, some follow
* guides from the RocksDB project.
*/
Expand All @@ -37,12 +37,12 @@ public enum PredefinedOptions {
/**
* Default options for all settings, except that writes are not forced to the
* disk.
*
*
* <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
* there is no need to sync data to stable storage.
*/
DEFAULT {

@Override
public DBOptions createDBOptions() {
return new DBOptions()
Expand All @@ -60,11 +60,11 @@ public ColumnFamilyOptions createColumnOptions() {

/**
* Pre-defined options for regular spinning hard disks.
*
*
* <p>This constant configures RocksDB with some options that lead empirically
* to better performance when the machines executing the system use
* regular spinning hard disks.
*
*
* <p>The following options are set:
* <ul>
* <li>setCompactionStyle(CompactionStyle.LEVEL)</li>
Expand All @@ -74,7 +74,7 @@ public ColumnFamilyOptions createColumnOptions() {
* <li>setDisableDataSync(true)</li>
* <li>setMaxOpenFiles(-1)</li>
* </ul>
*
*
* <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
* there is no need to sync data to stable storage.
*/
Expand Down Expand Up @@ -121,7 +121,7 @@ public ColumnFamilyOptions createColumnOptions() {
* <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes)</li>
* <li>BlockBasedTableConfigsetBlockSize(128 KBytes)</li>
* </ul>
*
*
* <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
* there is no need to sync data to stable storage.
*/
Expand Down Expand Up @@ -161,21 +161,21 @@ public ColumnFamilyOptions createColumnOptions() {
);
}
},

/**
* Pre-defined options for Flash SSDs.
*
* <p>This constant configures RocksDB with some options that lead empirically
* to better performance when the machines executing the system use SSDs.
*
*
* <p>The following options are set:
* <ul>
* <li>setIncreaseParallelism(4)</li>
* <li>setUseFsync(false)</li>
* <li>setDisableDataSync(true)</li>
* <li>setMaxOpenFiles(-1)</li>
* </ul>
*
*
* <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
* there is no need to sync data to stable storage.
*/
Expand All @@ -196,13 +196,13 @@ public ColumnFamilyOptions createColumnOptions() {
.setMergeOperator(new StringAppendOperator());
}
};

// ------------------------------------------------------------------------

/**
* Creates the {@link DBOptions}for this pre-defined setting.
*
* @return The pre-defined options object.
*
* @return The pre-defined options object.
*/
public abstract DBOptions createDBOptions();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

import org.apache.flink.runtime.state.internal.InternalAggregatingState;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
Expand All @@ -47,10 +47,10 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
extends AbstractRocksDBState<K, N, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>, ACC>
implements InternalAggregatingState<N, T, R> {

/** Serializer for the values */
/** Serializer for the values. */
private final TypeSerializer<ACC> valueSerializer;

/** User-specified aggregation function */
/** User-specified aggregation function. */
private final AggregateFunction<T, ACC, R> aggFunction;

/**
Expand All @@ -64,7 +64,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
*
* @param namespaceSerializer
* The serializer for the namespace.
* @param stateDesc
* @param stateDesc
* The state identifier for the state. This contains the state name and aggregation function.
*/
public RocksDBAggregatingState(
Expand Down Expand Up @@ -154,7 +154,7 @@ public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
writeKeyWithGroupAndNamespace(
keyGroup, key, source,
keySerializationStream, keySerializationDataOutputView);

final byte[] sourceKey = keySerializationStream.toByteArray();
final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);

Expand All @@ -174,7 +174,7 @@ public void mergeNamespaces(N target, Collection<N> sources) throws Exception {

// if something came out of merging the sources, merge it or write it to the target
if (current != null) {
// create the target full-binary-key
// create the target full-binary-key
writeKeyWithGroupAndNamespace(
keyGroup, key, target,
keySerializationStream, keySerializationDataOutputView);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalFoldingState;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
Expand All @@ -47,10 +48,10 @@ public class RocksDBFoldingState<K, N, T, ACC>
extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC>
implements InternalFoldingState<N, T, ACC> {

/** Serializer for the values */
/** Serializer for the values. */
private final TypeSerializer<ACC> valueSerializer;

/** User-specified fold function */
/** User-specified fold function. */
private final FoldFunction<T, ACC> foldFunction;

/**
Expand Down Expand Up @@ -90,7 +91,7 @@ public ACC get() {
return null;
}
return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
} catch (IOException|RocksDBException e) {
} catch (IOException | RocksDBException e) {
throw new RuntimeException("Error while retrieving data from RocksDB", e);
}
}
Expand Down
Loading

0 comments on commit 60721e0

Please sign in to comment.