Skip to content

Commit

Permalink
add Transactions and Checkpoint to C API
Browse files Browse the repository at this point in the history
Summary:
I've added functions to the C API to support Transactions as requested in facebook#1637 and to support Checkpoint.

I have also added the corresponding tests to c_test.c

For now, the following is omitted:

1. Optimistic Transactions
2. The column family variation of functions
Closes facebook#2236

Differential Revision: D4989510

Pulled By: yiwu-arbug

fbshipit-source-id: 518cb39f76d5e9ec9690d633fcdc014b98958071
  • Loading branch information
boolean5 authored and facebook-github-bot committed May 17, 2017
1 parent 445f123 commit cb9392a
Show file tree
Hide file tree
Showing 3 changed files with 543 additions and 3 deletions.
263 changes: 263 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
#include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "utilities/merge_operators.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/checkpoint.h"

using rocksdb::BytewiseComparator;
using rocksdb::Cache;
Expand Down Expand Up @@ -90,6 +93,11 @@ using rocksdb::CompactRangeOptions;
using rocksdb::RateLimiter;
using rocksdb::NewGenericRateLimiter;
using rocksdb::PinnableSlice;
using rocksdb::TransactionDBOptions;
using rocksdb::TransactionDB;
using rocksdb::TransactionOptions;
using rocksdb::Transaction;
using rocksdb::Checkpoint;

using std::shared_ptr;

Expand Down Expand Up @@ -131,6 +139,21 @@ struct rocksdb_ratelimiter_t { RateLimiter* rep; };
struct rocksdb_pinnableslice_t {
PinnableSlice rep;
};
struct rocksdb_transactiondb_options_t {
TransactionDBOptions rep;
};
struct rocksdb_transactiondb_t {
TransactionDB* rep;
};
struct rocksdb_transaction_options_t {
TransactionOptions rep;
};
struct rocksdb_transaction_t {
Transaction* rep;
};
struct rocksdb_checkpoint_t {
Checkpoint* rep;
};

struct rocksdb_compactionfiltercontext_t {
CompactionFilter::Context rep;
Expand Down Expand Up @@ -543,6 +566,29 @@ void rocksdb_backup_engine_close(rocksdb_backup_engine_t* be) {
delete be;
}

rocksdb_checkpoint_t* rocksdb_checkpoint_object_create(rocksdb_t* db,
char** errptr) {
Checkpoint* checkpoint;
if (SaveError(errptr, Checkpoint::Create(db->rep, &checkpoint))) {
return nullptr;
}
rocksdb_checkpoint_t* result = new rocksdb_checkpoint_t;
result->rep = checkpoint;
return result;
}

void rocksdb_checkpoint_create(rocksdb_checkpoint_t* checkpoint,
const char* checkpoint_dir,
uint64_t log_size_for_flush, char** errptr) {
SaveError(errptr, checkpoint->rep->CreateCheckpoint(
std::string(checkpoint_dir), log_size_for_flush));
}

void rocksdb_checkpoint_object_destroy(rocksdb_checkpoint_t* checkpoint) {
delete checkpoint->rep;
delete checkpoint;
}

void rocksdb_close(rocksdb_t* db) {
delete db->rep;
delete db;
Expand Down Expand Up @@ -3094,6 +3140,223 @@ void rocksdb_delete_file_in_range_cf(
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr)));
}

rocksdb_transactiondb_options_t* rocksdb_transactiondb_options_create() {
return new rocksdb_transactiondb_options_t;
}

void rocksdb_transactiondb_options_destroy(rocksdb_transactiondb_options_t* opt){
delete opt;
}

void rocksdb_transactiondb_options_set_max_num_locks(
rocksdb_transactiondb_options_t* opt, int64_t max_num_locks) {
opt->rep.max_num_locks = max_num_locks;
}

void rocksdb_transactiondb_options_set_num_stripes(
rocksdb_transactiondb_options_t* opt, size_t num_stripes) {
opt->rep.num_stripes = num_stripes;
}

void rocksdb_transactiondb_options_set_transaction_lock_timeout(
rocksdb_transactiondb_options_t* opt, int64_t txn_lock_timeout) {
opt->rep.transaction_lock_timeout = txn_lock_timeout;
}

void rocksdb_transactiondb_options_set_default_lock_timeout(
rocksdb_transactiondb_options_t* opt, int64_t default_lock_timeout) {
opt->rep.default_lock_timeout = default_lock_timeout;
}

rocksdb_transaction_options_t* rocksdb_transaction_options_create() {
return new rocksdb_transaction_options_t;
}

void rocksdb_transaction_options_destroy(rocksdb_transaction_options_t* opt) {
delete opt;
}

void rocksdb_transaction_options_set_set_snapshot(
rocksdb_transaction_options_t* opt, unsigned char v) {
opt->rep.set_snapshot = v;
}

void rocksdb_transaction_options_set_deadlock_detect(
rocksdb_transaction_options_t* opt, unsigned char v) {
opt->rep.deadlock_detect = v;
}

void rocksdb_transaction_options_set_lock_timeout(
rocksdb_transaction_options_t* opt, int64_t lock_timeout) {
opt->rep.lock_timeout = lock_timeout;
}

void rocksdb_transaction_options_set_expiration(
rocksdb_transaction_options_t* opt, int64_t expiration) {
opt->rep.expiration = expiration;
}

void rocksdb_transaction_options_set_deadlock_detect_depth(
rocksdb_transaction_options_t* opt, int64_t depth) {
opt->rep.deadlock_detect_depth = depth;
}

void rocksdb_transaction_options_set_max_write_batch_size(
rocksdb_transaction_options_t* opt, size_t size) {
opt->rep.max_write_batch_size = size;
}

rocksdb_transactiondb_t* rocksdb_transactiondb_open(
const rocksdb_options_t* options,
const rocksdb_transactiondb_options_t* txn_db_options, const char* name,
char** errptr) {
TransactionDB* txn_db;
if (SaveError(errptr, TransactionDB::Open(options->rep, txn_db_options->rep,
std::string(name), &txn_db))) {
return nullptr;
}
rocksdb_transactiondb_t* result = new rocksdb_transactiondb_t;
result->rep = txn_db;
return result;
}

const rocksdb_snapshot_t* rocksdb_transactiondb_create_snapshot(
rocksdb_transactiondb_t* txn_db) {
rocksdb_snapshot_t* result = new rocksdb_snapshot_t;
result->rep = txn_db->rep->GetSnapshot();
return result;
}

void rocksdb_transactiondb_release_snapshot(
rocksdb_transactiondb_t* txn_db, const rocksdb_snapshot_t* snapshot) {
txn_db->rep->ReleaseSnapshot(snapshot->rep);
delete snapshot;
}

rocksdb_transaction_t* rocksdb_transaction_begin(
rocksdb_transactiondb_t* txn_db,
const rocksdb_writeoptions_t* write_options,
const rocksdb_transaction_options_t* txn_options,
rocksdb_transaction_t* old_txn) {
if (old_txn == nullptr) {
rocksdb_transaction_t* result = new rocksdb_transaction_t;
result->rep = txn_db->rep->BeginTransaction(write_options->rep,
txn_options->rep, nullptr);
return result;
}
old_txn->rep = txn_db->rep->BeginTransaction(write_options->rep,
txn_options->rep, old_txn->rep);
return old_txn;
}

void rocksdb_transaction_commit(rocksdb_transaction_t* txn, char** errptr) {
SaveError(errptr, txn->rep->Commit());
}

void rocksdb_transaction_rollback(rocksdb_transaction_t* txn, char** errptr) {
SaveError(errptr, txn->rep->Rollback());
}

void rocksdb_transaction_destroy(rocksdb_transaction_t* txn) {
delete txn->rep;
delete txn;
}

//Read a key inside a transaction
char* rocksdb_transaction_get(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options,
const char* key, size_t klen, size_t* vlen,
char** errptr) {
char* result = nullptr;
std::string tmp;
Status s = txn->rep->Get(options->rep, Slice(key, klen), &tmp);
if (s.ok()) {
*vlen = tmp.size();
result = CopyString(tmp);
} else {
*vlen = 0;
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
}
return result;
}

// Read a key outside a transaction
char* rocksdb_transactiondb_get(
rocksdb_transactiondb_t* txn_db,
const rocksdb_readoptions_t* options,
const char* key, size_t klen,
size_t* vlen,
char** errptr){
char* result = nullptr;
std::string tmp;
Status s = txn_db->rep->Get(options->rep, Slice(key, klen), &tmp);
if (s.ok()) {
*vlen = tmp.size();
result = CopyString(tmp);
} else {
*vlen = 0;
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
}
return result;
}

// Put a key inside a transaction
void rocksdb_transaction_put(rocksdb_transaction_t* txn, const char* key,
size_t klen, const char* val, size_t vlen,
char** errptr) {
SaveError(errptr, txn->rep->Put(Slice(key, klen), Slice(val, vlen)));
}

//Put a key outside a transaction
void rocksdb_transactiondb_put(rocksdb_transactiondb_t* txn_db,
const rocksdb_writeoptions_t* options,
const char* key, size_t klen, const char* val,
size_t vlen, char** errptr) {
SaveError(errptr,
txn_db->rep->Put(options->rep, Slice(key, klen), Slice(val, vlen)));
}

// Delete a key inside a transaction
void rocksdb_transaction_delete(rocksdb_transaction_t* txn, const char* key,
size_t klen, char** errptr) {
SaveError(errptr, txn->rep->Delete(Slice(key, klen)));
}

// Delete a key outside a transaction
void rocksdb_transactiondb_delete(rocksdb_transactiondb_t* txn_db,
const rocksdb_writeoptions_t* options,
const char* key, size_t klen, char** errptr) {
SaveError(errptr, txn_db->rep->Delete(options->rep, Slice(key, klen)));
}

// Create an iterator inside a transaction
rocksdb_iterator_t* rocksdb_transaction_create_iterator(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options) {
rocksdb_iterator_t* result = new rocksdb_iterator_t;
result->rep = txn->rep->GetIterator(options->rep);
return result;
}

void rocksdb_transactiondb_close(rocksdb_transactiondb_t* txn_db) {
delete txn_db->rep;
delete txn_db;
}

rocksdb_checkpoint_t* rocksdb_transactiondb_checkpoint_object_create(
rocksdb_transactiondb_t* txn_db, char** errptr) {
Checkpoint* checkpoint;
if (SaveError(errptr, Checkpoint::Create(txn_db->rep, &checkpoint))) {
return nullptr;
}
rocksdb_checkpoint_t* result = new rocksdb_checkpoint_t;
result->rep = checkpoint;
return result;
}

void rocksdb_free(void* ptr) { free(ptr); }

rocksdb_pinnableslice_t* rocksdb_get_pinned(
Expand Down
Loading

0 comments on commit cb9392a

Please sign in to comment.