Big Data Essentials

L8: Hadoop Streaming





Yanfei Kang
yanfeikang@buaa.edu.cn
School of Economics and Management
Beihang University
http://yanfei.site

Hadoop streaming with Python

  • Hadoop is written in Java, but the Hadoop Streaming interface allows you to write your map/reduce code in any language you want.

  • We have looked at Streaming with Unix commands. In this lecture, you will see how to stream with scripts.

  • Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any combination of languages that can read standard input and write to standard output to write your MapReduce program (a minimal skeleton is sketched after this list).

    • You can even use different languages for the mapper and the reducer.

    • It is well suited to text processing (e.g., reading a big CSV file line by line).

    • It can also handle binary streams (e.g., reading images as input).
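
In other words, a streaming mapper or reducer is simply an executable that reads lines from standard input and writes lines (by convention, tab-separated key/value pairs) to standard output. Below is a minimal sketch of such a script; the CSV input and the choice of key are made up purely for illustration.

#!/usr/bin/env python3.8
# Minimal sketch of a streaming script: read lines from STDIN,
# emit tab-separated key/value pairs on STDOUT.
import sys

for line in sys.stdin:
    fields = line.strip().split(",")      # assume CSV input, for illustration only
    if fields and fields[0]:
        print("%s\t%d" % (fields[0], 1))  # key = first field, value = 1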

Overall data flow in Hadoop streaming

  • The data flow is like a pipe: data streams through the mapper, the output of which is sorted and then streamed through the reducer. In pseudo-code, using Unix’s command-line notation:

cat *.txt | mapper | sort | reducer > output

Best Practice with Hadoop Streaming

  • Write your Hadoop command in a Bash script instead of running it directly in the Linux shell.
In [1]:
cat code/L8/wordcount-sh/main.sh
hadoop fs -rm -r /user/yanfei/output
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.1.3.jar \
    -input /user/yanfei/fruits \
    -output /user/yanfei/output \
    -mapper "/usr/bin/cat" \
    -reducer "/usr/bin/wc" 

Streaming with Unix commands

Streaming with scripts

  • With Hadoop Streaming, we can use any executable script that reads a line-oriented data stream from STDIN and writes to STDOUT.

  • Now let's use the patent data from the National Bureau of Economic Research (NBER) at http://www.nber.org/patents/. The data sets were originally compiled for the paper “The NBER Patent Citation Data File: Lessons, Insights and Methodological Tools.” We use the patent description data set apat63_99.txt and find the maximum of an attribute.

Variable Description
PATENT Patent number
GYEAR Grant year
GDATE Grant date, given as the number of days elapsed since January 1, 1960
APPYEAR Application year (available only for patents granted since 1967)
COUNTRY Country of first inventor
POSTATE State of first inventor (if country is U.S.)
CLAIMS Number of claims (available only for patents granted since 1975)
NCLASS 3-digit main patent class

The mapper

In [11]:
cat code/L8/patent-max/mapper.py
#!/usr/bin/env python3.8

import sys

# The column index to scan is passed as a command-line argument.
index = int(sys.argv[1])
max_val = 0

for line in sys.stdin:
    fields = line.strip().split(",")
    # Skip the header line and any records with a non-numeric value.
    if fields[index].isdigit():
        val = int(fields[index])
        if val > max_val:
            max_val = val

# Each mapper emits a single value: the maximum it has seen.
print(max_val)
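
A quick way to sanity-check the mapper before submitting a job is to run it locally. The snippet below is a minimal sketch: it pipes a few synthetic CSV lines through mapper.py with subprocess (the column layout is made up; only column 8 needs to hold a number) and assumes python3.8 and mapper.py are available in the working directory.

# Local sanity check of mapper.py on made-up CSV lines.
import subprocess

sample = "\n".join([
    "1,1975,5465,1974,US,NY,1,2,12,399",
    "2,1975,5465,1974,FR,,1,2,7,401",
    "PATENT,GYEAR,GDATE,APPYEAR,COUNTRY,POSTATE,A,B,CLAIMS,NCLASS",  # header line is skipped
]) + "\n"

result = subprocess.run(
    ["python3.8", "mapper.py", "8"],
    input=sample, capture_output=True, text=True,
)
print(result.stdout)   # expected: 12
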
In [3]:
cat code/L8/patent-max/main.sh
#! /usr/bin/sh

hadoop fs -rm -r /user/yanfei/output/

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.1.3.jar \
    -input /user/yanfei/apat63_99.txt \
    -output /user/yanfei/output \
    -file mapper.py \
    -mapper "python3.8 mapper.py 8" \

Exercise

Write a reducer that outputs the maximum over the values output by the mappers.

Streaming with key/value pairs

  • Working with key/value pairs allows us to take advantage of the key-based shuffling and sorting to create interesting data analyses.

Streaming with key/value pairs

Now let's consider a wordcount example.

  • The map step will take the raw text of our document (I will use our lecture notes) and convert it to key/value pairs. Each key is a word, and every key (word) will have a value of 1.
  • The reduce step will combine all duplicate keys by adding up their values. Since every key (word) has a value of 1, this reduces the output to a list of unique keys, each with a value equal to that key’s (word’s) count.

With Hadoop Streaming, we need to write a program that acts as the mapper and a program that acts as the reducer. These programs must interface with the input/output streams in a way equivalent to the following series of pipes:

$ head -n100 code/L8/wordcount-py/streaming.ipynb | ./mapper.py | sort | ./reducer.py > output.txt

The mapper

In [4]:
cat code/L8/wordcount-py/mapper.py
#!/usr/bin/env python3.8

import sys

for line in sys.stdin:
    line = line.strip()
    keys = line.split()
    for key in keys:
        value = 1
        print( "%s\t%d" % (key, value) )

The shuffle

A lot happens between the map and reduce steps, most of it transparent to the developer. In brief, the output of the mappers is transformed and distributed to the reducers (termed the shuffle step; a small local simulation follows the list below) in such a way that

  • All key/value pairs are sorted before being presented to the reducer function
  • All key/value pairs sharing the same key are sent to the same reducer
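
To see why these two properties matter, here is a toy simulation in plain Python (illustrative only; Hadoop performs the sort and partitioning itself): once the mapper output is sorted by key, all pairs with the same key are contiguous, which is exactly what the streaming reducer below relies on.

# Toy simulation of map -> shuffle -> reduce for word count (not Hadoop code).
from itertools import groupby

text = ["the quick brown fox", "the lazy dog"]

# Map: emit (word, 1) pairs.
pairs = [(word, 1) for line in text for word in line.split()]

# Shuffle: sort by key so that equal keys become contiguous.
pairs.sort(key=lambda kv: kv[0])

# Reduce: sum the values within each contiguous group of equal keys.
for word, group in groupby(pairs, key=lambda kv: kv[0]):
    print("%s\t%d" % (word, sum(v for _, v in group)))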

The reducer

In [5]:
cat code/L8/wordcount-py/reducer.py
#!/usr/bin/env python3.8

import sys

last_key = None
running_total = 0

# Keys arrive sorted, so all values for a given key are contiguous.
for input_line in sys.stdin:
    input_line = input_line.strip()
    this_key, value = input_line.split("\t", 1)
    value = int(value)

    if last_key == this_key:
        running_total += value
    else:
        # A new key has started: flush the total for the previous key (if any).
        if last_key:
            print("%s\t%d" % (last_key, running_total))
        running_total = value
        last_key = this_key

# Flush the final key (guarded so that empty input does not raise an error).
if last_key:
    print("%s\t%d" % (last_key, running_total))

Running the Hadoop job

NOTE Before submitting the Hadoop job, you should make sure your mapper and reducer scripts actually work. This is just a matter of running them through pipes on a little bit of sample data: head -n100 streaming.ipynb | ./mapper.py | sort | ./reducer.py

In [6]:
cat code/L8/wordcount-py/main.sh
hadoop fs -rm -r /user/yanfei/output
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.1.3.jar \
    -input /user/yanfei/hadoop.ipynb \
    -output /user/yanfei/output \
    -file mapper.py \
    -file reducer.py \
    -mapper "python3.8 mapper.py" \
    -reducer "python3.8 reducer.py"

Streaming with key/value pairs

Now let us find the average number of claims for each country.

The mapper

In [9]:
cat code/L8/patent-avg/mapper.py
#!/usr/bin/env python3.8

import sys

for line in sys.stdin:
    fields = line.split(",")
    # Skip records where the claims field is missing or non-numeric.
    if fields[9] and fields[9].isdigit():
        # Emit: country (surrounding quotes stripped) \t number of claims
        print(fields[5][1:-1] + "\t" + fields[9])

Exercise

  1. Run the Hadoop job.
  2. Look at your output without the reducer.

The reducer

In [10]:
cat code/L8/patent-avg/reducer.py
#!/usr/bin/env python3.8

import sys

# Keys (countries) arrive sorted, so all claim counts for a country are contiguous.
(last_key, total, count) = (None, 0.0, 0)

for line in sys.stdin:
    (key, val) = line.split("\t")
    if last_key and last_key != key:
        # A new country has started: emit the average for the previous one.
        print(str(last_key) + "\t" + str(total / count))
        (total, count) = (0.0, 0)
    last_key = key
    total += float(val)
    count += 1

# Emit the average for the final country (guarded against empty input).
if last_key:
    print(str(last_key) + "\t" + str(total / count))
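
As a quick local check, the snippet below feeds the reducer a small, made-up sample of already-sorted mapper output through a pipe; it assumes python3.8 and reducer.py are available in the working directory.

# Minimal local test of reducer.py on hypothetical, already-sorted mapper output.
import subprocess

mapper_output = "BE\t4\nBE\t8\nUS\t10\nUS\t20\nUS\t3\n"

result = subprocess.run(
    ["python3.8", "reducer.py"],
    input=mapper_output, capture_output=True, text=True,
)
print(result.stdout)   # expected: BE 6.0 and US 11.0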

Lab 5

Further readings