-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPart11BlockingToReactive.java
48 lines (40 loc) · 1.84 KB
/
Part11BlockingToReactive.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package io.pivotal.literx;
//generic imports to help with simpler IDEs (ie tech.io)
import java.util.*;
import java.util.function.*;
import java.time.*;
import io.pivotal.literx.domain.User;
import io.pivotal.literx.repository.BlockingRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
* Learn how to call blocking code from Reactive one with adapted concurrency strategy for
* blocking code that produces or receives data.
*
* For those who know RxJava:
* - RxJava subscribeOn = Reactor subscribeOn
* - RxJava observeOn = Reactor publishOn
* - RxJava Schedulers.io <==> Reactor Schedulers.elastic
*
* @see <a href="https://tech.io/playgrounds/929/reactive-programming-with-reactor-3/BlockingToReactive">BlockingToReactive Questions Desc</a>
* @see Flux#subscribeOn(Scheduler)
* @see Flux#publishOn(Scheduler)
* @see Schedulers
*/
public class Part11BlockingToReactive {
//========================================================================================
// TODO Create a Flux for reading all users from the blocking repository deferred until the flux is subscribed, and run it with an elastic scheduler
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
return Flux.defer(() ->
Flux.fromIterable(repository.findAll())
.subscribeOn(Schedulers.elastic())
);
}
//========================================================================================
// TODO Insert users contained in the Flux parameter in the blocking repository using an elastic scheduler and return a Mono<Void> that signal the end of the operation
Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
return flux.publishOn(Schedulers.elastic()).doOnNext(repository::save).then();
}
}