- Native For Comprehensions for Flux and Monos
- Flux based ReactiveSeq implementation
- Implement an extended Java 8 Stream using Reactor Flux
- Full integration with cyclops-react Xtended collections
- Asynchronously populate an Xtended Collection with Reactor Fluxs, materialize / block on first access
- AnyM monad wrapper for Monadic types (with full integration with cyclops-react Monad abstractions such as Kleisli)
- Monad wrapper uses native Flux and Mono operators
- Xtended Collections backed by Flux operate on Flux directly
- StreamT monad transformer operates directly with Flux
- MonoT monad transformer operates directly on Mono
- Extension Operators for Flux and ReactiveSeq (extend ReactiveSeq with Flux and Flux with ReactiveSeq)
- Companion classes for Fluxs and Monos offering:
  - For comprehensions
  - Helper functions for combining / accumulating and zipping values
In the example below we asynchronously populate an Xtended list using a Reactor Flux. Additional reactive operations can be performed on the list asynchronously. The ListX only blocks on first access to the data.
import static cyclops.collections.mutable.ListX.listX;
import static cyclops.companion.reactor.FluxReactiveSeq.reactiveSeq;
AtomicBoolean complete = new AtomicBoolean(false);
Flux<Integer> async = Flux.just(1,2,3)
.subscribeOn(Schedulers.fromExecutor(ForkJoinPool.commonPool()))
.doOnComplete(()->complete.set(true)); //record completion so it can be reported below
ListX<Integer> asyncList = listX(reactiveSeq(async))
.map(i->i+1);
System.out.println("Blocked? " + complete.get());
System.out.println("First value is " + asyncList.get(0));
System.out.println("Completed? " + complete.get());
Which will print
Blocked? false
First value is 2
Completed? true
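The block-on-first-access behaviour can be imitated with nothing but the JDK. The sketch below is not cyclops-react code: `LazyAsyncList` is a hypothetical class that wraps a `CompletableFuture`, chains `map` without waiting, and only joins when `get` is called.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical JDK-only stand-in for an asynchronously populated list (not the ListX API).
final class LazyAsyncList<T> {
    private final CompletableFuture<List<T>> pending;

    private LazyAsyncList(CompletableFuture<List<T>> pending) {
        this.pending = pending;
    }

    // Start populating on the common ForkJoinPool and return immediately.
    @SafeVarargs
    static <T> LazyAsyncList<T> populateAsync(T... values) {
        CompletableFuture<List<T>> future =
            CompletableFuture.supplyAsync(() -> Arrays.asList(values));
        return new LazyAsyncList<>(future);
    }

    // Chain a transformation without waiting for the data to arrive.
    <R> LazyAsyncList<R> map(Function<T, R> fn) {
        CompletableFuture<List<R>> mapped = pending.thenApply(
            list -> list.stream().map(fn).collect(Collectors.toList()));
        return new LazyAsyncList<>(mapped);
    }

    // First access: block until the data is available.
    T get(int index) {
        return pending.join().get(index);
    }

    boolean isComplete() {
        return pending.isDone();
    }

    public static void main(String[] args) {
        LazyAsyncList<Integer> asyncList = populateAsync(1, 2, 3).map(i -> i + 1);
        System.out.println("First value is " + asyncList.get(0)); // First value is 2
    }
}
```

Once `get` has returned, `isComplete()` is true, because the mapped future must have finished before its result could be read.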
For Fluxs
Flux<Integer> result = Fluxs.forEach(Flux.just(10, 20),
a -> Flux.<Integer> just(a + 10),
(a, b) -> a + b);
result.collect(Collectors.toList())
.block();
//[30, 50]
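A for comprehension of this shape is usually sugar for nested `flatMap` and `map` calls. The sketch below shows that desugaring over `java.util.stream.Stream` rather than Flux; `forEach2` is a hypothetical helper written for illustration, not the Fluxs API.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ForComprehensionSketch {
    // For each a in first, for each b in second(a), yield combine(a, b).
    static <A, B, R> Stream<R> forEach2(Stream<A> first,
                                        Function<A, Stream<B>> second,
                                        BiFunction<A, B, R> combine) {
        return first.flatMap(a -> second.apply(a).map(b -> combine.apply(a, b)));
    }

    public static void main(String[] args) {
        List<Integer> result = forEach2(Stream.of(10, 20),
                                        a -> Stream.of(a + 10),
                                        (a, b) -> a + b)
                                   .collect(Collectors.toList());
        System.out.println(result); // [30, 50]
    }
}
```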
For Monos
Mono<Tuple2<Integer,Integer>> result = Monos.forEach(Mono.just(10),
a -> Mono.<Integer> just(a + 10),
Tuple::tuple);
//(10, 20)
Use the FluxReactiveSeq companion class to create Flux-backed ReactiveSeqs
Create a Flux-backed ReactiveSeq directly or from a Flux
ReactiveSeq<Integer> seq = FluxReactiveSeq.just(1,2,3);
ReactiveSeq<Integer> seq2 = FluxReactiveSeq.reactiveSeq(Flux.just(1,2,3));
With a Flux-backed ReactiveSeq we can create Reactive Xtended Collections, e.g. an extended j.u.List
import static cyclops.collections.mutable.ListX.listX;
import static cyclops.companion.reactor.FluxReactiveSeq.reactiveSeq;
ListX<Integer> asyncList = listX(reactiveSeq(flux))
.map(i->i+1);
Or a reactive Cyclops Vector
import static cyclops.companion.reactor.FluxReactiveSeq.reactiveSeq;
VectorX<Integer> asyncList = VectorX.vectorX(reactiveSeq(flux))
.map(i->i+1);
//vector is asynchronously populated by our Flux
//we can continue processing and block on first access or
//unwrap to raw vector type
asyncList.get(1); //will block until data is available
//will also block until data is available
Vector<Integer> raw = asyncList.to(VavrConverters::Vector);
Use the fold method on ReactiveSeq to pattern match over its reactive nature
- Synchronous
- reactive-streams based async backpressure
- pure asynchronous execution
For FluxReactiveSeq the fold method always executes the second function (reactive-streams)
ReactiveSeq<Integer> seq = FluxReactiveSeq.just(1,2,3);
String type = seq.fold(sync->"synchronous",rs->"reactive-streams",async->"pure async");
//"reactive-streams"
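The three-way match can be pictured as a method that takes one function per case and applies exactly one of them. The following is a minimal JDK-only sketch of that idea; `StreamKind` is a hypothetical enum invented for illustration and is not part of cyclops-react.

```java
import java.util.function.Function;

// Hypothetical model of a stream's execution style (not a cyclops-react type).
enum StreamKind {
    SYNC, REACTIVE_STREAMS, ASYNC;

    // Apply exactly one of the three functions, chosen by the kind.
    <R> R fold(Function<StreamKind, R> sync,
               Function<StreamKind, R> reactiveStreams,
               Function<StreamKind, R> async) {
        switch (this) {
            case SYNC:             return sync.apply(this);
            case REACTIVE_STREAMS: return reactiveStreams.apply(this);
            default:               return async.apply(this);
        }
    }

    public static void main(String[] args) {
        String type = REACTIVE_STREAMS.fold(s -> "synchronous",
                                            rs -> "reactive-streams",
                                            a -> "pure async");
        System.out.println(type); // reactive-streams
    }
}
```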
Use Reactor to extend cyclops-react's array of operations
For and from Flux
import static cyclops.streams.ReactorOperators.flux;
ReactiveSeq<List<Integer>> seq = FluxReactiveSeq.of(1,2,3)
.map(i->i+1)
.to(flux(o->o.buffer(10)));
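Reactor's `buffer(int)` groups elements into lists of at most the given size, which is why the result type above is a sequence of lists. The grouping itself can be sketched over a plain `List`; the `buffer` helper below is a hypothetical JDK-only illustration of the semantics, not the Reactor implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BufferSketch {
    // Group elements into consecutive batches of at most `size` elements.
    static <T> List<List<T>> buffer(List<T> source, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < source.size(); start += size) {
            int end = Math.min(start + size, source.size());
            batches.add(new ArrayList<>(source.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Three elements with a batch size of 10 end up in a single batch.
        System.out.println(buffer(Arrays.asList(2, 3, 4), 10));      // [[2, 3, 4]]
        // A smaller batch size leaves a shorter final batch.
        System.out.println(buffer(Arrays.asList(1, 2, 3, 4, 5), 2)); // [[1, 2], [3, 4], [5]]
    }
}
```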
For and from Mono
import static cyclops.streams.ReactorOperators.mono;
Future<Integer> future;
Future<Integer> useMonoLogOp = future.to(mono(m->m.log()));
Also use cyclops-react operators in your Flux and Mono computations
AnyM is a type that can represent any Java Monad (allowing us to write more abstract code).
There are three types. AnyM abstracts over all monadic types, AnyMValue represents monad types that resolve to a single scalar value (like Mono), and AnyMSeq represents monad types that are sequences of values (like Flux).
            AnyM
              |
              |
  __________________________
  |                        |
AnyMValue               AnyMSeq
We can create an AnyM instance for a Flux via Fluxs
Flux<Integer> myFlux;
AnyMSeq<flux,Integer> monad = Fluxs.anyM(myFlux);
monad.map(i->i*2)
.zipWithIndex()
.filter(t->t._1()<100L);
We can create an AnyM instance for a Mono via Monos
Mono<Integer> mono;
AnyMValue<mono,Integer> monad = Monos.anyM(mono);
monad.map(i->i*2);
Convert back to Flux via Fluxs.raw (or ReactorWitness.flux) and to Mono via Monos.raw (or ReactorWitness.mono)
AnyMSeq<flux,Integer> monad;
Flux<Integer> obs = Fluxs.raw(monad);
We can write generic methods that accept any Monad type
public <W extends WitnessType<W>> AnyMSeq<W,Integer> sumAdjacent(AnyMSeq<W,Integer> sequence){
return sequence.sliding(2) //window of two adjacent elements
.map(t->t.sum(i->i).get());
}
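The witness-type idea behind such generic methods can be sketched without the library. In the JDK-only example below every name (`Wrapped`, `ListW`, `OptionalW`, `WrappedList`, `WrappedOptional`, `raw`) is hypothetical and only loosely mirrors AnyM: a type tag `W` records which container is wrapped, so one method can work on any of them and still return the same kind of container.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical witness-typed wrapper: W records which container is inside.
interface Wrapped<W, T> {
    <R> Wrapped<W, R> map(Function<T, R> fn);
}

// Witness marker types, used only as type tags.
final class ListW {}
final class OptionalW {}

final class WrappedList<T> implements Wrapped<ListW, T> {
    final List<T> value;
    WrappedList(List<T> value) { this.value = value; }

    @Override
    public <R> WrappedList<R> map(Function<T, R> fn) {
        return new WrappedList<>(value.stream().map(fn).collect(Collectors.toList()));
    }

    // Convert back to the raw list; the witness guarantees the cast is valid here.
    static <T> List<T> raw(Wrapped<ListW, T> wrapped) {
        return ((WrappedList<T>) wrapped).value;
    }
}

final class WrappedOptional<T> implements Wrapped<OptionalW, T> {
    final Optional<T> value;
    WrappedOptional(Optional<T> value) { this.value = value; }

    @Override
    public <R> WrappedOptional<R> map(Function<T, R> fn) {
        return new WrappedOptional<>(value.map(fn));
    }

    static <T> Optional<T> raw(Wrapped<OptionalW, T> wrapped) {
        return ((WrappedOptional<T>) wrapped).value;
    }
}

final class GenericMonadSketch {
    // Written once; the witness W is preserved in the result type.
    static <W> Wrapped<W, Integer> doubled(Wrapped<W, Integer> in) {
        return in.map(i -> i * 2);
    }

    public static void main(String[] args) {
        WrappedList<Integer> list = new WrappedList<>(Arrays.asList(1, 2, 3));
        WrappedOptional<Integer> opt = new WrappedOptional<>(Optional.of(21));
        System.out.println(WrappedList.raw(doubled(list)));     // [2, 4, 6]
        System.out.println(WrappedOptional.raw(doubled(opt)));  // Optional[42]
    }
}
```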
When AnyM manages your Fluxs, they still behave reactively, as Fluxs should!
AtomicBoolean complete = new AtomicBoolean(false);
Flux<Integer> async = Flux.just(1,2,3)
.subscribeOn(Schedulers.fromExecutor(ForkJoinPool.commonPool()));
AnyMSeq<flux,Integer> monad = Fluxs.anyM(async);
monad.map(i->i*2)
.forEach(System.out::println,System.err::println,()->complete.set(true));
System.out.println("Blocked? " + complete.get());
while(!complete.get()){
Thread.yield();
}
Blocked? false
2
4
6
Fluxs can also be defined as part of the reactiveSeq family of types inside AnyM - AnyM<reactiveSeq,Integer>
AnyM<reactiveSeq,Integer> anyM = FluxReactiveSeq.just(1,2,3)
.anyM();
ReactiveSeq<Integer> seq = Witness.reactiveSeq(anyM);
Monad Transformers allow us to manipulate nested types. For example, we could use the StreamT monad transformer to manipulate a List of Fluxs as if it were a single Flux, or a Flux inside a Mono as if we were operating directly on the Flux within.
Via liftM in Fluxs
ListX<Flux<Integer>> nested = ListX.of(Flux.just(10));
StreamT<list,Integer> listOfFluxs = Fluxs.liftM(nested.anyM());
StreamT<list,Integer> doubled = listOfFluxs.map(i->i*2);
Via Flux backed ReactiveSeq
ReactiveSeq<Integer> reactive = FluxReactiveSeq.just(1,2,3);
StreamT<optional,Integer> transformer = reactive.liftM(Witness.optional.INSTANCE);
Extracting Flux from StreamT
Use the unwrapTo method in conjunction with Fluxs::fromStream to get an AnyM containing the nested Flux
StreamT<list,Integer> trans = FluxReactiveSeq.just(1,2,3).liftM(list.INSTANCE);
AnyM<list,Flux<Integer>> anyM = trans.unwrapTo(Fluxs::fromStream);
Use Witness.list to convert to a List
StreamT<list,Integer> trans = FluxReactiveSeq.just(1,2,3).liftM(list.INSTANCE);
ListX<Flux<Integer>> listObs = Witness.list(trans.unwrapTo(Fluxs::fromStream));
The MonoT monad transformer is a monad transformer for working directly with nested Mono types
Via liftM in Monos
ListX<Mono<Integer>> nested = ListX.of(Mono.just(10));
MonoT<list,Integer> listOfMonos = Monos.liftM(nested.anyM());
MonoT<list,Integer> doubled = listOfMonos.map(i->i*2);
Or use MonoT.of directly
ListX<Mono<Integer>> nested = ListX.of(Mono.just(10));
MonoT<list,Integer> listOfMonos = MonoT.of(nested.anyM());
MonoT<list,Integer> doubled = listOfMonos.map(i->i*2);
Extracting nested Monos from MonoT
MonoT<list,Integer> trans = Monos.liftM(ListX.of(Mono.just(1)).anyM());
AnyM<list,Mono<Integer>> anyM = trans.unwrap();
System.out.println(anyM);
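What a transformer such as MonoT does can be approximated with JDK types. The sketch below uses a hypothetical `FutureListT` class over `List<CompletableFuture<T>>` as an analogue, assuming `CompletableFuture` is a fair stand-in for Mono here: `map` reaches through both layers, and `unwrap` hands back the nested structure.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical transformer over List<CompletableFuture<T>> (a JDK analogue, not MonoT).
final class FutureListT<T> {
    private final List<CompletableFuture<T>> nested;

    FutureListT(List<CompletableFuture<T>> nested) {
        this.nested = nested;
    }

    // map reaches through both layers: the list and each future inside it.
    <R> FutureListT<R> map(Function<T, R> fn) {
        List<CompletableFuture<R>> mapped = new ArrayList<>();
        for (CompletableFuture<T> future : nested) {
            mapped.add(future.thenApply(fn));
        }
        return new FutureListT<>(mapped);
    }

    // Recover the nested structure.
    List<CompletableFuture<T>> unwrap() {
        return nested;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> nested =
            Collections.singletonList(CompletableFuture.completedFuture(10));
        FutureListT<Integer> doubled = new FutureListT<>(nested).map(i -> i * 2);
        System.out.println(doubled.unwrap().get(0).join()); // 20
    }
}
```

The caller writes `map(i -> i * 2)` as if working on a single value, while the wrapper takes care of applying it inside every future in the list.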
import static cyclops.stream.Generator.suspend;
import static cyclops.stream.Generator.times;
//i and k must be mutable fields, since the lambdas below modify them
i = 100;
k = 9999;
Flux<Integer> fi = Flux.fromIterable(suspend((Integer i) -> i != 4, s -> {
Generator<Integer> gen1 = suspend(times(2), s2 -> s2.yield(i++));
Generator<Integer> gen2 = suspend(times(2), s2 -> s2.yield(k--));
return s.yieldAll(gen1.stream(), gen2.stream());
}
));
//Flux(100, 101, 9999, 9998, 102)