Saturday 12 April 2014

Job Chaining in Hadoop

Chaining means running a series of jobs one after another, each step feeding the next. As Hadoop is designed for batch processing of data, it is important to know how to execute a series of jobs without manual intervention.
In practical situations, not every task can be solved with a single MapReduce program. We may need multiple mappers and reducers to complete a task, where the output of one MapReduce program is required as input to the next mapper, and so on.
Mapper1 --> Reducer1 --> Mapper2 --> Reducer2 --> Mapper3 --> Reducer3 ...
Furthermore, there may be conditional dependencies between jobs: if the output of a job is of type X, feed it to job Y; otherwise feed it to job Z.
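
To make the idea concrete before touching Hadoop, here is a minimal plain-Java sketch of a two-stage chain with a conditional branch. It is only a toy model: each "job" is an ordinary function, and the names ChainSketch, cleanJob, jsonJob and textJob are made up for this illustration and are not Hadoop APIs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Toy model only: each "job" is a plain function from input lines to output lines.
// None of these names are Hadoop APIs.
class ChainSketch {

    // Stage 1: trim lines and drop empty ones.
    static List<String> cleanJob(List<String> in) {
        return in.stream().map(String::trim).filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Branch Y: chosen when the intermediate output looks like JSON objects.
    static List<String> jsonJob(List<String> in) {
        return in.stream().map(s -> "json:" + s).collect(Collectors.toList());
    }

    // Branch Z: chosen for everything else.
    static List<String> textJob(List<String> in) {
        return in.stream().map(s -> "text:" + s).collect(Collectors.toList());
    }

    // The output of stage 1 is the input of whichever stage 2 is selected.
    static List<String> runChain(List<String> input) {
        List<String> intermediate = cleanJob(input);
        boolean looksLikeJson = !intermediate.isEmpty()
                && intermediate.stream().allMatch(s -> s.startsWith("{"));
        Function<List<String>, List<String>> next =
                looksLikeJson ? ChainSketch::jsonJob : ChainSketch::textJob;
        return next.apply(intermediate);
    }

    public static void main(String[] args) {
        System.out.println(runChain(Arrays.asList(" {\"a\":1} ", "", "{\"b\":2}")));
        System.out.println(runChain(Arrays.asList("plain text")));
    }
}
```

In a real cluster the intermediate result lives in HDFS rather than in memory, and the decision would be made by the driver after the first job finishes.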


 Hadoop provides an easy way to chain jobs: the JobControl class, which manages a group of jobs and their dependencies. JobControl lets us add a single job or a collection of jobs and query which jobs are ready, running, waiting, or failed. We add a job with the JobControl instance's addJob(controlledJob1) method. A dependency is declared on the dependent job itself, using the ControlledJob method addDependingJob(jobN).

Every job is an ordinary instance of the Job class. To be chained in JobControl, it has to be explicitly wrapped in a ControlledJob.
Job job1 = JSONFileConvertJob(args);

ControlledJob cJob1 = new ControlledJob(conf);
cJob1.setJob(job1);

JobControl jobctrl = new JobControl("JobController");
jobctrl.addJob(cJob1);

A ControlledJob encapsulates a MapReduce job and its dependencies. It monitors the states of the jobs it depends on and updates its own state accordingly. ControlledJob exposes information about the job, such as its job ID and job name, and gives control over it: setting the job ID and job name, tracking the job state, submitting the job, killing the job, and so on. A group of interdependent jobs is represented as ControlledJob instances, all of which are added to a JobControl that acts as the top-level master.
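
The way a job's state follows the states of its dependencies can be pictured with a small toy model. The sketch below is plain Java and not Hadoop's implementation; ToyJob and refresh() are hypothetical names, and the state names only mirror the ones ControlledJob uses.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of dependency tracking; not the Hadoop ControlledJob class.
class DependencySketch {

    enum State { WAITING, READY, SUCCESS, FAILED, DEPENDENT_FAILED }

    static class ToyJob {
        final String name;
        final List<ToyJob> dependsOn = new ArrayList<>();
        State state = State.WAITING;

        ToyJob(String name) { this.name = name; }

        // Same direction as in the article: this job waits for 'other'.
        void addDependingJob(ToyJob other) { dependsOn.add(other); }

        // Re-evaluate a WAITING job from the states of its dependencies.
        State refresh() {
            if (state != State.WAITING) {
                return state;
            }
            boolean allSucceeded = true;
            for (ToyJob dep : dependsOn) {
                if (dep.state == State.FAILED || dep.state == State.DEPENDENT_FAILED) {
                    state = State.DEPENDENT_FAILED; // a failed dependency blocks this job
                    return state;
                }
                if (dep.state != State.SUCCESS) {
                    allSucceeded = false;           // still waiting for this dependency
                }
            }
            if (allSucceeded) {
                state = State.READY;
            }
            return state;
        }
    }

    public static void main(String[] args) {
        ToyJob convert = new ToyJob("convert");
        ToyJob aggregate = new ToyJob("aggregate");
        aggregate.addDependingJob(convert);

        System.out.println(convert.refresh());   // no dependencies
        System.out.println(aggregate.refresh()); // convert has not succeeded yet
        convert.state = State.SUCCESS;
        System.out.println(aggregate.refresh());
    }
}
```

A job without dependencies becomes ready immediately, while a dependent job stays waiting until everything it depends on has succeeded.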

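Finally, the JobControl instance is started through its run() method. Since run() keeps looping until the controller is stopped, we start it in a separate thread. Here JobRunner is a Runnable wrapper (not shown in this post) that receives the JobControl instance as its constructor argument. JobControl itself implements Runnable, so it can also be passed to the Thread constructor directly.
Thread jobRunnerThread = new Thread(new JobRunner(jobControlInstance));
jobRunnerThread.start();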

JobControl provides an allFinished() method to check whether all jobs have finished. We can poll it in a loop, pausing with Thread.sleep(sleepTime) between checks, to report the latest status of the running jobs at a regular interval. Once all jobs have finished, JobControl's stop() method sets the controller state to STOPPING so that its thread stops the next time it wakes up. Note that allFinished() also returns true when jobs have failed, so check the failed jobs before treating the run as successful.
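
The start, poll and stop pattern described above can be tried out without a cluster. The following is a minimal sketch in plain Java, assuming a made-up ToyController that stands in for JobControl; only the method names allFinished() and stop() are borrowed from the real class, everything else is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy stand-in for JobControl: "completes" one pending job per loop iteration.
class PollingSketch {

    static class ToyController implements Runnable {
        private final AtomicInteger pending;
        private volatile boolean stopping = false;

        ToyController(int jobs) { pending = new AtomicInteger(jobs); }

        boolean allFinished() { return pending.get() == 0; }

        void stop() { stopping = true; }

        @Override
        public void run() {
            // Keeps looping until stop() is called, even when nothing is pending.
            while (!stopping) {
                if (pending.get() > 0) {
                    pending.decrementAndGet(); // pretend one job completed
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    // Starts the controller in its own thread, polls until done, then stops it.
    static boolean runAndWait(int jobs) {
        ToyController controller = new ToyController(jobs);
        Thread runner = new Thread(controller);
        runner.start();
        try {
            while (!controller.allFinished()) {
                Thread.sleep(5); // polling interval
            }
            controller.stop();
            runner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return controller.allFinished();
    }

    public static void main(String[] args) {
        System.out.println("finished: " + runAndWait(3));
    }
}
```

Without the stop() call the controller thread would keep running after the last job, which is why the driver has to stop it explicitly.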


Here is a simple example of job chaining where the first job converts an input text file of temperature data into a JSON file and stores it in HDFS. The second job takes the JSON file as input and performs the map-reduce operation.


Add the following arguments to the run configuration of CustomFileConverterJob:
sample.txt hdfs://localhost/sequentialJobOutput/ hdfs://localhost/sequentialJobOutput/reduce/

You can use the following data set as the sample.txt file.

0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999



Here the first argument is the input file with the raw temperature data, the second argument is the directory where the first job stores its output (the JSON file), and the last argument is the directory where the output of the reduce job is stored.

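import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// Also needed: an import for JsonParser.JsonInputFormat, whose package depends on your project layout

public class CustomFileConverterJob extends Configured implements Tool  {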
       Configuration conf = new Configuration();
       public int run(String[] args) throws Exception
       {
      
              if(args.length !=3) {
                     System.err.println("Usage: Temperature driver: "
                                   + "<Input file> <input path to converted file> <outputpath>");
                     System.exit(-1);
              }
             
              conf  = getConf();
             
              /**
               * Add Jobs in chaining
               */
              Job job1 = JSONFileConvertJob(args);
                           
              ControlledJob cJob1 = new ControlledJob(conf);
              cJob1.setJob(job1);
         
              /**
               * JSON input file job
               */
              Job job2 =  mapJobFromJSONInputFile(args);
              ControlledJob cJob2 = new ControlledJob(conf);
              cJob2.setJob(job2);
             
             
              JobControl jobctrl = new JobControl("JobController");
              jobctrl.addJob(cJob1);
             
              jobctrl.addJob(cJob2);
              cJob2.addDependingJob(cJob1);
              
              
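              // JobRunner is a Runnable wrapper around JobControl (not shown here).
              // JobControl itself implements Runnable, so new Thread(jobctrl) works too.
              Thread jobRunnerThread = new Thread(new JobRunner(jobctrl));
              jobRunnerThread.start();

              // Poll until every job has either succeeded or failed
              while (!jobctrl.allFinished()) {
                     System.out.println("Still running...");
                     Thread.sleep(5000);
              }
              System.out.println("done");
              jobctrl.stop();

              // Report a failure if any job in the chain failed
              return jobctrl.getFailedJobList().isEmpty() ? 0 : 1;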
        }

       /**
         * Deletes the specified file or directory
       */
       public static void deleteFileOrDirectory(String inputPathOrFile) throws IOException{
        
              Configuration conf = new Configuration();
              conf.set("fs.default.name", "hdfs://localhost:8020/");
             
              FileSystem fs =  FileSystem.get(conf);
             
              Path path = new Path(inputPathOrFile);
              if(fs.exists(path)){
                     fs.delete(path, true);
              }
       }

       /**
        * This Job takes input from JSON file
        * Performs map reduce operation
        */
       public Job mapJobFromJSONInputFile(String[] args) throws Exception{
              // Start and end delimiters that JsonInputFormat uses to cut out one record
              conf.set("jsonInput.start", "{");
              conf.set("jsonInput.end", "}");
              Job job = new Job(conf);
              job.setJarByClass(CustomFileConverterJob.class);
              job.setJobName("Process map reduce job from JSON file");
              job.setMapperClass(MaxTemperatureJSONMapper.class);
              job.setReducerClass(MaxTemperatureReducer.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              job.setInputFormatClass(JsonInputFormat.class);
              FileInputFormat.addInputPath(job, new Path(args[1]+"part*"));
              FileOutputFormat.setOutputPath(job,new Path(args[2]));
                    
              return job;
       }
       /**
        * This Job takes any raw text file
        * Converts it into JSON file
        */
       public Job JSONFileConvertJob(String[] args) throws IOException{

              Job job = new Job(conf);
              job.setJarByClass(CustomFileConverterJob.class);
              job.setJobName("JSON file converter job");

              // Map-only job: with zero reduce tasks the mapper output is
              // written directly to the output path, so no reducer is set
              job.setMapperClass(JSONFileConverterMapper.class);
              job.setNumReduceTasks(0);

              // The mapper emits only the JSON text as value, without a key
              job.setOutputKeyClass(NullWritable.class);
              job.setOutputValueClass(Text.class);

              FileInputFormat.setInputPaths(job,  new Path(args[0]));
              /**
               * A custom JSON OutputFormat class can be plugged in here.
               * None is set in this example, so the default TextOutputFormat is used.
               */

              FileOutputFormat.setOutputPath(job, new Path(args[1]));

              return job;
       }
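
       /**
        * Entry point: ToolRunner parses the generic Hadoop options and calls run()
        */
       public static void main(String[] args) throws Exception {
              System.exit(ToolRunner.run(new CustomFileConverterJob(), args));
       }
}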

Here is the custom mapper which is used to convert the sample.txt input file into JSON.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.google.gson.Gson;

public class JSONFileConverterMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

       static Logger log = Logger.getLogger(JSONFileConverterMapper.class);

       // Plain holder for the fields that Gson serializes to JSON
       static class TemperatureRecord {
              String line;
              String year;
              int airTemperature;
              String quality;
       }

       private final Gson gson = new Gson();
       private final Text jsonText = new Text();

       @Override
       public void map(LongWritable key, Text value, Context context) throws  IOException, InterruptedException{

              TemperatureRecord record = new TemperatureRecord();
              record.line = value.toString(); //convert a value to string
              record.year = record.line.substring(15, 19);
              if (record.line.charAt(87) == '+') { // skip the leading plus sign
                     record.airTemperature = Integer.parseInt(record.line.substring(88, 92));
              } else {
                     record.airTemperature = Integer.parseInt(record.line.substring(87, 92));
              }
              record.quality = record.line.substring(92, 93);

              // Serialize the record and emit it as value only; no key is needed
              jsonText.set(gson.toJson(record));
              context.write(NullWritable.get(), jsonText);
       }
}
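
The fixed offsets used in the mapper above can be checked against the sample data without running Hadoop. Here is a small stand-alone sketch; the class name RecordParseSketch and its helper methods are made up for this check, while the offsets and the two records are taken from the post.

```java
// Stand-alone check of the fixed-width offsets used by the mapper.
class RecordParseSketch {

    // First and third rows of the sample.txt data set shown earlier.
    static final String FIRST =
        "0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999";
    static final String THIRD =
        "0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999";

    static String year(String line) {
        return line.substring(15, 19);
    }

    static int airTemperature(String line) {
        // Skip a leading '+', but keep a leading '-' so the value stays negative.
        int from = line.charAt(87) == '+' ? 88 : 87;
        return Integer.parseInt(line.substring(from, 92));
    }

    static String quality(String line) {
        return line.substring(92, 93);
    }

    public static void main(String[] args) {
        for (String line : new String[] { FIRST, THIRD }) {
            System.out.println(year(line) + " " + airTemperature(line) + " " + quality(line));
        }
    }
}
```

For the first row this yields the year 1950 with a temperature of 0, and for the third row the same year with a temperature of -11; both rows carry the quality code 1.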

Here is the custom input format with its record reader, which cuts the input into individual JSON strings.

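import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class JsonParser {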

       public static class JsonInputFormat extends TextInputFormat {

              public static final String START_TAG_KEY = "jsonInput.start";
              public static final String END_TAG_KEY = "jsonInput.end";

              public RecordReader<LongWritable, Text> createRecordReader(
                           InputSplit split, TaskAttemptContext context) {
                     return new JsonRecordReader();
              }

              public static class JsonRecordReader extends
                           RecordReader<LongWritable, Text> {
                     private byte[] startTag;
                     private byte[] endTag;
                     private long start;
                     private long end;
                     private FSDataInputStream fsin;
                     private DataOutputBuffer buffer = new DataOutputBuffer();

                     private LongWritable key = new LongWritable();
                     private Text value = new Text();

                     @Override
                     public void initialize(InputSplit split, TaskAttemptContext context)
                                  throws IOException, InterruptedException {
                           Configuration conf = context.getConfiguration();
                           startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
                           endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
                           FileSplit fileSplit = (FileSplit) split;

                           // open the file and seek to the start of the split
                           start = fileSplit.getStart();
                           end = start + fileSplit.getLength();
                           Path file = fileSplit.getPath();
                           FileSystem fs = file.getFileSystem(conf);
                           fsin = fs.open(fileSplit.getPath());
                           fsin.seek(start);

                     }

                     @Override
                     public boolean nextKeyValue() throws IOException,
                                  InterruptedException {
                           if (fsin.getPos() < end) {
                                  if (readUntilMatch(startTag, false)) {
                                         try {
                                                buffer.write(startTag);
                                                if (readUntilMatch(endTag, true)) {
                                                       key.set(fsin.getPos());
                                                       value.set(buffer.getData(), 0,
                                                                     buffer.getLength());
                                                       return true;
                                                }
                                         } finally {
                                                buffer.reset();
                                         }
                                  }
                           }
                           return false;
                     }

                     @Override
                     public LongWritable getCurrentKey() throws IOException,
                                  InterruptedException {
                           return key;
                     }

                     @Override
                     public Text getCurrentValue() throws IOException,
                                  InterruptedException {
                           return value;
                     }

                     @Override
                     public void close() throws IOException {
                           fsin.close();
                     }

                     @Override
                     public float getProgress() throws IOException {
                           return (fsin.getPos() - start) / (float) (end - start);
                     }

                     private boolean readUntilMatch(byte[] match, boolean withinBlock)
                                  throws IOException {
                           int i = 0;
                           int c = 0;
                           while (true) {
                                  int b = fsin.read();
                                  // end of file:
                                  if (b == -1)
                                         return false;
                                  // save to buffer:
                                  if (withinBlock) {
                                         if (b == startTag[i]) {
                                                c++;
                                         }
                                         buffer.write(b);
                                  }
                                  // check if we're matching:
                                  if (b == match[i]) {
                                         if (withinBlock) {
                                                if (c == 0) {
                                                       i++;
                                                } else {
                                                       i = 0;
                                                       c--;
                                                }
                                         }
                                         else{
                                                i++;
                                         }

                                         if (i >= match.length)
                                                return true;
                                  } else
                                         i = 0;
                                  // see if we've passed the stop point:
                                  if (!withinBlock && i == 0 && fsin.getPos() >= end)
                                         return false;
                           }
                     }
              }
       }

}
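
The record reader above works on a byte stream and on configurable start and end tags. The core idea of cutting out one record per balanced pair of braces can be sketched more simply on a String with a depth counter. This is a simplified variant and not the reader's exact algorithm; JsonSplitSketch and splitObjects are hypothetical names, and the sketch assumes that braces never occur inside JSON string values.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration: split a text into top-level {...} records.
// Assumption: '{' and '}' do not appear inside quoted string values.
class JsonSplitSketch {

    static List<String> splitObjects(String text) {
        List<String> records = new ArrayList<>();
        int depth = 0;   // how many '{' are currently open
        int start = -1;  // index of the '{' that opened the current record
        for (int i = 0; i < text.length(); i++) {
            char ch = text.charAt(i);
            if (ch == '{') {
                if (depth == 0) {
                    start = i;
                }
                depth++;
            } else if (ch == '}' && depth > 0) {
                depth--;
                if (depth == 0) {
                    records.add(text.substring(start, i + 1));
                }
            }
        }
        return records; // an unterminated trailing record is dropped
    }

    public static void main(String[] args) {
        String input = "{\"year\":\"1950\",\"airTemperature\":22}\n"
                + "{\"year\":\"1949\",\"nested\":{\"airTemperature\":111}}";
        for (String record : splitObjects(input)) {
            System.out.println(record);
        }
    }
}
```

Nested objects stay inside their enclosing record because a record is only closed when the depth returns to zero.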



Here is the mapper of the second job, which parses each JSON input string into its individual elements.

public class MaxTemperatureJSONMapper extends
              Mapper<LongWritable, Text, Text, IntWritable> {

       private static final int MISSING = 9999;
       static Logger log = Logger.getLogger(MaxTemperatureJSONMapper.class);

       @Override
       public void map(LongWritable key, Text value, Context context)
                     throws IOException, InterruptedException {

              log.setLevel(Level.INFO);
              log.info("Map key:-" + key);

              String line = value.toString();

              JSONObject myJson;
              try {
                     myJson = new JSONObject(line);
                     String year = myJson.getString("year");
                     int airTemperature = myJson.getInt("airTemperature");
                     String quality = myJson.getString("quality");
                     context.write(new Text(year), new IntWritable(airTemperature));

              } catch (JSONException e) {
                     // Skip records that are not valid JSON, but keep the stack trace in the log
                     log.error("JSON conversion exception", e);
              }

       }
}

Here is the Reducer class implementation.

public class MaxTemperatureReducer extends
              Reducer<Text, IntWritable, Text, IntWritable> {

       @Override
       public void reduce(Text key, Iterable<IntWritable> values, Context context)
                     throws IOException, InterruptedException {

              int maxValue = Integer.MIN_VALUE;
              for (IntWritable value : values) {
                     maxValue = Math.max(maxValue, value.get());
              }
              context.write(key, new IntWritable(maxValue));
       }
}
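
What the reducer computes can be reproduced locally on the five sample records. The sketch below is plain Java with a hypothetical helper maxPerKey; the (year, temperature) pairs are the values that the offsets in the first mapper yield for the sample.txt rows.

```java
import java.util.Map;
import java.util.TreeMap;

// Local illustration of "maximum value per key", as done by the reducer.
class MaxByKeySketch {

    static Map<String, Integer> maxPerKey(String[] keys, int[] values) {
        Map<String, Integer> max = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            // Keep the larger of the stored value and the new one.
            max.merge(keys[i], values[i], Integer::max);
        }
        return max;
    }

    public static void main(String[] args) {
        String[] years = { "1950", "1950", "1950", "1949", "1949" };
        int[] temperatures = { 0, 22, -11, 111, 78 };
        System.out.println(maxPerKey(years, temperatures)); // prints {1949=111, 1950=22}
    }
}
```

In the real job the grouping by year is done by the shuffle phase, so the reducer only sees the values of one key at a time.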


That's it on job chaining. Here I have shown a simple example that takes a raw text file as input to the first job, feeds its output (the JSON file) as input to the second job, and performs a map-reduce operation on it. There are several other ways to chain jobs in Hadoop.

Cheers 
Happy Learning :)