forked from OpenAtomFoundation/pika
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pika_binlog.h
125 lines (89 loc) · 3.18 KB
/
pika_binlog.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef PIKA_BINLOG_H_
#define PIKA_BINLOG_H_
#include <atomic>
#include "pstd/include/env.h"
#include "pstd/include/pstd_mutex.h"
#include "pstd/include/pstd_status.h"
#include "pstd/include/noncopyable.h"
#include "include/pika_define.h"
std::string NewFileName(const std::string& name, uint32_t current);
class Version final : public pstd::noncopyable {
public:
Version(const std::shared_ptr<pstd::RWFile>& save);
~Version();
pstd::Status Init();
// RWLock should be held when access members.
pstd::Status StableSave();
uint32_t pro_num_ = 0;
uint64_t pro_offset_ = 0;
uint64_t logic_id_ = 0;
uint32_t term_ = 0;
std::shared_mutex rwlock_;
void debug() {
std::shared_lock l(rwlock_);
printf("Current pro_num %u pro_offset %llu\n", pro_num_, pro_offset_);
}
private:
// shared with versionfile_
std::shared_ptr<pstd::RWFile> save_;
};
class Binlog : public pstd::noncopyable {
public:
Binlog(std::string Binlog_path, int file_size = 100 * 1024 * 1024);
~Binlog();
void Lock() { mutex_.lock(); }
void Unlock() { mutex_.unlock(); }
pstd::Status Put(const std::string& item);
pstd::Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = nullptr, uint64_t* logic_id = nullptr);
/*
* Set Producer pro_num and pro_offset with lock
*/
pstd::Status SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term = 0, uint64_t index = 0);
// Need to hold Lock();
pstd::Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index);
uint64_t file_size() { return file_size_; }
std::string filename() { return filename_; }
bool IsBinlogIoError() { return binlog_io_error_; }
// need to hold mutex_
void SetTerm(uint32_t term) {
std::lock_guard l(version_->rwlock_);
version_->term_ = term;
version_->StableSave();
}
uint32_t term() {
std::shared_lock l(version_->rwlock_);
return version_->term_;
}
void Close();
private:
pstd::Status Put(const char* item, int len);
static pstd::Status AppendPadding(pstd::WritableFile* file, uint64_t* len);
// pstd::WritableFile *queue() { return queue_; }
void InitLogFile();
pstd::Status EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset);
/*
* Produce
*/
pstd::Status Produce(const pstd::Slice& item, int* pro_offset);
std::atomic<bool> opened_;
std::unique_ptr<Version> version_;
std::unique_ptr<pstd::WritableFile> queue_;
// versionfile_ can only be used as a shared_ptr, and it will be used as a variable version_ in the ~Version() function.
std::shared_ptr<pstd::RWFile> versionfile_;
pstd::Mutex mutex_;
uint32_t pro_num_ = 0;
int block_offset_ = 0;
char* pool_ = nullptr;
bool exit_all_consume_ = false;
const std::string binlog_path_;
uint64_t file_size_ = 0;
std::string filename_;
std::atomic<bool> binlog_io_error_;
// Not use
// int32_t retry_;
};
#endif