Map Reduce Fundamentals
WELCOME
“The secret of business is to
know something that nobody
else knows”
Presented by
Siva Kumar Bhuchipalli
Contents
• Introduction
• History
• (Key, Value) Model
• MR Paradigm for WordCount
Map Reduce Solution
(Diagram: the MapReduce framework splits the input data.)
Map Reduce Model
Imposes Key-Value Input/Output
Define Map and Reduce Functions
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
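To make these signatures concrete, here is a minimal sketch using the standard org.apache.hadoop.mapreduce classes (the class names MyMapper/MyReducer and the word-count-style types are illustrative; the deck's real WordCount classes appear later):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// (K1,V1) = (LongWritable, Text): byte offset and one line of input
// (K2,V2) = (Text, IntWritable): e.g. a word and a count
class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // emits list(K2,V2) pairs via context.write(...)
    }
}

// (K3,V3) = (Text, IntWritable): e.g. a word and its total count
class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // receives (K2, list(V2)) and emits (K3,V3) pairs via context.write(...)
    }
}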
Map Reduce Paradigm (Word Count)
Input (K1,V1) – three lines of text: "Jack Bill Joe", "Don Don Joe", "Jack Don Bill"
Map output List(K2,V2): (Jack,1) (Bill,1) (Joe,1) (Don,1) (Don,1) (Joe,1) (Jack,1) (Don,1) (Bill,1)
Shuffle groups by key (K2,List(V2)): Bill(1,1), Don(1,1,1), Jack(1,1), Joe(1,1)
Reduce output List(K3,V3): Bill,2  Don,3  Jack,2  Joe,2
Map Reduce Working Flow
Input data is present on the data nodes
Number of map tasks = number of input splits
Mappers produce intermediate data
Data is exchanged between nodes during “shuffling”
All data with the same key goes to the same reducer
Reducer output is stored at the output location
Jobs are broken down into smaller chunks called tasks
These tasks are scheduled
Code is moved to where the data is; the shuffle-and-sort barrier re-arranges and moves data between
machines
(Diagram: Clients submit jobs to the Resource Manager, which schedules Tasks on the Node Managers (NM).)
Map: (K1,V1) → list(K2,V2)
Reduce: (K2,list(V2)) → list(K3,V3)
(Diagram: on Node #1 … Node #N, a Mapper task processes its data split and writes map output; Reduce tasks on Node #X consume that output and write the reduce output.)
Map Reduce V1.0
(Diagram: the Client submits a job; data blocks are stored in HDFS and processed by the MapReduce layer.)
Map Reduce Limitations
Job Tracker overburdened – spends a significant amount of time and effort managing the life-cycle of applications
MRv1 supports only Map and Reduce tasks
Inflexible resource management (MapReduce 1 had a slot-based model)
Little flexibility in cluster resource sharing and allocation
Slot-based approach (e.g. 10 slots per machine, no matter how small or big the tasks are)
Map Reduce 2.0 On YARN
Application Master
Active only during job execution
Runs on any one of the available Node Managers
Monitors the job's tasks and their progress
Daemons In MRv2
Resource Manager
History Server
Name Node
HBase Master
Run the job – with a single input file and with multiple input files
WordCountMapper.java
package test.mr.project;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // split(" ") takes a String regex; split(' ') would not compile
        String[] words = value.toString().split(" ");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
WordCountReducer.java
package test.mr.project;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
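The slide shows only the package and imports; the class body was elided. A minimal sketch of the reducer that matches the mapper above (the Logger import suggests the original also logged, which is omitted here):

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();   // add up the 1s emitted by the mapper for this word
        }
        context.write(key, new IntWritable(sum));
    }
}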
WordCountJob.java
package test.mr.project;
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
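Only the package declaration and the final lines of the driver survive on the slide. A minimal sketch of the elided job setup, using the new-API org.apache.hadoop.mapreduce.Job (the job name and the use of args[0]/args[1] for input/output paths are assumptions):

// also needs: org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.Path,
// org.apache.hadoop.io.*, org.apache.hadoop.mapreduce.Job,
// org.apache.hadoop.mapreduce.lib.input.FileInputFormat,
// org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
public class WordCountJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountJob.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}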
YARN Job Anatomy
Client – submits Map Reduce Job
Resource Manager – controls the use of resources across the Hadoop cluster
Node Manager – runs on each node in the cluster; creates execution containers and
monitors their usage
Map Reduce Application Master – Coordinates and manages Map Reduce Jobs;
negotiates with Resource Manager to schedule tasks; the tasks are started by Node
Manager(s)
HDFS – shares resources and jobs’ artifacts between YARN components
Job Anatomy
(Diagram: the MR Job program on the client machine runs the job, gets a job id from the Resource Manager and submits the job; a Node Manager launches the MRAppMaster (step 6), which launches task JVMs (step 9b); each YarnChild JVM then runs a Map or Reduce task (step 11); job resources flow through HDFS.)
MRv2 On YARN Job Execution
The Client submits the MapReduce job by interacting with Job objects; the Client runs in its own
JVM
Use org.apache.hadoop.mapreduce.Job class to configure the job
Submit the job to the cluster and wait for it to finish.
job.waitForCompletion(true)
The YARN protocol is activated when mapreduce.framework.name property in
mapred-site.xml is set to yarn
Job’s code interacts with Resource Manager to acquire application meta-data, such as
application id
Job’s code moves all the job related resources to HDFS to make them available for the
rest of the job
Resource Manager chooses a Node Manager with available resources and requests a
container for MRAppMaster
Node Manager allocates container for MRAppMaster; MRAppMaster will execute and
coordinate MapReduce job
MRAppMaster grabs required resources from HDFS, such as Input Splits; these resources
were copied there in step 4
NodeManager creates YarnChild containers that will coordinate and run tasks
YarnChild acquires job resources from HDFS that will be required to execute Map and
Reduce tasks
Map Reduce V2.0 Flow
(Diagram: the Client submits jobs to YARN, the next-generation MapReduce runtime; Application Masters and Containers run on the cluster nodes. Name Node High Availability: all namespace edits are logged to shared NFS edit storage with a single writer (fencing); the standby Name Node reads the edit logs and applies them to its own namespace.)
Failure In MRv2
Failures can occur in
Tasks
Application Master – MRAppMaster
Node Manager
Resource Manager
Task Failures
Most likely offender and easiest to handle
A task's exceptions and JVM crashes are propagated to the MRAppMaster
The attempt (not the task) is marked as 'failed'
Hanging tasks will be noticed and killed
The attempt is marked as failed
Controlled via the mapreduce.task.timeout property
A task is considered failed after 4 failed attempts (by default)
Set for map tasks via mapreduce.map.maxattempts
Set for reduce tasks via mapreduce.reduce.maxattempts
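For illustration, these limits can be raised on the job's Configuration in the driver (a minimal sketch; the value 8 is just an example):

Configuration conf = new Configuration();
// allow up to 8 attempts per map/reduce task instead of the default 4
conf.setInt("mapreduce.map.maxattempts", 8);
conf.setInt("mapreduce.reduce.maxattempts", 8);
Job job = Job.getInstance(conf, "retry-tolerant job");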
Application Master (MRAppMaster) Failures
The MRAppMaster application can be re-tried
By default it will not be re-tried and will fail after a single application failure
Enable re-try by increasing yarn.resourcemanager.am.max-retries property
Resource Manager receives heartbeats from MRAppMaster and can restart in case of
failure(s)
Restarted MRAppMaster can recover latest state of the tasks
Completed tasks will not need to be re-run
To enable set yarn.app.mapreduce.am.job.recovery.enable property to true
Node Manager Failures
Failed Node Manager will not send heartbeat messages to Resource
Manager
Resource Manager will black list a Node Manager that hasn’t reported
within 10 minutes
Configure via property:
• yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms
Usually there is no need to change this setting
Tasks on a failed Node Manager are recovered and placed on healthy Node
Managers
Node Manager Blacklisted By MRAppMaster
MRAppMaster may blacklist Node Managers if the number of failures is high on
that node
MRAppMaster will attempt to reschedule tasks that ran on a blacklisted Node Manager
onto healthy nodes
Blacklisting is per Application/Job therefore doesn’t affect other Jobs
By default blacklisting happens after 3 failures on the same node
Adjust default via mapreduce.job.maxtaskfailures.per.tracker
Resource Manager Failure
Speculative Execution
A speculative task is spawned when
All the tasks have been started
A task has been running for an extended period of time (over a minute)
It has not made significant progress compared to the rest of the running tasks
After a task's completion, duplicates are killed
Just an optimization
Can be turned off by setting these properties to false
mapred.map.tasks.speculative.execution
• Turns speculative execution on/off for the map phase
mapred.reduce.tasks.speculative.execution
• Turns speculative execution on/off for the reduce phase
When should speculative execution be disabled?
When a task outputs directly to a shared resource; starting a duplicate task may then cause unexpected results
To minimize cluster and bandwidth usage; duplicate tasks use up resources
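A minimal sketch of turning speculative execution off in the driver, using the property names from this slide (conf is the job's Configuration object):

// disable speculative (duplicate) task attempts for both phases
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);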
Job Scheduling
Map Reduce Old VS New JAVA API
There are two flavors of MapReduce API which became known as Old and New
Old API classes reside under
org.apache.hadoop.mapred
New API classes can be found under
org.apache.hadoop.mapreduce
org.apache.hadoop.mapreduce.lib
We will use new API exclusively
New API was re-designed for easier evolution
Early Hadoop versions deprecated the old API, but
the deprecation was later removed
Do not mix new and old API
Advanced Concepts Of MR
Combiners
Partitioners
Counters
Different input and output formats
Custom Data Types
Joins
Map-side Join
Reduce-side Join
Distributed Cache
Combiner In MR
Combiners can be treated as "local reducers" running in the Mapper phase
Combiners are used to improve the performance of an MR job
Combiners are primarily used to decrease the amount of data that needs to be
processed by Reducers. In some cases, because of the nature of the algorithm
you implement, this function can be the same as the Reducer; in other cases it
can of course be different.
One constraint a Combiner has, unlike a Reducer, is that its
input/output key and value types must match the output types of your Mapper.
Combiners can only be used for functions that are commutative (a.b = b.a) and
associative (a.(b.c) = (a.b).c). This also means that a combiner may operate only
on a subset of your keys and values, or may not execute at all, and you still want the
output of the program to remain the same.
Reducers can get data from multiple Mappers as part of the partitioning process;
a Combiner only gets its input from one Mapper.
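Wiring a combiner into the earlier word count job is a one-line change in the driver; because summing counts is commutative and associative, the reducer class itself can serve as the combiner (a minimal sketch, job being the driver's Job object):

// run the reducer as a local "mini reduce" on each mapper's output
job.setCombinerClass(WordCountReducer.class);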
Combiner Flow
Mapper 1 input: B C D E D B → map output: (B,1) (C,1) (D,1) (E,1) (D,1) (B,1) → Combiner 1 output: (B,2) (C,1) (D,2) (E,1)
Mapper 2 input: D A A C B D → map output: (D,1) (A,1) (A,1) (C,1) (B,1) (D,1) → Combiner 2 output: (A,2) (D,2) (C,1) (B,1)
Shuffle groups the combined output by key: (A,[2]) (B,[2,1]) (C,[1,1]) (D,[2,2]) (E,[1])
Reducer output: (A,2) (B,3) (C,2) (D,4) (E,1)
Running A Job Without Combiner
(Diagram: the Map task on Node #1 emits (M,1) (F,1) (G,1) (M,1) (I,1) (I,1) and the Map task on Node #2 emits (A,1) (M,1) (M,1) (A,1) (A,1) (M,1); without a combiner all 12 intermediate records are transferred over the network to the Reducers on Node #3 and Node #4.)
Running A Job With Combiner
(Diagram: the same map outputs are first combined locally – Node #1's combiner emits (M,2) (F,1) (G,1) (I,2) and Node #2's combiner emits (M,3) (A,3) – so only 6 records are transferred to the Reducers on Node #3 and Node #4.)
Partitioners
Partitioning phase takes place after the map phase and before the reduce phase.
The number of partitions is equal to the number of reducers. The data gets
partitioned across the reducers according to the partitioning function.
Partitioner Flow
(Diagram, per map task: input (K,V) pairs → intermediate (K,V) pairs → Combiner → substitute intermediate (K,V) pairs → Partitioner → "shuffling" process to the reducers.)
Custom Partitioner
public class CustomPartitioner extends Partitioner<Text, BlogWritable> {
    @Override
    public int getPartition(Text key, BlogWritable value, int numReduceTasks) {
        // mask the sign bit so the hash is non-negative
        int positiveHash = value.getAuthor().hashCode() & Integer.MAX_VALUE;
        return positiveHash % numReduceTasks;
    }
}
All blogs with the same author will end up in the same Reducer task
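To take effect, the custom partitioner must be registered on the job in the driver; a minimal sketch (the reducer count of 4 is just an example):

job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(4);   // number of partitions = number of reduce tasks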
Counters In MR
Counters are lightweight objects in MapReduce that allow you to keep track
of system progress in both the map and reduce stages of processing
Counters are used to gather information about the data we are analyzing, such as
how many records of each type were processed, how many invalid records were
found while running the job, and so on
Framework provides a set of built-in metrics
For example bytes processed for input and output
User can create new counters
Number of records consumed
Number of errors or warnings
Counters are divided into groups
Tracks total, Mapper and Reducer counts
Built In Counters In MR
Maintains and sums up counts
Several groups for built-in counters
Job Counters – documents number of map and reduce tasks
launched, number of failed tasks
File System Counters – number of bytes read and written
Map-Reduce Framework – mapper, reducer, combiner input and
output records counts, time and memory statistics
Web UI exposes counters for each Job
Implementing Custom Counters In MR
We can retrieve a Counter from the Context object
The framework injects the Context object into the map and reduce methods
Increment the Counter's value
Can increment by 1 or more
We can get the counter object with the help of the Context object, which is available in the
map and reduce methods
void map(Key key, Value value, Context context)
void reduce(Key key, Iterable<Value> values, Context context)
We can increment or even set the value of a counter
void setValue(long value);
void increment(long incr);
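Counter values can also be read back in the driver once the job finishes; a minimal sketch using the Tokens enum defined on the next slide (job is the driver's Job object):

job.waitForCompletion(true);
// look up the custom counter by its enum constant and read its value
long total = job.getCounters().findCounter(Tokens.Total).getValue();
System.out.println("Total tokens processed: " + total);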
Example Usage Of Custom Counters In Map
Update Mapper to document counts for
– Total tokens processed
– Number of tokens that start with uppercase
– Number of tokens that start with lowercase
First create an enum to reference these counters:
public enum Tokens { Total, FirstCharUpper, FirstCharLower }
@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    StringTokenizer tokenizer = new StringTokenizer(value.toString());
    while (tokenizer.hasMoreTokens()) {
        String token = tokenizer.nextToken();
        reusableText.set(token.substring(0, 1));
        context.write(reusableText, countOne);
        // keep count of total tokens processed
        context.getCounter(Tokens.Total).increment(1);
        char firstChar = token.charAt(0);
        if (Character.isUpperCase(firstChar)) {
            // stats on tokens that start with uppercase vs lowercase
            context.getCounter(Tokens.FirstCharUpper).increment(1);
        } else {
            context.getCounter(Tokens.FirstCharLower).increment(1);
        }
    }
}
Example Usage Of Custom Counters In Map
$ yarn jar $PLAY_AREA/HadoopSamples.jar
      mr.wordcount.StartsWithCountJob_UserCounters
      /training/data/hamlet.txt /training/playArea/wordCount/
...
    Map output records=34189
    Map output bytes=205134
    Combine input records=34189
    Combine output records=69
    Reduce input records=69
    Reduce output records=69
mr.wordcount.StartsWithCountMapper_UserCounters$Tokens      <-- custom counters section
    FirstCharLower=26080
    FirstCharUpper=8109
    Total=34189                                             <-- stats on tokens that start with uppercase vs lowercase
File Input Format Counters
    Bytes Read=211294
File Output Format Counters
    Bytes Written=385
Example Usage Of Custom Counters In Map
We can customize counter and group names when using enums
Create a properties file <classname>.properties defining counter name properties
Inner classes are substituted by underscore
For example: org.com.MyMapper$CustomEnum would be
MyMapper_CustomEnum.properties
Place properties file in the same package as the class that defines Enum
In our case the enum was defined in
– test.mr.project.WordCountMapper$Tokens
Therefore the file is to be named
– WordCountMapper_Tokens.properties
Define Group and Counter names:
CounterGroupName = Token Processed
Total.name=Total Tokens Processed
FirstCharUpper.name=Tokens start with Uppercase
FirstCharLower.name=Tokens start with Lowercase
Different Input And Output Formats
What is an Input Format?
Specification for reading data
• Creates Input Splits
Breaks up work into chunks
• Specifies how to read each split
Divides splits into records
Provides an implementation of RecordReader
Input Split
Splits are a set of logically arranged records
A set of lines in a file
A set of rows in a database table
Each instance of mapper will process a single split
Map instance processes one record at a time
• map(k,v) is called for each record
Splits are implemented by extending InputSplit class
Framework provides many options for InputSplit implementations
Hadoop’s FileSplit
HBase’s TableSplit
Don’t usually need to deal with splits directly
InputFormat’s responsibility
Use Of Input Formats In MR
1 Generate Splits 3 RecordReader reads key-value pairs
2 Each Split gets its own RecordReader 4 For each pair map(key, value) is called
1 Get Splits
Different Types Of Input Formats
Hadoop eco-system is packaged with many Input Formats
TextInputFormat
KeyValueTextInputFormat
NLineInputFormat
DBInputFormat
TableInputFormat (HBASE)
HCatInputFormat
SequenceFileInputFormat etc…
Configure on a job object
job.setInputFormatClass(XXXInputFormat.class)
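For example, a minimal sketch of configuring one of these formats on the job (NLineInputFormat from org.apache.hadoop.mapreduce.lib.input; the 3-lines-per-split value is just an example):

job.setInputFormatClass(NLineInputFormat.class);
// each input split (and therefore each mapper) gets 3 lines of input
NLineInputFormat.setNumLinesPerSplit(job, 3);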
Text Input Format
N – Line Input Format
Table Input Format
Output Format
Specification for writing data
The other side of InputFormat
Implementation of OutputFormat<K,V>
TextOutputFormat is the default implementation
Outputs records as lines of text
Keys and values are tab separated: "key \t value"
• can be configured via the
"mapreduce.output.textoutputformat.separator" property
Keys and values may be of any type – .toString() is called on them
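A minimal sketch of changing the separator and setting the output format explicitly in the driver (conf is the job's Configuration; the comma is just an example value):

conf.set("mapreduce.output.textoutputformat.separator", ",");
// TextOutputFormat is already the default; shown here for clarity
job.setOutputFormatClass(TextOutputFormat.class);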
Different Types Of Output Format
Text Output Format
Component Overview
(Diagram: per-node pipeline of Input Format, Map, Combiner and Partitioner feeding data to the Reduce tasks.)
Key And Value Types
Keys must implement WritableComparable interface
Extends Writable and java.lang.Comparable<T>
Required because keys are sorted prior to the reduce phase
Custom Data Types
http://hadooptutorial.info/hadoop-data-types/
http://hadooptutorial.info/creating-custom-hadoop-writable-data-type/
Implement Custom WritableComparable<T>
BlogWritable – Implementation Of WritableComparable<T>
public class BlogWritable implements WritableComparable<BlogWritable> {
    private String author;
    private String content;

    public BlogWritable() {
    }

    public BlogWritable(String author, String content) {
        this.author = author;
        this.content = content;
    }

    public String getAuthor() {
        return author;
    }

    public String getContent() {
        return content;
    }
    ………………
    ………………
    ………………
    ………………
    @Override
    public void readFields(DataInput input) throws IOException {
        // how the data is read
        author = input.readUTF();
        content = input.readUTF();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // how to write data
        output.writeUTF(author);
        output.writeUTF(content);
    }

    @Override
    public int compareTo(BlogWritable other) {
        // how to order BlogWritables
        return author.compareTo(other.author);
    }
}
Distributed Cache
We need some data and libraries available on all the nodes
(Diagram: files in the HDFS distributed cache are copied to a local copy on each node running a Map or Reduce task.)
In our Driver:
    DistributedCache.addCacheFile(new URI("/Some/path/to/ourfile.txt"), conf);
In our Mapper or Reducer:
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
    }
Distributed Cache Working
Accepts two types: files and archives
Archives are unarchived on the local node
Items specified to the $yarn command via -files, -libjars and -archives are copied to HDFS
Supports
– Simple text files
– Jars
– Archives: zip, tar, tgz/tar.gz
Prior to task execution these files are copied locally from HDFS
The files then reside on a local disk – the local cache
Files provided to -libjars are appended to the task's CLASSPATH
Locally cached files become eligible for deletion after all tasks using the cache complete
Files in the local cache are deleted after a 10 GB threshold is reached
Allows space for new files
Configured via the yarn.nodemanager.localizer.cache.target-size-mb property
Files to be cached must exist in HDFS
The local cache is stored under ${yarn.nodemanager.local-dirs}/usercache/$user/filecache
JAVA API Distributed Cache
Adding cache files via methods on the Job object
Create symbolic links for all files in the DistributedCache; without the links you
would have to use the fully qualified path, in this case
"/training/data/startWithExcludeFile.txt"
    ...
    return job.waitForCompletion(true) ? 0 : 1;
}
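The driver code preceding this return was elided from the slide; a minimal sketch of the cache-file call it describes, using the new-API Job object (the symbolic-link name after the # is an assumption):

// the "#startWithExcludeFile.txt" fragment becomes the symbolic link name in the task's working directory
job.addCacheFile(new URI("/training/data/startWithExcludeFile.txt#startWithExcludeFile.txt"));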
JAVA API Distributed Cache
Retrieving a file from the cache
If we set the distributed cache through the Java API in the driver program via
job.addCacheFile(new URI("hdfs://namenode/actual/path#symbolic_link")), the file needs to
be present in HDFS
But if we use the yarn jar command's -files option, then our files can reside on the local file system (LFS) as well
JOINS
Joins
Two Types
Map Side Joins
Reduce Side Joins
Miscellaneous Concept In MR
Record Reader : The InputSplit has defined a slice of work, but does not describe how to
access it. The RecordReader class actually loads the data from its source and converts it
into (key, value) pairs suitable for reading by the Mapper. The RecordReader instance is
defined by the Input Format.
Shuffle: After the first map tasks have completed, the nodes may still be performing
several more map tasks each. But they also begin exchanging the intermediate outputs
from the map tasks to where they are required by the reducers. This process of moving
map outputs to the reducers is known as shuffling.
Sort: Each reduce task is responsible for reducing the values associated with several
intermediate keys. The set of intermediate keys on a single node is automatically sorted
by Hadoop before they are presented to the Reducer.
Hadoop Streaming: Streaming is a generic API that allows programs written in virtually
any language to be used as Hadoop Mapper and Reducer implementations. Languages
commonly used with Hadoop Streaming include Perl, Ruby, Python, C and C#.
USEFUL REFERENCES
http://hadoop.apache.org/docs/r2.4.1/
http://hadooptutorial.info/
Questions Or Doubts
? !
“Your career is your business.
It's time for you to manage it as
a CEO”
THANK YOU