forked from ad-freiburg/qlever
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MessageSenderTest.cpp
108 lines (79 loc) · 3.71 KB
/
MessageSenderTest.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright 2023, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Robin Textor-Falconi <[email protected]>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include "util/AsyncTestHelpers.h"
#include "util/http/websocket/MessageSender.h"
#include "util/http/websocket/QueryHub.h"
using ad_utility::websocket::MessageSender;
using ad_utility::websocket::OwningQueryId;
using ad_utility::websocket::QueryHub;
using ad_utility::websocket::QueryId;
using ad_utility::websocket::QueryRegistry;
using namespace boost::asio::experimental::awaitable_operators;
using namespace std::string_literals;
using ::testing::Pointee;
using ::testing::VariantWith;
// Check that destroying a `MessageSender` is observable by a receiver: the wait
// for the first data piece has to complete with an empty payload pointer
// instead of running into the timeout.
ASYNC_TEST(MessageSender, destructorCallsSignalEnd) {
  using PayloadType = std::shared_ptr<const std::string>;

  QueryRegistry registry;
  OwningQueryId owningId = registry.uniqueId("my-query");
  QueryHub hub{ioContext};

  // Acquire the receiving side first, while `owningId` has not been moved yet.
  auto receiver =
      hub.createOrAcquireDistributorForReceiving(owningId.toQueryId());

  // The sender only lives for the duration of this scope, so everything we
  // observe afterwards is a side effect of its destructor.
  {
    MessageSender shortLived{std::move(owningId), hub};
  }

  auto awaitEndSignal = [&]() -> net::awaitable<void> {
    // Race the wait against a two-second timer. If the timer wins, the
    // variant holds the other alternative and the expectation below fails.
    net::deadline_timer timeout{ioContext, boost::posix_time::seconds(2)};
    auto outcome = co_await (receiver->waitForNextDataPiece(0) ||
                             timeout.async_wait(net::use_awaitable));
    // An empty (null) payload pointer is expected, i.e. no actual data.
    EXPECT_THAT(outcome, VariantWith<PayloadType>(PayloadType{}));
  };

  // Run the check on the strand of the distributor.
  co_await net::co_spawn(receiver->strand(), awaitEndSignal(), net::deferred);
}
// _____________________________________________________________________________
// Check that every call to `MessageSender::operator()` makes its argument
// available to a receiver, in call order, under consecutive indices.
ASYNC_TEST(MessageSender, callingOperatorBroadcastsPayload) {
  using PayloadType = std::shared_ptr<const std::string>;

  QueryRegistry registry;
  OwningQueryId owningId = registry.uniqueId("my-query");
  QueryHub hub{ioContext};

  {
    auto receiver =
        hub.createOrAcquireDistributorForReceiving(owningId.toQueryId());

    MessageSender sender{std::move(owningId), hub};
    sender("Still");
    sender("Dre");

    // A single two-second timer bounds both waits below, so that a missing
    // payload leads to a failed expectation.
    net::deadline_timer timeout{ioContext, boost::posix_time::seconds(2)};

    auto expectBothPayloads = [&]() -> net::awaitable<void> {
      // Index 0 has to hold the first message that was sent ...
      auto outcome = co_await (receiver->waitForNextDataPiece(0) ||
                               timeout.async_wait(net::use_awaitable));
      EXPECT_THAT(outcome, VariantWith<PayloadType>(Pointee("Still"s)));

      // ... and index 1 the second one.
      outcome = co_await (receiver->waitForNextDataPiece(1) ||
                          timeout.async_wait(net::use_awaitable));
      EXPECT_THAT(outcome, VariantWith<PayloadType>(Pointee("Dre"s)));
    };

    // Run the check on the strand of the distributor.
    co_await net::co_spawn(receiver->strand(), expectBothPayloads,
                           net::deferred);
  }

  // When the scope above ends, the `MessageSender` destructor asynchronously
  // calls `signalEnd` on the distributor. Give that call the chance to run
  // before the `QueryHub` it relies on is destroyed.
  co_await net::post(net::use_awaitable);
}
// _____________________________________________________________________________
// Check that `MessageSender::getQueryId` reports the id of the `OwningQueryId`
// the sender was constructed from.
ASYNC_TEST(MessageSender, testGetQueryIdGetterWorks) {
  QueryRegistry registry;
  OwningQueryId owningId = registry.uniqueId("my-query");

  // Obtain the plain `QueryId` before `owningId` is moved into the sender.
  const QueryId expectedId = owningId.toQueryId();

  QueryHub hub{ioContext};

  {
    MessageSender sender{std::move(owningId), hub};
    EXPECT_EQ(expectedId, sender.getQueryId());
  }

  // When the scope above ends, the `MessageSender` destructor asynchronously
  // calls `signalEnd` on the underlying distributor. Give that call the chance
  // to run before the `QueryHub` it relies on is destroyed.
  co_await net::post(net::use_awaitable);
}