RxJava to Coroutines: end-to-end feature migration

(originally published on Medium)

Kotlin coroutines are much more than lightweight threads: they are a new paradigm that helps developers deal with concurrency in a structured and idiomatic way.

When developing an Android app you have to consider many things: moving long-running operations off the UI thread, handling lifecycle events, cancelling subscriptions, and switching back to the UI thread to update the user interface. Over the last couple of years RxJava has become one of the most commonly used frameworks for solving this set of problems. In this article I’m going to guide you through an end-to-end feature migration from RxJava to coroutines.

Feature

The feature we are going to convert to coroutines is fairly simple: when the user submits a country, we make an API call to check whether the country is eligible for a business details lookup via a provider such as Companies House. If the call succeeds we show the response; if not, we show the error message.

Migration

We are going to migrate our code bottom-up, starting with the Retrofit service, moving up to the Repository layer, then to the Interactor layer, and finally to the ViewModel.

Functions that currently return Single should become suspending functions, and functions that return Observable should return Flow. In this particular example we won’t use Flow at all.
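
As a rough illustration of that mapping, here is a minimal sketch. CountryRepository, its two methods and the in-memory implementation are hypothetical names invented for this example and are not part of the feature; the sketch assumes the kotlinx.coroutines library is on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical interface, for illustration only.
interface CountryRepository {
    // RxJava equivalent: fun defaultCountry(): Single<String>
    suspend fun defaultCountry(): String

    // RxJava equivalent: fun supportedCountries(): Observable<String>
    fun supportedCountries(): Flow<String>
}

// In-memory implementation so the sketch runs without a network.
class InMemoryCountryRepository : CountryRepository {
    override suspend fun defaultCountry(): String = "GB"
    override fun supportedCountries(): Flow<String> = flowOf("GB", "DE", "FR")
}

// Reads the single value and collects the stream from blocking code, e.g. a test.
fun loadCountries(repository: CountryRepository): Pair<String, List<String>> = runBlocking {
    repository.defaultCountry() to repository.supportedCountries().toList()
}
```

Note that the Flow-returning function is not marked suspend: like an Observable, a Flow does nothing until it is collected.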

Retrofit Service

Let’s jump straight into the code and refactor the businessLookupEligibility method in BusinessLookupService to coroutines. This is what it looks like now.

interface BusinessLookupService {
    @GET("v1/eligibility")
    fun businessLookupEligibility(
        @Query("countryCode") countryCode: String
    ): Single<NetworkResponse<BusinessLookupEligibilityResponse, ErrorResponse>>
}


Refactoring steps:

1. Starting with version 2.6.0 Retrofit supports the suspend modifier. Let’s turn the businessLookupEligibility method into a suspending function.
2. Remove the Single wrapper from the return type.

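interface BusinessLookupService {
    @GET("v1/eligibility")
    suspend fun businessLookupEligibility(
        @Query("countryCode") countryCode: String
    ): NetworkResponse<BusinessLookupEligibilityResponse, ErrorResponse>
}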


NetworkResponse is a sealed class that represents either a BusinessLookupEligibilityResponse or an ErrorResponse. It is constructed in a custom Retrofit call adapter. This restricts the data flow to two possible cases, success or error, so consumers of BusinessLookupService don’t need to worry about exception handling.
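
To make the idea concrete, here is a minimal sketch of what such a sealed class could look like. The article does not show the real definition, so the shape of NetworkResponse, the body property, the fields of the two response models and the describe function are all assumptions made for illustration.

```kotlin
// Assumed shape: the article only states that NetworkResponse is a sealed class
// with a success case and an error case.
sealed class NetworkResponse<out T : Any, out E : Any> {
    data class Success<T : Any>(val body: T) : NetworkResponse<T, Nothing>()
    data class Error<E : Any>(val body: E) : NetworkResponse<Nothing, E>()
}

// Hypothetical stand-ins for the real response models.
data class BusinessLookupEligibilityResponse(val provider: String)
data class ErrorResponse(val message: String)

// `when` over a sealed class is exhaustive: no `else` branch and no try/catch needed.
fun describe(
    response: NetworkResponse<BusinessLookupEligibilityResponse, ErrorResponse>
): String =
    when (response) {
        is NetworkResponse.Success -> "Eligible via ${response.body.provider}"
        is NetworkResponse.Error -> "Failed: ${response.body.message}"
    }
```

Because the compiler knows every subclass, adding a third case later would turn each such `when` into a compile error until it is handled.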

Repository

Let’s move on and see what we have in BusinessLookupRepository. In the businessLookupEligibility method body we call businessLookupService.businessLookupEligibility (the one we have just refactored) and use RxJava’s map operator to transform the NetworkResponse into a Result, mapping the response model to a domain model along the way. Result is another sealed class: Result.Success contains the BusinessLookupEligibility object when the network call was successful. If the network call fails, deserialization throws, or something else goes wrong, we construct Result.Failure with a meaningful error message (ErrorMessage is a typealias for String).

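class BusinessLookupRepository @Inject constructor(
    private val businessLookupService: BusinessLookupService,
    // Assumed names: responseToDomain, response.body and NetworkResponse.Error.
    private val responseToDomain: Mapper,
    private val responseToString: Mapper,
    private val schedulerProvider: SchedulerProvider
) {
    fun businessLookupEligibility(countryCode: String): Single<Result<BusinessLookupEligibility, ErrorMessage>> =
        businessLookupService.businessLookupEligibility(countryCode)
            .map { response ->
                return@map when (response) {
                    is NetworkResponse.Success -> {
                        // Map the response model to the domain model.
                        Result.Success(responseToDomain.transform(response.body))
                    }
                    is NetworkResponse.Error -> Result.Failure(
                        responseToString.transform(response)
                    )
                }
            }.subscribeOn(schedulerProvider.io())
}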
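
The Result type and the mapping step described above can be sketched in isolation. This is an assumed shape, not the project’s real code: the entity and failure property names follow the ViewModel code later in this article, while the simplified NetworkResponse, the model fields and the toResult function are hypothetical.

```kotlin
typealias ErrorMessage = String

// Assumed shape of Result; `entity` and `failure` mirror how the ViewModel reads it.
sealed class Result<out T, out E> {
    data class Success<T>(val entity: T) : Result<T, Nothing>()
    data class Failure<E>(val failure: E) : Result<Nothing, E>()
}

// Simplified, hypothetical network-layer types.
sealed class NetworkResponse<out T, out E> {
    data class Success<T>(val body: T) : NetworkResponse<T, Nothing>()
    data class Error<E>(val body: E) : NetworkResponse<Nothing, E>()
}
data class BusinessLookupEligibilityResponse(val provider: String)
data class ErrorResponse(val message: String)

// Hypothetical domain model.
data class BusinessLookupEligibility(val provider: String)

// Plain function doing what the lambda passed to RxJava's map operator does.
fun toResult(
    response: NetworkResponse<BusinessLookupEligibilityResponse, ErrorResponse>
): Result<BusinessLookupEligibility, ErrorMessage> =
    when (response) {
        is NetworkResponse.Success ->
            Result.Success(BusinessLookupEligibility(response.body.provider))
        is NetworkResponse.Error ->
            Result.Failure(response.body.message)
    }
```

Keeping the mapping in a plain function like this makes it independent of both RxJava and coroutines, so it survives the migration unchanged.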


Refactoring steps:

1. businessLookupEligibility becomes a suspend function.
2. Remove the Single wrapper from the return type.
3. Repository methods usually perform long-running tasks such as network calls or db queries, and it is the responsibility of the repository to specify which thread this work should run on. With subscribeOn(schedulerProvider.io()) we tell RxJava to do the work on an io thread. How can the same be achieved with coroutines? We use withContext with a specific dispatcher to shift execution of the block to a different thread; when the block completes, execution returns to the original dispatcher. It’s good practice to make a function main-safe by using withContext: consumers of BusinessLookupRepository shouldn’t have to think about which thread to use for businessLookupEligibility, and it should be safe to call from the main thread.
4. We don’t need the map operator anymore, as we can use the result of businessLookupService.businessLookupEligibility directly in the body of the suspend function.

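class BusinessLookupRepository @Inject constructor(
    private val businessLookupService: BusinessLookupService,
    // Assumed names: responseToDomain, response.body and NetworkResponse.Error.
    private val responseToDomain: Mapper,
    private val responseToString: Mapper,
    private val dispatcherProvider: DispatcherProvider
) {
    suspend fun businessLookupEligibility(countryCode: String): Result<BusinessLookupEligibility, ErrorMessage> =
        withContext(dispatcherProvider.io) {
            when (val response = businessLookupService.businessLookupEligibility(countryCode)) {
                is NetworkResponse.Success -> {
                    // Map the response model to the domain model.
                    Result.Success(responseToDomain.transform(response.body))
                }
                is NetworkResponse.Error -> Result.Failure(
                    responseToString.transform(response)
                )
            }
        }
}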
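
The main-safety idea from step 3 can be observed in a small standalone sketch. DispatcherProvider here is a hypothetical interface that only mirrors the dispatcherProvider.io call used in the repository; DefaultDispatcherProvider, loadOnIo and threadsVisited are illustrative names, and runBlocking stands in for a caller on the main thread.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical provider; injecting it lets tests swap in a test dispatcher.
interface DispatcherProvider {
    val io: CoroutineDispatcher
}

class DefaultDispatcherProvider : DispatcherProvider {
    override val io: CoroutineDispatcher = Dispatchers.IO
}

// Main-safe: callable from any thread, the blocking work runs on the io dispatcher.
suspend fun loadOnIo(dispatchers: DispatcherProvider): Thread =
    withContext(dispatchers.io) {
        Thread.sleep(50) // stands in for a blocking network call or db query
        Thread.currentThread()
    }

// Returns the caller thread before the call, the worker thread,
// and the caller thread after the call.
fun threadsVisited(): Triple<Thread, Thread, Thread> = runBlocking {
    val before = Thread.currentThread()
    val worker = loadOnIo(DefaultDispatcherProvider())
    val after = Thread.currentThread()
    Triple(before, worker, after)
}
```

The caller never mentions a dispatcher: the work runs on a worker thread, and execution resumes on the thread the coroutine started on.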


Interactor

In this specific example BusinessLookupEligibilityInteractor doesn’t contain any additional logic and serves as a proxy to BusinessLookupRepository. We overload the invoke operator so that the interactor can be invoked as a function.

class BusinessLookupEligibilityInteractor @Inject constructor(
    private val businessLookupRepository: BusinessLookupRepository
) {
    operator fun invoke(countryCode: String): Single<Result<BusinessLookupEligibility, ErrorMessage>> =
        businessLookupRepository.businessLookupEligibility(countryCode)
}


Refactoring steps:

1. operator fun invoke becomes suspend operator fun invoke.
2. Remove the Single wrapper from the return type.

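class BusinessLookupEligibilityInteractor @Inject constructor(
    private val businessLookupRepository: BusinessLookupRepository
) {
    suspend operator fun invoke(countryCode: String): Result<BusinessLookupEligibility, ErrorMessage> =
        businessLookupRepository.businessLookupEligibility(countryCode)
}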
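
For readers who haven’t combined suspend with operator overloading before, here is a small self-contained sketch of the call site. EligibilityRepository, EligibilityInteractor and the hard-coded country check are hypothetical and exist only to make the example runnable.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical repository, reduced to a single suspend function.
interface EligibilityRepository {
    suspend fun isEligible(countryCode: String): Boolean
}

// The interactor proxies to the repository through a suspend `invoke` operator.
class EligibilityInteractor(private val repository: EligibilityRepository) {
    suspend operator fun invoke(countryCode: String): Boolean =
        repository.isEligible(countryCode)
}

fun checkEligibility(countryCode: String): Boolean = runBlocking {
    val repository = object : EligibilityRepository {
        // Hard-coded rule, for illustration only.
        override suspend fun isEligible(countryCode: String) = countryCode == "GB"
    }
    val eligibilityInteractor = EligibilityInteractor(repository)
    // Reads like a function call; resolves to eligibilityInteractor.invoke(countryCode).
    eligibilityInteractor(countryCode)
}
```

Since invoke is a suspend function, the interactor can only be called from a coroutine or another suspend function, which the compiler enforces.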


ViewModel

In BusinessProfileViewModel we call BusinessLookupEligibilityInteractor, which returns a Single. We subscribe to the stream and observe it on the UI thread by specifying the UI scheduler. In case of Success we assign the value from the domain model to the businessViewState LiveData. In case of Failure we assign an error message.

We add every subscription to a CompositeDisposable and dispose of them in the onCleared() method of the ViewModel’s lifecycle.

class BusinessProfileViewModel @Inject constructor(
    private val businessLookupEligibilityInteractor: BusinessLookupEligibilityInteractor,
    private val schedulerProvider: SchedulerProvider
) : ViewModel() {

    private val disposables = CompositeDisposable()

    fun onCountrySubmit(country: Country) {
        disposables.add(businessLookupEligibilityInteractor(country.countryCode)
            .observeOn(schedulerProvider.ui())
            .subscribe { state ->
                return@subscribe when (state) {
                    is Result.Success -> businessViewState.value = state.entity.provider
                    is Result.Failure -> businessViewState.value = state.failure
                }
            })
    }

    // Dispose of all subscriptions when the ViewModel is destroyed.
    override fun onCleared() {
        super.onCleared()
        disposables.clear()
    }
}


Refactoring steps:

1. At the beginning of the article I mentioned one of the main advantages of coroutines: structured concurrency. This is where it comes into play. Every coroutine has a scope, and the scope controls its coroutines via its job. If the job is cancelled, all the coroutines in the corresponding scope are cancelled as well. You are free to create your own scopes, but in this case we are going to leverage the ViewModel’s lifecycle-aware viewModelScope. We start a new coroutine with viewModelScope.launch. The coroutine is launched on the main thread because viewModelScope uses a main-thread dispatcher by default (Dispatchers.Main.immediate). A coroutine started on Dispatchers.Main does not block the main thread while it is suspended. Inside the coroutine we can invoke the suspending businessLookupEligibilityInteractor operator and get the result. businessLookupEligibilityInteractor calls BusinessLookupRepository.businessLookupEligibility, which shifts execution to Dispatchers.IO and then back to Dispatchers.Main. As we are back on the UI thread, we can update the businessViewState LiveData by assigning a value.
2. We can get rid of disposables because viewModelScope is bound to the ViewModel lifecycle. Any coroutine launched in this scope is automatically cancelled when the ViewModel is cleared.

class BusinessProfileViewModel @Inject constructor(
    private val businessLookupEligibilityInteractor: BusinessLookupEligibilityInteractor
) : ViewModel() {

    fun onCountrySubmit(country: Country) {
        viewModelScope.launch {
            when (val state = businessLookupEligibilityInteractor(country.countryCode)) {
                is Result.Success -> businessViewState.value = state.entity.provider
                is Result.Failure -> businessViewState.value = state.failure
            }
        }
    }
}
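
The cancellation behaviour that replaces CompositeDisposable can be tried outside Android with a hand-made scope. The sketch below is an approximation under stated assumptions: FakeViewModel is a hypothetical plain class, its scope imitates viewModelScope with SupervisorJob, Dispatchers.Default is used because there is no main thread dispatcher on a plain JVM, and clear() plays the role of onCleared().

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a ViewModel with its own scope.
class FakeViewModel {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    fun startLongRunningWork(): Job = scope.launch {
        delay(60_000) // suspends without blocking a thread and is cancellable
    }

    // Cancelling the scope cancels every coroutine launched in it.
    fun clear() {
        scope.cancel()
    }
}

fun cancelledAfterClear(): Boolean = runBlocking {
    val viewModel = FakeViewModel()
    val job = viewModel.startLongRunningWork()
    viewModel.clear()
    job.join() // wait until the cancellation has completed
    job.isCancelled
}
```

No individual job has to be tracked: clearing the owner is enough, which is the structured-concurrency counterpart of disposables.clear().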


Key takeaways

Reading and understanding code written with coroutines is quite easy. Nonetheless, coroutines are a paradigm shift, and learning how to approach writing code with them takes some effort.

In this article I didn’t cover testing. I used the mockk library as I had issues testing coroutines using Mockito.

Everything I have written with RxJava I found quite easy to implement with coroutines, Flows and Channels. One of the advantages of coroutines is that they are a Kotlin language feature and evolve together with the language.
