Single Source Of Truth with RxJava in Android

Suson Thapa
7 min read · Jul 16, 2020

Photo by Pablo Orcaray on Unsplash

Who doesn’t love offline applications? But with multiple sources of data comes great responsibility. In this story, I am going to talk about how I typically implement offline applications.

What is Single Source Of Truth?

Well, the name is self-explanatory. It means we should only use a single source as our ground truth. In my opinion, SSOT (Single Source Of Truth) is a very scalable pattern. When I was building a chat application at my company, I implemented SSOT. At first it seemed like over-engineering, but after a few months the hard work paid off. For a chat app, offline support is a must (I should be able to access messages that I have previously viewed even if there is no internet connection now). At first there was just the local database (implemented with Room) and the network (implemented with Retrofit). But soon other sources joined in: first our WebSocket implementation, then Firebase notifications. Since I had implemented SSOT, it didn’t matter where the data was coming from. All I had to do was put it into the local database, and that was it.

Those are just some of the benefits of SSOT. The theory is simple, but it is difficult to execute well. SSOT becomes quite challenging when we have to take into account the loading and error states of the various sources.

In this story I will demonstrate SSOT by developing a simple application that downloads a list of movies from the server and stores them in the local DB. We also let the user remove movies from the local DB.

TL;DR: Here is the link to the repository.

What are our requirements?

  1. When the user first opens the app, it should show a central loading view if there is no content in the local DB; otherwise it should show the content along with the pull-to-refresh loading view.
  2. We should show the empty view only when the server returns no data; an empty local DB on its own is not enough.
  3. If there is a network error, the app should show the error only if there is no data in the list.
  4. The user should be able to remove items from the local DB, and the app should show an empty view after all the data has been removed from the local DB.

Well, that’s a hell of a lot of requirements. Looks like we are going to have a dinner full of “if else” (😆). Fear not, RxJava to the rescue. We will use RxJava to simplify most of these requirements into a reactive stream. Let’s get started.

We will only provide the simple functionality of storing the full list of movies and removing a single movie. I will only show the repository and activity code for simplicity. You can refer to the source code for implementation details.

First, let’s implement the getMoviesFromDB function in the repository. For those unfamiliar with LCE (Loading, Content, Error), I recommend reading this article.

Let’s break it down

  1. Get all the movies from the DB
  2. Filter: Only forward the movies list if it’s not empty (requirements 1 and 2)
  3. Map: Convert the movies to LCE
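The embedded snippet is not reproduced here, so what follows is only a minimal sketch of the three steps above, assuming RxJava 2. The shape of `Lce`, the fields of `Movies`, and `FakeMoviesDao` (an in-memory stand-in for the Room DAO) are assumptions made for illustration; the real definitions live in the repository.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.BehaviorSubject

// Assumed LCE wrapper: every stream item is Loading, Content or Error.
sealed class Lce<out T> {
    class Loading : Lce<Nothing>()
    data class Content<T>(val data: T) : Lce<T>()
    data class Error(val throwable: Throwable) : Lce<Nothing>()
}

// Assumed entity shape; the real fields may differ.
data class Movies(val id: String, val title: String)

// Hypothetical stand-in for a Room DAO: emits the current table on
// subscribe and again after every change, and never completes.
class FakeMoviesDao {
    private val table = BehaviorSubject.createDefault<List<Movies>>(emptyList())
    fun getAllMovies(): Observable<List<Movies>> = table
    fun insertAll(movies: List<Movies>) = table.onNext(movies)
}

class MoviesRepository(private val moviesDao: FakeMoviesDao) {
    fun getMoviesFromDB(): Observable<Lce<List<Movies>>> {
        return moviesDao.getAllMovies()                  // 1. observe the table
            .filter { it.isNotEmpty() }                  // 2. hold back empty lists
            .map<Lce<List<Movies>>> { Lce.Content(it) }  // 3. wrap in LCE
    }
}

fun main() {
    val dao = FakeMoviesDao()
    MoviesRepository(dao).getMoviesFromDB().subscribe { println(it) }
    // Nothing is printed for the initially empty table; this insert is the first item seen.
    dao.insertAll(listOf(Movies("tt1", "First movie")))
}
```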

Now let’s implement a simple function in the repository called getMoviesFromServer to fetch movies from the server, like below.

So let’s look through the function. The steps taken here are

  1. Get response from the server
  2. Map: Check the response. If it’s “True”, convert the response to a DB entity, save it to the DB, and return the result (technically we won’t use the result). If the response is false, just emit an Error.
  3. onErrorReturn: Return a custom error instead of passing down a terminal event. We use this because there might be network errors, and since this observable will be combined with other observables, we don’t want to terminate the entire observable chain.
  4. Filter: This step is quite confusing, as we only pass the emission along if it’s an error or if the content is empty. Since we are implementing SSOT, we don’t depend on the network result, so we block the actual content and only pass errors and empty content (requirement 2).
  5. startWith: This is quite intuitive, as we always start a network call with loading.
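The five steps above might be sketched as follows, again assuming RxJava 2. `MoviesResponse`, `MoviesApi` and `MoviesWriter` are hypothetical seams standing in for the real response model, the Retrofit service and the Room DAO; only the operator chain mirrors the description.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single

sealed class Lce<out T> {
    class Loading : Lce<Nothing>()
    data class Content<T>(val data: T) : Lce<T>()
    data class Error(val throwable: Throwable) : Lce<Nothing>()
}

data class Movies(val id: String, val title: String)

// Assumed response shape: the text only says the server reports "True" on success.
data class MoviesResponse(val response: String, val movies: List<Movies>)

// Hypothetical stand-ins for the Retrofit service and the Room DAO.
interface MoviesApi { fun getMovies(): Single<MoviesResponse> }
interface MoviesWriter { fun insertAll(movies: List<Movies>) }

class MoviesRepository(private val api: MoviesApi, private val dao: MoviesWriter) {
    fun getMoviesFromServer(): Observable<Lce<List<Movies>>> {
        return api.getMovies()                               // 1. call the server
            .toObservable()
            .map<Lce<List<Movies>>> { body ->                // 2. check, save, wrap
                if (body.response == "True") {
                    dao.insertAll(body.movies)
                    Lce.Content(body.movies)
                } else {
                    Lce.Error(IllegalStateException("Server reported a failure"))
                }
            }
            .onErrorReturn { Lce.Error(it) }                 // 3. error becomes an item
            .filter {                                        // 4. only errors and empty content pass
                it is Lce.Error || (it is Lce.Content && it.data.isEmpty())
            }
            .startWith(Lce.Loading())                        // 5. always begin with Loading
    }
}
```

With a successful, non-empty response a subscriber to this stream only sees `Loading`: the movies reach the UI through the database, not through this observable.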

Let’s merge these sources together with the merge operator.
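As a rough illustration of the merge step, here is a sketch in which the two parameters stand in for getMoviesFromDB() and getMoviesFromServer(). The `String` item type and the subjects in `main` are assumptions that keep the example self-contained.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

sealed class Lce<out T> {
    class Loading : Lce<Nothing>()
    data class Content<T>(val data: T) : Lce<T>()
    data class Error(val throwable: Throwable) : Lce<Nothing>()
}

// One stream for the UI: items arrive in whatever order the two sources emit them.
fun getMovies(
    fromDb: Observable<Lce<List<String>>>,
    fromServer: Observable<Lce<List<String>>>
): Observable<Lce<List<String>>> = Observable.merge(fromDb, fromServer)

fun main() {
    // Like an observed table, this source never completes.
    val db = PublishSubject.create<Lce<List<String>>>()
    val server = Observable.just<Lce<List<String>>>(Lce.Loading())

    getMovies(db, server).subscribe { println(it) }
    db.onNext(Lce.Content(listOf("Cached movie")))
}
```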

Let’s look at the View portion. I will be using LCEE RecyclerView; you can learn more about it in another article of mine.

This is the ViewModel code; it just sets the schedulers for the observable.

This is the activity code; it checks the LCE and renders the appropriate view.

It’s pretty straightforward.

  1. Loading: We show the central loading if the list is empty and show the SwipeToRefresh loading if the list is not empty.
  2. Content: We check for empty list and show the empty view. If the list is not empty we hide all other extra views and add the movies to the adapter and also hide the SwipeToRefresh loading.
  3. Error: We show the error view only when the list is empty and also hide the SwipeToRefresh loading.
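The three rules above can be sketched as a pure function, without any Android classes. `ScreenState` and its flags are hypothetical stand-ins for the visibility of the real views, and hiding the central loader on Content and Error is an assumption the text does not spell out.

```kotlin
sealed class Lce<out T> {
    class Loading : Lce<Nothing>()
    data class Content<T>(val data: T) : Lce<T>()
    data class Error(val throwable: Throwable) : Lce<Nothing>()
}

// Hypothetical snapshot of what the screen is showing.
data class ScreenState(
    val centralLoading: Boolean = false,
    val swipeRefreshing: Boolean = false,
    val emptyView: Boolean = false,
    val errorView: Boolean = false,
    val items: List<String> = emptyList()
)

fun render(previous: ScreenState, event: Lce<List<String>>): ScreenState = when (event) {
    // Loading: central loader for an empty list, swipe loader otherwise.
    is Lce.Loading ->
        if (previous.items.isEmpty()) previous.copy(centralLoading = true)
        else previous.copy(swipeRefreshing = true)
    // Content: empty view for an empty list; every other extra view is hidden.
    is Lce.Content ->
        ScreenState(emptyView = event.data.isEmpty(), items = event.data)
    // Error: shown only when there is nothing else to look at.
    is Lce.Error ->
        previous.copy(
            centralLoading = false,
            swipeRefreshing = false,
            errorView = previous.items.isEmpty()
        )
}
```

Keeping the decision in one function like this makes each requirement easy to check in isolation before wiring it to real views.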

For removing movies, here is the repository.
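A minimal sketch of what such a removal might look like, assuming RxJava 2. `FakeMoviesDao` is a hypothetical in-memory stand-in for the Room DAO and `removeMovie` is an assumed name; the point is that the delete only touches the database, and the list updates through the observed query.

```kotlin
import io.reactivex.Completable
import io.reactivex.Observable
import io.reactivex.subjects.BehaviorSubject

data class Movies(val id: String, val title: String)

// Hypothetical stand-in for a Room DAO with an observable query.
class FakeMoviesDao(initial: List<Movies>) {
    private val table = BehaviorSubject.createDefault(initial)
    fun getAllMovies(): Observable<List<Movies>> = table
    fun delete(movie: Movies) =
        table.onNext(table.value.orEmpty().filter { it.id != movie.id })
}

class MoviesRepository(private val moviesDao: FakeMoviesDao) {
    // No UI state is returned: observers of getAllMovies() see the change.
    fun removeMovie(movie: Movies): Completable =
        Completable.fromAction { moviesDao.delete(movie) }
}

fun main() {
    val first = Movies("tt1", "First movie")
    val dao = FakeMoviesDao(listOf(first, Movies("tt2", "Second movie")))
    dao.getAllMovies().subscribe { println(it.map(Movies::title)) }
    MoviesRepository(dao).removeMovie(first).blockingAwait()
}
```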

Here is the ViewModel

Here is the activity code. We passed a lambda that gets called when the remove button is clicked on the list.

Too much code, let’s look at how it looks now.

The movies are loaded as usual, and removing movies works fine. Wait, why is the last movie not being removed? I checked the DB and it’s gone from there. Here comes the problem. Remember this code:

    private fun getMoviesFromDB(): Observable<Lce<List<Movies>>> {
        return moviesDao.getAllMovies()
            .filter {
                it.isNotEmpty()
            }
            .map {
                Lce.Content(it)
            }
    }

Since we are filtering out empty lists, once the last movie is removed the DB is empty, but that emission gets blocked by this filter. So the view keeps showing the last movie, as it never receives the empty list. This fails requirement 4. Let’s fix that.

This needs some explanation.

  1. First, we get all the movies, take only the first emission, and apply the same filter as before. This means we don’t emit an empty list the first time if the DB is empty.
  2. Then we emit all data from the second emission onwards, even if the DB is empty.

This solves our problem and fulfills requirement 4: we don’t emit an empty list the first time, but we do emit it on successive emissions if the DB is empty.
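The two steps can be sketched as below, assuming RxJava 2. Joining the two halves with `concat` is an assumption, since the original snippet is not reproduced here, and `FakeMoviesDao` is a hypothetical stand-in for a Room DAO that re-emits the current table to every new subscriber.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.BehaviorSubject

sealed class Lce<out T> {
    class Loading : Lce<Nothing>()
    data class Content<T>(val data: T) : Lce<T>()
    data class Error(val throwable: Throwable) : Lce<Nothing>()
}

data class Movies(val id: String, val title: String)

// Hypothetical stand-in for the Room DAO.
class FakeMoviesDao(initial: List<Movies>) {
    private val table = BehaviorSubject.createDefault(initial)
    fun getAllMovies(): Observable<List<Movies>> = table
    fun replaceAll(movies: List<Movies>) = table.onNext(movies)
}

class MoviesRepository(private val moviesDao: FakeMoviesDao) {
    fun getMoviesFromDB(): Observable<Lce<List<Movies>>> {
        // 1. First emission only, and only if it is not empty.
        val first = moviesDao.getAllMovies()
            .take(1)
            .filter { it.isNotEmpty() }
        // 2. Everything after the first emission, empty lists included.
        val rest = moviesDao.getAllMovies()
            .skip(1)
        return Observable.concat(first, rest)
            .map<Lce<List<Movies>>> { Lce.Content(it) }
    }
}

fun main() {
    val dao = FakeMoviesDao(listOf(Movies("tt1", "Last movie")))
    MoviesRepository(dao).getMoviesFromDB().subscribe { println(it) }
    dao.replaceAll(emptyList())   // removing the last movie now reaches the subscriber
}
```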

This is how it looks now

Well, there is another problem. Notice that the central loader works fine when the DB is empty. But when we relaunch the app, we want the SwipeToRefresh loader to appear to indicate that we are refreshing the data, and it doesn’t appear. The problem lies here:

.startWith(Lce.Loading())

We start the network request with a loading state and then use merge to combine it with the local DB.

return Observable.merge(getMoviesFromDB(), getMoviesFromServer())

The flow that we want is

(DB Content) -> Loading -> (Server Content)

Since the merge operator doesn’t guarantee the sequence of events, this doesn’t happen as expected. The actual flow is like this:

Loading -> (DB Content) -> (Server Content)

Here the DB content overrides the Loading. An eagle-eyed user might notice the brief loading, but others will not. You might be tempted to use the concat operator instead of merge. But since our DB observable technically never completes (as we are observing the table), the server request would never be sent. You might also think of using concat like this:

return Observable.concat(getMoviesFromServer(), getMoviesFromDB())

But the main point of using local DB is that the user will see the content immediately. With the above approach the user will have to wait for server response to see the local content which is pointless in my opinion.
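The first pitfall, putting a never-completing DB stream in front of the server stream, can be reproduced in a few lines. This is a sketch assuming RxJava 2; `dbThenServer` is a hypothetical helper and the subject stands in for an observed table.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.BehaviorSubject

// concat subscribes to the second source only after the first one completes.
fun dbThenServer(
    db: Observable<List<String>>,
    server: Observable<List<String>>
): Observable<List<String>> = Observable.concat(db, server)

fun main() {
    // Stand-in for an observed table: it emits but never completes.
    val db = BehaviorSubject.createDefault(listOf("Cached movie"))
    var serverSubscribed = false
    val server = Observable.just(listOf("Fresh movie"))
        .doOnSubscribe { serverSubscribed = true }

    val seen = mutableListOf<List<String>>()
    dbThenServer(db, server).subscribe { seen.add(it) }

    println(seen)              // prints [[Cached movie]]
    println(serverSubscribed)  // prints false: the request was never started
}
```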

Enough of these edge cases; let’s try to solve this. First, let’s make our requirements clear.

We want the DB to behave exactly like before, except this time we want to fire the network request after the first emission from the DB. This is to make sure the loading state comes after the DB content.

Let’s look at the result now

Well, here comes another issue: the SwipeToRefresh is showing, but it’s not stopping. This simple-looking SSOT is giving us too much of a headache now, so let’s fix it for the last time. It took me a while to figure out the issue. The problem is that we are using the concat operator, so the second observable (the one with skip) is not subscribed until the first one completes. As we insert into the DB within the getMoviesFromServer function, which is in the first observable, the second observable misses this change in the DB and doesn’t emit when it’s subscribed. So, with the magic of RxJava, here is the final working version.

To avoid any race conditions, I have used the publish operator to share the observable. We then use the concatEager operator, which subscribes to the observables up front and buffers their emissions, then emits them in the order in which the observables were subscribed. This is the final result.
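The final snippet is not reproduced here, so the following is only one way the publish and concatEager combination might be put together, assuming RxJava 2. `FakeMoviesDao`, the `fetchMovies` parameter (a stand-in for the Retrofit call) and the exact split into three inner streams are assumptions, not the original implementation.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.subjects.BehaviorSubject

sealed class Lce<out T> {
    class Loading : Lce<Nothing>()
    data class Content<T>(val data: T) : Lce<T>()
    data class Error(val throwable: Throwable) : Lce<Nothing>()
}

data class Movies(val id: String, val title: String)

// Hypothetical in-memory stand-in for the Room DAO.
class FakeMoviesDao(initial: List<Movies>) {
    private val table = BehaviorSubject.createDefault(initial)
    fun getAllMovies(): Observable<List<Movies>> = table
    fun replaceAll(movies: List<Movies>) = table.onNext(movies)
}

class MoviesRepository(
    private val moviesDao: FakeMoviesDao,
    private val fetchMovies: Single<List<Movies>>   // stand-in for the network call
) {
    private fun getMoviesFromServer(): Observable<Lce<List<Movies>>> =
        fetchMovies.toObservable()
            .map<Lce<List<Movies>>> { movies ->
                moviesDao.replaceAll(movies)         // the DB stays the single source of truth
                Lce.Content(movies)
            }
            .onErrorReturn { Lce.Error(it) }
            .filter { it is Lce.Error || (it is Lce.Content && it.data.isEmpty()) }
            .startWith(Lce.Loading())

    fun getMovies(): Observable<Lce<List<Movies>>> =
        // publish shares one DB subscription between the first and third stream.
        moviesDao.getAllMovies().publish<Lce<List<Movies>>> { shared ->
            // All three are subscribed immediately; output keeps the listed order.
            Observable.concatEager(
                listOf(
                    shared.take(1).filter { it.isNotEmpty() }
                        .map<Lce<List<Movies>>> { Lce.Content(it) },
                    getMoviesFromServer(),
                    shared.skip(1).map<Lce<List<Movies>>> { Lce.Content(it) }
                )
            )
        }
}
```

Because the skip(1) stream is already subscribed when the server result is written to the DB, that change is buffered rather than missed, and it is released once the server stream completes.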

I could have just shown you the final code and been done with it. But I think this way of finding the problem and solving it step by step helps us understand the core problem. If you have any questions or feedback, let me know in the comments below.
