Hadoop Streaming

Hadoop streaming with Python

Hadoop is written in Java, but it allows you to write map/reduce code in any language you want through the Hadoop Streaming interface.

We have looked at Streaming with Unix commands. In this lecture, you will see how to stream with scripts.

Streaming with scripts

Now let's use patent data from the National Bureau of Economic Research (NBER) at http://www.nber.org/patents/. The data sets were originally compiled for the paper “The NBER Patent Citation Data File: Lessons, Insights and Methodological Tools.” We use the citation data set cite75_99.txt and the patent description data set apat63_99.txt.

Find maximum of an attribute

Mapper:

#!/usr/bin/env python3

import sys

index = int(sys.argv[1])   # column to examine, passed as the first command-line argument
max_val = 0

for line in sys.stdin:
    fields = line.strip().split(",")
    if fields[index].isdigit():
        val = int(fields[index])
        if val > max_val:
            max_val = val

print(max_val)

Find maximum of an attribute

sh:

#! /usr/bin/sh

hadoop fs -rm -r /user/yanfeikang/output/

hadoop jar /opt/apps/ecm/service/hadoop/2.8.5-1.2.0/package/hadoop-2.8.5-1.2.0/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar \
    -input /user/yanfeikang/apat63_99.txt \
    -output /user/yanfeikang/output \
    -file findmax.py \
    -mapper "python3 findmax.py 8"

Lab

Write a reducer that outputs the maximum over the values output by the mappers.
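
For reference, a minimal sketch of one such reducer (assuming each mapper prints a single integer, as findmax.py above does): it simply keeps the largest value it reads from stdin.

#!/usr/bin/env python3
# Sketch of a possible reducer: take the maximum over the per-mapper maxima.
import sys

max_val = None
for line in sys.stdin:
    line = line.strip()
    if line.isdigit():
        val = int(line)
        if max_val is None or val > max_val:
            max_val = val

if max_val is not None:
    print(max_val)

Saved as, for example, maxreducer.py, it would be wired into the job with an extra -file option and -reducer "python3 maxreducer.py", mirroring the word-count job later in this lecture.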

Streaming with key/value pairs

Now let's consider a wordcount example.

  • the map step will take the raw text of our document (I will use Herman Melville’s classic, Moby Dick) and convert it to key/value pairs. Each key is a word, and all keys (words) will have a value of 1.
  • the reduce step will combine all duplicate keys by adding up their values. Since every key (word) has a value of 1, this will reduce our output to a list of unique keys, each with a value corresponding to that key’s (word’s) count. (A single-machine sketch of this computation follows the list.)
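
As a single-machine reference for what the job should produce, here is a minimal (non-Hadoop) sketch of the same computation; the file name mobydick.txt is only a placeholder for a local copy of the text:

# Plain-Python word count for comparison with the MapReduce output.
from collections import Counter

with open("mobydick.txt") as f:              # placeholder file name
    counts = Counter(word for line in f for word in line.split())

for word, n in counts.items():
    print("%s\t%d" % (word, n))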

With Hadoop Streaming, we need to write a program that acts as the mapper and a program that acts as the reducer. These programs must read from standard input and write to standard output in a way equivalent to the following series of pipes:

$ cat input.txt | ./mapper.py | sort | ./reducer.py > output.txt

The mapper

#!/usr/bin/env python3

import sys

for line in sys.stdin:
    line = line.strip()
    keys = line.split()
    for key in keys:
        value = 1
        print("%s\t%d" % (key, value))

The shuffle

A lot happens between the map and reduce steps that is largely transparent to the developer. In brief, the output of the mappers is transformed and distributed to the reducers (termed the shuffle step) in such a way that

  • All key/value pairs are sorted before being presented to the reducer function
  • All key/value pairs sharing the same key are sent to the same reducer (a small local simulation of these two guarantees is sketched below)
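
A minimal local simulation of these guarantees (an illustration only, not how Hadoop actually implements the shuffle): sort the mapper’s key/value pairs, then group records that share a key, exactly as a single reducer would receive them.

# Simulate the shuffle: sort the mapper output, then group by key.
import itertools

mapper_output = [("fish", 1), ("whale", 1), ("fish", 1), ("ship", 1)]

for key, group in itertools.groupby(sorted(mapper_output), key=lambda kv: kv[0]):
    print(key, list(group))
# fish [('fish', 1), ('fish', 1)]
# ship [('ship', 1)]
# whale [('whale', 1)]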

The reducer

#!/usr/bin/env python3

import sys

last_key = None
running_total = 0

for input_line in sys.stdin:
    input_line = input_line.strip()
    this_key, value = input_line.split("\t", 1)
    value = int(value)

    if last_key == this_key:
        running_total += value
    else:
        if last_key:
            print("%s\t%d" % (last_key, running_total))
        running_total = value
        last_key = this_key

if last_key == this_key:
    print("%s\t%d" % (last_key, running_total))

Running the Hadoop job

hadoop jar /opt/apps/ecm/service/hadoop/2.8.5-1.2.0/package/hadoop-2.8.5-1.2.0/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar \
    -input /user/yanfeikang/license.txt \
    -output /user/yanfeikang/output \
    -file mapper.py \
    -file reducer.py \
    -mapper "python3 mapper.py" \
    -reducer "python3 reducer.py"

NOTE: Before submitting the Hadoop job, you should make sure your mapper and reducer scripts actually work. This is just a matter of running them through pipes on a little bit of sample data:

head -n100 license.txt | ./mapper.py | sort | ./reducer.py

Streaming with key/value pairs

Now let us use the patent data apat63_99.txt and find the average number of claims for each country.

The mapper

#!/usr/bin/env python3

import sys

for line in sys.stdin:
    fields = line.split(",")
    if fields[9] and fields[9].isdigit():
        # key: country code (with the surrounding quotes stripped), value: number of claims
        print(fields[5][1:-1] + "\t" + fields[9])

The reducer

#!/usr/bin/env python3

import sys

(last_key, total, count) = (None, 0.0, 0)

for line in sys.stdin:
    (key, val) = line.split("\t")
    if last_key and last_key != key:
        # a new country: emit the average for the previous one and reset
        print(str(last_key) + "\t" + str(total / count))
        (total, count) = (0.0, 0)
    last_key = key
    total += float(val)
    count += 1

print(str(last_key) + "\t" + str(total / count))
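
As a quick sanity check, the same averages can be computed on a single machine with plain Python. This is only a sketch, assuming a local copy of apat63_99.txt and the same column layout the mapper uses (field 5 for the country, field 9 for the claims):

# Single-machine check: average number of claims per country.
from collections import defaultdict

totals = defaultdict(lambda: [0.0, 0])       # country -> [sum of claims, count]

with open("apat63_99.txt") as f:             # assumes a local copy of the data
    for line in f:
        fields = line.split(",")
        if fields[9] and fields[9].isdigit():
            country = fields[5][1:-1]        # strip the surrounding quotes
            totals[country][0] += float(fields[9])
            totals[country][1] += 1

for country, (s, n) in totals.items():
    print(country + "\t" + str(s / n))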

Lab

  1. Run the Hadoop job.
  2. Remove the reducer and look at your output.