-
Notifications
You must be signed in to change notification settings - Fork 13
Getting started
Camelot has the two main parts:
- Core
- Plugins
The core can be used as a distinguished server or can be embedded within your project.
The plugins are the separate modules that implement your own logic. They can also be embedded within your application or can be loaded dynamically by the core from any maven repository.
Make sure you have the following installed in your system:
- JDK >= 1.7
- Maven >= 3.2
mvn archetype:generate -DarchetypeArtifactId=camelot-plugin \
-DarchetypeGroupId=ru.yandex.qatools.camelot \
-DarchetypeVersion=2.1.3 \
-DartifactId=test-plugin \
-DgroupId=ru.yandex.qatools\
-Dpackage=ru.yandex.qatools.camelot
cd test-plugin && mvn clean compile camelot-test:run
You should be able to open http://localhost:8080/camelot in your browser.
curl -X PUT http://localhost:8080/camelot/input/events?event=Hello
Open http://localhost:8080/camelot/input in your browser. You should see the following:
[{"lastMessage":"Hello"}]
This file contains all the necessary declarations of aggregators and processors to provide some routes between them. It should be located within the resources directory, so it will be available in runtime from the classpath.
Here is an example of camelot.xml.
<?xml version="1.0" encoding="UTF-8"?>
<plugins-config xmlns="urn:config.camelot.qatools.yandex.ru">
<source>
<plugin id="my-processor">
<processor>com.mycompany.MyProcessor</processor>
<resource>com.mycompany.MyResource</resource>
</plugin>
<plugin id="my-aggregator" source="my-processor">
<aggregator>com.mycompany.MyAggregator</aggregator>
</plugin>
</source>
</plugins-config>
You can see that this config defines the set of plugins and the relations between them. In the given config the aggregator my-aggregator
consumes the messages from the processor my-processor
. This means that we have the simple route here my-processor
->my-aggregator
, and all the messages from the output of the processor will go to the input of the aggregator.
There is also an item called resource. This is a JAX-RS annotated class, that will be added to the REST API of the Camelot instance during the plugins loading.
In the terms of Camelot the plugin
means a single aggregator or processor. It can be combined with the resource class to interact with the rest api.
This class can be the entry point of your application or the API to retrieve the data. It is bound to the plugin context, where it is declared. You can use the most of the injections available for the plugin within this class. Typically you will use the @Repository
and @Input
or @MainInput
fields.
If you look at the resource class within the project generated using the archetype, you will find the following input method:
@MainInput
EventProducer input;
// ...
@PUT
@Path("/events")
public Response sendMessage(@QueryParam("event") String event) {
input.produce(event);
return Response.ok("ok").build();
}
This method will create the API point bound to the /events
path, which will receive messages in the query parameter event
and then pass them to the main input queue using input.produce
. The main input queue is a queue, which is processed by all the plugins (processors and aggregators) declared within camelot.xml
file that don't contain the source
attribute.
This class represents a set of specific tasks that should be performed depending on the input message type. Processors perform tasks concurrently. Output of a processor is downstreamed to the other plugins that have been configured to consume its results.
Here is a quick example of a processor class:
@Filter(instanceOf = String.class)
public static class StringToIntProcessor {
@Processor
public Integer convert(String message) {
return Integer.parseInt(message);
}
}
This processor consumes only String
events (since it has the @Filter
annotation). It converts the input messages to the Integer
type and passes to the downstream plugins.
This class represents the state machine, which reacts to the incoming messages and accumulates the stored state. Every message processed by the aggregator can affect the state if necessary.
Here is a quick example of an aggregator class:
@Filter(instanceOf = Integer.class)
@Aggregate
@FSM(start = CounterState.class)
@Transitions({
@Transit(on = Integer.class)
})
public static class AccumulateIntAggregator {
@OnTransit
public void add(CounterState state, Integer message) {
state.setCounter(state.getCounter() + message);
}
}
The above aggregator filters only the Integer
messages and accumulates the CounterState
state by adding each message value to the value stored within the state.