By default, Hadoop splits an input file into logical partitions of 64 MB each. Each of these chunks is passed to an individual mapper, which is what provides data parallelism. If the file is smaller than 64 MB, only one mapper is used for the computation. Now consider a scenario where the file is smaller than 64 MB but the computation it requires is heavy: handled by a single mapper, the job will take a significant amount of time. Say we have a 10-node cluster. We would like at least 10 mappers to run, one on each node, so that the overall computation time can be cut by roughly a factor of 10. (This assumes that network transfer time << total computation time.)
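As a back-of-the-envelope illustration (the file size below is a hypothetical figure, not part of the original scenario), the split size needed to get roughly one mapper per node is simply the file size divided by the desired number of mappers:

// Hypothetical sizing example: pick a split size that yields about 10 map tasks.
// conf is an org.apache.hadoop.conf.Configuration, as used elsewhere in this post.
long fileSizeBytes = 50L * 1024 * 1024;      // e.g. a 50 MB input file (assumed figure)
int desiredMappers = 10;                     // one mapper per node on a 10-node cluster
long splitSize = (fileSizeBytes + desiredMappers - 1) / desiredMappers;  // ~5 MB per split
conf.setLong("mapred.max.split.size", splitSize);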
There are multiple ways to customize the split size in Hadoop. You can set a custom split size from the terminal while running the job, set the configuration property at run time (or in the configuration file), or override the relevant method in the input format class. Let's look briefly at how to do each.
1) Setting split size using terminal:
hadoop jar movie.jar -D mapred.max.split.size=8388608 inputFile.txt outputLocation
In the command above, the property mapred.max.split.size specifies the custom split size (in bytes) for this particular job. The -D option can be used to set any configuration property from the terminal; it is a generic option, so it must appear before the program arguments, and it works here because the driver is run through ToolRunner (see MaxTemperatureDriver below).
2) Setting split size through the Configuration object:
conf.set("mapred.max.split.size", MAX_SPLIT_SIZE);
conf.set("mapred.min.split.size", MIN_SPLIT_SIZE);
JobConf jobConf = new JobConf(conf, MaxTemperatureDriver.class);
Job job = new Job(jobConf);
Take an instance of Configuration and set the property mapred.max.split.size to the desired number of bytes; here MAX_SPLIT_SIZE is a predefined value in bytes (and MIN_SPLIT_SIZE is the corresponding lower bound). After setting the properties, create a JobConf from the Configuration, which carries all the properties related to the particular job, and then pass that job configuration as a parameter when constructing the Job.
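On newer Hadoop 2.x releases the same idea can be expressed with the Job.getInstance() API. The following is only a sketch, using the non-deprecated property names mapreduce.input.fileinputformat.split.maxsize and mapreduce.input.fileinputformat.split.minsize:

// Sketch for Hadoop 2.x: set the split bounds and build the job from one Configuration.
Configuration conf = new Configuration();
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 8L * 1024 * 1024); // 8 MB
conf.setLong("mapreduce.input.fileinputformat.split.minsize", 0L);
Job job = Job.getInstance(conf, "Max Temperature");
job.setJarByClass(MaxTemperatureDriver.class);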
3) Setting split size through a custom InputFormat class:
Any class that extends FileInputFormat can override the computeSplitSize() method. This method returns a long value that determines the size, in bytes, of the splits generated for the job.
@Override
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return super.computeSplitSize(blockSize, minSize, MAX_SPLIT_SIZE);
}
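For reference, FileInputFormat's own computeSplitSize() is essentially a clamp of the block size between the configured minimum and maximum, which is why passing MAX_SPLIT_SIZE as the maximum caps every split (the snippet below paraphrases the Hadoop source rather than quoting it):

// Effective split size = max(minSize, min(maxSize, blockSize)).
// With a 64 MB block, minSize = 0 and maxSize = 8 MB, every split comes out as 8 MB.
static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}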
Here is a simple maximum-temperature example that uses the custom split size together with an NLinesRecordReader.
Directory Structure:
MaxTemperatureDriver.java
package com.HdMovie.customFormatter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* This class is responsible for running the map reduce job. */
public class MaxTemperatureDriver extends Configured implements Tool {

    private static final String MAX_SPLIT_SIZE = "8388608";
    private static final String MIN_SPLIT_SIZE = "0";

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureDriver <input path> <output path>");
            System.exit(-1);
        }

        Configuration conf = getConf();
        conf.setBoolean("mapred.task.profile", true);
        conf.set("mapred.task.profile.params", "-agentlib:hprof=cpu=samples,"
                + "heap=sites,depth=6,force=n,thread=y,verbose=n,file=%s");
        conf.set("mapred.task.profile.maps", "0-2");
        conf.set("mapred.task.profile.reduces", ""); // no reduces
        //conf.set("mapred.max.split.size", MAX_SPLIT_SIZE);
        //conf.set("mapred.min.split.size", MIN_SPLIT_SIZE);

        JobConf jobConf = new JobConf(conf, MaxTemperatureDriver.class);

        // Setting up the job
        Job job = new Job(jobConf);
        job.setJarByClass(MaxTemperatureDriver.class);
        job.setJobName("Max Temperature");

        /**
         * If we want to enable compression of map output
         * we can use the below configuration.
         * Codecs can be GzipCodec, LzoCodec or SnappyCodec:
         * job.getConfiguration().setBoolean("mapred.compress.map.output", true);
         * job.getConfiguration().setClass("mapred.map.output.compression.codec",
         *         GzipCodec.class, CompressionCodec.class);
         */

        // Adding input path of the file
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(NLinesInputFormat.class);

        // Set output directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        MaxTemperatureDriver driver = new MaxTemperatureDriver();
        int exitCode = ToolRunner.run(driver, args);
        System.exit(exitCode);
    }
}
MaxTemperatureMapper.java
package com.HdMovie.customFormatter;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;
    static Logger log = Logger.getLogger(MaxTemperatureMapper.class);

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println("Map key:" + key);
        log.setLevel(Level.INFO);
        log.info("Map key:-" + key);
        log.debug("Map key:-" + key);

        // The NLinesRecordReader hands us a value containing several lines,
        // so split the value on newlines and process each record separately.
        String lines = value.toString();
        String[] lineArr = lines.split("\n");

        for (String line : lineArr) {
            String year = line.substring(15, 19);
            int airTemperature;
            try {
                if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
                    airTemperature = Integer.parseInt(line.substring(88, 92));
                } else {
                    airTemperature = Integer.parseInt(line.substring(87, 92));
                }
            } catch (NumberFormatException e) {
                log.error("Error formatting number for given string..");
                airTemperature = 0;
            }
            String quality = line.substring(92, 93); // begin index inclusive, end index exclusive
            if (airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }
}
NLinesInputFormat.java
package com.HdMovie.customFormatter;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class NLinesInputFormat extends FileInputFormat<LongWritable, Text> {

    private static final long MAX_SPLIT_SIZE = 8388608; // 8 MB split

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputsplit,
            TaskAttemptContext taskattemptcontext) throws IOException, InterruptedException {
        return new NLinesRecordReader();
    }

    @Override
    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
        // Ignore the configured maxSize and cap every split at MAX_SPLIT_SIZE bytes.
        return super.computeSplitSize(blockSize, minSize, MAX_SPLIT_SIZE);
    }
}
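The reducer referenced by the driver (MaxTemperatureReducer) is not shown in the original post. Below is only a minimal sketch of what it could look like, following the standard max-temperature pattern; adapt it to your own logic.

MaxTemperatureReducer.java (sketch)
package com.HdMovie.customFormatter;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Emit the highest temperature observed for this year.
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}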
NLinesRecordReader.java
package com.HdMovie.customFormatter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.LineRecordReader.LineReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class NLinesRecordReader extends RecordReader<LongWritable, Text> {

    private final int NLINESTOPROCESS = 3; // number of lines handed to the mapper per key/value pair
    private LineReader in;
    private LongWritable key;
    private Text value = new Text();
    private long start = 0;
    private long end = 0;
    private long pos = 0;
    private int maxLineLength;

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        final Path file = split.getPath();
        Configuration conf = context.getConfiguration();
        this.maxLineLength = conf.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        FileSystem fs = file.getFileSystem(conf);
        start = split.getStart();
        end = start + split.getLength();
        boolean skipFirstLine = false;
        FSDataInputStream filein = fs.open(split.getPath());
        if (start != 0) {
            // The split does not begin at the start of the file, so the first
            // (partial) line belongs to the previous split and must be skipped.
            skipFirstLine = true;
            --start;
            filein.seek(start);
        }
        in = new LineReader(filein, conf);
        if (skipFirstLine) {
            start += in.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        value.clear();
        final Text endline = new Text("\n");
        int newSize = 0;
        // Read up to NLINESTOPROCESS lines and append them (newline separated) to the value.
        for (int i = 0; i < NLINESTOPROCESS; i++) {
            Text v = new Text();
            while (pos < end) {
                newSize = in.readLine(v, maxLineLength,
                        Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
                value.append(v.getBytes(), 0, v.getLength());
                value.append(endline.getBytes(), 0, endline.getLength());
                if (newSize == 0) {
                    break;
                }
                pos += newSize;
                if (newSize < maxLineLength) {
                    break;
                }
            }
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }
}
That's it from my side. Cheer up ....
Happy Learning.. !! :)