This plugin adds built-in support for Kafka to Nextflow scripts.
It lets you create a Nextflow channel that listens to a topic, and send messages to a topic.
The current version provides out-of-the-box support for the following systems:
NOTE: THIS IS A PREVIEW TECHNOLOGY, FEATURES AND CONFIGURATION SETTINGS CAN CHANGE IN FUTURE RELEASES.
Make sure you have Nextflow 22.10.0 or later. Add the following snippet to your nextflow.config file:
plugins {
    id '[email protected]'
}
The plugin configuration is specified using the kafka scope:
Config option | Description | Mandatory |
---|---|---|
kafka.url | The connection URL. | yes |
kafka.group | The consumer group the plugin attaches to. | yes |
kafka.pollTimeout | The time, in milliseconds, that the consumer waits when polling a topic. | no |
For example:
kafka {
    url = 'localhost:9092'
    group = 'group'
    pollTimeout = 2500 // ms
}
This plugin adds the following extensions to the Nextflow DSL:
The fromTopic factory method reads the specified topic and creates a Nextflow channel that emits
a tuple with the key and value of each record available to the configured group. For example:
include { fromTopic } from 'plugin/nf-kafka'
ch = channel.fromTopic('my-topic')
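As a minimal sketch of how the emitted tuples might be consumed, assuming (as described above) that each item is a two-element tuple holding the record key and value, the standard Nextflow map and view operators can unpack and print them. The topic name my-topic is the same placeholder used above.

```groovy
include { fromTopic } from 'plugin/nf-kafka'

workflow {
    channel.fromTopic('my-topic')
        // assumption: each item is a (key, value) tuple, so the closure can unpack it
        .map { key, value -> "key=${key} value=${value}" }
        .view()
}
```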
The watchTopic factory method watches the specified topic and creates a Nextflow channel that emits
a tuple for each record received by the configured group. For example:
include { watchTopic } from 'plugin/nf-kafka'
ch = channel.watchTopic('my-topic')
    .subscribe { println "new message received ${it[0]} = ${it[1]}" }
    .until { it[1] == 'done' }
The writeMessage operator sends a message (with an optional key) to a topic:
include { writeMessage; watchTopic } from 'plugin/nf-kafka'
process listener {
    input: val(msg)
    output: stdout
    script:
    writeMessage('my-topic', 'Hi folks')
    "echo ${msg[1]}"
}
workflow {
    listener( Channel.of("msg") )
}
You need to provide the topic and the message, plus an optional key.
1.- build the plugin
./gradlew copyPluginZip
2.- create a docker-compose.yml and run docker-compose up -d to have a local instance running at localhost:29092
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
3.- create a Nextflow project and configure it
plugins {
    id '[email protected]'
}
kafka {
    url = 'localhost:29092'
    group = 'group'
}
4.- create and run a listener pipeline
include { watchTopic } from 'plugin/nf-kafka'
process listener1 {
    input: val(msg)
    output: stdout
    script:
    "echo '${msg[1]}'"
}
workflow {
    chn1 = channel.watchTopic("test").until { it[1] == 'done' }
    listener1(chn1) | view
}
NXF_PLUGINS_DIR=//build/plugins/ nextflow run ./kafka-listener.nf
5.- create and run (in another shell) a producer
include { publishTopic } from 'plugin/nf-kafka'
workflow {
    Channel.of(
        "Hi folks",
        "Another message",
        "done"
    ).publishTopic("test")
}
NXF_PLUGINS_DIR=//build/plugins/ nextflow run ./kafka-producer.nf
If all goes well, the producer will send 3 messages to the topic, and the listener will print them and exit.
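The listener and producer steps can also be combined in one script. The following is a sketch only, built from the watchTopic and publishTopic calls shown above and the core map operator: it reads messages from the test topic, upper-cases each value, and publishes the result to a second topic. The topic name test-upper is hypothetical, and the sketch assumes that message values are strings and that the broker allows the new topic to be created.

```groovy
include { watchTopic; publishTopic } from 'plugin/nf-kafka'

workflow {
    channel.watchTopic('test')
        // stop when the 'done' message arrives, as in the listener example
        .until { it[1] == 'done' }
        // assumption: it[1] is the message value and is a String
        .map { it[1].toUpperCase() }
        // 'test-upper' is a hypothetical topic name used only for illustration
        .publishTopic('test-upper')
}
```

Under those assumptions it can be launched in the same way as the listener script, and it should stop once the done message is received.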