diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml
new file mode 100644
index 0000000..1ffcb0c
--- /dev/null
+++ b/.github/workflows/codeql.yml
@@ -0,0 +1,83 @@
+# For most projects, this workflow file will not need changing; you simply need
+# to commit it to your repository.
+#
+# You may wish to alter this file to override the set of languages analyzed,
+# or to provide custom queries or build logic.
+#
+# ******** NOTE ********
+# We have attempted to detect the languages in your repository. Please check
+# the `language` matrix defined below to confirm you have the correct set of
+# supported CodeQL languages.
+#
+name: "CodeQL"
+
+on:
+ push:
+ branches: [ "master" ]
+ pull_request:
+ # The branches below must be a subset of the branches above
+ branches: [ "master" ]
+jobs:
+ analyze:
+ name: Analyze
+ runs-on: ubuntu-latest
+ permissions:
+ actions: read
+ contents: read
+ security-events: write
+
+ strategy:
+ fail-fast: false
+ matrix:
+ language: [ 'cpp' ]
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v3
+
+ # Initializes the CodeQL tools for scanning.
+ - name: Initialize CodeQL
+ uses: github/codeql-action/init@v2
+ with:
+ languages: ${{ matrix.language }}
+ # If you wish to specify custom queries, you can do so here or in a config file.
+ # By default, queries listed here will override any specified in a config file.
+ # Prefix the list here with "+" to use these queries and those in the config file.
+
+ # Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
+ # queries: security-extended,security-and-quality
+
+
+ - name: Install dependencies
+ run: |
+ sudo apt-get update && \
+ sudo apt-get -qq install clang cmake git openssl libssl-dev libhiredis-dev \
+ libspdlog-dev libfmt-dev ninja-build
+
+ - name: Install redis-plus-plus
+ run: |
+ mkdir -p /tmp/redis-plus-plus && cd /tmp/redis-plus-plus && \
+ git clone https://github.com/sewenew/redis-plus-plus.git . && \
+ git checkout tags/1.3.3 && \
+ mkdir compile && cd compile && cmake -GNinja -DCMAKE_BUILD_TYPE=Release .. && \
+ ninja && sudo ninja install
+
+ # Autobuild attempts to build any compiled languages (C/C++, C#, Go, or Java).
+ # If this step fails, then you should remove it and run the build manually (see below)
+ - name: Autobuild
+ uses: github/codeql-action/autobuild@v2
+
+ # âšī¸ Command-line programs to run using the OS shell.
+ # đ See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
+
+ # If the Autobuild fails above, remove it and uncomment the following three lines.
+ # modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance.
+
+ # - run: |
+ # echo "Run, Build Application using script"
+ # ./location_of_script_within_repo/buildscript.sh
+
+ - name: Perform CodeQL Analysis
+ uses: github/codeql-action/analyze@v2
+ with:
+ category: "/language:${{matrix.language}}"
diff --git a/.gitignore b/.gitignore
index 2ca1c63..7a4bd44 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@ build-dbg/*
eventhub.code-workspace
tests/integration/clienttest/clienttest
tests/integration/clienttest/vendor/*
+eventhub.conf
\ No newline at end of file
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3d32e98..e3907e1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,11 +1,11 @@
-cmake_minimum_required (VERSION 2.9)
+cmake_minimum_required(VERSION 3.5)
project (eventhub)
set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake")
set(CMAKE_C_FLAGS "-Wall")
set(CMAKE_C_COMPILER "clang")
set(CMAKE_CXX_COMPILER "clang++")
-set(CMAKE_CXX_FLAGS "-Wall -std=c++17 -DSPDLOG_FMT_EXTERNAL")
+set(CMAKE_CXX_FLAGS "-Wall -std=c++17 -DSPDLOG_FMT_EXTERNAL=ON -Wno-deprecated-declarations")
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}")
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
@@ -62,4 +62,4 @@ add_custom_target(
-style=file
-i
${ALL_SOURCE_FILES}
-)
\ No newline at end of file
+)
diff --git a/Dockerfile b/Dockerfile
index ebff9cf..817181a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM debian:bullseye-slim
+FROM debian:bookworm-slim
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && \
@@ -33,4 +33,4 @@ RUN apt-get -qq remove clang cmake git ninja-build && \
USER eventhub
-ENTRYPOINT [ "/usr/bin/eventhub" ]
\ No newline at end of file
+ENTRYPOINT [ "/usr/bin/eventhub" ]
diff --git a/Dockerfile.debug b/Dockerfile.debug
index 1d8def3..2f98b02 100644
--- a/Dockerfile.debug
+++ b/Dockerfile.debug
@@ -2,7 +2,7 @@ FROM debian:bullseye-slim
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && \
- apt-get -qq install clang cmake git openssl libssl-dev libhiredis-dev gdb bash vim psmisc procps htop curl sudo \
+ apt-get -qq install gcc g++ cmake git openssl libssl-dev libhiredis-dev gdb bash vim psmisc procps htop curl sudo \
libspdlog-dev libfmt-dev ninja-build
RUN mkdir -p /usr/src/redis-plus-plus && cd /usr/src/redis-plus-plus && \
@@ -17,8 +17,10 @@ WORKDIR /usr/src/eventhub
COPY . .
RUN mkdir -p build && cd build && \
- cmake -DSKIP_TESTS=1 -GNinja -DCMAKE_BUILD_TYPE=RelWithDebInfo .. && \
- ninja && \
+ sed -i 's/clang++/g++/' ../CMakeLists.txt && \
+ sed -i 's/clang/gcc/' ../CMakeLists.txt && \
+ cmake -GNinja -DSKIP_TESTS=1 -DCMAKE_BUILD_TYPE=Debug .. && \
+ ninja -j0 && \
cp -a eventhub /usr/bin/eventhub
WORKDIR /tmp
@@ -28,9 +30,6 @@ RUN addgroup --system eventhub && \
echo "eventhub ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers && \
mkdir -p /tmp/coredumps; chown -R eventhub:eventhub /tmp/coredumps
-RUN apt-get -qq remove clang cmake git ninja-build && \
- apt-get -qq -f autoremove
-
USER eventhub
ENTRYPOINT [ "/usr/bin/eventhub" ]
\ No newline at end of file
diff --git a/README.md b/README.md
index ddc7a49..34d2bf1 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@
Eventhub is a pub/sub over WebSocket server written in modern C++.
It implements the [publish-subscribe pattern](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) and concept of topics.
-The key focus of the project is to deliver high performance, availability, and easy integration.
+The key focus of the project is to deliver high performance and ease of integration.
@@ -42,7 +42,7 @@ Eventhub use the same layout for patterns as MQTT where ```+``` matches a single
## Eventlog
-Eventhub stores all published messages into a log that can be requested by clients who want to get all events in or since a given time frame. For example if a client gets disconnected it can request this log to get all new events since the last event that was received.
+Eventhub stores all published messages into a log that can be requested by clients who want to get all events in or since a given time frame. For example if a client is disconnected it can request this log when it reconnects to get all new events since the last event that was received.
## Authentication
@@ -63,8 +63,16 @@ This token wil allow subscription to all channels under ```topic1``` and ```topi
Eventhub does not have a interface or API to generate these tokens for you yet. So you have to generate them in your backend or through a JWT token generator like [jwt.io](https://jwt.io/).
+## Key/value store
+
+Most applications require some kind of state to be stored.
+Eventhub implements a simple key/value store for this purpose. This is a feature separate from the pub/sub functionality.
+
+The read/write ACL's in the JWT-token defines what keys you have access to.
+
# Clients
* [JavaScript (Browser/Node.js)](https://github.com/olesku/eventhub-jsclient)
+* [Python](https://github.com/olesku/eventhub-pyclient)
* [Swift (iOS 13+)](https://github.com/shtrihstr/EventHub)
#### Implementing your own client
@@ -95,18 +103,20 @@ All configuration options can also be set using [environment variables](https://
|handshake_timeout | Client handshake timeout | 15
|disable_auth | Disable client authentication | false
|[enable_sse](docs/sse.md) | Enable Server-Sent-Events support | false
-|enable_cache | Enable retained cache for topics. | true
+|enable_cache | Enable retained cache for topics. | false
|prometheus_metric_prefix | Prometheus prefix | eventhub
|default_cache_ttl | Default message TTL | 60
-|max_cache_request_limit | Default returned cache result limit | 1000
+|max_cache_request_limit | Default returned cache result limit | 100
|log_level | Log level to use | info
|enable_ssl | Enable SSL | false
+|ssl_listen_port | Port to listen on for SSL requests | 8443
|ssl_certificate | Path to certificate for SSL | None
|ssl_private_key | Path to private key for SSL | None
|ssl_ca_certificate | Path to CA certificate | None
|ssl_cert_auto_reload | Reload ssl cert when changed on disk | false
|ssl_cert_check_interval | How often to check for cert changes | 300
|disable_unsecure_listener | Disable unsecure listener when ssl is enabled | false
+|enable_kvstore | Enable key/value store functionality | true
## Docker
The easiest way is to use our docker image.
diff --git a/docs/protocol.md b/docs/protocol.md
index 794bc8d..a41a042 100644
--- a/docs/protocol.md
+++ b/docs/protocol.md
@@ -1,15 +1,19 @@
# Protocol specification
Eventhub uses [JSON-RPC](http://www.jsonrpc.org/) over WebSocket as transport protocol.
-| RPC method | Parameters | Description |
-|-------------------------------------|----------------|-------------|
-| [subscribe](#subscribe) | *topic, since* | Subscribe to a topic or pattern.
-| [publish](#publish) | *topic, message* | Publish to a topic.
-| [unsubscribe](#unsubscribe) | *topic* | Unsubscribe from a topic or pattern.
-| [unsubscribeall](#unsubscribeall) | *None* | Unsubscribe from all current subscriptions.
-| [list](#list) | *None* | List all current subscriptions.
-| [ping](#ping) | *None* | Ping the server.
-| [disconnect](#disconnect) | *None* | Disconnect from the server.
+| RPC method | Parameters | Description |
+|-------------------------------------|------------------------------ |---------------------------------------------|
+| [subscribe](#subscribe) | *topic, since* | Subscribe to a topic or pattern.
+| [publish](#publish) | *topic, message* | Publish to a topic.
+| [unsubscribe](#unsubscribe) | *topic* | Unsubscribe from a topic or pattern.
+| [unsubscribeall](#unsubscribeall) | *None* | Unsubscribe from all current subscriptions.
+| [list](#list) | *None* | List all current subscriptions.
+| [eventlog](#eventlog) | *topic, since, sinceEventId, limit* | Request event history for a topic.
+| [get](#get) | *key* | Get key from key/value store.
+| [set](#set) | *key, value, ttl* | Set key in key/value store.
+| [del](#del) | *key* | Delete key in key/value store.
+| [ping](#ping) | *None* | Ping the server.
+| [disconnect](#disconnect) | *None* | Disconnect from the server.
**Important:** Each request must have a unique `id` attribute as specified by JSON-RPC. It can be a number or a string.
@@ -24,14 +28,14 @@ If you are implementing your own client I can recommend using the nice [websocat
"id": 1,
"jsonrpc": "2.0",
"method": "subscribe",
- "params": {
- "topic": "my/topic1",
+ "params": {
+ "topic": "my/topic1",
"since": 0
}
}
```
-*The `since` attribute can be set to a timestamp or a message id to get all events from the history log since that period. If unset or set to 0 no history will be requested.*
+*The `since` attribute can be set to a timestamp or a message id to get all events from the eventlog since that period. If unset or set to 0 eventlog will not be requested.*
**Confirmation response:**
```json
@@ -154,13 +158,150 @@ All messages received on a subscribed topic or pattern will have the same `id` a
}
```
-## ping
+## eventlog
+**Request:**
+```json5
+{
+ "id": 1,
+ "jsonrpc": "2.0",
+ "method": "eventlog",
+ "params": {
+ // All events from the past 60 seconds.
+ // 'since' can also be a literal unix timestamp in milliseconds or
+ // you can use 'sinceEventId' to get all events since a given
+ // event id.
+ "since": -60000,
+
+ // Limit result to 100 latest events in given time period.
+ "limit": 100
+ }
+}
+```
+
+**Response:**
+```json
+{
+ "id": 2,
+ "jsonrpc": "2.0",
+ "result": {
+ "action": "eventlog",
+ "items": [
+ {
+ "id": "1661265352086-0",
+ "message": "Event 1",
+ "topic": "my/topic1"
+ },
+ {
+ "id": "1661265374910-0",
+ "message": "Event 1",
+ "topic": "my/topic1"
+ },
+ {
+ "id": "1661265379198-0",
+ "message": "Event 2",
+ "topic": "my/topic1"
+ },
+ {
+ "id": "1661265383286-0",
+ "message": "Event 3",
+ "topic": "my/topic1"
+ }
+ ],
+ "status": "ok",
+ "topic": "my/topic1"
+ }
+}
+```
+## get
**Request:**
```json
{
"id": 6,
"jsonrpc": "2.0",
+ "method": "get",
+ "params": {
+ "key": "my/key"
+ }
+}
+```
+
+**Response:**
+```json
+{
+ "id": 6,
+ "jsonrpc": "2.0",
+ "result": {
+ "action": "get",
+ "key": "my/key",
+ "value": "some value"
+ }
+}
+```
+
+## set
+**Request:**
+```json
+{
+ "id": 7,
+ "jsonrpc": "2.0",
+ "method": "set",
+ "params": {
+ "key": "my/key",
+ "value": "some value",
+ "ttl": 3600
+ }
+}
+```
+
+*If `ttl` attribute is omitted or set to `0` it means the key is stored without any expirity time.*
+
+**Response:**
+```json
+{
+ "id": 7,
+ "jsonrpc": "2.0",
+ "result": {
+ "action": "set",
+ "key": "my/key",
+ "success": true
+ }
+}
+```
+
+## del
+**Request:**
+```json
+{
+ "id": 8,
+ "jsonrpc": "2.0",
+ "method": "del",
+ "params": {
+ "key": "my/key"
+ }
+}
+```
+
+**Response:**
+```json
+{
+ "id": 8,
+ "jsonrpc": "2.0",
+ "result": {
+ "action": "del",
+ "key": "my/key",
+ "success": true
+ }
+}
+```
+
+## ping
+
+**Request:**
+```json
+{
+ "id": 9,
+ "jsonrpc": "2.0",
"method": "ping",
"params": []
}
@@ -169,7 +310,7 @@ All messages received on a subscribed topic or pattern will have the same `id` a
**Response:**
```json
{
- "id": 6,
+ "id": 9,
"jsonrpc": "2.0",
"result": {
"pong": 1574846750424
@@ -183,7 +324,7 @@ Contents of the `pong` attribute is the server time since epoch in milliseconds.
**Request:**
```json
{
- "id": 7,
+ "id": 10,
"jsonrpc": "2.0",
"method": "disconnect",
"params": []
diff --git a/docs/rate-limiting.md b/docs/rate-limiting.md
new file mode 100644
index 0000000..f375838
--- /dev/null
+++ b/docs/rate-limiting.md
@@ -0,0 +1,40 @@
+# Rate limiting
+
+Eventhub allows you to rate limit how many messages a user/token is allowed to publish within a given time period (interval). This is configured by adding ```rlimit``` configuration to the token used by the publisher.
+
+#### Syntax
+```json5
+ "sub": "user@domain.com", // Must be defined and unique for limits to work.
+ "write": [ "topic1/#" ],
+ "read": [ "topic1/#" ],
+ "rlimit": [
+ {
+ "topic": "topic1/#", // Topic or pattern to limit.
+ "interval": 10, // Bucket interval.
+ "max": 10 // Max allowed publishes within this interval.
+ }
+ ]
+```
+
+You can have multiple limit configuration under ```rlimit```.
+
+#### Example
+```json
+ "sub": "user@domain.com",
+ "write": [ "topic1/#", "topic2" ],
+ "read": [ "topic1/#", "topic2" ],
+ "rlimit": [
+ {
+ "topic": "topic1/#",
+ "interval": 10,
+ "max": 10
+ },
+ {
+ "topic": "topic2",
+ "interval": 10,
+ "max": 10
+ }
+ ]
+```
+
+In cases where you have multiple limits that matches a given topic, i.e patterns and distinct topic name, the closest match will be used.
diff --git a/docs/sse.md b/docs/sse.md
index 9e3ebbf..f1fa2f0 100644
--- a/docs/sse.md
+++ b/docs/sse.md
@@ -28,7 +28,7 @@ data: Foobar // Message content
: // Ping event.
```
-## Requesting cache / event history
+## Requesting cache / eventlog
| Header | Query parameter | Description |
|---------------|-----------------|---------------------------------------------------------------------|
| Last-Event-ID | since | Get all events since specified lastevent id when connecting |
diff --git a/eventhub.conf.example b/eventhub.conf.example
index 5128cf8..4588735 100644
--- a/eventhub.conf.example
+++ b/eventhub.conf.example
@@ -5,6 +5,7 @@ jwt_secret = FooBarBaz
log_level = info
disable_auth = true
prometheus_metric_prefix = eventhub
+enable_kvstore = true
# Redis settings.
redis_host = localhost
diff --git a/include/AccessController.hpp b/include/AccessController.hpp
index 4fd4d5a..bceea37 100644
--- a/include/AccessController.hpp
+++ b/include/AccessController.hpp
@@ -1,17 +1,39 @@
-#ifndef INCLUDE_ACCESSCONTROLLER_HPP_
-#define INCLUDE_ACCESSCONTROLLER_HPP_
+#pragma once
#include
#include
#include
+#include
-#include "Config.hpp"
#include "Forward.hpp"
#include "EventhubBase.hpp"
#include "jwt/jwt.hpp"
namespace eventhub {
+struct rlimit_config_t {
+ std::string topic;
+ unsigned long interval;
+ unsigned long max;
+};
+
+typedef struct rlimit_config_t rlimit_config_t;
+
+struct NoRateLimitForTopic : public std::exception {
+ const char* what() const throw() {
+ return "Token has no rate limits defined";
+ }
+};
+
+class RateLimitConfig final {
+ private:
+ std::vector _limitConfigs;
+
+ public:
+ bool loadFromJSON(const nlohmann::json::array_t& config);
+ const rlimit_config_t getRateLimitForTopic(const std::string& topic);
+};
+
class AccessController final : public EventhubBase {
private:
bool _token_loaded;
@@ -20,6 +42,7 @@ class AccessController final : public EventhubBase {
jwt::jwt_object _token;
std::vector _publish_acl;
std::vector _subscribe_acl;
+ RateLimitConfig _rlimit;
public:
AccessController(Config &cfg) :
@@ -31,8 +54,9 @@ class AccessController final : public EventhubBase {
bool allowSubscribe(const std::string& topic);
bool allowCreateToken(const std::string& path);
const std::string& subject();
+ RateLimitConfig& getRateLimitConfig() { return _rlimit; };
};
} // namespace eventhub
-#endif // INCLUDE_ACCESSCONTROLLER_HPP_
+
diff --git a/include/Common.hpp b/include/Common.hpp
index a264b85..9b34158 100644
--- a/include/Common.hpp
+++ b/include/Common.hpp
@@ -1,5 +1,4 @@
-#ifndef INCLUDE_COMMON_HPP_
-#define INCLUDE_COMMON_HPP_
+#pragma once
#include
#include
@@ -14,22 +13,22 @@ static constexpr unsigned int EPOLL_MAX_TIMEOUT = 100;
static constexpr unsigned int MAXEVENTS = 1024;
// Read buffer size.
-static constexpr size_t NET_READ_BUFFER_SIZE = 128;
+static constexpr std::size_t NET_READ_BUFFER_SIZE = 512;
// Max write buffer size.
-static constexpr size_t NET_WRITE_BUFFER_MAX = (1024 * 1000) * 8;
+static constexpr std::size_t NET_WRITE_BUFFER_MAX = (1024 * 1000) * 8;
// Hangup connection if data frame is larger than this.
-static constexpr size_t MAX_DATA_FRAME_SIZE = (1024 * 1000) * 8;
+static constexpr std::size_t MAX_DATA_FRAME_SIZE = (1024 * 1000) * 8;
// String used in Sec-WebSocket-Accept header during websocket handshake.
-static constexpr const char* WS_MAGIC_STRING = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+static constexpr const char* WS_MAGIC_STRING = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11\0";
// Will split up into continuation frames above this threshold.
-static constexpr size_t WS_MAX_CHUNK_SIZE = 1 << 15;
+static constexpr std::size_t WS_MAX_CHUNK_SIZE = 1 << 15;
// Hangup connection if control frame is larger than this.
-static constexpr size_t WS_MAX_CONTROL_FRAME_SIZE = 1024;
+static constexpr std::size_t WS_MAX_CONTROL_FRAME_SIZE = 1024;
// Delay metric sample rate.
static constexpr unsigned int METRIC_DELAY_SAMPLE_RATE_MS = 5000;
@@ -40,4 +39,4 @@ static constexpr unsigned int CACHE_PURGER_INTERVAL_MS = (60 * 1000);
// Maximum SSL handshake retries.
static const unsigned int SSL_MAX_HANDSHAKE_RETRY = 5;
-#endif // INCLUDE_COMMON_HPP_
+
diff --git a/include/Config.hpp b/include/Config.hpp
index 05be28a..c6614bb 100644
--- a/include/Config.hpp
+++ b/include/Config.hpp
@@ -1,6 +1,7 @@
#ifndef __INCLUDE_CONFIG_HPP__
#define __INCLUDE_CONFIG_HPP__
+#include
#include
#include
#include
@@ -8,6 +9,8 @@
#include
#include
#include
+#include
+#include
namespace eventhub {
@@ -194,4 +197,4 @@ class Config final {
};
}
-#endif
\ No newline at end of file
+#endif
diff --git a/include/Connection.hpp b/include/Connection.hpp
index 7a7040a..8119617 100644
--- a/include/Connection.hpp
+++ b/include/Connection.hpp
@@ -1,5 +1,4 @@
-#ifndef INCLUDE_CONNECTION_HPP_
-#define INCLUDE_CONNECTION_HPP_
+#pragma once
#include
#include
@@ -9,7 +8,6 @@
#include "EpollWrapper.hpp"
#endif
#include
-
#include
#include
#include
@@ -19,16 +17,13 @@
#include
#include
-#include "EventhubBase.hpp"
#include "Forward.hpp"
-#include "AccessController.hpp"
-#include "Common.hpp"
-#include "http/Parser.hpp"
+#include "EventhubBase.hpp"
+#include "websocket/Types.hpp"
+#include "http/Types.hpp"
#include "jsonrpc/jsonrpcpp.hpp"
-#include "websocket/Parser.hpp"
namespace eventhub {
-
using ConnectionPtr = std::shared_ptr;
using ConnectionWeakPtr = std::weak_ptr;
using ConnectionListIterator = std::list::iterator;
@@ -59,7 +54,7 @@ class Connection : public EventhubBase, public std::enable_shared_from_this::iterator connectionIterator);
ConnectionListIterator getConnectionListIterator();
ConnectionPtr getSharedPtr();
@@ -67,7 +62,7 @@ class Connection : public EventhubBase, public std::enable_shared_from_this listSubscriptions();
void onHTTPRequest(http::ParserCallback callback);
@@ -87,8 +82,8 @@ class Connection : public EventhubBase, public std::enable_shared_from_this _http_parser;
- websocket::Parser _websocket_parser;
- AccessController _access_controller;
+ std::unique_ptr _websocket_parser;
+ std::unique_ptr _access_controller;
ConnectionState _state;
bool _is_shutdown;
bool _is_shutdown_after_flush;
@@ -97,10 +92,8 @@ class Connection : public EventhubBase, public std::enable_shared_from_this
#include
@@ -7,15 +6,14 @@
#include
#include
#include
+#include
#include "Forward.hpp"
+#include "metrics/Types.hpp"
#include "EventhubBase.hpp"
-#include "Config.hpp"
-#include "Connection.hpp"
#include "EventLoop.hpp"
-#include "TopicManager.hpp"
#include "Worker.hpp"
-#include "metrics/Types.hpp"
+#include "Connection.hpp"
namespace eventhub {
@@ -26,7 +24,7 @@ class Worker final : public EventhubBase, public WorkerBase {
Worker(Server* srv, unsigned int workerId);
~Worker();
- TopicManager& getTopicManager() { return _topic_manager; }
+ TopicManager* getTopicManager() { return _topic_manager.get(); }
void subscribeConnection(ConnectionPtr conn, const std::string& topicFilterName);
void publish(const std::string& topicName, const std::string& data);
@@ -39,10 +37,10 @@ class Worker final : public EventhubBase, public WorkerBase {
unsigned int _workerId;
Server* _server;
int _epoll_fd;
- EventLoop _ev;
+ std::unique_ptr _ev;
ConnectionList _connection_list;
std::mutex _connection_list_mutex;
- TopicManager _topic_manager;
+ std::unique_ptr _topic_manager;
metrics::WorkerMetrics _metrics;
int64_t _ev_delay_sample_start;
@@ -56,4 +54,4 @@ class Worker final : public EventhubBase, public WorkerBase {
} // namespace eventhub
-#endif // INCLUDE_CONNECTIONWORKER_HPP_
+
diff --git a/include/EventLoop.hpp b/include/EventLoop.hpp
index 666cc83..87852e1 100644
--- a/include/EventLoop.hpp
+++ b/include/EventLoop.hpp
@@ -1,5 +1,4 @@
-#ifndef INCLUDE_EVENTLOOP_HPP_
-#define INCLUDE_EVENTLOOP_HPP_
+#pragma once
#include
#include
@@ -124,5 +123,3 @@ class EventLoop final {
};
} // namespace eventhub
-
-#endif // INCLUDE_EVENTLOOP_HPP_
diff --git a/include/EventhubBase.hpp b/include/EventhubBase.hpp
index ea6e49b..2884156 100644
--- a/include/EventhubBase.hpp
+++ b/include/EventhubBase.hpp
@@ -1,5 +1,4 @@
-#ifndef INCLUDE_EVENTHUBBASE_HPP_
-#define INCLUDE_EVENTHUBBASE_HPP_
+#pragma once
#include "Config.hpp"
@@ -19,6 +18,4 @@ class EventhubBase {
};
-}
-
-#endif
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/include/Forward.hpp b/include/Forward.hpp
index a91a201..bb92d53 100644
--- a/include/Forward.hpp
+++ b/include/Forward.hpp
@@ -1,13 +1,29 @@
-#ifndef INCLUDE_FORWARD_HPP_
-#define INCLUDE_FORWARD_HPP_
-
namespace eventhub {
-
-class Server;
-class Worker;
+class Config;
class Connection;
+class HandlerContext;
+class KVStore;
+class Redis;
+class Server;
class Topic;
+class Worker;
+class TopicManager;
+class AccessController;
+
+namespace http {
+class Parser;
+class Response;
+enum class RequestState;
+}
+namespace websocket {
+class Handler;
+class Parser;
+class Response;
}
-#endif
\ No newline at end of file
+namespace sse {
+class Handler;
+class Response;
+}
+}
\ No newline at end of file
diff --git a/include/HandlerContext.hpp b/include/HandlerContext.hpp
index 8af6303..1c424ac 100644
--- a/include/HandlerContext.hpp
+++ b/include/HandlerContext.hpp
@@ -1,16 +1,17 @@
-#ifndef INCLUDE_HANDLERCONTEXT_HPP_
-#define INCLUDE_HANDLERCONTEXT_HPP_
+#pragma once
#include
-#include "Server.hpp"
+
+#include "Forward.hpp"
#include "EventhubBase.hpp"
+#include "Config.hpp"
namespace eventhub {
class HandlerContext final : public EventhubBase {
public:
- HandlerContext(Server* server, Worker* worker, std::shared_ptr connection) :
- EventhubBase(server->config()), _server(server), _worker(worker), _connection(connection) {};
+ HandlerContext(Config& cfg, Server* server, Worker* worker, std::shared_ptr connection) :
+ EventhubBase(cfg), _server(server), _worker(worker), _connection(connection) {};
~HandlerContext() {}
@@ -26,4 +27,4 @@ class HandlerContext final : public EventhubBase {
} // namespace eventhub
-#endif // INCLUDE_HANDLERCONTEXT_HPP_
+
diff --git a/include/KVStore.hpp b/include/KVStore.hpp
new file mode 100644
index 0000000..64b76fc
--- /dev/null
+++ b/include/KVStore.hpp
@@ -0,0 +1,29 @@
+#pragma once
+
+#include
+
+#include "Forward.hpp"
+#include "EventhubBase.hpp"
+#include "Config.hpp"
+
+namespace eventhub {
+ class KVStore final : public EventhubBase {
+ private:
+ std::string _prefix;
+ Redis& _redis;
+
+ const std::string _prefix_key(const std::string& key) const;
+
+ public:
+ KVStore(Config& cfg, Redis& redis) :
+ EventhubBase(cfg),
+ _redis(redis) {
+ _prefix = config().get("redis_prefix");
+ }
+
+ bool is_enabled();
+ const std::string get(const std::string& key) const;
+ bool set(const std::string& key, const std::string& value, unsigned long ttl = 0) const;
+ long long del(const std::string& key) const;
+ };
+}
diff --git a/include/Logger.hpp b/include/Logger.hpp
index f10138b..ee7c839 100644
--- a/include/Logger.hpp
+++ b/include/Logger.hpp
@@ -1,5 +1,4 @@
-#ifndef INCLUDE_LOGGER_HPP_
-#define INCLUDE_LOGGER_HPP_
+#pragma once
#include
#include