Alyxstream is a library that simplifies stream processing in Node.js. We use it in production for real-time log analysis, error detection, and parallel job processing, mainly with Kafka as the source and Cassandra and Redis as sinks. Although it is not perfect and is still under active development, this library can help you solve many processing problems with a clean dataflow syntax.
Out-of-the-box sources/sinks:
- Kafka
- Redis
- Pulsar
- Cassandra
- Nats Jetstream
- Etcd
Working usage examples are in the usage-examples folder.
- Introduction
- Stream/Batch sources
- Operators
- Custom functions
- Window processing
- Kafka source and sink
- Redis queues
- Storage
- Extend the library
- Kafka Exchange Mode
- Multiprocess/Parallel Mode
- Nats JetStream
- Distributed locks
- Pulsar
Install it:
npm install @dev.smartpricing/alyxstream
import {
Task,
MakeStorage,
StorageKind,
KafkaClient,
KafkaSource
} from '@dev.smartpricing/alyxstream'
const kafkaSource = KafkaSource(KafkaClient())
await Task()
.withLocalKVStorage()
.withStorage(MakeStorage(StorageKind.Cassandra, null, 'hotel.errors.count'))
.fromKafka(kafkaSource)
.setLocalKV('kafka-mex', x => x)
.withEventTime(x => x.eventTime)
.keyBy(x => x.partition)
.map(x => x.value)
.filter(x => x.warningLevel == 'error')
.slidingWindowTime(MakeStorage(StorageKind.Redis, null, 'my.window'), 5 * 60 * 1000, 60 * 1000)
.groupBy(x => x.hotelName)
.sumMap()
.toStorage(x => 'result', x => x)
.getLocalKV('kafka-mex')
.kafkaCommit(kafkaSource)
.close()
Alyxstream supports multiple sources by default, for both streaming and batch processing. It is also easy to build your own custom sources.
Always import Task, plus KafkaClient and KafkaSource if you need Kafka support.
import {
Task,
KafkaClient,
KafkaSource
} from '@dev.smartpricing/alyxstream'
For every Task, remember to call the close method at the end of the task pipeline. The close method signals to the Task that it can start processing the data stream.
Array source, the downstream pipeline is called for every element of the array:
await Task().fromArray([1,2,3]).close()
Object source, the downstream pipeline is called once:
await Task().fromObject([1,2,3]).close()
await Task().fromObject({name: 'alice'}).close()
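The difference between the array source and the object source can be pictured with two plain functions. This is only a sketch of the semantics described above: `runPerElement`, `runOnce`, and `label` are hypothetical names and are not part of Alyxstream.

```javascript
// Hypothetical helpers that mimic the two source behaviours described above.

// fromArray-like: the pipeline runs once per element.
function runPerElement (items, pipeline) {
  const results = []
  for (const item of items) results.push(pipeline(item))
  return results
}

// fromObject-like: the pipeline runs once with the whole value.
function runOnce (value, pipeline) {
  return [pipeline(value)]
}

const label = x => (Array.isArray(x) ? `array of ${x.length}` : `value ${x}`)

const perElement = runPerElement([1, 2, 3], label)
const once = runOnce([1, 2, 3], label)
console.log(perElement) // [ 'value 1', 'value 2', 'value 3' ]
console.log(once)       // [ 'array of 3' ]
```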
String source:
await Task().fromString('Alice').close()
From readable stream:
await Task().fromReadableStream('/path/to/file.csv').close()
// With integrated unzip
await Task().fromReadableStream('/path/to/file.csv.gz', true).close()
From Kafka (the Kafka client, source, and sink are explained in detail below):
const kafkaSource = await KafkaSource(KafkaClient({
clientId: 'clientId'
}), {
groupId: 'groupId',
topics: ['mytopic']
})
await Task().fromKafka(kafkaSource).close()
You can also define a Task without a source and then inject the payload.
With the inject source, calling the close function is not needed:
const task = await Task().print('>')
for (var i = 0; i < 10; i += 1) {
await task.inject(i)
}
To get back the last result of a Task, use the finalize method:
const t = await Task().fromString('Alice').close()
const result = await t.finalize()
Alyxstream supports keyed stream processing. If you don't need it, you can set the key to a default value with the withDefaultKey() operator.
await Task().withDefaultKey()
If you need keyed processing instead (as with windows), set the key with the keyBy operator, usually right after the source operator.
await Task().keyBy(x => x.myKey)
await Task().keyBy(x => 'customKey')
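As a rough mental model, a keyed stream routes every record to a separate piece of state chosen by the key function. The sketch below assumes nothing about Alyxstream internals; `partitionByKey` is a hypothetical helper written in plain JavaScript.

```javascript
// Hypothetical sketch: route each record to per-key state, as a keyed operator would.
function partitionByKey (records, keyFn) {
  const byKey = new Map()
  for (const record of records) {
    const key = keyFn(record)
    if (!byKey.has(key)) byKey.set(key, [])
    byKey.get(key).push(record)
  }
  return byKey
}

const records = [
  { myKey: 'a', v: 1 },
  { myKey: 'b', v: 2 },
  { myKey: 'a', v: 3 }
]

// Keyed by a field of the record
const keyed = partitionByKey(records, x => x.myKey)
// A constant key behaves like a single default partition
const single = partitionByKey(records, () => 'default')

console.log([...keyed.keys()])  // [ 'a', 'b' ]
console.log([...single.keys()]) // [ 'default' ]
```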
If you want to use event time based processing, you have to specify it, usually after the source operator. If not specified, processing time is used.
await Task().withEventTime(x => new Date(x.originalDate))
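The choice between event time and processing time comes down to which timestamp a time-based operator reads. A minimal sketch, assuming a hypothetical `resolveTimestamp` helper (not an Alyxstream function) and an injectable clock so the result is deterministic:

```javascript
// Hypothetical helper: pick the timestamp a time-based operator would use.
// With an extractor, use the event's own time; otherwise fall back to the clock.
function resolveTimestamp (record, eventTimeFn, now = Date.now) {
  return eventTimeFn ? new Date(eventTimeFn(record)).getTime() : now()
}

const record = { originalDate: '2024-01-01T00:00:00.000Z' }

// Event time: taken from the record itself
const eventTs = resolveTimestamp(record, x => x.originalDate)
// Processing time: taken from the (here, fake) clock
const processingTs = resolveTimestamp(record, null, () => 1234)

console.log(eventTs)      // 1704067200000
console.log(processingTs) // 1234
```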
You can stop the pipeline execution for a message that does not meet your needs:
await Task().filter(x => x % 2 == 0)
And in every pipeline step, you can print the current operator state:
await Task().print('1 >')
The branch operator helps you build DAG flow processing (the syntax will be improved):
await Task()
.fromArray([1,2,3])
.branch([
async () => { return await Task().print('1 >') },
async () => { return await Task().print('2 >') }
])
.close()
Array operators help you to transform array data:
map takes an input array and applies the map function to every element:
await Task()
.fromObject([1,2,3])
.map(x => x * 2)
.print() // [2,4,6]
.close()
each calls the downstream pipeline steps for every element of the array (the same behavior as the fromArray source operator):
await Task()
.fromObject([1,2,3])
.each()
.print()
.close()
// 1
// 2
// 3
groupBy takes an array and returns an object that maps each key to the array of matching elements:
await Task()
.fromObject([ {name: 'Alice'}, {name: 'Andrea'}, {name: 'Paolo'}, {name: 'Alice'} ])
.groupBy(x => x.name)
.print()
.close()
// {
// Alice: [{name: 'Alice'}, {name: 'Alice'}],
// Paolo: [{name: 'Paolo'}],
// Andrea: [{name: 'Andrea'}]
// }
sumMap takes an object and counts the array elements for every key:
await Task()
.fromObject([ {name: 'Alice'}, {name: 'Andrea'}, {name: 'Paolo'}, {name: 'Alice'} ])
.groupBy(x => x.name)
.sumMap()
.print()
.close()
// {
// Alice: 2,
// Paolo: 1,
// Andrea: 1
// }
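For readers who want to see what the groupBy and sumMap steps compute, here is a plain JavaScript equivalent. It is an illustration of the results shown above, not the library's implementation; `groupByKey` and `countPerKey` are hypothetical names.

```javascript
// Plain JavaScript equivalents of the two steps, for illustration only.

// Group an array into an object keyed by keyFn(item).
function groupByKey (items, keyFn) {
  const groups = {}
  for (const item of items) {
    const key = keyFn(item)
    if (!groups[key]) groups[key] = []
    groups[key].push(item)
  }
  return groups
}

// Replace every group with the number of elements it contains.
function countPerKey (groups) {
  const counts = {}
  for (const [key, items] of Object.entries(groups)) counts[key] = items.length
  return counts
}

const people = [{ name: 'Alice' }, { name: 'Andrea' }, { name: 'Paolo' }, { name: 'Alice' }]
const counts = countPerKey(groupByKey(people, x => x.name))
console.log(counts) // { Alice: 2, Andrea: 1, Paolo: 1 }
```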
const storage = MakeStorage(StorageKind.Memory, null, 'example')
const t = await Task()
.fromArray(['hello', 'hello', 'alice'])
.aggregate(storage, 'ex', x => x)
.sumMap()
.print('> step result:')
.close()
You can of course use any custom JavaScript function to process your data. Just wrap your code inside a function (or an async function).
With synchronous functions:
await Task()
.fromArray([1,2,3])
.fn(x => {
// x will be 1, then 2, then 3
return x * 2
})
.close()
or asynchronous:
await Task()
.fromArray([1,2,3])
.fn(async x => {
return await asyncFunctionYouHaveToCall(x)
})
.close()
The x callback variable is the data flowing in the pipeline (your payload).
Alyxstream wraps your payload inside an internal data structure, which is an object:
Message = {
payload: 'YOUR_PAYLOAD',
metadata: {}, // Where keyBy and eventTime metadata are kept in memory
globalState: {} // Here you can place state that must persist within steps
}
If you need to access the raw message in your custom functions, use the fnRaw variant:
await Task()
.fromArray([1,2,3])
.fnRaw(x => {
// x = {payload: 1, metadata: {}, globalState: {}}
x.payload *= 2
return x
})
.close()
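The relationship between the payload callback and the raw callback can be sketched without the library. The helpers below (`wrap`, `applyFn`, `applyFnRaw`) are hypothetical and only mirror the message shape shown above; how Alyxstream actually rebuilds the wrapper is an assumption.

```javascript
// Hypothetical sketch of the wrapper described above; not the library's code.
function wrap (payload) {
  return { payload, metadata: {}, globalState: {} }
}

// fn-like: the callback sees only the payload; the wrapper is rebuilt around the result.
function applyFn (message, callback) {
  return { ...message, payload: callback(message.payload) }
}

// fnRaw-like: the callback sees the whole message and returns it.
function applyFnRaw (message, callback) {
  return callback(message)
}

const doubled = applyFn(wrap(1), x => x * 2)
const tagged = applyFnRaw(wrap(1), m => {
  m.metadata.seen = true // raw access allows touching metadata as well
  m.payload *= 2
  return m
})

console.log(doubled.payload)                 // 2
console.log(tagged.payload, tagged.metadata) // 2 { seen: true }
```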
Alyxstream has five kinds of windows:
- tumblingWindowCount: (Storage, WindowElementsLength, MaxInactivityMilliseconds [optional])
- tumblingWindowTime: (Storage, WindowTimeMsLength, MaxInactivityMilliseconds [optional])
- slidingWindowCount: (Storage, WindowElementsLength, SlideLength, MaxInactivityMilliseconds [optional])
- slidingWindowTime: (Storage, WindowTimeMsLength, SlideMs, MaxInactivityMilliseconds [optional])
- sessionWindowTime: (Storage, MaxInactivityMilliseconds)
Window state can be stored in memory or in a Redis instance (or a Redis-compatible database such as KVRocks). Every window has an inactivity time period, so that the window result is emitted even if no new events arrive to trigger it.
Time-based windows use a watermark, so when using event-time processing, late records are not allowed (a grace period will be implemented soon).
Windows split the stream based on the key you have defined (keyBy operator). They flush the storage every time a window is ready to be emitted.
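To make the "split by key, flush when ready" behavior concrete, here is a small sketch of a count-based tumbling window kept per key in a `Map`. It assumes an in-memory store and ignores the inactivity timeout; `makeTumblingCountWindow` is a hypothetical name, not an Alyxstream API.

```javascript
// Hypothetical sketch: a count-based tumbling window kept per key in a Map.
function makeTumblingCountWindow (size) {
  const buffers = new Map() // key -> pending elements
  return function push (key, element) {
    const buffer = buffers.get(key) || []
    buffer.push(element)
    if (buffer.length < size) {
      buffers.set(key, buffer)
      return null // window not ready yet
    }
    buffers.delete(key) // flush the state once the window is emitted
    return buffer
  }
}

const push = makeTumblingCountWindow(2)
const emitted = []
for (const [key, value] of [['a', 1], ['b', 10], ['a', 2], ['b', 20], ['a', 3]]) {
  const elements = push(key, value)
  if (elements) emitted.push({ key, elements })
}

console.log(JSON.stringify(emitted))
// [{"key":"a","elements":[1,2]},{"key":"b","elements":[10,20]}]
```

The last element for key `a` stays buffered, which is the situation the inactivity period is meant to handle.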
Base config:
import {
Task,
MakeStorage,
StorageKind
} from '@dev.smartpricing/alyxstream'
const redisConfig = {} // default to localhost:6379
const exampleWindowStorage = MakeStorage(StorageKind.Redis, redisConfig, 'windowStorageId')
tumblingWindowCount, with 100 elements length and 10 seconds max inactivity time:
await Task()
.fromKafka(...)
.tumblingWindowCount(exampleWindowStorage, 100, 10000)
.close()
tumblingWindowTime, 1 minute length and 10 seconds max inactivity time:
await Task()
.fromKafka(...)
.tumblingWindowTime(exampleWindowStorage, 60000, 10000)
.close()
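One way to picture the watermark rule for time-based windows is to assign each event to a fixed window by its event time and reject events that fall into a window older than the newest one seen. This is a simplified sketch under that assumption; the library's actual watermark logic may differ, and `windowStart` and `makeWatermarkFilter` are hypothetical names.

```javascript
// Hypothetical sketch: assign events to fixed 1-minute windows by event time
// and drop records that arrive behind the watermark (no grace period).
const WINDOW_MS = 60000

// Start of the window that contains the timestamp
function windowStart (timestamp) {
  return Math.floor(timestamp / WINDOW_MS) * WINDOW_MS
}

function makeWatermarkFilter () {
  let watermark = -Infinity // start of the newest window seen so far
  return function accept (timestamp) {
    const start = windowStart(timestamp)
    if (start < watermark) return false // belongs to a window that has already passed
    watermark = start
    return true
  }
}

const accept = makeWatermarkFilter()
const decisions = [5000, 61000, 59000, 62000].map(ts => accept(ts))
console.log(decisions) // [ true, true, false, true ]
```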
slidingWindowCount, with 100 elements length, 25 elements slide, and 10 seconds max inactivity time:
await Task()
.fromKafka(...)
.slidingWindowCount(exampleWindowStorage, 100, 25, 10000)
.close()
slidingWindowTime, 1 minute length, 5 seconds slide, and 10 seconds max inactivity time:
await Task()
.fromKafka(...)
.slidingWindowTime(exampleWindowStorage, 60000, 5000, 10000)
.close()
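With a sliding window, one event usually belongs to several overlapping windows. The sketch below lists the window start times that contain a timestamp, assuming windows are aligned to multiples of the slide starting at 0; that alignment and the `slidingWindowStarts` helper are assumptions for illustration, not documented Alyxstream behavior.

```javascript
// Hypothetical sketch: list the start times of every sliding window that
// contains a given timestamp, for a window length and slide in milliseconds.
function slidingWindowStarts (timestamp, lengthMs, slideMs) {
  const starts = []
  // Latest window that has already started at this timestamp
  const lastStart = Math.floor(timestamp / slideMs) * slideMs
  // Walk backwards while the window [start, start + lengthMs) still covers the timestamp
  for (let start = lastStart; start > timestamp - lengthMs && start >= 0; start -= slideMs) {
    starts.unshift(start)
  }
  return starts
}

// 1 minute length, 5 seconds slide, event at 62 seconds
const starts = slidingWindowStarts(62000, 60000, 5000)
console.log(starts.length)                        // 12
console.log(starts[0], starts[starts.length - 1]) // 5000 60000
```

With a 60 second length and a 5 second slide, each event lands in 60 / 5 = 12 windows once the stream has been running for at least one full window length.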
sessionWindowTime, max 15 seconds inactivity:
await Task()
.fromKafka(...)
.sessionWindowTime(exampleWindowStorage, 15000)
.close()
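A session window closes when no event arrives for longer than the inactivity threshold. A minimal sketch of that rule over already sorted timestamps, using a hypothetical `splitSessions` function (not part of Alyxstream):

```javascript
// Hypothetical sketch: split sorted timestamps into sessions whenever the gap
// between two consecutive events exceeds the inactivity threshold.
function splitSessions (timestamps, maxInactivityMs) {
  const sessions = []
  let current = []
  for (const ts of timestamps) {
    if (current.length > 0 && ts - current[current.length - 1] > maxInactivityMs) {
      sessions.push(current) // gap too large: close the current session
      current = []
    }
    current.push(ts)
  }
  if (current.length > 0) sessions.push(current)
  return sessions
}

// 15 seconds of allowed inactivity
const sessions = splitSessions([0, 4000, 9000, 30000, 31000], 15000)
console.log(sessions) // [ [ 0, 4000, 9000 ], [ 30000, 31000 ] ]
```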
KafkaClient:
import {
KafkaClient
} from '@dev.smartpricing/alyxstream'
const kafkaClient = KafkaClient({
clientId: 'my-client',
brokers: ['localhost:9092'],
ssl: false,
sasl: ({ // for Confluent access
mechanism: 'plain',
username: 'username',
password: 'password'
})
})
KafkaSource:
import {
KafkaSource
} from '@dev.smartpricing/alyxstream'
const topic = {
topic: 'my-topic',
fromBeginning: false,
autoCommit: false,
autoHeartbeat: 5000
}
const kafkaSource = KafkaSource(kafkaClient, {
groupId: 'my-group-id',
topics: [topic]
})
A Redis queue uses a Redis list with the BRPOP command to distribute jobs between workers:
import { Task, MakeStorage, StorageKind } from '@dev.smartpricing/alyxstream'
const queueStorage = MakeStorage(StorageKind.Redis, null, 'my-queue')
async function producer () {
const t = await Task()
.fromReadableStream('data.csv.gz', true)
.readline()
.tumblingWindowCount(MakeStorage(StorageKind.Memory, null, 'my-queue-win'), 10)
.enqueue(queueStorage)
.fn(async (x) => {
while (true) {
const queueSize = await queueStorage.queueSize()
if (queueSize > 100) {
await new Promise(resolve => setTimeout(resolve, 1000))
} else {
break
}
}
})
.close()
}
async function consumer () {
const t = await Task()
.dequeue(queueStorage)
.fn(async x => {
console.log(x)
return x
})
.close()
}
async function run () {
if (process.env.RUN == 'producer') {
await producer()
} else {
await consumer()
}
}
run()
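The queue pattern above can be pictured with an in-memory stand-in for the Redis list. The sketch assumes jobs are pushed on the left and popped from the right (which side Alyxstream pushes on is not stated here), and it returns `null` on an empty list instead of blocking as BRPOP does. `ListQueue` is a hypothetical class.

```javascript
// Hypothetical in-memory stand-in for a Redis list used as a work queue.
// Producers push on the left, workers pop from the right, so jobs leave in FIFO order.
class ListQueue {
  constructor () {
    this.items = []
  }

  lpush (job) {
    this.items.unshift(job)
    return this.items.length
  }

  rpop () {
    return this.items.length > 0 ? this.items.pop() : null
  }

  size () {
    return this.items.length
  }
}

const queue = new ListQueue()
for (const job of ['job-1', 'job-2', 'job-3', 'job-4']) queue.lpush(job)

// Two workers take turns popping; each job is delivered to exactly one of them.
const workers = { w1: [], w2: [] }
let turn = 0
while (queue.size() > 0) {
  const worker = turn % 2 === 0 ? 'w1' : 'w2'
  workers[worker].push(queue.rpop())
  turn += 1
}

console.log(workers) // { w1: [ 'job-1', 'job-3' ], w2: [ 'job-2', 'job-4' ] }
```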
Alyxstream supports three kinds of storage out of the box: memory, Redis, and Cassandra. Memory and Redis storage are suitable for window state storage.
import { MakeStorage, StorageKind } from '@dev.smartpricing/alyxstream'
const memStorage = MakeStorage(StorageKind.Memory, null, 'storage-1')
const redisStorage = MakeStorage(StorageKind.Redis, null, 'storage-1')
const cassandraStorage = MakeStorage(StorageKind.Cassandra, null, 'storage-1')
Accessing raw client:
const redisStorage = MakeStorage(StorageKind.Redis, null, 'storage-1')
const redisStorageClient = redisStorage.db() // ioredis client
const cassandraStorage = MakeStorage(StorageKind.Cassandra, null, 'storage-1')
const cassandraStorageClient = cassandraStorage.db() // cassandra-driver
import { Task } from '@dev.smartpricing/alyxstream'
await Task()
.withLocalKVStorage()
.setLocalKV('my-var', x => x * 2)
.getLocalKV('my-var')
.mergeLocalKV('my-var')
You can create your own custom functions and call them from a task:
import { Task, ExtendTask, ExtendTaskRaw } from '@dev.smartpricing/alyxstream'
export const multiplyBy = ExtendTask('multiplyBy', async function (x, multiplier) {
return x * multiplier
})
export const multiplyByRaw = ExtendTaskRaw('multiplyByRaw', async function (x, multiplier) {
return x.payload * multiplier
})
await Task()
.multiplyBy(2)
.multiplyByRaw(2)
.inject(3) // output 12
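The idea of registering a named step and then calling it in a chain can be shown with a tiny, synchronous fluent pipeline. This is a toy model only: `MiniTask` and `extendMiniTask` are hypothetical names, and nothing here reflects how ExtendTask is implemented.

```javascript
// Hypothetical miniature of a fluent pipeline that can be extended with named steps.
const registry = {} // name -> step function

// Register a step under a name, similar in spirit to extending a task.
function extendMiniTask (name, fn) {
  registry[name] = fn
}

function MiniTask () {
  const steps = []
  const task = {
    // Run every chained step in order on the injected value
    inject (value) {
      return steps.reduce((current, step) => step(current), value)
    }
  }
  // Expose each registered step as a chainable method
  for (const [name, fn] of Object.entries(registry)) {
    task[name] = (...args) => {
      steps.push(x => fn(x, ...args))
      return task // returning the task keeps the chain going
    }
  }
  return task
}

extendMiniTask('multiplyBy', (x, multiplier) => x * multiplier)
const output = MiniTask().multiplyBy(2).multiplyBy(2).inject(3)
console.log(output) // 12
```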
Alyxstream includes a wrapper around itself that simplifies a special use case: Kafka communication between microservices. It is called Exchange and allows bidirectional communication between two (or more) services.
import {
Exchange,
KafkaClient
} from '@dev.smartpricing/alyxstream'
const client = KafkaClient({
clientId: 'my-client-1',
brokers: ['localhost:9092'],
})
let mex = {
kind: 'TestObject',
metadata: {
key: '1'
},
spec: {
value: 1
}
}
/** Exchange(KafkaClient, topicName, groupId, source options as KafkaJS options) */
const ex1 = await Exchange(client, 'alyxstream-exchange-01', 'ae-01', {autoCommit: false})
const ex2 = await Exchange(client, 'alyxstream-exchange-02', 'ae-02', {autoCommit: false})
await ex1.emit(mex)
await ex1.on(async (message) => {
console.log('sub', message)
message.spec.value += 1
await ex2.emit(message)
})
To override the default key parser and message validator:
const ex1 = await Exchange(client, 'alyxstream-exchange-01', 'ae-01', {autoCommit: false})
ex1.setKeyParser(x => x.metadata.myKey)
ex1.setValidationFunction(x => {
if (x.spec.myValue == undefined) {
return false
}
return true
})
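Conceptually, the key parser picks the Kafka message key and the validation function decides whether a message is accepted. The sketch below assumes invalid messages are simply skipped and that the value is JSON-encoded; both are assumptions, and `prepare` is a hypothetical helper rather than an Exchange method.

```javascript
// Hypothetical sketch: validate a message and derive its key before emitting.
function prepare (message, keyParser, validate) {
  if (!validate(message)) return null // assumed behavior: rejected messages are not sent
  return { key: String(keyParser(message)), value: JSON.stringify(message) }
}

const keyParser = x => x.metadata.myKey
const validate = x => x.spec.myValue !== undefined

const accepted = prepare({ metadata: { myKey: 'k1' }, spec: { myValue: 1 } }, keyParser, validate)
const rejected = prepare({ metadata: { myKey: 'k2' }, spec: {} }, keyParser, validate)

console.log(accepted.key) // k1
console.log(rejected)     // null
```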
You can process a stream of data using multiple Node.js processes:
import { Task } from '@dev.smartpricing/alyxstream'
await Task()
.parallel(3)
.dequeue(STORAGE)
.close()
Producer:
import { Task, NatsClient } from '@dev.smartpricing/alyxstream';
const nc = await NatsClient()
const t = await Task()
.toNats(nc, 'sp-test.a.a')
for (var i = 0; i < 100; i += 1) {
await t.inject({key: i})
}
Consumer:
import { Task, NatsClient, NatsJetstreamSource } from '@dev.smartpricing/alyxstream';
const nc = await NatsClient()
const source = await NatsJetstreamSource(nc, [{
stream: 'sp-test',
durable_name: 'worker-4',
ack_policy: 'Explicit',
filter_subjects: ['sp-test.a.a', 'sp-test.a.b']
}])
await Task()
.fromNats(source)
.print('>')
.close()
Using the smartlocks library, you can acquire global locks and use atomic counters.
Lock/Release:
import { Task } from '@dev.smartpricing/alyxstream'
import { Mutex, StorageKind as MSKind } from 'smartlocks'
const lockStorage = Mutex(MSKind.Cassandra, null)
await Task()
.parallel(5)
.fromArray([{i: 1}, {i: 2}, {i: 3}])
.fn(x => {
x.i = x.i * 2
return x
})
.lock(lockStorage, x => 'my-lock')
.fn(x => {
console.log(x)
})
.release(lockStorage, x => 'my-lock')
.close()
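The guarantee a lock gives between the lock and release steps is that only one holder per lock name exists at a time. The following in-memory sketch shows that contract only; it is not how smartlocks works (which uses external storage such as Cassandra), and `makeLockTable` is a hypothetical name.

```javascript
// Hypothetical in-memory lock table: only one holder per lock name at a time.
function makeLockTable () {
  const holders = new Map() // lock name -> current owner
  return {
    tryLock (name, owner) {
      if (holders.has(name)) return false
      holders.set(name, owner)
      return true
    },
    release (name, owner) {
      if (holders.get(name) !== owner) return false // only the holder can release
      holders.delete(name)
      return true
    }
  }
}

const locks = makeLockTable()
const attempts = [
  locks.tryLock('my-lock', 'worker-1'), // true: lock was free
  locks.tryLock('my-lock', 'worker-2'), // false: already held
  locks.release('my-lock', 'worker-1'), // true: holder releases
  locks.tryLock('my-lock', 'worker-2')  // true: free again
]
console.log(attempts) // [ true, false, true, true ]
```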
Pulsar functions are not maintained.
Pulsar producer:
import { Task, PulsarClient, PulsarSink } from '@dev.smartpricing/alyxstream';
(async () => {
const client = PulsarClient({
serviceUrl: 'pulsar://localhost:6650'
})
const pulsarSink = await PulsarSink(client, {
topic: 'non-persistent://public/default/my-topic-1',
batchingEnabled: true,
batchingMaxPublishDelayMs: 10
})
const t = await Task()
.toPulsar(pulsarSink, x => x.val, x => x)
.flushPulsar(pulsarSink)
for (var i = 0; i < 1000; i += 1) {
await t.inject({val: i})
}
})()
Pulsar consumer:
import { Task, PulsarClient, PulsarSource } from '@dev.smartpricing/alyxstream';
(async () => {
const client = PulsarClient({
serviceUrl: 'pulsar://localhost:6650'
})
const pulsarSource = await PulsarSource(client, {
topic: 'non-persistent://public/default/my-topic-1',
subscription: 'sub1',
subscriptionType: "Shared", //'Failover'
parseWith: (x) => x
})
await Task()
.withLocalKVStorage()
.fromPulsar(pulsarSource)
.setLocalKV('local-mex', x => x)
.parsePulsar()
.print('>>>')
.getLocalKV('local-mex')
.ackPulsar(pulsarSource)
.close()
})()