Aggregators
The Aggregator pattern from Enterprise Integration Patterns (EIP) lets you combine a number of messages into a single message.
In Camelot terms, an aggregator is a state collector represented as a state machine. It collects incoming events and produces the computed state when necessary. The Yatomata framework is used as the FSM implementation.
Aggregators are stateful and keep their states consistent in a distributed map of correlation identifiers to values. Aggregation can happen on any node of the cluster, so concurrent updates of the same state must be coordinated. Camelot uses distributed locks for that purpose.
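The idea of keeping one state per correlation identifier and guarding each update with a lock can be sketched with plain JDK classes. This is only a single-JVM illustration, not Camelot's implementation: `LocalAggregator`, `aggregate` and `stateOf` are hypothetical names, and a `ConcurrentHashMap` with per-key `ReentrantLock` objects stands in for the distributed map and the distributed locks.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;

// Hypothetical single-JVM stand-in for an aggregator's state storage.
class LocalAggregator<E, S> {
    // Correlation identifier -> current state.
    private final Map<String, S> states = new ConcurrentHashMap<>();
    // Correlation identifier -> lock guarding that state.
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    // Computes the next state from the current state (null if absent) and an event.
    // It must not return null, because ConcurrentHashMap rejects null values.
    private final BiFunction<S, E, S> transition;

    LocalAggregator(BiFunction<S, E, S> transition) {
        this.transition = transition;
    }

    // Applies the event to the state stored under the key while holding that key's lock.
    S aggregate(String key, E event) {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            S next = transition.apply(states.get(key), event);
            states.put(key, next);
            return next;
        } finally {
            lock.unlock();
        }
    }

    // Returns the current state for the key, or null if no event has arrived yet.
    S stateOf(String key) {
        return states.get(key);
    }
}
```

For example, `new LocalAggregator<Integer, Integer>((s, e) -> (s == null ? 0 : s) + e)` keeps a running sum per key. Events with different keys never contend for the same lock.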
Here is a quick example of an aggregator.
    @SuppressWarnings("unused")
    @Filter(instanceOf = {Event.class})
    @FSM(start = State.class)
    @Transitions(
        @Transit(on = Event.class)
    )
    public class Sender extends AbstractPlugin {

        @ClientSender
        private ClientMessageSender client;

        // Returns the correlation identifier used to group events.
        @AggregationKey
        public String calcKey(Event event) {
            return event.getName();
        }

        // Called when an Event triggers the transition declared above.
        @OnTransit
        public void message(Event event) throws InterruptedException {
            out.produce(event);
        }

        // Global timer (perState = false) that fires every 10 seconds.
        @OnTimer(cron = "*/10 * * * * ?", perState = false)
        public void sendSomethingToClient() {
            client.send(new Event(2, 123));
        }
    }
The aggregation key method returns the string value that serves as the correlation identifier.
For each incoming message, Camelot finds the best-matching transit method. See the Yatomata documentation for details of its capabilities.
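One plausible reading of "best-matching" is that the handler declared for the most specific event type wins. The sketch below illustrates that reading only; it is not Yatomata's actual resolution algorithm, and `BestMatchDispatcher`, `register`, `findBestMatch` and `dispatch` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical dispatcher: picks the handler registered for the most specific event type.
class BestMatchDispatcher {
    private final Map<Class<?>, Consumer<Object>> handlers = new LinkedHashMap<>();

    void register(Class<?> eventType, Consumer<Object> handler) {
        handlers.put(eventType, handler);
    }

    // Walks up the superclass chain of the event until a registered type is found.
    // Interfaces are ignored to keep the sketch short.
    Class<?> findBestMatch(Object event) {
        for (Class<?> c = event.getClass(); c != null; c = c.getSuperclass()) {
            if (handlers.containsKey(c)) {
                return c;
            }
        }
        return null;
    }

    // Returns true if a handler was found and invoked, false otherwise.
    boolean dispatch(Object event) {
        Class<?> match = findBestMatch(event);
        if (match == null) {
            return false;
        }
        handlers.get(match).accept(event);
        return true;
    }
}
```

With handlers registered for both `Number` and `Integer`, an `Integer` event reaches the `Integer` handler, while a `Double` event falls back to the `Number` handler.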
If you need to execute some logic regardless of incoming events, you can add an @OnTimer method. It accepts the following attributes:
- cron - The cron schedule for the timer in Quartz format, for example "*/10 * * * * ?".
- cronMethod - The aggregator method to call when the cron string needs to be generated.
- skipIfNotCompleted - Whether to skip this execution if the previous one is still running. Default: false.
- readOnly - Whether the timer method gets the state read-only; when false, the state is used exclusively. Default: true.
- perState - Whether the timer runs for each state or is a global timer for the aggregator itself. Default: true.
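To see how the attributes listed above combine, here is a self-contained sketch. The `OnTimer` annotation declared in it is a local stand-in that mirrors the documented attribute names and defaults so the example compiles without Camelot; it is not Camelot's own annotation. The empty-string defaults for `cron` and `cronMethod`, the method signatures, and the `TimerExamples` class with its `timerOf` helper are assumptions made for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Local stand-in mirroring the attributes listed above; NOT Camelot's annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface OnTimer {
    String cron() default "";                    // assumed default
    String cronMethod() default "";              // assumed default
    boolean skipIfNotCompleted() default false;  // documented default
    boolean readOnly() default true;             // documented default
    boolean perState() default true;             // documented default
}

class TimerExamples {
    // Global timer: every 10 seconds, skipped while the previous run is still executing.
    @OnTimer(cron = "*/10 * * * * ?", perState = false, skipIfNotCompleted = true)
    public void globalTick() { }

    // Per-state timer with exclusive access to the state; schedule supplied by a method.
    @OnTimer(cronMethod = "schedule", readOnly = false)
    public void perStateCleanup() { }

    // Hypothetical cron provider: second 0, minute 0 of every hour in Quartz format.
    public String schedule() {
        return "0 0 * * * ?";
    }

    // Looks up the timer settings of a method by name via reflection.
    static OnTimer timerOf(String methodName) {
        for (Method m : TimerExamples.class.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                return m.getAnnotation(OnTimer.class);
            }
        }
        return null;
    }
}
```

Attributes that are not set explicitly keep their defaults, so `perStateCleanup` is still a per-state timer and `globalTick` still receives its state read-only.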