Option and getOrElse in Scala

An Option can be thought of as a container (or proxy) for a value that may or may not be present, which helps prevent null pointer exceptions.

The paradigmatic case is returning an Option from a function, instantiated as either the Some or None subclass:

def toInt(in: String): Option[Int] = {
    try {
        Some(Integer.parseInt(in.trim))
    } catch {
        case e: NumberFormatException => None
    }
}

Some or None can be matched to functions by consumers:

toInt(someString) match {
    case Some(i) => println(i)
    case None => println("That didn't work.")
}

Say we have this type:

case class Person(age: Int)

In the case where it has a value:

val person = Option(Person(100))   // the Some subclass: Some(Person(100))
val age = person.map(_.age + 5)    // Some(105)
age.getOrElse(0)                   // 105

In the case where it has no value:

val person: Option[Person] = Option(null)   // None
val age = person.map(_.age + 5)             // None
age.getOrElse(0)                            // 0

You can see how getOrElse forces the developer to state explicitly what should happen in the empty (None) case.
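
Tying the two together, here is a minimal sketch (reusing the toInt defined above) of how getOrElse supplies a fallback when parsing fails:

val n = toInt("42").getOrElse(0)             // 42
val m = toInt("not a number").getOrElse(0)   // 0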

Refactoring – towards a better definition

According to SAFe, refactoring is

“modifying an entity, such as a module, method or program in such a way that it’s external appearance, i.e. its functionality, remains unchanged.”

SAFe also says that refactoring may increase speed or address security concerns. According to the preface of Martin Fowler’s book ‘Refactoring’, refactoring is

“changing a software system in such a way that it does not alter the external behaviour of the code yet improves the internal structure. It is a disciplined way to clean up code that minimizes the chances of introducing bugs. In essence, when you refactor you are improving the design of the code after it has been written.”

Both agree that refactoring leaves the system ‘externally’ unchanged. Both agree it makes no sense to say, as I have heard developers say in the past, ‘I want to refactor the code to add this new feature for the end user.’ That is not a refactoring; it is a new feature. I have a question about this. Are performance improvements such as adding internal caching or adding an index examples of refactoring? SAFe thinks so. But is a performance improvement not a change for the end user? Moreover, you can make a horrible mess in the code when you implement a performance improvement. Is it helpful to call that a refactoring? Do we want to be able to say ‘I refactored the IdLookup component to make it faster, but I made a mess; we need to refactor it again to make it cleaner.’ Performance improvements are non-functional improvements or, better, quality improvements that affect external users. This is not exactly refactoring, which has a focus on the ‘internal’.

What about moving configuration from code to an external config file, to improve maintainability or flexibility? This is another example of refactoring given by SAFe. I can see how, from a certain point of view, this could be thought of as an ‘internal’ change. But some ‘external’ stakeholders will care greatly about it. Externalizing a hard-coded value into configuration can greatly reduce the time it takes to deploy a configuration change, since the code does not have to be recompiled and rebuilt. Build and packaging enhancements are quality improvements that affect external stakeholders such as testing and operations, so this is not, strictly speaking, refactoring.

When we do TDD, we say we ‘refactor’ after each passing test. What does this really mean? At its simplest, this typically involves removing duplication, reducing complexity, applying certain patterns, fixing style violations, and so forth. Note that these are quality attributes of the system that are generally measurable with static analysis and code review. And there is a stakeholder who cares about these qualities: the dev team as a whole, the team lead, or, better, QA. These improvements address code quality and technical debt, and they primarily affect the internal development team. These are true refactorings.
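
As a small illustrative sketch (hypothetical code, not from any particular project), this is the kind of duplication-removing refactoring meant here; the external behaviour is identical before and after:

// Before: the discount calculation is duplicated
def studentPrice(base: Double): Double = base - base * 0.10
def seniorPrice(base: Double): Double = base - base * 0.15

// After: duplication removed, behaviour unchanged
def discountedPrice(base: Double, rate: Double): Double = base * (1 - rate)
def studentPrice(base: Double): Double = discountedPrice(base, 0.10)
def seniorPrice(base: Double): Double = discountedPrice(base, 0.15)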

Clojure Multimethods by Example

Essentially, these are similar to switch/case logic or, perhaps more closely, to the strategy pattern. A defmulti selects a defmethod, to which it dispatches your call.

From the popular clojure-koans, we have this example:

(defmulti diet (fn [x] (:eater x)))
(defmethod diet :herbivore [a] (str (:name a) " eats veggies."))
(defmethod diet :carnivore [a] (str (:name a) " eats animals."))
(defmethod diet :default [a] (str "I don't know what " (:name a) " eats."))

The defmulti ‘diet’ will select the appropriate defmethod from among the three options by evaluating (fn [x] (:eater x)) for x.

We can see from  (:eater x) that x is supposed to be a map, with an :eater key.  The value for that key will determine which defmethod to call.

So if x contains a value of :herbivore for :eater, then when we call diet, (str (:name a) " eats veggies.") will be evaluated.

(diet {:name "Bambi"  :eater :herbivore})
>= "Bambi eats veggies."

Likewise if x contains a value of :carnivore for :eater, then when we call diet, (str (:name a) " eats animals.") will be evaluated.

(diet {:name "Simba" :eater :carnivore})
>= "Simba eats animals."

And the default defmethod will get executed when there is no other match.
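
For instance, with a made-up animal whose :eater value matches none of the defmethods:

(diet {:name "Rex" :eater :omnivore})
>= "I don't know what Rex eats."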

That is all.

Deploying a JAR to Amazon EMR

I provisioned a simple EMR cluster and wanted to run my own WordCount on it. It took a few tries so here are the lessons learnt:

When you ‘Add Step’ to run a job on the new cluster, the key properties are as follows:

JAR location: s3n://poobar/wordcount.jar
Arguments: org.adrian.WordCount s3n://poobar/hamlet111.txt hdfs://10.32.43.156:9000/poobar/out

So my uberjar with the MapReduce job has been uploaded to S3, in a top level bucket called ‘poobar’.

There are three arguments to the job:

  • The first is the main class – this is always the first argument on EMR.
  • The following args get passed into the job’s public static void main(String[] args), i.e. the main method.
  • The job uses args[0] for the input file and args[1] for the output folder, which is fairly standard (see the driver sketch below).
  • The input file has already been uploaded by me to S3, like the JAR itself.
  • The output folder has to be addressed using an HDFS protocol – a relative folder location seems to do the trick.
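
For reference, here is a minimal sketch of what such a job class might look like. It is essentially the stock Hadoop WordCount, shown only to illustrate how the Step arguments flow into main (class and package names are illustrative):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);   // emit (word, 1)
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));   // emit (word, count)
        }
    }

    public static void main(String[] args) throws Exception {
        // args[0] = input file (the S3 path), args[1] = output folder (the HDFS path)
        Job job = Job.getInstance(new Configuration(), "wordcount");   // on older Hadoop versions: new Job(conf, "wordcount")
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}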

So with my setup, everything ends up in the ‘poobar’ bucket.

Python Streaming vs Clojure-Hadoop for MapReduce Implementations

You have a number of options these days for coding MR jobs.  Broadly, these fall into two categories –

  • Custom JAR
    • Hadoop allows programs packaged as Java JAR files to run as Mappers and Reducers.
    • Any language that can be compiled into a JAR and can interop with the Hadoop API can be used to implement MR jobs (such as Clojure, Scala, Groovy, etc.).
  • Streaming
    • Create MR jobs in non-JVM languages, without a custom JAR, such as Python or C++.
    • These are programs which read from stdin and write to stdout, and are managed by Hadoop as Mappers and Reducers.

Java is not good at this.  The code is horrible.  I wanted to compare the syntax of a couple of alternatives: Python Streaming, and a Custom Jar written in Clojure.

1. Python Streaming

Wordcount is implemented here as two Python scripts passed to Hadoop’s streaming JAR:

Execute with:

hadoop jar /opt/hadoop/contrib/streaming/hadoop-streaming.jar \
-mapper "python $PWD/mapper.py" \
-reducer "python $PWD/reducer.py" \
-input  hdfs-input-file  \
-output hdfs-output-folder

MR implementation:

mapper.py:

import sys

# output tuples (word, 1) in tab-delimited format to stdout
for line in sys.stdin:
    for word in line.split():
        print '%s\t%s' % (word, "1")

reducer.py:

import sys

words = {}

# build a map of word -> count from the tab-delimited mapper output
for line in sys.stdin:
    word, count = line.split('\t', 1)
    words[word] = words.get(word, 0) + int(count)   # avoid KeyError on a word's first occurrence

# write the (word, count) tuples to stdout
for word in words.keys():
    print '%s\t%s' % (word, words[word])

Note that because the input to the reducer is just sys.stdin, logic is required to transform the lines into a format that can readily be reduced – here, a dict (‘words’). In custom JARs, this grouping is done by the Hadoop framework itself.

One nice feature of streaming is that it is easy to run the scripts outside of Hadoop, using regular text files and shell pipes for the IO streams.
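
For example, something like this (file names illustrative) exercises both scripts locally. The sort stands in for Hadoop’s shuffle phase, although this particular dict-based reducer does not strictly need it:

cat hamlet.txt | python mapper.py | sort | python reducer.py > counts.txt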

2. Pydoop

With pydoop, we can simplify this since we don’t deal directly with stdin/out streams:

def mapper(_, line, writer):
  for word in line.split():
    writer.emit(word, "1")

def reducer(word, icounts, writer):
  writer.emit(word, sum(map(int, icounts)))

Notice the reducer is simpler than in streaming, since icounts arrives already grouped by key, in the form [1, 1, 1, 1, 1], and needs no transformation before being reduced.

Execute with:

pydoop script wordcount.py hdfs-input-file hdfs-output-file

3. Clojure-Hadoop

This Clojure library wraps the Hadoop Java API so that Clojure-Java interop is hidden from the developer. You can pass Clojure functions directly to Hadoop as Mappers and Reducers.

Execute with:

java -cp mycustomjar.jar clojure_hadoop.job \
-input hdfs-input-file \
-output  hdfs-output-folder \
-map clojure-hadoop.examples.wordcount/my-map \
-map-reader clojure-hadoop.wrap/int-string-map-reader \
-reduce clojure-hadoop.examples.wordcount/my-reduce \
-input-format text

MR implementation:

(ns clojure-hadoop.examples.wordcount
  (:import (java.util StringTokenizer)))

(defn my-map [key value]
  (map (fn [token] [token 1]) (enumeration-seq (StringTokenizer. value))))

(defn my-reduce [key values-fn]
  [[key (reduce + (values-fn))]])

my-map returns a vector ["word" 1] for each token in the parameter ‘value’.
my-reduce returns a vector ["word" n], where n is the sum of occurrences, i.e. the word count.
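
To illustrate at the REPL (the input line and counts here are made up):

(my-map nil "the cat sat on the mat")
>= (["the" 1] ["cat" 1] ["sat" 1] ["on" 1] ["the" 1] ["mat" 1])

(my-reduce "the" (constantly [1 1]))
>= [["the" 2]]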

Note that the entirety of Clojure and this library must be packaged into an uberjar for this to run on Hadoop.

Sonar Unit and Integration Test Coverage with Maven

There are lots of posts on the web about this, but few seem to work. Here is my config, which does.

First, the configuration properties for the Maven Sonar plugin:

<sonar.dynamicAnalysis>reuseReports</sonar.dynamicAnalysis>  <!-- Ensure you run mvn install before sonar:sonar -->
<sonar.java.codeCoveragePlugin>jacoco</sonar.java.codeCoveragePlugin>
<sonar.surefire.reportsPath>target/surefire-reports</sonar.surefire.reportsPath>
<sonar.jacoco.reportPath>target/jacoco.exec</sonar.jacoco.reportPath>    <!-- This is the default, put here to be explicit -->
<sonar.jacoco.itReportPath>target/jacoco-it.exec</sonar.jacoco.itReportPath>

Next, the Jacoco plugin. Here we use the default output file for unit tests, and a separate output file for integration tests:

<plugin>
    <groupId>org.jacoco</groupId>
    <artifactId>jacoco-maven-plugin</artifactId>
    <version>0.6.2.201302030002</version>
    <executions>
        <execution>
            <id>pre-unit-test</id>
            <goals>
                <goal>prepare-agent</goal>
            </goals>
        </execution>
        <execution>
            <id>post-unit-test</id>
            <phase>test</phase>
            <goals>
                <goal>report</goal>
            </goals>
        </execution>
        <execution>
            <id>pre-integration-test</id>
            <phase>pre-integration-test</phase>
            <goals>
                <goal>prepare-agent</goal>
            </goals>
            <configuration>
                <destFile>target/jacoco-it.exec</destFile>
                <propertyName>failsafe.argLine</propertyName>
            </configuration>
        </execution>
        <execution>
            <id>post-integration-test</id>
            <phase>post-integration-test</phase>
            <goals>
                <goal>report</goal>
            </goals>
            <configuration>
                <dataFile>target/jacoco-it.exec</dataFile>
            </configuration>
        </execution>
    </executions>
</plugin>

Finally, the failsafe plugin. This ensures tests are instrumented during the integration-test phase and the results are collected during the verify phase. The reference to failsafe.argLine is critical: it passes the JaCoCo agent settings prepared above to the forked test JVM, which causes failsafe to write coverage to the correct JaCoCo output file.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-failsafe-plugin</artifactId>
    <version>2.14</version>
    <configuration>
        <argLine>${failsafe.argLine}</argLine>
    </configuration>
    <executions>
        <execution>
            <id>integration-test</id>
            <goals>
                <goal>integration-test</goal>
            </goals>
        </execution>
        <execution>
            <id>verify</id>
            <goals>
                <goal>verify</goal>
            </goals>
        </execution>
    </executions>
</plugin>
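
Putting it together, run the build (including integration tests) first and then the Sonar analysis, so that Sonar can reuse the generated reports:

mvn clean install     # runs unit and integration tests, producing jacoco.exec and jacoco-it.exec
mvn sonar:sonar       # analyses the project and reuses the existing coverage reports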

JiraRestClient for Java does not support edits or updates

Continuing from my last post on using JIRA’s REST client for Java, it seems the version I am using, 2.0.0-m2, does not actually support updates. It is read-only.

Because it’s not like you’d ever want to update JIRA through its API, obviously :O

However, the basic REST API does support updates. To use it you need to authenticate and do an HTTP PUT containing a JSON message with the update details. Below is the code to update a custom field referenced by name. It took me all afternoon. Atlassian suck.

public class JiraRestAPITest {
  private static final String JIRA_USERNAME = "admin";
  private static final String JIRA_PASSWORD = "password";
  private static final String JIRA_URL = "http://localhost:8091/";
  private static final String CUSTOM_FIELD_NAME = "AosCustom1";
  private static final String JIRA_FILTER_BY_STATUS = "Open";
  private static final String JIRA_PROJECT_NAME = "aosdemo";
  private static final String ISSUE_ID = "DEMO-3";

  @Test
  public void testSomething() throws URISyntaxException {

    JiraRestClient restClient = login(JIRA_USERNAME, JIRA_PASSWORD);
    String jsql = filterByProject(JIRA_PROJECT_NAME);

    SearchResult results = restClient.getSearchClient().searchJql(jsql).claim();
    for (BasicIssue i : results.getIssues()) {
       String key = i.getKey();

       Issue issue = restClient.getIssueClient().getIssue(key).claim();
       Field customField = issue.getFieldByName(CUSTOM_FIELD_NAME);

       if (ISSUE_ID.equals(key)){
          String newValue = "Edited by AdrianOS at " + new Date();
          update(issue, customField, newValue);
       }

    }

  }

  private static String filterByProject(String projectName) {
    return String.format("project = '%s' AND status = '%s' ORDER BY priority DESC", projectName, JIRA_FILTER_BY_STATUS);
  }

  private static void update(Issue issue, Field field, String value) {

     try {
       HttpURLConnection connection = urlConnection(forIssue(issue), withEncoding());

       connection.connect();

       writeData(connection, jsonEditIssue(field, value));
       checkResponse(connection);

    } catch (IOException e) {
       throw new RuntimeException(e);
    }

 }

 private static void writeData(HttpURLConnection connection, JSONObject fields) throws IOException {
    System.out.println(fields);
    OutputStreamWriter out = new OutputStreamWriter(connection.getOutputStream());
    out.write(fields.toString());
    out.flush();
    out.close();
 }

 private static String withEncoding() {
    String userPassword = JIRA_USERNAME + ":" + JIRA_PASSWORD;
    return encodeBase64String(userPassword.getBytes());
 }

 private static void checkResponse(HttpURLConnection connection) throws IOException {

    if (HttpURLConnection.HTTP_NO_CONTENT != connection.getResponseCode()) {
       // a failed update returns an error body; read it from the error stream if there is one
       InputStream stream = connection.getErrorStream() != null
             ? connection.getErrorStream() : connection.getInputStream();

       BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
       StringBuilder stringBuilder = new StringBuilder();

       String line;
       while ((line = reader.readLine()) != null) {
          stringBuilder.append(line).append("\n");
       }

       System.err.println(stringBuilder.toString());
    }
 }

 private static URL forIssue(Issue issue) throws MalformedURLException {
    return issue.getSelf().toURL();
 }

 private static HttpURLConnection urlConnection(URL url, String encoding) throws IOException {
    HttpURLConnection connection = (HttpURLConnection)url.openConnection();
    connection.setDoOutput(true);
    connection.setRequestProperty("Content-Type", "application/json");
    connection.setRequestMethod("PUT");
    connection.setRequestProperty("Authorization", "Basic " + encoding);
    return connection;
 }

 private static JSONObject jsonEditIssue(Field field, String value) {
    JSONObject summary = new JSONObject()
      .accumulate(field.getId(), value);

    JSONObject fields = new JSONObject().accumulate("fields", summary);
    return fields;
 }

 private static JiraRestClient login(String admin, String password) throws URISyntaxException {
    final URI jiraServerUri = new URI(JIRA_URL);
    final JiraRestClientFactory factory = new AsynchronousJiraRestClientFactory();
    return factory.createWithBasicHttpAuthentication(jiraServerUri, admin, password);
 }

}
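
For reference, the JSON body written by jsonEditIssue looks something like this (the customfield id is illustrative; it is whatever Field.getId() returns for your custom field, and the value is a placeholder):

{"fields": {"customfield_10000": "Edited by AdrianOS at <timestamp>"}}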

Here are the dependencies:

 	<dependencies>
		<dependency>
			<groupId>com.atlassian.jira</groupId>
			<artifactId>jira-rest-java-client</artifactId>
			<version>2.0.0-m2</version>
		</dependency>

		<dependency>
			<groupId>net.sf.json-lib</groupId>
			<artifactId>json-lib</artifactId>
			<version>2.4</version>
			<classifier>jdk15</classifier>
		</dependency>

		<dependency>
			<groupId>commons-codec</groupId>
			<artifactId>commons-codec</artifactId>
			<version>1.6</version>
		</dependency>

		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.11</version>
			<scope>test</scope>
		</dependency>
	</dependencies>