Job chaining means running a series of dependent MapReduce jobs as one workflow. Since Hadoop is designed for batch processing of data, it is important to know how to execute a series of jobs without manual intervention.
In practical situations, not every task can be solved with a single MapReduce program. A task may need multiple mappers and reducers, where the output of one MapReduce program is required as the input of the next, and so on:
Mapper1 --> Reducer1 --> Mapper2 --> Reducer2 --> Mapper3 --> Reducer3 ...
Furthermore, there may be conditional dependencies between jobs. For example, if the output of a certain job is of type X, feed it to job Y; otherwise feed it to job Z.
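To make the idea concrete without a cluster, here is a toy, Hadoop-free model of such a chain. Each "job" is a plain Java function from input records to output records, and the output of one job becomes the input of the next. The job names (CLEAN, JOB_Y, JOB_Z) and the "type X" test are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/**
 * Toy model of a job chain with a conditional branch (no Hadoop involved).
 * All job names and the "type X" test are hypothetical.
 */
class ToyChain {
    // First job: drop empty records.
    static final Function<List<String>, List<String>> CLEAN = in -> {
        List<String> out = new ArrayList<>();
        for (String s : in) if (!s.isEmpty()) out.add(s);
        return out;
    };

    // Job Y: upper-case every record.
    static final Function<List<String>, List<String>> JOB_Y = in -> {
        List<String> out = new ArrayList<>();
        for (String s : in) out.add(s.toUpperCase());
        return out;
    };

    // Job Z: sort the records.
    static final Function<List<String>, List<String>> JOB_Z = in -> {
        List<String> out = new ArrayList<>(in);
        Collections.sort(out);
        return out;
    };

    // Assumed "type X" test: every record looks like a JSON object.
    static boolean isTypeX(List<String> records) {
        if (records.isEmpty()) return false;
        for (String r : records) if (!r.startsWith("{")) return false;
        return true;
    }

    /** Runs the first job, then feeds its output to Y or Z. */
    static List<String> run(List<String> input) {
        List<String> first = CLEAN.apply(input);
        return isTypeX(first) ? JOB_Y.apply(first) : JOB_Z.apply(first);
    }
}
```

With JobControl alone, dependencies are unconditional. A branch like this one is typically decided in the driver after the first job has finished.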
Hadoop provides an easy way to chain jobs with the JobControl class, which controls a group of jobs and their dependencies. JobControl lets you add a collection of jobs and query their states through lists of waiting, ready, running, successful and failed jobs. Jobs are added with the addJob(controlledJob1) method of a JobControl instance. A dependency is declared with the addDependingJob(jobN) method of the dependent ControlledJob.
Any normal job is an instance of the Job class. To be chained in a JobControl, it has to be wrapped explicitly in a ControlledJob:
Job job1 = JSONFileConvertJob(args);
ControlledJob cJob1 = new ControlledJob(conf);
cJob1.setJob(job1);
JobControl jobctrl = new JobControl("JobController");
jobctrl.addJob(cJob1);
The ControlledJob class encapsulates a MapReduce job and its dependencies. It monitors the states of the jobs it depends on and updates the state of its own job accordingly. ControlledJob provides information about the job, such as its job ID and job name. It also offers extensive control over the job: setting the job ID, job name and job state, submitting the job, killing it, and so on. Groups of interdependent jobs are wrapped in ControlledJob instances, which are then added to a JobControl that acts as the top-level master.
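The state handling described above can be illustrated with a simplified, Hadoop-free model. The state names mirror ControlledJob.State. The classes ToyJobControl and ToyJob are hypothetical and only approximate the rule that a waiting job becomes READY once every job it depends on has succeeded, and DEPENDENT_FAILED if any of them failed. This is a sketch under that assumption, not Hadoop's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of dependency-driven job states (not Hadoop's code). */
class ToyJobControl {
    enum State { WAITING, READY, RUNNING, SUCCESS, FAILED, DEPENDENT_FAILED }

    static class ToyJob {
        final String name;
        final boolean willSucceed;              // assumption: outcome known up front
        final List<ToyJob> depending = new ArrayList<>();
        State state = State.WAITING;

        ToyJob(String name, boolean willSucceed) {
            this.name = name;
            this.willSucceed = willSucceed;
        }

        void addDependingJob(ToyJob other) { depending.add(other); }

        /** WAITING -> READY if all dependencies succeeded, DEPENDENT_FAILED if any failed. */
        void checkState() {
            if (state != State.WAITING) return;
            boolean allSucceeded = true;
            for (ToyJob d : depending) {
                if (d.state == State.FAILED || d.state == State.DEPENDENT_FAILED) {
                    state = State.DEPENDENT_FAILED;
                    return;
                }
                if (d.state != State.SUCCESS) allSucceeded = false;
            }
            if (allSucceeded) state = State.READY;
        }
    }

    /** Keeps checking and "running" jobs until no job changes state. */
    static Map<String, State> runAll(List<ToyJob> jobs) {
        boolean progress = true;
        while (progress) {
            progress = false;
            for (ToyJob j : jobs) {
                State before = j.state;
                j.checkState();
                if (j.state == State.READY) {
                    // A real job would be submitted here; we just record its outcome.
                    j.state = j.willSucceed ? State.SUCCESS : State.FAILED;
                }
                if (j.state != before) progress = true;
            }
        }
        Map<String, State> result = new LinkedHashMap<>();
        for (ToyJob j : jobs) result.put(j.name, j.state);
        return result;
    }
}
```

If the first job fails, the job that depends on it never runs and ends up as DEPENDENT_FAILED. The order in which jobs are added does not matter.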
Finally, the JobControl instance is executed through its run() method, which keeps checking job states until stop() is called. Because JobControl implements Runnable, it is usually started on its own thread:
Thread jobRunnerThread = new Thread(jobControlInstance);
jobRunnerThread.start();
JobControl provides an allFinished() method to check whether all jobs have finished. A loop can poll it at a fixed interval using Thread.sleep(sleepTime) and print the latest status of the running jobs. Once all jobs are done, JobControl stop() sets the runner state to STOPPING so that the thread exits when it next wakes up. Note that allFinished() also returns true when a job has failed.
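The start, poll and stop pattern can be tried without Hadoop. In the sketch below, the hypothetical ToyController stands in for JobControl: it implements Runnable, "finishes" a fixed number of pretend jobs, and exits its loop only after stop() is called. The driver polls allFinished() and, unlike the loop above, also joins the runner thread so that it has really exited before the method returns.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for JobControl, used only to show the driver pattern. */
class ToyController implements Runnable {
    private final int totalJobs;
    private final AtomicInteger finished = new AtomicInteger();
    private volatile boolean stopping = false;

    ToyController(int totalJobs) { this.totalJobs = totalJobs; }

    @Override
    public void run() {
        // Like JobControl, keep looping (and sleeping) until stop() is called.
        while (!stopping) {
            if (finished.get() < totalJobs) {
                finished.incrementAndGet();          // pretend one job completed
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    boolean allFinished() { return finished.get() >= totalJobs; }

    void stop() { stopping = true; }

    int finishedCount() { return finished.get(); }

    /** Start the runner thread, poll allFinished(), then stop() and join. */
    static int drive(int jobs) {
        ToyController ctrl = new ToyController(jobs);
        Thread runner = new Thread(ctrl);
        runner.start();
        try {
            while (!ctrl.allFinished()) {
                Thread.sleep(5);                     // the post polls every 5000 ms
            }
            ctrl.stop();
            runner.join();                           // wait for the runner thread to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ctrl.finishedCount();
    }
}
```

With the real JobControl, this loop can be followed by a check of getFailedJobList(). The check is needed because allFinished() alone does not tell you whether the chain succeeded.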
Here is a simple example of job chaining. The first job converts an input text file of temperature data into a JSON file stored in HDFS. The second job takes the JSON file as input and performs the map-reduce operation, finding the maximum temperature per year.
Add the following arguments to the run configuration of CustomFileConverterJob:
sample.txt
hdfs://localhost/sequentialJobOutput/
hdfs://localhost/sequentialJobOutput/reduce/
You can use the following data set as the sample.txt file:
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
Here the first argument is the input temperature data file. The second argument is the directory where the first job stores its output (the JSON file). The last argument is the directory where the second job stores its reduced output.
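import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CustomFileConverterJob extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new CustomFileConverterJob(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: Temperature driver: <Input file> <path for converted JSON file> <outputpath>");
            return -1;
        }
        Configuration conf = getConf();

        // Hadoop refuses to overwrite existing output, so clear it from earlier runs
        deleteFileOrDirectory(conf, args[1]);
        deleteFileOrDirectory(conf, args[2]);

        /**
         * Add Jobs in chaining
         */
        Job job1 = JSONFileConvertJob(args);
        ControlledJob cJob1 = new ControlledJob(conf);
        cJob1.setJob(job1);

        /**
         * JSON input file job
         */
        Job job2 = mapJobFromJSONInputFile(args);
        ControlledJob cJob2 = new ControlledJob(conf);
        cJob2.setJob(job2);
        cJob2.addDependingJob(cJob1);   // job2 starts only after job1 succeeds

        JobControl jobctrl = new JobControl("JobController");
        jobctrl.addJob(cJob1);
        jobctrl.addJob(cJob2);

        // JobControl implements Runnable, so it can run on its own thread
        Thread jobRunnerThread = new Thread(jobctrl);
        jobRunnerThread.start();

        while (!jobctrl.allFinished()) {
            System.out.println("Still running...");
            Thread.sleep(5000);
        }
        System.out.println("done");
        jobctrl.stop();

        // allFinished() is also true when a job failed, so report failures explicitly
        return jobctrl.getFailedJobList().isEmpty() ? 0 : 1;
    }

    /**
     * Deletes the specified file or directory (recursively) if it exists
     */
    public static void deleteFileOrDirectory(Configuration conf, String inputPathOrFile) throws IOException {
        Path path = new Path(inputPathOrFile);
        FileSystem fs = path.getFileSystem(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
    }

    /**
     * This Job takes input from the JSON file and
     * performs the map reduce operation
     */
    public Job mapJobFromJSONInputFile(String[] args) throws IOException {
        Job job = Job.getInstance(getConf(), "Process map reduce job from JSON file");
        // Tags the JsonRecordReader uses to find the start and end of each record
        job.getConfiguration().set(JsonParser.JsonInputFormat.START_TAG_KEY, "{");
        job.getConfiguration().set(JsonParser.JsonInputFormat.END_TAG_KEY, "}");
        job.setJarByClass(CustomFileConverterJob.class);
        job.setMapperClass(MaxTemperatureJSONMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(JsonParser.JsonInputFormat.class);
        // args[1] ends with '/', so this matches the part-* files written by job1
        FileInputFormat.addInputPath(job, new Path(args[1] + "part*"));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        return job;
    }

    /**
     * This Job takes any raw text file and
     * converts it into a JSON file
     */
    public Job JSONFileConvertJob(String[] args) throws IOException {
        Job job = Job.getInstance(getConf(), "JSON file converter job");
        job.setJarByClass(CustomFileConverterJob.class);
        job.setMapperClass(JSONFileConverterMapper.class);
        // Map-only job: no reducer runs, mapper output is written directly
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // The default TextOutputFormat writes one JSON string per line;
        // a custom JSON OutputFormat class could be set here instead
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job;
    }
}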
Here is the custom mapper that converts the sample.txt input file into JSON.
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.google.gson.Gson;

public class JSONFileConverterMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    static Logger log = Logger.getLogger(JSONFileConverterMapper.class);

    // Gson instances are thread-safe and can be reused for every record
    private final Gson gson = new Gson();
    private final Text jsonText = new Text();

    /** Plain holder for the fields serialized to JSON. */
    static class TemperatureRecord {
        String line;
        String year;
        int airTemperature;
        String quality;
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString(); // convert the value to a string
        TemperatureRecord rec = new TemperatureRecord();
        rec.line = line;
        rec.year = line.substring(15, 19);
        if (line.charAt(87) == '+') { // skip the leading plus sign before parsing
            rec.airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            rec.airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        rec.quality = line.substring(92, 93);

        jsonText.set(gson.toJson(rec));
        // A NullWritable key makes TextOutputFormat write only the JSON value
        context.write(NullWritable.get(), jsonText);
    }
}
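The field extraction can be checked locally against the sample data. The sketch below uses the same fixed-width offsets as the mapper. It builds the JSON string by hand instead of with Gson, which is only safe here because year, temperature and quality contain no characters that need escaping. It also leaves out the raw line field for brevity. The NcdcRecord class is hypothetical.

```java
/**
 * Hadoop-free sketch of the fields JSONFileConverterMapper extracts from
 * one fixed-width record. JSON is built by hand (no Gson) for illustration.
 */
class NcdcRecord {
    final String year;
    final int airTemperature;
    final String quality;

    NcdcRecord(String line) {
        year = line.substring(15, 19);
        // Same sign handling as the mapper: skip a leading '+' before parsing
        airTemperature = line.charAt(87) == '+'
                ? Integer.parseInt(line.substring(88, 92))
                : Integer.parseInt(line.substring(87, 92));
        quality = line.substring(92, 93);
    }

    String toJson() {
        return "{\"year\":\"" + year + "\",\"airTemperature\":" + airTemperature
                + ",\"quality\":\"" + quality + "\"}";
    }
}
```

For the third sample line, the temperature field is "-0011", which parses to -11.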
Here is the custom input format and record reader used to parse the JSON strings.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class JsonParser {

    public static class JsonInputFormat extends TextInputFormat {
        public static final String START_TAG_KEY = "jsonInput.start";
        public static final String END_TAG_KEY = "jsonInput.end";

        @Override
        public RecordReader<LongWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) {
            return new JsonRecordReader();
        }

        /**
         * Emits one record per JSON object: the bytes from a start tag ("{")
         * up to the end tag ("}") that closes it, including nested objects.
         */
        public static class JsonRecordReader extends RecordReader<LongWritable, Text> {
            private byte[] startTag;
            private byte[] endTag;
            private long start;
            private long end;
            private FSDataInputStream fsin;
            private final DataOutputBuffer buffer = new DataOutputBuffer();
            private final LongWritable key = new LongWritable();
            private final Text value = new Text();

            @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                Configuration conf = context.getConfiguration();
                startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
                endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
                FileSplit fileSplit = (FileSplit) split;

                // open the file and seek to the start of the split
                start = fileSplit.getStart();
                end = start + fileSplit.getLength();
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                fsin = fs.open(file);
                fsin.seek(start);
            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                if (fsin.getPos() < end) {
                    if (readUntilMatch(startTag, false)) {
                        try {
                            buffer.write(startTag);
                            if (readUntilMatch(endTag, true)) {
                                key.set(fsin.getPos());
                                value.set(buffer.getData(), 0, buffer.getLength());
                                return true;
                            }
                        } finally {
                            buffer.reset();
                        }
                    }
                }
                return false;
            }

            @Override
            public LongWritable getCurrentKey() throws IOException, InterruptedException {
                return key;
            }

            @Override
            public Text getCurrentValue() throws IOException, InterruptedException {
                return value;
            }

            @Override
            public void close() throws IOException {
                fsin.close();
            }

            @Override
            public float getProgress() throws IOException {
                return (fsin.getPos() - start) / (float) (end - start);
            }

            /**
             * Reads bytes until match is found. With withinBlock = true the bytes
             * are copied to the buffer and nested start tags are counted, so only
             * the end tag that closes the outer object counts as a match.
             * The nesting count works for single-byte tags such as "{" and "}".
             */
            private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
                int i = 0;
                int c = 0; // nested start tags not yet closed
                while (true) {
                    int b = fsin.read();
                    // end of file:
                    if (b == -1)
                        return false;
                    // save to buffer:
                    if (withinBlock) {
                        if (b == startTag[i]) {
                            c++;
                        }
                        buffer.write(b);
                    }
                    // check if we're matching:
                    if (b == match[i]) {
                        if (withinBlock) {
                            if (c == 0) {
                                i++;
                            } else {
                                // this end tag closes a nested object
                                i = 0;
                                c--;
                            }
                        } else {
                            i++;
                        }
                        if (i >= match.length)
                            return true;
                    } else {
                        i = 0;
                    }
                    // see if we've passed the stop point:
                    if (!withinBlock && i == 0 && fsin.getPos() >= end)
                        return false;
                }
            }
        }
    }
}
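The core idea of readUntilMatch can be tried on an in-memory string. The reader skips to a "{", then copies characters until the "}" that brings the nesting depth back to zero. The hypothetical BraceSplitter below applies that rule to a String instead of an HDFS stream. Like the record reader, it does not handle braces inside JSON string values; that is an assumption that holds for the converted temperature data.

```java
import java.util.ArrayList;
import java.util.List;

/** Hadoop-free sketch: split text into top-level {...} records by brace depth. */
class BraceSplitter {
    static List<String> split(String input) {
        List<String> records = new ArrayList<>();
        int pos = 0;
        while (true) {
            int startIdx = input.indexOf('{', pos);
            if (startIdx < 0) break;                    // no more start tags
            int depth = 0;
            int endIdx = -1;
            for (int k = startIdx; k < input.length(); k++) {
                char ch = input.charAt(k);
                if (ch == '{') {
                    depth++;
                } else if (ch == '}' && --depth == 0) { // closes the outer object
                    endIdx = k;
                    break;
                }
            }
            if (endIdx < 0) break;                      // unterminated record: stop
            records.add(input.substring(startIdx, endIdx + 1));
            pos = endIdx + 1;
        }
        return records;
    }
}
```

The nested object in the second sample record stays inside its parent record instead of ending it early.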
Here is the custom JSON mapper, which parses each JSON input string into its individual fields.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.json.JSONException;
import org.json.JSONObject;

public class MaxTemperatureJSONMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int MISSING = 9999;
    static Logger log = Logger.getLogger(MaxTemperatureJSONMapper.class);

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        log.setLevel(Level.INFO);
        log.info("Map key:-" + key);
        String line = value.toString();
        JSONObject myJson;
        try {
            // Each value is one complete JSON object produced by the first job
            myJson = new JSONObject(line);
            String year = myJson.getString("year");
            int airTemperature = myJson.getInt("airTemperature");
            String quality = myJson.getString("quality");
            context.write(new Text(year), new IntWritable(airTemperature));
        } catch (JSONException e) {
            log.error("JSON conversion exception", e);
        }
    }
}
Here is the Reducer class
implementation.
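import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Keep the highest temperature seen for this year
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}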
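To know what the chained jobs should produce for sample.txt, the whole computation can be reproduced locally. The hypothetical LocalMaxTemperature class parses each record with the same offsets as the mapper and keeps the maximum per year, as MaxTemperatureReducer does. It relies on Integer.parseInt accepting a leading '+', which Java has supported since Java 7.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hadoop-free reference for the expected output of the two chained jobs. */
class LocalMaxTemperature {
    static Map<String, Integer> maxByYear(String[] lines) {
        Map<String, Integer> max = new TreeMap<>();   // sorted by year, like the reduce output
        for (String line : lines) {
            String year = line.substring(15, 19);
            int temp = Integer.parseInt(line.substring(87, 92)); // e.g. "+0022" or "-0011"
            max.merge(year, temp, Integer::max);        // keep the larger value
        }
        return max;
    }
}
```

For the five sample records this gives 111 for 1949 and 22 for 1950, which is what the second job's output directory should contain.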
That's it for job chaining. Here I have shown the simplest example: a raw text file is the input to the first job, its output (the JSON file) is fed as input to the second job, and a map-reduce operation is performed on it. There are several other ways to chain jobs in Hadoop.
Cheers
Happy Learning :)