Clojure Multimethods by Example

Essentially, these are similar to switch/case logic or, perhaps more similar to a strategy pattern.  defmulti will select a defmethod, to which it will dispatch your call.

From the popular clojure-koans, we have this example:

(defmulti diet (fn [x] (:eater x)))
(defmethod diet :herbivore [a] (str (:name a) " eats veggies."))
(defmethod diet :carnivore [a] (str (:name a) " eats animals"))
(defmethod diet :default [a] (str "I don't know what " (:name a) " eats."))

The defmulti ‘diet’ will select the appropriate defmethod from among the three options by evaluating (fn [x] (:eater x)) for x.

We can see from  (:eater x) that x is supposed to be a map, with an :eater key.  The value for that key will determine which defmethod to call.

So if x contains a value of :herbivore for :eater, then when we call diet, (str (:name a) ” eats veggies.”) will be evaluated.

(diet {:name "Bambi"  :eater :herbivore})
>= "Bambi eats veggies."

Likewise if x contains a value of :carnivore for :eater, then when we call diet,  (str (:name a) ” eats animals.”) will be evaluated.

(diet {:name "Simba" :eater :carnivore})
>= "Simba eats animals."

And the default defmethod will get executed when there is no other match.

That is all.

Python Streaming vs Clojure-Hadoop for MapReduce Implementations

You have a number of options these days for coding MR jobs.  Broadly, these fall into two categories –

  • Custom Jar
    • Hadoop allows programs packaged as Java JAR files to run as Mappers and Reducers
    • Any language which can be compiled into a JAR and can interop with Hadoop API can be used to implement MR jobs (such as Clojure, Scala, Groovy etc.)
  • Streaming
    • Create MR jobs in non-JVM languages, without no custom JAR, such as Python or C++.
    • These are programs which read from stdin and write to stdout, and are managed by Hadoop as Mappers and Reducers.

Java is not good at this.  The code is horrible.  I wanted to compare the syntax of a couple of alternatives: Python Streaming, and a Custom Jar written in Clojure.

1. Python Streaming

Wordcount is implemented here as two Python scripts passed to Hadoop’s streaming JAR:

Execute with:

hadoop jar /opt/hadoop/contrib/streaming/hadoop-streaming.jar \
-mapper "python $PWD/" \
-reducer "python $PWD/" \
-input  hdfs-input-file  \
-output hdfs-output-folder

MR implementation:

import sys

for line in sys.stdin:
#--- output tuples [word, 1] in tab-delimited format to stdout---
  for word in line.split():
    print '%s\t%s' % (word, "1")

import sys

words = {}

# transform to map and count for each key
for line in sys.stdin:
 word, count = line.split('\t', 1)
 words[word] = words[word]+int(count)

# write the tuples to stdout
for word in words.keys():
 print '%s\t%s'% ( word, words[word] )

Note because the input to the reducer is just a sys.stdin, logic is required to transform the lines into a format that can be readily reduced – here, a map (‘words’). In custom Jars, this is done by the Hadoop framework itself.

One nice feature of streaming is that it is so easy to run this outside of hadoop using regular text files for the IO streams.

2. Pydoop

With pydoop, we can simplify this since we don’t deal directly with stdin/out streams:

def mapper(_, line, writer):
  for word in line.split():
    writer.emit(word, "1")

def reducer(word, icounts, writer):
  writer.emit(word, sum(map(int, icounts)))

Notice the reducer is simpler than in streaming, since icounts comes in the form [1 1 1 1 1 1 1] which needs no transformation in order to do reduce functions on it.

Execute with:

pydoop script hdfs-input-file hdfs-output-file

3. Clojure-Hadoop

This clojure library wraps the Hadoop Java API so Clojure-Java interop is hidden to the developer.  You can pass Clojure functions directly to Hadoop as Mappers and Reducers.

Execute with:

java -cp mycustomjar.jar clojure_hadoop.job \
-input hdfs-input-file \
-output  hdfs-output-folder \
-map clojure-hadoop.examples.wordcount/my-map \
-map-reader clojure-hadoop.wrap/int-string-map-reader \
-reduce clojure-hadoop.examples.wordcount/my-reduce \
-input-format text

MR implementation:

(ns clojure-hadoop.examples.wordcount
  (:import (java.util StringTokenizer)))

(defn my-map [key value]
  (map (fn [token] [token 1]) (enumeration-seq (StringTokenizer. value))))

(defn my-reduce [key values-fn]
  [[key (reduce + (values-fn))]])

my-map returns a vector [“word” 1] for each token in the parameter ‘value’
my-reduce returns a vector [“word” n] where n is the sum of occurrences, i.e. the wordcount

Note the entirety of clojure and this library must be packaged into an uberjar for this to run on Hadoop.

Installing Leiningen from a proxy – on Windows!

A quick note on how to do this, as I had a few issues.

Leiningen comes as a unix shell or windows batch script with some functionality embedded in it to download and install dependencies.  Actually, this is just a single uber-jar these guys have created containing clojure.  Download the script from:

Put it somewhere sensible.  Add the folder containing it to your windows PATH.

Create LEIN_JAR as an environment variable and set it to a local foldername, somewhere where you will put this jar.

Now both the shell and batch scripts use wget, the unix command for HTTP requests.  So I installed cygwin to support this.  This is where the proxy provides a few problems.  Cygwin needs to know about it.

You need your proxy hostname and port number.

Create two files in your cygwin home folder, under the cygwin install, e.g. C:\apps\cygwin\home\<username>


export HTTP_CLIENT="wget --no-check-certificate -O" # 
export http_proxy=$proxy
export https_proxy=$proxy
export HTTP_PROXY=$proxy
export HTTPS_PROXY=$proxy



Now start a cygwin shell (i.e. ‘bash’ from a commandline will do) and do lein self-install and all should be well.