Question regarding Reactor 3 #6

Open
petivagyok16 opened this issue May 16, 2018 · 1 comment

petivagyok16 commented May 16, 2018

I know this is somewhat off-topic here, but I don't know of any other channel for asking Reactor 3 questions. If you know a better place to ask, feel free to share :)

I'd like to refactor my code, which consists of 3 Publishers (ReactiveMongoRepository responses). I need all three responses in the same scope so that I can make calculations on them. Another difficulty is that one of the publishers does not have the same type as the others (all three are Mono, but in the code below two emit a user and one emits a message). How can I compose these publishers into a single stream in a form that gives me access to all three values?

One more thing: does it make sense to use .publishOn(Schedulers.parallel()) on all three repository requests?

It looks like this:

return this.userRepository.findById(raterUserId)
	.publishOn(Schedulers.parallel())
	.flatMap(raterUser -> Mono.just(raterUser)
		.then(this.messageRepository.findById(messageId)
			.publishOn(Schedulers.parallel())
			.single()
			.doOnError(this::messageNotFound)
			.flatMap(ratedMessage -> this.userRepository.findById(ratedMessage.getUserId())
				.publishOn(Schedulers.parallel())
				.flatMap(messageOwner -> { ... }))));

petivagyok16 (Author) commented

I refactored the code above to:

return Flux.zip(
		this.messageRepository.findById(messageId),
		this.userRepository.findById(raterUserId),
		this.userRepository.findUserByMessage(messageId))
	.single()
	.doOnError(this::handleDatabaseError)
	.publishOn(Schedulers.parallel())
	.flatMap(publisherList -> this.rateMessage(rating, prevRating, raterUserId, messageId, publisherList))
	.then(Mono.just(ResponseEntity.ok().build()));

What do you think? Does .publishOn make any sense there? Does it run the zip() and the repository calls on another thread?
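For anyone landing here with the same two questions, here is a minimal sketch that can be run to observe the behaviour. It assumes reactor-core is on the classpath. The three `find...` methods are hypothetical stand-ins for the repository calls, not the real ReactiveMongoRepository methods. It uses `Mono.zip`, which combines three Monos of different types into a `Mono<Tuple3<...>>` directly, so no `.single()` is needed. It also records thread names to show that `publishOn` only moves the operators that come after it; the sources themselves are still subscribed on the calling thread.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ZipThreadingSketch {

    // Hypothetical stand-in for messageRepository.findById(...):
    // records the thread on which the source actually runs.
    static Mono<String> findMessage() {
        return Mono.fromCallable(() -> "message@" + Thread.currentThread().getName());
    }

    // Hypothetical stand-in for a lookup that returns a different type.
    static Mono<Integer> findRaterId() {
        return Mono.fromCallable(() -> 42);
    }

    // Hypothetical stand-in for the message owner lookup.
    static Mono<String> findOwner() {
        return Mono.fromCallable(() -> "owner");
    }

    static String run() {
        return Mono.zip(findMessage(), findRaterId(), findOwner())
                // publishOn switches threads for everything BELOW this line only.
                .publishOn(Schedulers.parallel())
                // The tuple keeps each value's own type: getT1(), getT2(), getT3().
                .map(t -> t.getT1()
                        + " rater=" + t.getT2()
                        + " owner=" + t.getT3()
                        + " combinedOn=" + Thread.currentThread().getName())
                // block() is only for this demo; a controller would return the Mono.
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the message part is tagged with the calling thread, while the `map` step reports a `parallel-N` thread. So `publishOn` does not move the zip subscription or the source calls; `subscribeOn` is the operator that changes where subscription happens. With a non-blocking driver the results probably arrive on the driver's own threads anyway, so `publishOn(Schedulers.parallel())` mainly matters if the step after it (here, `rateMessage`) does CPU-heavy work. One behavioural difference to keep in mind: `Mono.zip` completes empty if any source is empty, whereas `Flux.zip(...).single()` signals a `NoSuchElementException` in that case.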
