forked from ad-freiburg/qlever
-
Notifications
You must be signed in to change notification settings - Fork 0
/
AsyncStreamTest.cpp
54 lines (42 loc) · 1.61 KB
/
AsyncStreamTest.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Copyright 2022, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Robin Textor-Falconi ([email protected])
#include <gtest/gtest.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <semaphore>
#include "../src/util/AsyncStream.h"
using ad_utility::streams::runStreamAsync;
// Test helper: a generator that yields the string "A" exactly `n` times.
//
// After the coroutine is resumed following the k-th yield (counting from 1),
// `totalProcessed` is set to k. The counter therefore tells an observer how
// many yielded elements the consumer of this generator has already moved past.
// `totalProcessed` is taken by reference and must outlive the generator.
cppcoro::generator<std::string> generateNChars(
    size_t n, std::atomic_size_t& totalProcessed) {
  size_t yieldedSoFar = 0;
  while (yieldedSoFar < n) {
    co_yield "A";
    // Only reached once the consumer asks for the next element.
    ++yieldedSoFar;
    totalProcessed.store(yieldedSoFar);
  }
}
// Checks that the asynchronous stream stops pulling from its source once its
// buffer is full, and continues as soon as the consumer frees up one slot.
//
// `totalProcessed` is advanced by `generateNChars` every time the source
// generator is resumed after a yield, so it shows how far the producing side
// has run ahead of the consumer.
TEST(AsyncStream, EnsureMaximumBufferLimitWorks) {
  std::atomic_size_t totalProcessed = 0;
  size_t bufferLimit = 10;

  // Upper bound for each polling phase below. Without it a regression in the
  // stream (e.g. a producer that never makes progress) would hang the test
  // forever; with it the loop ends and the following assertion reports the
  // actual count.
  const std::chrono::seconds waitLimit{10};
  // Sleeps in 10ms steps for as long as `stillWaiting()` returns true, but
  // gives up once `waitLimit` has elapsed.
  auto sleepWhile = [waitLimit](const auto& stillWaiting) {
    const auto deadline = std::chrono::steady_clock::now() + waitLimit;
    while (stillWaiting() && std::chrono::steady_clock::now() < deadline) {
      std::this_thread::sleep_for(std::chrono::milliseconds{10});
    }
  };

  // The source offers two more elements than the buffer limit.
  auto stream = runStreamAsync(generateNChars(bufferLimit + 2, totalProcessed),
                               bufferLimit);
  auto iterator = stream.begin();
  sleepWhile([&] { return totalProcessed <= bufferLimit; });
  // stream.begin() consumes a single element, and bufferLimit elements are
  // stored in the queue inside of stream.
  ASSERT_EQ(totalProcessed, bufferLimit + 1);

  // One element has been retrieved, so another one may enter the buffer.
  ++iterator;
  sleepWhile([&] { return totalProcessed == bufferLimit + 1; });
  ASSERT_EQ(totalProcessed, bufferLimit + 2);
}
// Checks that the elements of an input range come out of the asynchronous
// stream with the same values and in the same order.
TEST(AsyncStream, EnsureBuffersArePassedCorrectly) {
  const std::vector<std::string> expected{"Abc", "Def", "Ghi"};
  // The second argument (2) is smaller than the number of input elements (3).
  auto stream = runStreamAsync(expected, 2);
  // Element-wise comparison of the input against what the stream delivers.
  const bool allElementsMatch = ql::ranges::equal(
      expected.begin(), expected.end(), stream.begin(), stream.end());
  ASSERT_TRUE(allElementsMatch);
}