Hadoop is written in JAVA, but allows you to write map/reduce code in any language you want using the Hadoop Streaming interface.
We have looked at Streaming with Unix commands. In this lecture, you will see how to stream with scripts.
Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any combination of languages that can read standard input and write to standard output to write your MapReduce program.
You could use different language in mapper and reduce functions.
It suits for text processing (e.g. read every line from a big CSV file).
It can also handle binary streams (e.g. read image as input).
cat ***.txt | mapper | sort | reducer > output
hadoop fs -rm -r /user/yanfei/output hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.1.3.jar \ -input /user/yanfei/fruits \ -output /user/yanfei/output \ -mapper "/usr/bin/cat" \ -reducer "/usr/bin/wc"
We can use any executable script that processes a line-oriented data stream from
STDIN and outputs to
STDOUT with Hadoop Streaming.
Now let's use patent data from the National Bureau of Economic Research (NBER) at http://www.nber.org/patents/. The data sets were originally compiled for the paper “The NBER Patent Citation Data File: Lessons, Insights and Methodological Tools.” We use the patent description data set apat63_99.txt, and find maximum of an attribute.
|GDATE||Grant date, given as the number of days elapsed since January 1, 1960|
|APPYEAR||Application year (available only for patents granted since 1967)|
|COUNTRY||Country of first inventor|
|POSTATE||State of first inventory (if country is U.S.)|
|CLAIMS||Number of claims (available only for patents granted since 1975)|
|NCLASS||3-digit main patent class|
#!/usr/bin/env python3.8 import sys index = int(sys.argv) max = 0 for line in sys.stdin: fields = line.strip().split(",") if fields[index].isdigit(): val = int(fields[index]) if (val > max): max = val print(max)
#! /usr/bin/sh hadoop fs -rm -r /user/yanfei/output/ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.1.3.jar \ -input /user/yanfei/apat63_99.txt \ -output /user/yanfei/output \ -file mapper.py \ -mapper "python3.8 mapper.py 8" \
Now let's consider a wordcount example.
With Hadoop Streaming, we need to write a program that acts as the mapper and a program that acts as the reducer. These applications must interface with input/output streams in such a way equivalent to the following series of pipes:
$ head -n100 code/L8/wordcount-py/streaming.ipynb | ./mapper.py | sort | ./reducer.py > output.txt
#!/usr/bin/env python3.8 import sys for line in sys.stdin: line = line.strip() keys = line.split() for key in keys: value = 1 print( "%s\t%d" % (key, value) )
A lot happens between the map and reduce steps that is largely transparent to the developer. In brief, the output of the mappers is transformed and distributed to the reducers (termed the shuffle step) in such a way that
#!/usr/bin/env python3.8 import sys last_key = None running_total = 0 for input_line in sys.stdin: input_line = input_line.strip() this_key, value = input_line.split("\t", 1) value = int(value) if last_key == this_key: running_total += value else: if last_key: print( "%s\t%d" % (last_key, running_total) ) running_total = value last_key = this_key if last_key == this_key: print( "%s\t%d" % (last_key, running_total) )
NOTE Before submitting the Hadoop job, you should make sure your mapper and reducer scripts actually work. This is just a matter of running them through pipes on a little bit of sample data:
head -n100 streaming.ipynb | ./mapper.py | sort | ./reducer.py
hadoop fs -rm -r /user/yanfei/output hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.1.3.jar \ -input /user/yanfei/hadoop.ipynb \ -output /user/yanfei/output \ -file mapper.py reducer.py -mapper "python3.8 mapper.py" \ -reducer "python3.8 reducer.py"
#!/usr/bin/env python3.8 import sys for line in sys.stdin: fields = line.split(",") if (fields and fields.isdigit()): print(fields[1:-1] + "\t" + fields)
#!/usr/bin/env python3.8 import sys (last_key, sum, count) = (None, 0.0, 0) for line in sys.stdin: (key, val) = line.split("\t") if last_key and last_key != key: print(str(last_key) + "\t" + str(sum / count)) (sum, count) = (0.0, 0) last_key = key sum += float(val) count += 1 print(str(last_key) + "\t" + str(sum / count))