Tuesday, 21 October 2014

Elasticsearch in Map Reduce

Elasticsearch is a great piece of software: a distributed, fault-tolerant indexing engine based on Lucene. It relies on a very simple peer-to-peer model: you just run an Elasticsearch instance on each node of your cluster and voilà, you have a powerful distributed indexing and search engine.
However, installing a distributed platform involves some intricacies: you have to copy the software to all the nodes, copy the configuration files, and use some mechanism to start all the instances in one shot from a central place.
I’m not saying it’s particularly difficult; there are plenty of tools, such as Ansible, Puppet and Chef, that make this kind of activity pretty simple. But suppose you could simply run an executable from your console that magically deploys Elasticsearch and runs it on all the nodes.
Yes, it’s doable. If you have a Hadoop cluster available, you can turn its task trackers into a sort of remote agent able to run your distributed application. The trick is to pack everything into a Hadoop map/reduce job, which is then run on the cluster.
To force the job manager to run exactly one instance of Elasticsearch per node, the trick is to implement a fake input format that generates fake splits, one per task tracker.
That fake input format makes Hadoop run one instance of your mapper per node; in your mapper you can then just embed an instance of Elasticsearch.
An important point is to keep the job manager convinced that your long-running Elasticsearch job is doing something; otherwise, once it stops receiving heartbeats from the mappers, it kills the job after a while. I’ll show how to do this in the code below.
As usual my examples are in Scala, so let’s start with an sbt file:
Don’t forget to install the sbt-assembly plugin, which generates a single big jar containing everything we need to run Elasticsearch, without having to copy anything to the cluster’s nodes.
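A build along these lines might look like the sketch below. The project name and version are inferred from the jar name used in the command later in this post; the Scala, Hadoop, Elasticsearch and sbt-assembly versions are all assumptions that you should adapt to your own cluster.

```scala
// build.sbt: a sketch, every version number here is an assumption
import AssemblyKeys._ // required by sbt-assembly 0.11.x; newer plugin versions do not need it

name := "elimr"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // Hadoop is already installed on the cluster, so keep it out of the fat jar
  "org.apache.hadoop" % "hadoop-client" % "2.4.0" % "provided",
  // Elasticsearch and its transitive dependencies are bundled into the fat jar
  "org.elasticsearch" % "elasticsearch" % "1.3.4"
)

assemblySettings

// project/plugins.sbt would then contain something like:
// addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
```

With this layout, running `sbt assembly` should produce a jar named elimr-assembly-1.0.jar under the target directory.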
Then the next piece is the fake input format:
It does very little: it defines a fake reader and simply creates as many FileSplit instances as there are task trackers running.
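A minimal sketch of such an input format is shown here, assuming the newer `org.apache.hadoop.mapreduce` API and using `JobClient.getClusterStatus().getTaskTrackers` to count the trackers. The class names `FakeInputFormat` and `SingleRecordReader` are hypothetical, as are the placeholder split paths, which are never opened. Keep in mind that creating one split per tracker only suggests one mapper per node; the final placement is still up to the scheduler.

```scala
import java.util.{ArrayList, List => JList}

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.{JobClient, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat, InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileSplit

// Emits exactly one dummy record, so that map() is invoked once per split.
class SingleRecordReader extends RecordReader[NullWritable, NullWritable] {
  private var consumed = false

  override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = ()

  override def nextKeyValue(): Boolean = {
    val hasNext = !consumed
    consumed = true
    hasNext
  }

  override def getCurrentKey: NullWritable = NullWritable.get
  override def getCurrentValue: NullWritable = NullWritable.get
  override def getProgress: Float = if (consumed) 1.0f else 0.0f
  override def close(): Unit = ()
}

object FakeInputFormat {
  // Builds n placeholder splits; the paths are made up and never read.
  def fakeSplits(n: Int): JList[InputSplit] = {
    val splits = new ArrayList[InputSplit](n)
    for (i <- 0 until n)
      splits.add(new FileSplit(new Path("fake-split-" + i), 0L, 1L, Array.empty[String]))
    splits
  }
}

class FakeInputFormat extends InputFormat[NullWritable, NullWritable] {
  // One fake split per task tracker currently known to the cluster.
  override def getSplits(context: JobContext): JList[InputSplit] = {
    val client = new JobClient(new JobConf(context.getConfiguration))
    try FakeInputFormat.fakeSplits(client.getClusterStatus().getTaskTrackers)
    finally client.close()
  }

  override def createRecordReader(split: InputSplit,
                                  context: TaskAttemptContext): RecordReader[NullWritable, NullWritable] =
    new SingleRecordReader
}
```

The reader hands out a single key/value pair because the mapper only needs to be called once; everything else happens inside that one call.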
You also need a fake output format, which does nothing but is needed to keep the job configuration happy:
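A do-nothing output format could be sketched as follows, again assuming the `org.apache.hadoop.mapreduce` API. `FakeOutputFormat` and `NoOpCommitter` are hypothetical names. Hadoop also ships `org.apache.hadoop.mapreduce.lib.output.NullOutputFormat`, which behaves the same way and could be used instead.

```scala
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, OutputFormat, RecordWriter, TaskAttemptContext}

// A committer whose phases are all no-ops: there is nothing to set up or commit.
class NoOpCommitter extends OutputCommitter {
  override def setupJob(context: JobContext): Unit = ()
  override def setupTask(context: TaskAttemptContext): Unit = ()
  override def needsTaskCommit(context: TaskAttemptContext): Boolean = false
  override def commitTask(context: TaskAttemptContext): Unit = ()
  override def abortTask(context: TaskAttemptContext): Unit = ()
}

class FakeOutputFormat extends OutputFormat[NullWritable, NullWritable] {
  // The writer silently discards whatever it is given.
  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[NullWritable, NullWritable] =
    new RecordWriter[NullWritable, NullWritable] {
      override def write(key: NullWritable, value: NullWritable): Unit = ()
      override def close(context: TaskAttemptContext): Unit = ()
    }

  // No output directory to validate.
  override def checkOutputSpecs(context: JobContext): Unit = ()

  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter =
    new NoOpCommitter
}
```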
Now it’s time to see the mapper:
In the map method you can embed your Elasticsearch instance (or whatever you want to run on all the slave nodes of your Hadoop cluster). The mapper also schedules a task that calls context.progress() every eight minutes; that method notifies the job manager that the job is still alive and doing something.
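The idea can be sketched like this, assuming the Elasticsearch 1.x embedded node API (`NodeBuilder`, `ImmutableSettings`) and a plain `ScheduledExecutorService` for the heartbeat. The names `KeepAlive` and `ElasticsearchMapper` and the configuration keys `elimr.cluster.name` and `elimr.es.config` are hypothetical. The eight-minute period presumably keeps the heartbeat under Hadoop's default ten-minute task timeout.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ScheduledExecutorService, TimeUnit}

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Mapper
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.node.NodeBuilder

object KeepAlive {
  // Calls `beat` repeatedly, once per period, on a background thread.
  def start(period: Long, unit: TimeUnit)(beat: () => Unit): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = beat() }, period, period, unit)
    scheduler
  }
}

class ElasticsearchMapper extends Mapper[NullWritable, NullWritable, NullWritable, NullWritable] {

  override def map(key: NullWritable, value: NullWritable,
                   context: Mapper[NullWritable, NullWritable, NullWritable, NullWritable]#Context): Unit = {
    // Tell Hadoop every eight minutes that this task is still alive.
    val heartbeat = KeepAlive.start(8, TimeUnit.MINUTES)(() => context.progress())

    // Hypothetical keys: the driver is assumed to have stored these values in the job configuration.
    val conf = context.getConfiguration
    val settings = ImmutableSettings.settingsBuilder()
    Option(conf.get("elimr.es.config")).foreach(source => settings.loadFromSource(source))
    val clusterName = conf.get("elimr.cluster.name", "elasticsearch")

    // node() builds the embedded Elasticsearch node and starts it.
    val node = NodeBuilder.nodeBuilder().settings(settings).clusterName(clusterName).node()
    try new CountDownLatch(1).await() // never counted down: block until the task is killed
    finally {
      node.close()
      heartbeat.shutdownNow()
    }
  }
}
```

Blocking on a latch that is never released keeps the map call, and therefore the embedded node, alive until the job is killed from outside.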
Finally the driver:
The driver does a couple of things. First of all, it accepts several options: for example, you can pass the Java options you want to propagate to the tasks that the task trackers create to embed Elasticsearch. The driver can also take as an argument an Elasticsearch configuration file that will ultimately be accessed by all the embedded Elasticsearch instances running on your cluster.
The rest of the driver configures and submits a job that, using those fake input/output formats, runs Elasticsearch on your cluster.
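A driver in this spirit could be sketched as below, assuming a Hadoop 2 client (`Job.getInstance`). The `-n` and `-f` flags come from the command shown in this post; the `-j` flag for the Java options, the `Options` and `ElimrDriverSketch` names, and the `elimr.*` configuration keys are hypothetical. Shipping the configuration file by copying its content into the job configuration is just one simple option, not necessarily how the original driver does it.

```scala
import scala.io.Source

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{InputFormat, Job, Mapper, OutputFormat}

case class Options(name: String = "elasticsearch",
                   configFile: Option[String] = None,
                   javaOpts: Option[String] = None)

object ElimrDriverSketch {

  // -n and -f are the flags used in the post; -j is a hypothetical flag for JVM options.
  @annotation.tailrec
  def parse(args: List[String], acc: Options = Options()): Options = args match {
    case "-n" :: v :: rest => parse(rest, acc.copy(name = v))
    case "-f" :: v :: rest => parse(rest, acc.copy(configFile = Some(v)))
    case "-j" :: v :: rest => parse(rest, acc.copy(javaOpts = Some(v)))
    case Nil               => acc
    case other             => sys.error("Unexpected arguments: " + other.mkString(" "))
  }

  // Builds a map-only job around the given mapper and fake input/output formats.
  def buildJob(opts: Options,
               mapper: Class[_ <: Mapper[_, _, _, _]],
               input: Class[_ <: InputFormat[_, _]],
               output: Class[_ <: OutputFormat[_, _]]): Job = {
    val conf = new Configuration()
    conf.set("elimr.cluster.name", opts.name) // hypothetical key read back by the mapper
    opts.configFile.foreach { file =>
      val source = Source.fromFile(file, "UTF-8")
      // Ship the file content inside the job configuration (hypothetical key).
      try conf.set("elimr.es.config", source.mkString) finally source.close()
    }
    // JVM options for the child tasks that will host Elasticsearch.
    opts.javaOpts.foreach(jvm => conf.set("mapred.child.java.opts", jvm))

    val job = Job.getInstance(conf, "elasticsearch-" + opts.name)
    job.setJarByClass(mapper)
    job.setMapperClass(mapper)
    job.setInputFormatClass(input)
    job.setOutputFormatClass(output)
    job.setOutputKeyClass(classOf[NullWritable])
    job.setOutputValueClass(classOf[NullWritable])
    job.setNumReduceTasks(0)              // map-only job
    job.setMapSpeculativeExecution(false) // avoid duplicate attempts of the same mapper
    job
  }
}
```

A `main` method would then call `buildJob(parse(args.toList), ...)` with the mapper and the fake input/output format classes, and finish with `job.waitForCompletion(true)`.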
So, after packaging everything into one jar, you can run one or more Elasticsearch clusters on top of your Hadoop infrastructure simply by using this command:
hadoop jar elimr-assembly-1.0.jar ElimrDriver -n ElasticsearchName -f elasticsearch.yml
By just changing the name (the -n parameter) and using different configuration files, you can run that command multiple times, which lets you run multiple Elasticsearch clusters on the same cluster infrastructure.
That’s all folks.