Thursday, 12 June 2014

Custom input split and custom NLineInputFormat Record in Hadoop

Hadoop by default splits an input data file into multiple logical partitions of 64 MB each. Each of these splits is passed to an individual mapper, which provides data parallelism. If the file is smaller than 64 MB, only one mapper is used for the computation. Let's consider a scenario where we have a file smaller than 64 MB, but the computation on its records is expensive. If this is handled by a single mapper, it will take significant time. Let's say we have a 10-node cluster. We then want at least 10 mappers to run, one on each node, so that the overall computation time can be reduced by up to a factor of 10. Furthermore, it is assumed that network transfer time << total computation time.
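To make the arithmetic above concrete, here is a minimal plain-Java sketch (no Hadoop dependency) that estimates how many splits, and hence mappers, a file yields for a given split size. The class name, method names, and the 60 MB file size are only assumptions for illustration. Hadoop's real split logic may fold a small trailing remainder into the last split, so actual counts can differ slightly.

```java
// Sketch only: a back-of-the-envelope estimate, not Hadoop's actual split code.
final class SplitEstimate {
    static final long MB = 1024L * 1024L;

    /** Ceiling division: number of splits needed to cover fileSize bytes. */
    static long estimateSplits(long fileSize, long splitSize) {
        if (fileSize <= 0) {
            return 0;
        }
        return (fileSize + splitSize - 1) / splitSize;
    }

    /** Largest split size (in bytes) that still yields at least `mappers` splits. */
    static long splitSizeFor(long fileSize, long mappers) {
        return Math.max(1, fileSize / mappers);
    }

    public static void main(String[] args) {
        long fileSize = 60 * MB; // hypothetical 60 MB input file
        System.out.println(estimateSplits(fileSize, 64 * MB)); // prints 1
        System.out.println(estimateSplits(fileSize, 8 * MB));  // prints 8
        System.out.println(splitSizeFor(fileSize, 10) / MB);   // prints 6
    }
}
```

So for this hypothetical file, a split size of about 6 MB would be enough to keep 10 mappers busy.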

There are multiple ways to set a custom split size in Hadoop. You can pass the split size on the terminal while running the job, set the configuration property at run time or in a configuration file, or override a method in the input format class. Let's look briefly at how you can do each.

1)      Setting split size using terminal:
hadoop jar movie.jar -D mapred.max.split.size=8388608 inputFile.txt outputLocation
As in the above command, the property mapred.max.split.size specifies the custom maximum split size, in bytes, for this particular job (8388608 bytes is 8 MB). The -D option is used to set any configuration property from the terminal. It is picked up because the driver is run through ToolRunner.

2)      Setting split size through the Configuration object:

              conf.set("mapred.max.split.size", MAX_SPLIT_SIZE);
              conf.set("mapred.min.split.size", MIN_SPLIT_SIZE);
              JobConf jobConf = new JobConf(conf, MaxTemperatureDriver.class);
              Job job = new Job(jobConf);

Take an instance of the Configuration and set the property mapred.max.split.size to the desired number of bytes. Here MAX_SPLIT_SIZE is any predefined value in bytes, held as a String because conf.set() takes String values. After setting the configuration, create a JobConf from it, which is used to configure all the properties related to the particular job. Then pass that JobConf instance as a parameter when creating the Job.

3)      Setting split size through CustomInputFormat Class:

Any class which extends the FileInputFormat class can override the computeSplitSize() method. This method returns a long value, which is the split size in bytes. Passing your own value as the maxSize argument to the parent implementation caps the split size at that value.



@Override
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
          return super.computeSplitSize(blockSize, minSize, MAX_SPLIT_SIZE);
}
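To see what the override changes, here is a small plain-Java sketch of the rule that, as far as I understand it, the parent computeSplitSize() applies: max(minSize, min(maxSize, blockSize)). The class name is hypothetical and the formula is worth checking against the FileInputFormat source of your Hadoop version.

```java
// Sketch only: reproduces the assumed split-size rule outside of Hadoop.
final class SplitSizeRule {
    static final long MB = 1024L * 1024L;

    /** Assumed rule: the block size, capped by maxSize and floored by minSize. */
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long block = 64 * MB;
        // No cap: the split size equals the block size.
        System.out.println(computeSplitSize(block, 1, Long.MAX_VALUE) / MB); // prints 64
        // Cap of 8 MB, as in the override above.
        System.out.println(computeSplitSize(block, 1, 8 * MB) / MB);         // prints 8
        // A minimum larger than the cap wins over it.
        System.out.println(computeSplitSize(block, 128 * MB, 8 * MB) / MB);  // prints 128
    }
}
```

The last case is the reason to keep the minimum split size small when you want the cap to take effect.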

Here is a simple maximum temperature example that uses a custom split size and an N-line record reader (NLinesRecordReader).

Directory Structure: the source files below all belong to the package com.HdMovie.customFormatter.

MaxTemperatureDriver.java

package com.HdMovie.customFormatter;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*This class is responsible for running map reduce job*/
public class MaxTemperatureDriver extends Configured implements Tool{
 private static final String MAX_SPLIT_SIZE = "8388608";
 private static final String MIN_SPLIT_SIZE = "0";
 public int run(String[] args) throws Exception
 {
 
  if(args.length !=2) {
   System.err.println("Usage: MaxTemperatureDriver <input path> <outputpath>");
   return -1; // let main() turn this into the exit code
  }
  
  Configuration conf = getConf();
  conf.setBoolean("mapred.task.profile", true);
  conf.set("mapred.task.profile.params", "-agentlib:hprof=cpu=samples," +
  "heap=sites,depth=6,force=n,thread=y,verbose=n,file=%s");
  conf.set("mapred.task.profile.maps", "0-2");
  conf.set("mapred.task.profile.reduces", ""); // no reduces
  //conf.set("mapred.max.split.size", MAX_SPLIT_SIZE);
  //conf.set("mapred.min.split.size", MIN_SPLIT_SIZE);
  
  JobConf jobConf = new JobConf(conf, MaxTemperatureDriver.class);
  
  //Setting a job
  Job job = new Job(jobConf);
  
  job.setJarByClass(MaxTemperatureDriver.class);
  job.setJobName("Max Temperature");
  job.getConfiguration();
  
  
  /**
   * If we want to enable compression of map output
   * We can use the below configuration
   * Libraries can be GzipCodec , LzoCodec,SnappyCodec use following:
   * job.getConfiguration().setBoolean("mapred.compress.map.output", true);
   * job.getConfiguration().setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
   */
  
  //Adding input path of the file
  FileInputFormat.addInputPath(job, new Path(args[0]));
  job.setInputFormatClass(NLinesInputFormat.class);
  //Set output directory 
  FileOutputFormat.setOutputPath(job,new Path(args[1]));
  
  job.setMapperClass(MaxTemperatureMapper.class);
  job.setReducerClass(MaxTemperatureReducer.class);
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
  //Run the job once and report its status back to ToolRunner
  boolean success = job.waitForCompletion(true);
  return success ? 0 : 1;
 }
 

 public static void main(String[] args) throws Exception {
  MaxTemperatureDriver driver = new MaxTemperatureDriver();
  int exitCode = ToolRunner.run(driver, args);
  System.exit(exitCode);
 }
}

MaxTemperatureMapper.java

package com.HdMovie.customFormatter;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class MaxTemperatureMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {

 private static final int MISSING = 9999;
 static Logger log = Logger.getLogger(MaxTemperatureMapper.class);

 @Override
 public void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {

  System.out.println("Map key:"+key);
  log.setLevel(Level.INFO);
  log.info("Map key:-"+key);
  log.debug("Map key:-"+key);
  
  
  //Each value holds up to N newline-separated records built by NLinesRecordReader
  String[] lineArr = value.toString().split("\n");

  for (String line : lineArr) {
   if (line.length() < 93) {
    continue; //too short to hold the year, temperature and quality fields
   }
   String year = line.substring(15, 19);
   int airTemperature;
   try {
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
     airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
     airTemperature = Integer.parseInt(line.substring(87, 92));
    }
   } catch (NumberFormatException e) {
    log.error("Error formatting number for given string..");
    airTemperature = 0;
   }
   String quality = line.substring(92, 93);//begin index inclusive, end index is exclusive
   if (airTemperature != MISSING && quality.matches("[01459]")) {
    context.write(new Text(year), new IntWritable(airTemperature));
   }
  }
 }
}
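The fixed-width parsing in the mapper is easier to check outside of Hadoop. Below is a minimal plain-Java sketch of the same field offsets (year at 15-19, signed temperature at 87-92, quality code at 92). The class, the helper names, and the synthetic 93-character sample record are assumptions for illustration only; real records carry many more fields. Unlike the mapper, this sketch returns null for an unparsable temperature instead of substituting 0.

```java
import java.util.Arrays;

// Sketch only: the mapper's parsing logic without any Hadoop types.
final class RecordParseSketch {
    static final int MISSING = 9999;

    /** Year field, or null if the line is too short. */
    static String year(String line) {
        return line.length() < 93 ? null : line.substring(15, 19);
    }

    /** Temperature field, or null if the record is short, missing, unparsable or low quality. */
    static Integer temperature(String line) {
        if (line.length() < 93) {
            return null;
        }
        int start = line.charAt(87) == '+' ? 88 : 87; // skip a leading plus sign
        int temp;
        try {
            temp = Integer.parseInt(line.substring(start, 92));
        } catch (NumberFormatException e) {
            return null;
        }
        String quality = line.substring(92, 93);
        if (temp == MISSING || !quality.matches("[01459]")) {
            return null;
        }
        return temp;
    }

    /** Builds a synthetic 93-character record padded with '0' (hypothetical test data). */
    static String sample(String year, String signedTemp, char quality) {
        char[] buf = new char[93];
        Arrays.fill(buf, '0');
        year.getChars(0, 4, buf, 15);        // columns 15..18
        signedTemp.getChars(0, 5, buf, 87);  // columns 87..91
        buf[92] = quality;                   // column 92
        return new String(buf);
    }

    public static void main(String[] args) {
        String line = sample("1950", "+0022", '1');
        System.out.println(year(line) + " " + temperature(line)); // prints 1950 22
    }
}
```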

NLinesInputFormat.java


package com.HdMovie.customFormatter;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public  class NLinesInputFormat extends FileInputFormat<LongWritable,Text> {
 private static final long MAX_SPLIT_SIZE = 8388608;  //8MB SPLIT

 @Override
 public RecordReader<LongWritable,Text> createRecordReader(InputSplit inputsplit,
   TaskAttemptContext taskattemptcontext) throws IOException,
   InterruptedException { 
  
  return new NLinesRecordReader();
 }
 @Override
 protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return super.computeSplitSize(blockSize, minSize, MAX_SPLIT_SIZE);
 }

}

NLinesRecordReader.java


package com.HdMovie.customFormatter;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class NLinesRecordReader extends RecordReader<LongWritable, Text>{
    private final int NLINESTOPROCESS = 3;
    private LineReader in;
    private LongWritable key;
    private Text value = new Text();
    private long start =0;
    private long end =0;
    private long pos =0;
    private int maxLineLength;
 
@Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
 
@Override
    public LongWritable getCurrentKey() throws IOException,InterruptedException {
        return key;
    }
 
@Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }
 
@Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        }
        else {
            return Math.min(1.0f, (pos - start) / (float)(end - start));
        }
    }
 
@Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        
        final Path file = split.getPath();
        Configuration conf = context.getConfiguration();
        this.maxLineLength = conf.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);
        FileSystem fs = file.getFileSystem(conf);
        start = split.getStart();
        end= start + split.getLength();
        boolean skipFirstLine = false;
        FSDataInputStream filein = fs.open(split.getPath());
 
        if (start != 0){
            skipFirstLine = true;
            --start;
            filein.seek(start);
        }
        in = new LineReader(filein,conf);
        if(skipFirstLine){
            start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }
 
@Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        value.clear();
        final Text endline = new Text("\n");
        boolean readAny = false; // true once at least one line is in this record
        for(int i=0;i<NLINESTOPROCESS;i++){
            Text v = new Text();
            while (pos < end) {
                int newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength));
                if (newSize == 0) {
                    break; // end of stream: nothing to append
                }
                value.append(v.getBytes(),0, v.getLength());
                value.append(endline.getBytes(),0, endline.getLength());
                readAny = true;
                pos += newSize;
                if (newSize < maxLineLength) {
                    break;
                }
            }
        }
        if (!readAny) {
            key = null;
            value = null;
            return false;
        } else {
            // also covers a final record holding fewer than NLINESTOPROCESS lines
            return true;
        }
    }
}
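The grouping idea behind the record reader can be tried without a cluster. The following is a minimal plain-Java sketch, assuming the input is already available as a list of lines; the class and method names are hypothetical. It joins every N lines into one newline-terminated record, which is the shape of value the mapper then splits on "\n".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: N-line grouping on an in-memory list, not a Hadoop RecordReader.
final class NLineGrouping {
    /** Joins every n consecutive lines into one record; the last record may be shorter. */
    static List<String> group(List<String> lines, int n) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int count = 0;
        for (String line : lines) {
            current.append(line).append('\n');
            if (++count == n) {
                records.add(current.toString());
                current.setLength(0);
                count = 0;
            }
        }
        if (count > 0) {
            records.add(current.toString()); // trailing partial group
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> records = group(Arrays.asList("a", "b", "c", "d"), 3);
        System.out.println(records.size()); // prints 2
    }
}
```

With 3 lines per record, four input lines give two records: one with three lines and one with the remaining line.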

That's it from my side. Cheer up ....
Happy Learning.. !! :)
