Building Reactive Apps with RxJava and Java 8


Reactive programming is a programming paradigm geared around propagation of change. In other words, a program is expressed as a reaction to asynchronous event stream. Multiple consumers subscribe to an event stream and whenever there is a new event it is propagated to all the interested parties which can then react to the event. The canonical example of reactive programming is the modern spreadsheet like Microsoft Excel. In spreadsheet, you can set formulas such as “C1=A1*B1” to calculate product of values at cell A1 and B1. Whenever values of either A1 or B1 cell changes, the product will be recalculated and new value will be shown at cell C1. This is something that we can’t do natively in Java using just the core Java SDK. Let’s look at the following code snippet.

int a1 = 10;
int b1 = 5;
int c1 = a1*b1;
System.out.println("Product is "+c1); // Prints **Product is 50**
b1=6;
System.out.println("Product is "+c1); // Prints **Product is 50**

As you can see in the code snippet above, even though we changed the value of b1 from 5 to 6 the product c1 still prints 50 instead of printing 60.

To build reactive applications in Java, you can use any of the many libraries available in the Java ecosystem. One such library is RxJava, which is a Java port of Reactive Extensions. Reactive Extensions were originated at Microsoft and is part of .NET API. A library to compose asynchronous and event-driven programs through observable sequences.

My first interaction with reactive programming was in November 2013 when I was learning Meteor web framework. I spent sometime learning MeteorJS, blogged about it, built a sample application, and then never used it. Then earlier this year(2015), I again ventured into reactive programming while learning functional programming by following Eric Meijer awesome video series. Eric Meijer is one of the guys who created Rx(Reactive Extensions) while he was at Microsoft. Microsoft is one of the earliest users of reactive programming principles. In 2013, Netflix released the Java port of Rx called RxJava and from then whole world started following reactive programming.

Why Reactive programming?

Reactive programming is an answer to build responsive, resilient, scalable applications. Today’s applications need to:
* React to user load: Applications should scale to handle growing traffic with time.
* React to failure: Application should gracefully handle failure scenarios and recover from them.
* Responsive under load and failure: Application should respond to the client in timely manner under any circumstance.

These are the new requirements that we now have to enforce so that our applications remain useable for its consumers. Reactive programming can help build these new modern era applications by providing you a high level abstraction to work with a continuous stream of events.

Prerequisite for this blog

Following are the prerequisite:

  1. Java 8 knowledge is required. In case you are not familiar with Java 8 then you can refer to my 7 days with Java 8 series.
  2. Install the latest Java Development Kit (JDK) i.e. JDK 8 on the operating system. You can download it from Oracle website.
  3. Eclipse or IntelliJ IDEA

Getting started with RxJava

It is very easy to introduce RxJava in your project. Just add the following dependency to your pom.xml in case you are using Apache Maven.

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.0.14</version>
</dependency>

If you are using Gradle then you can add following to your build.gradle file.

compile 'io.reactivex:rxjava:1.0.14'

Working with RxJava

The core concept behind Rx is Observable. In RxJava, Observable is a class that implements reactive design pattern. It makes use of an underlying collection or some sort of computation function to produce events that can be consumed by consumers. Observable class provide methods that allow consumers to subscribe to the Observable. Many consumers can subscribe to a single Observable. On subscription, events are pushed to the consumer and they can then react to the pushed events. Let’s look at the code snippet to create an Observable with three values.

Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");

In the code shown above, we used a factory method just of the Observable class. just is one of the many factory methods that can be used to produce an Observable. The just method take the three tweets and convert them into an Observable that emit those tweets.

Creating an Observable is no fun if no one subscribes to it. Let’s write our first subscriber that just prints the tweet to console as shown below.

import rx.Observable;

public class Example1 {
public static void main(String[] args) {
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
tweets.subscribe(tweet -> System.out.println(tweet));
}
}

In the code shown above, an Observer is subscribed to the Observable by calling the subscribe method on the Observable instance. By invoking subscribe method on the Observable instance, we inform the Observable that this Observer want to receive events from it. Observer is an interface with three methods:

  1. onNext(T t): This method on Observer is invoked whenever Observable has a new item.
  2. onCompleted(): This method notify Observer that no more events will be pushed by the Observable.
  3. onError(Throwable e): This method notify Observer that the Observable has experienced an error condition.

The subscribe method has many variants. In this case, we used the one which requires only onNext handler. The method itself provides implementations for onCompleted and onError handlers. The tweet -&gt; System.out.println(tweet) is a lambda expression of type Action1&lt;String&gt; which will prints to the console whenever a new event is received. Action1 is a functional interface i.e. an interface with only one abstract method. Observable will invoke this lambda expression every time it has a new event. In case you are not familiar with lambdas then please refer to my blog on this subject. We can make the above code more succinct by using another Java 8 feature called Method references.

tweets.subscribe(System.out::println);

Method references can be seen as shorthand notation for lambda expression that only calls a single method. In the expression System.out::println, System.out is the target reference, :: is the delimiter, and printltn is the function that will be called on the target reference. You can use method references on both the static and instance methods.

The example program when run will produce following output.

learning RxJava
Writing blog about RxJava
RxJava rocks!!

Observable creational factory methods

There are many static factory methods in the Observable class that you can use to create Observable instances. Let’s look at few of the them.

Observable.from

This is a convenient method that turns any Iterable into an Observable. You can use it like as shown below.

List<String> tweets = Arrays.asList("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
Observable<String> observable = Observable.from(tweets);
observable.subscribe(System.out::println);

Observable.just is a convenient wrapper around Observable.from.

Observable.create

This is the base factory method that every other factory method uses to create an Observable. This method can be used to convert anything into an Observable. For example, we could write an Observable that will produce first n natural numbers as shown below.

import rx.Observable;

import java.util.stream.IntStream;

public class Example4 {

public static Observable<Integer> naturalNumbers(int n) {
return Observable.create(subscriber -> {
IntStream.rangeClosed(1, n).forEach(number -> subscriber.onNext(number));
subscriber.onCompleted();
});
}

public static void main(String[] args) {
Observable<Integer> obs1 = naturalNumbers(10);
obs1.subscribe(System.out::println);
}
}

The Observable.create factory method takes one parameter of type OnSubscribe. The OnSubscribe interface is a functional interface with one method takes one argument of type Subscriber&lt;T&gt; and returns nothing. OnSubscribe call method will be invoked each time a new Observer subscribes to the Observable.

There are many more factory methods to create an Observable like Observable.timer, Observable.interval, Observable.empty, etc that I are not covering in this blog post. It is left to the reader to learn them off their own.

Multiple subscribers

Let’s now at the scenario when multiple Observer subscribe to an Observable.

import rx.Observable;

import java.util.Arrays;
import java.util.List;

public class Example3 {

public static void main(String[] args) {
List<String> tweets = Arrays.asList("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
Observable<String> observable = Observable.from(tweets);
observable.subscribe(tweet -> System.out.println("Subscriber 1 >> " + tweet));
observable.subscribe(tweet -> System.out.println("Subscriber 2 >> " + tweet));
observable.subscribe(tweet -> System.out.println("Subscriber 3 >> " + tweet));
}
}

As you can see the code above, we have three Observer that are subscribing to the a single tweets Observable. When we will run this program, it will produce following output.

Subscriber 1 >> learning RxJava
Subscriber 1 >> Writing blog about RxJava
Subscriber 1 >> RxJava rocks!!
Subscriber 2 >> learning RxJava
Subscriber 2 >> Writing blog about RxJava
Subscriber 2 >> RxJava rocks!!
Subscriber 3 >> learning RxJava
Subscriber 3 >> Writing blog about RxJava
Subscriber 3 >> RxJava rocks!!

As is obvious from the program output, each Observer got its own copy of events from the Observable. Observable only emits events when someone subscribes to it and until then it remains inactive. Every call to the subscribe method will emit all the events in the underlying Iterable from the start i.e. all the Observer will receive all the tweets.

Cold and Hot Observable

Cold Observable are those which only emit events when someone subscribes to them. All the Observable factory methods so far covered in this blog like create, from, just produce cold Observable instances.

Hot observable are those that emit event independent of whether someone is subscribed to them or not. The subscriber that subscribes to the Observable will only receive future event. Subscribers does not receive events that were produced before they subscribed to the Observable. You can convert any cold Observable to hot Observable by calling the publish method on it.

import rx.Observable;
import rx.observables.ConnectableObservable;

import java.util.concurrent.TimeUnit;

public class Example5 {

public static void main(String[] args) throws Exception{
ConnectableObservable<Long> hotObservable = Observable.interval(1, TimeUnit.SECONDS).publish();
hotObservable.subscribe(val -> System.out.println("Subscriber 1 >> " + val));
hotObservable.connect();

Thread.sleep(5000);

hotObservable.subscribe(val -> System.out.println("Subscriber 2 >> " + val));

Thread.sleep(5000);
}
}

In the code shown above, we created an interval based Observable. This Observable emits one long value every one second starting from 0. We converted the Observable into a hot Observable by calling the publish method on it. The publish method returns a Observable of type ConnectableObservable. A ConnectableObservable does not begin emitting items when an Observer is subscribed to it, but when its connect method is called. The first subscriber will receive all the 10 events but as second subscriber joins after 5 seconds so it wil only receive remaining 5. The sample output produced by this program is shown below.

Subscriber 1 >> 0
Subscriber 1 >> 1
Subscriber 1 >> 2
Subscriber 1 >> 3
Subscriber 1 >> 4
Subscriber 1 >> 5
Subscriber 2 >> 5
Subscriber 1 >> 6
Subscriber 2 >> 6
Subscriber 1 >> 7
Subscriber 2 >> 7
Subscriber 1 >> 8
Subscriber 2 >> 8
Subscriber 1 >> 9
Subscriber 2 >> 9

Performing transformations and computations on Observable

So far we have only used forEach method to print all the events produced by an Observable stream. There are many more methods available on the Observable instance that can help you work with Observable stream in a declarative manner. These operators fall into three main categories.

  1. Transformation
  2. Filter
  3. Aggregation

Transformation operators

These operators transform one Observable to another Observable stream. Some of the common operators that fall under this category are map, flatMap, scan.

  • map: This will convert an Observable stream into another Observable stream by applying a function to all the items emitted by the source Observable stream. In the example shown below, we are converting a Observable of String to Observable of integers.
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
tweets.map(tweet -> tweet.length()).forEach(System.out::println);

It will produce following output.

15
25
14
  • flatMap: This convert an Observable stream into stream of Observable and then flatten them to a single Observable.
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
tweets.flatMap(tweet -> Observable.<String>from(tweet.split(""))).forEach(System.out::println);

You can read more about transformational functions in the RxJava documentation.

Filter operators

  • filter: It filters items emitted by the Observable. In the example shown below, only last tweet RxJava rocks!! will be printed.
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
tweets.filter(tweet -> tweet.startsWith("RxJava")).forEach(System.out::println);

You can read more about filtering functions in the RxJava documentation.

Aggregation operators

  • count: It counts the number of events emitted by the Observable as shown in the example below.
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
tweets.count().forEach(System.out::println);
  • reduce: It applies a function to each item emitted by an Observable, and produces a single value. In the code shown below, we used reduce function to sum length of all the tweets.
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
tweets.map(tweet -> tweet.length()).reduce(0, (acc, el) -> acc + el).forEach(System.out::println);

Building a sample application

Now that you know basics of RxJava let’s build a develop a real-time tweet sentiment analyzer using it. We will use the poor man sentiment analysis approach i.e. bag of positive and negative words — if the text contains one of the positive word from the positive word list then we will mark it as a positive tweet, else if text contains a negative word then we will mark it as negative tweet, otherwise we will consider it as a neutral tweet. I am not using any advanced sentiment analyzer like Stanford CoreNLP to keep focus on RxJava. In case you want to learn about Stanford CoreNLP API then you can refer to my earlier blog on this subject.

Create a twitter application

The application also requires four environment variables corresponding to a twitter application. Create a new twitter application by going to https://dev.twitter.com/apps/new. The environment variables are:

twitter4j.oauth.consumerKey=<enter value>
twitter4j.oauth.consumerSecret=<enter value>
twitter4j.oauth.accessToken=<enter value>
twitter4j.oauth.accessTokenSecret=<enter value>

Adding Twitter4j dependencies to classpath

Please add Twitter4j dependencies to your pom.xml.

<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<version>4.0.3</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>4.0.3</version>
</dependency>

Creating a Tweet Observable

Create a new class TweetObservable that will have a factory method to create an Observable as shown below.

import rx.Observable;
import twitter4j.*;

public final class TweetObservable {

public static Observable<Status> tweetObservable(final String[] searchKeywords) {
return Observable.create(subscriber -> {
final TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
twitterStream.addListener(new StatusAdapter() {
public void onStatus(Status status) {
subscriber.onNext(status);
}

public void onException(Exception ex) {
subscriber.onError(ex);
}
});
FilterQuery query = new FilterQuery();
query.language(new String[]{"en"});
query.track(searchKeywords);
twitterStream.filter(query);
});
}
}

The code shown above does the following:

  1. It creates an Observable using the Observable.create(OnSubscribe) method.
  2. Observable.create method is passed an lambda expression that creates an instance of TwitterStream using the TwitterStreamFactory.
  3. We query Twitter for all the search terms received as method argument.
  4. The twitterStream instance is configured with a listener that will be invoked when a new status is received or an exception is encountered. When a new status is received then subscriber.onNext() is called with the status update. In case an error is encountered, subscriber onError method is invoked passing it the exception that was thrown.

Subscribing to Tweet Observable

Next we create a class SentimentAnalyzer which acts creates an Observable and subscribes a Observer to it. Each time a new status update is received we print it to console.

import rx.Observable;
import rx.observables.ConnectableObservable;
import twitter4j.Status;

public class SentimentAnalyzer {

public static void main(String[] args) throws Exception {
String[] searchTerms = {"Justin Bieber"};
ConnectableObservable<Status> observable = TweetObservable.tweetObservable(searchTerms).publish();
observable.connect();
Observable<String> tweetStream = observable.map(Status::getText);
tweetStream.forEach(System.out::println);
}
}

Grouping tweets by Sentiment

Now let’s extend this example by adding the functionality to group tweets based on the sentiment. We will use groupBy operator which returns an Observable instance that emit multiple Observable instances. The emitted Observable instances are of type GroupedObservable.

public static void main(String[] args) throws Exception {
String[] searchTerms = {"Justin Bieber"};
ConnectableObservable<Status> observable = TweetObservable.tweetObservable(searchTerms).publish();
observable.connect();
Observable<String> tweetStream = observable.map(Status::getText);

tweetStream.groupBy(tweet -> sentiment(tweet)).subscribe(
(GroupedObservable<Sentiment, String> gr) ->
gr.asObservable().forEach(tweet -> System.out.println(String.format("%s >> %s", gr.getKey(), tweet))));
}

The sentiment function is a very simple function that checks if tweet text contains positive or negative word.

The output of the above program will be like as shown below.

POSITIVE >> I like the new Justin bieber song and no one can stop me
NEUTRAL >> RT @JBCrewdotcom: VIDEO: Rebecca Black Talks About Justin Bieber at the MTV VMA's 2015: http://t.co/T24MXwQ7Ve
NEGATIVE >> I hate Justin bieber songs and music

Building analytics

Let’s suppose we want to count how many positive, negative, and neutral tweets are posted every minute we can write code as shown below.

public static void main(String[] args) throws Exception {
String[] searchTerms = {"Justin Bieber"};
ConnectableObservable<Status> observable = TweetObservable.tweetObservable(searchTerms).publish();
observable.connect();
Observable<String> tweetStream = observable.map(Status::getText);

tweetStream.groupBy(tweet -> sentiment(tweet)).subscribe(
(GroupedObservable<Sentiment, String> gr) ->
gr.asObservable().window(1, TimeUnit.MINUTES).subscribe(val -> val.count().subscribe(count -> System.out.println(String.format("%s %d tweets/minute", gr.getKey(), count)))));
}

The output will be something like as shown below.

POSITIVE 44 tweets/minute
NEUTRAL 70 tweets/minute
NEGATIVE 11 tweets/minute
POSITIVE 32 tweets/minute
NEUTRAL 56 tweets/minute
NEGATIVE 28 tweets/minute

Conclusion

In this blog we first looked at RxJava basics and then built a simple tweet sentiment analyzer. I have only scratched surface of RxJava in this blog. There are many more topics like error handling, back pressure, combinators, schedulers that I have not covered in this blog. I will cover them later in this series. Till then stay tuned and keep giving us feedback.

2 thoughts on “Building Reactive Apps with RxJava and Java 8”

Leave a comment