Saturday, November 30, 2013

RxJava on Android with Scala => AWESOME

I The issue

One of the most annoying aspect about Android is how the multithreading is managed.

As you know long task can't be executed on the main thread, because any task executed on is blocking the whole device. For that reason, recent version of Android will just crash if you try to do network task without multithreading. And in other situation, you will get a dialog (ANR for Application Not Responding) to ask to the user to make the app crash if it doesn't respond during few seconds.

Even if you have some IO stuff very rapid to execute like, let's say half a second, the user will still notice some lag. He may try to open a menu, and during the animation, it will stop, then continue, giving this non professional aspect that so many Android dev seems to love.

As per the Android team:
Generally, 100 to 200ms is the threshold beyond which users will perceive slowness in an application.
Now that we agree about taking care of that point, let's see what tools we can use.

The usual tool is the AsyncTask abstract class. It's easy to implement but has many drawbacks. You probably experienced most of them, it's all about context leaking, difficulties to manage exception or to chain several async tasks.

AsyncTaskLoader coming with recent version  of the SDK or the support package are even worse. It solves few issues (mainly the context leak) but add an incredible mass of boilerplate code. If you switched to Scala, that's clearly not what you are looking for.

Example:
(I wanted to copy paste an example from the official Javadoc here, but it's too long!)
Things are improved with the ListenableFutureExplained class from the Guava library. You can chain stuff, you will get many call back, the api is not perfect, but it works and there is not too much boilerplate, I mean just usual Java stuff.


II The perfect solution

Recently, a framework has made some noise,  you have certainly heard about it, it's Rx (the true name is the Reactive Extension).

Initially written in C# by +Erik Meijer, it has been translated in almost every important language, Java of course (and all the JVM world), but also Javascript, Python, Ruby and even PHP (yeah, PHP the fabulous <3).

Even more amazing, a part of a Coursera course is dedicated to its understanding. I am following it, and I can tell you, it's very, no, very very interesting. It's much more than just Rx, it's all about the Reactive programming movement! (heard about that Manifesto?)


Guess what, Rx will be our silver bullet for today!


First, a short introduction to Rx.

I won't be exhaustive because some very good presentation can be found on Google (like here by Ben Christensen).

To make it super short, Rx is will make you switch from a pull logic code to a push one.

If it sounds too much abstract, here is how I see Rx in my mind.
1/ You put your awesome but slow code in something called an Observable, the Observable is like a factory in the real world, it produces some final product (the results). 
2/ When the final product is ready, it delivers it to its client (the Observer). 
3/ If the production fails (your code throwed an exception), it will notify its client. 
4/ The marvellous thing about that is that the factory is located very far from the client (in another thread), and like in our world, the client has no idea how, or when he will get his final product, until that time he can do whatever he has to do, and only when the final product is ready, he takes care of it. 
Bonus: And finally, unlike the Future from Scala, Rx can deliver an infinite number of final product, but each one will be delivered when it's ready.

That's Rx!

You may have noticed that Observable and Observer seems to be similar to some ideas spread by a very famous Gang? It's not an error. Rx by itself is like a merge between the Observable/Observer pattern and the Iterator one. The Observer is useful for the multithreading aspect, and the Iterator one for the capacity to send an infinite amount of result. 

To conclude this short presentation, I have to highlight the fact that an Observable is a monad. Yeah the magic word. Because you are a Scala dev, you have probably no idea of what is a monad and how much awesomeness it means. So no need to give more information :-)

If you dev on Android and already heard about Rx, you have probably read this very good article from +Matthias Käppler regarding the use of Rx in Android.

If you have not yet read it, do it because I will assume you know and understand its content. 

III Scala and Rx


Here we will see what needs to be done to be more Scala compliant.

As said before RxJava, as most java library, works directly for most of the JVM languages.

This compatibility is awesome because we can access to most of the existing Java library. 

However it always means using a Java API. And why would we accept having less mature tools (Scala/Sbt IDE integration? Looking at you Intellij!) if at the end we program with a Java'ish style? OK Scala can also be a better Java without punctuation, that's nice... but not enough, isn't it?

We are lucky!

Nice Scala programmers have written a language adaptor so we can use the library in a Scala way, without anonymous class to implement.

Moreover, +Matthias Käppler has written some little code to help you to use the library on Android.

The issue is that +Matthias Käppler 's code is not directly compliant with Rx Scala language adapter. So you would have to choose which helper you want to use: Scala or Rx...

Fortunately, there is an easy way to benefit from both adapters. 

Let's see how.

IV The build.sbt file


Depending of your Android sbt support plug-in, you may have a proguard cache. It's a nice feature which dramatically accelerate the compilation process. Unfortunately it also create some issues if used with transitive librairies.

With the sbt plug-in I am using, I have noticed that sometimes, when a library depends of other libraries, these second level dependencies are sometimes added two times to the compiled code and then you crash at the compilation.

So, in our case we will declare both hepers as intransitive in the sbt file so we will be sure that the core part of Rx is added only one time.

You will add that to your sbt file:

libraryDependencies ++= Seq (
  "com.netflix.rxjava" % "rxjava-core" % "latest.integration",
  "com.netflix.rxjava" % "rxjava-scala" % "latest.integration" intransitive(),
  "com.netflix.rxjava" % "rxjava-android" % "latest.integration" intransitive()
) 
Now, it's time to implement.

The point we want to solve is making Android helpers work with the Scala one. Indeed in the Rx Android helper, there is the rx.android.concurrency.AndroidSchedulers to get easily a handler linked to the Android main thread, where we want to observe the result of our Async task.

The point we want to solve is that this handler will only work with a Java Observable and not a Scala one (they are different, a Scala Observable is a kind of wrapper of a Java one, so you can use the Scala syntax, and so, is not directly compliant with the Java/Android Observable).

For that, we will use the pimp my lib pattern. Basically, it's a decorator super easy to implement because of Scala language features.

But first, let's have a look to the Scala adaptor of an Observable to see what is different from a Java one.

So let's look to rx.lang.scala.Observable import:

/**
* Provides various ways to construct new Observables.
*/
object Observable {
import scala.collection.JavaConverters._
import scala.collection.immutable.Range
import scala.concurrent.duration.Duration
import ImplicitFunctionConversions._
...

Interesting, we can see an ImplicitFunctionConversions package. Inside, there are some code to convert a Java Observable to a Scala one.
/**
 * These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
 * Most RxScala users won't need them, but they might be useful if one wants to use
 * the `rx.Observable` directly instead of using `rx.lang.scala.Observable` or if one wants
 * to use a Java library taking/returning `Func`s and `Action`s.
 */

object ImplicitFunctionConversions {
  import language.implicitConversions

So we will do the same thing. We will create a class to convert the Android Scheduler to something compliant with the Scala Observable.

import rx.lang.scala.Observable
import language.implicitConversions


 /**
 * Convert implicitly a Java Observable in a Scala Observable.
 */
object RxThread {
  implicit def Observable2Notification[T](o: Observable[T]) = new RxThread(o)
}

Most of the time we will want to execute the Observable in a dedicated thread and get the result in the main one. To avoid boilerplate code we will do it in the pimp my lib way. 

The class will look like that:

import rx.lang.scala.Observable
import language.implicitConversions

/**
 * Do all the registration / observation stuff:<br>
 *
 * - observe on main thread<br>
 * - subscribe on new thread<br>
 * - make it compliant with Scala notification<br>
 *
 * @param o the observable to transform
 * @tparam T the type of the observable
 */
class RxThread[T](o:Observable[T]) {
   def execAsync[T] = {
    o.subscribeOn(Schedulers.newThread)
      .observeOn(AndroidSchedulers.mainThread())
      .materialize
  }
}

 /**
 * Convert implicitly a Java Observable in a Scala Observable.
 */
object RxThread {
implicit def Observable2Notification[T](o: Observable[T]) = new RxThread(o)
}
And we have finished.

Here is an example of how to use it:
import scala.io.Source
import java.io.{FileOutputStream, File}
import java.net.URL


import rx.lang.scala.{Subscription, Observer, Observable} 
 private val lastScript = "https://raw.github.com/..."



def getAsyncUrl(urlString: String):Observable[Option[String]] = Observable {
    (observer: Observer[Option[String]]) => {
      val url = new URL(urlString)
      val urlCon = url.openConnection()
      urlCon.setConnectTimeout(5000)
      urlCon.setReadTimeout(5000)
      val io = Source.fromInputStream(urlCon.getInputStream)
      val result = Option(io.getLines().mkString.split("\n")(0))
      io.close()
      observer.onNext(result)
      observer.onCompleted()
      Subscription()
    }
}


val mUnsubscribe = CompositeSubscription() 

mUnsubscribe += getAsyncUrl(lastScript).execAsync.subscribe(
                  _ match {
                    case OnNext(Some(t)) => CInfo(t)
                    case OnNext(None) => CAlert(getString(R.string.connection_error_message, "Server error"))
                    case OnError(e) => CAlert(getString(R.string.connection_error_message, e.getLocalizedMessage))
})

Don't forget to call mUnsubscribe.unsubscribe in your onDestroy callback so your asynctask will be stopped IF NEEDED when the activity is stopped.

In this example I have used a CompositeSubscription so you can attach more Observable to it, and then stop all of them in one call. Very useful feature!

As +Nick Stanchenko wrote to me, Future can do almost the same thing that what is shown in this little example. RxJava becomes more useful when you need to distribute an infinite amount of result. Imagine you want to crawl a big website, then, each time a new file is downloaded, you can notify it on the UI. Unfortunately this kind of example would hide Rx under lots of other code. Anyway, I am sure you got the point. Rx is awesome.

In a coming post, I will show you a simpler alternative based on the Future scala class which should work for 70% of your async tasks.

Read, comment and share this article!
Post a Comment