-
-
Notifications
You must be signed in to change notification settings - Fork 57
/
ObservableDelay.java
105 lines (93 loc) · 4.21 KB
/
ObservableDelay.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package rx.observables.utils;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observers.TestSubscriber;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* Delay operator will delay transformation of our Observable from lazy to eager the time that you specify
*/
public class ObservableDelay {
/**
* Using the delay operator we delay the creation of the pipeline from lazy to eager.
* But once start emitting the delay operator does not affect the items emitted
*/
@Test
public void delayCreation() {
long start = System.currentTimeMillis();
Subscription subscription = Observable.just("hello reactive world")
.delay(200, TimeUnit.MICROSECONDS)
.subscribe(n -> System.out.println("time:" + (System.currentTimeMillis() - start)));
new TestSubscriber((Observer) subscription).awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
}
/**
* If we want to delay the every single item emitted in the pipeline we will need constantClass hack,
* one possible hack is use zip operator and combine every item emitted with an interval so every item emitted has to wait until interval emit the item.
* Shall print
* <p>
* time:586
* time:783
* time:982
*/
@Test
public void delayWithZipAndInterval() {
long start = System.currentTimeMillis();
Subscription subscription =
Observable.zip(Observable.from(Arrays.asList(1, 2, 3)), Observable.interval(200, TimeUnit.MILLISECONDS),
(i, t) -> i)
.subscribe(n -> System.out.println("time:" + (System.currentTimeMillis() - start)));
new TestSubscriber((Observer) subscription).awaitTerminalEvent(3000, TimeUnit.MILLISECONDS);
}
/**
* Another elegant solution it would be to create an observable with the list of items, and then use
* concatMap to pass all items from the first observable to the second, then this second observable
* can be created used delay operator afterwards.
*/
@Test
public void delayWithConcatMap() {
Observable.from(Arrays.asList(1, 2, 3, 4, 5))
.concatMap(s -> Observable.just(s).delay(100, TimeUnit.MILLISECONDS))
.subscribe(n -> System.out.println(n + " just came..."),
e -> {
},
() -> System.out.println("Everybody came!"));
new TestSubscriber().awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
}
/**
* Another elegant solution it would be to create an observable with the list of items, and then use
* concatMap to pass all items from the first observable to the second, then this second observable
* can be created used delay operator afterwards.
*/
@Test
public void delayObservablesWithConcatMap() {
Observable.from(Arrays.asList(Observable.just(1), Observable.just(2), Observable.just(3)))
.concatMap(s -> s.delay(100, TimeUnit.MILLISECONDS))
.subscribe(n -> System.out.println(n + " just came..."),
e -> {
},
() -> System.out.println("Everybody came!"));
new TestSubscriber().awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
}
long total = 0;
@Test
public void customDelay() {
long start = System.currentTimeMillis();
Subscription subscription = Observable.just("hello reactive world with custom delay")
.map(value -> {
try {
Thread.sleep(new Random().nextInt(600));
} catch (InterruptedException e) {
e.printStackTrace();
}
total = System.currentTimeMillis() - start;
if (total > 500) total = 0;
return value;
})
.delay(total, TimeUnit.MILLISECONDS)
.subscribe(n -> System.out.println("time:" + (System.currentTimeMillis() - start)));
new TestSubscriber((Observer) subscription).awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
}
}