CAVEAT: This adapter library is not mature so take precaution when
using it for production. This library also may not be as lightweight,
eficient, thoughtout. As an example, this library uses
honeydew as a worker pool to
throttle sending and receiving messages instead of the builtin
GenStage
.
The package can be installed by adding conduit_nsq
to your list of dependencies in mix.exs
:
def deps do
[{:conduit_nsq, "~> 0.1.4"}]
end
Once you created your own MyApp.Broker
, remember to add it in your
application:
def start(_type, _args) do
children = [
MyApp.Broker
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
This library uses elixir_nsq to
connect to the message queue. Aside from specifiying the :adapter
,
every option in the following snippet below is passed down to NSQ.Config
:
config :my_app, MyApp.Broker,
adapter: ConduitNSQ,
producer_nsqds: [
"127.0.0.1:12150",
],
nsqds: [
"127.0.0.1:12150",
"127.0.0.1:13150",
"127.0.0.1:14150"
],
nsqlookupds: ["127.0.0.1:12161"],
backoff_multiplier: 2_000
Checkout the list of supported options.
The only different option is :producer_nsqds
which is publisher
specific endpoints than consumer specific endpoints. This option is to
support the idiomatic strategy of publishing to colocated nsqds
while
listening in to external producers. (See Eliminating
SPOFs).
For more adapter specific options:
# Default config
config :conduit_nsq,
publisher_workers: 3,
processor_workers: 10,
publish_timeout: 60_000,
process_timeout: 60_000
Aside from `:adapter`, the current options should be good defaults.
:adapter
- The message queue adapter to use, should beConduitNSQ
.:publisher_workers
- The number of workers for publishing messages. See Honeydew.start_workers/3:processor_workers
- The number of workers for receiving messages. See Honeydew.start_workers/3:publish_timeout
- Timeout in publishing messages. See Honeydew.yield/2:process_timeout
- Timeout in receiving messages. See Honeydew.yield/2
Inside the configure
block of a broker, you can define topics via
queue
that will be created at application startup with the options you specify.
defmodule MyApp.Broker do
configure do
queue "my-topic"
queue "other-topic"
end
end
All topics that the application will publish to must be defined here
since each message is routed to the corresponding
NSQ.Producer.Supervisor
otherwise the message might be unsent.
You can suffix topics and channels with #ephemeral
based on bounded
memory
footprint).
However, when publishing or subscribing to the topic, also add the
same suffix to the topic or channel to match
Inside an incoming
block for a broker, you can define subscriptions to
topics. Conduit will route messages on those topics to your subscribers.
defmodule MyApp.Broker do
incoming MyApp do
subscribe :my_subscriber, MySubscriber,
topic: "my-queue",
channel: "my-channel"
subscribe :my_other_subscriber, MyOtherSubscriber,
topic: "my-other-queue",
from: "my-other-channel"
end
end
Make sure the :topic
s here are found in the configure
block to
receive the messages.
The only required options are :topic
and :channel
which follows
NSQ.Consumer.Supervisor
.
:topic
- Topic to connect with:channel
- Topic channel to listen into
Inside an outgoing
block for a broker, you can define publications to topics.
defmodule MyApp.Broker do
outgoing do
publish :something, topic: "my-topic"
publish :something_else, topic: "my-other-topic",
end
end
The only required option is :topic
which follows
NSQ.Producer.Supervisor
.
:topic
- Topic to publish to