Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
[email protected] committed Jul 1, 2017
0 parents commit ee6cd1d
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 0 deletions.
47 changes: 47 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.mikerusoft.reactor</groupId>
<artifactId>example</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.16</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target> <!-- storing here exact version because of IDE, else I would use ${java.version} -->
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.mikeruoft.reactor;

import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* @author Mikhail Grinfeld
*/
public class SimpliefiedMrExampleWithReactor {

private static Flux<List<RoutingInfo>> fromAkaDB() {
return Flux.defer(() -> Flux.just(Arrays.asList(
new RoutingInfo(1L, "1"),
new RoutingInfo(2L, "2"),
new RoutingInfo(3L, "3"),
new RoutingInfo(6L, "4"),
new RoutingInfo(2L, "5"),
new RoutingInfo(5L, "6"),
new RoutingInfo(1L, "7"),
new RoutingInfo(2L, "8"),
new RoutingInfo(6L, "9"),
new RoutingInfo(1L, "10"),
new RoutingInfo(7L, "11"),
new RoutingInfo(9L, "12"),
new RoutingInfo(1L, "13")
)));

}

private static final int MAX_BULK = 3;
private static final Random r = new Random();
private static int getBulkSize() {
return r.nextInt(MAX_BULK) + 1;
}

@Test
public void fluxTest() throws InterruptedException {

// thread pool to run individual tasks
ExecutorService executor = Executors.newFixedThreadPool(5);

// let's define some stream which emitted every 3 seconds
Flux.interval(Duration.ofSeconds(3))
// every 3 seconds when event is emitted - let's take from DB list of RoutingInfo
.flatMap(i -> fromAkaDB())
// subscribe - actually, starts the flow. Until this subscribe nothing still working
.subscribe(l -> {
// at this point we define what we will do when we receive list of RoutingInfos
Flux<List<List<RoutingInfo>>> sendMcStream =
// create stream of RoutingInfo from list
Flux.fromIterable(l)
// enrich individual RoutingInfo with additional data
.map(SimpliefiedMrExampleWithReactor::setSerices)
// grouped by same message id
.groupBy(RoutingInfo::getMessageId, Function.identity())
.flatMap(groups -> groups.collect(Collectors.toList()))
// filter empty lists
.filter(rl -> !rl.isEmpty())
// we divide into bulks with
// specified maximum size
.flatMap(rl -> rl.size() > MAX_BULK ?
Mono.just(Lists.partition(rl, getBulkSize())) :
Mono.just(Collections.singletonList(rl))
);

// here we actually starts the flow
sendMcStream.subscribe(
lr -> {
// converts LIst of List of RoutingInfo into stream of individual lists of routing info
Flux.fromIterable(lr)
.filter(rl -> !rl.isEmpty())
// convert to stream of MessageContainer
.map(sp -> getMC (
sp.get(0).getMessageId(),
sp.stream().map(RoutingInfo::getDevice).collect(Collectors.toList())
))
// assign to this stream thread pool executor to use for parallel execution
.subscribeOn(Schedulers.fromExecutor(executor))
// here we actually start sending MC
.subscribe(SimpliefiedMrExampleWithReactor::sendMc);
}
);
});

Thread.sleep(10000L);
}

private static void sendMc(MessageContainer mc) {
System.out.println(new Date() + " " + mc);
}

private static MessageContainer getMC(long messageId, List<String> devices) {
System.out.println(new Date() + " " + Thread.currentThread().getName() + " " + messageId + " " + devices.stream().collect(Collectors.joining(",")));
return new MessageContainer();
}

@Data
@AllArgsConstructor
@NoArgsConstructor
private static class MessageContainer {
private long messageId;
private String device;

@Override
public String toString() {
return Thread.currentThread().getName() + " MessageContainer{}";
}
}

private static RoutingInfo setSerices(RoutingInfo ri) {
return ri;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class RoutingInfo {
private long messageId;
private String device;
}
}

0 comments on commit ee6cd1d

Please sign in to comment.