forked from eventmachine/eventmachine
-
Notifications
You must be signed in to change notification settings - Fork 4
/
em.h
244 lines (188 loc) · 5.52 KB
/
em.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
/*****************************************************************************
$Id$
File: em.h
Date: 06Apr06
Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
Gmail: blackhedd
This program is free software; you can redistribute it and/or modify
it under the terms of either: 1) the GNU General Public License
as published by the Free Software Foundation; either version 2 of the
License, or (at your option) any later version; or 2) Ruby's License.
See the file COPYING for complete licensing information.
*****************************************************************************/
#ifndef __EventMachine__H_
#define __EventMachine__H_
#ifdef BUILD_FOR_RUBY
#include <ruby.h>
#define EmSelect rb_thread_select
#ifdef HAVE_RB_WAIT_FOR_SINGLE_FD
#include <ruby/io.h>
#endif
#if defined(HAVE_RBTRAP)
#include <rubysig.h>
#elif defined(HAVE_RB_ENABLE_INTERRUPT)
extern "C" {
void rb_enable_interrupt(void);
void rb_disable_interrupt(void);
}
#define TRAP_BEG rb_enable_interrupt()
#define TRAP_END do { rb_disable_interrupt(); rb_thread_check_ints(); } while(0)
#else
#define TRAP_BEG
#define TRAP_END
#endif
// 1.9.0 compat
#ifndef RUBY_UBF_IO
#define RUBY_UBF_IO RB_UBF_DFL
#endif
#ifndef RSTRING_PTR
#define RSTRING_PTR(str) RSTRING(str)->ptr
#endif
#ifndef RSTRING_LEN
#define RSTRING_LEN(str) RSTRING(str)->len
#endif
#ifndef RSTRING_LENINT
#define RSTRING_LENINT(str) RSTRING_LEN(str)
#endif
#else
#define EmSelect select
#endif
class EventableDescriptor;
class InotifyDescriptor;
/********************
class EventMachine_t
********************/
class EventMachine_t
{
public:
static int GetMaxTimerCount();
static void SetMaxTimerCount (int);
public:
EventMachine_t (EMCallback);
virtual ~EventMachine_t();
void Run();
void ScheduleHalt();
void SignalLoopBreaker();
const unsigned long InstallOneshotTimer (int);
const unsigned long ConnectToServer (const char *, int, const char *, int);
const unsigned long ConnectToUnixServer (const char *);
const unsigned long CreateTcpServer (const char *, int);
const unsigned long OpenDatagramSocket (const char *, int);
const unsigned long CreateUnixDomainServer (const char*);
const unsigned long AttachSD (int);
const unsigned long OpenKeyboard();
//const char *Popen (const char*, const char*);
const unsigned long Socketpair (char* const*);
void Add (EventableDescriptor*);
void Modify (EventableDescriptor*);
void Deregister (EventableDescriptor*);
const unsigned long AttachFD (int, bool);
int DetachFD (EventableDescriptor*);
void ArmKqueueWriter (EventableDescriptor*);
void ArmKqueueReader (EventableDescriptor*);
void SetTimerQuantum (int);
static void SetuidString (const char*);
static int SetRlimitNofile (int);
pid_t SubprocessPid;
int SubprocessExitStatus;
int GetConnectionCount();
float GetHeartbeatInterval();
int SetHeartbeatInterval(float);
const unsigned long WatchFile (const char*);
void UnwatchFile (int);
void UnwatchFile (const unsigned long);
#ifdef HAVE_KQUEUE
void _HandleKqueueFileEvent (struct kevent*);
void _RegisterKqueueFileEvent(int);
#endif
const unsigned long WatchPid (int);
void UnwatchPid (int);
void UnwatchPid (const unsigned long);
#ifdef HAVE_KQUEUE
void _HandleKqueuePidEvent (struct kevent*);
#endif
uint64_t GetCurrentLoopTime() { return MyCurrentLoopTime; }
// Temporary:
void _UseEpoll();
void _UseKqueue();
bool UsingKqueue() { return bKqueue; }
bool UsingEpoll() { return bEpoll; }
void QueueHeartbeat(EventableDescriptor*);
void ClearHeartbeat(uint64_t, EventableDescriptor*);
uint64_t GetRealTime();
private:
void _RunOnce();
void _RunTimers();
void _UpdateTime();
void _AddNewDescriptors();
void _ModifyDescriptors();
void _InitializeLoopBreaker();
void _CleanupSockets();
void _RunSelectOnce();
void _RunEpollOnce();
void _RunKqueueOnce();
void _ModifyEpollEvent (EventableDescriptor*);
void _DispatchHeartbeats();
timeval _TimeTilNextEvent();
void _CleanBadDescriptors();
public:
void _ReadLoopBreaker();
void _ReadInotifyEvents();
int NumCloseScheduled;
private:
enum {
MaxEpollDescriptors = 64*1024,
MaxEvents = 4096
};
int HeartbeatInterval;
EMCallback EventCallback;
class Timer_t: public Bindable_t {
};
multimap<uint64_t, Timer_t> Timers;
multimap<uint64_t, EventableDescriptor*> Heartbeats;
map<int, Bindable_t*> Files;
map<int, Bindable_t*> Pids;
vector<EventableDescriptor*> Descriptors;
vector<EventableDescriptor*> NewDescriptors;
set<EventableDescriptor*> ModifiedDescriptors;
uint64_t NextHeartbeatTime;
int LoopBreakerReader;
int LoopBreakerWriter;
#ifdef OS_WIN32
struct sockaddr_in LoopBreakerTarget;
#endif
timeval Quantum;
uint64_t MyCurrentLoopTime;
#ifdef OS_WIN32
unsigned TickCountTickover;
unsigned LastTickCount;
#endif
private:
bool bTerminateSignalReceived;
bool bEpoll;
int epfd; // Epoll file-descriptor
#ifdef HAVE_EPOLL
struct epoll_event epoll_events [MaxEvents];
#endif
bool bKqueue;
int kqfd; // Kqueue file-descriptor
#ifdef HAVE_KQUEUE
struct kevent Karray [MaxEvents];
#endif
InotifyDescriptor *inotify; // pollable descriptor for our inotify instance
};
/*******************
struct SelectData_t
*******************/
struct SelectData_t
{
SelectData_t();
int _Select();
int maxsocket;
fd_set fdreads;
fd_set fdwrites;
fd_set fderrors;
timeval tv;
int nSockets;
};
#endif // __EventMachine__H_