Deploying a JAR to Amazon EMR

I provisioned a simple EMR cluster and wanted to run my own WordCount job on it. It took a few tries, so here are the lessons learned:

When you ‘Add Step’ to run a job on the new cluster, the key properties are as follows:

JAR location: s3n://poobar/wordcount.jar
Arguments: org.adrian.WordCount s3n://poobar/hamlet111.txt hdfs://10.32.43.156:9000/poobar/out

So my uberjar with the MapReduce job has been uploaded to S3, in a top-level bucket called ‘poobar’.

There are three arguments to the job.

The first is the main class – this is always the first argument on EMR.
The following args get passed into the job’s public static void main(String[] args), i.e. the main method.
The job uses args[0] for the input file and args[1] for the output folder, which is fairly standard.
The input file has already been uploaded to S3, like the JAR itself.
The output folder has to be addressed using the HDFS protocol – a relative folder location seems to do the trick.

So with my setup the JAR and the input file live in the ‘poobar’ S3 bucket, and the job output lands in a ‘poobar’ folder on the cluster’s HDFS.
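
If you would rather script this than click ‘Add Step’ in the console, the same step can be submitted with boto3. A minimal sketch, using the bucket, JAR and arguments above but with a hypothetical cluster id and an assumed region:

import boto3

emr = boto3.client('emr', region_name='eu-west-1')  # region is an assumption

emr.add_job_flow_steps(
    JobFlowId='j-XXXXXXXXXXXXX',  # hypothetical cluster id
    Steps=[{
        'Name': 'WordCount',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3n://poobar/wordcount.jar',
            # main class first, then the args handed to main(String[] args)
            'Args': [
                'org.adrian.WordCount',
                's3n://poobar/hamlet111.txt',
                'hdfs://10.32.43.156:9000/poobar/out',
            ],
        },
    }],
)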

Python Streaming vs Clojure-Hadoop for MapReduce Implementations

You have a number of options these days for coding MR jobs. Broadly, these fall into two categories:

  • Custom Jar
    • Hadoop allows programs packaged as Java JAR files to run as Mappers and Reducers
    • Any language which can be compiled into a JAR and can interop with the Hadoop API can be used to implement MR jobs (such as Clojure, Scala, Groovy etc.)
  • Streaming
    • Create MR jobs in non-JVM languages such as Python or C++, without a custom JAR.
    • These are programs which read from stdin and write to stdout, and are managed by Hadoop as Mappers and Reducers.

Java is not good at this – the code is horrible. I wanted to compare the syntax of a couple of alternatives: Python Streaming, and a custom JAR written in Clojure.

1. Python Streaming

Wordcount is implemented here as two Python scripts passed to Hadoop’s streaming JAR:

Execute with:

hadoop jar /opt/hadoop/contrib/streaming/hadoop-streaming.jar \
-mapper "python $PWD/mapper.py" \
-reducer "python $PWD/reducer.py" \
-input  hdfs-input-file  \
-output hdfs-output-folder

MR implementation:

mapper.py:

import sys

# output tuples [word, 1] in tab-delimited format to stdout
for line in sys.stdin:
    for word in line.split():
        print '%s\t%s' % (word, "1")

reducer.py:

import sys

words = {}

# count occurrences per word from the tab-delimited (word, count) lines
for line in sys.stdin:
    word, count = line.split('\t', 1)
    # .get() defaults unseen words to 0 rather than raising KeyError
    words[word] = words.get(word, 0) + int(count)

# write the (word, total) tuples to stdout
for word in words.keys():
    print '%s\t%s' % (word, words[word])

Note that because the input to the reducer is just sys.stdin, logic is required to transform the incoming lines into a form that can readily be reduced – here, a dictionary (‘words’). In custom JARs, the Hadoop framework does this grouping for you.
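
Since streaming sorts the mapper output by key before it reaches the reducer, the same reduction could also be sketched with itertools.groupby, consuming each word’s counts as one consecutive group (an alternative illustration, not what the job above uses):

import sys
from itertools import groupby

# parse the tab-delimited "word\tcount" lines coming from the mappers
def parse(stdin):
    for line in stdin:
        word, count = line.rstrip('\n').split('\t', 1)
        yield word, int(count)

# the shuffle sorts by key, so identical words arrive as consecutive lines
for word, group in groupby(parse(sys.stdin), key=lambda kv: kv[0]):
    print '%s\t%s' % (word, sum(count for _, count in group))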

One nice feature of streaming is that it is easy to run the scripts outside of Hadoop, using regular text files and pipes for the IO streams.
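
For example, a quick local sanity check (assuming the scripts and a local copy of the input sit in the current directory):

cat hamlet111.txt | python mapper.py | sort | python reducer.py

The sort in the middle stands in for Hadoop’s shuffle-and-sort between the map and reduce phases.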

2. Pydoop

With pydoop, we can simplify this since we don’t deal directly with stdin/out streams:

def mapper(_, line, writer):
  for word in line.split():
    writer.emit(word, "1")

def reducer(word, icounts, writer):
  writer.emit(word, sum(map(int, icounts)))

Notice the reducer is simpler than in streaming: icounts arrives as an iterable of counts (e.g. [1, 1, 1, 1, 1, 1, 1]), so no transformation is needed before reducing over it.

Execute with:

pydoop script wordcount.py hdfs-input-file hdfs-output-folder

3. Clojure-Hadoop

This Clojure library wraps the Hadoop Java API so that Clojure-Java interop is hidden from the developer. You can pass Clojure functions directly to Hadoop as Mappers and Reducers.

Execute with:

java -cp mycustomjar.jar clojure_hadoop.job \
-input hdfs-input-file \
-output  hdfs-output-folder \
-map clojure-hadoop.examples.wordcount/my-map \
-map-reader clojure-hadoop.wrap/int-string-map-reader \
-reduce clojure-hadoop.examples.wordcount/my-reduce \
-input-format text

MR implementation:

(ns clojure-hadoop.examples.wordcount
  (:import (java.util StringTokenizer)))

(defn my-map [key value]
  (map (fn [token] [token 1]) (enumeration-seq (StringTokenizer. value))))

(defn my-reduce [key values-fn]
  [[key (reduce + (values-fn))]])

my-map returns a vector [“word” 1] for each token in the parameter ‘value’.
my-reduce returns a single vector [“word” n], where n is the sum of the occurrences, i.e. the word count.

Note that the entirety of Clojure and this library must be packaged into an uberjar for this to run on Hadoop.