Reactive programming is a programming paradigm geared around propagation of change. In other words, a program is expressed as a reaction to asynchronous event stream. Multiple consumers subscribe to an event stream and whenever there is a new event it is propagated to all the interested parties which can then react to the event. The canonical example of reactive programming is the modern spreadsheet like Microsoft Excel. In spreadsheet, you can set formulas such as “C1=A1*B1” to calculate product of values at cell A1
and B1
. Whenever values of either A1
or B1
cell changes, the product will be recalculated and new value will be shown at cell C1
. This is something that we can’t do natively in Java using just the core Java SDK. Let’s look at the following code snippet.
int a1 = 10;
int b1 = 5;
int c1 = a1*b1;
System.out.println("Product is "+c1); // Prints **Product is 50**
b1=6;
System.out.println("Product is "+c1); // Prints **Product is 50**
As you can see in the code snippet above, even though we changed the value of b1
from 5 to 6 the product c1
still prints 50 instead of printing 60.
To build reactive applications in Java, you can use any of the many libraries available in the Java ecosystem. One such library is RxJava, which is a Java port of Reactive Extensions. Reactive Extensions were originated at Microsoft and is part of .NET API. A library to compose asynchronous and event-driven programs through observable sequences.
My first interaction with reactive programming was in November 2013 when I was learning Meteor web framework. I spent sometime learning MeteorJS, blogged about it, built a sample application, and then never used it. Then earlier this year(2015), I again ventured into reactive programming while learning functional programming by following Eric Meijer awesome video series. Eric Meijer is one of the guys who created Rx(Reactive Extensions) while he was at Microsoft. Microsoft is one of the earliest users of reactive programming principles. In 2013, Netflix released the Java port of Rx called RxJava and from then whole world started following reactive programming.
Why Reactive programming?
Reactive programming is an answer to build responsive, resilient, scalable applications. Today’s applications need to:
* React to user load: Applications should scale to handle growing traffic with time.
* React to failure: Application should gracefully handle failure scenarios and recover from them.
* Responsive under load and failure: Application should respond to the client in timely manner under any circumstance.
These are the new requirements that we now have to enforce so that our applications remain useable for its consumers. Reactive programming can help build these new modern era applications by providing you a high level abstraction to work with a continuous stream of events.
Prerequisite for this blog
Following are the prerequisite:
- Java 8 knowledge is required. In case you are not familiar with Java 8 then you can refer to my 7 days with Java 8 series.
- Install the latest Java Development Kit (JDK) i.e. JDK 8 on the operating system. You can download it from Oracle website.
- Eclipse or IntelliJ IDEA
Getting started with RxJava
It is very easy to introduce RxJava in your project. Just add the following dependency to your pom.xml in case you are using Apache Maven.
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.0.14</version>
</dependency>
If you are using Gradle then you can add following to your build.gradle
file.
compile 'io.reactivex:rxjava:1.0.14'
Working with RxJava
The core concept behind Rx is Observable
. In RxJava, Observable is a class that implements reactive design pattern. It makes use of an underlying collection or some sort of computation function to produce events that can be consumed by consumers. Observable class provide methods that allow consumers to subscribe to the Observable. Many consumers can subscribe to a single Observable. On subscription, events are pushed to the consumer and they can then react to the pushed events. Let’s look at the code snippet to create an Observable with three values.
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
In the code shown above, we used a factory method just
of the Observable
class. just
is one of the many factory methods that can be used to produce an Observable. The just
method take the three tweets and convert them into an Observable that emit those tweets.
Creating an Observable is no fun if no one subscribes to it. Let’s write our first subscriber that just prints the tweet to console as shown below.
import rx.Observable;
public class Example1 {
public static void main(String[] args) {
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
tweets.subscribe(tweet -> System.out.println(tweet));
}
}
In the code shown above, an Observer
is subscribed to the Observable
by calling the subscribe
method on the Observable instance. By invoking subscribe method on the Observable instance, we inform the Observable that this Observer want to receive events from it. Observer is an interface with three methods:
- onNext(T t): This method on Observer is invoked whenever Observable has a new item.
- onCompleted(): This method notify Observer that no more events will be pushed by the Observable.
- onError(Throwable e): This method notify Observer that the Observable has experienced an error condition.
The subscribe
method has many variants. In this case, we used the one which requires only onNext
handler. The method itself provides implementations for onCompleted
and onError
handlers. The tweet -> System.out.println(tweet)
is a lambda expression of type Action1<String>
which will prints to the console whenever a new event is received. Action1
is a functional interface i.e. an interface with only one abstract method. Observable will invoke this lambda expression every time it has a new event. In case you are not familiar with lambdas then please refer to my blog on this subject. We can make the above code more succinct by using another Java 8 feature called Method references.
tweets.subscribe(System.out::println);
Method references can be seen as shorthand notation for lambda expression that only calls a single method. In the expression System.out::println
, System.out
is the target reference, :: is the delimiter, and printltn
is the function that will be called on the target reference. You can use method references on both the static and instance methods.
The example program when run will produce following output.
learning RxJava
Writing blog about RxJava
RxJava rocks!!
Observable creational factory methods
There are many static factory methods in the Observable class that you can use to create Observable instances. Let’s look at few of the them.
Observable.from
This is a convenient method that turns any Iterable
into an Observable. You can use it like as shown below.
List<String> tweets = Arrays.asList("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
Observable<String> observable = Observable.from(tweets);
observable.subscribe(System.out::println);
Observable.just
is a convenient wrapper around Observable.from
.
Observable.create
This is the base factory method that every other factory method uses to create an Observable. This method can be used to convert anything into an Observable. For example, we could write an Observable that will produce first n natural numbers as shown below.
import rx.Observable;
import java.util.stream.IntStream;
public class Example4 {
public static Observable<Integer> naturalNumbers(int n) {
return Observable.create(subscriber -> {
IntStream.rangeClosed(1, n).forEach(number -> subscriber.onNext(number));
subscriber.onCompleted();
});
}
public static void main(String[] args) {
Observable<Integer> obs1 = naturalNumbers(10);
obs1.subscribe(System.out::println);
}
}
The Observable.create
factory method takes one parameter of type OnSubscribe
. The OnSubscribe
interface is a functional interface with one method takes one argument of type Subscriber<T>
and returns nothing. OnSubscribe
call
method will be invoked each time a new Observer
subscribes to the Observable
.
There are many more factory methods to create an Observable like Observable.timer
, Observable.interval
, Observable.empty
, etc that I are not covering in this blog post. It is left to the reader to learn them off their own.
Multiple subscribers
Let’s now at the scenario when multiple Observer subscribe to an Observable.
import rx.Observable;
import java.util.Arrays;
import java.util.List;
public class Example3 {
public static void main(String[] args) {
List<String> tweets = Arrays.asList("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
Observable<String> observable = Observable.from(tweets);
observable.subscribe(tweet -> System.out.println("Subscriber 1 >> " + tweet));
observable.subscribe(tweet -> System.out.println("Subscriber 2 >> " + tweet));
observable.subscribe(tweet -> System.out.println("Subscriber 3 >> " + tweet));
}
}
As you can see the code above, we have three Observer that are subscribing to the a single tweets
Observable. When we will run this program, it will produce following output.
Subscriber 1 >> learning RxJava
Subscriber 1 >> Writing blog about RxJava
Subscriber 1 >> RxJava rocks!!
Subscriber 2 >> learning RxJava
Subscriber 2 >> Writing blog about RxJava
Subscriber 2 >> RxJava rocks!!
Subscriber 3 >> learning RxJava
Subscriber 3 >> Writing blog about RxJava
Subscriber 3 >> RxJava rocks!!
As is obvious from the program output, each Observer got its own copy of events from the Observable. Observable only emits events when someone subscribes to it and until then it remains inactive. Every call to the subscribe
method will emit all the events in the underlying Iterable
from the start i.e. all the Observer will receive all the tweets.
Cold and Hot Observable
Cold Observable are those which only emit events when someone subscribes to them. All the Observable factory methods so far covered in this blog like create
, from
, just
produce cold Observable instances.
Hot observable are those that emit event independent of whether someone is subscribed to them or not. The subscriber that subscribes to the Observable will only receive future event. Subscribers does not receive events that were produced before they subscribed to the Observable. You can convert any cold Observable to hot Observable by calling the publish
method on it.
import rx.Observable;
import rx.observables.ConnectableObservable;
import java.util.concurrent.TimeUnit;
public class Example5 {
public static void main(String[] args) throws Exception{
ConnectableObservable<Long> hotObservable = Observable.interval(1, TimeUnit.SECONDS).publish();
hotObservable.subscribe(val -> System.out.println("Subscriber 1 >> " + val));
hotObservable.connect();
Thread.sleep(5000);
hotObservable.subscribe(val -> System.out.println("Subscriber 2 >> " + val));
Thread.sleep(5000);
}
}
In the code shown above, we created an interval based Observable. This Observable emits one long value every one second starting from 0. We converted the Observable into a hot Observable by calling the publish
method on it. The publish method returns a Observable of type ConnectableObservable
. A ConnectableObservable does not begin emitting items when an Observer is subscribed to it, but when its connect method is called. The first subscriber will receive all the 10 events but as second subscriber joins after 5 seconds so it wil only receive remaining 5. The sample output produced by this program is shown below.
Subscriber 1 >> 0
Subscriber 1 >> 1
Subscriber 1 >> 2
Subscriber 1 >> 3
Subscriber 1 >> 4
Subscriber 1 >> 5
Subscriber 2 >> 5
Subscriber 1 >> 6
Subscriber 2 >> 6
Subscriber 1 >> 7
Subscriber 2 >> 7
Subscriber 1 >> 8
Subscriber 2 >> 8
Subscriber 1 >> 9
Subscriber 2 >> 9
Performing transformations and computations on Observable
So far we have only used forEach
method to print all the events produced by an Observable stream. There are many more methods available on the Observable instance that can help you work with Observable stream in a declarative manner. These operators fall into three main categories.
- Transformation
- Filter
- Aggregation
Transformation operators
These operators transform one Observable to another Observable stream. Some of the common operators that fall under this category are map
, flatMap
, scan
.
- map: This will convert an Observable stream into another Observable stream by applying a function to all the items emitted by the source Observable stream. In the example shown below, we are converting a Observable of String to Observable of integers.
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
tweets.map(tweet -> tweet.length()).forEach(System.out::println);
It will produce following output.
15
25
14
- flatMap: This convert an Observable stream into stream of Observable and then flatten them to a single Observable.
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
tweets.flatMap(tweet -> Observable.<String>from(tweet.split(""))).forEach(System.out::println);
You can read more about transformational functions in the RxJava documentation.
Filter operators
- filter: It filters items emitted by the Observable. In the example shown below, only last tweet
RxJava rocks!!
will be printed.
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
tweets.filter(tweet -> tweet.startsWith("RxJava")).forEach(System.out::println);
You can read more about filtering functions in the RxJava documentation.
Aggregation operators
- count: It counts the number of events emitted by the Observable as shown in the example below.
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
tweets.count().forEach(System.out::println);
- reduce: It applies a function to each item emitted by an Observable, and produces a single value. In the code shown below, we used
reduce
function to sum length of all the tweets.
Observable<String> tweets = Observable.just("learning RxJava", "Writing blog about RxJava", "RxJava rocks!!");
tweets.map(tweet -> tweet.length()).reduce(0, (acc, el) -> acc + el).forEach(System.out::println);
Building a sample application
Now that you know basics of RxJava let’s build a develop a real-time tweet sentiment analyzer using it. We will use the poor man sentiment analysis approach i.e. bag of positive and negative words — if the text contains one of the positive word from the positive word list then we will mark it as a positive tweet, else if text contains a negative word then we will mark it as negative tweet, otherwise we will consider it as a neutral tweet. I am not using any advanced sentiment analyzer like Stanford CoreNLP to keep focus on RxJava. In case you want to learn about Stanford CoreNLP API then you can refer to my earlier blog on this subject.
Create a twitter application
The application also requires four environment variables corresponding to a twitter application. Create a new twitter application by going to https://dev.twitter.com/apps/new. The environment variables are:
twitter4j.oauth.consumerKey=<enter value>
twitter4j.oauth.consumerSecret=<enter value>
twitter4j.oauth.accessToken=<enter value>
twitter4j.oauth.accessTokenSecret=<enter value>
Adding Twitter4j dependencies to classpath
Please add Twitter4j dependencies to your pom.xml.
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<version>4.0.3</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>4.0.3</version>
</dependency>
Creating a Tweet Observable
Create a new class TweetObservable
that will have a factory method to create an Observable as shown below.
import rx.Observable;
import twitter4j.*;
public final class TweetObservable {
public static Observable<Status> tweetObservable(final String[] searchKeywords) {
return Observable.create(subscriber -> {
final TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
twitterStream.addListener(new StatusAdapter() {
public void onStatus(Status status) {
subscriber.onNext(status);
}
public void onException(Exception ex) {
subscriber.onError(ex);
}
});
FilterQuery query = new FilterQuery();
query.language(new String[]{"en"});
query.track(searchKeywords);
twitterStream.filter(query);
});
}
}
The code shown above does the following:
- It creates an Observable using the
Observable.create(OnSubscribe)
method.
Observable.create
method is passed an lambda expression that creates an instance of TwitterStream
using the TwitterStreamFactory
.
- We query Twitter for all the search terms received as method argument.
- The
twitterStream
instance is configured with a listener that will be invoked when a new status is received or an exception is encountered. When a new status is received then subscriber.onNext()
is called with the status update. In case an error is encountered, subscriber onError
method is invoked passing it the exception that was thrown.
Subscribing to Tweet Observable
Next we create a class SentimentAnalyzer
which acts creates an Observable and subscribes a Observer to it. Each time a new status update is received we print it to console.
import rx.Observable;
import rx.observables.ConnectableObservable;
import twitter4j.Status;
public class SentimentAnalyzer {
public static void main(String[] args) throws Exception {
String[] searchTerms = {"Justin Bieber"};
ConnectableObservable<Status> observable = TweetObservable.tweetObservable(searchTerms).publish();
observable.connect();
Observable<String> tweetStream = observable.map(Status::getText);
tweetStream.forEach(System.out::println);
}
}
Grouping tweets by Sentiment
Now let’s extend this example by adding the functionality to group tweets based on the sentiment. We will use groupBy
operator which returns an Observable instance that emit multiple Observable instances. The emitted Observable instances are of type GroupedObservable.
public static void main(String[] args) throws Exception {
String[] searchTerms = {"Justin Bieber"};
ConnectableObservable<Status> observable = TweetObservable.tweetObservable(searchTerms).publish();
observable.connect();
Observable<String> tweetStream = observable.map(Status::getText);
tweetStream.groupBy(tweet -> sentiment(tweet)).subscribe(
(GroupedObservable<Sentiment, String> gr) ->
gr.asObservable().forEach(tweet -> System.out.println(String.format("%s >> %s", gr.getKey(), tweet))));
}
The sentiment function is a very simple function that checks if tweet text contains positive or negative word.
The output of the above program will be like as shown below.
POSITIVE >> I like the new Justin bieber song and no one can stop me
NEUTRAL >> RT @JBCrewdotcom: VIDEO: Rebecca Black Talks About Justin Bieber at the MTV VMA's 2015: http://t.co/T24MXwQ7Ve
NEGATIVE >> I hate Justin bieber songs and music
Building analytics
Let’s suppose we want to count how many positive, negative, and neutral tweets are posted every minute we can write code as shown below.
public static void main(String[] args) throws Exception {
String[] searchTerms = {"Justin Bieber"};
ConnectableObservable<Status> observable = TweetObservable.tweetObservable(searchTerms).publish();
observable.connect();
Observable<String> tweetStream = observable.map(Status::getText);
tweetStream.groupBy(tweet -> sentiment(tweet)).subscribe(
(GroupedObservable<Sentiment, String> gr) ->
gr.asObservable().window(1, TimeUnit.MINUTES).subscribe(val -> val.count().subscribe(count -> System.out.println(String.format("%s %d tweets/minute", gr.getKey(), count)))));
}
The output will be something like as shown below.
POSITIVE 44 tweets/minute
NEUTRAL 70 tweets/minute
NEGATIVE 11 tweets/minute
POSITIVE 32 tweets/minute
NEUTRAL 56 tweets/minute
NEGATIVE 28 tweets/minute
Conclusion
In this blog we first looked at RxJava basics and then built a simple tweet sentiment analyzer. I have only scratched surface of RxJava in this blog. There are many more topics like error handling, back pressure, combinators, schedulers that I have not covered in this blog. I will cover them later in this series. Till then stay tuned and keep giving us feedback.