Thursday, 26 June 2014

Introduction to Hadoop

In the 1990s, a typical drive held about 1,370 MB of data and had a transfer speed of 4.4 MB/s, so it would take around 311 seconds (a little over 5 minutes) to read the whole drive. 20 years later, one-terabyte drives are the norm, but the transfer speed is only around 100 MB/s. The time required to read the whole disk is 1*1024*1024 MB / (100 MB/s * 60 * 60) ≈ 2.9 hours.

The obvious way to reduce the time is to read from multiple disks at once. Imagine if we had 100 drives, each holding one hundredth of the data. Working in parallel, we could read the data in under two minutes: 2.9 hours * 60 / 100 ≈ 1.75 minutes.

The first problem to solve is hardware failure: as soon as you start using many pieces of hardware, the chance that one will fail is fairly high. A common way of avoiding data loss is through replication: redundant copies of the data are kept by the system so that in the event of failure, there is another copy available. This is how RAID works, for instance, although Hadoop’s file system, the Hadoop Distributed File System (HDFS), takes a slightly different approach.
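
The disk-read arithmetic above can be checked with a few lines of Java. This is only a sketch: the class and method names are made up for illustration, and it assumes the data is spread evenly over the drives and that every drive sustains the quoted 100 MB/s.

```java
// Sketch: time to read a full drive, using the figures quoted in the text.
class DiskReadTime {
    // Seconds needed to read sizeMb megabytes at rateMbPerSec,
    // assuming the data is split evenly over `drives` disks read in parallel.
    static double readSeconds(double sizeMb, double rateMbPerSec, int drives) {
        return sizeMb / drives / rateMbPerSec;
    }

    public static void main(String[] args) {
        double oneTerabyteMb = 1024.0 * 1024.0; // 1 TB expressed in MB
        double single = readSeconds(oneTerabyteMb, 100.0, 1);
        double parallel = readSeconds(oneTerabyteMb, 100.0, 100);
        System.out.println("one drive, hours: " + single / 3600.0);
        System.out.println("100 drives, minutes: " + parallel / 60.0);
    }
}
```

One drive works out to roughly 2.9 hours and 100 drives to roughly 1.75 minutes, which is where the "under two minutes" figure comes from.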

The second problem is that most analysis tasks need to be able to combine the data in some way; data read from one disk may need to be combined with the data from any of the other 99 disks. Hadoop provides a reliable shared storage and analysis system: the storage is provided by HDFS and the analysis by Map Reduce.

Seeking is the process of moving the disk’s head to a particular place on the disk to read or write data. It characterizes the latency of a disk operation, whereas the transfer rate corresponds to a disk’s bandwidth.

Map Reduce VS RDBMS:

Map Reduce is a good fit for problems that need to analyze the whole data set, in a batch fashion, particularly for ad hoc analysis. An RDBMS is good for point queries or updates, where the data set has been indexed to deliver low-latency retrieval and update times of a relatively small amount of data.

Map Reduce suits applications where the data is written once and read many times, whereas a relational database is good for data sets that are continually updated. Relational data is often normalized to retain its integrity and remove redundancy. Normalization poses problems for Map Reduce, since it makes reading a record a non-local operation. A web server log is a good example of a set of records that is not normalized.

If you double the size of the input data, a job will take twice as long to run. But if you also double the size of the cluster, the job will run as fast as the original one. This is not generally true of SQL queries. Map Reduce tries to co-locate the data with the compute node, so data access is fast since it is local. This feature, known as data locality, is at the heart of Map Reduce and is the reason for its good performance.
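
The scaling claim above can be illustrated with an idealized model. This sketch assumes the data is spread evenly and every node processes at the same fixed rate; the numbers (1000 MB, 10 nodes, 50 MB/s per node) and the names are hypothetical, and a real cluster has overheads this model ignores.

```java
// Sketch: idealized linear scaling of a batch job.
class ScalingSketch {
    // Job time when dataMb is spread evenly over `nodes` machines,
    // each processing mbPerSecPerNode (an assumed, constant rate).
    static double jobSeconds(double dataMb, int nodes, double mbPerSecPerNode) {
        return dataMb / (nodes * mbPerSecPerNode);
    }

    public static void main(String[] args) {
        System.out.println(jobSeconds(1000, 10, 50)); // original job
        System.out.println(jobSeconds(2000, 10, 50)); // double the data: twice as long
        System.out.println(jobSeconds(2000, 20, 50)); // double the cluster too: same as original
    }
}
```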

Map Reduce programs are limited to key and value types that are related in specified ways, and mappers and reducers run with very limited coordination between one another (the mappers pass keys and values to reducers).


Hadoop introduction:

Hadoop was created by Doug Cutting. In 2004, Google published the paper that introduced Map Reduce to the world. The Nutch developers then built a working Map Reduce implementation in Nutch (an Apache project).

Hadoop & Hadoop Ecosystem:

  • Common: A set of components and interfaces for distributed file systems and general I/O (serialization, Java RPC, persistent data structures).
  • HDFS: A distributed file system that runs on large clusters of commodity machines.
  • Pig: A data flow language and execution environment for exploring very large datasets.
  • Hive: A distributed data warehouse.
  • HBase: A distributed, column-oriented database.
  • Zookeeper: A distributed, highly available coordination service.
  • Sqoop: A tool for moving data between relational databases and HDFS.

Map Reduce:
  • Programming model for data processing
  • Map Reduce programs are inherently parallel.
  • Example question: What’s the highest recorded global temperature for each year in the dataset?
  • Performance baseline

Map Reduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer. The programmer also specifies two functions: the map function and the reduce function.

With the text input format, each line of the input is presented as a text value, and the key is the offset of the beginning of the line from the beginning of the file.

Use case: find the maximum temperature

Sample data:

0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...


Data presented to the map function (key-value pairs):

(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
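
The keys in the pairs above are byte offsets of each line within the file. A minimal sketch of how such offsets arise, assuming each line ends with a single newline byte (the class name and the short sample lines are hypothetical):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: compute the byte offset at which each line of a text file starts.
class LineOffsets {
    static List<Long> offsets(List<String> lines) {
        List<Long> result = new ArrayList<>();
        long offset = 0;
        for (String line : lines) {
            result.add(offset);
            // Advance past the line's bytes plus one byte for the '\n' terminator.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical short lines; the real records are much longer.
        System.out.println(offsets(Arrays.asList("abc", "de", "f"))); // [0, 4, 7]
    }
}
```

By the same rule, records of 105 bytes plus a newline start at offsets 0, 106, 212 and so on, which matches the keys shown for the sample data.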




Map function output

(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)


The output from the map function is processed by the Map Reduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key (an optional combiner can also run at this stage):


(1949, [111, 78])
(1950, [0, 22, -11])


Reducer:

All the reduce function has to do now is iterate through the list and pick up the maximum reading:

(1949, 111)
(1950, 22)
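
The group-then-reduce steps above can be imitated in plain Java without a cluster. This is a sketch of the idea only, not Hadoop's implementation; the class and method names are made up, and the third 1950 reading is taken as -11 because the sample record contains N9-00111.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: simulate the sort/group step and the reduce step on the map output.
class MaxTemperatureSketch {
    // Group (year, temperature) pairs by year; TreeMap keeps the keys sorted.
    static Map<Integer, List<Integer>> group(int[][] pairs) {
        Map<Integer, List<Integer>> grouped = new TreeMap<>();
        for (int[] pair : pairs) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }

    // Reduce: iterate through one year's readings and keep the maximum.
    static int reduce(List<Integer> temperatures) {
        int max = Integer.MIN_VALUE;
        for (int t : temperatures) {
            max = Math.max(max, t);
        }
        return max;
    }

    public static void main(String[] args) {
        int[][] mapOutput = { {1950, 0}, {1950, 22}, {1950, -11}, {1949, 111}, {1949, 78} };
        for (Map.Entry<Integer, List<Integer>> e : group(mapOutput).entrySet()) {
            System.out.println("(" + e.getKey() + ", " + reduce(e.getValue()) + ")");
        }
    }
}
```

Running it prints (1949, 111) followed by (1950, 22), the same result as the reducer output above.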






Java Map Reduce Code:


A Map Reduce program needs 3 things: a map function, a reduce function, and some code to run the job.


Creating project:


Create a new Java project.

Add the JAR files from the Hadoop home directory and from its lib directory:
C:\Hadoop\hadoop-1.1.0-SNAPSHOT
C:\Hadoop\hadoop-1.1.0-SNAPSHOT\lib




General procedure:
Set up a job
Assign mapper class
Assign reducer class


CustomMapper class extends Mapper<InputKey, InputValue, OutputKey, OutputValue>

Input key    -- some key (here, the line offset)
Input value  -- the line
Output key   -- year
Output value -- temperature reading


map function (inputKey, inputValue)



Hadoop's own data types:


These are found in the org.apache.hadoop.io package.
LongWritable => like Long
Text => like String
IntWritable => like Integer




Setting jar by class:

Rather than naming the JAR file explicitly, pass a class to setJarByClass(); Hadoop uses it to locate the relevant JAR file, which it then distributes around the cluster.
job.setJarByClass(MaxTemperatureDriver.class);




Specify input and output path:


FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));


Setting mapper and reducers:


Hadoop sends the job JAR to each node, which then runs the mapper and reducer classes set here:
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);


Waiting for completion:

After the job is set up, waitForCompletion() submits it and waits for it to finish. Its boolean return value is translated into an exit code:
System.exit(job.waitForCompletion(true) ? 0 : 1);




Data Flow:


Hadoop runs the job by dividing it into tasks:  map tasks and reduce tasks.

Job execution is controlled by 2 types of nodes:
  • A job tracker: coordinates all the jobs run on the system by scheduling tasks to run on task trackers.
  • Task trackers: run tasks and send progress reports to the job tracker. If a task fails, the job tracker can reschedule it on a different task tracker.


Splits:
  • The input to a Map Reduce job is divided into fixed-size pieces, called splits, by Hadoop.
  • Hadoop creates one map task for each split.
  • With many splits, the time taken to process each split is small compared to the time to process the whole input.
  • Processing the splits in parallel gives better load balancing.
  • A faster machine will be able to process proportionally more splits over the course of the job than a slower machine.
  • If splits are too small, the overhead of managing the splits and of map task creation begins to dominate the total job execution time.
  • A good split size tends to be the size of an HDFS block, 64 MB by default.
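
The relationship between input size, split size and the number of map tasks can be sketched as a ceiling division. This is a simplified model with hypothetical names: it assumes one input file cut into equal pieces with a possibly smaller last piece, and it ignores any refinements Hadoop applies when computing real splits.

```java
// Sketch: how many splits (and therefore map tasks) an input produces.
class SplitCount {
    static final long DEFAULT_SPLIT_BYTES = 64L * 1024 * 1024; // 64 MB, as in the text

    // Ceiling division: the last split may be smaller than splitBytes.
    static long countSplits(long inputBytes, long splitBytes) {
        if (inputBytes <= 0) {
            return 0;
        }
        return (inputBytes + splitBytes - 1) / splitBytes;
    }

    public static void main(String[] args) {
        long oneGigabyte = 1024L * 1024 * 1024;
        System.out.println(countSplits(oneGigabyte, DEFAULT_SPLIT_BYTES)); // 16
        System.out.println(countSplits(oneGigabyte, 1024L * 1024));        // 1024
    }
}
```

With 1 MB splits the same 1 GB input needs 1024 map tasks instead of 16, which shows how task-creation overhead grows when splits are too small.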


Data locality optimization:

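  • Hadoop tries to run the map task on a node where the input data resides in HDFS.
  • This helps reduce the use of cluster bandwidth.
  • A split that spans multiple HDFS blocks is unlikely to have all of its blocks on one node, so part of it would have to be transferred across the network.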


Map tasks & Reduce tasks:

  • Map output is intermediate, so it is written to the local disk instead of HDFS: it is processed by reduce tasks to produce the final output and can then be thrown away.
  • The input to a single reducer is normally the output from all the mappers.
  • The output of the reduce tasks is stored in HDFS.
  • The first replica is stored on the local node & the other replicas are stored on off-rack nodes.


Combiner:
  • To minimize the data transferred between map and reduce tasks, Hadoop can run a combiner on each mapper's output; the combined output is then sent to the reducer.
  • After setting the mapper class, set the combiner if you need one. Here it is defined by the reducer class:
job.setCombinerClass(MaxTemperatureReducer.class);
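
Reusing the reducer as a combiner works for "max" because taking the maximum of partial maxima gives the same answer as taking the maximum of everything. The sketch below checks that on hypothetical readings from two mappers, and shows that a function such as the mean does not have this property. All names and values are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Sketch: why max is safe to use as a combiner and mean is not.
class CombinerSketch {
    static int max(List<Integer> values) {
        int result = Integer.MIN_VALUE;
        for (int v : values) {
            result = Math.max(result, v);
        }
        return result;
    }

    static double mean(List<Integer> values) {
        double sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    public static void main(String[] args) {
        List<Integer> mapper1 = Arrays.asList(0, 20, 10); // hypothetical readings, one year
        List<Integer> mapper2 = Arrays.asList(25, 15);
        List<Integer> all = Arrays.asList(0, 20, 10, 25, 15);

        // Combine per mapper first, then reduce the partial results.
        int combined = max(Arrays.asList(max(mapper1), max(mapper2)));
        System.out.println(combined + " vs " + max(all));

        // The same trick gives the wrong answer for the mean.
        double meanOfMeans = (mean(mapper1) + mean(mapper2)) / 2;
        System.out.println(meanOfMeans + " vs " + mean(all));
    }
}
```

The first line prints the same maximum (25) twice; the second prints two different means (15.0 and 14.0), so a mean-computing reducer could not be reused as a combiner unchanged.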




Hadoop Streaming & Pipes:
  • Hadoop Streaming uses UNIX standard streams as the interface between Hadoop and your program.
  • Streaming is naturally suited for text processing.
  • Hadoop Pipes is the C++ interface to Hadoop Map Reduce.





Configuration:


The configuration files are located in:
C:\hadoop\hadoop-1.1.0-SNAPSHOT\conf

hdfs-site.xml:

dfs.replication => sets the replication factor for data.
dfs.webhdfs.enabled => enables WebHDFS, the HTTP REST interface to HDFS.



Summary:

Active Hadoop node || Passive Hadoop (Secondary name node)




Name node server: is a Hadoop name node server that runs


A large cluster can support 20-35 petabytes of data.

Underlying file system options [HDFS]:


You do not format the hard drives with HDFS; the slave machines' hard drives are formatted with ext3, ext4 or XFS.
HDFS => an abstract file system that sits on top of them.
Ext4 is the recommended hard drive format.




Commands:


hadoop fs -help


bin/hadoop fs
-ls
-mkdir
-copyFromLocal
-copyToLocal
-moveToLocal
-rm
-tail
-chmod
-setrep -w 4 -R /dir1/s-dir/   //Changes the replication factor to 4 recursively for this subdirectory


Yahoo 4500 node cluster:


110 racks
Each rack has 40 slaves
At the top of each rack there is a rack switch
8 core switches
Every slave machine has 2 Cat 5 cables going to the top-of-rack switch, which means each top-of-rack switch uses 40*2 = 80 ports.


Within a rack it's a 1 Gb network. However, the core switch layer is a 10 Gb network, of which 8 Gb is dedicated to HDFS & the remaining 2 Gb to Map Reduce administration & user traffic on the network.


Rack awareness:
 Name node is rack aware.


Commands:


hadoop fs




What are the files at root of HDFS?
hadoop fs -ls /


hadoop dfs -ls /user/hduser/  ==> works on both Linux & Windows


Make new dir:


hadoop fs -mkdir /user/clusdra/newDir


hadoop fs -copyFromLocal shakesspear.txt /user/username/newDir


Filesystem check command:


% hadoop fsck / -files -blocks
On Windows: hadoop fsck \ -files -blocks


-copyToLocal:
  • hadoop fs -copyToLocal hdfs://localhost/user/hadoop/test1.txt C:\Users\prems.bist\Desktop\test.txt


-copyFromLocal:
hadoop fs -copyFromLocal D:\tutorials\test.txt hdfs://localhost/user/hadoop/test1.txt


Listing files inside the directory of hdfs:


hadoop fs -ls hdfs://localhost/user/hadoop/yourFolder => Lists all the files in the yourFolder directory.


You can use a relative path as well:
hadoop fs -copyFromLocal input/docs/quangle.txt  /user/tom/quangle.txt


Command examples on Windows:


hadoop fs -mkdir books
 //Creates the directory books under the user's HDFS home directory (/user/prems.bist); it is not visible in the local C:\users\prems.bist folder




hadoop fs -ls




Example 2 -command :


hadoop fs -mkdir input               //Makes a directory named input
hadoop fs -ls input                  //Shows nothing, as there are no files yet
hadoop fs -put *.xml input           //Puts all the files ending with .xml into the input directory


output:
Found 1 items
drwxr-xr-x   - prems.bist supergroup          0 2014-03-13 13:09 /user/prems.bist/books


1st column: file permissions
2nd column: replication factor, shown as (-) for a directory because it does not apply to directories
3rd column: owner
4th column: group
5th column: size of the file in bytes, 0 for a directory
6th & 7th columns: last modified date & time
8th column: absolute name of the file or directory
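
The column layout above can be checked by splitting the listing line on whitespace. This is a small sketch with a hypothetical class name; it assumes the eight-column layout shown in the output above and simply splits on runs of spaces.

```java
// Sketch: split one line of "hadoop fs -ls" output into its eight columns.
class LsLine {
    static String[] parse(String line) {
        // Limit of 8 keeps everything after the seventh column in the last field.
        return line.trim().split("\\s+", 8);
    }

    public static void main(String[] args) {
        String line = "drwxr-xr-x   - prems.bist supergroup          0 2014-03-13 13:09 /user/prems.bist/books";
        String[] labels = { "permissions", "replication", "owner", "group",
                            "size", "date", "time", "name" };
        String[] fields = parse(line);
        for (int i = 0; i < fields.length; i++) {
            System.out.println(labels[i] + ": " + fields[i]);
        }
    }
}
```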




Example 3 - running a jar through the command line:


Create directory
hadoop dfs -mkdir hdfs:/inputFolder
Show directory
hadoop dfs -ls hdfs:/


Copy the files from the local system
hadoop dfs -copyFromLocal D:\SampleData\input hdfs:/inputFolder


Run the Jar file
hadoop jar jarName runningClassName inputFileLocation outputFileLocation
Example:
hadoop jar D:\target\MapReduce-1.0-SNAPSHOT.jar com.impetus.hadoop.WordCount hdfs:/wordCountFolder/sampleData hdfs:/outputFolder


Viewing file output:
hadoop dfs -cat /outputFolder/part-00000




List all the files in the root directory of the local filesystem (i.e. the files of the C drive):


hadoop fs -ls file:///  




HDFS:

Distributed filesystems:
  • Filesystems that manage the storage across a network of machines.


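Commodity hardware: Normal, commonly available hardware rather than expensive, highly reliable machines.
Streaming data access: The time to read the whole dataset is more important than the latency in reading the first record.
Low-latency data access: HDFS is optimized for high throughput, so applications that need low-latency access to data are not a good fit.
Lots of small files: Since the namenode holds filesystem metadata in memory, the limit to the number of files in a filesystem is governed by the amount of memory on the namenode.
Multiple writers, arbitrary file modifications: Files in HDFS may be written to by a single writer only, and writes are always made at the end of the file.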




Block:


  • A disk block is the minimum amount of data that a disk can read or write.
  • File system blocks are typically a few kilobytes in size, while disk blocks are normally 512 bytes.
  • HDFS, too, has the concept of a block, but it is a much larger unit: 64 MB by default. Like in a file system for a single disk, files in HDFS are broken into block-sized chunks, which are stored as independent units.
  • HDFS blocks are large compared to disk blocks, and the reason is to minimize the cost of seeks.
  • A quick calculation shows that if the seek time is around 10 ms, and the transfer rate is 100 MB/s, then to make the seek time 1% of the transfer time, we need to make the block size around 100 MB.

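
The block-size estimate in the list above can be written out as a short calculation. This is just a sketch with hypothetical names; it uses the seek time, transfer rate and 1% target quoted in the text.

```java
// Sketch: block size for which the seek time is a given fraction of the transfer time.
class BlockSizeEstimate {
    static double blockSizeMb(double seekMs, double transferMbPerSec, double seekFraction) {
        // If seek = fraction * transfer time, then transfer time = seek / fraction.
        double transferSeconds = (seekMs / 1000.0) / seekFraction;
        return transferSeconds * transferMbPerSec;
    }

    public static void main(String[] args) {
        // 10 ms seek, 100 MB/s transfer, seek should be 1% of the transfer time.
        System.out.println(blockSizeMb(10, 100, 0.01) + " MB");
    }
}
```

A 10 ms seek is 1% of a 1 second transfer, and 1 second at 100 MB/s moves 100 MB, which gives the block size quoted above.
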
Fail over and Fencing:

The transition from the active namenode to the standby is managed by a new entity in the system called the failover controller. The first implementation uses ZooKeeper to ensure that only one name node is active. Failover may also be initiated manually by an administrator, in the case of routine maintenance, for example. This is known as a graceful failover, since the failover controller arranges an orderly transition for both name nodes to switch roles.

Fencing makes sure that a previously active name node cannot do any damage after an ungraceful failover. STONITH => "Shoot the other node in the head", a fencing technique that forcibly powers down the other node.


Default HDFS port:  8020



org.apache.hadoop.fs.FileSystem represents a file system in Hadoop.




Accessing HDFS over Http:


2 ways:
1) Directly, using the HDFS daemons that serve HTTP requests to clients
2) Via proxies that access HDFS on the client's behalf using the distributed file system APIs


Hadoop file systems:


Name              URI scheme  Class                           Description
Local             file        fs.LocalFileSystem              -
HDFS              hdfs        hdfs.DistributedFileSystem      -
HFTP              hftp        hdfs.HftpFileSystem             Provides read-only access to HDFS over HTTP
HSFTP             hsftp       hdfs.HsftpFileSystem            Provides read-only access to HDFS over HTTPS
WebHDFS           webhdfs     hdfs.web.WebHdfsFileSystem      Secure read-write access to HDFS over HTTP
HAR               har         fs.HarFileSystem                Archiving
KFS               kfs         fs.kfs.KosmosFileSystem         CloudStore, written in C++
FTP               ftp         fs.ftp.FTPFileSystem            FTP server backed file system
S3                s3          -                               Amazon backed
Distributed RAID  hdfs        hdfs.DistributedRaidFileSystem  “RAID” version of HDFS
View              viewfs      viewfs.ViewFileSystem           Client-side mount table for other Hadoop filesystems

File System functions:

FileSystem dfs = FileSystem.get(config);
getWorkingDirectory() => Returns the working directory path
delete() => Deletes a file
createNewFile()
exists(src)
getDefaultBlockSize()

available() //Returns the estimated number of bytes remaining (called on an input stream)





References: Hadoop: The Definitive Guide
Happy Learning cheers.!!

