Skip to content

Commit

Permalink
JNI direct buffer support for basic operations (facebook#2283)
Browse files Browse the repository at this point in the history
Summary:
It is very useful to support direct ByteBuffers in Java. It allows to have zero memory copy and some serializers are using that directly so one do not need to create byte[] array for it.

This change also contains some fixes for Windows JNI build.
Pull Request resolved: facebook#2283

Differential Revision: D19834971

Pulled By: pdillinger

fbshipit-source-id: 44173aa02afc9836c5498c592fd1ea95b6086e8e
  • Loading branch information
koldat authored and facebook-github-bot committed Feb 11, 2020
1 parent 28aa09d commit e412a42
Show file tree
Hide file tree
Showing 26 changed files with 1,232 additions and 43 deletions.
3 changes: 2 additions & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Rocksdb Change Log
## Unreleased
### Public API Change
### Java API Changes
* Major breaking changes to Java comparators, toward standardizing on ByteBuffer for performant, locale-neutral operations on keys (#6252).
* Added overloads of common API methods using direct ByteBuffers for keys and values (#2283).

### Bug Fixes
* Fix incorrect results while block-based table uses kHashSearch, together with Prev()/SeekForPrev().
Expand Down
61 changes: 61 additions & 0 deletions java/rocksjni/iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <jni.h>
#include <stdio.h>
#include <stdlib.h>
#include <algorithm>

#include "include/org_rocksdb_RocksIterator.h"
#include "rocksdb/iterator.h"
Expand Down Expand Up @@ -102,6 +103,36 @@ void Java_org_rocksdb_RocksIterator_seek0(JNIEnv* env, jobject /*jobj*/,
env->ReleaseByteArrayElements(jtarget, target, JNI_ABORT);
}

/*
* Class: org_rocksdb_RocksIterator
* Method: seekDirect0
* Signature: (JLjava/nio/ByteBuffer;II)V
*/
void Java_org_rocksdb_RocksIterator_seekDirect0(JNIEnv* env, jobject /*jobj*/,
jlong handle, jobject jtarget,
jint jtarget_off,
jint jtarget_len) {
auto* it = reinterpret_cast<rocksdb::Iterator*>(handle);
auto seek = [&it](rocksdb::Slice& target_slice) { it->Seek(target_slice); };
rocksdb::JniUtil::k_op_direct(seek, env, jtarget, jtarget_off, jtarget_len);
}

/*
* Class: org_rocksdb_RocksIterator
* Method: seekForPrevDirect0
* Signature: (JLjava/nio/ByteBuffer;II)V
*/
void Java_org_rocksdb_RocksIterator_seekForPrevDirect0(
JNIEnv* env, jobject /*jobj*/, jlong handle, jobject jtarget,
jint jtarget_off, jint jtarget_len) {
auto* it = reinterpret_cast<rocksdb::Iterator*>(handle);
auto seekPrev = [&it](rocksdb::Slice& target_slice) {
it->SeekForPrev(target_slice);
};
rocksdb::JniUtil::k_op_direct(seekPrev, env, jtarget, jtarget_off,
jtarget_len);
}

/*
* Class: org_rocksdb_RocksIterator
* Method: seekForPrev0
Expand Down Expand Up @@ -163,6 +194,21 @@ jbyteArray Java_org_rocksdb_RocksIterator_key0(JNIEnv* env, jobject /*jobj*/,
return jkey;
}

/*
* Class: org_rocksdb_RocksIterator
* Method: keyDirect0
* Signature: (JLjava/nio/ByteBuffer;II)I
*/
jint Java_org_rocksdb_RocksIterator_keyDirect0(JNIEnv* env, jobject /*jobj*/,
jlong handle, jobject jtarget,
jint jtarget_off,
jint jtarget_len) {
auto* it = reinterpret_cast<rocksdb::Iterator*>(handle);
rocksdb::Slice key_slice = it->key();
return rocksdb::JniUtil::copyToDirect(env, key_slice, jtarget, jtarget_off,
jtarget_len);
}

/*
* Class: org_rocksdb_RocksIterator
* Method: value0
Expand All @@ -184,3 +230,18 @@ jbyteArray Java_org_rocksdb_RocksIterator_value0(JNIEnv* env, jobject /*jobj*/,
const_cast<jbyte*>(reinterpret_cast<const jbyte*>(value_slice.data())));
return jkeyValue;
}

/*
* Class: org_rocksdb_RocksIterator
* Method: valueDirect0
* Signature: (JLjava/nio/ByteBuffer;II)I
*/
jint Java_org_rocksdb_RocksIterator_valueDirect0(JNIEnv* env, jobject /*jobj*/,
jlong handle, jobject jtarget,
jint jtarget_off,
jint jtarget_len) {
auto* it = reinterpret_cast<rocksdb::Iterator*>(handle);
rocksdb::Slice value_slice = it->value();
return rocksdb::JniUtil::copyToDirect(env, value_slice, jtarget, jtarget_off,
jtarget_len);
}
79 changes: 79 additions & 0 deletions java/rocksjni/portal.h
Original file line number Diff line number Diff line change
Expand Up @@ -2233,6 +2233,85 @@ class JniUtil {

return jpointers;
}

/*
* Helper for operations on a key and value
* for example WriteBatch->Put
*
* TODO(AR) could be extended to cover returning rocksdb::Status
* from `op` and used for RocksDB->Put etc.
*/
static void kv_op_direct(
std::function<void(rocksdb::Slice&, rocksdb::Slice&)> op, JNIEnv* env,
jobject jkey, jint jkey_off, jint jkey_len, jobject jval, jint jval_off,
jint jval_len) {
char* key = reinterpret_cast<char*>(env->GetDirectBufferAddress(jkey));
if (key == nullptr ||
env->GetDirectBufferCapacity(jkey) < (jkey_off + jkey_len)) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, "Invalid key argument");
return;
}

char* value = reinterpret_cast<char*>(env->GetDirectBufferAddress(jval));
if (value == nullptr ||
env->GetDirectBufferCapacity(jval) < (jval_off + jval_len)) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, "Invalid value argument");
return;
}

key += jkey_off;
value += jval_off;

rocksdb::Slice key_slice(key, jkey_len);
rocksdb::Slice value_slice(value, jval_len);

op(key_slice, value_slice);
}

/*
* Helper for operations on a key and value
* for example WriteBatch->Delete
*
* TODO(AR) could be extended to cover returning rocksdb::Status
* from `op` and used for RocksDB->Delete etc.
*/
static void k_op_direct(std::function<void(rocksdb::Slice&)> op,
JNIEnv* env, jobject jkey, jint jkey_off,
jint jkey_len) {
char* key = reinterpret_cast<char*>(env->GetDirectBufferAddress(jkey));
if (key == nullptr ||
env->GetDirectBufferCapacity(jkey) < (jkey_off + jkey_len)) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, "Invalid key argument");
return;
}

key += jkey_off;

rocksdb::Slice key_slice(key, jkey_len);

return op(key_slice);
}

template <class T>
static jint copyToDirect(JNIEnv* env, T& source, jobject jtarget,
jint jtarget_off, jint jtarget_len) {
char* target =
reinterpret_cast<char*>(env->GetDirectBufferAddress(jtarget));
if (target == nullptr ||
env->GetDirectBufferCapacity(jtarget) < (jtarget_off + jtarget_len)) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, "Invalid target argument");
return 0;
}

target += jtarget_off;

const jint cvalue_len = static_cast<jint>(source.size());
const jint length = std::min(jtarget_len, cvalue_len);

memcpy(target, source.data(), length);

return cvalue_len;
}
};

class MapJni : public JavaClass {
Expand Down
166 changes: 166 additions & 0 deletions java/rocksjni/rocksjni.cc
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,36 @@ void Java_org_rocksdb_RocksDB_put__JJ_3BII_3BIIJ(
}
}

/*
* Class: org_rocksdb_RocksDB
* Method: putDirect
* Signature: (JJLjava/nio/ByteBuffer;IILjava/nio/ByteBuffer;IIJ)V
*/
void Java_org_rocksdb_RocksDB_putDirect(
JNIEnv* env, jobject /*jdb*/, jlong jdb_handle, jlong jwrite_options_handle,
jobject jkey, jint jkey_off, jint jkey_len, jobject jval, jint jval_off,
jint jval_len, jlong jcf_handle) {
auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto* write_options =
reinterpret_cast<rocksdb::WriteOptions*>(jwrite_options_handle);
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
auto put = [&env, &db, &cf_handle, &write_options](rocksdb::Slice& key,
rocksdb::Slice& value) {
rocksdb::Status s;
if (cf_handle == nullptr) {
s = db->Put(*write_options, key, value);
} else {
s = db->Put(*write_options, cf_handle, key, value);
}
if (s.ok()) {
return;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
};
rocksdb::JniUtil::kv_op_direct(put, env, jkey, jkey_off, jkey_len, jval,
jval_off, jval_len);
}

//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Delete()

Expand Down Expand Up @@ -868,6 +898,92 @@ void Java_org_rocksdb_RocksDB_deleteRange__J_3BII_3BII(
jend_key, jend_key_off, jend_key_len);
}

jint rocksdb_get_helper_direct(
JNIEnv* env, rocksdb::DB* db, const rocksdb::ReadOptions& read_options,
rocksdb::ColumnFamilyHandle* column_family_handle, jobject jkey,
jint jkey_off, jint jkey_len, jobject jval, jint jval_off, jint jval_len,
bool* has_exception) {
static const int kNotFound = -1;
static const int kStatusError = -2;
static const int kArgumentError = -3;

char* key = reinterpret_cast<char*>(env->GetDirectBufferAddress(jkey));
if (key == nullptr) {
rocksdb::RocksDBExceptionJni::ThrowNew(
env,
"Invalid key argument (argument is not a valid direct ByteBuffer)");
*has_exception = true;
return kArgumentError;
}
if (env->GetDirectBufferCapacity(jkey) < (jkey_off + jkey_len)) {
rocksdb::RocksDBExceptionJni::ThrowNew(
env,
"Invalid key argument. Capacity is less than requested region (offset "
"+ length).");
*has_exception = true;
return kArgumentError;
}

char* value = reinterpret_cast<char*>(env->GetDirectBufferAddress(jval));
if (value == nullptr) {
rocksdb::RocksDBExceptionJni::ThrowNew(
env,
"Invalid value argument (argument is not a valid direct ByteBuffer)");
*has_exception = true;
return kArgumentError;
}

if (env->GetDirectBufferCapacity(jval) < (jval_off + jval_len)) {
rocksdb::RocksDBExceptionJni::ThrowNew(
env,
"Invalid value argument. Capacity is less than requested region "
"(offset + length).");
*has_exception = true;
return kArgumentError;
}

key += jkey_off;
value += jval_off;

rocksdb::Slice key_slice(key, jkey_len);

// TODO(yhchiang): we might save one memory allocation here by adding
// a DB::Get() function which takes preallocated jbyte* as input.
std::string cvalue;
rocksdb::Status s;
if (column_family_handle != nullptr) {
s = db->Get(read_options, column_family_handle, key_slice, &cvalue);
} else {
// backwards compatibility
s = db->Get(read_options, key_slice, &cvalue);
}

if (s.IsNotFound()) {
*has_exception = false;
return kNotFound;
} else if (!s.ok()) {
*has_exception = true;
// Here since we are throwing a Java exception from c++ side.
// As a result, c++ does not know calling this function will in fact
// throwing an exception. As a result, the execution flow will
// not stop here, and codes after this throw will still be
// executed.
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);

// Return a dummy const value to avoid compilation error, although
// java side might not have a chance to get the return value :)
return kStatusError;
}

const jint cvalue_len = static_cast<jint>(cvalue.size());
const jint length = std::min(jval_len, cvalue_len);

memcpy(value, cvalue.c_str(), length);

*has_exception = false;
return cvalue_len;
}

/*
* Class: org_rocksdb_RocksDB
* Method: deleteRange
Expand Down Expand Up @@ -933,6 +1049,27 @@ void Java_org_rocksdb_RocksDB_deleteRange__JJ_3BII_3BIIJ(
}
}

/*
* Class: org_rocksdb_RocksDB
* Method: getDirect
* Signature: (JJLjava/nio/ByteBuffer;IILjava/nio/ByteBuffer;IIJ)I
*/
jint Java_org_rocksdb_RocksDB_getDirect(JNIEnv* env, jobject /*jdb*/,
jlong jdb_handle, jlong jropt_handle,
jobject jkey, jint jkey_off,
jint jkey_len, jobject jval,
jint jval_off, jint jval_len,
jlong jcf_handle) {
auto* db_handle = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto* ro_opt = reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle);
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
bool has_exception = false;
return rocksdb_get_helper_direct(
env, db_handle, ro_opt == nullptr ? rocksdb::ReadOptions() : *ro_opt,
cf_handle, jkey, jkey_off, jkey_len, jval, jval_off, jval_len,
&has_exception);
}

//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Merge

Expand Down Expand Up @@ -1071,6 +1208,35 @@ jlong rocksdb_iterator_helper(rocksdb::DB* db,
return reinterpret_cast<jlong>(iterator);
}

/*
* Class: org_rocksdb_RocksDB
* Method: deleteDirect
* Signature: (JJLjava/nio/ByteBuffer;IIJ)V
*/
void Java_org_rocksdb_RocksDB_deleteDirect(JNIEnv* env, jobject /*jdb*/,
jlong jdb_handle,
jlong jwrite_options, jobject jkey,
jint jkey_offset, jint jkey_len,
jlong jcf_handle) {
auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto* write_options =
reinterpret_cast<rocksdb::WriteOptions*>(jwrite_options);
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
auto remove = [&env, &db, &write_options, &cf_handle](rocksdb::Slice& key) {
rocksdb::Status s;
if (cf_handle == nullptr) {
s = db->Delete(*write_options, key);
} else {
s = db->Delete(*write_options, cf_handle, key);
}
if (s.ok()) {
return;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
};
rocksdb::JniUtil::k_op_direct(remove, env, jkey, jkey_offset, jkey_len);
}

//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Write
/*
Expand Down
Loading

0 comments on commit e412a42

Please sign in to comment.