A small file is one which is significantly smaller than the HDFS block size (default 64 MB).
If you're storing small files, you probably have lots of them (otherwise you wouldn't turn to Hadoop),
and the problem is that HDFS can't handle lots of small files efficiently: each file takes up NameNode memory, and with the default input format each small file becomes its own map task.
In my benchmark, just using a custom CombineFileInputFormat sped the program up from 3 hours to 23 minutes, and after some further tuning the same task ran in 6 minutes!
Benchmark Setup
To test the raw performance of different approaches to the small files problem, I set up a map-only Hadoop job that basically just does a grep and performs a small binary search. The binary search part generates the reduce-side keys that I'll use in further data processing; it takes very little resource (an 8 MB index) to run, so it does not affect the result of the benchmark.
The data to process is server log data, 53.1 GB in total. The Hadoop cluster consists of 6 nodes, running Hadoop version 1.1.2. In this benchmark I implemented CombineFileInputFormat to shrink the number of map tasks; I also tested the difference between reusing the JVM or not, and different split sizes for combining files.
CombineFileInputFormat
The code listed here is modified from the Hadoop example code. To use CombineFileInputFormat you need to implement three classes. The class CombineFileInputFormat is an abstract class with no concrete implementation, so you must create a subclass to use it; we'll name the subclass CFInputFormat. The subclass instantiates a delegate CFRecordReader that extends RecordReader; this is the code that does the file processing logic. We'll also need a FileLineWritable class, which replaces the LongWritable normally used as the key for file lines.
CFInputFormat.java
CFInputFormat.java doesn't do much. You implement createRecordReader to pass in the record reader that does the combine-file logic; that's all. Note that you can call setMaxSplitSize in the constructor to control the size of each chunk of files; if you don't want individual files to be split in half, remember to return false in the isSplitable method, which defaults to true.
package com.orienit.kalyan.hadoop.training.combinefiles;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

import com.orienit.kalyan.hadoop.training.combinefiles.CFRecordReader;
import com.orienit.kalyan.hadoop.training.combinefiles.FileLineWritable;

public class CFInputFormat extends CombineFileInputFormat<FileLineWritable, Text> {
  public CFInputFormat() {
    super();
    setMaxSplitSize(67108864); // 64 MB, default block size on hadoop
  }

  public RecordReader<FileLineWritable, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    return new CombineFileRecordReader<FileLineWritable, Text>(
        (CombineFileSplit) split, context, CFRecordReader.class);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;
  }
}
CFRecordReader.java
CFRecordReader is a delegate class of CombineFileRecordReader, a built-in class that passes each split (typically a whole file in this case) to our class CFRecordReader. When the Hadoop job starts, CombineFileRecordReader reads the sizes of all the files in HDFS that we want it to process and decides how many splits to create, based on the MaxSplitSize we defined in CFInputFormat. For every split (which must be a whole file, because we set isSplitable to false), CombineFileRecordReader creates a CFRecordReader instance via a custom constructor, passing in the CombineFileSplit, the context, and an index that lets CFRecordReader locate the file to process.
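The CFRecordReader source itself isn't reproduced in this post; a minimal sketch of such a delegate reader, modeled on the standard Hadoop combine-file example code, could look like the following (the public fileName and offset fields on FileLineWritable are an assumption here, sketched further below):

package com.orienit.kalyan.hadoop.training.combinefiles;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

public class CFRecordReader extends RecordReader<FileLineWritable, Text> {
  private long startOffset;  // start of this file's region in the combined split
  private long end;          // end of this file's region
  private long pos;          // current read position
  private Path path;
  private LineReader reader;
  private FileLineWritable key;
  private Text value;

  // CombineFileRecordReader calls this constructor with the split, the context,
  // and the index of the file inside the split that this reader should handle.
  public CFRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
      throws IOException {
    this.path = split.getPath(index);
    FileSystem fs = path.getFileSystem(context.getConfiguration());
    this.startOffset = split.getOffset(index);
    this.end = startOffset + split.getLength(index);
    FSDataInputStream in = fs.open(path);
    this.reader = new LineReader(in);
    this.pos = startOffset;
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
    // everything is already initialized in the custom constructor above
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new FileLineWritable();
      key.fileName = path.getName(); // assumed public field, see sketch below
    }
    key.offset = pos;                // assumed public field, see sketch below
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    if (pos < end) {
      newSize = reader.readLine(value);
      pos += newSize;
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    }
    return true;
  }

  @Override
  public FileLineWritable getCurrentKey() { return key; }

  @Override
  public Text getCurrentValue() { return value; }

  @Override
  public float getProgress() {
    if (startOffset == end) return 0.0f;
    return Math.min(1.0f, (pos - startOffset) / (float) (end - startOffset));
  }

  @Override
  public void close() throws IOException {
    if (reader != null) reader.close();
  }
}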
When processing the file, CFRecordReader creates a FileLineWritable as the key for the Hadoop mapper class. For each line, the FileLineWritable consists of the file name and the offset of that line within the file. The difference between FileLineWritable and the LongWritable normally used as the mapper key is that LongWritable only denotes the offset of a line in a file, while FileLineWritable adds the file information to the key.
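FileLineWritable isn't listed here either; a minimal sketch consistent with that description (public fileName and offset fields, comparing by file name first and then by offset) might be:

package com.orienit.kalyan.hadoop.training.combinefiles;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class FileLineWritable implements WritableComparable<FileLineWritable> {
  public String fileName;  // which small file this line came from
  public long offset;      // byte offset of the line within that file

  @Override
  public void write(DataOutput out) throws IOException {
    Text.writeString(out, fileName);
    out.writeLong(offset);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    fileName = Text.readString(in);
    offset = in.readLong();
  }

  @Override
  public int compareTo(FileLineWritable other) {
    int cmp = fileName.compareTo(other.fileName);
    if (cmp != 0) return cmp;
    return offset < other.offset ? -1 : (offset == other.offset ? 0 : 1);
  }

  @Override
  public int hashCode() {
    return 31 * fileName.hashCode() + (int) (offset ^ (offset >>> 32));
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof FileLineWritable)) return false;
    FileLineWritable that = (FileLineWritable) obj;
    return offset == that.offset && fileName.equals(that.fileName);
  }
}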
Finally, here is the job setup for the Hadoop cluster to run. We just need to assign the classes to the job:
import org.apache.hadoop.mapreduce.Job;

// standard hadoop conf
Job job = new Job(getConf());
FileInputFormat.addInputPath(job, new Path(args[0]));
job.setInputFormatClass(CFInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setNumReduceTasks(0); // map only
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.submit();
I ran several benchmarks and tuned the performance from 3 hours 34 minutes to 6 minutes 8 seconds!
Original job without any tuning
job_201406051010_0001
NumTasks: 9790
Reuse JVM: false
mean complete time: 05-Jul-2014 10:08:47 (17sec)
Finished in: 3hrs, 34mins, 26sec
We had 9790 files to process, and the total size of the files is 53 GB. Note that each task still took about 17 seconds on average just to process its single file.
Using CombineFileInputFormat without setting the MaxSplitSize
job_201406051010_0002
NumTasks: 1
Reuse JVM: false
In this benchmark I didn't set the MaxSplitSize in CFInputFormat.java, and thus Hadoop merged all the files into one super big task.
After running this task for 15 minutes, Hadoop killed it. Maybe it's a timeout issue; I didn't dig into this.
The start and the end of the task logs look like this:
14/06/05 16:17:29 INFO mapred.JobClient: map 0% reduce 0%
14/06/05 16:32:45 INFO mapred.JobClient: map 40% reduce 0%
14/06/05 16:33:02 INFO mapred.JobClient: Task Id : attempt_201406051010_0002_m_000000_0, Status : FAILED
java.lang.Throwable: Child Error
at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271)
Caused by: java.io.IOException: Task process exit with nonzero status of 255.
at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:258)
Using CombineFileInputFormat with block size 64 MB
job_201406051010_0003
Reuse JVM = false
max split size = 64MB
NumTasks: 760
mean complete time: 05-Jul-2014 16:55:02 (24sec)
Finished in: 23mins, 6sec
After setting the MaxSplitSize, the total runtime dropped to 23 minutes! The number of tasks drops from 9790 to 760, about 12 times fewer, and the job runs 9.3 times faster; pretty nice! However, the mean task completion time doesn't scale by the same factor. The reason is the big overhead of starting a new JVM over and over again.
Using CombineFileInputFormat with block size 64MB and reuse JVM
To reuse the JVM, just set mapred.job.reuse.jvm.num.tasks to -1 (the Hadoop 1.x property; -1 means a JVM may be reused for an unlimited number of tasks of the same job).
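For example, it can be set in the driver code like this (a sketch, assuming the usual MR1 driver pattern; passing -Dmapred.job.reuse.jvm.num.tasks=-1 on the command line via ToolRunner works as well):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

Configuration conf = getConf();
// -1: reuse the same JVM for an unlimited number of tasks of this job (MR1 setting)
conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
Job job = new Job(conf);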
The result is awesome! 6 minutes and 8 seconds, wow!
job_201406051010_0004
Reuse JVM = true
max split size = 64MB
NumTasks: 760
mean complete time: 05-Jul-2014 17:30:23 (5sec)
Finished in: 6mins, 8sec
Use FileInputFormat and reuse JVM
Just out of curiosity, here is the performance difference if we only change the JVM reuse parameter:
job_201406051010_0005
NumTasks: 9790
mean complete time: 05-Jul-2014 17:04:18 (3sec)
Reuse JVM = true
Finished in: 24mins, 49sec
Tuning performance over block size
Let's jump to the conclusion first: changing the block size doesn't affect the performance that much, and I found 64 MB to be the best size to use. Here are the benchmarks:
512 MB
job_201406051010_0006
Reuse JVM = true
max split size = 512MB
NumTasks: 99
mean complete time: 05-Jul-2014 11:55:26 (24sec)
Finished in: 7min 13sec
128 MB
job_201406051010_0007
Reuse JVM = true
max split size = 128 MB
NumTasks: 341
mean complete time: 05-Jul-2014 13:13:20 (9sec)
Finished in: 6mins, 41sec
Conclusion
So far, the best practices I learned from these benchmarks are:
Set the mapred.job.reuse.jvm.num.tasks flag in the configuration. This is the easiest tuning to do, and it gives nearly a 10x performance improvement.
Write your own CombineFileInputFormat implementation.
The block size can be 64 MB or 128 MB; there isn't a big difference between the two.
Still, try to model your problem as a sequence file or map file in Hadoop; HDFS should handle locality for these files automatically (a rough packing sketch is given at the end of this post).
What about CFInputFormat? Does it handle locality in HDFS too?
I can't confirm it, but I guess that sorting the keys based on line offset first and then on file name also guarantees locality when assigning data to mappers. When I have time to dig more into the HDFS API, I'll look back at this benchmark and see what more I can tune in the program.
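As a pointer for the sequence file suggestion above, here is a rough sketch of packing many small files into a single SequenceFile, using the file name as the key and the file contents as the value. The paths and the key/value layout are just assumptions for illustration, not part of the benchmark code:

import java.io.ByteArrayOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path input = new Path("/data/small-files");  // hypothetical directory of small files
Path output = new Path("/data/packed.seq");  // hypothetical output SequenceFile

SequenceFile.Writer writer =
    SequenceFile.createWriter(fs, conf, output, Text.class, BytesWritable.class);
try {
  for (FileStatus status : fs.listStatus(input)) {
    if (status.isDir()) {
      continue;
    }
    // read the whole small file into memory (fine for files well below the block size)
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    FSDataInputStream in = fs.open(status.getPath());
    try {
      IOUtils.copyBytes(in, buffer, conf, false);
    } finally {
      in.close();
    }
    // key = file name, value = raw file bytes
    writer.append(new Text(status.getPath().getName()),
                  new BytesWritable(buffer.toByteArray()));
  }
} finally {
  writer.close();
}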