forked from valebes/ppl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_ordered_producer.rs
103 lines (95 loc) · 2.33 KB
/
test_ordered_producer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/*
Ordered FlatMap example.
*/
use ppl::prelude::*;
/// First pipeline stage: owns the strings that are fed to the next stage.
struct Source {
    /// Strings that have not been emitted yet, kept in emission order
    /// (the front element is the next one handed out).
    strings: Vec<String>,
}
impl Out<String> for Source {
    /// Hands out the stored strings one per call, starting from the front.
    ///
    /// Returns `None` once every string has been emitted (and on every
    /// subsequent call).
    fn run(&mut self) -> Option<String> {
        if self.strings.is_empty() {
            return None;
        }
        // Taking the element at index 0 preserves the insertion order.
        Some(self.strings.remove(0))
    }
}
/// Intermediate stage that turns each input into `number_of_messages` copies
/// (the test below configures it with 5).
#[derive(Clone)]
struct WorkerA {
    /// How many copies of every input string are buffered.
    number_of_messages: usize,
    /// Copies buffered by `run` and handed out one at a time by `produce`.
    queue: Vec<String>,
}
impl InOut<String, String> for WorkerA {
    /// Buffers `number_of_messages` copies of `input` in the internal queue.
    ///
    /// Always returns `None`: nothing is emitted from here directly; the
    /// buffered copies are handed out by [`produce`](Self::produce).
    fn run(&mut self, input: String) -> Option<String> {
        self.queue
            .extend((0..self.number_of_messages).map(|_| input.clone()));
        None
    }

    /// Produces the messages that are sent to the next stage.
    ///
    /// Takes the most recently buffered copy off the queue; returns `None`
    /// when the queue is empty.
    fn produce(&mut self) -> Option<String> {
        // `Vec::pop` already yields `None` on an empty vector, so no separate
        // emptiness check (and no `unwrap`) is needed.
        self.queue.pop()
    }

    /// Declares this stage a producer: after calling `run`, the framework
    /// must also call `produce`.
    fn is_producer(&self) -> bool {
        true
    }

    /// Reports a replica count of 2 for this stage.
    fn number_of_replicas(&self) -> usize {
        2
    }

    /// Reports this stage as ordered.
    // NOTE(review): presumably this makes the framework keep the output in
    // input order across replicas; confirm against the ppl documentation.
    fn is_ordered(&self) -> bool {
        true
    }
}
/// Last pipeline stage: collects every string it receives.
struct Sink {
    /// Strings received so far, in arrival order.
    queue: Vec<String>,
}
impl In<String, Vec<String>> for Sink {
fn run(&mut self, input: String) {
println!("{}", input);
self.queue.push(input)
}
fn finalize(self) -> Option<Vec<String>> {
Some(self.queue)
}
fn is_ordered(&self) -> bool {
true
}
}
/// Runs a Source -> WorkerA -> Sink pipeline and checks that the sink ends up
/// with every input word repeated `REPLICAS` times, in the original input
/// order.
#[test]
fn test_ordered_producer() {
    // `init()` panics when a global logger is already installed (for example
    // by another test in the same binary); `try_init()` reports that case as
    // an error instead, which is safe to ignore here.
    let _ = env_logger::try_init();

    // Single source of truth for both the pipeline input and the expected
    // output, so the two cannot drift apart.
    const REPLICAS: usize = 5;
    let words = ["pippo", "pluto", "paperino", "topolino"];

    let mut p = pipeline![
        Source {
            strings: words.iter().copied().map(String::from).collect(),
        },
        WorkerA {
            number_of_messages: REPLICAS,
            queue: Vec::new()
        },
        Sink { queue: Vec::new() }
    ];
    p.start();
    let res = p
        .wait_end()
        .expect("Sink::finalize always returns Some, so the pipeline must yield a result");

    // Each word, replicated REPLICAS times, in the same order as the input.
    let check: Vec<String> = words
        .iter()
        .flat_map(|&word| vec![word.to_string(); REPLICAS])
        .collect();
    assert_eq!(res, check);
}