Pull to refresh

RxJava to Coroutines: end-to-end feature migration

Reading time 7 min
Views 4.5K
image

(originally published on Medium)

Kotlin coroutines are much more than just lightweight threads — they are a new paradigm that helps developers to deal with concurrency in a structured and idiomatic way.

When developing an Android app one should consider many different things: taking long-running operations off the UI thread, handling lifecycle events, cancelling subscriptions, switching back to the UI thread to update the user interface. In the last couple of years RxJava became one of the most commonly used frameworks to solve this set of problems. In this article I’m going to guide you through the end-to-end feature migration from RxJava to coroutines.

Feature


The feature we are going to convert to coroutines is fairly simple: when user submits a country we make an API call to check if the country is eligible for a business details lookup via a provider like Companies House. If the call was successful we show the response, if not — the error message.

Migration



We are going to migrate our code in a bottom-up approach starting with Retrofit service, moving up to a Repository layer, then to an Interactor layer and finally to a ViewModel.

Functions that currently return Single should become suspending functions and functions that return Observable should return Flow. In this particular example we are not going to do anything with Flows.

Retrofit Service


Let’s jump straight into the code and refactor the businessLookupEligibility method in BusinessLookupService to coroutines. This is how it looks like now.

interface BusinessLookupService {
    @GET("v1/eligibility")
    fun businessLookupEligibility(
        @Query("countryCode") countryCode: String
    ): Single<NetworkResponse<BusinessLookupEligibilityResponse, ErrorResponse>>
}

Refactoring steps:

  1. Starting with version 2.6.0 Retrofit supports the suspend modifier. Let’s turn the businessLookupEligibility method into a suspending function.
  2. Remove the Single wrapper from the return type.

interface BusinessLookupService {
    @GET("v1/eligibility")
    suspend fun businessLookupEligibility(
        @Query("countryCode") countryCode: String
    ): NetworkResponse<BusinessLookupEligibilityResponse, ErrorResponse>
}

NetworkResponse is a sealed class that represents BusinessLookupEligibilityResponse or ErrorResponse. NetworkResponse is constructed in a custom Retrofit call adapter. In this way we restrict data flow to only two possible cases — success or error, so consumers of BusinessLookupService don’t need to worry about exception handling.

Repository


Let’s move on and see what we have in BusinessLookupRepository. In the businessLookupEligibility method body we call businessLookupService.businessLookupEligibility (the one we have just refactored) and use RxJava’s map operator to transform NetworkResponse to a Result and map response model to domain model. Result is another sealed class that represents Result.Success and contains theBusinessLookupEligibility object in case if the network call was successful. If there was an error in the network call, deserialization exception or something else went wrong we construct Result.Failure with a meaningful error message (ErrorMessage is typealias for String).

class BusinessLookupRepository @Inject constructor(
    private val businessLookupService: BusinessLookupService,
    private val businessLookupApiToDomainMapper: BusinessLookupApiToDomainMapper,
    private val responseToString: Mapper,
    private val schedulerProvider: SchedulerProvider
) {
    fun businessLookupEligibility(countryCode: String): Single<Result<BusinessLookupEligibility, ErrorMessage>> {
        return businessLookupService.businessLookupEligibility(countryCode)
            .map { response ->
                return@map when (response) {
                    is NetworkResponse.Success -> {
                        val businessLookupEligibility = businessLookupApiToDomainMapper.map(response.body)
                        Result.Success<BusinessLookupEligibility, ErrorMessage>(businessLookupEligibility)
                    }
                    is NetworkResponse.Error -> Result.Failure<BusinessLookupEligibility, ErrorMessage>(
                        responseToString.transform(response)
                    )
                }
            }.subscribeOn(schedulerProvider.io())
    }
}

Refactoring steps:

  1. businessLookupEligibility becomes a suspend function.
  2. Remove the Single wrapper from the return type.
  3. Methods in the repository are usually performing long-running tasks such as network calls or db queries. It is a responsibility of the repository to specify on which thread this work should be done. By subscribeOn(schedulerProvider.io()) we are telling RxJava that work should be done on the io thread. How could the same be achieved with coroutines? We are going to use withContext with a specific dispatcher to shift execution of the block to the different thread and back to the original dispatcher when the execution completes. It’s a good practice to make sure that a function is main-safe by using withContext. Consumers of BusinessLookupRepository shouldn’t think about which thread they should use to execute the businessLookupEligibility method, it should be safe to call it from the main thread.
  4. We don’t need the map operator anymore as we can use the result of businessLookupService.businessLookupEligibility in a body of a suspend function.

class BusinessLookupRepository @Inject constructor(
    private val businessLookupService: BusinessLookupService,
    private val businessLookupApiToDomainMapper: BusinessLookupApiToDomainMapper,
    private val responseToString: Mapper,
    private val dispatcherProvider: DispatcherProvider
) {
    suspend fun businessLookupEligibility(countryCode: String): Result<BusinessLookupEligibility, ErrorMessage> =
        withContext(dispatcherProvider.io) {
            when (val response = businessLookupService.businessLookupEligibility(countryCode)) {
                is NetworkResponse.Success -> {
                    val businessLookupEligibility = businessLookupApiToDomainMapper.map(response.body)
                        Result.Success<BusinessLookupEligibility, ErrorMessage>(businessLookupEligibility)
                    }
                    is NetworkResponse.Error -> Result.Failure<BusinessLookupEligibility, ErrorMessage>(
                        responseToString.transform(response)
                    )
            }
        }
}

Interactor


In this specific example BusinessLookupEligibilityInteractor doesn’t contain any additional logic and serves as a proxy to BusinessLookupRepository. We use invoke operator overloading so the interactor could be invoked as a function.

class BusinessLookupEligibilityInteractor @Inject constructor(
    private val businessLookupRepository: BusinessLookupRepository
) {
    operator fun invoke(countryCode: String): Single<Result<BusinessLookupEligibility, ErrorMessage>> =
        businessLookupRepository.businessLookupEligibility(countryCode)
}

Refactoring steps:

  1. operator fun invoke becomes suspend operator fun invoke.
  2. Remove the Single wrapper from the return type.

class BusinessLookupEligibilityInteractor @Inject constructor(
    private val businessLookupRepository: BusinessLookupRepository
) {
   suspend operator fun invoke(countryCode: String): Result<BusinessLookupEligibility, ErrorMessage> =
      businessLookupRepository.businessLookupEligibility(countryCode)
}

ViewModel


In BusinessProfileViewModel we call BusinessLookupEligibilityInteractor that returns Single. We subscribe to the stream and observe it on the UI thread by specifying the UI scheduler. In case of Success we assign the value from a domain model to a businessViewState LiveData. In case of Failure we assign an error message.

We add every subscription to a CompositeDisposable and dispose them in the onCleared() method of a ViewModel’s lifecycle.

class BusinessProfileViewModel @Inject constructor(
    private val businessLookupEligibilityInteractor: BusinessLookupEligibilityInteractor,
    private val schedulerProvider: SchedulerProvider
) : ViewModel() {
    
    private val disposables = CompositeDisposable()
    internal val businessViewState: MutableLiveData<ViewState> = LiveDataFactory.createDefault("Loading...")

    fun onCountrySubmit(country: Country) {
        disposables.add(businessLookupEligibilityInteractor(country.countryCode)
            .observeOn(schedulerProvider.ui())
            .subscribe { state ->
                return@subscribe when (state) {
                    is Result.Success -> businessViewState.value = state.entity.provider
                    is Result.Failure -> businessViewState.value = state.failure
                }
            })
    }

    @Override
    protected void onCleared() {
        super.onCleared();
        disposables.clear();
    }
}

Refactoring steps:

  1. In the beginning of the article I’ve mentioned one of the main advantages of coroutines — structured concurrency. And this is where it comes into play. Every coroutine has a scope. The scope has control over a coroutine via its job. If a job is cancelled then all the coroutines in the corresponding scope will be cancelled as well. You are free to create your own scopes, but in this case we are going leverage theViewModel lifecycle-aware viewModelScope. We will start a new coroutine in a viewModelScope using viewModelScope.launch. The coroutine will be launched in the main thread as viewModelScope has a default dispatcher — Dispatchers.Main. A coroutine started on Dispatchers.Main will not block the main thread while suspended. As we have just launched a coroutine, we can invoke businessLookupEligibilityInteractor suspending operator and get the result. businessLookupEligibilityInteractor calls BusinessLookupRepository.businessLookupEligibility what shifts execution to Dispatchers.IO and back to Dispatchers.Main. As we are in the UI thread we can update businessViewState LiveData by assigning a value.
  2. We can get rid of disposables as viewModelScope is bound to a ViewModel lifecycle. Any coroutine launched in this scope is automatically canceled if the ViewModel is cleared.

class BusinessProfileViewModel @Inject constructor(
    private val businessLookupEligibilityInteractor: BusinessLookupEligibilityInteractor
) : ViewModel() {

    internal val businessViewState: MutableLiveData<ViewState> = LiveDataFactory.createDefault("Loading...")

    fun onCountrySubmit(country: Country) { 
        viewModelScope.launch {
            when (val state = businessLookupEligibilityInteractor(country.countryCode)) {
                is Result.Success -> businessViewState.value = state.entity.provider
                is Result.Failure -> businessViewState.value = state.failure
            }
        }
    }
}

Key takeaways


Reading and understanding code written with coroutines is quite easy, nonetheless it’s a paradigm shift that requires some effort to learn how to approach writing code with coroutines.

In this article I didn’t cover testing. I used the mockk library as I had issues testing coroutines using Mockito.

Everything I have written with RxJava I found quite easy to implement with coroutines, Flows and Channels. One of advantages of coroutines is that they are a Kotlin language feature and are evolving together with the language.
Tags:
Hubs:
+3
Comments 2
Comments Comments 2

Articles