Skip to content

Commit

Permalink
SAMZA-819 RocksDbKeyValueStore.flush() should be implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
feng-tao authored and navina committed Nov 20, 2015
1 parent 429f245 commit c84a0b5
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.samza.container.SamzaContainerContext
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.storage.kv._
import org.apache.samza.system.SystemStreamPartition
import org.rocksdb.WriteOptions
import org.rocksdb.{FlushOptions, WriteOptions}
import org.apache.samza.config.StorageConfig._

class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V]
Expand All @@ -48,7 +48,8 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, containerContext)
val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true)
val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, storageConfig, isLoggedStore, storeName, rocksDbWriteOptions, rocksDbMetrics)
val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true)
val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, storageConfig, isLoggedStore, storeName, rocksDbWriteOptions, rocksDbFlushOptions, rocksDbMetrics)
rocksDb
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class RocksDbKeyValueStore(
val isLoggedStore: Boolean,
val storeName: String,
val writeOptions: WriteOptions = new WriteOptions(),
val flushOptions: FlushOptions = new FlushOptions(),
val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {

// lazy val here is important because the store directories do not exist yet, it can only be opened
Expand Down Expand Up @@ -190,8 +191,8 @@ class RocksDbKeyValueStore(

def flush {
metrics.flushes.inc
// TODO still not exposed in Java RocksDB API, follow up with rocksDB team
trace("Flush in RocksDbKeyValueStore is not supported, ignoring")
trace("Flushing.")
db.flush(flushOptions)
}

def close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.{NoOpMetricsRegistry, ExponentialSleepStrategy}
import org.apache.samza.util.Util._
import org.junit.{Assert, Test}
import org.rocksdb.{RocksDBException, Options}
import org.rocksdb.{RocksDB, FlushOptions, RocksDBException, Options}

class TestRocksDbKeyValueStore
{
Expand Down Expand Up @@ -65,4 +65,26 @@ class TestRocksDbKeyValueStore
Assert.assertNull(rocksDB.get(key))
rocksDB.close()
}

@Test
def testFlush(): Unit = {
val map = new util.HashMap[String, String]()
val config = new MapConfig(map)
val flushOptions = new FlushOptions().setWaitForFlush(true)
val options = new Options()
options.setCreateIfMissing(true)
val rocksDB = RocksDbKeyValueStore.openDB(new File(System.getProperty("java.io.tmpdir")),
options,
config,
false,
"dbStore")
val key = "key".getBytes("UTF-8")
rocksDB.put(key, "val".getBytes("UTF-8"))
rocksDB.flush(flushOptions)
val dbDir = new File(System.getProperty("java.io.tmpdir")).toString
val rocksDBReadOnly = RocksDB.openReadOnly(options, dbDir)
Assert.assertEquals(new String(rocksDBReadOnly.get(key), "UTF-8"), "val")
rocksDB.close()
rocksDBReadOnly.close()
}
}

0 comments on commit c84a0b5

Please sign in to comment.