For a primer on the Lambda Architecture, see http://lambda-architecture.net/. At Interactive Intelligence we're applying the Lambda Architecture to Elasticsearch. In our case that means we stream data into Elasticsearch in real time via Storm, and then rebuild the whole thing every night in Hadoop. This allows some architectural niceties such as
- Changes to analyzers & tokenizers are rolled out for all historical data every night, automatically
- New features & bug fixes that affect the data being indexed are rolled out every night, automatically. No data repair/backpopulation scripts are ever required.
- Tune the # shards and shard routing strategies on data already written. Changes roll out every night, automatically.
- Because the button that rebuilds the cluster gets hit nightly, it stays a well-oiled button.
- If data gets corrupt, no heroics are required. Hit the rebuild button and grab a beer.
- Backups? Why bother when you can just hit the rebuild button?
Obviously there's a decent bit of work up front to get all this working. Because this is a fairly generic problem, we decided to open source our infrastructure.
A new way to bulk load Elasticsearch from Hadoop
- Build indexes offline, without touching your production cluster
- Run Elasticsearch unmodified, entirely within YARN
- Build snapshots of indexes without requiring enough disk space on task trackers to hold an entire index
- Load those indexes into your cluster using the snapshot restore functionality built into Elasticsearch
- .. and more to come. We're in the process of pulling as much into this repo as we can.
The meat is in BaseEsReducer, where individual reducer tasks receive all the data for a single shard of a single index. Each task creates an embedded Elasticsearch instance, bulk loads it locally in-JVM, and then creates a snapshot. Discovery is disabled and the Elasticsearch instances do not form a cluster with each other. Once bulk loading a shard is complete, it is flushed, optimized, snapshotted, and then transferred to a snapshot repository (S3, HDFS, or local FS). After the job is complete, placeholder shards are generated for any shards that have no data, so the index is complete.
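To make that flow concrete, here is a minimal sketch of the embedded-node approach, assuming the Elasticsearch 1.x Java API. Index names, paths, and settings are illustrative assumptions, and this is not the project's actual BaseEsReducer code:

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.node.Node;
    import org.elasticsearch.node.NodeBuilder;

    public class EmbeddedShardLoaderSketch {
        public static void main(String[] args) {
            // Stand-alone, non-clustered node; data lives on the task tracker's local disk
            Node node = NodeBuilder.nodeBuilder()
                    .local(true)
                    .settings(ImmutableSettings.settingsBuilder()
                            .put("path.data", "/media/ephemeral0/tmp/esrawdata/")
                            .put("http.enabled", false))
                    .node();
            Client client = node.client();

            // Bulk load the documents belonging to the one shard this reducer owns
            client.prepareBulk()
                    .add(client.prepareIndex("my_index", "my_type").setSource("{\"field\":\"value\"}"))
                    .get();

            // Flush, force-merge, then snapshot to a filesystem repository
            client.admin().indices().prepareFlush("my_index").get();
            client.admin().indices().prepareOptimize("my_index").setMaxNumSegments(1).get();
            client.admin().cluster().preparePutRepository("my_backup")
                    .setType("fs")
                    .setSettings(ImmutableSettings.settingsBuilder()
                            .put("location", "/media/ephemeral0/tmp/bulkload/")
                            .build())
                    .get();
            client.admin().cluster().prepareCreateSnapshot("my_backup", "shard_snapshot")
                    .setIndices("my_index")
                    .setWaitForCompletion(true)
                    .get();

            node.close();
        }
    }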
By making reducers only responsible for a single shard worth of data at a time, the total disk space required on task trackers is roughly
((shard data) + (shard snapshot)) * (num reducers per task tracker)
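For example (illustrative numbers only): if a shard's data takes roughly 20 GB, its snapshot another 20 GB, and each task tracker runs 4 reducers concurrently, plan for roughly (20 GB + 20 GB) * 4 = 160 GB of local scratch space per task tracker.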
After indexes have been generated, they can be loaded into a live cluster using the snapshot restore functionality built into Elasticsearch. The index promotion process maintains state in ZooKeeper; it is in the process of being open sourced.
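The promotion tooling isn't included here yet, but as an illustration of the restore step itself, something like the following registers the snapshot repository on the live cluster and restores an index from it (Elasticsearch 1.x Java API; host, cluster, repository, and index names are assumptions):

    import org.elasticsearch.client.Client;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;

    public class SnapshotRestoreSketch {
        public static void main(String[] args) {
            // Connect to the production cluster (hostname and cluster name are assumptions)
            Client client = new TransportClient(ImmutableSettings.settingsBuilder()
                    .put("cluster.name", "production")
                    .build())
                    .addTransportAddress(new InetSocketTransportAddress("es-node-1", 9300));

            // Register the same repository the Hadoop job wrote snapshots into
            client.admin().cluster().preparePutRepository("my_backup")
                    .setType("fs")
                    .setSettings(ImmutableSettings.settingsBuilder()
                            .put("location", "/mnt/snapshotrepo/")
                            .build())
                    .get();

            // Restore the freshly built index from its snapshot
            client.admin().cluster().prepareRestoreSnapshot("my_backup", "shard_snapshot")
                    .setIndices("my_index")
                    .setWaitForCompletion(true)
                    .get();

            client.close();
        }
    }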
    <repository>
      <id>oss-sonatype</id>
      <name>oss-sonatype</name>
      <url>https://oss.sonatype.org/content/groups/public/</url>
    </repository>

    <dependency>
      <artifactId>elasticsearch-lambda</artifactId>
      <groupId>com.inin.analytics</groupId>
      <version>1.0.25</version>
    </dependency>
In order to index 1 shard per reducer at a time, elasticsearch-lambda relies on manual shard routing. If you've got big indexes (probably why you're here), then you'll almost certainly want a custom routing strategy so that searches can hit a subset of shards.
To create your own, implement the ElasticsearchRoutingStrategy interface and make use of it in the setup method of the ExampleJobPrep job. The default works as follows:
ElasticsearchRoutingStrategyV1: takes two parameters, numShards and numShardsPerOrg. A naive approach would be to route all data for one customer to one shard. To avoid hotspotting shards with large customers, this strategy lets you spread each customer's load across multiple shards. For example, with 10 shards and 3 per customer, customer A might sit on shards 1, 3, and 5 while customer B sits on shards 2, 3, and 8. Setting both parameters to 10 would spread all customers evenly across all 10 shards.
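The real ElasticsearchRoutingStrategy interface lives in this repo; the sketch below only illustrates the idea of pinning each customer to a small, stable subset of shards. Class and method names are hypothetical and the hashing scheme is just an example:

    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative only: spread each customer across numShardsPerOrg of numShards shards. */
    public class OrgSubsetRoutingSketch {
        private final int numShards;
        private final int numShardsPerOrg;

        public OrgSubsetRoutingSketch(int numShards, int numShardsPerOrg) {
            this.numShards = numShards;
            this.numShardsPerOrg = numShardsPerOrg;
        }

        /** The stable subset of shards a customer's documents may land on. */
        public List<Integer> shardsForOrg(String orgId) {
            List<Integer> shards = new ArrayList<Integer>();
            int start = Math.abs(orgId.hashCode()) % numShards;
            for (int i = 0; i < numShardsPerOrg; i++) {
                shards.add((start + i) % numShards);
            }
            return shards;
        }

        /** Pick one shard from the customer's subset for a specific document. */
        public int shardForDocument(String orgId, String docId) {
            List<Integer> candidates = shardsForOrg(orgId);
            return candidates.get(Math.abs(docId.hashCode()) % candidates.size());
        }
    }

A search for one customer can then pass that customer's shard subset as routing values instead of fanning out to every shard.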
- generateExampleData 1000 hdfs:///tmp/test/data
- examplePrep hdfs:///tmp/test/data/ hdfs:///tmp/test/json/ _rebuild_20141030012508 5 2
- esIndexRebuildExample hdfs:///tmp/test/json/ /media/ephemeral0/tmp/bulkload/ hdfs:///tmp/snapshotrepo/ my_backup /media/ephemeral0/tmp/esrawdata/ 1 5 100 hdfs:///tmp/manifest/
You can experiment via these run configs, run in series
- com.inin.analytics.elasticsearch.driver.Driver
Let's build some dummy data
- generateExampleData 1000 /tmp/data/part1
Prepare some data for the indexing job
- examplePrep /tmp/data/ /tmp/datajson/ _rebuild_20141030012508 5 2
Build Elasticsearch indexes, snapshot them, and transport them to a snapshot repository on HDFS (S3 paths are also allowed)
- esIndexRebuildExample /tmp/datajson/ /tmp/bulkload110/ hdfs:///tmp/snapshotrepo110/ my_backup /tmp/esrawdata1010/ 1 5 /tmp/manifest110/
Elasticsearch does not currently support backing its data with HDFS, so this project makes use of local disks on the task trackers. Given that SolrCloud already supports HDFS-backed data, it's conceivable that one day Elasticsearch might as well.
When considering NFS, you must first consider how different Hadoop distributions have implemented it. The Apache version of Hadoop implements NFS with large local disk buffers, so it may or may not save you any trouble. The MapR NFS implementation is more native and performant. In our tests, running Elasticsearch on YARN with its data directories on NFS mounts backed by MapR-FS ran roughly half as fast. While impressive, it's up to you to weigh the cost of using MapR for its NFS capability to run Elasticsearch. Note that this requires substituting Elasticsearch's locking mechanism with a non-filesystem-based implementation.