forked from facebook/rocksdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdocument_db.cc
1207 lines (1094 loc) · 39.6 KB
/
document_db.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/document_db.h"
#include "rocksdb/cache.h"
#include "rocksdb/table.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/json_document.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "port/port.h"
namespace rocksdb {
// IMPORTANT NOTE: Secondary index column families should be very small and
// generally fit in memory. Assume that accessing secondary index column
// families is much faster than accessing primary index (data heap) column
// family. Accessing a key (i.e. checking for existence) from a column family in
// RocksDB is not much faster than accessing both key and value since they are
// kept together and loaded from storage together.
namespace {
// Three-way comparison of two JSON primitives of the same type.
//  < 0 <=> lhs < rhs
// == 0 <=> lhs == rhs
//  > 0 <=> lhs > rhs
// REQUIRES: neither argument is an object and both have the same type
// (checked by assert only).
// TODO(icanadi) move this to JSONDocument?
int DocumentCompare(const JSONDocument& lhs, const JSONDocument& rhs) {
  assert(lhs.IsObject() == false && rhs.IsObject() == false &&
         lhs.type() == rhs.type());

  switch (lhs.type()) {
    case JSONDocument::kNull:
      // all nulls are equal
      return 0;
    case JSONDocument::kBool:
      return static_cast<int>(lhs.GetBool()) - static_cast<int>(rhs.GetBool());
    case JSONDocument::kDouble: {
      // Compare directly instead of subtracting: inf - inf is NaN, which made
      // two equal infinities compare as "greater".
      const double left = lhs.GetDouble();
      const double right = rhs.GetDouble();
      return left == right ? 0 : (left < right ? -1 : 1);
    }
    case JSONDocument::kInt64: {
      // Compare directly instead of subtracting: the difference of two
      // int64_t values can overflow, which is undefined behavior and yields
      // the wrong sign in practice.
      const int64_t left = lhs.GetInt64();
      const int64_t right = rhs.GetInt64();
      return left == right ? 0 : (left < right ? -1 : 1);
    }
    case JSONDocument::kString:
      return Slice(lhs.GetString()).compare(Slice(rhs.GetString()));
    default:
      // arrays and any other non-primitive types are not comparable
      assert(false);
  }
  return 0;
}
} // namespace
// A parsed query filter. Holds a private copy of the filter document and, for
// every field whose name does not start with '$', the interval of values the
// field is allowed to take.
class Filter {
public:
// returns nullptr on parse failure
static Filter* ParseFilter(const JSONDocument& filter);
// Range of acceptable values for one field. A bound that IsNull() is treated
// as "no bound on that side" by the code that consumes intervals.
struct Interval {
JSONDocument upper_bound;
JSONDocument lower_bound;
// whether a value equal to the corresponding bound is accepted
bool upper_inclusive;
bool lower_inclusive;
// Unbounded interval: both bounds default-constructed, both exclusive.
Interval()
: upper_bound(),
lower_bound(),
upper_inclusive(false),
lower_inclusive(false) {}
// Note the argument order: upper bound first, then lower bound, then the
// two inclusive flags in the same order.
Interval(const JSONDocument& ub, const JSONDocument& lb, bool ui, bool li)
: upper_bound(ub),
lower_bound(lb),
upper_inclusive(ui),
lower_inclusive(li) {
}
// Replace the stored bound with the given one if it is more restrictive.
void UpdateUpperBound(const JSONDocument& ub, bool inclusive);
void UpdateLowerBound(const JSONDocument& lb, bool inclusive);
};
// Returns true iff `document` has every filtered field and each value lies
// inside that field's interval.
bool SatisfiesFilter(const JSONDocument& document) const;
// Returns the interval recorded for `field`, or nullptr if there is none.
const Interval* GetInterval(const std::string& field) const;
private:
// Construction goes through ParseFilter().
explicit Filter(const JSONDocument& filter) : filter_(filter.Copy()) {
assert(filter_.IsOwner());
}
// copied from the parameter
const JSONDocument filter_;
// constant after construction
std::unordered_map<std::string, Interval> intervals_;
};
// Tightens the upper bound. The new bound is adopted when no upper bound is
// set yet (current bound is null), when it is strictly smaller than the
// current one, or when it is equal to the current one and exclusive.
void Filter::Interval::UpdateUpperBound(const JSONDocument& ub,
                                        bool inclusive) {
  if (!upper_bound.IsNull()) {
    const int order = DocumentCompare(upper_bound, ub);
    const bool more_restrictive = order > 0 || (order == 0 && !inclusive);
    if (!more_restrictive) {
      // keep the bound we already have
      return;
    }
  }
  upper_bound = ub;
  upper_inclusive = inclusive;
}
// Tightens the lower bound. The new bound is adopted when no lower bound is
// set yet (current bound is null), when it is strictly larger than the
// current one, or when it is equal to the current one and exclusive.
void Filter::Interval::UpdateLowerBound(const JSONDocument& lb,
                                        bool inclusive) {
  if (!lower_bound.IsNull()) {
    const int order = DocumentCompare(lower_bound, lb);
    const bool more_restrictive = order < 0 || (order == 0 && !inclusive);
    if (!more_restrictive) {
      // keep the bound we already have
      return;
    }
  }
  lower_bound = lb;
  lower_inclusive = inclusive;
}
// Builds a Filter from a JSON filter document.
//
// Every top-level field whose name does not start with '$' becomes one
// interval:
//  * a non-object value means equality: [value, value], both ends inclusive
//  * an object value is a set of $gt / $gte / $lt / $lte conditions that are
//    folded into a single interval
//
// Returns a heap-allocated Filter, or nullptr when `filter` is not an object,
// a condition object is empty, a condition operand is an object or array, or
// an operator other than the four above is used.
Filter* Filter::ParseFilter(const JSONDocument& filter) {
  if (!filter.IsObject()) {
    return nullptr;
  }

  // unique_ptr so that every early "return nullptr" frees the partial result
  std::unique_ptr<Filter> parsed(new Filter(filter));

  for (const auto& field : parsed->filter_.Items()) {
    if (field.first.size() > 0 && field.first[0] == '$') {
      // fields starting with '$' are commands
      continue;
    }
    assert(parsed->intervals_.find(field.first) == parsed->intervals_.end());

    if (!field.second.IsObject()) {
      // equality
      parsed->intervals_.insert(
          {field.first, Interval(field.second, field.second, true, true)});
      continue;
    }

    if (field.second.Count() == 0) {
      // a condition object without conditions is rejected
      return nullptr;
    }

    Interval bounds;
    for (const auto& condition : field.second.Items()) {
      if (condition.second.IsObject() || condition.second.IsArray()) {
        // comparison operators are not defined on objects or arrays
        return nullptr;
      }
      // comparison operators:
      if (condition.first == "$gt") {
        bounds.UpdateLowerBound(condition.second, false);
      } else if (condition.first == "$gte") {
        bounds.UpdateLowerBound(condition.second, true);
      } else if (condition.first == "$lt") {
        bounds.UpdateUpperBound(condition.second, false);
      } else if (condition.first == "$lte") {
        bounds.UpdateUpperBound(condition.second, true);
      } else {
        // TODO(icanadi) more logical operators
        return nullptr;
      }
    }
    parsed->intervals_.insert({field.first, bounds});
  }

  return parsed.release();
}
// Looks up the interval recorded for `field`. Returns nullptr when the filter
// has no condition on that field.
const Filter::Interval* Filter::GetInterval(const std::string& field) const {
  const auto found = intervals_.find(field);
  // handing out a pointer into the map is fine because intervals_ is constant
  // after construction
  return found != intervals_.end() ? &found->second : nullptr;
}
bool Filter::SatisfiesFilter(const JSONDocument& document) const {
for (const auto& interval : intervals_) {
if (!document.Contains(interval.first)) {
// doesn't have the value, doesn't satisfy the filter
// (we don't support null queries yet)
return false;
}
auto value = document[interval.first];
if (!interval.second.upper_bound.IsNull()) {
if (value.type() != interval.second.upper_bound.type()) {
// no cross-type queries yet
// TODO(icanadi) do this at least for numbers!
return false;
}
int cmp = DocumentCompare(interval.second.upper_bound, value);
if (cmp < 0 || (cmp == 0 && interval.second.upper_inclusive == false)) {
// bigger (or equal) than upper bound
return false;
}
}
if (!interval.second.lower_bound.IsNull()) {
if (value.type() != interval.second.lower_bound.type()) {
// no cross-type queries yet
return false;
}
int cmp = DocumentCompare(interval.second.lower_bound, value);
if (cmp > 0 || (cmp == 0 && interval.second.lower_inclusive == false)) {
// smaller (or equal) than the lower bound
return false;
}
}
}
return true;
}
// Interface of a secondary index. An implementation produces index keys at
// write time and, at query time, positions an iterator over those keys and
// decides when a scan may stop.
class Index {
 public:
  Index() = default;
  virtual ~Index() = default;

  virtual const char* Name() const = 0;

  // Functions that are executed during write time
  // ---------------------------------------------
  // GetIndexKey() generates a key that will be used to index document and
  // returns the key through the second std::string* parameter
  virtual void GetIndexKey(const JSONDocument& document,
                           std::string* key) const = 0;
  // Keys generated with GetIndexKey() will be compared using this comparator.
  // It should be assumed that there will be a suffix added to the index key
  // according to IndexKey implementation
  virtual const Comparator* GetComparator() const = 0;

  // Functions that are executed during query time
  // ---------------------------------------------
  enum Direction {
    kForwards,
    kBackwards,
  };
  // Returns true if this index can provide some optimization for satisfying
  // filter. False otherwise
  virtual bool UsefulIndex(const Filter& filter) const = 0;
  // For every filter (assuming UsefulIndex()) there is a continuous interval of
  // keys in the index that satisfy the index conditions. That interval can be
  // three things:
  // * [A, B]
  // * [A, infinity>
  // * <-infinity, B]
  //
  // Query engine that uses this Index for optimization will access the interval
  // by first calling Position() and then iterating in the Direction (returned
  // by Position()) while ShouldContinueLooking() is true.
  // * For [A, B] interval Position() will Seek() to A and return kForwards.
  // ShouldContinueLooking() will be true until the iterator value gets beyond B
  // -- then it will return false
  // * For [A, infinity> Position() will Seek() to A and return kForwards.
  // ShouldContinueLooking() will always return true
  // * For <-infinity, B] Position() will Seek() to B and return kBackwards.
  // ShouldContinueLooking() will always return true (given that iterator is
  // advanced by calling Prev())
  virtual Direction Position(const Filter& filter,
                             Iterator* iterator) const = 0;
  virtual bool ShouldContinueLooking(const Filter& filter,
                                     const Slice& secondary_key,
                                     Direction direction) const = 0;

  // Static function that is executed when Index is created
  // ---------------------------------------------
  // Create Index from user-supplied description. Return nullptr on parse
  // failure.
  static Index* CreateIndexFromDescription(const JSONDocument& description,
                                           const std::string& name);

 private:
  // No copying allowed. Explicitly deleted (rather than declared and left
  // undefined) so misuse is diagnosed at compile time with a clear message.
  Index(const Index&) = delete;
  void operator=(const Index&) = delete;
};
// Encoding helper function
namespace {
// Maps a user-supplied index name to the internal name by prefixing it with
// "index_".
std::string InternalSecondaryIndexName(const std::string& user_name) {
  std::string internal_name("index_");
  internal_name += user_name;
  return internal_name;
}
// Don't change these, they are persisted in secondary indexes
// One-byte type tag that EncodeJSONPrimitive() writes in front of every
// encoded value. Because encoded keys are compared bytewise, the numeric order
// of these tags is also the relative order of values of different types.
enum JSONPrimitivesEncoding : char {
kNull = 0x1,
kBool = 0x2,
kDouble = 0x3,
kInt64 = 0x4,
kString = 0x5,
};
// Encodes a simple JSON member (null, bool, double, int64 or string) and
// appends the bytes to *dst: a one-byte JSONPrimitivesEncoding tag followed by
// a type-specific payload. The results are lexicographically compared to each
// other. Returns false, leaving *dst untouched, for any other JSON type.
//
// The byte layout is persisted in secondary indexes, so it must not change.
//
// NOTE(review): static_cast<uint64_t>(double) drops the fractional part and
// is undefined for negative or out-of-range values, so distinct doubles can
// share one encoding.
// NOTE(review): whether the fixed-width payloads sort bytewise in numeric
// order depends on the byte order PutFixed64() emits -- confirm.
bool EncodeJSONPrimitive(const JSONDocument& json, std::string* dst) {
  // TODO(icanadi) revise this at some point, have a custom comparator
  const auto type = json.type();

  if (type == JSONDocument::kNull) {
    // tag only, no payload
    dst->push_back(kNull);
    return true;
  }
  if (type == JSONDocument::kBool) {
    dst->push_back(kBool);
    dst->push_back(static_cast<char>(json.GetBool()));
    return true;
  }
  if (type == JSONDocument::kDouble) {
    dst->push_back(kDouble);
    PutFixed64(dst, static_cast<uint64_t>(json.GetDouble()));
    return true;
  }
  if (type == JSONDocument::kInt64) {
    dst->push_back(kInt64);
    auto val = json.GetInt64();
    // sign marker: '0' for negative values, '1' otherwise, so that negative
    // numbers sort before non-negative ones
    dst->push_back((val < 0) ? '0' : '1');
    PutFixed64(dst, static_cast<uint64_t>(val));
    return true;
  }
  if (type == JSONDocument::kString) {
    dst->push_back(kString);
    dst->append(json.GetString());
    return true;
  }

  // not a primitive
  return false;
}
} // namespace
// Composes and decomposes the keys stored in a secondary index.
// format of the secondary key is:
// <secondary_key><primary_key><offset_of_primary_key uint32_t>
//
// The object only holds Slices, so the memory behind the secondary and
// primary key must outlive it.
class IndexKey {
 public:
  // An empty key that reports !ok().
  IndexKey() : ok_(false) {}

  // Splits a stored index key into its parts. ok() is false afterwards if the
  // slice is too short to hold the trailing offset, or if the offset does not
  // point inside the part that precedes it.
  explicit IndexKey(const Slice& slice) : ok_(false) {
    const size_t offset_size = sizeof(uint32_t);
    if (slice.size() < offset_size) {
      return;
    }
    // bytes in front of the trailing offset: secondary key + primary key
    const size_t keys_size = slice.size() - offset_size;
    const uint32_t primary_key_offset =
        DecodeFixed32(slice.data() + keys_size);
    if (primary_key_offset >= keys_size) {
      return;
    }
    parts_[0] = Slice(slice.data(), primary_key_offset);
    parts_[1] = Slice(slice.data() + primary_key_offset,
                      keys_size - primary_key_offset);
    ok_ = true;
  }

  // Wraps an already separated (secondary key, primary key) pair.
  IndexKey(const Slice& secondary_key, const Slice& primary_key) : ok_(true) {
    parts_[0] = secondary_key;
    parts_[1] = primary_key;
  }

  // Returns the three pieces of the stored key. The third piece points into
  // this object, so the result must not outlive it.
  SliceParts GetSliceParts() {
    const uint32_t secondary_key_size = static_cast<uint32_t>(parts_[0].size());
    // the primary key starts right after the secondary key
    EncodeFixed32(primary_key_offset_buf_, secondary_key_size);
    parts_[2] = Slice(primary_key_offset_buf_, sizeof(uint32_t));
    return SliceParts(parts_, 3);
  }

  const Slice& GetPrimaryKey() const { return parts_[1]; }
  const Slice& GetSecondaryKey() const { return parts_[0]; }

  bool ok() const { return ok_; }

 private:
  bool ok_;
  // 0 -- secondary key
  // 1 -- primary key
  // 2 -- primary key offset
  Slice parts_[3];
  // backing storage for parts_[2]
  char primary_key_offset_buf_[sizeof(uint32_t)];
};
class SimpleSortedIndex : public Index {
public:
SimpleSortedIndex(const std::string& field, const std::string& name)
: field_(field), name_(name) {}
virtual const char* Name() const override { return name_.c_str(); }
virtual void GetIndexKey(const JSONDocument& document, std::string* key) const
override {
if (!document.Contains(field_)) {
if (!EncodeJSONPrimitive(JSONDocument(JSONDocument::kNull), key)) {
assert(false);
}
} else {
if (!EncodeJSONPrimitive(document[field_], key)) {
assert(false);
}
}
}
virtual const Comparator* GetComparator() const override {
return BytewiseComparator();
}
virtual bool UsefulIndex(const Filter& filter) const override {
return filter.GetInterval(field_) != nullptr;
}
// REQUIRES: UsefulIndex(filter) == true
virtual Direction Position(const Filter& filter,
Iterator* iterator) const override {
auto interval = filter.GetInterval(field_);
assert(interval != nullptr); // because index is useful
Direction direction;
const JSONDocument* limit;
if (!interval->lower_bound.IsNull()) {
limit = &(interval->lower_bound);
direction = kForwards;
} else {
limit = &(interval->upper_bound);
direction = kBackwards;
}
std::string encoded_limit;
if (!EncodeJSONPrimitive(*limit, &encoded_limit)) {
assert(false);
}
iterator->Seek(Slice(encoded_limit));
return direction;
}
// REQUIRES: UsefulIndex(filter) == true
#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable : 4702) // Unreachable code
#endif
virtual bool ShouldContinueLooking(
const Filter& filter, const Slice& secondary_key,
Index::Direction direction) const override {
auto interval = filter.GetInterval(field_);
assert(interval != nullptr); // because index is useful
if (direction == kForwards) {
if (interval->upper_bound.IsNull()) {
// continue looking, no upper bound
return true;
}
std::string encoded_upper_bound;
if (!EncodeJSONPrimitive(interval->upper_bound, &encoded_upper_bound)) {
// uhm...?
// TODO(icanadi) store encoded upper and lower bounds in Filter*?
assert(false);
}
// TODO(icanadi) we need to somehow decode this and use DocumentCompare()
int compare = secondary_key.compare(Slice(encoded_upper_bound));
// if (current key is bigger than upper bound) OR (current key is equal to
// upper bound, but inclusive is false) THEN stop looking. otherwise,
// continue
return (compare > 0 ||
(compare == 0 && interval->upper_inclusive == false))
? false
: true;
} else {
assert(direction == kBackwards);
if (interval->lower_bound.IsNull()) {
// continue looking, no lower bound
return true;
}
std::string encoded_lower_bound;
if (!EncodeJSONPrimitive(interval->lower_bound, &encoded_lower_bound)) {
// uhm...?
// TODO(icanadi) store encoded upper and lower bounds in Filter*?
assert(false);
}
// TODO(icanadi) we need to somehow decode this and use DocumentCompare()
int compare = secondary_key.compare(Slice(encoded_lower_bound));
// if (current key is smaller than lower bound) OR (current key is equal
// to lower bound, but inclusive is false) THEN stop looking. otherwise,
// continue
return (compare < 0 ||
(compare == 0 && interval->lower_inclusive == false))
? false
: true;
}
assert(false);
// this is here just so compiler doesn't complain
return false;
}
#if defined(_MSC_VER)
#pragma warning(pop)
#endif
private:
std::string field_;
std::string name_;
};
// Creates an index from a user-supplied description. The only supported
// description is an object with exactly one member whose value is the
// integer 1; it yields a SimpleSortedIndex on that member's name. Anything
// else returns nullptr.
Index* Index::CreateIndexFromDescription(const JSONDocument& description,
                                         const std::string& name) {
  const bool single_member =
      description.IsObject() && description.Count() == 1;
  if (!single_member) {
    // not supported yet
    return nullptr;
  }

  const auto& field = *description.Items().begin();
  const bool value_is_one =
      field.second.IsInt64() && field.second.GetInt64() == 1;
  if (!value_is_one) {
    // not supported yet
    return nullptr;
  }

  return new SimpleSortedIndex(field.first, name);
}
// Cursor that walks a secondary index in the direction chosen by the Index,
// loads the referenced document from the primary index and yields only
// documents that satisfy the filter.
//
// Takes ownership of both iterators and of the filter; the Index is borrowed.
class CursorWithFilterIndexed : public Cursor {
 public:
  CursorWithFilterIndexed(Iterator* primary_index_iter,
                          Iterator* secondary_index_iter, const Index* index,
                          const Filter* filter)
      : primary_index_iter_(primary_index_iter),
        secondary_index_iter_(secondary_index_iter),
        index_(index),
        filter_(filter),
        valid_(true),
        current_json_document_(nullptr) {
    assert(filter_.get() != nullptr);
    direction_ = index->Position(*filter_.get(), secondary_index_iter_.get());
    UpdateIndexKey();
    AdvanceUntilSatisfies();
  }

  virtual bool Valid() const override {
    return valid_ && secondary_index_iter_->Valid();
  }

  virtual void Next() override {
    assert(Valid());
    Advance();
    AdvanceUntilSatisfies();
  }

  // temporary object. copy it if you want to use it
  virtual const JSONDocument& document() const override {
    assert(Valid());
    return *current_json_document_;
  }

  // Reports the cursor's own error first, then the primary iterator's, then
  // the secondary iterator's.
  virtual Status status() const override {
    if (!status_.ok()) {
      return status_;
    }
    if (!primary_index_iter_->status().ok()) {
      return primary_index_iter_->status();
    }
    return secondary_index_iter_->status();
  }

 private:
  // Moves the secondary iterator one step in the scan direction and re-parses
  // the key it lands on.
  void Advance() {
    if (direction_ == Index::kForwards) {
      secondary_index_iter_->Next();
    } else {
      secondary_index_iter_->Prev();
    }
    UpdateIndexKey();
  }

  // Starting at the current position, advances until a document satisfies the
  // filter. Marks the cursor invalid when the scan ends without a match.
  void AdvanceUntilSatisfies() {
    bool found = false;
    while (secondary_index_iter_->Valid() &&
           index_->ShouldContinueLooking(
               *filter_.get(), index_key_.GetSecondaryKey(), direction_)) {
      if (!UpdateJSONDocument()) {
        // corruption happened
        return;
      }
      if (filter_->SatisfiesFilter(*current_json_document_)) {
        // we found satisfied!
        found = true;
        break;
      } else {
        // doesn't satisfy :(
        Advance();
      }
    }
    if (!found) {
      valid_ = false;
    }
  }

  // Loads the document the current index key points at. On failure records a
  // Corruption status, invalidates the cursor and returns false.
  bool UpdateJSONDocument() {
    assert(secondary_index_iter_->Valid());
    primary_index_iter_->Seek(index_key_.GetPrimaryKey());
    if (!primary_index_iter_->Valid()) {
      status_ = Status::Corruption(
          "Inconsistency between primary and secondary index");
      valid_ = false;
      return false;
    }
    current_json_document_.reset(
        JSONDocument::Deserialize(primary_index_iter_->value()));
    if (current_json_document_.get() == nullptr) {
      status_ = Status::Corruption("JSON deserialization failed");
      valid_ = false;
      return false;
    }
    // Only dereference after the null check above: a failed deserialization
    // must be reported as Corruption, not crash a debug build.
    assert(current_json_document_->IsOwner());
    return true;
  }

  // Re-parses index_key_ from the secondary iterator's current key, if any.
  // An unparsable key is recorded as Corruption and invalidates the cursor.
  void UpdateIndexKey() {
    if (secondary_index_iter_->Valid()) {
      index_key_ = IndexKey(secondary_index_iter_->key());
      if (!index_key_.ok()) {
        status_ = Status::Corruption("Invalid index key");
        valid_ = false;
      }
    }
  }

  std::unique_ptr<Iterator> primary_index_iter_;
  std::unique_ptr<Iterator> secondary_index_iter_;
  // we don't own index_
  const Index* index_;
  Index::Direction direction_;
  std::unique_ptr<const Filter> filter_;
  bool valid_;
  IndexKey index_key_;
  std::unique_ptr<JSONDocument> current_json_document_;
  Status status_;
};
// Cursor over every entry of an iterator, starting at the first one. Each
// value is deserialized into a JSONDocument; a value that fails to
// deserialize puts the cursor into a Corruption state, which also makes it
// invalid. Takes ownership of the iterator.
class CursorFromIterator : public Cursor {
 public:
  explicit CursorFromIterator(Iterator* iter)
      : iter_(iter), current_json_document_(nullptr) {
    iter_->SeekToFirst();
    UpdateCurrentJSON();
  }

  virtual bool Valid() const override {
    if (!status_.ok()) {
      return false;
    }
    return iter_->Valid();
  }

  virtual void Next() override {
    iter_->Next();
    UpdateCurrentJSON();
  }

  virtual const JSONDocument& document() const override {
    assert(Valid());
    return *current_json_document_;
  }

  // The cursor's own error takes precedence over the iterator's status.
  virtual Status status() const override {
    return status_.ok() ? iter_->status() : status_;
  }

  // not part of public Cursor interface
  Slice key() const { return iter_->key(); }

 private:
  // Deserializes the value at the current position, if the cursor is valid.
  void UpdateCurrentJSON() {
    if (!Valid()) {
      return;
    }
    current_json_document_.reset(JSONDocument::Deserialize(iter_->value()));
    if (!current_json_document_) {
      status_ = Status::Corruption("JSON deserialization failed");
    }
  }

  Status status_;
  std::unique_ptr<Iterator> iter_;
  std::unique_ptr<JSONDocument> current_json_document_;
};
// Wraps another cursor and skips every document that does not satisfy the
// filter. Takes ownership of both the base cursor and the filter.
class CursorWithFilter : public Cursor {
 public:
  CursorWithFilter(Cursor* base_cursor, const Filter* filter)
      : base_cursor_(base_cursor), filter_(filter) {
    assert(filter_.get() != nullptr);
    SeekToNextSatisfies();
  }

  virtual bool Valid() const override { return base_cursor_->Valid(); }

  virtual void Next() override {
    assert(Valid());
    base_cursor_->Next();
    SeekToNextSatisfies();
  }

  virtual const JSONDocument& document() const override {
    assert(Valid());
    return base_cursor_->document();
  }

  virtual Status status() const override { return base_cursor_->status(); }

 private:
  // Leaves the base cursor on the first document, at or after the current
  // position, that satisfies the filter -- or at its end.
  void SeekToNextSatisfies() {
    while (base_cursor_->Valid() &&
           !filter_->SatisfiesFilter(base_cursor_->document())) {
      base_cursor_->Next();
    }
  }

  std::unique_ptr<Cursor> base_cursor_;
  std::unique_ptr<const Filter> filter_;
};
// A cursor that is never valid and only carries an error status.
class CursorError : public Cursor {
 public:
  // REQUIRES: !s.ok()
  explicit CursorError(Status s) : s_(s) { assert(!s.ok()); }

  virtual Status status() const override { return s_; }

  virtual bool Valid() const override { return false; }

  // nothing to advance
  virtual void Next() override {}

  // Must not be called, since the cursor is never valid.
  virtual const JSONDocument& document() const override {
    assert(false);
    // a reference has to be returned to keep the compiler happy
    return trash_;
  }

 private:
  Status s_;
  // placeholder returned by document()
  JSONDocument trash_;
};
class DocumentDBImpl : public DocumentDB {
public:
// Wraps `db` and registers every (index, column family) pair under the
// index's name. The destructor deletes the registered indexes and column
// family handles as well as the primary key column family handle.
DocumentDBImpl(
    DB* db, ColumnFamilyHandle* primary_key_column_family,
    const std::vector<std::pair<Index*, ColumnFamilyHandle*>>& indexes,
    const Options& rocksdb_options)
    : DocumentDB(db),
      primary_key_column_family_(primary_key_column_family),
      rocksdb_options_(rocksdb_options) {
  for (const auto& entry : indexes) {
    Index* const index = entry.first;
    ColumnFamilyHandle* const column_family = entry.second;
    name_to_index_.insert(
        {index->Name(), IndexColumnFamily(index, column_family)});
  }
}
// Frees every registered index together with its column family handle, then
// the primary key column family handle.
~DocumentDBImpl() {
  for (auto& entry : name_to_index_) {
    auto& registered = entry.second;
    delete registered.index;
    delete registered.column_family;
  }
  delete primary_key_column_family_;
}
// Creates a secondary index from `index`, back-fills it with one entry per
// existing document and registers it under index.name.
//
// Returns InvalidArgument if the description cannot be parsed, or the failing
// status of column family creation, of the scan over the primary index, or of
// the back-fill write. The index is registered only after the back-fill has
// been written successfully; on any later failure the freshly created column
// family is dropped again so nothing is leaked.
virtual Status CreateIndex(const WriteOptions& write_options,
                           const IndexDescriptor& index) override {
  // owned here until it is handed over to name_to_index_
  std::unique_ptr<Index> index_obj(
      Index::CreateIndexFromDescription(*index.description, index.name));
  if (index_obj == nullptr) {
    return Status::InvalidArgument("Failed parsing index description");
  }

  ColumnFamilyHandle* cf_handle;
  Status s =
      CreateColumnFamily(ColumnFamilyOptions(rocksdb_options_),
                         InternalSecondaryIndexName(index.name), &cf_handle);
  if (!s.ok()) {
    return s;
  }

  MutexLock l(&write_mutex_);

  std::unique_ptr<CursorFromIterator> cursor(new CursorFromIterator(
      DocumentDB::NewIterator(ReadOptions(), primary_key_column_family_)));

  // one index entry (with an empty value) per existing document
  WriteBatch batch;
  for (; cursor->Valid(); cursor->Next()) {
    std::string secondary_index_key;
    index_obj->GetIndexKey(cursor->document(), &secondary_index_key);
    IndexKey index_key(Slice(secondary_index_key), cursor->key());
    batch.Put(cf_handle, index_key.GetSliceParts(), SliceParts());
  }

  s = cursor->status();
  if (s.ok()) {
    s = DocumentDB::Write(write_options, &batch);
  }
  if (!s.ok()) {
    // Best-effort rollback: report the original failure, not the outcome of
    // the cleanup.
    Status drop_status = DropColumnFamily(cf_handle);
    (void)drop_status;
    delete cf_handle;
    return s;
  }

  {
    MutexLock l_nti(&name_to_index_mutex_);
    name_to_index_.insert(
        {index.name, IndexColumnFamily(index_obj.release(), cf_handle)});
  }

  return Status::OK();
}
// Drops the index registered under `name` together with its column family.
// Returns InvalidArgument if no such index exists, or the failing status of
// DropColumnFamily(), in which case the index stays registered.
virtual Status DropIndex(const std::string& name) override {
  MutexLock l(&write_mutex_);

  auto index_iter = name_to_index_.find(name);
  if (index_iter == name_to_index_.end()) {
    return Status::InvalidArgument("No such index");
  }

  Status s = DropColumnFamily(index_iter->second.column_family);
  if (!s.ok()) {
    return s;
  }

  auto* index = index_iter->second.index;
  auto* column_family = index_iter->second.column_family;

  // Unregister before freeing, so that name_to_index_ never exposes pointers
  // to already deleted objects to code that holds name_to_index_mutex_.
  {
    MutexLock l_nti(&name_to_index_mutex_);
    name_to_index_.erase(index_iter);
  }

  delete index;
  delete column_family;

  return Status::OK();
}
// Inserts `document`, which must be an object whose primary key member is a
// string or an int64. The document and one entry per registered secondary
// index are written in a single batch.
//
// Returns InvalidArgument for a malformed document or when a document with
// the same primary key already exists; otherwise the status of the lookup or
// of the write.
virtual Status Insert(const WriteOptions& options,
                      const JSONDocument& document) override {
  if (!document.IsObject()) {
    return Status::InvalidArgument("Document not an object");
  }
  if (!document.Contains(kPrimaryKey)) {
    return Status::InvalidArgument("No primary key");
  }
  auto primary_key = document[kPrimaryKey];
  const bool usable_primary_key =
      !primary_key.IsNull() &&
      (primary_key.IsString() || primary_key.IsInt64());
  if (!usable_primary_key) {
    return Status::InvalidArgument(
        "Primary key format error");
  }

  std::string encoded_document;
  document.Serialize(&encoded_document);

  std::string primary_key_encoded;
  if (!EncodeJSONPrimitive(primary_key, &primary_key_encoded)) {
    // cannot fail: the primary key was verified to be a string or an int64
    assert(false);
  }
  Slice primary_key_slice(primary_key_encoded);

  // Lock now, since we're starting DB operations
  MutexLock l(&write_mutex_);

  // check if there is already a document with the same primary key
  PinnableSlice value;
  Status s = DocumentDB::Get(ReadOptions(), primary_key_column_family_,
                             primary_key_slice, &value);
  if (s.ok()) {
    return Status::InvalidArgument("Duplicate primary key!");
  }
  if (!s.IsNotFound()) {
    // the lookup itself failed
    return s;
  }

  WriteBatch batch;
  batch.Put(primary_key_column_family_, primary_key_slice, encoded_document);

  // one entry (with an empty value) in every secondary index
  for (const auto& entry : name_to_index_) {
    const auto& registered = entry.second;
    std::string secondary_index_key;
    registered.index->GetIndexKey(document, &secondary_index_key);
    IndexKey index_key(Slice(secondary_index_key), primary_key_slice);
    batch.Put(registered.column_family, index_key.GetSliceParts(),
              SliceParts());
  }

  return DocumentDB::Write(options, &batch);
}
// Deletes every document matched by `query`, along with its entries in all
// registered secondary indexes, in a single write batch.
//
// Returns Corruption if a matched document is not an object or lacks a
// string/int64 primary key, the cursor's status if the scan failed, and the
// status of the write otherwise.
virtual Status Remove(const ReadOptions& read_options,
                      const WriteOptions& write_options,
                      const JSONDocument& query) override {
  MutexLock l(&write_mutex_);
  std::unique_ptr<Cursor> cursor(
      ConstructFilterCursor(read_options, nullptr, query));

  WriteBatch batch;
  while (cursor->status().ok() && cursor->Valid()) {
    const auto& document = cursor->document();
    const bool has_key_member =
        document.IsObject() && document.Contains(kPrimaryKey);
    if (!has_key_member) {
      return Status::Corruption("Document corruption");
    }
    auto primary_key = document[kPrimaryKey];
    const bool usable_primary_key =
        !primary_key.IsNull() &&
        (primary_key.IsString() || primary_key.IsInt64());
    if (!usable_primary_key) {
      return Status::Corruption("Document corruption");
    }

    // TODO(icanadi) Instead of doing this, just get primary key encoding from
    // cursor, as it already has this information
    std::string primary_key_encoded;
    if (!EncodeJSONPrimitive(primary_key, &primary_key_encoded)) {
      // cannot fail: the primary key was verified to be a string or an int64
      assert(false);
    }
    Slice primary_key_slice(primary_key_encoded);
    batch.Delete(primary_key_column_family_, primary_key_slice);

    // remove the matching entry from every secondary index
    for (const auto& entry : name_to_index_) {
      const auto& registered = entry.second;
      std::string secondary_index_key;
      registered.index->GetIndexKey(document, &secondary_index_key);
      IndexKey index_key(Slice(secondary_index_key), primary_key_slice);
      batch.Delete(registered.column_family, index_key.GetSliceParts());
    }

    cursor->Next();
  }

  if (!cursor->status().ok()) {
    return cursor->status();
  }

  return DocumentDB::Write(write_options, &batch);
}
virtual Status Update(const ReadOptions& read_options,
const WriteOptions& write_options,
const JSONDocument& filter,
const JSONDocument& updates) override {
MutexLock l(&write_mutex_);
std::unique_ptr<Cursor> cursor(
ConstructFilterCursor(read_options, nullptr, filter));
if (!updates.IsObject()) {
return Status::Corruption("Bad update document format");
}
WriteBatch batch;
for (; cursor->status().ok() && cursor->Valid(); cursor->Next()) {
const auto& old_document = cursor->document();
JSONDocument new_document(old_document);
if (!new_document.IsObject()) {
return Status::Corruption("Document corruption");
}
// TODO(icanadi) Make this nicer, something like class Filter
for (const auto& update : updates.Items()) {
if (update.first == "$set") {
JSONDocumentBuilder builder;
bool res __attribute__((__unused__)) = builder.WriteStartObject();
assert(res);
for (const auto& itr : update.second.Items()) {
if (itr.first == kPrimaryKey) {
return Status::NotSupported("Please don't change primary key");
}
res = builder.WriteKeyValue(itr.first, itr.second);
assert(res);
}
res = builder.WriteEndObject();
assert(res);
JSONDocument update_document = builder.GetJSONDocument();
builder.Reset();
res = builder.WriteStartObject();
assert(res);
for (const auto& itr : new_document.Items()) {
if (update_document.Contains(itr.first)) {
res = builder.WriteKeyValue(itr.first,
update_document[itr.first]);
} else {
res = builder.WriteKeyValue(itr.first, new_document[itr.first]);
}
assert(res);
}
res = builder.WriteEndObject();
assert(res);
new_document = builder.GetJSONDocument();
assert(new_document.IsOwner());
} else {
// TODO(icanadi) more commands
return Status::InvalidArgument("Can't understand update command");
}
}
// TODO(icanadi) reuse some of this code
if (!new_document.Contains(kPrimaryKey)) {
return Status::Corruption("Corrupted document -- primary key missing");
}
auto primary_key = new_document[kPrimaryKey];
if (primary_key.IsNull() ||
(!primary_key.IsString() && !primary_key.IsInt64())) {
// This will happen when document on storage doesn't have primary key,
// since we don't support any update operations on primary key. That's
// why this is corruption error
return Status::Corruption("Corrupted document -- primary key missing");
}
std::string encoded_document;
new_document.Serialize(&encoded_document);
std::string primary_key_encoded;
if (!EncodeJSONPrimitive(primary_key, &primary_key_encoded)) {
// previous call should be guaranteed to pass because of all primary_key
// conditions checked before
assert(false);
}
Slice primary_key_slice(primary_key_encoded);
batch.Put(primary_key_column_family_, primary_key_slice,
encoded_document);
for (const auto& iter : name_to_index_) {
std::string old_key, new_key;
iter.second.index->GetIndexKey(old_document, &old_key);
iter.second.index->GetIndexKey(new_document, &new_key);
if (old_key == new_key) {
// don't need to update this secondary index
continue;
}
IndexKey old_index_key(Slice(old_key), primary_key_slice);
IndexKey new_index_key(Slice(new_key), primary_key_slice);
batch.Delete(iter.second.column_family, old_index_key.GetSliceParts());
batch.Put(iter.second.column_family, new_index_key.GetSliceParts(),
SliceParts());
}
}