Single Source Of Truth with RxJava in Android

Photo by Pablo Orcaray on Unsplash

Who doesn’t love apps that work offline? But with multiple sources of data comes great responsibility. In this story, I am going to talk about how I typically implement offline-capable applications.

What is Single Source Of Truth?

Well, the name is self-explanatory. It means we should only use a single source as our ground truth. In my opinion, SSOT (Single Source Of Truth) is a very scalable pattern. When I was building a chat application at my company, I implemented SSOT. At first it seemed like over-engineering, but after a few months the hard work paid off. For a chat app, offline support is a must (I should be able to access messages that I have previously viewed even if there is no internet connection now). At first there was just the local database (implemented with Room) and the network (implemented with Retrofit). But soon other sources joined in: first our WebSocket implementation, then Firebase notifications. Since I had implemented SSOT, it didn’t matter where the data was coming from. All I had to do was put it into the local database, and that’s it.

Those are just some of the benefits of SSOT. The theory is simple, but it is difficult to execute well. SSOT becomes quite challenging once we have to account for the loading and error states of the various sources.

In this story, I will demonstrate SSOT by developing a simple application that downloads a list of movies from the server and stores it in the local DB. We also let the user remove movies from the local DB.

TL;DR: Here is the link to the repository

What are our requirements?

Well, that’s a hell of a lot of requirements. Looks like we are going to have a dinner full of “if else” (😆). Fear not, RxJava to the rescue. We will use RxJava to simplify most of these requirements into a reactive stream. Let’s get started.

We will only provide two simple features: storing the list of movies and removing a single movie. I will only show the repository and activity code for simplicity. You can refer to the source code for implementation details.

First, let’s implement the getMoviesFromDB function in the repository. If you are unfamiliar with LCE (Loading, Content, Error), I recommend reading this article.
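For readers following along without the repository open, here is a minimal sketch of this first version. The body of getMoviesFromDB matches the snippet quoted later in this story; the shape of the Lce class (in particular Lce.Error), the Movies fields, the MoviesDao interface and the MoviesRepository wrapper are assumptions made only so the example compiles on its own.

```kotlin
import io.reactivex.Observable

// Assumed shape of the LCE wrapper. Only Lce.Loading and Lce.Content
// appear in this story; Lce.Error is a guess at the third state.
sealed class Lce<T> {
    class Loading<T> : Lce<T>()
    data class Content<T>(val data: T) : Lce<T>()
    data class Error<T>(val error: Throwable) : Lce<T>()
}

// Hypothetical model; the real entity lives in the repository.
data class Movies(val id: Int, val title: String)

// Hypothetical DAO. In the real app this would be a Room DAO whose query
// emits the table contents on subscription and again after every change.
interface MoviesDao {
    fun getAllMovies(): Observable<List<Movies>>
}

class MoviesRepository(private val moviesDao: MoviesDao) {
    // First version: empty results are dropped, everything else is Content.
    fun getMoviesFromDB(): Observable<Lce<List<Movies>>> {
        return moviesDao.getAllMovies()
            .filter { it.isNotEmpty() }
            .map { Lce.Content(it) }
    }
}
```

With this version an empty table produces no emission at all, so the view keeps whatever state it last rendered.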

Let’s break it down

Now let’s implement a simple function in the repository called getMoviesFromServer that fetches movies from the server, like below
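As a rough sketch only (the real function is in the repository), such a server source could look like the following. It assumes the function starts with Lce.Loading, writes a successful response into the local DB instead of emitting it, and maps failures to a hypothetical Lce.Error. The parameters `movies` and `saveMovies` are stand-ins for the Retrofit call and the Room insert, not names from the original code.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single

// Assumed LCE wrapper; Lce.Error is not shown in this story.
sealed class Lce<T> {
    class Loading<T> : Lce<T>()
    data class Content<T>(val data: T) : Lce<T>()
    data class Error<T>(val error: Throwable) : Lce<T>()
}

// Hypothetical model.
data class Movies(val id: Int, val title: String)

// movies: stand-in for the Retrofit request.
// saveMovies: stand-in for the Room insert.
fun getMoviesFromServer(
    movies: Single<List<Movies>>,
    saveMovies: (List<Movies>) -> Unit
): Observable<Lce<List<Movies>>> =
    movies
        // Single source of truth: the response goes into the DB only.
        .doOnSuccess { saveMovies(it) }
        // Emit nothing on success; the DB stream will deliver the content.
        .flatMapObservable<Lce<List<Movies>>> { Observable.empty<Lce<List<Movies>>>() }
        // Turn a failed request (or failed insert) into an Error state.
        .onErrorReturn { Lce.Error(it) }
        // Announce the request before it starts.
        .startWith(Lce.Loading())
```

The design choice worth noting is that the server stream never emits Content itself. Whatever arrives from the network reaches the view only through the DB.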

So let’s look through the function. The steps taken here are

Let’s merge these sources together with the merge operator.

Let’s look at the View portion. I will be using LCEE RecyclerView; you can learn more about it in another article of mine.

This is the ViewModel code; it just sets the schedulers for the observable.

This is the activity code; it checks the LCE state and renders the appropriate view.

It’s pretty straightforward.

For removing movies, here is the repository code

Here is the ViewModel

Here is the activity code. We passed a lambda that gets called when the remove button is clicked on the list.

That was a lot of code. Let’s see how it looks now.

The movies load as usual, and removing movies works fine. But wait, why is the last movie not being removed? I checked the DB and it’s gone from there. Here comes the problem. Remember this code:

    private fun getMoviesFromDB(): Observable<Lce<List<Movies>>> {
        return moviesDao.getAllMovies()
            .filter {
                it.isNotEmpty()
            }
            .map {
                Lce.Content(it)
            }
    }

Since we are filtering out empty lists, the emission that follows the removal of the last movie (an empty list, because the DB is now empty) gets blocked by this filter. So the view keeps showing the last movie, as it never receives the empty list. This fails our requirement 4. Let’s fix that.
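One possible shape for such a fix, sketched under assumptions rather than copied from the repository: treat the first emission separately from the rest, so that only the initial empty list is dropped. The helper name `dropInitialEmpty` is made up for this example, and it assumes the DAO observable replays the current table contents to every new subscriber and never completes.

```kotlin
import io.reactivex.Observable

// db: stand-in for moviesDao.getAllMovies(). Assumed to emit the current
// table contents on every new subscription and to never complete.
fun <T> dropInitialEmpty(db: Observable<List<T>>): Observable<List<T>> =
    Observable.concat(
        // First emission only: drop it if the table is empty.
        db.take(1).filter { it.isNotEmpty() },
        // A fresh subscription replays the current contents, which were
        // already handled above, so skip that one and pass on the rest,
        // including empty lists.
        db.skip(1)
    )
```

For example, with a table that starts empty, gains one movie and is then emptied again, the stream emits the one-movie list followed by the empty list, but nothing for the initial empty state.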

This needs some explanation.

This solves our problem and fulfills requirement 4: we don’t emit an empty list the first time, but for successive emissions we do emit the empty list if the DB is empty.

This is how it looks now

Well, there is another problem. Notice that the central loader works fine when the DB is empty. But when we relaunch the app, we want the SwipeToRefresh loader to appear to indicate that we are refreshing the data, and it doesn’t. The problem lies here

.startWith(Lce.Loading())

We start the server stream with a Loading state and then use merge to combine it with the local DB stream.

return Observable.merge(getMoviesFromDB(), getMoviesFromServer())

The flow that we want is

(DB Content) -> Loading -> (Server Content)

Since the merge operator doesn’t guarantee the order of events across its sources, this doesn’t happen as expected. The actual flow is like this

Loading -> (DB Content) -> (Server Content)

Here the DB content overrides the Loading state. An eagle-eyed user might notice the brief loading indicator, but others will not. You might be tempted to use the concat operator instead of merge. But since our DB observable technically never completes (we are observing the table), concat would never move on to the server source, so the request would never be sent. You might also think of using concat the other way around, like this.

return Observable.concat(getMoviesFromServer(), getMoviesFromDB())

But the main point of using a local DB is that the user sees the content immediately. With the above approach, the user has to wait for the server response before seeing the local content, which is pointless in my opinion.
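The first concat pitfall, where the server request is never sent because the DB source never completes, can be reproduced in isolation. This is a small sketch with made-up stand-ins: a BehaviorSubject plays the never-completing DB observable, and `serverWasSubscribed` is a hypothetical helper that reports whether the server source was ever subscribed.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.BehaviorSubject

// Returns true if the "server" source was subscribed, i.e. if the
// request would actually have been sent.
fun serverWasSubscribed(useConcat: Boolean): Boolean {
    // Stand-in for the DB: emits its current value and never completes.
    val db: Observable<String> = BehaviorSubject.createDefault("db content")

    var subscribed = false
    val server = Observable.just("server content")
        .doOnSubscribe { subscribed = true }

    val combined =
        if (useConcat) Observable.concat(db, server) // waits for db to complete
        else Observable.merge(db, server)            // subscribes to both at once

    combined.test()
    return subscribed
}
```

Calling `serverWasSubscribed(useConcat = true)` returns false, while `serverWasSubscribed(useConcat = false)` returns true.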

Enough of these edge cases, let’s try to solve this. First, let’s make our requirements clear.

We want the DB to behave exactly like before, except this time we want to fire the network request after the first emission from the DB. This is to make sure the loading state comes after the DB content.
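A sketch of that ordering with concat, under the same assumptions as before: the DB observable replays its current contents to each new subscriber and never completes, and the server source emits Lce.Loading, writes its response into the DB and then completes. The helper name `dbThenServer` is made up for this example and is not taken from the repository.

```kotlin
import io.reactivex.Observable

// Assumed LCE wrapper; Lce.Error is not shown in this story.
sealed class Lce<T> {
    class Loading<T> : Lce<T>()
    data class Content<T>(val data: T) : Lce<T>()
    data class Error<T>(val error: Throwable) : Lce<T>()
}

// db: stand-in for moviesDao.getAllMovies().
// server: stand-in for getMoviesFromServer().
fun <T> dbThenServer(
    db: Observable<List<T>>,
    server: Observable<Lce<List<T>>>
): Observable<Lce<List<T>>> =
    Observable.concat(
        // 1. First DB emission, dropped if the table is empty.
        db.take(1)
            .filter { it.isNotEmpty() }
            .map<Lce<List<T>>> { Lce.Content(it) },
        // 2. Only now subscribe to the server source, so Loading comes
        //    after the cached content.
        server,
        // 3. Later DB emissions, skipping the replayed current contents.
        db.skip(1).map<Lce<List<T>>> { Lce.Content(it) }
    )
```

Because concat subscribes to its sources one at a time, the third source only starts listening once the server source has completed.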

Let’s look at the result now

Well, here comes another issue: the SwipeToRefresh loader shows up, but it never stops. This simple-looking SSOT is giving us quite a headache now, so let’s fix it for the last time. It took me a while to figure out the issue. The problem is that we are using the concat operator, so the second observable (the one using skip) is not subscribed, and so cannot emit, until the first one completes. As we insert into the DB within the getMoviesFromServer function, which is part of the first observable, the second observable misses this change in the DB and doesn’t emit once it is subscribed. So, with the magic of RxJava, here is the final working version.
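What follows is a sketch of what such a version could look like, not a copy of the repository code. It assumes the DB observable is shared through publish with a selector and that the three parts are combined with concatEager. The helper name `dbThenServerShared` and the `db` and `server` parameters are stand-ins for this example.

```kotlin
import io.reactivex.Observable

// Assumed LCE wrapper; Lce.Error is not shown in this story.
sealed class Lce<T> {
    class Loading<T> : Lce<T>()
    data class Content<T>(val data: T) : Lce<T>()
    data class Error<T>(val error: Throwable) : Lce<T>()
}

// db: stand-in for moviesDao.getAllMovies(), never completes.
// server: stand-in for getMoviesFromServer(); emits Loading, writes the
// response into the DB, then completes.
fun <T> dbThenServerShared(
    db: Observable<List<T>>,
    server: Observable<Lce<List<T>>>
): Observable<Lce<List<T>>> =
    // publish(selector) gives every use of `shared` the same single
    // subscription to the DB, so no emission is seen twice or missed.
    db.publish<Lce<List<T>>> { shared ->
        Observable.concatEager(
            listOf(
                // First DB emission, dropped if the table is empty.
                shared.take(1)
                    .filter { it.isNotEmpty() }
                    .map<Lce<List<T>>> { Lce.Content(it) },
                // Subscribed right away, but its Loading is held back
                // until the first part has completed.
                server,
                // Already listening while the request runs, so the DB
                // change caused by the response is buffered, not lost.
                shared.skip(1).map<Lce<List<T>>> { Lce.Content(it) }
            )
        )
    }
```

With a DB that holds one cached list and a server response that arrives later, this stream emits the cached Content, then Loading, then the refreshed Content once the response has been written to the DB.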

To avoid any race conditions, I have used the publish operator to share the DB observable. We then use the concatEager operator, which subscribes to all of its observables up front, buffers their emissions, and emits them in the order in which the observables were supplied. This is the final result.

I could have just shown you the final code and been done with it. But I think finding the problems and solving them step by step helps us understand the core issue. If you have any questions or feedback, let me know in the comments below.
