Skip to content

Commit

Permalink
Make it possible to iterate over a WriteBatch.
Browse files Browse the repository at this point in the history
  • Loading branch information
stephan-hof committed Jul 5, 2015
1 parent cf95b57 commit 302d1a6
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 0 deletions.
48 changes: 48 additions & 0 deletions docs/api/database.rst
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ WriteBatch
Clear all updates buffered in this batch.

.. note::
Don't call this method if there is an outstanding iterator.
Calling :py:meth:`rocksdb.WriteBatch.clear()` while an iterator is
still outstanding leads to a SEGFAULT.

.. py:method:: data()
Retrieve the serialized version of this batch.
Expand All @@ -376,6 +381,49 @@ WriteBatch

:rtype: int

.. py:method:: __iter__()
Returns an iterator over the current contents of the write batch.

Items added to the batch after the iterator was created are not
visible to it. Create a new iterator if you need to see them.

.. note::
Calling :py:meth:`rocksdb.WriteBatch.clear()` on the write batch
invalidates the iterator. Using an iterator whose corresponding
write batch has been cleared leads to a SEGFAULT.

:rtype: :py:class:`rocksdb.WriteBatchIterator`

WriteBatchIterator
==================

.. py:class:: rocksdb.WriteBatchIterator
.. py:method:: __iter__()
Returns self.

.. py:method:: __next__()
Returns the next item inside the corresponding write batch.
The return value is always a tuple of size three.

First item (Name of the operation):

* ``"Put"``
* ``"Merge"``
* ``"Delete"``

Second item (key):
Key for this operation.

Third item (value):
The value for this operation. Empty for ``"Delete"``.

changelog
tutoro

Errors
======

Expand Down
1 change: 1 addition & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ New:
* Make CompactRange available: :py:meth:`rocksdb.DB.compact_range`
* Add init options to :py:class:`rocksdb.BlockBasedTableFactory`
* Add more options to :py:class:`rocksdb.PlainTableFactory`
* Add :py:class:`rocksdb.WriteBatchIterator`


Version 0.2
Expand Down
19 changes: 19 additions & 0 deletions docs/tutorial/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,22 @@ Here is an example to switch to *universal style compaction*. ::

See here for more options on *universal style compaction*,
:py:attr:`rocksdb.Options.compaction_options_universal`

Iterate Over WriteBatch
=======================

In some cases you need to know which operations happened on a WriteBatch.
The pyrocksdb WriteBatch supports the iterator protocol, see this example. ::

batch = rocksdb.WriteBatch()
batch.put(b"key1", b"v1")
batch.delete(b'a')
batch.merge(b'xxx', b'value')

for op, key, value in batch:
print op, key, value

# prints the following three lines
# Put key1 v1
# Delete a
# Merge xxx value
48 changes: 48 additions & 0 deletions rocksdb/_rocksdb.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,9 @@ cdef class ValuesIterator
cdef class ItemsIterator
cdef class ReversedIterator

# Forward declaration
cdef class WriteBatchIterator

cdef class WriteBatch(object):
cdef db.WriteBatch* batch

Expand Down Expand Up @@ -1246,6 +1249,51 @@ cdef class WriteBatch(object):
# Returns whatever the wrapped C++ write batch (self.batch) reports from
# its Count() method.
def count(self):
return self.batch.Count()

# Iterator protocol: every call builds a fresh WriteBatchIterator bound to
# this batch, so each iterator starts from the first recorded operation.
def __iter__(self):
return WriteBatchIterator(self)


@cython.internal
cdef class WriteBatchIterator(object):
    """Iterator over the operations stored in a WriteBatch.

    The operations are collected once, when the iterator is created.
    Each step yields a 3-tuple ``(op, key, value)`` where ``op`` is one of
    ``"Put"``, ``"Merge"`` or ``"Delete"`` and ``key``/``value`` are produced
    by ``slice_to_bytes``.
    """
    # Need a reference to the WriteBatch.
    # The BatchItems are only pointers to the memory in WriteBatch.
    cdef WriteBatch batch
    cdef vector[db.BatchItem] items
    cdef size_t pos

    def __init__(self, WriteBatch batch not None):
        # 'not None' is required: a typed extension-type argument accepts
        # None by default, and batch.batch below would then dereference it.
        cdef Status st

        self.batch = batch
        self.pos = 0

        # Fill self.items with one entry per operation recorded in the batch.
        st = db.get_batch_items(batch.batch, cython.address(self.items))
        check_status(st)

    def __iter__(self):
        return self

    def __next__(self):
        cdef str op

        if self.pos >= self.items.size():
            raise StopIteration()

        if self.items[self.pos].op == db.BatchItemOpPut:
            op = "Put"
        elif self.items[self.pos].op == db.BatchItemOpMerge:
            op = "Merge"
        elif self.items[self.pos].op == db.BatchItemOpDelte:
            op = "Delete"
        else:
            # Fail loudly instead of handing back None as operation name.
            raise RuntimeError("Unknown operation in WriteBatch")

        ret = (
            op,
            slice_to_bytes(self.items[self.pos].key),
            slice_to_bytes(self.items[self.pos].value))

        self.pos += 1
        return ret

@cython.no_gc_clear
cdef class DB(object):
cdef Options opts
Expand Down
56 changes: 56 additions & 0 deletions rocksdb/cpp/write_batch_iter_helper.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include <vector>
#include "rocksdb/write_batch.h"

namespace py_rocks {

class RecordItemsHandler: public rocksdb::WriteBatch::Handler {
public:
enum Optype {PutRecord, MergeRecord, DeleteRecord};

class BatchItem {
public:
BatchItem(
const Optype& op,
const rocksdb::Slice& key,
const rocksdb::Slice& value):
op(op),
key(key),
value(value)
{}

const Optype op;
const rocksdb::Slice key;
const rocksdb::Slice value;
};

typedef std::vector<BatchItem> BatchItems;

public:
/* Items is filled during iteration. */
RecordItemsHandler(BatchItems* items): items(items) {}

void Put(const Slice& key, const Slice& value) {
this->items->emplace_back(PutRecord, key, value);
}

void Merge(const Slice& key, const Slice& value) {
this->items->emplace_back(MergeRecord, key, value);
}

virtual void Delete(const Slice& key) {
this->items->emplace_back(DeleteRecord, key, rocksdb::Slice());
}

private:
BatchItems* items;
};

/**
 * Append one BatchItem per operation stored in `batch` to `items`.
 *
 * Declared inline because this function is defined in a header: without it,
 * including the header from more than one translation unit produces multiple
 * definitions of the same symbol.
 *
 * @param batch  write batch to walk
 * @param items  output vector, appended to in iteration order
 * @return the status reported by rocksdb::WriteBatch::Iterate()
 */
inline rocksdb::Status
get_batch_items(const rocksdb::WriteBatch* batch, RecordItemsHandler::BatchItems* items) {
    RecordItemsHandler handler(items);
    return batch->Iterate(&handler);
}

}
14 changes: 14 additions & 0 deletions rocksdb/db.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@ cdef extern from "rocksdb/write_batch.h" namespace "rocksdb":
const string& Data() nogil except+
int Count() nogil except+

# Cython view of the C++ helper in cpp/write_batch_iter_helper.hpp that is
# used to iterate over the contents of a WriteBatch.
cdef extern from "cpp/write_batch_iter_helper.hpp" namespace "py_rocks":
# Operation kinds. Each member maps to an enumerator of the C++ enum
# RecordItemsHandler::Optype via the quoted C name.
cdef enum BatchItemOp "RecordItemsHandler::Optype":
BatchItemOpPut "py_rocks::RecordItemsHandler::Optype::PutRecord"
BatchItemOpMerge "py_rocks::RecordItemsHandler::Optype::MergeRecord"
# NOTE(review): "Delte" is a misspelling of "Delete". _rocksdb.pyx refers to
# this exact name, so both places have to be renamed together.
BatchItemOpDelte "py_rocks::RecordItemsHandler::Optype::DeleteRecord"

# One recorded operation: its kind plus key and value slices.
cdef cppclass BatchItem "py_rocks::RecordItemsHandler::BatchItem":
BatchItemOp op
Slice key
Slice value

# Fills `items` from the operations stored in `batch`; returns a Status.
Status get_batch_items(WriteBatch* batch, vector[BatchItem]* items)


cdef extern from "rocksdb/db.h" namespace "rocksdb":
ctypedef uint64_t SequenceNumber

Expand Down
31 changes: 31 additions & 0 deletions rocksdb/tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,37 @@ def test_write_batch(self):
ret = self.db.multi_get([b'key', b'a'])
self.assertEqual(ref, ret)

def test_write_batch_iter(self):
    """Iterating a WriteBatch yields (op, key, value) in insertion order."""
    batch = rocksdb.WriteBatch()
    batch.put(b"key1", b"v1")
    batch.delete(b'a')
    batch.merge(b'xxx', b'value')
    # Assert instead of printing: the Python 2 print statement is a syntax
    # error on Python 3 and verified nothing.
    ref = [
        ('Put', b'key1', b'v1'),
        ('Delete', b'a', b''),
        ('Merge', b'xxx', b'value'),
    ]
    self.assertEqual(ref, list(batch))

    batch = rocksdb.WriteBatch()
    self.assertEqual([], list(batch))

    batch.put(b"key1", b"v1")
    batch.put(b"key2", b"v2")
    batch.put(b"key3", b"v3")
    batch.delete(b'a')
    batch.delete(b'key1')
    batch.merge(b'xxx', b'value')

    it = iter(batch)
    # Drop our reference: the iterator has to keep the batch alive itself.
    del batch
    # Keys and values are bytes; only the operation name is a str.
    ref = [
        ('Put', b'key1', b'v1'),
        ('Put', b'key2', b'v2'),
        ('Put', b'key3', b'v3'),
        ('Delete', b'a', b''),
        ('Delete', b'key1', b''),
        ('Merge', b'xxx', b'value'),
    ]
    self.assertEqual(ref, list(it))


def test_key_may_exists(self):
self.db.put(b"a", b'1')

Expand Down

0 comments on commit 302d1a6

Please sign in to comment.