Apache Camel endpoints can be integrated into FS2 applications with a DSL. The DSL is provided by the streamz-camel-fs2 artifact, which is available for Scala 2.11 and 2.12:
resolvers += Resolver.bintrayRepo("krasserm", "maven")
libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.10-M2"
The consumer receive timeout on Camel endpoints defaults to 500 ms. To change it, set the following in application.conf:
streamz.camel.consumer.receive.timeout = 10s
The DSL can be imported with:
import streamz.camel.fs2.dsl._
Its usage requires an implicit StreamContext in scope. A StreamContext uses a CamelContext to manage the endpoints that are created and referenced by applications. A StreamContext with an internally managed CamelContext can be created with StreamContext():
import streamz.camel.StreamContext
// contains an internally managed CamelContext
implicit val streamContext: StreamContext = StreamContext()
Applications that want to re-use an existing, externally managed CamelContext should create a StreamContext with StreamContext(camelContext: CamelContext):
import org.apache.camel.CamelContext
import streamz.camel.StreamContext
// externally managed CamelContext
val camelContext: CamelContext = ???
// re-uses the externally managed CamelContext
implicit val streamContext: StreamContext = StreamContext(camelContext)
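The ??? placeholder above stands for whatever CamelContext the application already has. As a minimal sketch, assuming Camel 2.x (where DefaultCamelContext lives in org.apache.camel.impl) and assuming the application stays responsible for starting the context itself, the setup might look like this:

```scala
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import streamz.camel.StreamContext

// Assumption: Camel 2.x package layout for DefaultCamelContext.
val camelContext: CamelContext = new DefaultCamelContext()

// The application owns the lifecycle of an externally managed context,
// so it is started here before any stream uses it.
camelContext.start()

// Re-uses the externally managed CamelContext.
implicit val streamContext: StreamContext = StreamContext(camelContext)
```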
A StreamContext internally manages an executorService for running blocking endpoint operations. Applications can configure a custom executor service by providing an executorServiceFactory during StreamContext creation. See API docs for details.
After usage, a StreamContext should be stopped with streamContext.stop().
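One way to make sure stop() is always called is a try/finally block around the code that runs the streams. This is only a sketch of the pattern; the body of the try block is a placeholder for application code.

```scala
import streamz.camel.StreamContext

// Internally managed CamelContext, created as described above.
implicit val streamContext: StreamContext = StreamContext()

try {
  // Placeholder: build and run streams that use the DSL here.
} finally {
  // Stop the StreamContext even if running a stream failed.
  streamContext.stop()
}
```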
An FS2 stream that emits messages consumed from a Camel endpoint can be created with receive. Endpoints are referenced by their endpoint URI. For example,
import cats.effect.IO
import fs2.Stream
import streamz.camel.StreamContext
import streamz.camel.StreamMessage
import streamz.camel.fs2.dsl._
val s1: Stream[IO, StreamMessage[String]] = receive[IO, String]("seda:q1")
creates an FS2 stream that consumes messages from the SEDA endpoint seda:q1 and converts them to StreamMessage[String]s. A StreamMessage[A] contains a message body of type A and message headers. Calling receive with a String type parameter creates an FS2 stream that converts consumed message bodies to type String before emitting them as StreamMessage[String]. Type conversion internally uses a Camel type converter. An FS2 stream that only emits the converted message bodies can be created with receiveBody:
val s1b: Stream[IO, String] = receiveBody[IO, String]("seda:q1")
This is equivalent to receive[IO, String]("seda:q1").map(_.body).
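To make the body and headers of a StreamMessage more concrete, here is a small sketch that builds one by hand and reads both parts. It assumes StreamMessage is a case class taking a body and a headers map of type Map[String, Any]; the header name "priority" is purely illustrative.

```scala
import streamz.camel.StreamMessage

// Assumption: StreamMessage(body, headers) with headers: Map[String, Any].
// The "priority" header is a made-up example, not a Camel convention.
val msg: StreamMessage[String] =
  StreamMessage("hello", Map[String, Any]("priority" -> "high"))

// The body has the type given by the type parameter.
val body: String = msg.body

// Header values are untyped and looked up by name.
val priority: Option[Any] = msg.headers.get("priority")
```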
receive and receiveBody can only be used with endpoints that create in-only message exchanges.
...
For sending a StreamMessage to a Camel endpoint, the send combinator should be used:
val s2: Stream[IO, StreamMessage[String]] = s1.send("seda:q2")
This initiates an in-only message exchange with an endpoint and continues the stream with the sent StreamMessage.
val s2b: Stream[IO, String] = s1b.send("seda:q2")
If the stream's element type A is not a StreamMessage, as in s2b above, send automatically wraps each element into a StreamMessage[A] before sending it to the endpoint and continues the stream with the unwrapped A.
For sending a request StreamMessage to an endpoint and obtaining a reply, the sendRequest combinator should be used:
val s3: Stream[IO, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight")
This initiates an in-out message exchange with the endpoint and continues the stream with the output StreamMessage. Here, a Bean endpoint is used to call the weight(String): Int method on an object that is registered in the CamelContext under the name service. The input message body is used as the argument of the weight call, and the return value becomes the output message body. The sendRequest type parameter (Int) specifies the expected output value type. The output message body can also be converted to another type (Double, for example), provided that an appropriate Camel type converter is available.
val s3b: Stream[IO, Int] = s2b.sendRequest[Int]("bean:service?method=weight")
If the stream's element type A is not a StreamMessage, as in s3b above, sendRequest automatically wraps each element into a StreamMessage[A] before sending it to the endpoint and continues the stream with the unwrapped message body B of the output StreamMessage[B].
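The bean:service endpoint used with sendRequest above only works if an object is registered under the name service. The sketch below shows one way this could be set up, assuming Camel 2.x (SimpleRegistry and DefaultCamelContext from org.apache.camel.impl). The Service class and its weight implementation are hypothetical; only the signature weight(String): Int comes from the text above.

```scala
import org.apache.camel.impl.{ DefaultCamelContext, SimpleRegistry }
import streamz.camel.StreamContext

// Hypothetical service: the string length stands in for a real "weight".
class Service {
  def weight(s: String): Int = s.length
}

// Assumption: Camel 2.x, where SimpleRegistry is a map from bean names to objects.
val registry = new SimpleRegistry
registry.put("service", new Service)

// The context resolves "bean:service" through this registry.
val camelContext = new DefaultCamelContext(registry)
camelContext.start()

// Streams using this StreamContext can now call bean:service?method=weight.
implicit val streamContext: StreamContext = StreamContext(camelContext)
```

With this hypothetical Service, a request body of "abc" would yield an output body of 3.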