Wednesday, 20 November 2013

How to change the default key-value separator of a MapReduce job

The default MapReduce output format, TextOutputFormat, writes records as lines of text. Its keys and values may be of any type, since TextOutputFormat turns them into strings by calling toString() on them.

Each key-value pair is separated by a tab character. We can change this separator to a character of our choice using the mapred.textoutputformat.separator property.

To do this, set the property on the Configuration object in your job setup code:

Configuration conf = new Configuration();

// by default \t
conf.set("mapred.textoutputformat.separator", "\t"); 

// to make ,
conf.set("mapred.textoutputformat.separator", ","); 

// to make :
conf.set("mapred.textoutputformat.separator", ":");  
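To make the effect of the separator concrete, here is a minimal plain-Java sketch of how one output line is assembled. It is a simplified model written for this post, not Hadoop's own code: the class name OutputSeparatorSketch and the method formatRecord are made up for illustration, and special cases such as null keys are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of how a text output line is assembled; not Hadoop's own code.
class OutputSeparatorSketch {

    // Calls toString() on key and value and joins them with the separator.
    static String formatRecord(Object key, Object value, String separator) {
        return key.toString() + separator + value.toString();
    }

    public static void main(String[] args) {
        // Hypothetical reducer output: word counts.
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("hadoop", 3);
        counts.put("mapreduce", 5);

        // Print the same records with each separator used in the post.
        for (String separator : new String[] {"\t", ",", ":"}) {
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                System.out.println(formatRecord(e.getKey(), e.getValue(), separator));
            }
        }
    }
}
```

With the separator set to a comma, the record (hadoop, 3) comes out as the line hadoop,3 instead of the two fields being split by a tab.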


TextInputFormat’s keys, being simply the offset within the file, are not normally very useful. It is common for each line in a file to be a key-value pair, separated by a delimiter such as a tab character. 

For example, this is the kind of output produced by TextOutputFormat, Hadoop’s default OutputFormat. To interpret such files correctly, KeyValueTextInputFormat is appropriate.

You can specify the separator via the mapreduce.input.keyvaluelinerecordreader.key.value.separator property (the older MapReduce API used a different property name). It is a tab character by default.

Consider the following input file, where → represents a (horizontal) tab character:

line1→On the top of the Crumpetty Tree
line2→The Quangle Wangle sat,
line3→But his face you could not see,
line4→On account of his Beaver Hat.

Configuration conf = new Configuration();

// by default \t
 conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");  

// note: the → in the sample file above is only notation for a tab ("\t");
// it is not meant as a literal separator, and a single-byte character is the safe choice here

// to make :
 conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ":");   
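The following plain-Java sketch models what happens to each input line once a separator is chosen. It is an illustration only, not the Hadoop record reader: the class KeyValueSplitSketch and the method splitLine are hypothetical names, and the rule it follows (split at the first separator, and treat a line without a separator as a key with an empty value) is my understanding of how KeyValueTextInputFormat behaves.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of splitting a line into key and value; not Hadoop's own code.
class KeyValueSplitSketch {

    // Splits at the FIRST occurrence of the separator.
    // Assumption: a line with no separator becomes the key, with an empty value.
    static String[] splitLine(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos == -1) {
            return new String[] {line, ""};
        }
        return new String[] {line.substring(0, pos), line.substring(pos + 1)};
    }

    public static void main(String[] args) {
        // The sample file from the post, with real tab characters.
        List<String> lines = Arrays.asList(
            "line1\tOn the top of the Crumpetty Tree",
            "line2\tThe Quangle Wangle sat,",
            "line3\tBut his face you could not see,",
            "line4\tOn account of his Beaver Hat.");

        for (String line : lines) {
            String[] kv = splitLine(line, '\t');
            System.out.println("(" + kv[0] + ", " + kv[1] + ")");
        }
    }
}
```

Because only the first separator counts, a value may itself contain the separator character: with ':' as the separator, the line a:b:c gives the key a and the value b:c.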

With TextInputFormat and KeyValueTextInputFormat, each mapper receives a variable number of lines of input. The number depends on the size of the split and the length of the lines. If you want your mappers to receive a fixed number of lines of input, then NLineInputFormat is the InputFormat to use. Like TextInputFormat, the keys are the byte offsets within the file and the values are the lines themselves.

N refers to the number of lines of input that each mapper receives. With N set to one (the default), each mapper receives exactly one line of input. The mapreduce.input.lineinputformat.linespermap property (in the older MapReduce API this was mapred.line.input.format.linespermap) controls the value of N.

By way of example, consider these four lines again:

On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

If, for example, N is two, then each split contains two lines. One mapper will receive the first two key-value pairs:

(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)

And another mapper will receive the second two key-value pairs:

(57, But his face you could not see,)
(89, On account of his Beaver Hat.)

The keys and values are the same as TextInputFormat produces. What is different is the way the splits are constructed.
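The grouping and the byte offsets in this example can be reproduced with a short plain-Java sketch. It only models the idea of handing N lines to each mapper and is not the NLineInputFormat implementation; the names NLineSplitSketch, Record and split are hypothetical, and it assumes the file is UTF-8 with a single '\n' ending each line.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of N-lines-per-split grouping; not Hadoop's own code.
class NLineSplitSketch {

    // One record: the byte offset of the line start (key) and the line text (value).
    static final class Record {
        final long offset;
        final String line;

        Record(long offset, String line) {
            this.offset = offset;
            this.line = line;
        }

        @Override
        public String toString() {
            return "(" + offset + ", " + line + ")";
        }
    }

    static final List<String> SAMPLE = Arrays.asList(
        "On the top of the Crumpetty Tree",
        "The Quangle Wangle sat,",
        "But his face you could not see,",
        "On account of his Beaver Hat.");

    // Groups the lines into splits of at most n records each.
    static List<List<Record>> split(List<String> lines, int n) {
        List<List<Record>> splits = new ArrayList<>();
        List<Record> current = new ArrayList<>();
        long offset = 0;
        for (String line : lines) {
            current.add(new Record(offset, line));
            // Advance past the line's bytes plus one byte for the '\n'.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
            if (current.size() == n) {
                splits.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            splits.add(current); // last split may hold fewer than n lines
        }
        return splits;
    }

    public static void main(String[] args) {
        List<List<Record>> splits = split(SAMPLE, 2);
        for (int i = 0; i < splits.size(); i++) {
            System.out.println("mapper " + i + ": " + splits.get(i));
        }
    }
}
```

With N set to two, the sketch yields two splits whose keys are 0 and 33 for the first mapper and 57 and 89 for the second, matching the pairs listed above.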

Usually, having a map task for a small number of lines of input is inefficient (due to the overhead in task setup), but there are applications that take a small amount of input data and run an extensive (that is, CPU-intensive) computation for it, then emit their output. Simulations are a good example. By creating an input file that specifies input parameters, one per line, you can perform a parameter sweep: run a set of simulations in parallel to find how a model varies as the parameter changes.

Binary Input
Hadoop MapReduce is not just restricted to processing textual data—it has support for binary formats, too.


Hadoop’s sequence file format stores sequences of binary key-value pairs. They are well suited as a format for MapReduce data since they are splittable (they have sync points so that readers can synchronize with record boundaries from an arbitrary point in the file, such as the start of a split), they support compression as a part of the format, and they can store arbitrary types using a variety of serialization frameworks.

To use data from sequence files as the input to MapReduce, you use SequenceFileInputFormat. The keys and values are determined by the sequence file, and you need to make sure that your map input types correspond. 

For example, if your sequence file has IntWritable keys and Text values, then the map signature would be Mapper<IntWritable, Text, K, V>, where K and V are the types of the map’s output keys and values.

Although its name doesn’t give it away, SequenceFileInputFormat can read MapFiles as well as sequence files. If it finds a directory where it was expecting a sequence file, SequenceFileInputFormat assumes that it is reading a MapFile and uses its data file. This is why there is no MapFileInputFormat class.


SequenceFileAsTextInputFormat is a variant of SequenceFileInputFormat that converts the sequence file’s keys and values to Text objects. The conversion is performed by calling toString() on the keys and values. This format makes sequence files suitable input for Streaming.


SequenceFileAsBinaryInputFormat is a variant of SequenceFileInputFormat that retrieves the sequence file’s keys and values as opaque binary objects. They are encapsulated as BytesWritable objects, and the application is free to interpret the underlying byte array as it pleases. Combined with a process that creates sequence files with SequenceFile.Writer’s appendRaw() method, this provides a way to use any binary data types with MapReduce (packaged as a sequence file), although plugging into Hadoop’s serialization mechanism is normally a cleaner alternative.
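As a small illustration of "interpreting the byte array as it pleases", the plain-Java sketch below decodes a 4-byte big-endian integer from a raw buffer. The class RawBytesSketch and the method decodeInt are hypothetical, and the 4-byte layout is an assumption chosen for the example. The method takes an explicit length because a backing array can be longer than the valid data, which is also the case for the array returned by BytesWritable.

```java
import java.nio.ByteBuffer;

// Sketch of decoding raw key or value bytes by hand; the layout is an assumption.
class RawBytesSketch {

    // Reads a big-endian 4-byte int from the first `length` valid bytes of the buffer.
    static int decodeInt(byte[] buffer, int length) {
        if (length < Integer.BYTES) {
            throw new IllegalArgumentException("need at least 4 valid bytes, got " + length);
        }
        // ByteBuffer reads big-endian by default.
        return ByteBuffer.wrap(buffer, 0, length).getInt();
    }

    public static void main(String[] args) {
        // An 8-byte backing array of which only the first 4 bytes are valid data.
        byte[] raw = ByteBuffer.allocate(8).putInt(1950).array();
        System.out.println(decodeInt(raw, 4));
    }
}
```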

Multiple Inputs

Although the input to a MapReduce job may consist of multiple input files (constructed by a combination of file globs, filters, and plain paths), all of the input is interpreted by a single InputFormat and a single Mapper. What often happens, however, is that over time, the data format evolves, so you have to write your mapper to cope with all of your legacy formats. Or, you have data sources that provide the same type of data but in different formats. This arises in the case of performing joins of different datasets; see Reduce-Side Joins. For instance, one might be tab-separated plain text, the other a binary sequence file. Even if they are in the same format, they may have different representations and, therefore, need to be parsed differently.

These cases are handled elegantly by using the MultipleInputs class, which allows you to specify the InputFormat and Mapper to use on a per-path basis. For example, if we had weather data from the UK Met Office that we wanted to combine with the NCDC data for our maximum temperature analysis, then we might set up the input as follows:

MultipleInputs.addInputPath(conf, ncdcInputPath, TextInputFormat.class, MaxTemperatureMapper.class);
MultipleInputs.addInputPath(conf, metOfficeInputPath, TextInputFormat.class, MetOfficeMaxTemperatureMapper.class);

This code replaces the usual calls to FileInputFormat.addInputPath() and conf.setMapperClass(). Both the Met Office and NCDC data are text-based, so we use TextInputFormat for each. But the line format of the two data sources is different, so we use two different mappers. The MaxTemperatureMapper reads NCDC input data and extracts the year and temperature fields. The MetOfficeMaxTemperatureMapper reads Met Office input data and extracts the year and temperature fields. The important thing is that the map outputs have the same types, since the reducers (which are all of the same type) see the aggregated map outputs and are not aware of the different mappers used to produce them.
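The point about matching map output types can be shown without Hadoop at all. In the plain-Java sketch below, two parsers handle two different line layouts but both emit (year, temperature) pairs, and a single reduce step consumes the combined output. Everything here is hypothetical: the class and method names are made up, and the two layouts ("year, tab, temperature" and "temperature;year") are invented for the example and are not the real NCDC or Met Office formats.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Sketch: two per-input "mappers" feeding one "reducer"; not Hadoop's own code.
class MultipleInputsSketch {

    // Hypothetical layout A: "year<TAB>temperature".
    static int[] parseFormatA(String line) {
        String[] fields = line.split("\t");
        return new int[] {Integer.parseInt(fields[0]), Integer.parseInt(fields[1])};
    }

    // Hypothetical layout B: "temperature;year".
    static int[] parseFormatB(String line) {
        String[] fields = line.split(";");
        return new int[] {Integer.parseInt(fields[1]), Integer.parseInt(fields[0])};
    }

    // Applies one mapper to one input and appends its (year, temperature) pairs.
    static void map(List<String> lines, Function<String, int[]> mapper, List<int[]> out) {
        for (String line : lines) {
            out.add(mapper.apply(line));
        }
    }

    // Single reducer: maximum temperature per year, unaware of which mapper emitted a pair.
    static Map<Integer, Integer> reduceMax(List<int[]> pairs) {
        Map<Integer, Integer> max = new TreeMap<>();
        for (int[] pair : pairs) {
            max.merge(pair[0], pair[1], Integer::max);
        }
        return max;
    }

    // Runs both inputs through their own mapper, then through the shared reducer.
    static Map<Integer, Integer> run() {
        List<int[]> mapOutput = new ArrayList<>();
        map(Arrays.asList("1950\t22", "1951\t18"), MultipleInputsSketch::parseFormatA, mapOutput);
        map(Arrays.asList("25;1950", "17;1951"), MultipleInputsSketch::parseFormatB, mapOutput);
        return reduceMax(mapOutput);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The reducer works only because both parsers return the same pair type; if one of them emitted, say, the temperature as a string, the combined output could no longer be reduced by one function.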

The MultipleInputs class has an overloaded version of addInputPath() that doesn’t take a mapper:

public static void addInputPath(JobConf conf, Path path, Class<? extends InputFormat> inputFormatClass)

This is useful when you only have one mapper but multiple input formats.
