Tuesday, 21 October 2014

Hadoop in Practice

By Alex Holmes

Working with simple data formats such as log files is straightforward and well supported in MapReduce. In this article, based on Chapter 3 of Hadoop in Practice, author Alex Holmes shows you how to work with ubiquitous data serialization formats such as XML and JSON.

Processing Common Serialization Formats

XML and JSON are industry-standard data interchange formats. Their ubiquity is evidenced by their heavy adoption for data storage and exchange. XML has existed since 1998 as a mechanism to represent data that is readable by machines and humans alike. It became a universal language for data exchange between systems; it is employed by many standards today, such as SOAP and RSS, and is used as an open data format for products such as Microsoft Office.

Technique 1: MapReduce and XML

Our goal is to be able to use XML as a data source for a MapReduce job. We’re going to assume that the XML documents that need to be processed are large and, as a result, we want to be able to process them in parallel with multiple mappers working on the same input file.


Working on a single XML file in parallel in MapReduce is tricky because XML does not contain a synchronization marker in its data format. So how do we work with a file format like XML that isn't inherently splittable?


MapReduce doesn't contain built-in support for XML, so we have to turn to another Apache project, Mahout, a machine learning system, which provides an XML InputFormat. To showcase the XML InputFormat, let's write a MapReduce job that uses Mahout's XML input format to read property names and values from Hadoop's configuration files. Our first step is to set up the job configuration:
conf.set("xmlinput.start", "<property>");            #1
conf.set("xmlinput.end", "</property>");             #2
job.setInputFormatClass(XmlInputFormat.class);       #3
#1 Defines the string form of the XML start tag. Our job takes Hadoop config files as input, where each configuration entry uses the "property" tag.
#2 Defines the string form of the XML end tag.
#3 Sets the Mahout XML input format class.
It quickly becomes apparent from this code that Mahout's XML InputFormat is rudimentary; you need to tell it the exact sequence of start and end XML tags that will be searched for in the file. Looking at the source of the InputFormat confirms this:
private boolean next(LongWritable key, Text value)
    throws IOException {
  if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
    try {
      buffer.write(startTag);
      if (readUntilMatch(endTag, true)) {
        key.set(fsin.getPos());
        value.set(buffer.getData(), 0, buffer.getLength());
        return true;
      }
    } finally {
      buffer.reset();
    }
  }
  return false;
}

Next, we need to write a Mapper to consume Mahout's XML input format. We're supplied the XML element in Text form, so we'll need to use an XML parser to extract content from it.
public static class Map extends Mapper<LongWritable, Text,
    Text, Text> {

  @Override
  protected void map(LongWritable key, Text value,
                     Mapper.Context context)
      throws IOException, InterruptedException {
    String document = value.toString();
    System.out.println("'" + document + "'");
    try {
      XMLStreamReader reader =
          XMLInputFactory.newInstance().createXMLStreamReader(
              new ByteArrayInputStream(document.getBytes()));
      String propertyName = "";
      String propertyValue = "";
      String currentElement = "";
      while (reader.hasNext()) {
        int code = reader.next();
        switch (code) {
          case XMLStreamConstants.START_ELEMENT:
            currentElement = reader.getLocalName();
            break;
          case XMLStreamConstants.CHARACTERS:
            if (currentElement.equalsIgnoreCase("name")) {
              propertyName += reader.getText();
            } else if (currentElement.equalsIgnoreCase("value")) {
              propertyValue += reader.getText();
            }
            break;
        }
      }
      reader.close();
      context.write(new Text(propertyName.trim()),
          new Text(propertyValue.trim()));
    } catch (Exception e) {
      log.error("Error processing '" + document + "'", e);
    }
  }
}

Our Map is given a Text instance, which contains a String representation of the data between the start and end tags. In our code, we use Java's built-in Streaming API for XML (StAX) parser to extract the key and value for each property and output them. If we run our MapReduce job against Cloudera's core-site.xml and cat the output, we see the following:
$ hadoop fs -put $HADOOP_HOME/conf/core-site.xml core-site.xml

$ bin/run.sh com.manning.hip.ch3.xml.HadoopPropertyXMLMapReduce \
  core-site.xml output

$ hadoop fs -cat output/part*
fs.default.name hdfs://localhost:8020
hadoop.tmp.dir /var/lib/hadoop-0.20/cache/${user.name}
hadoop.proxyuser.oozie.hosts *
hadoop.proxyuser.oozie.groups *
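The StAX extraction at the heart of the mapper can be exercised outside Hadoop. Below is a minimal standalone sketch (the `PropertyExtractor` class name is ours, not from the book) that applies the same parsing logic to a single `<property>` element:

```java
import java.io.ByteArrayInputStream;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

// Standalone version of the mapper's StAX parsing logic: walks the
// element events and accumulates the text inside <name> and <value>.
public class PropertyExtractor {
  public static String[] extract(String document) throws Exception {
    XMLStreamReader reader = XMLInputFactory.newInstance()
        .createXMLStreamReader(
            new ByteArrayInputStream(document.getBytes("UTF-8")));
    String propertyName = "", propertyValue = "", currentElement = "";
    while (reader.hasNext()) {
      switch (reader.next()) {
        case XMLStreamConstants.START_ELEMENT:
          currentElement = reader.getLocalName();
          break;
        case XMLStreamConstants.CHARACTERS:
          if (currentElement.equalsIgnoreCase("name")) {
            propertyName += reader.getText();
          } else if (currentElement.equalsIgnoreCase("value")) {
            propertyValue += reader.getText();
          }
          break;
      }
    }
    reader.close();
    return new String[] { propertyName.trim(), propertyValue.trim() };
  }

  public static void main(String[] args) throws Exception {
    String xml = "<property><name>fs.default.name</name>"
        + "<value>hdfs://localhost:8020</value></property>";
    String[] kv = extract(xml);
    System.out.println(kv[0] + "\t" + kv[1]);
  }
}
```

This is the same event loop the mapper runs; the only difference is that the result is returned instead of written to the MapReduce context.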

This output shows that we have successfully worked with XML as an input serialization format in MapReduce. Better still, we can support huge XML files, since the InputFormat supports splitting XML.


Having successfully read XML, the next question is: how do we write XML? Our Reducer has callbacks that occur before and after the main reduce method is called, which we can use to emit the root start and end tags.
public static class Reduce
    extends Reducer<Text, Text, Text, Text> {

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    context.write(new Text("<configuration>"), null);            #1
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    context.write(new Text("</configuration>"), null);           #2
  }

  private Text outputKey = new Text();

  public void reduce(Text key, Iterable<Text> values,
                     Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      outputKey.set(constructPropertyXml(key, value));           #3
      context.write(outputKey, null);                            #4
    }
  }

  public static String constructPropertyXml(Text name, Text value) {
    StringBuilder sb = new StringBuilder();
    sb.append("<property><name>").append(name)
      .append("</name><value>").append(value)
      .append("</value></property>");
    return sb.toString();
  }
}
#1 Uses the setup method to write the root element start tag.
#2 Uses the cleanup method to write the root element end tag.
#3 Constructs a child XML element for each key/value combination we get in the Reducer.
#4 Emits the XML element.
This could also be embedded in an OutputFormat.
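The writing sequence can also be exercised outside Hadoop. Here is a minimal sketch (the `ConfigXmlWriter` class name is ours) that mirrors the reducer's lifecycle: the root start tag first, one `<property>` element per key/value pair, and the root end tag last:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Mirrors the reducer's output order: setup() writes <configuration>,
// each reduce() call writes one <property> element, cleanup() closes it.
public class ConfigXmlWriter {
  public static String write(Map<String, String> properties) {
    StringBuilder sb = new StringBuilder("<configuration>\n");
    for (Map.Entry<String, String> e : properties.entrySet()) {
      sb.append("<property><name>").append(e.getKey())
        .append("</name><value>").append(e.getValue())
        .append("</value></property>\n");
    }
    sb.append("</configuration>");
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("fs.default.name", "hdfs://localhost:8020");
    System.out.println(write(props));
  }
}
```

Because the per-element markup is assembled as plain strings, the output is only well-formed XML if the names and values contain no characters that need escaping; the reducer in the text has the same caveat.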


If you want to work with XML in Pig, the Piggybank library (a user-contributed library of useful Pig code) contains an XMLLoader. It works in a similar way to our technique and captures all of the content between a start and end tag and supplies it as a single bytearray field in a Pig tuple.


Currently, there doesn’t seem to be a way to work with XML in Hive. You would have to write a custom SerDe[1].


Mahout's XML InputFormat certainly helps you work with XML. However, it's very sensitive to an exact string match of both the start and end element names. If the element tag can contain attributes with variable values, or if you can't control how the element is generated (such that XML namespace qualifiers may appear), then this approach may not work for you. Situations where the element name you specify is also used in a descendant element are equally problematic.
If you have control over how the XML is laid out in the input, this exercise can be simplified by having a single XML element per line. This will let you use the built-in MapReduce text-based InputFormats (such as TextInputFormat), which treat each line as a record and split accordingly to preserve that demarcation.
Another option worth considering is a preprocessing step, where you convert the original XML into a separate line per XML element, or convert it into an altogether different data format, such as a SequenceFile or Avro, both of which solve the splitting problem for you.
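The first preprocessing idea can be sketched in plain Java (the `OneElementPerLine` class name and its string-matching approach are ours, a simplification rather than a robust XML transform): pull each element between the configured start and end tags and emit it on its own line, ready for TextInputFormat.

```java
import java.util.ArrayList;
import java.util.List;

// Splits a larger XML document into one element per output line, so that
// line-oriented InputFormats such as TextInputFormat can split the file.
public class OneElementPerLine {
  public static List<String> split(String xml, String startTag,
                                   String endTag) {
    List<String> lines = new ArrayList<>();
    int from = 0;
    while (true) {
      int start = xml.indexOf(startTag, from);
      if (start < 0) break;
      int end = xml.indexOf(endTag, start);
      if (end < 0) break;
      int stop = end + endTag.length();
      // Collapse internal newlines so the element fits on a single line.
      lines.add(xml.substring(start, stop).replaceAll("\\s*\\n\\s*", ""));
      from = stop;
    }
    return lines;
  }

  public static void main(String[] args) {
    String doc = "<configuration>\n  <property>\n    <name>a</name>\n"
        + "    <value>1</value>\n  </property>\n</configuration>";
    for (String line : split(doc, "<property>", "</property>")) {
      System.out.println(line);
    }
  }
}
```

Like Mahout's InputFormat, this relies on exact start/end tag matches, so the same caveats about attributes and namespace qualifiers apply.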
For Hadoop Streaming users, there's a StreamXmlRecordReader class that allows you to work with XML in your streaming code.
We have a handle on how to work with XML, so let's move on to another popular serialization format, JSON. JSON shares the machine- and human-readable traits of XML and has existed since the early 2000s. It is less verbose than XML, but it doesn't have the rich typing and validation features available in XML.

Technique 2: MapReduce and JSON

Our technique covers how you can work with JSON in MapReduce. We’ll also cover a method by which a JSON file can be partitioned for concurrent reads.


Figure 1 shows the problem with using JSON in MapReduce. If you are working with large JSON files, you need to be able to split them. But given a random offset in a file, how do we determine the start of the next JSON element, especially when working with JSON that has multiple hierarchies, such as in the example below?

Figure 1 Example of issue with JSON and multiple input splits


JSON is harder to partition into distinct segments than a format such as XML because JSON doesn’t have a token (like an end tag in XML) to denote the start or end of a record.
ElephantBird[2], an open-source project containing useful utilities for working with LZO compression, has an LzoJsonInputFormat that can read JSON, but it requires that the input file be LZOP-compressed. We'll use this code as a template for our own JSON InputFormat, which won't have the LZOP compression requirement.

We're cheating with our solution because we assume that each JSON record is on a separate line. Our JsonRecordFormat is simple and does nothing other than construct and return a JsonRecordReader, so we'll skip over that code. The JsonRecordReader emits LongWritable, MapWritable key/value pairs to the Mapper, where the MapWritable is a map of JSON element names to their values. Let's take a look at how this RecordReader works. It leverages the LineRecordReader, a built-in MapReduce reader that emits a record for each line. To convert the line to a MapWritable, it uses the following method.
public static boolean decodeLineToJson(JSONParser parser, Text line,
                                       MapWritable value) {
  try {
    JSONObject jsonObj = (JSONObject) parser.parse(line.toString());
    for (Object key : jsonObj.keySet()) {
      Text mapKey = new Text(key.toString());
      Text mapValue = new Text();
      if (jsonObj.get(key) != null) {
        mapValue.set(jsonObj.get(key).toString());
      }
      value.put(mapKey, mapValue);
    }
    return true;
  } catch (ParseException e) {
    LOG.warn("Could not json-decode string: " + line, e);
    return false;
  } catch (NumberFormatException e) {
    LOG.warn("Could not parse field into number: " + line, e);
    return false;
  }
}

It uses the json-simple[3] parser to parse the line into a JSON object and then iterates over the keys, putting each key and value into a MapWritable. The Mapper is given the JSON data in LongWritable, MapWritable pairs and can process the data accordingly. The code for the MapReduce job itself is very basic. We're going to demonstrate the code using the JSON below.
{
  "results" : [
    {
      "created_at" : "Thu, 29 Dec 2011 21:46:01 +0000",
      "from_user" : "grep_alex",
      "text" : "RT @kevinweil: After a lot of hard work by ..."
    },
    {
      "created_at" : "Mon, 26 Dec 2011 21:18:37 +0000",
      "from_user" : "grep_alex",
      "text" : "@miguno pull request has been merged, thanks again!"
    }
  ]
}

Since our technique assumes a JSON object per line, the actual JSON file we’ll work with is shown below.
{"created_at" : "Thu, 29 Dec 2011 21:46:01 +0000","from_user" : ...
{"created_at" : "Mon, 26 Dec 2011 21:18:37 +0000","from_user" : ...
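For flat, one-object-per-line records like these, the decode step can be illustrated without Hadoop or json-simple. The sketch below (the `FlatJsonDecoder` class and its regex shortcut are ours, a toy stand-in for a real JSON parser) extracts each quoted key/value pair from a line:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy stand-in for the json-simple parse step: for flat objects with
// string values only, pull each "key" : "value" pair out with a regex.
// A real job should use a proper JSON parser, as the book's code does.
public class FlatJsonDecoder {
  private static final Pattern PAIR =
      Pattern.compile("\"([^\"]+)\"\\s*:\\s*\"([^\"]*)\"");

  public static Map<String, String> decode(String line) {
    Map<String, String> map = new LinkedHashMap<>();
    Matcher m = PAIR.matcher(line);
    while (m.find()) {
      map.put(m.group(1), m.group(2));
    }
    return map;
  }

  public static void main(String[] args) {
    String line = "{\"created_at\" : \"Thu, 29 Dec 2011 21:46:01 +0000\","
        + "\"from_user\" : \"grep_alex\"}";
    Map<String, String> record = decode(line);
    System.out.println(record.get("from_user"));
  }
}
```

This handles only string-valued, non-nested objects; anything richer (numbers, nested objects, escaped quotes) is exactly why the real code delegates to json-simple.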

We’ll copy the JSON file into HDFS and run our MapReduce code. Our MapReduce code simply writes each JSON
key/value to the output.
$ hadoop fs -put test-data/ch3/singleline-tweets.json \
  singleline-tweets.json

$ bin/run.sh com.manning.hip.ch3.json.JsonMapReduce \
  singleline-tweets.json output

$ hadoop fs -cat output/part*
text RT @kevinweil: After a lot of hard work by ...
from_user grep_alex
created_at Thu, 29 Dec 2011 21:46:01 +0000
text @miguno pull request has been merged, thanks again!
from_user grep_alex
created_at Mon, 26 Dec 2011 21:18:37 +0000


An approach similar to what we looked at for writing XML could also be used to write JSON.


ElephantBird contains a JsonLoader and an LzoJsonLoader, which you can use to work with JSON in Pig. They also work only with line-based JSON. Each Pig tuple contains a chararray field for each JSON element in the line.


Hive contains a DelimitedJSONSerDe, which can serialize JSON but unfortunately not deserialize it, so you can’t load data into Hive using this SerDe.


Our solution works on the assumption that the JSON input is structured with one JSON object per line. How would we work with JSON objects that span multiple lines? The author has an experimental project on GitHub[4] that works with multiple input splits over a single JSON file. The key to this approach is searching for a specific JSON member and retrieving the containing object. There's also a Google Code project called hive-json-serde[5], which can support both serialization and deserialization.


As you can see, using XML and JSON in MapReduce is kludgy and has rigid requirements about how your data is laid out. Supporting them in MapReduce is complex and error prone, as they don’t naturally lend themselves to splitting. Alternative file formats, such as Avro and SequenceFiles, have built-in support for splittability.

If you would like to purchase Hadoop in Practice, DZone members can receive a 38% discount by entering the Promotional Code: dzone38 during checkout at Manning.com.

[1] SerDe is a shortened form of Serializer/Deserializer, the mechanism that allows Hive to read and write data in HDFS.
[2] https://github.com/kevinweil/elephant-bird
[3] http://code.google.com/p/json-simple/
[4] A multiline JSON InputFormat. https://github.com/alexholmes/json-mapreduce.
[5] http://code.google.com/p/hive-json-serde/

Source: http://www.manning.com/holmes/
