Python Streaming vs Clojure-Hadoop for MapReduce Implementations

You have a number of options these days for coding MR jobs.  Broadly, these fall into two categories –

  • Custom Jar
    • Hadoop allows programs packaged as Java JAR files to run as Mappers and Reducers
    • Any language which can be compiled into a JAR and can interop with Hadoop API can be used to implement MR jobs (such as Clojure, Scala, Groovy etc.)
  • Streaming
    • Create MR jobs in non-JVM languages, such as Python or C++, without a custom JAR.
    • These are programs which read from stdin and write to stdout, and are managed by Hadoop as Mappers and Reducers.

Plain Java is a poor fit for this: even a simple job takes a lot of boilerplate, and the code is hard to read.  I wanted to compare the syntax of a couple of alternatives: Python Streaming (with Pydoop as a variant), and a Custom Jar written in Clojure.

1. Python Streaming

Wordcount is implemented here as two Python scripts passed to Hadoop’s streaming JAR:

Execute with:

hadoop jar /opt/hadoop/contrib/streaming/hadoop-streaming.jar \
-mapper "python $PWD/" \
-reducer "python $PWD/" \
-input  hdfs-input-file  \
-output hdfs-output-folder

MR implementation:

# Mapper: emit one tab-delimited "word<TAB>1" record per word to stdout
import sys

for line in sys.stdin:
    for word in line.split():
        print('%s\t%s' % (word, 1))

# Reducer: sum the counts emitted by the mapper for each word
import sys

words = {}

# Accumulate a running total per word; .get() supplies 0 the first time a word is seen
for line in sys.stdin:
    word, count = line.split('\t', 1)
    words[word] = words.get(word, 0) + int(count)

# Write the (word, total) tuples to stdout
for word in words:
    print('%s\t%s' % (word, words[word]))

Note that because the reducer's input is just lines on sys.stdin, the script needs its own logic to group those lines into a form that can be readily reduced: here, a map (‘words’). In custom Jars, this grouping is done by the Hadoop framework itself.
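
As an alternative to the in-memory map, here is a minimal sketch of a reducer that relies on Hadoop Streaming handing the reducer its input sorted by key, so that all lines for one word are adjacent. The function name reduce_sorted and the sample data are illustrative, not part of the original scripts.

```python
from itertools import groupby


def reduce_sorted(lines):
    """Yield 'word<TAB>total' strings from mapper output sorted by word."""
    # Split each 'word<TAB>count' line into a [word, count] pair.
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    # groupby only merges adjacent equal keys, so sorted input is required.
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        total = sum(int(count) for _, count in group)
        yield "%s\t%d" % (word, total)


# In a real streaming reducer the lines would come from sys.stdin, e.g.
#   for out in reduce_sorted(sys.stdin): print(out)
sample = ["a\t1\n", "a\t1\n", "b\t1\n"]
result = list(reduce_sorted(sample))
print(result)
```

Because only one group is held at a time, this version does not need to keep every distinct word in memory, at the cost of depending on the sort order.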

One nice feature of streaming is that it is easy to run the scripts outside of Hadoop, using regular text files for the I/O streams.
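
With the real scripts, this amounts to piping a text file through the mapper, a sort, and then the reducer. The same flow can be sketched in a single Python file. This is only an illustration: map_words and reduce_counts are hypothetical function versions of the two scripts, and sorted() stands in for Hadoop's shuffle step.

```python
def map_words(lines):
    """Mapper logic: yield one 'word<TAB>1' record per word."""
    for line in lines:
        for word in line.split():
            yield "%s\t%s" % (word, 1)


def reduce_counts(lines):
    """Reducer logic: total the counts for each word into a dict."""
    words = {}
    for line in lines:
        word, count = line.split("\t", 1)
        words[word] = words.get(word, 0) + int(count)
    return words


# Stand-in for the lines of a regular text file.
text = ["the quick fox", "the lazy dog"]

# sorted() plays the role of the sort between the map and reduce phases.
shuffled = sorted(map_words(text))
counts = reduce_counts(shuffled)

for word in sorted(counts):
    print("%s\t%s" % (word, counts[word]))
```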

2. Pydoop

With Pydoop, we can simplify this, since we don’t deal directly with the stdin/stdout streams:

def mapper(_, line, writer):
  for word in line.split():
    writer.emit(word, "1")

def reducer(word, icounts, writer):
  writer.emit(word, sum(map(int, icounts)))

Notice the reducer is simpler than in streaming, since icounts arrives already grouped per word, in the form [1 1 1 1 1 1 1], and needs no transformation before it can be reduced.
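
To see what the two functions do without a cluster, here is a rough local harness. ListWriter is a hypothetical stand-in that only mimics the emit(key, value) call used above; it is not part of Pydoop, and the grouping loop merely imitates what the framework does between the map and reduce phases.

```python
class ListWriter(object):
    """Hypothetical stand-in for the writer object; it just records emits."""

    def __init__(self):
        self.records = []

    def emit(self, key, value):
        self.records.append((key, value))


def mapper(_, line, writer):
    for word in line.split():
        writer.emit(word, "1")


def reducer(word, icounts, writer):
    writer.emit(word, sum(map(int, icounts)))


# Map phase over a single input line.
map_out = ListWriter()
mapper(None, "to be or not to be", map_out)

# Imitate the framework: collect all values emitted for each key.
grouped = {}
for key, value in map_out.records:
    grouped.setdefault(key, []).append(value)

# Reduce phase: one call per word, with that word's counts.
reduce_out = ListWriter()
for word in sorted(grouped):
    reducer(word, grouped[word], reduce_out)

print(reduce_out.records)
```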

Execute with:

pydoop script hdfs-input-file hdfs-output-file

3. Clojure-Hadoop

This Clojure library wraps the Hadoop Java API, so Clojure-Java interop is hidden from the developer.  You can pass Clojure functions directly to Hadoop as Mappers and Reducers.

Execute with:

java -cp mycustomjar.jar clojure_hadoop.job \
-input hdfs-input-file \
-output  hdfs-output-folder \
-map clojure-hadoop.examples.wordcount/my-map \
-map-reader clojure-hadoop.wrap/int-string-map-reader \
-reduce clojure-hadoop.examples.wordcount/my-reduce \
-input-format text

MR implementation:

(ns clojure-hadoop.examples.wordcount
  (:import (java.util StringTokenizer)))

(defn my-map [key value]
  (map (fn [token] [token 1]) (enumeration-seq (StringTokenizer. value))))

(defn my-reduce [key values-fn]
  [[key (reduce + (values-fn))]])

my-map returns a vector [“word” 1] for each token in the parameter ‘value’
my-reduce returns a vector [“word” n] where n is the sum of occurrences, i.e. the wordcount
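
For readers more at home in Python, the shape of these two functions can be approximated as below. This is a loose translation for comparison only, not something clojure-hadoop runs: str.split() is assumed to be close enough to StringTokenizer's default whitespace splitting, and values_fn is modelled as a zero-argument callable because my-reduce calls (values-fn) to obtain the counts.

```python
def my_map(key, value):
    # One [token, 1] pair per whitespace-separated token in the line.
    return [[token, 1] for token in value.split()]


def my_reduce(key, values_fn):
    # values_fn is called to obtain the counts collected for this key.
    return [[key, sum(values_fn())]]


pairs = my_map(0, "a b a")
result = my_reduce("a", lambda: [1, 1])
print(pairs)
print(result)
```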

Note that Clojure itself and this library must both be packaged into an uberjar, along with your code, for this to run on Hadoop.

