The example application consumes file content line by line, either from a TCP endpoint or a from file endpoint, and prints the consumed lines prefixed with a formatted line number to stdout
:
- The TCP endpoint is implemented with the Netty4 component. It listens on
localhost:5051
and is configured to use a text line codec (seetcpEndpointUri
below) so that consumers receive a separate message for each line. - The file endpoint is implemented with the File component. It scans the
input
directory for new files and serves them asString
s to consumers. The consumed file content is split into lines in a separate Line Split step. - Lines consumed from both endpoints are merged into a single stream in a Merge step.
- To generate line numbers for the consumed lines, a Line Number Source is configured to generate numbers 1, 2, ..., n. These numbers are then formatted to a line prefix using the
[$lineNumber]
template. The line number formatter is an object registered in theCamelContext
under the nameexampleService
and accessed with a Bean endpoint configured to call thelinePrefix
method. - The line prefixes are then concatenated with the actual lines in a ZipWith step.
- Finally, the concatenation results are sent to
stream:out
, a Stream endpoint that writes messages tostdout
.
The following subsections show implementations of the example application with
The source code is available in the streamz-examples module. Section Example application usage shows how to run and use the example application.
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;
import streamz.camel.StreamContext;
import streamz.camel.akka.javadsl.JavaDsl;
import static java.util.Arrays.asList;
public class JExampleService {
public String linePrefix(int lineNumber) {
return String.format("[%d] ", lineNumber);
}
}
public class JExampleContext implements JavaDsl {
public static String tcpEndpointUri =
"netty4:tcp://localhost:5150?sync=false&textline=true&encoding=utf-8";
public static String fileEndpointUri =
"file:input?charset=utf-8";
public static String serviceEndpointUri =
"bean:exampleService?method=linePrefix";
public static String printerEndpointUri =
"stream:out";
private StreamContext streamContext;
public JExampleContext() throws Exception {
SimpleRegistry camelRegistry = new SimpleRegistry();
DefaultCamelContext camelContext = new DefaultCamelContext();
camelRegistry.put("exampleService", new JExampleService());
camelContext.setRegistry(camelRegistry);
camelContext.start();
streamContext = StreamContext.create(camelContext);
}
@Override
public StreamContext streamContext() {
return streamContext;
}
}
public class JExample extends JExampleContext {
private ActorMaterializer actorMaterializer;
public JExample() throws Exception {
super();
ActorSystem actorSystem = ActorSystem.create("example");
actorMaterializer = ActorMaterializer.create(actorSystem);
}
public Runnable setup() {
Source<String, NotUsed> tcpLineSource =
receiveBody(tcpEndpointUri, String.class);
Source<String, NotUsed> fileLineSource =
receiveBody(fileEndpointUri, String.class).mapConcat(s -> asList(s.split("\\r\\n|\\n|\\r")));
Source<String, NotUsed> linePrefixSource =
Source.range(1, Integer.MAX_VALUE).via(sendRequestBody(serviceEndpointUri, String.class));
Source<String, NotUsed> stream =
tcpLineSource
.merge(fileLineSource)
.zipWith(linePrefixSource, (l, n) -> n.concat(l))
.via(sendBody(printerEndpointUri));
return () -> { stream.runWith(Sink.ignore(), actorMaterializer); };
}
public static void main(String... args) throws Exception {
new JExample().setup().run();
}
}
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import streamz.camel.StreamContext
import streamz.camel.akka.scaladsl._
import org.apache.camel.impl.{ DefaultCamelContext, SimpleRegistry }
import scala.collection.immutable.Iterable
class ExampleService {
def linePrefix(lineNumber: Int): String = s"[$lineNumber] "
}
trait ExampleContext {
private val camelRegistry = new SimpleRegistry
private val camelContext = new DefaultCamelContext
camelContext.start()
camelContext.setRegistry(camelRegistry)
camelRegistry.put("exampleService", new ExampleService)
implicit val context: StreamContext =
StreamContext(camelContext)
val tcpEndpointUri: String =
"netty4:tcp://localhost:5150?sync=false&textline=true&encoding=utf-8"
val fileEndpointUri: String =
"file:input?charset=utf-8"
val serviceEndpointUri: String =
"bean:exampleService?method=linePrefix"
val printerEndpointUri: String =
"stream:out"
}
object Example extends ExampleContext with App {
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()
val tcpLineSource: Source[String, NotUsed] =
receiveBody[String](tcpEndpointUri)
val fileLineSource: Source[String, NotUsed] =
receiveBody[String](fileEndpointUri).mapConcat(_.lines.to[Iterable])
val linePrefixSource: Source[String, NotUsed] =
Source.fromIterator(() => Iterator.from(1)).sendRequest[String](serviceEndpointUri)
val stream: Source[String, NotUsed] =
tcpLineSource
.merge(fileLineSource)
.zipWith(linePrefixSource)((l, n) => n concat l)
.send(printerEndpointUri)
stream.runWith(Sink.ignore)
}
Here, we re-use ExampleService
and ExampleContext
from the previous section.
import cats.effect.IO
import fs2.{ Stream, text }
import streamz.camel.fs2.dsl._
import streamz.examples.camel.ExampleContext
object Example extends ExampleContext with App {
import scala.concurrent.ExecutionContext.Implicits.global // needed for merge
val tcpLineStream: Stream[IO, String] =
receiveBody[IO, String](tcpEndpointUri)
val fileLineStream: Stream[IO, String] =
receiveBody[IO, String](fileEndpointUri).through(text.lines)
val linePrefixStream: Stream[IO, String] =
Stream.iterate(1)(_ + 1).sendRequest[String](serviceEndpointUri)
val stream: Stream[IO, String] =
tcpLineStream
.merge(fileLineStream)
.zipWith(linePrefixStream)((l, n) => n concat l)
.send(printerEndpointUri)
stream.compile.drain.unsafeRunSync()
}
For running the example application you first need to checkout the project:
$ git clone https://github.com/krasserm/streamz.git
From the project’s root directory, the example application can be started with one of the following commands (depending on the implementation):
$ sbt 'examples/runMain streamz.examples.camel.akka.JExample'
$ sbt 'examples/runMain streamz.examples.camel.akka.Example'
$ sbt 'examples/runMain streamz.examples.camel.fs2.Example'
Before submitting data to the application, create an input file with two lines:
$ cat >> example.txt
hello
streamz
^D
Copy the generated file to the input
directory so that it can be consumed by the file endpoint:
$ cp example.txt input/
You should see the following stream output:
[1] hello
[2] streamz
Then send the file content to the TCP endpoint (with nc
on Mac OS X or netcat
on Linux):
$ cat example.txt | nc localhost 5150
You should see the following stream output:
[3] hello
[4] streamz