forked from TES3MP/CrabNet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCloudServer.h
384 lines (309 loc) · 17.8 KB
/
CloudServer.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
/*
* Copyright (c) 2014, Oculus VR, Inc.
* Copyright (c) 2016-2018, TES3MP Team
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
/// \file CloudServer.h
/// \brief Stores client data, and allows cross-server communication to retrieve this data
/// \details TODO
///
#include "NativeFeatureIncludes.h"
#if _CRABNET_SUPPORT_CloudServer==1
#ifndef __CLOUD_SERVER_H
#define __CLOUD_SERVER_H
#include "PluginInterface2.h"
#include <stdint.h>
#include "RakString.h"
#include "DS_Hash.h"
#include "CloudCommon.h"
#include "DS_OrderedList.h"
#include <cstdlib>
/// If the data is smaller than this value, an allocation is avoid. However, this value exists for every row
#define CLOUD_SERVER_DATA_STACK_SIZE 32
namespace RakNet
{
/// Forward declarations
class RakPeerInterface;
/// \brief Zero or more instances of CloudServerQueryFilter can be attached to CloudServer to restrict client queries
/// All attached instances of CloudServerQueryFilter on each corresponding operation, from all directly connected clients
/// If any attached instance returns false for a given operation, that operation is silently rejected
/// \ingroup CLOUD_GROUP
class RAK_DLL_EXPORT CloudServerQueryFilter
{
public:
CloudServerQueryFilter() {}
virtual ~CloudServerQueryFilter() {}
/// Called when a local client wants to post data
/// \return true to allow, false to reject
virtual bool OnPostRequest(RakNetGUID clientGuid, SystemAddress clientAddress, CloudKey key, uint32_t dataLength, const char *data)=0;
/// Called when a local client wants to release data that it has previously uploaded
/// \return true to allow, false to reject
virtual bool OnReleaseRequest(RakNetGUID clientGuid, SystemAddress clientAddress, DataStructures::List<CloudKey> &cloudKeys)=0;
/// Called when a local client wants to query data
/// If you return false, the client will get no response at all
/// \return true to allow, false to reject
virtual bool OnGetRequest(RakNetGUID clientGuid, SystemAddress clientAddress, CloudQuery &query, DataStructures::List<RakNetGUID> &specificSystems)=0;
/// Called when a local client wants to stop getting updates for data
/// If you return false, the client will keep getting updates for that data
/// \return true to allow, false to reject
virtual bool OnUnsubscribeRequest(RakNetGUID clientGuid, SystemAddress clientAddress, DataStructures::List<CloudKey> &cloudKeys, DataStructures::List<RakNetGUID> &specificSystems)=0;
};
/// \brief Stores client data, and allows cross-server communication to retrieve this data
/// \ingroup CLOUD_GROUP
class RAK_DLL_EXPORT CloudServer : public PluginInterface2, CloudAllocator
{
public:
// GetInstance() and DestroyInstance(instance*)
STATIC_FACTORY_DECLARATIONS(CloudServer)
CloudServer();
virtual ~CloudServer();
/// \brief Max bytes a client can upload
/// Data in excess of this value is silently ignored
/// defaults to 0 (unlimited)
/// \param[in] bytes Max bytes a client can upload. 0 means unlimited.
void SetMaxUploadBytesPerClient(uint64_t bytes);
/// \brief Max bytes returned by a download. If the number of bytes would exceed this amount, the returned list is truncated
/// However, if this would result in no rows downloaded, then one row will be returned.
/// \param[in] bytes Max bytes a client can download from a single Get(). 0 means unlimited.
void SetMaxBytesPerDownload(uint64_t bytes);
/// \brief Add a server, which is assumed to be connected in a fully connected mesh to all other servers and also running the CloudServer plugin
/// The other system must also call AddServer before getting the subscription data, or it will be rejected.
/// Sending a message telling the other system to call AddServer(), followed by calling AddServer() locally, would be sufficient for this to work.
/// \note This sends subscription data to the other system, using RELIABLE_ORDERED on channel 0
/// \param[in] systemIdentifier Identifier of the remote system
void AddServer(RakNetGUID systemIdentifier);
/// \brief Removes a server added through AddServer()
/// \param[in] systemIdentifier Identifier of the remote system
void RemoveServer(RakNetGUID systemIdentifier);
/// Return list of servers added with AddServer()
/// \param[out] remoteServers List of servers added
void GetRemoteServers(DataStructures::List<RakNetGUID> &remoteServersOut);
/// \brief Frees all memory. Does not remove query filters
void Clear(void);
/// \brief Report the specified SystemAddress to client queries, rather than what RakPeer reads.
/// This is useful if you already know your public IP
/// This only applies to future updates, so call it before updating to apply to all queries
/// \param[in] forcedAddress The systmeAddress to return in queries. Use UNASSIGNED_SYSTEM_ADDRESS (default) to use what RakPeer returns
void ForceExternalSystemAddress(SystemAddress forcedAddress);
/// \brief Adds a callback called on each query. If all filters returns true for an operation, the operation is allowed.
/// If the filter was already added, the function silently fails
/// \param[in] filter An externally allocated instance of CloudServerQueryFilter. The instance must remain valid until it is removed with RemoveQueryFilter() or RemoveAllQueryFilters()
void AddQueryFilter(CloudServerQueryFilter* filter);
/// \brief Removes a callback added with AddQueryFilter()
/// The instance is not deleted, only unreferenced. It is up to the user to delete the instance, if necessary
/// \param[in] filter An externally allocated instance of CloudServerQueryFilter. The instance must remain valid until it is removed with RemoveQueryFilter() or RemoveAllQueryFilters()
void RemoveQueryFilter(CloudServerQueryFilter* filter);
/// \brief Removes all instances of CloudServerQueryFilter added with AddQueryFilter().
/// The instances are not deleted, only unreferenced. It is up to the user to delete the instances, if necessary
void RemoveAllQueryFilters(void);
protected:
virtual void Update(void);
virtual PluginReceiveResult OnReceive(Packet *packet);
virtual void OnClosedConnection(const SystemAddress &systemAddress, RakNetGUID rakNetGUID, PI2_LostConnectionReason lostConnectionReason );
virtual void OnRakPeerShutdown(void);
virtual void OnPostRequest(Packet *packet);
virtual void OnReleaseRequest(Packet *packet);
virtual void OnGetRequest(Packet *packet);
virtual void OnUnsubscribeRequest(Packet *packet);
virtual void OnServerToServerGetRequest(Packet *packet);
virtual void OnServerToServerGetResponse(Packet *packet);
uint64_t maxUploadBytesPerClient, maxBytesPerDowload;
// ----------------------------------------------------------------------------
// For a given data key, quickly look up one or all systems that have uploaded
// ----------------------------------------------------------------------------
struct CloudData
{
CloudData(): stackData{0}, allocatedData(nullptr), dataPtr(nullptr), dataLengthBytes(0), isUploaded(false) {}
~CloudData() {if (allocatedData) free(allocatedData);}
bool IsUnused(void) const {return isUploaded==false && specificSubscribers.Size()==0;}
void Clear(void) {if (dataPtr==allocatedData) free(allocatedData); allocatedData=0; dataPtr=0; dataLengthBytes=0; isUploaded=false;}
unsigned char stackData[CLOUD_SERVER_DATA_STACK_SIZE];
unsigned char *allocatedData; // Uses allocatedData instead of stackData if length of data exceeds CLOUD_SERVER_DATA_STACK_SIZE
unsigned char *dataPtr; // Points to either stackData or allocatedData
uint32_t dataLengthBytes;
bool isUploaded;
/// System address of server that is holding this data, and the client is connected to
SystemAddress serverSystemAddress;
/// System address of client that uploaded this data
SystemAddress clientSystemAddress;
/// RakNetGUID of server that is holding this data, and the client is connected to
RakNetGUID serverGUID;
/// RakNetGUID of client that uploaded this data
RakNetGUID clientGUID;
/// When the key data changes from this particular system, notify these subscribers
/// This list mutually exclusive with CloudDataList::nonSpecificSubscribers
DataStructures::OrderedList<RakNetGUID, RakNetGUID> specificSubscribers;
};
void WriteCloudQueryRowFromResultList(unsigned int i, DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut);
void WriteCloudQueryRowFromResultList(DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut);
static int KeyDataPtrComp( const RakNetGUID &key, CloudData* const &data );
struct CloudDataList
{
bool IsUnused(void) const {return keyData.Size()==0 && nonSpecificSubscribers.Size()==0;}
bool IsNotUploaded(void) const {return uploaderCount==0;}
bool RemoveSubscriber(RakNetGUID g) {
bool objectExists;
unsigned int index;
index = nonSpecificSubscribers.GetIndexFromKey(g, &objectExists);
if (objectExists)
{
subscriberCount--;
nonSpecificSubscribers.RemoveAtIndex(index);
return true;
}
return false;
}
unsigned int uploaderCount, subscriberCount;
CloudKey key;
// Data uploaded from or subscribed to for various systems
DataStructures::OrderedList<RakNetGUID, CloudData*, CloudServer::KeyDataPtrComp> keyData;
/// When the key data changes from any system, notify these subscribers
/// This list mutually exclusive with CloudData::specificSubscribers
DataStructures::OrderedList<RakNetGUID, RakNetGUID> nonSpecificSubscribers;
};
static int KeyDataListComp( const CloudKey &key, CloudDataList * const &data );
DataStructures::OrderedList<CloudKey, CloudDataList*, CloudServer::KeyDataListComp> dataRepository;
struct KeySubscriberID
{
CloudKey key;
DataStructures::OrderedList<RakNetGUID, RakNetGUID> specificSystemsSubscribedTo;
};
static int KeySubscriberIDComp(const CloudKey &key, KeySubscriberID * const &data );
// Remote systems
struct RemoteCloudClient
{
bool IsUnused(void) const {return uploadedKeys.Size()==0 && subscribedKeys.Size()==0;}
DataStructures::OrderedList<CloudKey,CloudKey,CloudKeyComp> uploadedKeys;
DataStructures::OrderedList<CloudKey,KeySubscriberID*,CloudServer::KeySubscriberIDComp> subscribedKeys;
uint64_t uploadedBytes;
};
DataStructures::Hash<RakNetGUID, RemoteCloudClient*, 2048, RakNetGUID::ToUint32> remoteSystems;
// For a given user, release all subscribed and uploaded keys
void ReleaseSystem(RakNetGUID clientAddress );
// For a given user, release a set of keys
void ReleaseKeys(RakNetGUID clientAddress, DataStructures::List<CloudKey> &keys );
void NotifyClientSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated );
void NotifyClientSubscribersOfDataChange( CloudQueryRow *row, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated );
void NotifyServerSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, bool wasUpdated );
struct RemoteServer
{
RakNetGUID serverAddress;
// This server needs to know about these keys when they are updated or deleted
DataStructures::OrderedList<CloudKey,CloudKey,CloudKeyComp> subscribedKeys;
// This server has uploaded these keys, and needs to know about Get() requests
DataStructures::OrderedList<CloudKey,CloudKey,CloudKeyComp> uploadedKeys;
// Just for processing
bool workingFlag;
// If false, we don't know what keys they have yet, so send everything
bool gotSubscribedAndUploadedKeys;
};
static int RemoteServerComp(const RakNetGUID &key, RemoteServer* const &data );
DataStructures::OrderedList<RakNetGUID, RemoteServer*, CloudServer::RemoteServerComp> remoteServers;
struct BufferedGetResponseFromServer
{
void Clear(CloudAllocator *allocator);
RakNetGUID serverAddress;
CloudQueryResult queryResult;
bool gotResult;
};
struct CloudQueryWithAddresses
{
// Inputs
CloudQuery cloudQuery;
DataStructures::List<RakNetGUID> specificSystems;
void Serialize(bool writeToBitstream, BitStream *bitStream);
};
static int BufferedGetResponseFromServerComp(const RakNetGUID &key, BufferedGetResponseFromServer* const &data );
struct GetRequest
{
void Clear(CloudAllocator *allocator);
bool AllRemoteServersHaveResponded(void) const;
CloudQueryWithAddresses cloudQueryWithAddresses;
// When request started. If takes too long for a response from another system, can abort remaining systems
RakNet::Time requestStartTime;
// Assigned by server that gets the request to identify response. See nextGetRequestId
uint32_t requestId;
RakNetGUID requestingClient;
DataStructures::OrderedList<RakNetGUID, BufferedGetResponseFromServer*, CloudServer::BufferedGetResponseFromServerComp> remoteServerResponses;
};
static int GetRequestComp(const uint32_t &key, GetRequest* const &data );
DataStructures::OrderedList<uint32_t, GetRequest*, CloudServer::GetRequestComp> getRequests;
RakNet::Time nextGetRequestsCheck;
uint32_t nextGetRequestId;
void ProcessAndTransmitGetRequest(GetRequest *getRequest);
void ProcessCloudQueryWithAddresses(
CloudServer::CloudQueryWithAddresses &cloudQueryWithAddresses,
DataStructures::List<CloudData*> &cloudDataResultList,
DataStructures::List<CloudKey> &cloudKeyResultList
);
void SendUploadedAndSubscribedKeysToServer( RakNetGUID systemAddress );
void SendUploadedKeyToServers( CloudKey &cloudKey );
void SendSubscribedKeyToServers( CloudKey &cloudKey );
void RemoveUploadedKeyFromServers( CloudKey &cloudKey );
void RemoveSubscribedKeyFromServers( CloudKey &cloudKey );
void OnSendUploadedAndSubscribedKeysToServer( Packet *packet );
void OnSendUploadedKeyToServers( Packet *packet );
void OnSendSubscribedKeyToServers( Packet *packet );
void OnRemoveUploadedKeyFromServers( Packet *packet );
void OnRemoveSubscribedKeyFromServers( Packet *packet );
void OnServerDataChanged( Packet *packet );
void GetServersWithUploadedKeys(
DataStructures::List<CloudKey> &keys,
DataStructures::List<RemoteServer*> &remoteServersWithData
);
CloudServer::CloudDataList *GetOrAllocateCloudDataList(CloudKey key, bool *dataRepositoryExists, unsigned int &dataRepositoryIndex);
void UnsubscribeFromKey(RemoteCloudClient *remoteCloudClient, RakNetGUID remoteCloudClientGuid, unsigned int keySubscriberIndex, CloudKey &cloudKey, DataStructures::List<RakNetGUID> &specificSystems);
void RemoveSpecificSubscriber(RakNetGUID specificSubscriber, CloudDataList *cloudDataList, RakNetGUID remoteCloudClientGuid);
DataStructures::List<CloudServerQueryFilter*> queryFilters;
SystemAddress forceAddress;
};
} // namespace RakNet
#endif
// Key subscription
//
// A given system can subscribe to one or more keys.
// The subscription can be further be defined as only subscribing to keys uploaded by or changed by a given system.
// It is possible to subscribe to keys not yet uploaded, or uploaded to another system
//
// Operations:
//
// 1. SubscribeToKey() - Get() operation with subscription
// A. Add to key subscription list for the client, which contains a keyId / specificUploaderList pair
// B. Send to remote servers that for this key, they should send us updates
// C. (Done, get operation returns current values)
//
// 2. UpdateData() - Post() operation
// A. Find all subscribers to this data, for the uploading system.
// B. Send them the uploaded data
// C. Find all servers that subscribe to this data
// D. Send them the uploaded data
//
// 3. DeleteData() - Release() operation
// A. Find all subscribers to this data, for the deleting system.
// B. Inform them of the deletion
// C. Find all servers that subscribe to this data
// D. Inform them of the deletion
//
// 4. Unsubscribe()
// A. Find this subscriber, and remove their subscription
// B. If no one else is subscribing to this key for any system, notify remote servers we no longer need subscription updates
//
// Internal operations:
//
// 1. Find if any connected client has subscribed to a given key
// A. This is used add and remove our subscription for this key to remote servers
//
// 2. For a given key and updating address, find all connected clients that care
// A. First find connected clients that have subscribed to this key, regardless of address
// B. Then find connected clients that have subscribed to this key for this particular address
//
// 3. Find all remote servers that have subscribed to a given key
// A. This is so when the key is updated or deleted, we know who to send it to
//
// 4. For a given client (such as on disconnect), remove all records of their subscriptions
#endif // _CRABNET_SUPPORT_*