forked from datastax/cpp-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfuture.hpp
153 lines (120 loc) · 3.22 KB
/
future.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/*
Copyright (c) 2014-2016 DataStax
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef __CASS_FUTURE_HPP_INCLUDED__
#define __CASS_FUTURE_HPP_INCLUDED__
#include "atomic.hpp"
#include "cassandra.h"
#include "host.hpp"
#include "macros.hpp"
#include "scoped_lock.hpp"
#include "scoped_ptr.hpp"
#include "ref_counted.hpp"
#include <uv.h>
#include <assert.h>
#include <string>
namespace cass {
struct Error;
// Tag identifying which kind of future a Future object is. The value is
// handed to the Future constructor and reported back by Future::type().
// The numeric values below are the same ones the compiler assigns implicitly.
enum FutureType {
  CASS_FUTURE_TYPE_SESSION = 0,
  CASS_FUTURE_TYPE_RESPONSE = 1
};
// Reference-counted, one-shot completion object.
//
// A Future starts out "not set". A producer calls set() or set_error() to
// complete it; consumers call wait(), wait_for(), ready() or get_error() to
// observe completion. All access to the completion flag and the stored error
// is guarded by `mutex_`, and waiters block on `cond_`.
//
// Derived classes may use the protected internal_*() helpers while already
// holding `mutex_` through a ScopedMutex.
class Future : public RefCounted<Future> {
public:
  // Signature of the completion callback registered with set_callback().
  // The second argument is the opaque `data` pointer supplied at registration.
  typedef void (*Callback)(CassFuture*, void*);

  // Error code plus human-readable message recorded by set_error().
  struct Error {
    Error(CassError code, const std::string& message)
      : code(code)
      , message(message) {}

    CassError code;
    std::string message;
  };

  // Creates an unset future of the given type with no loop and no callback.
  Future(FutureType type)
    : is_set_(false)
    , type_(type)
    , loop_(NULL)
    , callback_(NULL)
    , data_(NULL) { // Keep the callback's user data in a defined state too.
    uv_mutex_init(&mutex_);
    uv_cond_init(&cond_);
  }

  virtual ~Future() {
    uv_mutex_destroy(&mutex_);
    uv_cond_destroy(&cond_);
  }

  // Returns the type tag supplied at construction.
  FutureType type() const { return type_; }

  // Returns true if the future has been set. Never blocks on the condition.
  bool ready() {
    ScopedMutex lock(&mutex_);
    return is_set_;
  }

  // Blocks the calling thread until the future has been set.
  virtual void wait() {
    ScopedMutex lock(&mutex_);
    internal_wait(lock);
  }

  // Blocks until the future has been set or `timeout_us` microseconds have
  // passed. Returns true if the future was set, false on timeout.
  virtual bool wait_for(uint64_t timeout_us) {
    ScopedMutex lock(&mutex_);
    return internal_wait_for(lock, timeout_us);
  }

  // Blocks until the future has been set, then returns the stored error.
  // The pointer is held by `error_`, so the caller must not delete it;
  // presumably it is NULL when the future completed without set_error().
  Error* get_error() {
    ScopedMutex lock(&mutex_);
    internal_wait(lock);
    return error_.get();
  }

  // Completes the future without an error.
  void set() {
    ScopedMutex lock(&mutex_);
    internal_set(lock);
  }

  // Completes the future, recording `code` and `message` as its error.
  void set_error(CassError code, const std::string& message) {
    ScopedMutex lock(&mutex_);
    internal_set_error(code, message, lock);
  }

  // Atomically stores the event loop associated with this future.
  // NOTE(review): presumably the loop used to dispatch the callback work
  // request -- confirm against the out-of-line definitions.
  void set_loop(uv_loop_t* loop) {
    loop_.store(loop);
  }

  // Registers a completion callback and its user data. Defined out of line.
  bool set_callback(Callback callback, void* data);

protected:
  // Waits on `cond_` until `is_set_` becomes true. `lock` must hold `mutex_`.
  // The loop re-checks the flag after every wakeup, so spurious wakeups are
  // harmless.
  void internal_wait(ScopedMutex& lock) {
    while (!is_set_) {
      uv_cond_wait(&cond_, lock.get());
    }
  }

  // Waits on `cond_` until `is_set_` becomes true or `timeout_us`
  // microseconds have elapsed. `lock` must hold `mutex_`.
  // Returns true if the future was set, false if the wait timed out.
  bool internal_wait_for(ScopedMutex& lock, uint64_t timeout_us) {
    const uint64_t timeout_ns = timeout_us * 1000; // uv_cond_timedwait() expects nanos
    const uint64_t start_ns = uv_hrtime();

    // uv_cond_timedwait() may return 0 on a spurious wakeup, so keep waiting
    // for whatever is left of the timeout until the flag is actually set.
    while (!is_set_) {
      const uint64_t elapsed_ns = uv_hrtime() - start_ns;
      if (elapsed_ns >= timeout_ns) {
        return false;
      }
      if (uv_cond_timedwait(&cond_, lock.get(), timeout_ns - elapsed_ns) != 0) {
        return false;
      }
    }
    return true;
  }

  // Marks the future as set; `lock` must hold `mutex_`. Defined out of line.
  void internal_set(ScopedMutex& lock);

  // Stores a new Error built from `code` and `message`, then marks the future
  // as set. `lock` must hold `mutex_`.
  void internal_set_error(CassError code, const std::string& message, ScopedMutex& lock) {
    error_.reset(new Error(code, message));
    internal_set(lock);
  }

  // Guards `is_set_` and `error_`; protected so derived classes can lock it.
  uv_mutex_t mutex_;

private:
  // Callback dispatch helpers; defined out of line.
  void run_callback_on_work_thread();
  static void on_work(uv_work_t* work);
  static void on_after_work(uv_work_t* work, int status);

private:
  bool is_set_;             // True once the future has been completed.
  uv_cond_t cond_;          // Waited on (with `mutex_`) by the wait helpers.
  FutureType type_;         // Type tag supplied at construction.
  ScopedPtr<Error> error_;  // Error recorded by internal_set_error(), if any.
  Atomic<uv_loop_t*> loop_; // Loop stored by set_loop(); NULL until then.
  uv_work_t work_;          // libuv work request used by the callback helpers.
  Callback callback_;       // Completion callback; NULL until registered.
  void* data_;              // User data for `callback_`; NULL until registered.

private:
  DISALLOW_COPY_AND_ASSIGN(Future);
};
} // namespace cass
#endif