Skip to content

Commit

Permalink
add DB.iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
jackrobison committed Jan 16, 2022
1 parent 68a168e commit 11b18eb
Show file tree
Hide file tree
Showing 2 changed files with 301 additions and 9 deletions.
81 changes: 79 additions & 2 deletions rocksdb/_rocksdb.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ BloomFilterPolicy = PyBloomFilterPolicy
#############################################



## Here comes the stuff for the merge operator
@cython.internal
cdef class PyMergeOperator(object):
Expand Down Expand Up @@ -1950,6 +1949,85 @@ cdef class DB(object):
st = self.db.Write(opts, batch.batch)
check_status(st)

def iterator(self, start: bytes, column_family: bytes = None, iterate_lower_bound: bytes = None,
             iterate_upper_bound: bytes = None, reverse: bool = False, include_key: bool = True,
             include_value: bool = True, fill_cache: bool = True, prefix_same_as_start: bool = False,
             auto_prefix_mode: bool = False):
    """
    Create a RocksDB iterator and seek it to `start`.

    Args:
        start (bytes): key (or key prefix) the iterator is seeked to before it is returned
        column_family (bytes): the name of the column family
        iterate_lower_bound (bytes): defines the smallest key at which the backward iterator can return an entry.
                                     Once the bound is passed, Valid() will be false. `iterate_lower_bound` is
                                     inclusive, i.e. the bound value is a valid entry.
                                     If prefix_extractor is not null, the Seek target and `iterate_lower_bound`
                                     need to have the same prefix. This is because ordering is not guaranteed
                                     outside of the prefix domain.
        iterate_upper_bound (bytes): defines the extent up to which the forward iterator
                                     can return entries. Once the bound is reached, Valid() will be false.
                                     `iterate_upper_bound` is exclusive, i.e. the bound value is
                                     not a valid entry. If prefix_extractor is not null:
                                     1. If auto_prefix_mode = true, iterate_upper_bound will be used
                                        to infer whether prefix iterating (e.g. applying prefix bloom filter)
                                        can be used within RocksDB. This is done by comparing
                                        iterate_upper_bound with the seek key.
                                     2. If auto_prefix_mode = false, iterate_upper_bound only takes
                                        effect if it shares the same prefix as the seek key. If
                                        iterate_upper_bound is outside the prefix of the seek key, then keys
                                        returned outside the prefix range will be undefined, just as if
                                        iterate_upper_bound = null.
                                     If iterate_upper_bound is not null, SeekToLast() will position the iterator
                                     at the first key smaller than iterate_upper_bound.
        reverse (bool): run the iteration in reverse - using `reversed` on the result is also supported
        include_key (bool): the iterator should include the key in each iteration
        include_value (bool): the iterator should include the value in each iteration. When both
                              `include_key` and `include_value` are false, the keys-only iterator is used.
        fill_cache (bool): Should the "data block"/"index block" read for this iteration be placed in
                           block cache? Callers may wish to set this field to false for bulk scans.
                           This helps to avoid changing the eviction order of existing items in the
                           block cache. Default: true
        prefix_same_as_start (bool): Enforce that the iterator only iterates over the same prefix as the seek.
                                     This option is effective only for prefix seeks, i.e. prefix_extractor is
                                     non-null for the column family and total_order_seek is false. Unlike
                                     iterate_upper_bound, prefix_same_as_start only works within a prefix
                                     but in both directions. Default: false
        auto_prefix_mode (bool): When true, by default use total_order_seek = true, and RocksDB can
                                 selectively enable prefix seek mode if it won't generate a different result
                                 from total_order_seek, based on seek key and iterator upper bound.
                                 Not supported in ROCKSDB_LITE mode, in the way that even with value true
                                 prefix mode is not used. Default: false
    Returns:
        BaseIterator: An iterator that yields key/value pairs or keys or values alone depending on the arguments.
                      The iterator supports being `reversed`
    """

    # Every iterator flavour takes the same options, so build them once.
    iterator_options = dict(
        column_family=self.get_column_family(column_family),
        fill_cache=fill_cache,
        prefix_same_as_start=prefix_same_as_start,
        iterate_lower_bound=iterate_lower_bound,
        iterate_upper_bound=iterate_upper_bound,
        auto_prefix_mode=auto_prefix_mode,
    )

    # `include_value` is checked first, so excluding both keys and values
    # falls back to the keys-only iterator.
    if not include_value:
        make_iterator = self.iterkeys
    elif not include_key:
        make_iterator = self.itervalues
    else:
        make_iterator = self.iteritems

    iterator = make_iterator(**iterator_options)
    iterator.seek(start)
    return reversed(iterator) if reverse else iterator

def get(self, key, *args, **kwargs):
cdef string res
cdef Status st
Expand Down Expand Up @@ -2387,7 +2465,6 @@ def list_column_families(db_name, Options opts):

return column_families


@cython.no_gc_clear
@cython.internal
cdef class Snapshot(object):
Expand Down
229 changes: 222 additions & 7 deletions tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import tempfile
from rocksdb.merge_operators import UintAddOperator, StringAppendOperator


def int_to_bytes(ob):
    """Return the ASCII-encoded `str()` form of `ob`, e.g. 12 -> b"12"."""
    return bytes(str(ob), 'ascii')


class TestHelper(unittest.TestCase):

def setUp(self):
Expand Down Expand Up @@ -69,6 +71,24 @@ def test_put_then_get_from_secondary(self):
secondary.try_catch_up_with_primary()
self.assertEqual(b"b", secondary.get(b"a"))

secondary2_location = os.path.join(self.db_loc, "secondary2")
secondary2 = rocksdb.DB(
os.path.join(self.db_loc, "test"),
rocksdb.Options(create_if_missing=True, max_open_files=-1),
secondary_name=secondary2_location
)
self.addCleanup(secondary2.close)

self.assertEqual(b"b", secondary2.get(b"a"))
self.db.put(b"a", b"c")
self.assertEqual(b"b", secondary.get(b"a"))
self.assertEqual(b"b", secondary2.get(b"a"))
self.assertEqual(b"c", self.db.get(b"a"))
secondary.try_catch_up_with_primary()
secondary2.try_catch_up_with_primary()
self.assertEqual(b"c", secondary.get(b"a"))
self.assertEqual(b"c", secondary2.get(b"a"))

def test_multi_get(self):
self.db.put(b"a", b"1")
self.db.put(b"b", b"2")
Expand Down Expand Up @@ -97,6 +117,18 @@ def test_write_batch(self):
ret = self.db.multi_get([b'key', b'a'])
self.assertEqual(ref, ret)

def test_write_batch_context(self):
    """Operations queued inside a `write_batch()` context are readable once the block exits."""
    with self.db.write_batch() as pending:
        pending.put(b"key", b"v1")
        pending.delete(b"key")
        pending.put(b"key", b"v2")
        pending.put(b"key", b"v3")
        pending.put(b"a", b"b")

    # The last put for b"key" wins over the earlier put/delete/put sequence.
    expected = {b'a': b'b', b'key': b'v3'}
    self.assertEqual(expected, self.db.multi_get([b'key', b'a']))

def test_write_batch_iter(self):
batch = rocksdb.WriteBatch()
self.assertEqual([], list(batch))
Expand All @@ -120,7 +152,6 @@ def test_write_batch_iter(self):
]
self.assertEqual(ref, list(it))


def test_key_may_exists(self):
self.db.put(b"a", b'1')

Expand Down Expand Up @@ -174,7 +205,6 @@ def test_seek_for_prev(self):
it.seek_for_prev(b'c3')
self.assertEqual(it.get(), (b'c2', b'c2_value'))


def test_iter_keys(self):
for x in range(300):
self.db.put(int_to_bytes(x), int_to_bytes(x))
Expand Down Expand Up @@ -457,6 +487,7 @@ def in_domain(self, src):
def in_range(self, dst):
    """Return True when `dst` is exactly five items long."""
    required_length = 5
    return len(dst) == required_length


class TestPrefixExtractor(TestHelper):
def setUp(self):
TestHelper.setUp(self)
Expand Down Expand Up @@ -687,15 +718,29 @@ def test_snapshot(self):
self.assertEqual({(cfa, b'a'): b'1', (cfa, b'b'): b'2'}, dict(it))

def test_get_property(self):
    """
    `get_property` on the primary and on a secondary instance of the same database.

    The secondary is expected to report an estimated key count of 0 until
    `try_catch_up_with_primary()` is called, and 300 afterwards.
    """
    secondary_location = os.path.join(self.db_loc, "secondary")
    cf = {
        b'A': rocksdb.ColumnFamilyOptions(),
        b'B': rocksdb.ColumnFamilyOptions()
    }
    secondary = rocksdb.get_db_with_options(
        os.path.join(self.db_loc, "test"), create_if_missing=True, max_open_files=-1,
        secondary_name=secondary_location, column_families=cf
    )
    self.addCleanup(secondary.close)

    # NOTE(review): self.cf_a is presumably the primary's handle for column family b'A' - confirm in setUp.
    for x in range(300):
        x = int_to_bytes(x)
        self.db.put((self.cf_a, x), x)

    # An unknown property name yields None rather than raising.
    self.assertIsNone(self.db.get_property(b'does not exsits', self.cf_a))

    # Before catching up, the secondary has not seen the primary's writes.
    self.assertEqual(b"0", secondary.get_property(b'rocksdb.estimate-num-keys', secondary.get_column_family(b'A')))
    self.assertEqual(b"300", self.db.get_property(b'rocksdb.estimate-num-keys', self.cf_a))

    secondary.try_catch_up_with_primary()

    self.assertEqual(b"300", secondary.get_property(b'rocksdb.estimate-num-keys', secondary.get_column_family(b'A')))
    self.assertEqual(b"300", self.db.get_property(b'rocksdb.estimate-num-keys', self.cf_a))

def test_compact_range(self):
for x in range(10000):
Expand All @@ -704,3 +749,173 @@ def test_compact_range(self):

self.db.compact_range(column_family=self.cf_b)


class OneCharacterPrefix(rocksdb.interfaces.SliceTransform):
    """Slice transform used by the iterator tests; `transform` always reports the span (0, 1)."""

    def name(self):
        """Return the identifier of this transform."""
        return b'test prefix'

    def transform(self, src):
        """Return the constant span (0, 1), independent of `src`."""
        # NOTE(review): presumably (offset, size) of the prefix within `src` - confirm
        # against the SliceTransform interface.
        prefix_span = (0, 1)
        return prefix_span

    def in_domain(self, src):
        """Return True when `src` is non-empty."""
        return len(src) > 0

    def in_range(self, dst):
        """Return True when `dst` has exactly one element."""
        return 1 == len(dst)


class TestPrefixIterator(TestHelper):
    """Exercise `DB.iterator` on a database opened without a prefix extractor."""

    def setUp(self):
        TestHelper.setUp(self)
        opts = rocksdb.Options(create_if_missing=True)
        self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)

    def test_iterator(self):
        """Check items, keys-only, values-only and reversed iteration under various bounds."""
        # Every key maps to b'<key>_value'; b'b0' must never be returned by the checks below.
        for key in (b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4', b'b0'):
            self.db.put(key, key + b'_value')

        def keys(**kwargs):
            # Keys-only iteration, materialised for comparison.
            return list(self.db.iterator(include_value=False, **kwargs))

        def reversed_keys(**kwargs):
            # Same as keys(), but walking the iterator via `reversed`.
            return list(reversed(self.db.iterator(include_value=False, **kwargs)))

        self.assertListEqual(
            [(b'a0', b'a0_value'), (b'a1', b'a1_value'), (b'a1b', b'a1b_value'), (b'a2b', b'a2b_value'),
             (b'a3', b'a3_value'), (b'a4', b'a4_value')],
            list(self.db.iterator(start=b'a', iterate_upper_bound=b'b', prefix_same_as_start=True))
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
            keys(start=b'a', iterate_upper_bound=b'b', prefix_same_as_start=True)
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
            keys(start=b'a0', iterate_upper_bound=b'a5')
        )
        self.assertListEqual(
            [b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'],
            reversed_keys(start=b'a0', iterate_upper_bound=b'a5')
        )
        # The upper bound is exclusive: b'a4' itself is not returned.
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3'],
            keys(start=b'a0', iterate_upper_bound=b'a4')
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b'],
            keys(start=b'a0', iterate_upper_bound=b'a2')
        )
        # Values-only iteration (include_key=False) over the same range.
        self.assertListEqual(
            [b'a0_value', b'a1_value', b'a1b_value'],
            list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_key=False))
        )
        self.assertListEqual(
            [b'a1b', b'a1', b'a0'],
            reversed_keys(start=b'a0', iterate_upper_bound=b'a2')
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
            keys(start=b'a', iterate_upper_bound=b'b0')
        )


class TestPrefixIteratorWithExtractor(TestHelper):
    """Exercise `DB.iterator` on a database opened with `OneCharacterPrefix` as prefix extractor."""

    def setUp(self):
        TestHelper.setUp(self)
        opts = rocksdb.Options(create_if_missing=True)
        opts.prefix_extractor = OneCharacterPrefix()
        self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)

    def test_iterator(self):
        """Check items, keys-only, values-only and reversed iteration on the default column family."""
        # Every key maps to b'<key>_value'; b'b0' must never be returned by the checks below.
        for key in (b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4', b'b0'):
            self.db.put(key, key + b'_value')

        def keys(**kwargs):
            # Keys-only iteration, materialised for comparison.
            return list(self.db.iterator(include_value=False, **kwargs))

        def reversed_keys(**kwargs):
            # Same as keys(), but walking the iterator via `reversed`.
            return list(reversed(self.db.iterator(include_value=False, **kwargs)))

        # With the extractor in place, prefix_same_as_start alone keeps b'b0' out (no upper bound given).
        self.assertListEqual(
            [(b'a0', b'a0_value'), (b'a1', b'a1_value'), (b'a1b', b'a1b_value'), (b'a2b', b'a2b_value'),
             (b'a3', b'a3_value'), (b'a4', b'a4_value')],
            list(self.db.iterator(start=b'a', prefix_same_as_start=True))
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
            keys(start=b'a', prefix_same_as_start=True)
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
            keys(start=b'a0', iterate_upper_bound=b'a5')
        )
        self.assertListEqual(
            [b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'],
            reversed_keys(start=b'a0', iterate_upper_bound=b'a5')
        )
        # The upper bound is exclusive: b'a4' itself is not returned.
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3'],
            keys(start=b'a0', iterate_upper_bound=b'a4')
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b'],
            keys(start=b'a0', iterate_upper_bound=b'a2')
        )
        # Values-only iteration (include_key=False) over the same range.
        self.assertListEqual(
            [b'a0_value', b'a1_value', b'a1b_value'],
            list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_key=False))
        )
        self.assertListEqual(
            [b'a1b', b'a1', b'a0'],
            reversed_keys(start=b'a0', iterate_upper_bound=b'a2')
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
            keys(start=b'a', iterate_upper_bound=b'b0')
        )

    def test_column_family_iterator(self):
        """Check `DB.iterator` when the column family is selected by name."""
        cf_a = self.db.create_column_family(b'first', rocksdb.ColumnFamilyOptions())
        cf_b = self.db.create_column_family(b'second', rocksdb.ColumnFamilyOptions())

        for key in (b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'):
            self.db.put((cf_a, key), key + b'_value')
        self.db.put((cf_b, b'b0'), b'b0_value')

        # Each yielded key is indexed with [-1] to obtain the raw key
        # (presumably the iterator yields (column family, key) tuples here - confirm).
        def keys(**kwargs):
            return [entry[-1] for entry in self.db.iterator(include_value=False, **kwargs)]

        def reversed_keys(**kwargs):
            return [entry[-1] for entry in reversed(self.db.iterator(include_value=False, **kwargs))]

        self.assertListEqual(
            [(b'a0', b'a0_value'), (b'a1', b'a1_value'), (b'a1b', b'a1b_value'), (b'a2b', b'a2b_value'),
             (b'a3', b'a3_value'), (b'a4', b'a4_value')],
            [(entry[0][-1], entry[1])
             for entry in self.db.iterator(column_family=b'first', start=b'a', prefix_same_as_start=True)]
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
            keys(column_family=b'first', start=b'a', prefix_same_as_start=True)
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
            keys(column_family=b'first', start=b'a0', iterate_upper_bound=b'a5')
        )
        self.assertListEqual(
            [b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'],
            reversed_keys(column_family=b'first', start=b'a0', iterate_upper_bound=b'a5')
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3'],
            keys(column_family=b'first', start=b'a0', iterate_upper_bound=b'a4')
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b'],
            keys(column_family=b'first', start=b'a0', iterate_upper_bound=b'a2')
        )
        self.assertListEqual(
            [b'a1b', b'a1', b'a0'],
            reversed_keys(column_family=b'first', start=b'a0', iterate_upper_bound=b'a2')
        )
        self.assertListEqual(
            [b'b0'],
            keys(column_family=b'second', start=b'b')
        )

0 comments on commit 11b18eb

Please sign in to comment.