Suspending over blocking

This article shows how to use Kotlin Coroutines in place of Reactive eXtensions (Rx).


Benefits


To start let's consider four benefits of Coroutines over Rx:


Suspending over Blocking


To run non-blocking code using Rx you'd write something like this:


Observable.interval(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread()) // views may only be touched from the main thread
    .subscribe {
        textView.text = "$it seconds have passed"
    }

By default, interval() emits on Rx's computation scheduler, so the timer is driven by a background thread taken from a shared pool. Threads are heavy objects in terms of memory and performance.


Both are critical in the mobile development world.


You can achieve the same behavior using the following snippet:


launch { // assumes a scope bound to the main dispatcher, since the block touches a view
    var i = 0
    while (true) {
        textView.text = "${i++} seconds have passed"
        delay(1000)
    }
}

Essentially, coroutines behave like lightweight threads, but launching one does not create a real thread.
Here we use delay(), a special suspending function: it does not block the thread but suspends the coroutine, leaving the thread free for other work until the delay elapses.
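
To make the difference between suspending and blocking concrete, here is a minimal sketch, assuming the kotlinx.coroutines library is on the classpath. The helper name threadsUsedBy is made up for illustration. It starts many coroutines that all wait in delay() at the same time and records which threads they resume on.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: starts `count` coroutines that each suspend in delay()
// and returns the set of threads they were resumed on.
fun threadsUsedBy(count: Int): Set<Thread> {
    val threads = mutableSetOf<Thread>()
    // runBlocking confines its child coroutines to the calling thread.
    runBlocking {
        repeat(count) {
            launch {
                delay(100) // suspends the coroutine; the thread is free to run the others
                threads += Thread.currentThread()
            }
        }
    }
    return threads
}

fun main() {
    // Prints how many distinct threads served 1000 concurrently waiting coroutines.
    println(threadsUsedBy(1_000).size)
}
```

If delay() blocked the way Thread.sleep() does, the coroutines would have to wait one after another on that thread instead of all waiting at once.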


Natural backpressure handling over manual


Backpressure is when observables produce items more rapidly than their observers consume them.
While using Rx you have to explicitly specify how you will deal with backpressure.
There are 2 basic approaches:


  • Use throttling, buffers or windows operators
  • The reactive pull model

Because coroutines can suspend, they provide a natural answer to backpressure: a producer simply suspends until the consumer is ready for the next item.
Thus, no additional handling is required.
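
To see why suspension handles backpressure naturally, here is a minimal sketch using the sequence builder from the Kotlin standard library, which is built on the same suspension mechanism. It is a stand-in for illustration rather than code from the app, and the function name interleaving is made up. The producer is suspended at yield() until the consumer asks for the next item, so it can never run ahead.

```kotlin
// Hypothetical helper: records the order in which producer and consumer steps run.
fun interleaving(): List<String> {
    val log = mutableListOf<String>()

    val numbers = sequence {
        for (i in 1..3) {
            log += "produce $i"
            yield(i) // suspends the producer until the consumer requests the next item
        }
    }

    for (n in numbers) {
        log += "consume $n"
    }
    return log
}

fun main() {
    interleaving().forEach(::println)
}
```

The log alternates strictly between produce and consume steps. Channels in kotlinx.coroutines apply the same idea to asynchronous code: send() suspends while the channel's buffer is full.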


Sync code style over async


The basic nature of a mobile app is to react to user actions. That is why Reactive eXtensions would be a good choice.


However, you have to write code in a functional style. If you are used to writing in an imperative style, that can be a bit hard.


Coroutines, by contrast, let you write asynchronous code as if it were ordinary synchronous code. For example,


suspend fun showTextFromRemote() {
    val text = remote.getText()
    textView.text = text
}

Even though I have worked in a functional style for a long time, imperative code is still easier to read and debug.
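
As a rough sketch of how a suspending call like remote.getText() might sit on top of a callback API, here is a version that uses only the Kotlin standard library. The names fetchText, getText, greeting and runNow are all hypothetical and are not part of the app.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-style API standing in for a remote call.
fun fetchText(onResult: (String) -> Unit) = onResult("Hello from remote")

// Adapter: suspends the caller until the callback delivers a value.
suspend fun getText(): String = suspendCoroutine { continuation ->
    fetchText { text -> continuation.resume(text) }
}

// Sequential-looking code: no nested callbacks, just two calls in order.
suspend fun greeting(): String {
    val first = getText()
    val second = getText()
    return "$first / $second"
}

// Minimal runner built on the standard library only. It works here because
// fetchText invokes its callback immediately; real asynchronous code would
// normally be started with launch from kotlinx.coroutines instead.
fun <T> runNow(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    val result = outcome ?: error("coroutine has not completed yet")
    return result.getOrThrow()
}

fun main() {
    println(runNow { greeting() })
}
```

The body of greeting() reads top to bottom like blocking code, yet each getText() call is a suspension point.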


Native over 3rd party lib


Support for suspending functions is built into the Kotlin language, and the kotlinx.coroutines library is developed by JetBrains alongside it.


You don't have to adopt a third-party framework. Currently, all the main libraries can work with coroutines.


For example,


Retrofit


interface Api {

    @GET("users")
    suspend fun loadUsers(): List<User>
}

Room


@Dao
interface UserDao {

    @Update
    suspend fun update(user: UserEntity)
}

So you can build an app that is suspending all the way down: from the UI layer, through the domain layer, to the data layer.


App


Let's get down to business. We will create a classic master-detail app.
The first page contains an infinite list of deliveries.
On item click, we open a detail page.
We also support offline mode: all the data is cached.
Moreover, I will use the MVVM architecture, where the ViewModel role is played by a Fragment instead of the ViewModel from AAC. There are two reasons:

  • Fragments are usually very bare: they just bind the view model to XML.
  • Features like setting the status bar color can't be done in an AAC ViewModel; you have to trigger a fragment method. Using the fragment as the ViewModel lets us keep all the related functionality (managing one given screen) in one class.


First, let's create BaseViewModel:


abstract class BaseViewModel<B : BaseBindings, V : ViewDataBinding> : Fragment(), CoroutineScope by CoroutineScope(Dispatchers.IO) {

    protected abstract val layoutId: Int

    protected abstract val bindings: B

    protected lateinit var viewBinding: V

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        retainInstance = true
    }

    override fun onCreateView(inflater: LayoutInflater, container: ViewGroup?, savedInstanceState: Bundle?): View? {
        viewBinding = DataBindingUtil.inflate(inflater, layoutId, container, false)

        return viewBinding.root
    }

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        viewBinding.lifecycleOwner = viewLifecycleOwner

        viewBinding.setVariable(BR.bindings, bindings)
    }

    override fun onDestroy() {
        cancel()

        super.onDestroy()
    }
}

We make our ViewModel a CoroutineScope so that we can start coroutines inside view models, and any launched coroutine is limited to the lifecycle of the fragment.


We have to mark the end of the scope's lifecycle explicitly by calling cancel(), which cancels all running requests and avoids memory leaks.
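
The effect of cancel() can be sketched outside Android, assuming kotlinx.coroutines is available. The Screen class below is a hypothetical stand-in for the fragment; it uses the same delegation pattern as BaseViewModel, and destroy() plays the role of onDestroy().

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a screen: it is a CoroutineScope by delegation.
class Screen : CoroutineScope by CoroutineScope(Dispatchers.Default) {

    // Simulates a long-running request started from the screen.
    fun load(): Job = launch { delay(60_000) }

    // Plays the role of onDestroy(): cancels the scope and everything launched in it.
    fun destroy() {
        cancel()
    }
}

// Returns true if destroying the screen cancelled the pending request.
fun requestCancelledOnDestroy(): Boolean {
    val screen = Screen()
    val request = screen.load()
    screen.destroy()
    runBlocking { request.join() } // returns promptly because the request was cancelled
    return request.isCancelled
}

fun main() {
    println(requestCancelledOnDestroy())
}
```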


We set retainInstance = true so that the fragment is not recreated on configuration changes and long-running requests can complete.


Also, we have to set lifecycleOwner on the binding so that LiveData used in binding expressions is observed and the UI updates when it changes.


Exception handling


According to Coroutines documentation:


Coroutine builders come in two flavors: propagating exceptions automatically (launch and actor) or exposing them to users (async and produce). The former treat exceptions as unhandled, similar to Java's Thread.uncaughtExceptionHandler

Since we use the launch builder in most cases, we have to specify a CoroutineExceptionHandler.
CoroutineExceptionHandler is a CoroutineContext.Element, so it can be combined into a coroutine context with the plus operator.
I will declare a top-level (static) handler as follows:


val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
    Timber.e(throwable)
}

And change BaseViewModel:


abstract class BaseViewModel<B : BaseBindings, V : ViewDataBinding> : Fragment(), CoroutineScope by CoroutineScope(Dispatchers.IO + exceptionHandler)

From here on, any exception thrown in a coroutine launched inside the ViewModel's scope is delivered to this handler.
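
Here is a small sketch of that delivery path outside Android, assuming kotlinx.coroutines is available. Instead of logging with Timber, the handler collects messages in a list so the result can be inspected; the function name messagesSeenByHandler is made up.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: launches a failing coroutine and returns what the handler saw.
fun messagesSeenByHandler(): List<String> {
    val seen = CopyOnWriteArrayList<String>()

    val handler = CoroutineExceptionHandler { _, throwable ->
        seen.add(throwable.message ?: "no message")
    }

    // The handler becomes part of the scope's context via the plus operator.
    val scope = CoroutineScope(Dispatchers.Default + handler)

    val job = scope.launch { throw IllegalStateException("boom") }
    runBlocking { job.join() } // wait until the failed coroutine has completed

    return seen
}

fun main() {
    println(messagesSeenByHandler())
}
```

One thing to keep in mind: with a plain Job in the scope, the first failure also cancels the scope itself. Adding a SupervisorJob to the context is one way to keep sibling coroutines running.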
Next, I need to declare my API and DAO:


interface DeliveriesApi {

    @GET("deliveries")
    suspend fun getDeliveries(@Query("offset") offset: Int, @Query("limit") limit: Int): List<DeliveryResponse>
}

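@Dao
interface DeliveryDao {

    @Query("SELECT * FROM ${DeliveryEntity.TABLE_NAME}")
    fun getAll(): DataSource.Factory<Int, DeliveryEntity>

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    suspend fun insert(delivery: DeliveryEntity)

    // Called by the gateway's refresh(); not shown in the original listing, so the query is an assumption
    @Query("DELETE FROM ${DeliveryEntity.TABLE_NAME}")
    suspend fun clear()
}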

As you can see, I marked the methods with suspend so that they can simply return the expected response objects. Moreover, cancelling the parent coroutine cancels the network call as well.
The same goes for the DAO.
The only difference is that we want to be able to observe the database.
The easiest way is to use the built-in LiveData support. But if we marked getAll() as suspend, it would cause a compilation error:


Not sure how to convert a Cursor to this method's return type ...

Here we don't need suspend because:


  • Database requests are performed in the background by default
  • The resulting LiveData is lifecycle-aware, so we don't need to cancel it manually

We have to somehow combine the remote and local data sources.
It is worth remembering that there should be a single source of truth.
In an offline-first design, that is the local storage. So we observe the database state. When there is nothing more to retrieve locally, we request data from the remote and insert it into the database.
We will introduce the Listing class:


data class Listing<T>(
    val pagedList: LiveData<PagedList<T>>,
    val dataState: LiveData<DataState>,
    val refreshState: LiveData<DataState>,
    val refresh: () -> Unit,
    val retry: () -> Unit
)

Let's go val by val:


  • pagedList: the main data, constructed as a PagedList to enable infinite scrolling and wrapped in LiveData so it can be observed
  • dataState: one of three states our data can be in: Loading, Loaded or Error. Also wrapped in LiveData to observe changes
  • refreshState: when we trigger a refresh through swipe-to-refresh, we need a way to distinguish refresh request feedback from next-page request feedback. For a next-page error, we want to show an error at the end of the list, but for a refresh error, we want to show a toast message and hide the loader.
  • refresh(): callback to trigger on swipe-to-refresh
  • retry(): callback to trigger on a pagedList loading error
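
The DataState type itself is not listed in this article. Judging only from how it is used (DataState.Loading, DataState.Loaded, DataState.Error(throwable.message)), it could look roughly like the sketch below. The exact shape is an assumption, and footerText is a made-up consumer added for illustration.

```kotlin
// Assumed shape of DataState, inferred from how it is used in this article.
sealed class DataState {
    object Loading : DataState()
    object Loaded : DataState()
    data class Error(val message: String?) : DataState()
}

// Hypothetical consumer: maps a state to the text a list footer might show.
fun footerText(state: DataState): String = when (state) {
    DataState.Loading -> "Loading..."
    DataState.Loaded -> ""
    is DataState.Error -> state.message ?: "Unknown error"
}

fun main() {
    println(footerText(DataState.Error("No connection")))
}
```

A sealed class lets the compiler check that the when expression covers every state, so adding a new state later forces each consumer to handle it.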
Next, the list view model:


class DeliveryListViewModel : BaseViewModel<DeliveryListBindings, DeliveryListBinding>(), DeliveryListBindings, DeliveryListItemBindings, DeliveryListErrorBindings {

    override val layoutId: Int = R.layout.delivery_list

    override val bindings: DeliveryListBindings = this

    private val deliveryGateway: DeliveryGateway by inject { parametersOf(this) }

    private val listing = deliveryGateway.getDeliveries()

    override val dataState = listing.dataState

    // map() is enough here: each refresh state is converted to a single Boolean
    override val isRefreshing = Transformations.map(listing.refreshState) {
        it == DataState.Loading
    }

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        setupList()

        setupRefresh()
    }

    private fun setupList() {
        val adapter = DeliveriesAdapter(this, this)

        viewBinding.deliveries.adapter = adapter
        viewBinding.deliveries.setHasFixedSize(true)

        listing.pagedList.observe(viewLifecycleOwner, Observer {
            adapter.submitList(it)
        })

        listing.dataState.observe(viewLifecycleOwner, Observer {
            adapter.updateDataState(it)
        })
    }

    private fun setupRefresh() {
        listing.refreshState.observe(viewLifecycleOwner, Observer {
            if (it is DataState.Error) {
                Toast.makeText(context, it.message, LENGTH_SHORT).show()
            }
        })
    }

    override fun refresh() {
        listing.refresh()
    }

    override fun onDeliveryClicked(delivery: Delivery) {
        view?.findNavController()?.navigate(DeliveryListViewModelDirections.toDetails(delivery))
    }

    override fun onRetryClicked() {
        listing.retry()
    }
}

Let's start with the class declaration.



First of all, note DeliveryListBindings and DeliveryListBinding. The first is the interface we declared to glue the view model to the XML view. The second is the class autogenerated from the XML layout. We need the second one to pass our bindings interface and lifecycle owner to the XML.


Moreover, it is good practice to reference views through this autogenerated binding rather than through Kotlin synthetics.


With synthetics, you can reference a view that does not exist in the current layout and only find out at runtime. With data binding, you fail fast, at compile time.


Next, three interfaces: DeliveryListBindings, DeliveryListItemBindings, DeliveryListErrorBindings.


  1. DeliveryListBindings: bindings for the screen itself. For example, it contains the refresh() method, which is called on a vertical swipe.
  2. DeliveryListItemBindings: bindings for an item in the list. For example, onDeliveryClicked().
  3. DeliveryListErrorBindings: bindings for the error view, which is also a list item shown in the error state. For example, it contains the onRetryClicked() method.

Thus, we handle everything in a single view model, since it is a single screen, while still following the Interface Segregation Principle.


Let's turn special attention to this line:


private val deliveryGateway: DeliveryGateway by inject { parametersOf(this) }

DeliveryGateway needs to perform requests off the main thread. So it either has to declare its methods as suspend or take a CoroutineScope and launch new coroutines on it. We choose the second approach, since we need our LiveData from the very beginning and then just wait for updates from it. This is very similar to subscribing to a LiveData instance, where we pass a lifecycleOwner (which often refers to 'this'). Here, in the same way, we pass 'this' as the CoroutineScope.


The CoroutineScope interface consists of a single property, coroutineContext. In essence, a scope and a context are the same thing; the difference between them is in their intended purpose.


To learn more about this, I would recommend an article by Roman Elizarov. So, providing the scope to DeliveryGateway also means it uses the same context: specifically, the same dispatcher, job and exception handler.
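
What sharing a scope means in practice can be sketched as follows, assuming kotlinx.coroutines is available. The Gateway class and the helper function are hypothetical and much simpler than the real DeliveryGateway. A coroutine launched on the injected scope becomes a child of the owner's Job, so it is cancelled together with the owner.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical gateway that receives its scope from the caller.
class Gateway(private val scope: CoroutineScope) {
    // Simulates a long-running request launched on the caller's scope.
    fun load(): Job = scope.launch { delay(60_000) }
}

// Returns true if the gateway's work is a child of the owner's Job
// and ends up cancelled once the owner is cancelled.
fun gatewayWorkFollowsOwner(): Boolean {
    val owner = CoroutineScope(Dispatchers.Default)
    val work = Gateway(owner).load()

    val ownerJob = owner.coroutineContext[Job] ?: error("scope has no Job")
    val isChild = work in ownerJob.children

    owner.cancel()
    runBlocking { work.join() } // wait for the cancellation to finish

    return isChild && work.isCancelled
}

fun main() {
    println(gatewayWorkFollowsOwner())
}
```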
Now let's take a look at DeliveryGateway itself:


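class DeliveryBoundGateway(
    private val db: DataBase,
    private val api: DeliveriesApi,
    private val deliveryDao: DeliveryDao,
    // Assumption: the converter used in insertIntoDatabase() is injected here; its type name is a guess
    private val deliveryConverter: DeliveryConverter,
    private val coroutineScope: CoroutineScope
) : DeliveryGateway {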

    private val boundaryCallback = DeliveriesBoundaryCallback(
        api = api,
        coroutineScope = coroutineScope,
        handleResponse = { insertIntoDatabase(it) }
    )

    @MainThread
    override fun getDeliveries(): Listing<Delivery> {
        val refreshTrigger = MutableLiveData<Unit>()
        val refreshState = Transformations.switchMap(refreshTrigger) { refresh() }

        val pagingConfig = Config(
            initialLoadSizeHint = PAGE_SIZE,
            pageSize = PAGE_SIZE,
            prefetchDistance = PAGE_SIZE
        )

        val deliveries = deliveryDao.getAll()
            .toLiveData(
                config = pagingConfig,
                boundaryCallback = boundaryCallback
            )

        return Listing(
            pagedList = deliveries,
            dataState = boundaryCallback.dataState,
            retry = { boundaryCallback.helper.retryAllFailed() },
            refresh = { refreshTrigger.value = Unit }, // Unit instead of null: the LiveData holds non-null Unit
            refreshState = refreshState
        )
    }

    /**
     * When refresh is called, we simply run a fresh network request and when it arrives, clear
     * the database table and insert all new items in a transaction.
     * <p>
     * Since the PagedList already uses a database bound data source, it will automatically be
     * updated after the database transaction is finished.
     */
    @MainThread
    private fun refresh(): LiveData<DataState> {
        boundaryCallback.refresh()

        val dataState = MutableLiveData<DataState>()
        dataState.value = DataState.Loading

        coroutineScope.launch {
            try {
                val deliveries = api.getDeliveries(0, PAGE_SIZE)

                db.withTransaction {
                    deliveryDao.clear()
                    insertIntoDatabase(deliveries)
                }

                dataState.postValue(DataState.Loaded)
            } catch (throwable: Throwable) {
                Timber.w(throwable)
                dataState.postValue(DataState.Error(throwable.message))
            }
        }

        return dataState
    }

    private suspend fun insertIntoDatabase(deliveries: List<DeliveryResponse>) {
        deliveries.forEach { delivery ->
            val entity = deliveryConverter.fromNetwork(delivery)
            deliveryDao.insert(entity)
        }
    }

    companion object {
        const val PAGE_SIZE = 20
    }
}

Here we build the LiveData structure up front and then use coroutines to load data and post it to the LiveData. We also use an implementation of PagedList.BoundaryCallback to glue the local database and the remote API together. When we reach the end of the paged list, boundaryCallback is triggered and loads the next chunk of data.

As you can see, we use coroutineScope to launch new coroutines.


Since this scope is tied to the fragment's lifecycle, all pending requests are canceled in the fragment's onDestroy() callback.


The delivery detail page is quite straightforward: we just pass a Delivery object as a Parcelable from the master screen using the Navigation component's Safe Args plugin. On the details screen, we simply bind the given object to the XML.


class DeliveryViewModel : BaseViewModel<DeliveryBindings, DeliveryBinding>(), DeliveryBindings {

    override val layoutId: Int = R.layout.delivery

    override val bindings: DeliveryBindings = this

    private val args: DeliveryViewModelArgs by navArgs()

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        viewBinding.delivery = args.delivery

        viewBinding.image.clipToOutline = true
    }
}

Contact me


Here is the link to the GitHub source code.


You are welcome to leave comments and open issues.

Comments 3

    Which is effectively creating a new thread. Threads are heavy objects in terms of memory and performance.

    Essentially, Coroutines are light-weight threads but we don't create any real thread.

    Actually, both the Rx and the coroutines examples do the same thing: they run the task on a shared pool of threads.


      You can check on which thread each piece of code is running with
      println("Thread is ${Thread.currentThread().name}")


      For Rx it would be 'Thread is RxComputationThreadPool-1'
      For Coroutines 'Thread is main'


      As you can see Coroutines don't create any additional threads

        For Coroutines 'Thread is main'

        Well, yeah, if you're launching it in main thread dispatcher's context, but by default it runs in a shared thread pool.


        You can achieve the same result with Rx by using observeOn/subscribeOn. Moreover, you'll have to subscribe on main thread if you're working with UI, otherwise you'll get an exception, as you can't modify views from any other thread.


        My point is that it's incorrect to contrapose rx and coroutines in terms of threads management – both can be launched on main thread / shared pool of threads / separate thread, there's no difference between them in this case.
