forked from estraier/tkrzw
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tkrzw_dbm_async.h
742 lines (671 loc) · 28.3 KB
/
tkrzw_dbm_async.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
/*************************************************************************************************
* Asynchronous database manager adapter
*
* Copyright 2020 Google LLC
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
* https://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*************************************************************************************************/
#ifndef _TKRZW_DBM_ASYNC_H
#define _TKRZW_DBM_ASYNC_H
#include <functional>
#include <future>
#include <initializer_list>
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <typeinfo>
#include <utility>
#include <vector>
#include <cinttypes>
#include "tkrzw_dbm.h"
#include "tkrzw_thread_util.h"
namespace tkrzw {
/**
 * Asynchronous database manager adapter.
 * @details This class is a wrapper of DBM for asynchronous operations.  A task queue with a
 * thread pool is used inside.  Every method except for the constructor and the destructor is
 * run by a thread in the thread pool and the result is set in the future object of the return
 * value.  The caller can ignore the future object if it is not necessary.  The destructor of
 * this asynchronous database manager waits for all tasks to be done.  Therefore, the destructor
 * should be called before the database is closed.
 */
class AsyncDBM final {
 public:
  /**
   * Interface of a common post processor for a record.
   */
  class CommonPostprocessor {
   public:
    /**
     * Destructor.
     */
    virtual ~CommonPostprocessor() = default;

    /**
     * Processes the status of a database operation.
     * @param name The method name of the database operation.
     * @param status The status of the database operation.
     * @details The default implementation does nothing.
     */
    virtual void Postprocess(const char* name, const Status& status) {}
  };

  /**
   * Lambda function type to postprocess a database operation.
   * @details The first parameter is the method name of the database operation.  The second
   * parameter is the status of the database operation.
   */
  typedef std::function<void(const char*, const Status&)> CommonPostLambdaType;

  /**
   * Interface of asynchronous processor for a record.
   */
  class RecordProcessor : public DBM::RecordProcessor {
   public:
    /**
     * Destructor.
     */
    virtual ~RecordProcessor() = default;

    /**
     * Processes the status of the database operation.
     * @param status The status of the database operation.
     * @details This is called just after the database operation.  The default implementation
     * does nothing.
     */
    virtual void ProcessStatus(const Status& status) {}
  };

  /**
   * Constructor.
   * @param dbm A database object which has been opened.  The ownership is not taken.
   * @param num_worker_threads The number of threads in the internal thread pool.
   */
  AsyncDBM(DBM* dbm, int32_t num_worker_threads);

  /**
   * Destructor.
   */
  ~AsyncDBM();

  /**
   * Copy and assignment are disabled.
   */
  explicit AsyncDBM(const AsyncDBM& rhs) = delete;
  AsyncDBM& operator =(const AsyncDBM& rhs) = delete;

  /**
   * Set the common post processor.
   * @param proc the common post processor.  If it is nullptr, no postprocess is done.  The
   * ownership is taken.
   * @details By default or if nullptr is set, no postprocess is done.  If a processor is set,
   * its Postprocess method is called every time in parallel after every method is called.
   */
  void SetCommonPostprocessor(std::unique_ptr<CommonPostprocessor> proc);

  /**
   * Set the common post processor with a lambda function.
   * @param post_lambda the lambda function of the common post processor.  If it is empty, the
   * common post processor is cleared, the same as setting nullptr with the other overload.
   * @details By default, no postprocess is done.  If a processor is set, it is called every time
   * in parallel after every method is called.
   */
  void SetCommonPostprocessor(CommonPostLambdaType post_lambda) {
    class PostprocessorLambda : public CommonPostprocessor {
     public:
      explicit PostprocessorLambda(CommonPostLambdaType post_lambda)
          : post_lambda_(std::move(post_lambda)) {}
      void Postprocess(const char* name, const Status& status) override {
        post_lambda_(name, status);
      }
     private:
      CommonPostLambdaType post_lambda_;
    };
    // Invoking an empty std::function throws std::bad_function_call, so an empty function is
    // treated as a request to have no postprocessor at all.
    if (!post_lambda) {
      SetCommonPostprocessor(std::unique_ptr<CommonPostprocessor>());
      return;
    }
    SetCommonPostprocessor(std::make_unique<PostprocessorLambda>(std::move(post_lambda)));
  }

  /**
   * Processes a record with a processor.
   * @param key The key of the record.
   * @param proc The processor object derived from RecordProcessor.  The ownership is taken.
   * @param writable True if the processor can edit the record.
   * @return The result status and the same processor object as the parameter.
   * @details If the specified record exists, the ProcessFull of the processor is called.
   * Otherwise, the ProcessEmpty of the processor is called.
   */
  template <typename PROC>
  std::future<std::pair<Status, std::unique_ptr<PROC>>> Process(
      std::string_view key, std::unique_ptr<PROC> proc, bool writable);

  /**
   * Processes a record with a lambda function.
   * @param key The key of the record.
   * @param rec_lambda The lambda function to process a record.  The first parameter is the key
   * of the record.  The second parameter is the value of the existing record, or NOOP if the
   * record doesn't exist.  The return value is a string reference to NOOP, REMOVE, or the new
   * record value.
   * @param writable True if the processor can edit the record.
   * @return The result status.
   */
  std::future<Status> Process(std::string_view key, DBM::RecordLambdaType rec_lambda,
                              bool writable);

  /**
   * Gets the value of a record of a key.
   * @param key The key of the record.
   * @return The result status and the result value.  If there's no matching record,
   * NOT_FOUND_ERROR is returned.
   */
  std::future<std::pair<Status, std::string>> Get(std::string_view key);

  /**
   * Gets the values of multiple records of keys, with a string view vector.
   * @param keys The keys of records to retrieve.
   * @return The result status and a map of retrieved records.  Keys which don't match existing
   * records are ignored.  If all records of the given keys are found, SUCCESS is returned.
   * If one or more records are missing, NOT_FOUND_ERROR is returned.  Thus, even with an error
   * code, the result map can have elements.
   */
  std::future<std::pair<Status, std::map<std::string, std::string>>> GetMulti(
      const std::vector<std::string_view>& keys);

  /**
   * Gets the values of multiple records of keys, with a string vector.
   * @param keys The keys of records to retrieve.
   * @return The result status and a map of retrieved records.  Keys which don't match existing
   * records are ignored.  If all records of the given keys are found, SUCCESS is returned.
   * If one or more records are missing, NOT_FOUND_ERROR is returned.  Thus, even with an error
   * code, the result map can have elements.
   */
  std::future<std::pair<Status, std::map<std::string, std::string>>> GetMulti(
      const std::vector<std::string>& keys) {
    return GetMulti(MakeStrViewVectorFromValues(keys));
  }

  /**
   * Sets a record of a key and a value.
   * @param key The key of the record.
   * @param value The value of the record.
   * @param overwrite Whether to overwrite the existing value if there's a record with the same
   * key.  If true, the existing value is overwritten by the new value.  If false, the operation
   * is given up and an error status is returned.
   * @return The result status.  If overwriting is abandoned, DUPLICATION_ERROR is returned.
   */
  std::future<Status> Set(std::string_view key, std::string_view value, bool overwrite = true);

  /**
   * Sets multiple records, with a map of string views.
   * @param records The records to store.
   * @param overwrite Whether to overwrite the existing value if there's a record with the same
   * key.  If true, the existing value is overwritten by the new value.  If false, the operation
   * is given up and an error status is returned.
   * @return The result status.
   */
  std::future<Status> SetMulti(
      const std::map<std::string_view, std::string_view>& records, bool overwrite = true);

  /**
   * Sets multiple records, with a map of strings.
   * @param records The records to store.
   * @param overwrite Whether to overwrite the existing value if there's a record with the same
   * key.  If true, the existing value is overwritten by the new value.  If false, the operation
   * is given up and an error status is returned.
   * @return The result status.
   */
  std::future<Status> SetMulti(
      const std::map<std::string, std::string>& records, bool overwrite = true) {
    // The overwrite flag must be forwarded explicitly; otherwise the string-view overload
    // falls back to its own default and a request not to overwrite is silently ignored.
    return SetMulti(MakeStrViewMapFromRecords(records), overwrite);
  }

  /**
   * Removes a record of a key.
   * @param key The key of the record.
   * @return The result status.  If there's no matching record, NOT_FOUND_ERROR is returned.
   */
  std::future<Status> Remove(std::string_view key);

  /**
   * Removes records of keys, with a string view vector.
   * @param keys The keys of records to remove.
   * @return The result status.  If there are missing records, NOT_FOUND_ERROR is returned.
   */
  std::future<Status> RemoveMulti(const std::vector<std::string_view>& keys);

  /**
   * Removes records of keys, with a string vector.
   * @param keys The keys of records to remove.
   * @return The result status.  If there are missing records, NOT_FOUND_ERROR is returned.
   */
  std::future<Status> RemoveMulti(const std::vector<std::string>& keys) {
    return RemoveMulti(MakeStrViewVectorFromValues(keys));
  }

  /**
   * Appends data at the end of a record of a key.
   * @param key The key of the record.
   * @param value The value to append.
   * @param delim The delimiter to put after the existing record.
   * @return The result status.
   * @details If there's no existing record, the value is set without the delimiter.
   */
  std::future<Status> Append(
      std::string_view key, std::string_view value, std::string_view delim = "");

  /**
   * Appends data to multiple records, with a map of string views.
   * @param records The records to append.
   * @param delim The delimiter to put after the existing record.
   * @return The result status.
   * @details If there's no existing record, the value is set without the delimiter.
   */
  std::future<Status> AppendMulti(
      const std::map<std::string_view, std::string_view>& records, std::string_view delim = "");

  /**
   * Appends data to multiple records, with a map of strings.
   * @param records The records to append.
   * @param delim The delimiter to put after the existing record.
   * @return The result status.
   * @details If there's no existing record, the value is set without the delimiter.
   */
  std::future<Status> AppendMulti(
      const std::map<std::string, std::string>& records, std::string_view delim = "") {
    return AppendMulti(MakeStrViewMapFromRecords(records), delim);
  }

  /**
   * Compares the value of a record and exchanges if the condition meets.
   * @param key The key of the record.
   * @param expected The expected value.  If the data is nullptr, no existing record is expected.
   * @param desired The desired value.  If the data is nullptr, the record is to be removed.
   * @return The result status.  If the condition doesn't meet, INFEASIBLE_ERROR is returned.
   */
  std::future<Status> CompareExchange(std::string_view key, std::string_view expected,
                                      std::string_view desired);

  /**
   * Compares the values of records and exchanges if the condition meets.
   * @param expected The record keys and their expected values.  If the value is nullptr, no
   * existing record is expected.
   * @param desired The record keys and their desired values.  If the value is nullptr, the
   * record is to be removed.
   * @return The result status.  If the condition doesn't meet, INFEASIBLE_ERROR is returned.
   */
  std::future<Status> CompareExchangeMulti(
      const std::vector<std::pair<std::string_view, std::string_view>>& expected,
      const std::vector<std::pair<std::string_view, std::string_view>>& desired);

  /**
   * Increments the numeric value of a record.
   * @param key The key of the record.
   * @param increment The incremental value.  If it is INT64MIN, the current value is not changed
   * and a new record is not created.
   * @param initial The initial value.
   * @return The result status and the current value.
   * @details The record value is stored as an 8-byte big-endian integer.  Negative is also
   * supported.
   */
  std::future<std::pair<Status, int64_t>> Increment(
      std::string_view key, int64_t increment = 1, int64_t initial = 0);

  /**
   * Processes multiple records with processors.
   * @param key_proc_pairs Pairs of the keys and their processor objects derived from
   * RecordProcessor.  The ownership is shared with the task.
   * @param writable True if the processors can edit the records.
   * @return The result status and a vector of the same object as the parameter.
   * @details If the specified record exists, the ProcessFull of the processor is called.
   * Otherwise, the ProcessEmpty of the processor is called.
   */
  template <typename PROC>
  std::future<std::pair<Status, std::vector<std::shared_ptr<PROC>>>> ProcessMulti(
      const std::vector<std::pair<std::string_view, std::shared_ptr<PROC>>>& key_proc_pairs,
      bool writable);

  /**
   * Processes multiple records with lambda functions.
   * @param key_lambda_pairs Pairs of the keys and their lambda functions.  The first parameter of
   * the lambda functions is the key of the record.  The second parameter is the value of the
   * existing record, or NOOP if the record doesn't exist.  The return value is a string
   * reference to NOOP, REMOVE, or the new record value.
   * @param writable True if the processors can edit the records.
   * @return The result status.
   */
  std::future<Status> ProcessMulti(
      const std::vector<std::pair<std::string_view, DBM::RecordLambdaType>>& key_lambda_pairs,
      bool writable);

  /**
   * Processes each and every record in the database with a processor.
   * @param proc The processor object derived from RecordProcessor.  The ownership is taken.
   * @param writable True if the processor can edit the record.
   * @return The result status and the same processor object as the parameter.
   * @details The ProcessFull of the processor is called repeatedly for each record.  The
   * ProcessEmpty of the processor is called once before the iteration and once after the
   * iteration.
   */
  template <typename PROC>
  std::future<std::pair<Status, std::unique_ptr<PROC>>> ProcessEach(
      std::unique_ptr<PROC> proc, bool writable);

  /**
   * Processes each and every record in the database with a lambda function.
   * @param rec_lambda The lambda function to process a record.  The first parameter is the key
   * of the record.  The second parameter is the value of the existing record, or NOOP if the
   * record doesn't exist.  The return value is a string reference to NOOP, REMOVE, or the new
   * record value.
   * @param writable True if the processor can edit the record.
   * @return The result status.
   * @details The lambda function is called repeatedly for each record.  It is also called once
   * before the iteration and once after the iteration with both the key and the value being NOOP.
   */
  std::future<Status> ProcessEach(DBM::RecordLambdaType rec_lambda, bool writable);

  /**
   * Removes all records.
   * @return The result status.
   */
  std::future<Status> Clear();

  /**
   * Rebuilds the entire database.
   * @param params Optional parameters.
   * @return The result status.
   */
  std::future<Status> Rebuild(const std::map<std::string, std::string>& params = {});

  /**
   * Synchronizes the content of the database to the file system.
   * @param hard True to do physical synchronization with the hardware or false to do only
   * logical synchronization with the file system.
   * @param proc The file processor object, whose Process method is called while the content of
   * the file is synchronized.  The ownership is taken.  If it is nullptr, it is not used.
   * @param params Optional parameters.
   * @return The result status.
   */
  std::future<Status> Synchronize(bool hard, std::unique_ptr<DBM::FileProcessor> proc = nullptr,
                                  const std::map<std::string, std::string>& params = {});

  /**
   * Copies the content of the database file to another file.
   * @param dest_path A path to the destination file.
   * @return The result status.
   * @details Copying is done while the content is synchronized and stable.  So, this method is
   * suitable for making a backup file while running a database service.
   */
  std::future<Status> CopyFileData(const std::string& dest_path);

  /**
   * Exports all records to another database.
   * @param dbm The pointer to the destination database.  The lifetime of the database object
   * must last until the task finishes.
   * @return The result status.
   */
  std::future<Status> Export(DBM* dbm);

  /**
   * Exports all records of a database to a flat record file.
   * @param dest_file The file object to write records in.  The lifetime of the file object
   * must last until the task finishes.
   * @return The result status.
   * @details A flat record file contains a sequence of binary records without any high level
   * structure so it is useful as an intermediate file for data migration.
   */
  std::future<Status> ExportToFlatRecords(File* dest_file);

  /**
   * Imports records to a database from a flat record file.
   * @param src_file The file object to read records from.  The lifetime of the file object
   * must last until the task finishes.
   * @return The result status.
   */
  std::future<Status> ImportFromFlatRecords(File* src_file);

  /**
   * Searches the database and get keys which match a pattern, according to a mode expression.
   * @param mode The search mode.  "contain" extracts keys containing the pattern.  "begin"
   * extracts keys beginning with the pattern.  "end" extracts keys ending with the pattern.
   * "regex" extracts keys partially matches the pattern of a regular expression.  "edit"
   * extracts keys whose edit distance to the UTF-8 pattern is the least.  "editbin" extracts
   * keys whose edit distance to the binary pattern is the least.  Ordered databases support
   * "upper" and "lower" which extract keys whose positions are upper/lower than the pattern.
   * "upperinc" and "lowerinc" are their inclusive versions.
   * @param pattern The pattern for matching.
   * @param capacity The maximum records to obtain.  0 means unlimited.
   * @return The result status and the result keys.
   */
  std::future<std::pair<Status, std::vector<std::string>>> SearchModal(
      std::string_view mode, std::string_view pattern, size_t capacity = 0);

  /**
   * Gets the internal task queue.
   * @return The pointer to the internal task queue.  The ownership is not moved.
   */
  TaskQueue* GetTaskQueue() {
    return &queue_;
  }

 private:
  /** The database object. */
  DBM* dbm_;
  /** The task queue. */
  TaskQueue queue_;
  /** The postprocessor. */
  std::unique_ptr<CommonPostprocessor> postproc_;
};
/**
* Wrapper of std::future containing a status object and extra data.
* @details An instance is built from one of several std::future specializations: a bare
* Status, or a pair of a Status and a string, a string vector, a string map, or an integer.
* The Get method family returns the result in the matching shape, and GetExtraType reports
* type information of the extra data.  The object is movable but not copyable.
*/
class StatusFuture final {
public:
/**
* Constructor for a status object.
* @param future a future object. The ownership is taken.
*/
explicit StatusFuture(std::future<Status>&& future);
/**
* Constructor for a status object and a string.
* @param future a future object. The ownership is taken.
*/
explicit StatusFuture(std::future<std::pair<Status, std::string>>&& future);
/**
* Constructor for a status object and a string vector.
* @param future a future object. The ownership is taken.
*/
explicit StatusFuture(std::future<std::pair<Status, std::vector<std::string>>>&& future);
/**
* Constructor for a status object and a string map.
* @param future a future object. The ownership is taken.
*/
explicit StatusFuture(std::future<std::pair<
Status, std::map<std::string, std::string>>>&& future);
/**
* Constructor for a status object and an integer.
* @param future a future object. The ownership is taken.
*/
explicit StatusFuture(std::future<std::pair<Status, int64_t>>&& future);
/**
* Move constructor.
* @param rhs The right-hand-side object.
*/
StatusFuture(StatusFuture&& rhs);
/**
* Destructor.
*/
~StatusFuture();
/**
* Copy and assignment are disabled.
*/
explicit StatusFuture(const StatusFuture& rhs) = delete;
StatusFuture& operator =(const StatusFuture& rhs) = delete;
/**
* Waits for the operation to be done.
* @param timeout The waiting time in seconds. If it is negative, no timeout is set.
* @return True if the operation has been done. False if a timeout occurs.
*/
bool Wait(double timeout = -1);
/**
* Waits for the operation to be done and gets the result status.
* @return The result status.
* @details Only one method of the Get family can be called, and only once.
*/
Status Get();
/**
* Waits for the operation to be done and gets the status and the extra string.
* @return The result status and the extra string.
* @details Only one method of the Get family can be called, and only once.
*/
std::pair<Status, std::string> GetString();
/**
* Waits for the operation to be done and gets the status and the extra string vector.
* @return The result status and the extra string vector.
* @details Only one method of the Get family can be called, and only once.
*/
std::pair<Status, std::vector<std::string>> GetStringVector();
/**
* Waits for the operation to be done and gets the status and the extra string map.
* @return The result status and the extra string map.
* @details Only one method of the Get family can be called, and only once.
*/
std::pair<Status, std::map<std::string, std::string>> GetStringMap();
/**
* Waits for the operation to be done and gets the status and the extra integer.
* @return The result status and the extra integer.
* @details Only one method of the Get family can be called, and only once.
*/
std::pair<Status, int64_t> GetInteger();
/**
* Gets the type information of the extra data.
* @return The type information of the extra data.
*/
const std::type_info& GetExtraType();
private:
/**
* Untyped pointer to the wrapped future.
* NOTE(review): presumably points to a heap-allocated std::future of the specialization given
* to the constructor and is released by the destructor -- confirm in the implementation file.
*/
void* future_;
/**
* Type information fixed at construction.
* NOTE(review): presumably identifies the extra data carried by future_ and is what
* GetExtraType returns -- confirm in the implementation file.
*/
const std::type_info& type_;
};
template <typename PROC>
inline std::future<std::pair<Status, std::unique_ptr<PROC>>> AsyncDBM::Process(
std::string_view key, std::unique_ptr<PROC> proc, bool writable) {
struct ProcessTask : public TaskQueue::Task {
DBM* dbm;
std::string key;
std::unique_ptr<PROC> proc;
bool writable;
std::promise<std::pair<Status, std::unique_ptr<PROC>>> promise;
void Do() override {
Status status = dbm->Process(key, proc.get(), writable);
proc->ProcessStatus(status);
promise.set_value(std::make_pair(std::move(status), std::move(proc)));
}
};
auto task = std::make_unique<ProcessTask>();
task->dbm = dbm_;
task->key = key;
task->proc = std::move(proc);
task->writable = writable;
auto future = task->promise.get_future();
queue_.Add(std::move(task));
return future;
}
/**
 * Queues a task which processes a single record with a lambda function.
 * The key is copied and the lambda is moved into the task, so nothing owned by the caller is
 * referenced after this call returns.  When the task runs, it calls DBM::Process and sets the
 * resulting status in the promise.
 * NOTE(review): this task does not invoke the common postprocessor although
 * SetCommonPostprocessor documents it as being called after every method -- confirm intent.
 */
inline std::future<Status> AsyncDBM::Process(
    std::string_view key, DBM::RecordLambdaType rec_lambda, bool writable) {
  struct ProcessTask : public TaskQueue::Task {
    DBM* dbm;
    std::string key;
    DBM::RecordLambdaType rec_lambda;
    bool writable;
    std::promise<Status> promise;
    void Do() override {
      Status status = dbm->Process(key, rec_lambda, writable);
      promise.set_value(std::move(status));
    }
  };
  auto task = std::make_unique<ProcessTask>();
  task->dbm = dbm_;
  task->key = key;
  // rec_lambda is a by-value sink parameter; moving avoids a second std::function copy.
  task->rec_lambda = std::move(rec_lambda);
  task->writable = writable;
  // The future must be obtained before the task is handed over to the queue.
  auto future = task->promise.get_future();
  queue_.Add(std::move(task));
  return future;
}
template <typename PROC>
inline std::future<std::pair<Status, std::unique_ptr<PROC>>> AsyncDBM::ProcessEach(
std::unique_ptr<PROC> proc, bool writable) {
struct ProcessEachTask : public TaskQueue::Task {
DBM* dbm;
std::unique_ptr<PROC> proc;
bool writable;
std::promise<std::pair<Status, std::unique_ptr<PROC>>> promise;
void Do() override {
Status status = dbm->ProcessEach(proc.get(), writable);
proc->ProcessStatus(status);
promise.set_value(std::make_pair(std::move(status), std::move(proc)));
}
};
auto task = std::make_unique<ProcessEachTask>();
task->dbm = dbm_;
task->proc = std::move(proc);
task->writable = writable;
auto future = task->promise.get_future();
queue_.Add(std::move(task));
return future;
}
/**
 * Queues a task which processes every record in the database with a lambda function.
 * The lambda is moved into the task.  When the task runs, it calls DBM::ProcessEach and sets
 * the resulting status in the promise.
 * NOTE(review): this task does not invoke the common postprocessor although
 * SetCommonPostprocessor documents it as being called after every method -- confirm intent.
 */
inline std::future<Status> AsyncDBM::ProcessEach(
    DBM::RecordLambdaType rec_lambda, bool writable) {
  struct ProcessEachTask : public TaskQueue::Task {
    DBM* dbm;
    DBM::RecordLambdaType rec_lambda;
    bool writable;
    std::promise<Status> promise;
    void Do() override {
      Status status = dbm->ProcessEach(rec_lambda, writable);
      promise.set_value(std::move(status));
    }
  };
  auto task = std::make_unique<ProcessEachTask>();
  task->dbm = dbm_;
  // rec_lambda is a by-value sink parameter; moving avoids a second std::function copy.
  task->rec_lambda = std::move(rec_lambda);
  task->writable = writable;
  // The future must be obtained before the task is handed over to the queue.
  auto future = task->promise.get_future();
  queue_.Add(std::move(task));
  return future;
}
template <typename PROC>
inline std::future<std::pair<Status, std::vector<std::shared_ptr<PROC>>>> AsyncDBM::ProcessMulti(
const std::vector<std::pair<std::string_view, std::shared_ptr<PROC>>>& key_proc_pairs,
bool writable) {
struct ProcessMultiTask : public TaskQueue::Task {
DBM* dbm;
std::vector<std::pair<std::string, std::shared_ptr<PROC>>> key_proc_pairs;
bool writable;
std::promise<std::pair<Status, std::vector<std::shared_ptr<PROC>>>> promise;
void Do() override {
std::vector<std::pair<std::string_view, DBM::RecordProcessor*>> tmp_pairs;
tmp_pairs.reserve(key_proc_pairs.size());
std::vector<std::shared_ptr<PROC>> procs;
procs.reserve(key_proc_pairs.size());
for (auto& key_proc : key_proc_pairs) {
tmp_pairs.emplace_back(std::make_pair(
std::string_view(key_proc.first), key_proc.second.get()));
procs.emplace_back(key_proc.second);
}
Status status = dbm->ProcessMulti(tmp_pairs, writable);
for (auto& proc : procs) {
proc->ProcessStatus(status);
}
promise.set_value(std::make_pair(std::move(status), std::move(procs)));
}
};
auto task = std::make_unique<ProcessMultiTask>();
task->dbm = dbm_;
task->key_proc_pairs.reserve(key_proc_pairs.size());
for (auto& key_proc : key_proc_pairs) {
task->key_proc_pairs.emplace_back(std::make_pair(
std::string(key_proc.first), key_proc.second));
}
task->writable = writable;
auto future = task->promise.get_future();
queue_.Add(std::move(task));
return future;
}
/**
 * Queues a task which processes multiple records with lambda functions.
 * Each key is copied into a std::string and each lambda is copied into the task, so the
 * caller's vector and key buffers are not referenced after this call returns.  When the task
 * runs, it wraps every lambda in a DBM::RecordProcessorLambda, calls DBM::ProcessMulti once
 * for all pairs, and sets the resulting status in the promise.
 * NOTE(review): this task does not invoke the common postprocessor although
 * SetCommonPostprocessor documents it as being called after every method -- confirm intent.
 */
inline std::future<Status> AsyncDBM::ProcessMulti(
    const std::vector<std::pair<std::string_view, DBM::RecordLambdaType>>& key_lambda_pairs,
    bool writable) {
  struct LambdaBatchJob : public TaskQueue::Task {
    DBM* dbm;
    std::vector<std::pair<std::string, DBM::RecordLambdaType>> owned_pairs;
    bool writable;
    std::promise<Status> promise;
    void Do() override {
      const size_t count = owned_pairs.size();
      // Addresses of the adapters are handed out below, so the full capacity is reserved up
      // front to guarantee that the vector never reallocates while it is being filled.
      std::vector<DBM::RecordProcessorLambda> adapters;
      adapters.reserve(count);
      std::vector<std::pair<std::string_view, DBM::RecordProcessor*>> raw_pairs;
      raw_pairs.reserve(count);
      for (auto& [owned_key, owned_lambda] : owned_pairs) {
        DBM::RecordProcessorLambda& adapter = adapters.emplace_back(owned_lambda);
        raw_pairs.emplace_back(std::string_view(owned_key), &adapter);
      }
      promise.set_value(dbm->ProcessMulti(raw_pairs, writable));
    }
  };
  auto job = std::make_unique<LambdaBatchJob>();
  job->dbm = dbm_;
  job->writable = writable;
  job->owned_pairs.reserve(key_lambda_pairs.size());
  for (const auto& [view_key, lambda] : key_lambda_pairs) {
    job->owned_pairs.emplace_back(std::string(view_key), lambda);
  }
  // The future must be obtained before the job is handed over to the queue.
  std::future<Status> result = job->promise.get_future();
  queue_.Add(std::move(job));
  return result;
}
} // namespace tkrzw
#endif // _TKRZW_DBM_ASYNC_H
// END OF FILE