Skip to content

Commit

Permalink
Added cases using subscribeOn and publishOn when debuggin reactive st…
Browse files Browse the repository at this point in the history
…reams, for analysis (eugenp#5841)
  • Loading branch information
rozagerardo authored and maibin committed Dec 5, 2018
1 parent c4d92d1 commit a21f820
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,36 @@ public void consumeFiniteFluxWithCheckpoint4() {
logger.info("process 4 with approach 4");
service.processUsingApproachFourWithCheckpoint(fluxFoo);
}

@Scheduled(fixedRate = 20000)
public void consumeFiniteFluxWitParallelScheduler() {
Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100))
.map(dto -> {
logger.debug("process 5-parallel with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto);
});
logger.info("process 5-parallel with approach 5-parallel");
service.processUsingApproachFivePublishingToDifferentParallelThreads(fluxFoo);
}

@Scheduled(fixedRate = 20000)
public void consumeFiniteFluxWithSingleSchedulers() {
Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100))
.map(dto -> {
logger.debug("process 5-single with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto);
});
logger.info("process 5-single with approach 5-single");
service.processUsingApproachFivePublishingToDifferentSingleThreads(fluxFoo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Data
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Foo {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.baeldung.debugging.consumer.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Data
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class FooDto {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.baeldung.debugging.consumer.model.Foo;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@Component
public class FooService {
Expand Down Expand Up @@ -88,4 +89,32 @@ public void processUsingApproachFourWithInitialCheckpoint(Flux<Foo> flux) {
flux.subscribe();
}

public void processUsingApproachFivePublishingToDifferentParallelThreads(Flux<Foo> flux) {
logger.info("starting approach five-parallel!");
flux = concatAndSubstringFooName(flux).publishOn(Schedulers.newParallel("five-parallel-foo"))
.log();
flux = concatAndSubstringFooName(flux);
flux = divideFooQuantity(flux);
flux = reportResult(flux, "FIVE-PARALLEL").publishOn(Schedulers.newSingle("five-parallel-bar"));
flux = concatAndSubstringFooName(flux).doOnError(error -> {
logger.error("Approach 5-parallel failed!", error);
});
flux.subscribeOn(Schedulers.newParallel("five-parallel-starter"))
.subscribe();
}

public void processUsingApproachFivePublishingToDifferentSingleThreads(Flux<Foo> flux) {
logger.info("starting approach five-single!");
flux = flux.log()
.subscribeOn(Schedulers.newSingle("five-single-starter"));
flux = concatAndSubstringFooName(flux).publishOn(Schedulers.newSingle("five-single-foo"));
flux = concatAndSubstringFooName(flux);
flux = divideFooQuantity(flux);
flux = reportResult(flux, "FIVE-SINGLE").publishOn(Schedulers.newSingle("five-single-bar"));
flux = concatAndSubstringFooName(flux).doOnError(error -> {
logger.error("Approach 5-single failed!", error);
});
flux.subscribe();
}

}

0 comments on commit a21f820

Please sign in to comment.