Hadoop Programming

*********************************************************************************************

Simple and well-explained Hadoop MapReduce programs are given below.

If you want to master Hadoop programming, come to ORIEN IT Hadoop Training.

*********************************************************************************************

How do you count the words in a file with Hadoop MapReduce and partition the output by each word's starting character?

The idea: the mapper emits a (word, 1) pair for every word, a combiner pre-aggregates those pairs on the map side, a custom partitioner routes each word to one of 26 reducers based on its first letter, and each reducer sums the counts for its words.

package com.orienit.hadoop.training.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountJob implements Tool {

    private Configuration conf;

    @Override
    public Configuration getConf() {
        return conf;    // getting the configuration
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;    // setting the configuration
    }

    @Override
    public int run(String[] args) throws Exception {

        // initializing the job configuration
        Job wordCountJob = new Job(getConf());

        // setting the job name
        wordCountJob.setJobName("Orien IT WordCount Job");

        // setting the jar by class, so the job can be run from a jar
        wordCountJob.setJarByClass(this.getClass());

        // setting custom mapper class
        wordCountJob.setMapperClass(WordCountMapper.class);

        // setting custom reducer class
        wordCountJob.setReducerClass(WordCountReducer.class);

        // setting custom combiner class
        wordCountJob.setCombinerClass(WordCountCombiner.class);

        // setting no of reducers (one per starting letter a-z)
        wordCountJob.setNumReduceTasks(26);

        // setting custom partitioner class
        wordCountJob.setPartitionerClass(WordCountPartitioner.class);

        // setting mapper output key class: K2
        wordCountJob.setMapOutputKeyClass(Text.class);

        // setting mapper output value class: V2
        wordCountJob.setMapOutputValueClass(LongWritable.class);

        // setting reducer output key class: K3
        wordCountJob.setOutputKeyClass(Text.class);

        // setting reducer output value class: V3
        wordCountJob.setOutputValueClass(LongWritable.class);

        // setting the input format class, i.e. for K1, V1
        wordCountJob.setInputFormatClass(TextInputFormat.class);

        // setting the output format class
        wordCountJob.setOutputFormatClass(TextOutputFormat.class);

        // setting the input file path
        FileInputFormat.addInputPath(wordCountJob, new Path(args[0]));

        // setting the output folder path
        FileOutputFormat.setOutputPath(wordCountJob, new Path(args[1]));

        // delete the output folder if it already exists
        Path outputpath = new Path(args[1]);
        outputpath.getFileSystem(conf).delete(outputpath, true);

        // execute the job and return the status
        return wordCountJob.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {

        // start the job, providing arguments and configuration, and exit with its status
        int status = ToolRunner.run(new Configuration(), new WordCountJob(), args);
        System.exit(status);
    }

}
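
Note: new Job(getConf()) still compiles, but that constructor is deprecated in Hadoop 2.x. A minimal equivalent sketch using the recommended factory method (only the construction changes, the rest of run() stays the same):

    // inside run(), replacing the constructor and setJobName calls
    Job wordCountJob = Job.getInstance(getConf(), "Orien IT WordCount Job");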


package com.orienit.hadoop.training.wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private Text text = new Text();
    private final static LongWritable one = new LongWritable(1);

    enum MyCounter {
        MAPS
    };

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer words = new StringTokenizer(line, " ");
        Counter staticCounter = context.getCounter(MyCounter.MAPS);
        Counter dynamicCounter = context.getCounter("OrienIT", "how many maps");
        while (words.hasMoreTokens()) {
            staticCounter.increment(1); // increment the static counter
            dynamicCounter.increment(1); // increment the dynamic counter
            text.set(words.nextToken());
            context.write(text, one); // write the map output
        }
    }
}
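
The two counters incremented in map() can be read back in the driver once the job has finished. A minimal sketch of how run() could be extended after waitForCompletion (hypothetical addition, not part of the job above):

    // hypothetical: read the counters back after the job completes
    boolean success = wordCountJob.waitForCompletion(true);
    long staticCount = wordCountJob.getCounters()
            .findCounter(WordCountMapper.MyCounter.MAPS).getValue();
    long dynamicCount = wordCountJob.getCounters()
            .findCounter("OrienIT", "how many maps").getValue();
    System.out.println("words processed: " + staticCount + " / " + dynamicCount);
    return success ? 0 : -1;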


package com.orienit.hadoop.training.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordCountPartitioner extends Partitioner<Text, LongWritable> {

    @Override
    public int getPartition(Text text, LongWritable lw, int noOfReducers) {
        String word = text.toString().toLowerCase();
        return (Math.abs(word.charAt(0) - 'a')) % noOfReducers;   // map the word's first letter to a reducer number
    }
}
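
To see what the partitioner does: with 26 reducers, every word is routed by its first letter, so "apple" goes to partition ('a' - 'a') % 26 = 0 and "hadoop" to ('h' - 'a') % 26 = 7, meaning all words starting with the same letter end up in the same output file. A tiny standalone check (hypothetical; uses the same Text and LongWritable imports as the classes above):

    // hypothetical quick check of the partitioning logic
    WordCountPartitioner p = new WordCountPartitioner();
    System.out.println(p.getPartition(new Text("apple"), new LongWritable(1), 26));   // prints 0
    System.out.println(p.getPartition(new Text("Hadoop"), new LongWritable(1), 26));  // prints 7 -> part-r-00007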

package com.orienit.hadoop.training.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        // sum all the counts for this word
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));    // write the reducer output
    }
}

package com.orienit.hadoop.training.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        // pre-aggregate the counts on the map side
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));   // write the combiner output (partial sums)
    }
}
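
Once the five classes above are packaged into a jar, the job takes the input path and the output folder as its two arguments (the jar name and HDFS paths below are only examples):

    hadoop jar wordcount.jar com.orienit.hadoop.training.wordcount.WordCountJob /user/orienit/input /user/orienit/output

For an input line such as "hadoop spark hadoop", the mapper emits (hadoop,1), (spark,1), (hadoop,1), the combiner pre-aggregates them per map task, and the reducers write "hadoop 2" to part-r-00007 and "spark 1" to part-r-00018.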

*********************************************************************************************


Write a Distributed Grep program

Each mapper emits only the lines that contain the configured search pattern; since no aggregation is needed, the number of reducers is set to 0 and the map output is written directly.

package com.orienit.hadoop.training.grep;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GrepJob implements Tool {

    private Configuration conf;

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        Job grepJob = new Job(getConf());
        grepJob.setJobName("OrienIT Grep Job");
        grepJob.setJarByClass(this.getClass());
        grepJob.setMapperClass(GrepMapper.class);
        grepJob.setNumReduceTasks(0);
        grepJob.setOutputKeyClass(Text.class);
        grepJob.setOutputValueClass(NullWritable.class);

        grepJob.setInputFormatClass(TextInputFormat.class);
        grepJob.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(grepJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(grepJob, new Path(args[1]));
      
        Path outputpath = new Path(args[1]);
        outputpath.getFileSystem(conf).delete(outputpath,true);
      
        return grepJob.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf1 = new Configuration();
        conf1.set("grep-arg", "Hyderabad");   // pattern to search for
        int status = ToolRunner.run(conf1, new GrepJob(), args);
        System.exit(status);
    }

}



package com.orienit.hadoop.training.grep;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GrepMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // emit only the lines that contain the configured search pattern
        if (value.toString().contains(context.getConfiguration().get("grep-arg"))) {
            context.write(value, NullWritable.get());
        }
    }

}
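
The search pattern reaches the mappers through the "grep-arg" configuration property, which main() hard-codes to "Hyderabad". A minimal variation (hypothetical, assuming the pattern is supplied as a third command-line argument) could set it like this:

    public static void main(String[] args) throws Exception {
        Configuration conf1 = new Configuration();
        conf1.set("grep-arg", args[2]);   // hypothetical: take the pattern from the command line
        int status = ToolRunner.run(conf1, new GrepJob(), args);
        System.exit(status);
    }

Because ToolRunner uses GenericOptionsParser, the property can also be supplied as -D grep-arg=Hyderabad on the command line without changing the code.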

  

*********************************************************************************************

Write a Distributed Sed program

Each mapper rewrites its input lines, replacing one string with another (like sed 's/hadoop/BigData/g'); again no reducers are needed, so the map output is written directly.

package com.orienit.hadoop.training.sed;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SedJob implements Tool {

    private Configuration conf;

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        Job sedjob = new Job(getConf());
        sedjob.setJobName("OrienIT Sed Job");
        sedjob.setJarByClass(this.getClass());
        sedjob.setMapperClass(SedMapper.class);
        sedjob.setNumReduceTasks(0);
        sedjob.setOutputKeyClass(Text.class);
        sedjob.setOutputValueClass(NullWritable.class);

        sedjob.setInputFormatClass(TextInputFormat.class);
        sedjob.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(sedjob, new Path(args[0]));
        FileOutputFormat.setOutputPath(sedjob, new Path(args[1]));
       
        Path outputpath = new Path(args[1]);
        outputpath.getFileSystem(conf).delete(outputpath,true);
       
        return sedjob.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf1 = new Configuration();
        conf1.set("sed-arg1", "hadoop");    // string to search for
        conf1.set("sed-arg2", "BigData");   // replacement string
        int status = ToolRunner.run(conf1, new SedJob(), args);
        System.exit(status);
    }

}



package com.orienit.hadoop.training.sed;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SedMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String arg1 = context.getConfiguration().get("sed-arg1");
        String arg2 = context.getConfiguration().get("sed-arg2");
        if (value.toString().contains(arg1)) {
            // replace every occurrence of arg1 with arg2 (note: replaceAll treats arg1 as a regex)
            context.write(new Text(value.toString().replaceAll(arg1, arg2)), NullWritable.get());
        } else {
            context.write(value, NullWritable.get());
        }
    }
}
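
The search and replacement strings reach the mappers through the "sed-arg1" and "sed-arg2" configuration properties set in main(), so this job behaves like sed 's/hadoop/BigData/g' over the whole input. Keep in mind that String.replaceAll treats sed-arg1 as a regular expression; for a pattern containing regex metacharacters a literal replacement is safer. A hypothetical variant of the write inside map():

    // hypothetical: literal (non-regex) replacement
    context.write(new Text(value.toString().replace(arg1, arg2)), NullWritable.get());

String.replace does a plain substring substitution, which matches the use case here where the pattern is the literal word "hadoop".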


*********************************************************************************************

 
