In the 1990s a typical drive held around 1,400 MB and transferred data at about 4.4 MB/s, so reading the whole drive took roughly five minutes. Twenty years later, one-terabyte drives are the norm, but the transfer speed is only around 100 MB/s, so the time required to read the whole disk is (1 * 1024 * 1024) / (100 * 60 * 60) ≈ 2.9 hours.
The obvious way to reduce the time is to read from multiple disks at once. Imagine we had 100 drives, each holding one hundredth of the data. Working in parallel, we could read the data in under two minutes (2.9 * 60 / 100 ≈ 1.7 minutes).
The first problem to solve is hardware failure: as soon as you start using many pieces of hardware, the chance that one will fail is fairly high. A common way of avoiding data loss is replication: redundant copies of the data are kept by the system so that in the event of failure another copy is available. This is how RAID works, for instance, although Hadoop's file system, the Hadoop Distributed File System (HDFS), takes a slightly different approach.
The second problem is that most analysis tasks need to be able to combine the data in some way; data read from one disk may need to be combined with data from any of the other 99 disks. Hadoop provides a reliable shared storage and analysis system: the storage is provided by HDFS and the analysis by MapReduce.
Seeking is the process of moving the disk's head to a particular place on the disk to read or write data. It characterizes the latency of a disk operation, whereas the transfer rate corresponds to a disk's bandwidth.
MapReduce vs. RDBMS:
MapReduce is a good fit for problems that need to analyze the whole data set in a batch fashion, particularly for ad hoc analysis. An RDBMS is good for point queries or updates, where the data set has been indexed to deliver low-latency retrieval and update of a relatively small amount of data.
MapReduce suits applications where the data is written once and read many times, whereas a relational database is good for data sets that are continually updated. Relational data is often normalized to retain its integrity and remove redundancy. Normalization poses problems for MapReduce; a web server log is a good example of a set of records that is not normalized.
If you double the size of the input data, a job will take twice as long to run. But if you also double the size of the cluster, a job will run as fast as the original one. This is not generally true of SQL queries. MapReduce tries to collocate the data with the compute node, so data access is fast because it is local. This feature, known as data locality, is at the heart of MapReduce and is the reason for its good performance.
MapReduce programs are limited to key and value types that are related in specified ways, and mappers and reducers run with very limited coordination between one another (the mappers simply pass keys and values to reducers).
Hadoop introduction:
Hadoop was created by Doug Cutting. In 2004 Google published the paper that introduced MapReduce to the world, and the Nutch developers soon had a working MapReduce implementation in Nutch (an Apache project).
Hadoop & Hadoop Ecosystem:
- Common: A set of components and interfaces for distributed filesystems and general I/O (serialization, Java RPC, persistent data structures).
- HDFS: Distributed filesystem that runs on large clusters of commodity machines.
- Pig: Data flow language for exploring very large datasets.
- Hive: Distributed data warehouse.
- HBase: Column-oriented database.
- ZooKeeper: Distributed, highly available coordination service.
- Sqoop: Moves data between relational databases and HDFS.
MapReduce:
- Programming model for data processing.
- MapReduce programs are inherently parallel.
- Running example: what's the highest recorded global temperature for each year in the dataset?
- Performance baseline.
MapReduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer. The programmer also specifies two functions: the map function and the reduce function.
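In the Definitive Guide's notation, the general form of the types is:
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)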
With the text input format, each line is presented to the map function as a Text value, and the key is the line's byte offset within the file.
Use case: find the maximum temperature per year.
Sample data:
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
Data presented to the map function (key-value pairs):
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
Map function output (year, temperature):
(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)
The output from the map function is processed by the MapReduce framework before being sent to the reduce function: it sorts and groups the pairs by key [optionally via a combiner], producing:
(1949, [111, 78])
(1950, [0, 22, -11])
Reducer:
All the reduce function has to do now is iterate through the list and pick the maximum reading:
(1949, 111)
(1950, 22)
Java MapReduce Code:
Three things are needed: a map function, a reduce function, and some code to run the job.
Creating the project:
Create a new Java project.
Add the jars from the Hadoop home and Hadoop home/lib directories, e.g.:
C:\Hadoop\hadoop-1.1.0-SNAPSHOT
C:\Hadoop\hadoop-1.1.0-SNAPSHOT\lib
General procedure:
Set a job
Assign mapper class
Assign reducer class
The custom mapper class extends Mapper<inputKey, inputValue, outputKey, outputValue>:
Input key -- some key (the line's byte offset)
Input value -- the line of text
Output key -- the year
Output value -- the temperature reading
and overrides the map(inputKey, inputValue, context) method.
Hadoop's own data types are found in the org.apache.hadoop.io package:
LongWritable ==> like Long
Text ==> like String
IntWritable ==> like Integer
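A minimal mapper sketch for the above, following the Definitive Guide's MaxTemperature example; the NCDC field offsets (15-19 for the year, 87-92 for the temperature) and the quality-code check come from that example, not from these notes, and may need adjusting for other data:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper<input key, input value, output key, output value>
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;   // sentinel for missing readings

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String year = line.substring(15, 19);                  // year field
    int airTemperature;
    if (line.charAt(87) == '+') {                          // parseInt doesn't like a leading plus sign
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));   // emit (year, temperature)
    }
  }
}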
Setting the jar by class:
Hadoop will distribute this file around the cluster and use the given class to locate the relevant jar.
job.setJarByClass(MaxTemperatureDriver.class);
Specify input and output paths:
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
Setting the mapper and reducer:
Hadoop ships the job jar to the individual nodes, which run these classes.
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
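A matching reducer sketch (iterating through the values for a year and keeping the maximum, as described earlier):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer<input key, input value, output key, output value>
public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());    // keep the largest reading seen
    }
    context.write(key, new IntWritable(maxValue));   // e.g. (1949, 111)
  }
}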
Waiting for completion:
After setting up the job, Hadoop waits for the job to complete. This is done by:
job.waitForCompletion(true) ? 0 : 1 (typically passed to System.exit)
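A driver sketch tying the job-setup calls above together. The class names are the ones used in these notes; the job name and the explicit output key/value classes are additions taken from the book's example:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperatureDriver {
  public static void main(String[] args) throws Exception {
    Job job = new Job();                               // Hadoop 1.x style job creation
    job.setJarByClass(MaxTemperatureDriver.class);     // locate the jar from this class
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));    // input path
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path (must not exist)

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);  // wait for the job to finish
  }
}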
Data Flow:
Hadoop runs the job by dividing it into tasks: map tasks and reduce tasks.
The job execution process is controlled by two types of nodes:
- A jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers.
- Tasktrackers run tasks and send progress reports to the jobtracker. If a task fails, the jobtracker can reschedule it on a different tasktracker.
Splits:
- The input to a MapReduce job is split into fixed-size pieces by Hadoop.
- Hadoop creates one map task for each split.
- With many splits, each piece takes less time to process than the whole input would.
- Processing the splits in parallel gives better load balancing: a faster machine can process proportionally more splits.
- If splits are too small, the overhead of managing the splits and of map task creation begins to dominate the total job execution time.
- A good split size tends to be the size of an HDFS block, 64 MB by default (see the example below).
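As an illustration (hypothetical numbers, not from the source): with the default 64 MB split size, a 1 GB (1,024 MB) input produces 1,024 / 64 = 16 splits, and therefore 16 map tasks.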
Data locality optimization:
- Run the map task on a node where the input data resides in HDFS.
- This helps reduce cluster bandwidth usage.
- A split can span multiple blocks in HDFS.
Map tasks & reduce tasks:
- Map task output is intermediate, so it is stored on local disk instead of HDFS; it is consumed by the reduce tasks to produce the final output.
- The input to a single reducer is normally the output from all the mappers.
- The output of the reduce tasks is stored in HDFS.
- The first replica is stored on the local node, and the other replicas are stored on off-rack nodes.
Combiner:
- To minimize the data transferred between map and reduce tasks, Hadoop can run a combiner on each mapper's output; the combined output is then sent to the reducer.
- After setting the mapper class, set the combiner if you need one; here it is defined on the reducer class itself:
job.setCombinerClass(MaxTemperatureReducer.class);
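The reducer can double as the combiner here because max is commutative and associative, for example:
max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
so applying max partially on each mapper's output and again in the reducer gives the same final result.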
Hadoop Streaming & Pipes:
- Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program.
- Streaming is naturally suited for text processing.
- Hadoop Pipes is the C++ interface to Hadoop MapReduce.
Configuration:
C:\hadoop\hadoop-1.1.0-SNAPSHOT\conf
hdfs-site.xml:
dfs.replication ⇒ sets the replication factor for data.
dfs.webhdfs.enabled ⇒ enables the WebHDFS (HTTP REST) interface.
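A small sketch of reading or overriding these properties through the Configuration API; the class name and the fallback default values shown are assumptions for illustration:

import org.apache.hadoop.conf.Configuration;

public class ConfDemo {
  public static void main(String[] args) {
    // Loads the site configuration files found on the classpath (e.g. the conf directory above).
    Configuration conf = new Configuration();
    conf.addResource("hdfs-site.xml");   // make sure the HDFS settings are loaded

    System.out.println("dfs.replication = " + conf.get("dfs.replication", "3"));
    System.out.println("dfs.webhdfs.enabled = " + conf.get("dfs.webhdfs.enabled", "false"));

    conf.set("dfs.replication", "2");    // per-client override of the replication factor
  }
}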
Summary:
Active Hadoop node || passive Hadoop node (secondary namenode)
Namenode server: a large Hadoop cluster running a namenode server can support around 20-35 petabytes of data.
Underlying filesystem options [HDFS]:
You do not format the hard drives with HDFS; instead use ext3, ext4, or XFS on the slave hard drives. HDFS is an abstract filesystem layered on top. ext4 is the recommended format for the hard drives.
Commands:
hadoop fs -help
bin/hadoop fs
  -ls
  -mkdir
  -copyFromLocal
  -copyToLocal
  -moveToLocal
  -rm
  -tail
  -chmod
  -setrep -w 4 -R /dir1/s-dir/   // changes the replication factor to 4 recursively for this subdirectory
Yahoo 4500-node cluster:
110 racks, each rack with 40 slaves.
At the top of each rack there is a top-of-rack switch; there are 8 core switches.
Every slave machine has 2 Cat5 cables going to the top-of-rack switch, so each top-of-rack switch uses 40 * 2 = 80 ports per rack.
Within a rack it is a 1 Gb network, while the core switch layer is a 10 Gb network, of which 8 Gb is dedicated to HDFS and the remaining 2 Gb to MapReduce administration and user traffic on the network.
Rack awareness:
Name node is rack aware.
Commands:
hadoop fs
What are the files at the root of HDFS?
hadoop fs -ls /
hadoop dfs -ls /user/hduser/
==> works on both Linux and Windows
Make a new dir:
hadoop fs -mkdir /user/clusdra/newDir
hadoop fs -copyFromLocal shakespeare.txt /user/username/newDir
Filesystem check command:
% hadoop fsck / -files -blocks
In Windows: hadoop fsck \ -files -blocks
-copyToLocal:
hadoop fs -copyToLocal hdfs://localhost/user/hadoop/test1.txt C:\Users\prems.bist\Desktop\test.txt
-copyFromLocal:
hadoop fs -copyFromLocal D:\tutorials\test.txt hdfs://localhost/user/hadoop/test1.txt
Listing files inside a directory of HDFS:
hadoop fs -ls hdfs://localhost/user/hadoop/yourFolder   => lists all the files in the yourFolder directory.
You can use a relative path as well:
hadoop fs -copyFromLocal input/docs/quangle.txt /user/tom/quangle.txt
Command examples in Windows:
hadoop fs -mkdir books   // creates the directory 'books' under the HDFS home directory /user/prems.bist; it will not appear in the local folder C:\Users\prems.bist
hadoop fs -ls
Example 2 - commands:
hadoop fs -mkdir input      // makes a directory named input
hadoop fs -ls input         // shows nothing, as there are no files yet
hadoop fs -put *.xml input  // puts all files ending with .xml into the input directory
Output:
Found 1 items
drwxr-xr-x   - prems.bist supergroup          0 2014-03-13 13:09 /user/prems.bist/books
First column: file permissions.
Second column: replication factor; shown as (-) for directories, since the concept does not apply to them.
Third column: owner.
Fourth column: group.
Fifth column: size of the file in bytes (0 for directories).
Sixth & seventh columns: last modified date & time.
Eighth column: absolute name of the file or directory.
Example 3 - running a jar through the command line:
Create a directory:
hadoop dfs -mkdir hdfs:/inputFolder
Show the directory:
hadoop dfs -ls hdfs:/
Copy the files from the local system:
hadoop dfs -copyFromLocal D:\SampleData\input hdfs:/inputFolder
Run the jar file:
hadoop jar jarName runningClassName inputFileLocation outputFileLocation
e.g.
hadoop jar D:\target\MapReduce-1.0-SNAPSHOT.jar com.impetus.hadoop.WordCount hdfs:/wordCountFolder/sampleData hdfs:/outputFolder
Viewing the file output:
hadoop dfs -cat /outputFolder/part-00000
List all the files in the root directory of the local filesystem (i.e. the files of the C drive):
hadoop fs -ls file:///
HDFS:
Distributed filesystems: filesystems that manage the storage across a network of machines.
Commodity hardware: normal hardware devices.
Streaming data access: the time to read the whole dataset is more important than the latency of reading the first record.
Low-latency data access: not a good fit for HDFS, which is optimized for high throughput at the expense of latency.
Lots of small files: since the namenode holds filesystem metadata in memory, the limit to the number of files in a filesystem is governed by the amount of memory on the namenode.
Multiple writers, arbitrary file modifications: files in HDFS may be written to by a single writer only.
Block:
- A block is the minimum amount of data that a disk can read or write.
- Filesystem blocks are typically a few kilobytes in size, while disk blocks are normally 512 bytes.
- HDFS, too, has the concept of a block, but it is a much larger unit: 64 MB by default. As in a filesystem for a single disk, files in HDFS are broken into block-sized chunks, which are stored as independent units.
- HDFS blocks are large compared to disk blocks, and the reason is to minimize the cost of seeks.
- A quick calculation shows that if the seek time is around 10 ms and the transfer rate is 100 MB/s, then to make the seek time 1% of the transfer time we need to make the block size around 100 MB (spelled out below).
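Spelling the calculation out: for the seek to be 1% of the transfer, the transfer must last 10 ms / 0.01 = 1 s, and in one second a 100 MB/s disk transfers about 100 MB, which is where the ~100 MB block size comes from.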
Failover and Fencing:
The transition from the active namenode to the standby is managed by a new entity in the system called the failover controller. The first implementation uses ZooKeeper to ensure that only one namenode is active. Failover may also be initiated manually by an administrator, for example during routine maintenance. This is known as a graceful failover, since the failover controller arranges an orderly transition for both namenodes to switch roles. STONITH => Shoot The Other Node In The Head (a fencing method).
Default HDFS port: 8020
org.apache.hadoop.fs.FileSystem represents a filesystem in Hadoop.
Accessing HDFS over HTTP:
Two ways:
1) Using HDFS daemons that serve HTTP requests to clients.
2) Using the distributed filesystem API via proxies.
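A small sketch of the first approach using the webhdfs:// scheme from the table below; the class name, host, port 50070 (the Hadoop 1.x namenode HTTP port), and file path are assumptions, and WebHDFS must be enabled via dfs.webhdfs.enabled:

import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class WebHdfsRead {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Obtain a FileSystem for the webhdfs:// scheme (HTTP access to HDFS).
    FileSystem fs = FileSystem.get(URI.create("webhdfs://localhost:50070/"), conf);
    InputStream in = fs.open(new Path("/user/hadoop/test1.txt"));   // hypothetical file
    try {
      IOUtils.copyBytes(in, System.out, 4096, false);               // print its contents
    } finally {
      IOUtils.closeStream(in);
    }
  }
}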
Hadoop filesystems:
| Name | URI scheme | Class | Description |
| Local | file | fs.LocalFileSystem | |
| HDFS | hdfs | hdfs.DistributedFileSystem | |
| HFTP | hftp | hdfs.HftpFileSystem | Provides read-only access to HDFS over HTTP |
| HSFTP | hsftp | hdfs.HsftpFileSystem | Provides read-only access to HDFS over HTTPS |
| WebHDFS | webhdfs | hdfs.web.WebHdfsFileSystem | Secure read-write access to HDFS over HTTP |
| HAR | har | fs.HarFileSystem | Archiving |
| KFS | kfs | fs.kfs.KosmosFileSystem | CloudStore, written in C++ |
| FTP | ftp | fs.ftp.FTPFileSystem | A filesystem backed by an FTP server |
| S3 | s3 | | Backed by Amazon S3 |
| Distributed RAID | hdfs | hdfs.DistributedRaidFileSystem | |
| View | viewfs | viewfs.ViewFileSystem | Client-side mount table for other Hadoop filesystems |
File System functions:
FileSystem dfs = FileSystem.get(config);
getWorkingDirectory() => returns the working path
delete() => deletes a file
createNewFile()
exists(src)
getDefaultBlockSize()
available() // returns the estimated number of bytes remaining (on an opened input stream)
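A minimal sketch exercising the calls listed above; the class name and path are hypothetical, and available() is called on the opened input stream rather than on the FileSystem itself:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileSystemDemo {
  public static void main(String[] args) throws IOException {
    Configuration config = new Configuration();
    FileSystem dfs = FileSystem.get(config);           // the default filesystem (HDFS if so configured)

    System.out.println("Working dir: " + dfs.getWorkingDirectory());
    System.out.println("Default block size: " + dfs.getDefaultBlockSize());

    Path src = new Path("/user/hadoop/test1.txt");      // hypothetical path
    if (!dfs.exists(src)) {
      dfs.createNewFile(src);                           // create an empty file if it is missing
    }

    FSDataInputStream in = dfs.open(src);
    System.out.println("Bytes remaining: " + in.available());
    in.close();

    dfs.delete(src, false);                             // delete the file (non-recursive)
    dfs.close();
  }
}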
References: Hadoop: The Definitive Guide
Happy Learning
Cheers!!