7
7
import io .rsocket .transport .netty .client .TcpClientTransport ;
8
8
import io .rsocket .transport .netty .server .TcpServerTransport ;
9
9
import io .rsocket .util .DefaultPayload ;
10
+ import io .vavr .control .Try ;
10
11
import lombok .extern .slf4j .Slf4j ;
12
+ import org .junit .jupiter .api .AfterEach ;
13
+ import org .junit .jupiter .api .BeforeEach ;
11
14
import org .junit .jupiter .api .Test ;
12
15
import reactor .core .Disposable ;
13
16
import reactor .core .publisher .Flux ;
@@ -38,10 +41,10 @@ public Flux<Payload> requestStream(Payload payload) {
38
41
class MyServer {
39
42
final Disposable listener ;
40
43
41
- MyServer () {
44
+ MyServer (int port ) {
42
45
listener = RSocketFactory .receive ()
43
46
.acceptor ((setup , sendingSocket ) -> Mono .just (new MyRequestStream ()))
44
- .transport (TcpServerTransport .create (7000 ))
47
+ .transport (TcpServerTransport .create (port ))
45
48
.start ()
46
49
.subscribe ();
47
50
}
@@ -50,10 +53,10 @@ class MyServer {
50
53
class MyClient {
51
54
final Mono <RSocket > requester ;
52
55
53
- MyClient () {
56
+ MyClient (int port ) {
54
57
requester = RSocketFactory .connect ()
55
58
.keepAliveAckTimeout (Duration .ofSeconds (3 ))
56
- .transport (TcpClientTransport .create (7000 ))
59
+ .transport (TcpClientTransport .create (port ))
57
60
.start ();
58
61
}
59
62
}
@@ -63,34 +66,52 @@ class SimpleTest {
63
66
64
67
@ Test
65
68
void test () {
66
- MyServer server = new MyServer ();
67
- MyClient client = new MyClient ();
68
- String payload = "Hello " ;
69
+ MyServer server = new MyServer (7001 );
70
+ MyClient client = new MyClient (7001 );
71
+ String payload = "Привет " ;
69
72
70
- // no back pressure:
71
73
StepVerifier .create (client .requester .flatMapMany (rr -> rr .requestStream (DefaultPayload .create (payload )))
72
- .map (Payload ::getDataUtf8 ))
73
- // .expectNextMatches(s -> s.contains("Hello-" ))
74
- .expectNextCount (5 )
74
+ .map (Payload ::getDataUtf8 )
75
+ . doOnEach ( stringSignal -> log . info ( "client 1: {}" , stringSignal . get ()) ))
76
+ .expectNextCount (payload . length () )
75
77
.verifyComplete ();
76
78
79
+ server .listener .dispose ();
80
+ }
81
+
82
+ @ Test
83
+ void test_back_pressure () {
84
+ MyServer server = new MyServer (7002 );
85
+ MyClient client = new MyClient (7002 );
86
+ String payload = "Hello" ;
87
+
77
88
// back-pressure: request only 2 items...
78
89
StepVerifier .create (client .requester .flatMapMany (rr -> rr .requestStream (DefaultPayload .create (payload ))
79
90
.take (2 )) // back-pressure
80
- .map (Payload ::getDataUtf8 ))
91
+ .map (Payload ::getDataUtf8 )
92
+ .doOnEach (stringSignal -> log .info ("client 2: {}" , stringSignal .get ())))
81
93
.expectNextMatches (s -> s .contains ("Hello-0" ))
82
94
.expectNextMatches (s -> s .endsWith ("Hello-1" ))
83
95
// .expectNextCount(2)
84
96
.verifyComplete ();
85
97
86
- // client.requester.flatMapMany(rr -> rr.requestStream(DefaultPayload.create(payload))
87
- // .take(2)) // back-pressure
88
- // .map(Payload::getDataUtf8)
89
- // .map(res -> assertThat(res).containsIgnoringCase("hello-"))
90
- // .subscribe(s -> log.info("client: {}", s));
91
- //
92
- // io.vavr.control.Try.run(() -> Thread.sleep(payload.length() * 1234))
93
- // .andFinally(server.listener::dispose);
94
98
server .listener .dispose ();
95
99
}
100
+
101
+ @ Test
102
+ void yet_another () {
103
+ MyServer server = new MyServer (7003 );
104
+ MyClient client = new MyClient (7003 );
105
+ String payload = "Hola" ;
106
+
107
+ client .requester .flatMapMany (rr -> rr .requestStream (DefaultPayload .create (payload ))
108
+ .take (2 )) // back-pressure
109
+ .map (Payload ::getDataUtf8 )
110
+ .map (res -> assertThat (res ).containsIgnoringCase ("hola-" ))
111
+ .subscribe (s -> log .info ("client 3: {}" , s ));
112
+
113
+ Try .run (() -> Thread .sleep (payload .length () * 1234 ))
114
+ .andFinally (server .listener ::dispose )
115
+ .onFailure (throwable -> log .info ("oops: {}" , throwable .getLocalizedMessage ()));
116
+ }
96
117
}
0 commit comments