diff --git a/rocksdb/_rocksdb.pyx b/rocksdb/_rocksdb.pyx index 6a6b9ee3..091fc6d7 100644 --- a/rocksdb/_rocksdb.pyx +++ b/rocksdb/_rocksdb.pyx @@ -323,7 +323,6 @@ BloomFilterPolicy = PyBloomFilterPolicy ############################################# - ## Here comes the stuff for the merge operator @cython.internal cdef class PyMergeOperator(object): @@ -1950,6 +1949,85 @@ cdef class DB(object): st = self.db.Write(opts, batch.batch) check_status(st) + def iterator(self, start: bytes, column_family: bytes = None, iterate_lower_bound: bytes = None, + iterate_upper_bound: bytes = None, reverse: bool = False, include_key: bool = True, + include_value: bool = True, fill_cache: bool = True, prefix_same_as_start: bool = False, + auto_prefix_mode: bool = False): + """ + RocksDB Iterator + + Args: + column_family (bytes): the name of the column family + start (bytes): prefix to seek to + iterate_lower_bound (bytes): defines the smallest key at which the backward iterator can return an entry. + Once the bound is passed, Valid() will be false. `iterate_lower_bound` is + inclusive, i.e. the bound value is a valid entry. + If prefix_extractor is not null, the Seek target and `iterate_lower_bound` + need to have the same prefix. This is because ordering is not guaranteed + outside of the prefix domain. + iterate_upper_bound (bytes): defines the extent up to which the forward iterator + can return entries. Once the bound is reached, Valid() will be false. + "iterate_upper_bound" is exclusive, i.e. the bound value is + not a valid entry. If prefix_extractor is not null: + 1. If auto_prefix_mode = true, iterate_upper_bound will be used + to infer whether prefix iterating (e.g. applying prefix bloom filter) + can be used within RocksDB. This is done by comparing + iterate_upper_bound with the seek key. + 2. If auto_prefix_mode = false, iterate_upper_bound only takes + effect if it shares the same prefix as the seek key. 
If + iterate_upper_bound is outside the prefix of the seek key, then keys + returned outside the prefix range will be undefined, just as if + iterate_upper_bound = null. + If iterate_upper_bound is not null, SeekToLast() will position the iterator + at the first key smaller than iterate_upper_bound. + reverse (bool): run the iteration in reverse - using `reversed` is also supported + include_key (bool): the iterator should include the key in each iteration + include_value (bool): the iterator should include the value in each iteration + fill_cache (bool): Should the "data block"/"index block" read for this iteration be placed in + block cache? Callers may wish to set this field to false for bulk scans. + This helps avoid changing the eviction order of existing items in the + block cache. Default: true + prefix_same_as_start (bool): Enforce that the iterator only iterates over the same prefix as the seek. + This option is effective only for prefix seeks, i.e. prefix_extractor is + non-null for the column family and total_order_seek is false. Unlike + iterate_upper_bound, prefix_same_as_start only works within a prefix + but in both directions. Default: false + auto_prefix_mode (bool): When true, by default use total_order_seek = true, and RocksDB can + selectively enable prefix seek mode if it won't generate a different result + from total_order_seek, based on the seek key and the iterator upper bound. + Not supported in ROCKSDB_LITE mode: even when set to true, + prefix mode is not used. Default: false + + Returns: + BaseIterator: An iterator that yields key/value pairs or keys or values alone depending on the arguments. 
+ The iterator supports being `reversed` + """ + + cf = self.get_column_family(column_family) + + if not include_value: + iterator = self.iterkeys( + column_family=cf, fill_cache=fill_cache, prefix_same_as_start=prefix_same_as_start, + iterate_lower_bound=iterate_lower_bound, iterate_upper_bound=iterate_upper_bound, + auto_prefix_mode=auto_prefix_mode + ) + elif not include_key: + iterator = self.itervalues( + column_family=cf, fill_cache=fill_cache, prefix_same_as_start=prefix_same_as_start, + iterate_lower_bound=iterate_lower_bound, iterate_upper_bound=iterate_upper_bound, + auto_prefix_mode=auto_prefix_mode + ) + else: + iterator = self.iteritems( + column_family=cf, fill_cache=fill_cache, prefix_same_as_start=prefix_same_as_start, + iterate_lower_bound=iterate_lower_bound, iterate_upper_bound=iterate_upper_bound, + auto_prefix_mode=auto_prefix_mode + ) + iterator.seek(start) + if reverse: + iterator = reversed(iterator) + return iterator + def get(self, key, *args, **kwargs): cdef string res cdef Status st @@ -2387,7 +2465,6 @@ def list_column_families(db_name, Options opts): return column_families - @cython.no_gc_clear @cython.internal cdef class Snapshot(object): diff --git a/tests/test_db.py b/tests/test_db.py index c0adcd5f..a5de895e 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -9,9 +9,11 @@ import tempfile from rocksdb.merge_operators import UintAddOperator, StringAppendOperator + def int_to_bytes(ob): return str(ob).encode('ascii') + class TestHelper(unittest.TestCase): def setUp(self): @@ -69,6 +71,24 @@ def test_put_then_get_from_secondary(self): secondary.try_catch_up_with_primary() self.assertEqual(b"b", secondary.get(b"a")) + secondary2_location = os.path.join(self.db_loc, "secondary2") + secondary2 = rocksdb.DB( + os.path.join(self.db_loc, "test"), + rocksdb.Options(create_if_missing=True, max_open_files=-1), + secondary_name=secondary2_location + ) + self.addCleanup(secondary2.close) + + self.assertEqual(b"b", secondary2.get(b"a")) + 
self.db.put(b"a", b"c") + self.assertEqual(b"b", secondary.get(b"a")) + self.assertEqual(b"b", secondary2.get(b"a")) + self.assertEqual(b"c", self.db.get(b"a")) + secondary.try_catch_up_with_primary() + secondary2.try_catch_up_with_primary() + self.assertEqual(b"c", secondary.get(b"a")) + self.assertEqual(b"c", secondary2.get(b"a")) + def test_multi_get(self): self.db.put(b"a", b"1") self.db.put(b"b", b"2") @@ -97,6 +117,18 @@ def test_write_batch(self): ret = self.db.multi_get([b'key', b'a']) self.assertEqual(ref, ret) + def test_write_batch_context(self): + with self.db.write_batch() as batch: + batch.put(b"key", b"v1") + batch.delete(b"key") + batch.put(b"key", b"v2") + batch.put(b"key", b"v3") + batch.put(b"a", b"b") + + ref = {b'a': b'b', b'key': b'v3'} + ret = self.db.multi_get([b'key', b'a']) + self.assertEqual(ref, ret) + def test_write_batch_iter(self): batch = rocksdb.WriteBatch() self.assertEqual([], list(batch)) @@ -120,7 +152,6 @@ def test_write_batch_iter(self): ] self.assertEqual(ref, list(it)) - def test_key_may_exists(self): self.db.put(b"a", b'1') @@ -174,7 +205,6 @@ def test_seek_for_prev(self): it.seek_for_prev(b'c3') self.assertEqual(it.get(), (b'c2', b'c2_value')) - def test_iter_keys(self): for x in range(300): self.db.put(int_to_bytes(x), int_to_bytes(x)) @@ -457,6 +487,7 @@ def in_domain(self, src): def in_range(self, dst): return len(dst) == 5 + class TestPrefixExtractor(TestHelper): def setUp(self): TestHelper.setUp(self) @@ -687,15 +718,29 @@ def test_snapshot(self): self.assertEqual({(cfa, b'a'): b'1', (cfa, b'b'): b'2'}, dict(it)) def test_get_property(self): + secondary_location = os.path.join(self.db_loc, "secondary") + cf = { + b'A': rocksdb.ColumnFamilyOptions(), + b'B': rocksdb.ColumnFamilyOptions() + } + secondary = rocksdb.get_db_with_options( + os.path.join(self.db_loc, "test"), create_if_missing=True, max_open_files=-1, + secondary_name=secondary_location, column_families=cf + ) + self.addCleanup(secondary.close) + for x in 
range(300): x = int_to_bytes(x) self.db.put((self.cf_a, x), x) - self.assertEqual(b"300", - self.db.get_property(b'rocksdb.estimate-num-keys', - self.cf_a)) - self.assertIsNone(self.db.get_property(b'does not exsits', - self.cf_a)) + self.assertIsNone(self.db.get_property(b'does not exsits', self.cf_a)) + self.assertEqual(b"0", secondary.get_property(b'rocksdb.estimate-num-keys', secondary.get_column_family(b'A'))) + self.assertEqual(b"300", self.db.get_property(b'rocksdb.estimate-num-keys', self.cf_a)) + + secondary.try_catch_up_with_primary() + + self.assertEqual(b"300", secondary.get_property(b'rocksdb.estimate-num-keys', secondary.get_column_family(b'A'))) + self.assertEqual(b"300", self.db.get_property(b'rocksdb.estimate-num-keys', self.cf_a)) def test_compact_range(self): for x in range(10000): @@ -704,3 +749,173 @@ def test_compact_range(self): self.db.compact_range(column_family=self.cf_b) + +class OneCharacterPrefix(rocksdb.interfaces.SliceTransform): + def name(self): + return b'test prefix' + + def transform(self, src): + return (0, 1) + + def in_domain(self, src): + return len(src) >= 1 + + def in_range(self, dst): + return len(dst) == 1 + + +class TestPrefixIterator(TestHelper): + def setUp(self): + TestHelper.setUp(self) + opts = rocksdb.Options(create_if_missing=True) + self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) + + def test_iterator(self): + self.db.put(b'a0', b'a0_value') + self.db.put(b'a1', b'a1_value') + self.db.put(b'a1b', b'a1b_value') + self.db.put(b'a2b', b'a2b_value') + self.db.put(b'a3', b'a3_value') + self.db.put(b'a4', b'a4_value') + self.db.put(b'b0', b'b0_value') + self.assertListEqual( + [(b'a0', b'a0_value'), (b'a1', b'a1_value'), (b'a1b', b'a1b_value'), (b'a2b', b'a2b_value'), + (b'a3', b'a3_value'), (b'a4', b'a4_value')], + list(self.db.iterator(start=b'a', iterate_upper_bound=b'b', prefix_same_as_start=True)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + 
list(self.db.iterator(start=b'a', iterate_upper_bound=b'b', prefix_same_as_start=True, include_value=False)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a5', include_value=False)) + ) + self.assertListEqual( + [b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'], + list(reversed(self.db.iterator(start=b'a0', iterate_upper_bound=b'a5', include_value=False))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a4', include_value=False)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False)) + ) + self.assertListEqual( + [b'a1b', b'a1', b'a0'], + list(reversed(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(self.db.iterator(start=b'a', iterate_upper_bound=b'b0', include_value=False)) + ) + + +class TestPrefixIteratorWithExtractor(TestHelper): + def setUp(self): + TestHelper.setUp(self) + opts = rocksdb.Options(create_if_missing=True) + opts.prefix_extractor = OneCharacterPrefix() + self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) + + def test_iterator(self): + self.db.put(b'a0', b'a0_value') + self.db.put(b'a1', b'a1_value') + self.db.put(b'a1b', b'a1b_value') + self.db.put(b'a2b', b'a2b_value') + self.db.put(b'a3', b'a3_value') + self.db.put(b'a4', b'a4_value') + self.db.put(b'b0', b'b0_value') + self.assertListEqual( + [(b'a0', b'a0_value'), (b'a1', b'a1_value'), (b'a1b', b'a1b_value'), (b'a2b', b'a2b_value'), + (b'a3', b'a3_value'), (b'a4', b'a4_value')], + list(self.db.iterator(start=b'a', prefix_same_as_start=True)) + ) + self.assertListEqual( + [b'a0', b'a1', 
b'a1b', b'a2b', b'a3', b'a4'], + list(self.db.iterator(start=b'a', include_value=False, prefix_same_as_start=True)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a5', include_value=False)) + ) + self.assertListEqual( + [b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'], + list(reversed(self.db.iterator(start=b'a0', iterate_upper_bound=b'a5', include_value=False))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a4', include_value=False)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False)) + ) + self.assertListEqual( + [b'a1b', b'a1', b'a0'], + list(reversed(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(self.db.iterator(start=b'a', iterate_upper_bound=b'b0', include_value=False)) + ) + + def test_column_family_iterator(self): + cf_a = self.db.create_column_family(b'first', rocksdb.ColumnFamilyOptions()) + cf_b = self.db.create_column_family(b'second', rocksdb.ColumnFamilyOptions()) + + self.db.put((cf_a, b'a0'), b'a0_value') + self.db.put((cf_a, b'a1'), b'a1_value') + self.db.put((cf_a, b'a1b'), b'a1b_value') + self.db.put((cf_a, b'a2b'), b'a2b_value') + self.db.put((cf_a, b'a3'), b'a3_value') + self.db.put((cf_a, b'a4'), b'a4_value') + self.db.put((cf_b, b'b0'), b'b0_value') + + self.assertListEqual( + [(b'a0', b'a0_value'), (b'a1', b'a1_value'), (b'a1b', b'a1b_value'), (b'a2b', b'a2b_value'), + (b'a3', b'a3_value'), (b'a4', b'a4_value')], + list(map(lambda x: (x[0][-1], x[1]), self.db.iterator(column_family=b'first', start=b'a', prefix_same_as_start=True))) + ) + 
self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(map(lambda x: x[-1], self.db.iterator(column_family=b'first', start=b'a', include_value=False, prefix_same_as_start=True))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(map(lambda x: x[-1], self.db.iterator(column_family=b'first', start=b'a0', iterate_upper_bound=b'a5', include_value=False))) + ) + self.assertListEqual( + [b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'], + list(map(lambda x: x[-1], + reversed(self.db.iterator( + column_family=b'first', start=b'a0', iterate_upper_bound=b'a5', include_value=False + )))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3'], + list(map(lambda x: x[-1], self.db.iterator(column_family=b'first', start=b'a0', iterate_upper_bound=b'a4', include_value=False))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b'], + list(map(lambda x: x[-1], self.db.iterator(column_family=b'first', start=b'a0', iterate_upper_bound=b'a2', include_value=False))) + ) + self.assertListEqual( + [b'a1b', b'a1', b'a0'], + list(map(lambda x: x[-1], reversed( + self.db.iterator(column_family=b'first', start=b'a0', iterate_upper_bound=b'a2', include_value=False)))) + ) + self.assertListEqual( + [b'b0'], + list(map(lambda x: x[-1], self.db.iterator(column_family=b'second', start=b'b', include_value=False))) + )