Reactor Pattern and its relation to Observer

The semantics of RxJava refer to the Observer pattern.  What exactly is the relation?

Here’s a nice, simple example from the Couchbase documentation:

Observable
    .just("doc1", "doc2", "doc3")
    .flatMap(bucket::get)
    .subscribe(document -> System.out.println("Got: " + document));

You begin with any kind of stream whose members you want to map to something new. Syntactically, this is much the same as the Java 8 Stream API, which also has map and flatMap, and which can run in parallel on the fork/join framework. The difference is that a Java stream runs over a collection, whereas an Rx stream is designed to handle events occurring over time, terminating on receipt of a completion event.
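For comparison, here is a minimal sketch of the same shape written against the Java 8 Stream API. The lookup method is a hypothetical stand-in for bucket.get (it is not part of any Couchbase API) and simply returns a stream with one made-up result per id:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamFlatMapSketch {

    // Hypothetical stand-in for bucket.get: returns a Stream of results
    // for one id, where the Rx version would return an Observable.
    static Stream<String> lookup(String id) {
        return Stream.of("content of " + id);
    }

    // Maps every id to a stream of results and flattens them into one list.
    static List<String> fetchAll(List<String> ids) {
        return ids.stream()
                .flatMap(StreamFlatMapSketch::lookup)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        fetchAll(Arrays.asList("doc1", "doc2", "doc3"))
                .forEach(document -> System.out.println("Got: " + document));
    }
}
```

The call chain looks the same, but here every element is already in the collection when the pipeline runs; nothing arrives later.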

For this to work, the mapping function (in the example, bucket.get, called for each item in the stream) must return an Observable rather than a plain result or value. The point of this is that we can add one or more subscribers that get notified when the mapping of each element is complete. A Reactor framework will typically allow subscribers to register for specific observable events – success and failure, completion of the event stream, etc. These can be handled according to the type of Observable (HTTP request, TCP request, database call, etc.).

This is an implementation of the Observer pattern, where observers (subscribers) register for specific event notifications. It is particularly relevant for microservices, where intermediate results are to be collected and then assembled into a final result.
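The registration and notification described above can be sketched in plain Java without any Rx library. This is a hand-rolled illustration, not RxJava's implementation: the Subject class and its emit, fail and complete methods are names assumed for the example, and only the three callback names mirror the success, failure and completion events mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

public class ObserverSketch {

    // The three notifications a subscriber can receive.
    interface Observer<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    // Hypothetical minimal subject: keeps the registered observers
    // and pushes each event to all of them.
    static class Subject<T> {
        private final List<Observer<T>> observers = new ArrayList<>();

        void subscribe(Observer<T> observer) {
            observers.add(observer);
        }

        void emit(T item) {
            for (Observer<T> o : observers) o.onNext(item);
        }

        void fail(Throwable error) {
            for (Observer<T> o : observers) o.onError(error);
        }

        void complete() {
            for (Observer<T> o : observers) o.onCompleted();
        }
    }

    // Registers one observer, emits two items, then signals completion.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Subject<String> subject = new Subject<>();
        subject.subscribe(new Observer<String>() {
            @Override public void onNext(String item) { log.add("Got: " + item); }
            @Override public void onError(Throwable error) { log.add("Failed: " + error.getMessage()); }
            @Override public void onCompleted() { log.add("Done"); }
        });
        subject.emit("doc1");
        subject.emit("doc2");
        subject.complete();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the events are pushed synchronously from the calling thread; a real reactive library would typically deliver them asynchronously as results arrive.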

Concise List Creation in Java

I came across some interesting code today to set up a list of objects in Java that I thought was worth highlighting for its conciseness.

List<ServerAddress> list = new ArrayList<ServerAddress>() {{
    add(new ServerAddress("127.0.0.1", 27017));
    add(new ServerAddress("127.0.0.1", 27027));
    add(new ServerAddress("127.0.0.1", 27037));
}};

Essentially, we create an anonymous subclass of ArrayList with an instance initializer block (being anonymous, it cannot declare a constructor). In that initializer, we call add on the list for each item we wish to add. I thought this was a good, concise way of creating the populated list, allowing a fluent API style without the need for external libraries like op4j.
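To make the mechanics concrete, here is a small self-contained sketch. The Address class is a hypothetical stand-in for ServerAddress so that the example compiles without the driver on the classpath, and viaAsList shows one standard-library alternative purely for comparison:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListInitSketch {

    // Hypothetical stand-in for the driver's ServerAddress class.
    static class Address {
        final String host;
        final int port;

        Address(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Anonymous subclass of ArrayList; the inner braces are its
    // instance initializer block, which runs when the object is created.
    static List<Address> doubleBrace() {
        return new ArrayList<Address>() {{
            add(new Address("127.0.0.1", 27017));
            add(new Address("127.0.0.1", 27027));
            add(new Address("127.0.0.1", 27037));
        }};
    }

    // Same contents, built as a plain ArrayList from a fixed-size list.
    static List<Address> viaAsList() {
        return new ArrayList<>(Arrays.asList(
                new Address("127.0.0.1", 27017),
                new Address("127.0.0.1", 27027),
                new Address("127.0.0.1", 27037)));
    }

    public static void main(String[] args) {
        List<Address> a = doubleBrace();
        List<Address> b = viaAsList();
        System.out.println(a.size() + " " + b.size());
        // The first list is an instance of a subclass, not ArrayList itself.
        System.out.println(a.getClass() == ArrayList.class);
        System.out.println(b.getClass() == ArrayList.class);
    }
}
```

Both lists hold the same three entries. The one difference worth knowing is that the initializer-block version produces an instance of a new anonymous class, which matters only to code that checks the exact class of the list.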