Big Data Essentials¶

Hadoop MapReduce¶

Yanfei Kang
yanfeikang@buaa.edu.cn
School of Economics and Management
Beihang University
http://yanfei.site

Hadoop MapReduce¶

  • Google released its paper on the MapReduce technology in December 2004; this became the genesis of the Hadoop processing model.
  • MapReduce is a batch-based, distributed computing framework.
  • It lets you parallelize work over large amounts of raw data.

Traditional way¶

  • Critical path problem: the time taken to finish the job without delaying the next milestone or the actual completion date. If any one machine delays its part, the whole job is delayed.
  • Reliability problem: what if a machine working on a part of the data fails? Managing this failover becomes a challenge.
  • Equal split issue: how do we divide the data into smaller chunks so that each machine gets an even share of the work, with no machine overloaded or under-utilized?
  • Single split may fail: if any machine fails to produce its output, we cannot compute the final result, so the system needs a fault-tolerance mechanism.
  • Aggregation of results: there must be a mechanism to combine the outputs generated by all the machines into the final result.

MapReduce¶

MapReduce gives you the flexibility to write your code logic without worrying about the design issues of the underlying distributed system.

MapReduce¶

MapReduce is a programming framework for performing distributed, parallel processing on large data sets.

  • MapReduce consists of two distinct tasks: Map and Reduce.
  • As the name MapReduce suggests, the reduce phase takes place only after the map phase has completed.
  • First comes the map job, in which a block of data is read and processed to produce key-value pairs as intermediate output.
  • The output of a mapper (key-value pairs) is the input to the reducers.
  • A reducer receives key-value pairs from multiple map jobs.
  • The reducer then aggregates those intermediate tuples into a smaller set of key-value pairs, which is the final output. A minimal sketch of the two functions follows.
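In functional terms, a mapper turns one input record into zero or more intermediate key-value pairs, and a reducer turns one key together with all of its values into final pairs. A minimal Python sketch of this contract (illustrative only; production Hadoop jobs are typically written in Java or run through Hadoop Streaming):

from typing import Iterable, Iterator, Tuple

def map_fn(record: str) -> Iterator[Tuple[str, int]]:
    # One input record in, zero or more intermediate (key, value) pairs out.
    for word in record.split():
        yield (word, 1)

def reduce_fn(key: str, values: Iterable[int]) -> Tuple[str, int]:
    # One key plus all of its values in, one final (key, value) pair out.
    return (key, sum(values))

Word counting is used here as the running example; the sections below work through it in detail.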

MapReduce example - word count¶

The input data consists of text documents, and we want to output the count of each unique word.

Map task¶

  1. Input:
    • Document 1: "Hello world! World of MapReduce."
    • Document 2: "MapReduce is powerful. Hello, Hadoop."
  2. Map Function:
    • For each document, the Map function tokenizes the text into words and emits a key-value pair for each word, where the key is the word and the value is 1 (a Streaming-style sketch of such a mapper follows this list).
    • Key-value pairs: ("Hello", 1), ("world", 1), ("World", 1), ("of", 1), ("MapReduce", 1), ("MapReduce", 1), ("is", 1), ("powerful", 1), ("Hello", 1), ("Hadoop", 1).
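Such a mapper can be written as a Hadoop Streaming script. Below is a minimal sketch in Python (the file name mapper.py is hypothetical; the tokenization strips surrounding punctuation and keeps case, matching the hand-worked pairs above):

#!/usr/bin/env python3
# mapper.py -- hypothetical Hadoop Streaming mapper (sketch).
# Emits one tab-separated "word<TAB>1" line per token.
import string
import sys

for line in sys.stdin:
    for token in line.split():
        word = token.strip(string.punctuation)  # "Hello," -> "Hello"; case kept
        if word:
            print(word + "\t1")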

Shuffle and Sort:¶

  1. The intermediate key-value pairs are sorted and grouped by key, so that each reducer receives every value belonging to a given key (simulated below).
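Hadoop performs this phase itself between the map and reduce tasks. Conceptually it behaves like a sort by key followed by grouping, which can be approximated locally in Python:

from itertools import groupby
from operator import itemgetter

# The intermediate pairs produced by the map phase above.
pairs = [("Hello", 1), ("world", 1), ("World", 1), ("of", 1), ("MapReduce", 1),
         ("MapReduce", 1), ("is", 1), ("powerful", 1), ("Hello", 1), ("Hadoop", 1)]

pairs.sort(key=itemgetter(0))                         # sort by key ...
for key, group in groupby(pairs, key=itemgetter(0)):  # ... then group by key
    print(key, [v for _, v in group])
# Hadoop [1]
# Hello [1, 1]
# MapReduce [1, 1]
# ... and so on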

Reduce task¶

  1. Input:
    • Key: "Hello", Values: [1, 1]
    • Key: "Hadoop", Values: [1]
    • Key: "MapReduce", Values: [1, 1]
    • Key: "World", Values: [1]
    • Key: "is", Values: [1]
    • Key: "of", Values: [1]
    • Key: "powerful", Values: [1]
    • Key: "world", Values: [1]
  2. Reduce Function: for each key, the Reduce function sums the values, giving the total count of each word: ("Hello", 2), ("Hadoop", 1), ("MapReduce", 2), ("World", 1), ("is", 1), ("of", 1), ("powerful", 1), ("world", 1). A Streaming-style sketch of such a reducer follows.
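A Streaming reducer can exploit the fact that its input arrives sorted by key, so identical words are adjacent and only a running sum is needed. A minimal sketch (the file name reducer.py is hypothetical):

#!/usr/bin/env python3
# reducer.py -- hypothetical Hadoop Streaming reducer (sketch).
# Input lines "word<TAB>count" arrive sorted by word.
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip("\n").split("\t")
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print(current_word + "\t" + str(current_count))
        current_word, current_count = word, int(count)
if current_word is not None:  # flush the last key
    print(current_word + "\t" + str(current_count))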

MapReduce example - word count¶

  1. Divide the input into three splits. This distributes the work among all the map nodes.
  2. Tokenize the words in each mapper and give each token (word) the hardcoded value 1. The rationale for the hardcoded value 1 is that every word, in itself, occurs once.
  3. A list of key-value pairs is now created, where the key is an individual word and the value is 1. For the first line (Dear Bear River) we get 3 key-value pairs: Dear, 1; Bear, 1; River, 1. The mapping process is the same on every node.
  4. After the map phase, a partition process takes place, with sorting and shuffling, so that all tuples with the same key are sent to the corresponding reducer.
  5. After the sorting and shuffling phase, each reducer has a unique key and the list of values corresponding to that key, e.g. Bear, [1,1]; Car, [1,1,1].
  6. Each reducer then counts the values in its list. For the key Bear, the reducer receives the list [1,1], counts the ones, and gives the final output: Bear, 2.
  7. Finally, all the output key-value pairs are collected and written to the output file. (The full flow is simulated below.)
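The whole flow can be imitated locally in a few lines of Python. Only the first split (Dear Bear River) is given in the text; the other two lines here are assumed, chosen so that Bear gets [1,1] and Car gets [1,1,1] as above. This is a single-machine simulation, whereas Hadoop distributes each phase:

from itertools import groupby
from operator import itemgetter

splits = ["Dear Bear River", "Car Car River", "Dear Car Bear"]  # assumed input

# Map: emit (word, 1) for every token of every split.
mapped = [(word, 1) for line in splits for word in line.split()]

# Shuffle and sort: bring identical keys together.
mapped.sort(key=itemgetter(0))

# Reduce: sum the values for each key.
for word, group in groupby(mapped, key=itemgetter(0)):
    print(word, sum(v for _, v in group))
# Bear 2
# Car 3
# Dear 2
# River 2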

Advantages of MapReduce¶

  1. Parallel processing: MapReduce divides the job among multiple nodes, and each node works on its part of the job simultaneously.

  2. Data locality: instead of moving data to the processing unit, MapReduce moves the processing unit to the data.

MapReduce example - word count¶

This run uses Hadoop Streaming with cat as the identity mapper and wc as the reducer, so each reducer emits the line, word, and character counts of its own partition rather than per-word counts; with 7 reduce tasks, the output below therefore has 7 lines.
In [1]:
# Remove any previous output directory (it is moved to the HDFS trash).
hadoop fs -rm -r /user/yanfei/output

# Run a Streaming job: cat is the identity mapper, wc the reducer.
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar \
    -input /user/yanfei/hadoop.ipynb \
    -output /user/yanfei/output \
    -mapper "/usr/bin/cat" \
    -reducer "/usr/bin/wc"
2023-11-07 13:37:07,241 INFO fs.TrashPolicyDefault: Moved: 'hdfs://master-1-1.c-a637e78383e5fe3e.cn-zhangjiakou.emr.aliyuncs.com:9000/user/yanfei/output' to trash at: hdfs://master-1-1.c-a637e78383e5fe3e.cn-zhangjiakou.emr.aliyuncs.com:9000/user/yanfei/.Trash/Current/user/yanfei/output
packageJobJar: [/tmp/hadoop-unjar3569133461550345997/] [] /tmp/streamjob1498758464425584878.jar tmpDir=null
2023-11-07 13:37:09,171 INFO client.RMProxy: Connecting to ResourceManager at master-1-1.c-a637e78383e5fe3e.cn-zhangjiakou.emr.aliyuncs.com/192.168.0.4:8032
2023-11-07 13:37:09,305 INFO client.AHSProxy: Connecting to Application History server at master-1-1.c-a637e78383e5fe3e.cn-zhangjiakou.emr.aliyuncs.com/192.168.0.4:10200
2023-11-07 13:37:09,330 INFO client.RMProxy: Connecting to ResourceManager at master-1-1.c-a637e78383e5fe3e.cn-zhangjiakou.emr.aliyuncs.com/192.168.0.4:8032
2023-11-07 13:37:09,330 INFO client.AHSProxy: Connecting to Application History server at master-1-1.c-a637e78383e5fe3e.cn-zhangjiakou.emr.aliyuncs.com/192.168.0.4:10200
2023-11-07 13:37:09,451 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /yarn/c-a637e78383e5fe3e/mapred/staging/yanfei/.staging/job_1697504306738_0018
2023-11-07 13:37:09,606 INFO mapred.FileInputFormat: Total input files to process : 1
2023-11-07 13:37:09,643 INFO mapreduce.JobSubmitter: number of splits:16
2023-11-07 13:37:09,715 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1697504306738_0018
2023-11-07 13:37:09,716 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-11-07 13:37:09,834 INFO conf.Configuration: found resource resource-types.xml at file:/etc/taihao-apps/hadoop-conf/resource-types.xml
2023-11-07 13:37:09,884 INFO impl.YarnClientImpl: Submitted application application_1697504306738_0018
2023-11-07 13:37:09,907 INFO mapreduce.Job: The url to track the job: http://master-1-1.c-a637e78383e5fe3e.cn-zhangjiakou.emr.aliyuncs.com:20888/proxy/application_1697504306738_0018/
2023-11-07 13:37:09,908 INFO mapreduce.Job: Running job: job_1697504306738_0018
2023-11-07 13:37:15,968 INFO mapreduce.Job: Job job_1697504306738_0018 running in uber mode : false
2023-11-07 13:37:15,969 INFO mapreduce.Job:  map 0% reduce 0%
2023-11-07 13:37:23,020 INFO mapreduce.Job:  map 100% reduce 0%
2023-11-07 13:37:28,038 INFO mapreduce.Job:  map 100% reduce 100%
2023-11-07 13:37:28,043 INFO mapreduce.Job: Job job_1697504306738_0018 completed successfully
2023-11-07 13:37:28,096 INFO mapreduce.Job: Counters: 54
	File System Counters
		FILE: Number of bytes read=24980
		FILE: Number of bytes written=5766509
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=133200
		HDFS: Number of bytes written=175
		HDFS: Number of read operations=83
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=14
		HDFS: Number of bytes read erasure-coded=0
	Job Counters 
		Launched map tasks=16
		Launched reduce tasks=7
		Data-local map tasks=16
		Total time spent by all maps in occupied slots (ms)=4252040
		Total time spent by all reduces in occupied slots (ms)=2219341
		Total time spent by all map tasks (ms)=81770
		Total time spent by all reduce tasks (ms)=21547
		Total vcore-milliseconds taken by all map tasks=81770
		Total vcore-milliseconds taken by all reduce tasks=21547
		Total megabyte-milliseconds taken by all map tasks=136065280
		Total megabyte-milliseconds taken by all reduce tasks=71018912
	Map-Reduce Framework
		Map input records=1559
		Map output records=1559
		Map output bytes=70982
		Map output materialized bytes=38341
		Input split bytes=2400
		Combine input records=0
		Combine output records=0
		Reduce input groups=788
		Reduce shuffle bytes=38341
		Reduce input records=1559
		Reduce output records=7
		Spilled Records=3118
		Shuffled Maps =112
		Failed Shuffles=0
		Merged Map outputs=112
		GC time elapsed (ms)=2970
		CPU time spent (ms)=27730
		Physical memory (bytes) snapshot=12230250496
		Virtual memory (bytes) snapshot=91465084928
		Total committed heap usage (bytes)=28149022720
		Peak Map Physical memory (bytes)=593018880
		Peak Map Virtual memory (bytes)=3542560768
		Peak Reduce Physical memory (bytes)=423645184
		Peak Reduce Virtual memory (bytes)=4992397312
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=130800
	File Output Format Counters 
		Bytes Written=175
2023-11-07 13:37:28,096 INFO streaming.StreamJob: Output directory: /user/yanfei/output
In [3]:
hadoop fs -ls /user/yanfei/output
Found 8 items
-rw-r-----   2 yanfei hadoop          0 2023-10-31 19:23 /user/yanfei/output/_SUCCESS
-rw-r-----   2 yanfei hadoop         25 2023-10-31 19:23 /user/yanfei/output/part-00000
-rw-r-----   2 yanfei hadoop         25 2023-10-31 19:23 /user/yanfei/output/part-00001
-rw-r-----   2 yanfei hadoop         25 2023-10-31 19:23 /user/yanfei/output/part-00002
-rw-r-----   2 yanfei hadoop         25 2023-10-31 19:23 /user/yanfei/output/part-00003
-rw-r-----   2 yanfei hadoop         25 2023-10-31 19:23 /user/yanfei/output/part-00004
-rw-r-----   2 yanfei hadoop         25 2023-10-31 19:23 /user/yanfei/output/part-00005
-rw-r-----   2 yanfei hadoop         25 2023-10-31 19:23 /user/yanfei/output/part-00006
In [2]:
hadoop fs -cat /user/yanfei/output/*
    264    1121    9819	
    271     908    8650	
    258    1396   12967	
    163     999    8517	
    251    1318   11857	
    174    1197    9691	
    178    1091    9418	

MapReduce example - word count¶

The same job again, but with -numReduceTasks 1, which sends all map output to a single reducer and yields one aggregate wc line instead of one per partition.
In [20]:
# Remove the previous output directory, then rerun the job
# forced onto a single reducer with -numReduceTasks 1.
hadoop fs -rm -r /user/yanfei/output
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar \
    -input /user/yanfei/hadoop.ipynb \
    -output /user/yanfei/output \
    -mapper "/usr/bin/cat" \
    -reducer "/usr/bin/wc" \
    -numReduceTasks 1
21/12/03 12:02:57 INFO fs.TrashPolicyDefault: Moved: 'hdfs://emr-header-1.cluster-49012:9000/user/yanfei/output' to trash at: hdfs://emr-header-1.cluster-49012:9000/user/yanfei/.Trash/Current/user/yanfei/output
packageJobJar: [/tmp/hadoop-unjar4915498231992068313/] [] /tmp/streamjob2060219075808971674.jar tmpDir=null
21/12/03 12:02:59 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-49012/192.168.0.3:8032
21/12/03 12:02:59 INFO client.AHSProxy: Connecting to Application History server at emr-header-1.cluster-49012/192.168.0.3:10200
21/12/03 12:02:59 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-49012/192.168.0.3:8032
21/12/03 12:02:59 INFO client.AHSProxy: Connecting to Application History server at emr-header-1.cluster-49012/192.168.0.3:10200
21/12/03 12:02:59 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/yanfei/.staging/job_1637546690590_0004
21/12/03 12:02:59 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
21/12/03 12:03:00 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
21/12/03 12:03:00 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]
21/12/03 12:03:00 INFO mapred.FileInputFormat: Total input files to process : 1
21/12/03 12:03:00 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
21/12/03 12:03:00 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
21/12/03 12:03:00 INFO mapreduce.JobSubmitter: number of splits:16
21/12/03 12:03:00 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
21/12/03 12:03:00 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1637546690590_0004
21/12/03 12:03:00 INFO mapreduce.JobSubmitter: Executing with tokens: []
21/12/03 12:03:00 INFO conf.Configuration: resource-types.xml not found
21/12/03 12:03:00 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
21/12/03 12:03:00 INFO impl.YarnClientImpl: Submitted application application_1637546690590_0004
21/12/03 12:03:00 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-49012:20888/proxy/application_1637546690590_0004/
21/12/03 12:03:00 INFO mapreduce.Job: Running job: job_1637546690590_0004
21/12/03 12:03:05 INFO mapreduce.Job: Job job_1637546690590_0004 running in uber mode : false
21/12/03 12:03:05 INFO mapreduce.Job:  map 0% reduce 0%
21/12/03 12:03:09 INFO mapreduce.Job:  map 13% reduce 0%
21/12/03 12:03:10 INFO mapreduce.Job:  map 100% reduce 0%
21/12/03 12:03:12 INFO mapreduce.Job:  map 100% reduce 100%
21/12/03 12:03:12 INFO mapreduce.Job: Job job_1637546690590_0004 completed successfully
21/12/03 12:03:12 INFO mapreduce.Job: Counters: 55
	File System Counters
		FILE: Number of bytes read=16539
		FILE: Number of bytes written=4237750
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=132018
		HDFS: Number of bytes written=25
		HDFS: Number of read operations=53
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
		HDFS: Number of bytes read erasure-coded=0
	Job Counters 
		Killed map tasks=1
		Launched map tasks=16
		Launched reduce tasks=1
		Data-local map tasks=16
		Total time spent by all maps in occupied slots (ms)=4745661
		Total time spent by all reduces in occupied slots (ms)=189727
		Total time spent by all map tasks (ms)=41997
		Total time spent by all reduce tasks (ms)=1679
		Total vcore-milliseconds taken by all map tasks=41997
		Total vcore-milliseconds taken by all reduce tasks=1679
		Total megabyte-milliseconds taken by all map tasks=151861152
		Total megabyte-milliseconds taken by all reduce tasks=6071264
	Map-Reduce Framework
		Map input records=1535
		Map output records=1535
		Map output bytes=70324
		Map output materialized bytes=25405
		Input split bytes=1840
		Combine input records=0
		Combine output records=0
		Reduce input groups=799
		Reduce shuffle bytes=25405
		Reduce input records=1535
		Reduce output records=1
		Spilled Records=3070
		Shuffled Maps =16
		Failed Shuffles=0
		Merged Map outputs=16
		GC time elapsed (ms)=1050
		CPU time spent (ms)=11930
		Physical memory (bytes) snapshot=8222433280
		Virtual memory (bytes) snapshot=85753057280
		Total committed heap usage (bytes)=12587630592
		Peak Map Physical memory (bytes)=495804416
		Peak Map Virtual memory (bytes)=5044715520
		Peak Reduce Physical memory (bytes)=303345664
		Peak Reduce Virtual memory (bytes)=5058519040
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=130178
	File Output Format Counters 
		Bytes Written=25
21/12/03 12:03:12 INFO streaming.StreamJob: Output directory: /user/yanfei/output
In [21]:
hadoop fs -cat /user/yanfei/output/*
21/12/03 12:03:20 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
   1535    7774   70273	

Further reading¶

MapReduce tutorial.