They say you should develop a proactive mindset in life, not a reactive one. That does not apply to Android programming, however! :]
Reactive programming is not just another API. It’s a whole new paradigm and a very useful one. RxJava is a reactive implementation used on Android. Android is a perfect place to start your exploration of the reactive world. It’s made even easier with RxAndroid, a library that wraps asynchronous UI events to be more RxJava-like.
Don’t be scared — I’ll bet the basic concept of reactive programming is known to you even if you are not aware of it yet. :]
Note: This tutorial requires good knowledge of Android and Java. To get up to speed, check out our Android Development Tutorials first and return to this tutorial when you’re ready.
In this RxAndroid tutorial you will learn:
- What reactive programming is
- What an observable is
- How to turn asynchronous events like button clicks and text field content changes into observables
- How to transform observable items
- How to filter observable items
- How to specify the thread on which code should be executed
- How to combine several observables into one
Getting Started
Download the starter project for this tutorial and open it in Android Studio.
You’ll be working exclusively in CheeseActivity.java. The CheeseActivity class extends BaseSearchActivity; take some time to explore BaseSearchActivity and check out the following features ready for your use:
- showProgressBar(): A method to show a progress bar…
- hideProgressBar(): … and a method to hide it.
- showResult(List<String> result): A method to display a list of cheeses.
- mCheeseSearchEngine: A field which is an instance of CheeseSearchEngine. It has a search method which you call when you want to search for cheeses. It accepts a text search query and returns a list of matching cheeses.
What is Reactive Programming?
Before creating your first observable, indulge yourself with a bit of theory first. :]
In imperative programming, an expression is evaluated once and a value is assigned to a variable:

    int a = 2;
    int b = 3;
    int c = a * b; // c is 6
    a = 10;
    // c is still 6

On the other hand, reactive programming is all about responding to value changes.
You have probably done some reactive programming, even if you didn’t realize it at the time.
- Defining cell values in spreadsheets is similar to defining variables in imperative programming.
- Defining cell expressions in spreadsheets is similar to defining and operating on observables in reactive programming.
Picture a spreadsheet that assigns cell B1 a value of 2, cell B2 a value of 3 and a third cell, B3, an expression that multiplies the value of B1 by the value of B2. When the value of either of the components referenced in the expression changes, the change is observed and the expression is re-evaluated automagically in B3.
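If you’d like to see the spreadsheet idea in code, here’s a rough plain-Java sketch. Cell, formula() and SpreadsheetDemo are names made up for this illustration; none of this is RxJava API, it only mimics how B3 follows B1 and B2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntBinaryOperator;

/** A hypothetical spreadsheet cell that notifies listeners when its value changes. */
class Cell {
    private int value;
    private final List<Runnable> listeners = new ArrayList<>();

    Cell(int value) { this.value = value; }

    int get() { return value; }

    void set(int newValue) {
        value = newValue;
        // Tell every dependent cell that an input changed.
        for (Runnable listener : listeners) {
            listener.run();
        }
    }

    void onChange(Runnable listener) { listeners.add(listener); }

    /** Builds a cell whose value is recomputed whenever either input changes. */
    static Cell formula(Cell left, Cell right, IntBinaryOperator op) {
        Cell result = new Cell(op.applyAsInt(left.get(), right.get()));
        Runnable recompute = () -> result.set(op.applyAsInt(left.get(), right.get()));
        left.onChange(recompute);
        right.onChange(recompute);
        return result;
    }
}

class SpreadsheetDemo {
    public static void main(String[] args) {
        Cell b1 = new Cell(2);
        Cell b2 = new Cell(3);
        Cell b3 = Cell.formula(b1, b2, (x, y) -> x * y);
        System.out.println(b3.get()); // 6
        b1.set(10);
        System.out.println(b3.get()); // 30
    }
}
```

Compare this with the imperative snippet earlier: there, c stayed at 6 after a changed; here, b3 is recomputed as soon as b1 changes.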
RxJava Observable Contract
RxJava makes use of the Observer pattern.
Note: To refresh your memory about the Observer pattern you can visit Common Design Patterns for Android.
In the Observer pattern, you have objects of two key RxJava types: Observable and Observer. When an Observable changes state, all Observer objects subscribed to it are notified.
Among the methods of Observable is subscribe(), which an Observer will call to begin the subscription.
From that point, the Observer interface has three methods which the Observable calls as needed:
- onNext(T value) provides a new item of type T to the Observer
- onComplete() notifies the Observer that the Observable has finished sending items
- onError(Throwable e) notifies the Observer that the Observable has experienced an error
An Observable emits zero or more items that could be followed by either completion or error.
That sounds complicated, but some examples will easily explain.
A network request observable usually emits a single item and immediately completes:
The circle represents an item that has been emitted from the observable and the black block represents a completion or error.
A mouse movement observable would emit mouse coordinates but would never complete:
Here you can see multiple items that have been emitted but no block showing the mouse has completed or raised an error.
No more items can be emitted after an observable has completed. Here’s an example of a misbehaving observable that violates the Observable contract:
That’s a bad, bad observable because it violates the Observable contract by emitting an item after it signaled completion.
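To make the contract a bit more tangible, here’s a small plain-Java sketch of an emitter that simply drops anything sent after a terminal event. ContractEmitter and its log are hypothetical names for this example only; this is an assumption about one way to enforce the rule, not a copy of RxJava’s internals.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical emitter that ignores anything sent after completion or error. */
class ContractEmitter {
    private final List<String> log = new ArrayList<>();
    private boolean terminated = false;

    void onNext(String item) {
        if (terminated) return; // the contract forbids items after a terminal event
        log.add("next:" + item);
    }

    void onComplete() {
        if (terminated) return; // only one terminal event is allowed
        terminated = true;
        log.add("complete");
    }

    void onError(Throwable error) {
        if (terminated) return; // only one terminal event is allowed
        terminated = true;
        log.add("error:" + error.getMessage());
    }

    List<String> log() { return log; }

    public static void main(String[] args) {
        ContractEmitter emitter = new ContractEmitter();
        emitter.onNext("a");
        emitter.onComplete();
        emitter.onNext("b"); // misbehaving source: this item is ignored
        System.out.println(emitter.log()); // [next:a, complete]
    }
}
```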
How to Create an Observable
There are many libraries to help you create observables from almost any type of event. However, sometimes you just need to roll your own. Besides, it’s a great way to learn!
You’ll create an Observable using Observable.create(). Here is its signature:

    Observable<T> create(ObservableOnSubscribe<T> source)

That’s nice and concise, but what does it mean? What is the “source?” To understand that signature, you need to know what an ObservableOnSubscribe is. It’s an interface, with this contract:

    public interface ObservableOnSubscribe<T> {
      void subscribe(ObservableEmitter<T> e) throws Exception;
    }

Like an episode of a J.J. Abrams show such as “Lost” or “Westworld,” that answers some questions while inevitably asking more. So the “source” you need to create your Observable will need to expose subscribe(), which in turn requires whatever’s calling it to provide an “emitter” as a parameter. What, then, is an emitter?
RxJava’s Emitter interface is similar to the Observer one:

    public interface Emitter<T> {
      void onNext(T value);
      void onError(Throwable error);
      void onComplete();
    }

An ObservableEmitter, specifically, also provides a means to cancel the subscription.
To visualize this whole situation, think of a water faucet regulating the flow of water. The water pipes are like an Observable, willing to deliver a flow of water if you have a means of tapping into it. You construct a faucet that can turn on and off, which is like an ObservableEmitter, and connect it to the water pipes in Observable.create(). The outcome is a nice fancy faucet. :]
An example will make the situation less abstract and more clear. It’s time to create your first observable! :]
Observe Button Clicks
Add the following code inside the CheeseActivity class:

    // 1
    private Observable<String> createButtonClickObservable() {
      // 2
      return Observable.create(new ObservableOnSubscribe<String>() {
        // 3
        @Override
        public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
          // 4
          mSearchButton.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
              // 5
              emitter.onNext(mQueryEditText.getText().toString());
            }
          });

          // 6
          emitter.setCancellable(new Cancellable() {
            @Override
            public void cancel() throws Exception {
              // 7
              mSearchButton.setOnClickListener(null);
            }
          });
        }
      });
    }

Here’s what’s going on in the code above:
1. You declare a method that returns an observable that will emit strings.
2. You create an observable with Observable.create(), and supply it with a new ObservableOnSubscribe.
3. You define your ObservableOnSubscribe by overriding subscribe().
4. Set up an OnClickListener on mSearchButton.
5. When the click event happens, call onNext on the emitter and pass it the current text value of mQueryEditText.
6. Keeping references can cause memory leaks in Java. It’s a useful habit to remove listeners as soon as they are no longer needed. But what do you call when you are creating your own Observable? For that very reason, ObservableEmitter has setCancellable(). Override cancel(), and your implementation will be called when the Observable is disposed, such as when the Observable is completed or all Observers have unsubscribed from it.
7. For OnClickListener, the code that removes the listener is setOnClickListener(null).
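The listener-in, listener-out pattern from steps 4 to 7 can be tried outside Android too. Below is a rough plain-Java sketch; FakeButton and ClickSource are invented stand-ins (not Android or RxJava classes), and the returned Runnable plays the role that cancel() plays above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Stand-in for a view that holds at most one click listener (not an Android class). */
class FakeButton {
    private Runnable listener;

    void setOnClickListener(Runnable listener) { this.listener = listener; }

    boolean hasListener() { return listener != null; }

    void click() {
        if (listener != null) listener.run();
    }
}

class ClickSource {
    /**
     * Forwards each click to the consumer and returns a Runnable that
     * removes the listener again when the subscriber is done.
     */
    static Runnable subscribe(FakeButton button, String text, Consumer<String> onNext) {
        button.setOnClickListener(() -> onNext.accept(text));
        return () -> button.setOnClickListener(null);
    }

    public static void main(String[] args) {
        FakeButton button = new FakeButton();
        List<String> received = new ArrayList<>();
        Runnable cancel = subscribe(button, "cheddar", received::add);
        button.click();
        cancel.run();
        button.click(); // no listener left, so nothing is delivered
        System.out.println(received + " " + button.hasListener()); // [cheddar] false
    }
}
```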
To try out the observable you first need one more interface, Consumer. It’s a simple way to accept values coming in from an emitter.

    public interface Consumer<T> {
      void accept(T t) throws Exception;
    }

This interface is handy when you want to set up a simple subscription to an Observable.
Observable provides several versions of subscribe(), all with different parameters. For example, you could pass a full Observer if you like, but then you’d need to implement all the necessary methods.
But if all you want out of your subscription is for the observer to respond to values sent to onNext(), you can use the version of subscribe() that takes in a single Consumer (the parameter is even named onNext, to make the connection clear).
You’ll do exactly that when you subscribe in your activity’s onStart(). Add the following code to CheeseActivity.java:

    @Override
    protected void onStart() {
      super.onStart();
      // 1
      Observable<String> searchTextObservable = createButtonClickObservable();

      searchTextObservable
          // 2
          .subscribe(new Consumer<String>() {
            // 3
            @Override
            public void accept(String query) throws Exception {
              // 4
              showResult(mCheeseSearchEngine.search(query));
            }
          });
    }

The import for Consumer is ambiguous; when prompted, import:

    import io.reactivex.functions.Consumer;

Here’s an explanation of each step:
1. First, create an observable by calling the method you just wrote.
2. Subscribe to the observable with subscribe(), and supply a simple Consumer.
3. Override accept(), which will be called when the observable emits an item.
4. Finally, perform the search and show the results.
Sounds yummy! :]
RxJava Threading Model
You’ve had your first taste of reactive programming. There is one problem though: the UI freezes up for a few seconds when the search button is pressed.
You might also notice the following line in Android Monitor:

    > 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames! The application may be doing too much work on its main thread.

This happens because search is executed on the main thread. If search were to perform a network request, Android would crash the app with a NetworkOnMainThreadException. It’s time to fix that.
One popular myth about RxJava is that it is multi-threaded by default, similar to AsyncTask. However, if not otherwise specified, RxJava does all the work in the same thread it was called from.
You can change this behavior with the subscribeOn and observeOn operators.
subscribeOn is supposed to be called only once in the chain of operators. If it’s not, the first call wins. subscribeOn specifies the thread on which the observable will be subscribed (i.e. created). If you use observables that emit events from Android View, you need to make sure subscription is done on the Android UI thread.
On the other hand, it’s okay to call observeOn as many times as you want in the chain. observeOn specifies the thread on which the next operators in the chain will be executed. For example:

    myObservable // observable will be subscribed on i/o thread
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .map(/* this will be called on main thread... */)
        .doOnNext(/* ...and everything below until next observeOn */)
        .observeOn(Schedulers.io())
        .subscribe(/* this will be called on i/o thread */);

The most useful schedulers are:
- Schedulers.io(): Suitable for I/O-bound work such as network requests or disk operations.
- Schedulers.computation(): Works best with computational tasks like event-loops and processing callbacks.
- AndroidSchedulers.mainThread(): Executes the next operators on the UI thread.
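If you want to get a feel for thread hopping without any RxJava machinery, here’s a rough plain-Java sketch. It uses CompletableFuture and two single-thread executors as stand-ins; ThreadHopDemo, the thread names and the "cheese list" result are all made up for illustration, and this is not how RxJava schedulers are implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopDemo {
    /** Runs the slow work on ioPool, hands the result to uiPool, and returns both thread names. */
    static String[] run(ExecutorService ioPool, ExecutorService uiPool) {
        String[] names = new String[2];
        CompletableFuture
            .supplyAsync(() -> {
                names[0] = Thread.currentThread().getName(); // the slow search would run here
                return "cheese list";
            }, ioPool)
            .thenAcceptAsync(result -> {
                names[1] = Thread.currentThread().getName(); // the result is consumed here
            }, uiPool)
            .join(); // wait until both stages have finished
        return names;
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService uiPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main-thread"));
        try {
            String[] names = run(ioPool, uiPool);
            System.out.println(names[0] + " -> " + names[1]); // io-thread -> fake-main-thread
        } finally {
            ioPool.shutdown();
            uiPool.shutdown();
        }
    }
}
```

The second stage plays roughly the part of an observeOn call: everything after it runs on the executor you handed over.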
The Map Operator
The map operator applies a function to each item emitted by an observable and returns another observable that emits results of those function calls. You’ll need this to fix the threading issue as well.
If you have an observable called numbers that emits a sequence of integers, and you apply map as follows:

    numbers.map(new Function<Integer, Integer>() {
      @Override
      public Integer apply(Integer number) throws Exception {
        return number * number;
      }
    });

The result would be an observable that emits the square of each of those integers.
That’s a handy way to iterate over multiple items with little code. Let’s put it to use!
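Before wiring map into the activity, here’s the same squaring idea as a tiny plain-Java sketch you can run anywhere. MapSketch and the input values 1, 2, 3 are assumptions chosen for illustration; the sketch works on a finished list rather than a stream of events, so treat it as an analogy to RxJava’s map, not its implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class MapSketch {
    /** Applies fn to every item, in order, and collects the results. */
    static <T, R> List<R> map(List<T> items, Function<T, R> fn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.add(fn.apply(item));
        }
        return out;
    }

    public static void main(String[] args) {
        // The input values are an assumption chosen for illustration.
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        List<Integer> squares = map(numbers, n -> n * n);
        System.out.println(squares); // [1, 4, 9]
    }
}
```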
Modify onStart() in the CheeseActivity class to look like the following:

    @Override
    protected void onStart() {
      super.onStart();
      Observable<String> searchTextObservable = createButtonClickObservable();
      searchTextObservable
          // 1
          .observeOn(Schedulers.io())
          // 2
          .map(new Function<String, List<String>>() {
            @Override
            public List<String> apply(String query) {
              return mCheeseSearchEngine.search(query);
            }
          })
          // 3
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(List<String> result) {
              showResult(result);
            }
          });
    }

When prompted, resolve the ambiguous Function import:

    import io.reactivex.functions.Function;

Going over the code above:
1. First, specify that the next operator should be called on the I/O thread.
2. For each search query, you return a list of results.
3. Finally, specify that code down the chain should be executed on the main thread instead of on the I/O thread. In Android, all code that works with Views should execute on the main thread.
Show Progress Bar with doOnNext
It’s time to display the progress bar!
For that you’ll need a doOnNext operator. doOnNext takes a Consumer and allows you to do something each time an item is emitted by the observable.
In the same CheeseActivity class, modify onStart() to the following:

    @Override
    protected void onStart() {
      super.onStart();
      Observable<String> searchTextObservable = createButtonClickObservable();
      searchTextObservable
          // 1
          .observeOn(AndroidSchedulers.mainThread())
          // 2
          .doOnNext(new Consumer<String>() {
            @Override
            public void accept(String s) {
              showProgressBar();
            }
          })
          .observeOn(Schedulers.io())
          .map(new Function<String, List<String>>() {
            @Override
            public List<String> apply(String query) {
              return mCheeseSearchEngine.search(query);
            }
          })
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(List<String> result) {
              // 3
              hideProgressBar();
              showResult(result);
            }
          });
    }

Taking each numbered comment in turn:
1. Ensure that the next operator in the chain will be run on the main thread.
2. Add the doOnNext operator so that showProgressBar() will be called every time a new item is emitted.
3. Don’t forget to call hideProgressBar() when you are just about to display a result.
Observe Text Changes
What if you want to perform search automatically when the user types some text, just like Google?
First, you need to subscribe to TextView text changes. Add the following method to the CheeseActivity class:

    // 1
    private Observable<String> createTextChangeObservable() {
      // 2
      Observable<String> textChangeObservable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
          // 3
          final TextWatcher watcher = new TextWatcher() {
            @Override
            public void beforeTextChanged(CharSequence s, int start, int count, int after) {}

            @Override
            public void afterTextChanged(Editable s) {}

            // 4
            @Override
            public void onTextChanged(CharSequence s, int start, int before, int count) {
              emitter.onNext(s.toString());
            }
          };

          // 5
          mQueryEditText.addTextChangedListener(watcher);

          // 6
          emitter.setCancellable(new Cancellable() {
            @Override
            public void cancel() throws Exception {
              mQueryEditText.removeTextChangedListener(watcher);
            }
          });
        }
      });

      // 7
      return textChangeObservable;
    }

Here’s the play-by-play of each step above:
1. Declare a method that will return an observable for text changes.
2. Create textChangeObservable with create(), which takes an ObservableOnSubscribe.
3. When an observer makes a subscription, the first thing to do is to create a TextWatcher.
4. You aren’t interested in beforeTextChanged() and afterTextChanged(). When the user types and onTextChanged() triggers, you pass the new text value to an observer.
5. Add the watcher to your TextView by calling addTextChangedListener().
6. Don’t forget to remove your watcher. To do this, call emitter.setCancellable() and override cancel() to call removeTextChangedListener().
7. Finally, return the created observable.
Replace the declaration of searchTextObservable in onStart() of CheeseActivity as follows:

    Observable<String> searchTextObservable = createTextChangeObservable();

Build and run your app. You should see the search kick off when you start typing text in the TextView.
Filter Queries by Length
It doesn’t make sense to search for queries as short as a single letter. To fix this, let’s introduce the powerful filter operator.
filter passes only those items which satisfy a particular condition. filter takes in a Predicate, which is an interface that defines the test that input of a given type needs to pass, with a boolean result. In this case, the Predicate takes a String and returns true if the string’s length is two or more characters.
Replace return textChangeObservable in createTextChangeObservable() with the following code:

    return textChangeObservable
        .filter(new Predicate<String>() {
          @Override
          public boolean test(String query) throws Exception {
            return query.length() >= 2;
          }
        });

Resolve the ambiguous Predicate import with:

    import io.reactivex.functions.Predicate;

Everything will work exactly the same, except that text queries with length less than 2 won’t get sent down the chain.
Run the app; you should see the search kick off only when you type the second character.
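Here’s the same length check as a small plain-Java sketch, in case you want to poke at the predicate on its own. FilterSketch and the sample queries are assumptions for illustration; it uses java.util.function.Predicate rather than the RxJava interface of the same name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class FilterSketch {
    /** Keeps only the items for which the predicate returns true. */
    static List<String> filter(List<String> items, Predicate<String> keep) {
        List<String> out = new ArrayList<>();
        for (String item : items) {
            if (keep.test(item)) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Each entry stands for the text field content after one keystroke.
        List<String> typed = Arrays.asList("c", "ch", "che");
        System.out.println(filter(typed, query -> query.length() >= 2)); // [ch, che]
    }
}
```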
Debounce Operator
You don’t want to send a new request to the server every time the query is changed by one symbol.
debounce is one of those operators that shows the real power of the reactive paradigm. Much like the filter operator, debounce filters items emitted by the observable. But the decision on whether the item should be filtered out is made not based on what the item is, but based on when the item was emitted.
debounce waits for a specified amount of time after each item emission for another item. If no item happens to be emitted during this wait, the last item is finally emitted.
In createTextChangeObservable(), add the debounce operator just below the filter so that the return statement will look like the following code:

    return textChangeObservable
        .filter(new Predicate<String>() {
          @Override
          public boolean test(String query) throws Exception {
            return query.length() >= 2;
          }
        })
        .debounce(1000, TimeUnit.MILLISECONDS); // add this line
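To see the timing rule in isolation, here’s a rough simulation in plain Java. Instead of real timers it takes a list of timestamped emissions and works out which ones would survive; DebounceSketch, Timed and the sample timestamps are all invented for this sketch, and the last item is assumed to be emitted because nothing follows it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DebounceSketch {
    /** One emission: a value and the time (in ms) at which it was emitted. */
    static final class Timed {
        final long atMillis;
        final String value;

        Timed(long atMillis, String value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    /**
     * Returns the values that survive the debounce window: an item is kept only
     * if no other item follows it within windowMillis. Events must be sorted by time.
     */
    static List<String> debounce(List<Timed> events, long windowMillis) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            boolean isLast = i == events.size() - 1;
            if (isLast || events.get(i + 1).atMillis - events.get(i).atMillis >= windowMillis) {
                out.add(events.get(i).value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Timed> typing = Arrays.asList(
            new Timed(0, "c"), new Timed(200, "ch"),
            new Timed(400, "che"), new Timed(2000, "chee"));
        // "c" and "ch" are each followed by another keystroke within 1000 ms, so they are dropped.
        System.out.println(debounce(typing, 1000)); // [che, chee]
    }
}
```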
Run the app. You’ll notice that the search begins only when you stop making quick changes. debounce waits for 1000 milliseconds before emitting the latest query text.
Merge Operator
You started by creating an observable that reacted to button clicks and then implemented an observable that reacts to text field changes. But how do you react to both?
There are a lot of operators to combine observables. The most simple and useful one is merge.
merge takes items from two or more observables and puts them into a single observable.
Change the beginning of onStart() to the following:

    Observable<String> buttonClickStream = createButtonClickObservable();
    Observable<String> textChangeStream = createTextChangeObservable();

    Observable<String> searchTextObservable = Observable.merge(textChangeStream, buttonClickStream);

Run your app. Play with the text field and the search button; the search will kick off either when you finish typing two or more symbols or when you simply press the Search button.
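If the idea of merging still feels abstract, here’s a minimal plain-Java sketch of two event sources feeding one consumer. MergeSketch and Source are hypothetical names used only here; the sketch ignores threading, completion and errors, which RxJava’s merge does handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class MergeSketch {
    /** A hypothetical event source that pushes strings to one registered consumer. */
    static final class Source {
        private Consumer<String> consumer = s -> { };

        void subscribe(Consumer<String> consumer) { this.consumer = consumer; }

        void emit(String item) { consumer.accept(item); }
    }

    /** Forwards everything from both sources to a single downstream consumer. */
    static void merge(Source first, Source second, Consumer<String> downstream) {
        first.subscribe(downstream);
        second.subscribe(downstream);
    }

    public static void main(String[] args) {
        Source textChanges = new Source();
        Source buttonClicks = new Source();
        List<String> queries = new ArrayList<>();
        merge(textChanges, buttonClicks, queries::add);

        textChanges.emit("ch");
        buttonClicks.emit("cheddar");
        textChanges.emit("che");
        System.out.println(queries); // [ch, cheddar, che]
    }
}
```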
RxJava and Activity/Fragment lifecycle
Remember those setCancellable methods you set up? They won’t fire until the observable is unsubscribed.
The Observable.subscribe() call returns a Disposable. Disposable is an interface that has two methods:

    public interface Disposable {
      void dispose();  // ends a subscription
      boolean isDisposed(); // returns true if resource is disposed (unsubscribed)
    }

Add the following field to CheeseActivity:

    private Disposable mDisposable;

In onStart(), set the returned value of subscribe() to mDisposable with the following code (only the first line changes):

    mDisposable = searchTextObservable // change this line
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(new Consumer<String>() {
          @Override
          public void accept(String s) {
            showProgressBar();
          }
        })
        .observeOn(Schedulers.io())
        .map(new Function<String, List<String>>() {
          @Override
          public List<String> apply(String query) {
            return mCheeseSearchEngine.search(query);
          }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<List<String>>() {
          @Override
          public void accept(List<String> result) {
            hideProgressBar();
            showResult(result);
          }
        });

Since you subscribed to the observable in onStart(), onStop() would be a perfect place to unsubscribe. Add the following code to CheeseActivity.java:

    @Override
    protected void onStop() {
      super.onStop();
      if (!mDisposable.isDisposed()) {
        mDisposable.dispose();
      }
    }

And that’s it! :]
Where to Go From Here?
You can download the final project from this tutorial here.
You’ve learned a lot in this tutorial. But that’s only a glimpse of the RxJava world. For example, there is RxBinding, a library that provides RxJava bindings for most of the Android View APIs. Using this library, you can create a click observable by just calling RxView.clicks(viewVariable).
To learn more about RxJava, refer to the ReactiveX documentation.
If you have any comments or questions, don’t hesitate to join the discussion below!