
О том, как удобно писать на Rx, какие операторы и сущности в нём есть, как делать запросы в сеть и работать с многопоточностью, не писал разве что ленивый. Кто-то рассказывает, что можно «обмазаться» Rx-ом на всех уровнях архитектуры и приложение станет реактивным, а разработка — в разы быстрее. Но как только слова доходят до дела, то встаёт множество вопросов, как подружить Rx со спецификой UI и системными API и зачем нужен Rx, кроме как для многопоточности. В этой статье я хотел бы рассказать о нашем видении разработки в реактивном стиле и на реальных примерах показать, как он упрощает жизнь.
Наверное, многие видели доклад Джейка Вортона про RxJava2. Всем, кто не смотрел, категорически советую. В нём не столь важно описание того, что умеет RxJava и какие объекты там есть. В этом докладе Вортон показал проблему нереактивных подходов к разработке — неочевидность потоков обработки данных, отсутствие чёткого и однонаправленного пути, по которому идут события, начиная от пользовательского ввода и заканчивая изменением интерфейса, которое повлекли действия пользователя.
И эта проблема очень сильно прослеживается в MVP. То, что Presenter и View имеют ссылки друг на друга, ставит крест на однонаправленности данных; начинаются споры о том, насколько View должна быть пассивной, а в это время дебаггером приходится гулять по коду от одного метода к другому, чтобы отследить цепочку действий. Эту проблему начинают решать MVVM и подобные подходы, и мы пошли по их пути.
Команда Android-разработки FunCorp вдохновилась докладом Вортона и решила попробовать написать абсолютно всё на Rx. Изначально такой цели не было, но по ходу дела мы поняли, что использование реактивного подхода в тех местах, где он кажется очень странным, сильно упрощает жизнь и делает код очевиднее.
Первое, что мы сделали, это подключили RxBinding — теперь наши View умеют в Rx. Это начало любого действия в приложении, то есть начало цепочки обработки данных.
implementation "com.jakewharton.rxbinding3:rxbinding-core:$rx_binding_version"
Мы решили, что View будет максимально пассивной и не будет самостоятельно инициировать никаких действий.
У View есть два типа методов:
- изменения какого-либо свойства;
- наблюдатель на событие пользовательского ввода.
class AuthFullRegistrationView @Inject constructor(): BaseArchView() { fun doneClick(): Observable<Unit> = viewHolder.doneBtn.clicks() fun loginClick(): Observable<Unit> = viewHolder.loginBtn.clicks() fun nicknameText(): Observable<CharSequence> = viewHolder.nickname.textChanges() fun passwordText(): Observable<CharSequence> = viewHolder.password.textChanges() fun emailText(): Observable<CharSequence> = viewHolder.email.textChanges() fun setNickname(nickname: String?) { viewHolder.nickname.setText(nickname) } fun setNicknameError(enabled: Boolean, text: String? = null) { viewHolder.nicknameRegistrationView.error = text viewHolder.nicknameRegistrationView.isErrorEnabled = enabled } fun setEmailError(enabled: Boolean, text: String? = null) { viewHolder.emailRegistrationView.error = text viewHolder.emailRegistrationView.isErrorEnabled = enabled } fun setPasswordError(enabled: Boolean, text: String? = null) { viewHolder.passwordRegistrationView.error = text viewHolder.passwordRegistrationView.isErrorEnabled = enabled } }
Такие View становятся максимально переиспользуемыми, так как ни от кого не зависят (чего нельзя сказать о View в MVP).
Бизнес-логика и пользовательские действия соединяются в Presenter. Presenter подписывает изменение доменной модели на события от View. В onNext/OnError View обновляется исходя из итогового состояния модели. В самом простом варианте это выглядит так:
archView.loginClick() .subscribe { authNavigationController.goToLogin(LoginParams()) }
Сразу встаёт вопрос о том, как отписывать все эти цепочки действий при уходе с экрана. Он решается через CompositeDisposable и экстеншен-функцию.
abstract class SimpleArchPresenter<V: ArchView> { var args: Any? = null var state: Bundle? = null val compositeDisposable = CompositeDisposable() @CallSuper open fun bindIntents(archView: V) { } @CallSuper open fun unbindIntents() { compositeDisposable.clear() } }
fun Disposable.addToDisposable(compositeDisposable: CompositeDisposable): Disposable { compositeDisposable.add(this) return this }
Теперь наша цепочка приобретает следующий вид:
archView.loginClick() .subscribe { authNavigationController.goToLogin(LoginParams()) } .addToDisposable(compositeDisposable)
Появляются более сложные сценарии обработки. Например, регистрация пользователя:
- клик по кнопке Sign in;
- считываем имя пользователя и пароль из полей ввода
- запрос к API;
- в случае успеха переходим на следующий экран или показываем ошибку.
private fun bindRegistrationFlow(archView: AuthFullRegistrationView) { val handleFields = Observable .zip(archView.nicknameText(), archView.emailText(), archView.passwordText(), Function3{ nicknameText: CharSequence, emailText: CharSequence, passwordText: CharSequence -> FullRegistrationInteractor.RegisterFields(nicknameText, passwordText, emailText) }) .take(1) archView.doneClick() .flatMap { handleFields } .flatMap { fullRegistrationInteractor.registration(it) .subscribeOn(Schedulers.io()) } .safeResponseSubscribe({ authNavigationController.goToVerifyNewEmail() }, { handleError(archView, it) }) .addToDisposable(compositeDisposable) }
Далее мы понимаем, что пока идёт запрос в сеть, нам нужно показать индикатор прогресса. В классическом варианте показ и скрытие диалога находятся в разных местах и разбивают цепочку действий.
У нас есть следующий экстеншен:
/** * if we hide progress with delay and after that this observable is completed, doOnDispose not * called and we dont reset delayed progress */ fun <E, T> Observable<E>.withProgress(progress: ProgressDelegate, action: (data: E) -> Observable<T>): Observable<T> { return this.flatMap { action.invoke(it) .doOnSubscribe { progress.show() } .observeOn(AndroidSchedulers.mainThread()) .doFinally { progress.hide() } } }
interface ProgressDelegate { fun show() fun hide() }
Внутрь функции передаётся Observable, который оборачивается в показ прогресса на onSubscribe и его скрытие на onFinally.
Дефолтная реализация прогресса представлена ниже.
class ProgressDialogManager constructor(activity: Activity, defaultMessage: CharSequence) : ProgressDelegate { private val progressDialog = ProgressDialog(activity) init { progressDialog.setCancelable(false) progressDialog.setMessage(defaultMessage) } override fun show() { if (progressDialog.isShowing) return progressDialog.show() } override fun hide() { if (!progressDialog.isShowing) return progressDialog.dismiss() } }
Но и также есть реализации ProgressDelegate, добавляющие в RecycleView в конец списка элемент с прогрессом:
private class StatisticsServersListProgressDelegate(private val statisticsServersAdapter: StatisticsServersAdapter) : ProgressDelegate { override fun show() { with(statisticsServersAdapter) { transaction { clear() add(ArchAdapterItem(PROGRESS_VIEW_TYPE.toString(), SimpleProgressItemData(isFooter = false), PROGRESS_VIEW_TYPE)) } } } override fun hide() { statisticsServersAdapter.removeItem(PROGRESS_VIEW_TYPE.toString()) } }

Или, например, экстеншен, добавляющий задержку на показ любой реализации ProgressDelegate, позволяющий убрать мелькания диалогов на коротких действиях.
class DelayedProgressDelegate internal constructor(private val progressDelegate: ProgressDelegate, private val delayTimeMillis: Long = SHOW_DELAY_MILLIS) : ProgressDelegate { companion object { private const val SHOW_DELAY_MILLIS = 800L } enum class ProgressState { SHOW, HIDE } private val stateChangeListener = object : StateMachine.StateChangeListener<ProgressState> { override fun onStateChanged(oldState: ProgressState, newState: ProgressState) { when (newState) { ProgressState.SHOW -> progressDelegate.show() ProgressState.HIDE -> progressDelegate.hide() } } } private val stateMachine = StateMachine(ProgressState.HIDE, stateChangeListener) override fun show() { stateMachine.gotoState(ProgressState.SHOW, delayTimeMillis, true) } override fun hide() { stateMachine.gotoState(ProgressState.HIDE, 0, true) } fun reset() { stateMachine.clear() } }
Этот экстеншен можно использовать с абсолютно любыми реализациями диалогов.
В итоге вышеописанный пример принимает следующий вид:
private fun bindRegistrationFlow(archView: AuthFullRegistrationView) { val handleFields = Observable .zip(archView.nicknameText(), archView.emailText(), archView.passwordText(), Function3{ nicknameText: CharSequence, emailText: CharSequence, passwordText: CharSequence -> FullRegistrationInteractor.RegisterFields(nicknameText, passwordText, emailText) }) .take(1) archView.doneClick() .flatMap { handleFields } .withProgress(progressDialogManager.delayed()) { // flatMap -> withProgress fullRegistrationInteractor.registration(it) .subscribeOn(Schedulers.io()) } .safeResponseSubscribe({ authNavigationController.goToVerifyNewEmail(VerifyEmailParams(archView.getEmailText())) }, { handleError(archView, it) }) .addToDisposable(compositeDisposable) }
В этот момент мы поняли, что любые диалоги проще показывать через Rx.
fun createSimpleDialog(@StringRes messageId: Int, @StringRes positiveTitle: Int, @StringRes negativeTitle: Int, postCreateDialogAction: (dialog: AlertDialog) -> Unit = {}): Observable<Boolean> { val result = BehaviorSubject.create<Boolean>() val dialog = AlertDialog.Builder(activity, styleId) .setMessage(messageId) .setPositiveButton(positiveTitle) { _, _ -> result.onNext(true) } .setNegativeButton(negativeTitle) { _, _ -> result.onNext(false) } .create() dialog.setOnDismissListener { result.onComplete() } return result.doOnSubscribe { dialog.show() postCreateDialogAction.invoke(dialog) } .subscribeOn(AndroidSchedulers.mainThread()) .doOnDispose { dialog.dismiss() } }
Вся цепочка удаления сервера принимает следующий вид:
private fun bindDeleteMenuItem(archView: CurrentServerArchView, server: ServerEntity) { archView.serverMenuDeleteClicks() .concatMap { alertDialogRxFactory.createSimpleDialog(R.string.delete_server_alert_message, R.string.common_yes, R.string.common_no) } .filter { it } .withProgress(progressDialogManager.delayed()) { deleteServerInteractor.deleteServer(server.id) .subscribeOn(Schedulers.io()) } .safeResponseSubscribe(onError = { errorDialogProvider.showModelError(it) }) .addToDisposable(serverActions) }
archView.serverMenuDeleteClicks — это клик по элементу меню, который тоже обрабатывается реактивно.
class ActionMenuArchView @Inject constructor(private val activity: Activity) : BaseArchView() { val items: List<MenuItem> get() = menuView.menu.items() val menuClicks: Observable<MenuItem> by lazy { menuView.menuClicks().share() } private val menuView: ActionMenuView get() = viewHolder.view as ActionMenuView fun inflateMenu(@MenuRes menu: Int): List<MenuItem> { menuView.menu.clear() activity.menuInflater.inflate(menu, menuView.menu) return items } fun itemClicks(@IdRes itemId: Int): Observable<MenuItem> = menuClicks.filter { it.itemId == itemId } }
Для примера, показ диалогов с календарём:
fun openCalendar(activity: Activity): Observable<Date> { val result = BehaviorSubject.create<Date>() val dateCalendar = Calendar.getInstance() dateCalendar.timeInMillis = System.currentTimeMillis() val dialog = DatePickerDialog(activity, DatePickerDialog.OnDateSetListener { _, year, month, dayOfMonth -> val calendar = Calendar.getInstance() calendar.set(year, month, dayOfMonth) result.onNext(calendar.time) result.onComplete() },dateCalendar.get(Calendar.YEAR), dateCalendar.get(Calendar.MONTH), dateCalendar.get(Calendar.DAY_OF_MONTH) ) dialog.setOnDismissListener { result.onComplete() } dialog.show() return result.doOnDispose { dialog.dismiss() } }
Или выбором пола:
fun openSexChooser(activity: Activity): Observable<String> { val result = BehaviorSubject.create<String>() val items = arrayOf("Male", "Female") val builder = AlertDialog.Builder(activity) builder.setItems(items) { _, which -> result.onNext(items[which]) result.onComplete() } val alertDialog = builder.create() alertDialog.setOnDismissListener { result.onComplete() } alertDialog.show() return result.doOnDispose { alertDialog.dismiss() } }
Правильнее показывать диалог в doOnSubscribe, но никто не идеален:)
Почему это очень удобно?
- диалог полностью отвязывается от жизненного цикла;
- диалог полностью отвязывается от действий, которые он инициирует, и его можно легко переиспользовать;
- диалог остаётся частью цепочки действий, а не разбивает её на «до» и «после» и не теряется контекст действия;
- как результат предыдущего пункта — не падает читаемость кода.
Жизненный цикл
Далее можно обернуть вызовы методов жизненного цикла. Зачем? Например, чтобы повторить поведение, аналогичное LiveData, т.е. чтобы не обновлять UI между onPause и onResume.
@ActivityScope class ActivityLifecycleDispatcher @Inject constructor() { private val isResumedSubject = BehaviorSubject.create<Boolean>() fun isResumed(): Observable<Boolean> = isResumedSubject fun onResumed(isResumed: Boolean) { isResumedSubject.onNext(isResumed) } }
private fun startServers() { updateServersDisposable?.dispose() updateServersDisposable = visibleScreen() .flatMap { serversInteractor.load().repeatWhen { it.delay(UPDATING_PERIOD_SECONDS, TimeUnit.SECONDS) } } .subscribeOn(Schedulers.io()) .subscribe() } private fun visibleScreen() = activityLifecycleDispatcher.isResumed().filter { it }.take(1)
Здесь мы периодически обновляем список серверов, но только если экран виден пользователю.
Таким же образом можно обрабатывать onNewIntent.
@ActivityScope class ActivityResultDispatcher @Inject constructor() { private val onActivityResultSubject = PublishSubject.create<OnActivityResultInfo>() fun onActivityResult(requestCode: Int): Observable<OnActivityResultInfo> = onActivityResultSubject.filter { it.requestCode == requestCode } fun handleOnActivityResult(requestCode: Int, resultCode: Int, data: Intent?) { onActivityResultSubject.onNext(OnActivityResultInfo(requestCode, resultCode, data)) } } data class OnActivityResultInfo constructor(val requestCode: Int, val resultCode: Int, val data: Intent?)
Например, цепочка действий при возврате в галерею после создания нового канала:
activityResultDispatcher.onActivityResult(ServerChannelsListPresenter.CREATE_CHANNEL_REQUEST_CODE) .filter { it.resultCode == Activity.RESULT_OK } .map { it.data!!.extras!!.revealNavigationParams<ChannelSettingsFinishParams>()!! } .flatMap { getChannelInteractor.get(it.channelId) .take(1) .subscribeOn(Schedulers.io()) } .observeOn(AndroidSchedulers.mainThread()) .subscribe { if (it.data != null) { galleryNavigator.openChannel(it.requireData) mainMenuVisibilityController.closeImmediately() } } .addToDisposable(compositeDisposable)
- получили необходимый onActivityResult;
- развернули параметры из интента;
- загрузили ожидаемый канал;
- на главном потоке открыли этот канал и скрыли меню.
Аналогично мы обрабатываем запросы разрешений, onNewIntent, остальные методы жизненного цикла, появление клавиатуры, клик по клавише громкости и прочие системные события.
Для обработки ошибок сети можно написать механизм повторения запросов при ошибках либо ожидания сети:
fun <T> Observable<SafeResponse<T>>.repeatOnNetwork(networkController: NetworkController): Observable<SafeResponse<T>> { return this .flatMap { if (it.exception is ServerException) { Observable.just(it) } Observable.error<SafeResponse<T>>(Exception()) } .retryWhen { it.flatMap { networkController.networkState } } }
Работа с плеером
RxBinding не поддерживает методы ExoPlayer, поэтому пришлось написать пару десятков экстеншенов, чтобы поддержать работу с плеером в таком же стиле:
archView.renderFirstFrame() .take(1) .subscribe { trackViewed() } .addToDisposable(contentDisposable) archView.playWhenReadyState() .subscribe { onChangePlayWhenReadyState(it) } .addToDisposable(contentDisposable) archView.videoSize() .take(1) .subscribe { archView.setSize(it.height, it.width) } .addToDisposable(contentDisposable) archView.playerState() .distinctUntilChanged() .subscribe { onChangePlayerState(it, archView) } .addToDisposable(contentDisposable)
fun SimpleExoPlayer.renderFirstFrame(): Observable<Unit> = ExoPlayerRenderFirstFrameObservable(this) private class ExoPlayerRenderFirstFrameObservable(private val view: SimpleExoPlayer) : Observable<Unit>() { override fun subscribeActual(observer: Observer<in Unit>) { val listener = Listener(observer) val disposable = object : MainThreadDisposable() { override fun onDispose() { view.removeVideoListener(listener) } } observer.onSubscribe(disposable) view.addVideoListener(listener) } private class Listener(private val observer: Observer<in Unit>) : VideoListener { override fun onRenderedFirstFrame() { observer.onNext(Unit) } } }
Аналогично мы написали все остальные экстеншены для ExoPlayer.
Работа с RecycleView
Адаптер, который мы используем у себя в проектах:
abstract class ArchRecyclerViewAdapter constructor(diffExecutor: Executor, private val componentsFactory: ArchRecycleComponentsFactory) : RecyclerView.Adapter<ArchRecycleViewHolder>() { /** this items are update after diff is applied. They may be used when you items adapter is currently showing */ val adapterItems: List<ArchAdapterItem<out Any>> get() = differ.currentList /** this items are updated on [update] call. They represent the future state of this adapter and may be used as a cache for further updates*/ var dataItems: List<ArchAdapterItem<out Any>> = emptyList() private set private val differ = AsyncListDifferFactory(this, diffExecutor).create() fun update(newList: List<ArchAdapterItem<out Any>>?) { dataItems = newList ?: emptyList() differ.submitList(newList) } @Suppress("unused") fun transaction(transaction: MutableList<ArchAdapterItem<out Any>>.() -> Unit) { update(dataItems.toMutableList().apply(transaction)) } override fun onCreateViewHolder(parent: ViewGroup, viewType: Int): ArchRecycleViewHolder { val view = componentsFactory.inflateView(viewType, parent) return componentsFactory.createViewHolder(viewType, view)!! } override fun onBindViewHolder(holder: ArchRecycleViewHolder, position: Int) { getBinder(holder).bind(holder, adapterItems[position].data) } override fun onViewRecycled(holder: ArchRecycleViewHolder) { super.onViewRecycled(holder) getBinder(holder).unbind(holder) } override fun onViewDetachedFromWindow(holder: ArchRecycleViewHolder) { super.onViewDetachedFromWindow(holder) getBinder(holder).detach(holder) } override fun onViewAttachedToWindow(holder: ArchRecycleViewHolder) { super.onViewAttachedToWindow(holder) getBinder(holder).attach(holder) } override fun getItemViewType(position: Int) = adapterItems[position].viewType override fun getItemCount() = adapterItems.size private fun getBinder(holder: ArchRecycleViewHolder) = componentsFactory.createViewBinder(holder.itemViewType) as ArchRecycleViewBinder<ArchRecycleViewHolder, Any?> } interface ArchRecycleViewBinder<V: ArchRecycleViewHolder, D> { fun bind(holder: V, data: D) fun unbind(holder: V) {} fun attach(holder: V) {} fun detach(holder: V) {} }
Если коротко, то есть ArchRecycleViewBinder, который в методе bind связывает данные и холдер. На каждый тип элемента имеется один общий на элементы этого типа биндер. Он возвращает действия пользователя, которые инициируются внутри холдеров адаптера. View проксирует эти методы в Presenter.
class ServerMemberViewBinder @Inject constructor() : ArchRecycleViewBinder<ServerMemberViewHolder, ServerMemberItemData> { private val memberClickSubject = BehaviorSubject.create<ServerMember>() private val roleMenuClickSubject = BehaviorSubject.create<ServerMember>() private var roleSettingsEnable: Boolean = false fun memberClicks(): Observable<ServerMember> = memberClickSubject fun roleMenuClicks(): Observable<ServerMember> = roleMenuClickSubject fun setRoleSettingsEnable(enable: Boolean) { roleSettingsEnable = enable } override fun bind(holder: ServerMemberViewHolder, data: ServerMemberItemData) { val serverMember = data.serverMember ViewUtils.setViewVisibility(holder.moreMenu, roleSettingsEnable && serverMember.role != ServerRole.OWNER) holder.itemView.clicks() .map { serverMember } .subscribe(memberClickSubject) holder.moreMenu.clicks() .map { serverMember } .subscribe(roleMenuClickSubject) } }
Вывод
В подобном стиле у нас написано ещё множество других частей приложения: обработка диплинков, работа с редактированием изображений, туториалы по ходу работы приложения, шаринг, пагинация и т.д. Самое удивительное, что порог вхождения в подобную архитектуру остается довольно низким. Большая часть задач решается минимальным количеством операторов, а, например, Room уже из коробки поддерживает Rx, и достаточно просто подписаться на данные, чтобы получать уведомления об их изменении. Довольно быстро разработчик перестраивается на такой стиль написания кода и начинает рассматривать любое действие в реактивном стиле, код становится читабельнее, а писать его быстрее.
Буду искренне рад любому фибдеку и комментариям об уместности использования реактивного подхода в разных слоях архитектуры приложения.
