Основы реактивного программирования под Android на практическом примере

1. Введение в реактивное программирование


Разрабатывая сложное приложение под Android со множеством сетевых соединений, взаимодействием с пользователем и анимацией — означает писать код, который полон вложенных обратных вызовов. И по мере развития проекта такой код становится не только громоздким и трудно понимаемым, но также сложным в развитии, поддержке и подвержен множеством трудноуловимым ошибкам.

ReactiveX или функциональное реактивное программирование предлагает альтернативный подход, который позволяет значительно сократить код приложения и создавать изящные понимаемые приложения для управления асинхронными задачами и событиями. В реактивном программировании потребитель реагирует на данные, как они придут и распространяет изменения события в зарегистрированных наблюдателях.

RxJava — реализация ReactiveX с открытым исходным кодом на Java. Базовыми строительными блоками реактивного кода являются Observables и Subscribers. Подробнее с базовой основой можно ознакомиться в статье Грокаем* RxJava, часть первая: основы.

RxAndroid — расширение к RxJava, которое позволяет планировщику запускать код в основном и дополнительных потоках Android приложения и обеспечивает передачу результатов из созданных дополнительных потоках в основное для агрегации и взаимодействия с интерфейсом пользователя.
С целью более полного понимания основных принципов реактивного программирования рассмотрим практический пример для платформы Android. И начнем с настройки окружения для разработки.

2. Подготовка окружения


Подключаем основные библиотеки и прописываем зависимости в секции dependencies{} конфигурационного файла buil.gradle:
dependencies { 
compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6' 
}

Подключаем поддержку лямбда-выражений — используем новые возможности языка Java 8 на платформе Android N. Чтобы использовать возможности языка Java 8 также необходимо подключить и новый компилятор Jack, для чего добавьте в файл build.gradle:
android {
	...
  	defaultConfig {
    	...
    	jackOptions {
      		enabled true
    	}
  	}
  	compileOptions {
    		sourceCompatibility JavaVersion.VERSION_1_8
    		targetCompatibility JavaVersion.VERSION_1_8
  	}
}

Примечание: Jack поддерживается только в Android Studio 2.1 и также необходимо выполнить обновление до JDK 8.

При внесении изменений в конфигурационном файле gradle появляется предупреждение о необходимости синхронизировать проект и, чтобы применить все изменения нажмите на ссылку Sync Now вверху-справа.

3. Создаем базовый пример


В связи с тем, что применение RxAndroid в большинстве случаев связано с проектами с много-поточной обработкой сетевых соединений — рассмотрим простой пример обработки результатов парсинга сайта.
Для отображения результатов создадим простой layout:
<?xml version="1.0" encoding="utf-8"?>
<RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android"
   ...>
   <ScrollView
       android:layout_width="wrap_content"
       android:layout_height="wrap_content"
       android:id="@+id/scrollView" >
       <TextView
           android:layout_width="wrap_content"
           android:layout_height="wrap_content"
           android:id="@+id/textView" />
   </ScrollView>
</RelativeLayout>

Для парсинга создадим простой класс WebParsing с двумя методами getURLs и getTitle:
public class WebParsing {
public List<String> getURLs(String url) {
   Document doc;
   List<String> stringList = new ArrayList<>();
   try {
       doc = Jsoup.connect(url).get();
       Elements select = doc.select("a");
       for (Element element : select) {
           stringList.add(element.attr("href"));
       }
   } catch (IOException e) {
       e.printStackTrace();
       return null;
   }
   return stringList;
}
}

public String getTitle(String url) {
   String title;
   try {
       Document doc = Jsoup.connect(url).get();
       title = doc.title();
   } catch (MalformedURLException mue) {
       mue.printStackTrace();
       return null;
   } catch (HttpStatusException hse) {
       hse.printStackTrace();
       return null;
   } catch (IOException e) {
       e.printStackTrace();
       return null;
   } catch (IllegalArgumentException iae) {
       iae.printStackTrace();
       return null;
   }
   return title;
}

Метод getURLs просматривает содержимое сайта и возвращает список всех найденных ссылок, а метод getTitle возвращает Title сайта по ссылке.

4. Подключаем реактивность


Для того, чтобы использовать возможности RxAndroid на основе приведенных выше методов создадим два соответствующих Observables:
Observable<List<String>> queryURLs(String url) {
   WebParsing webParsing = new WebParsing();
   return Observable.create(
           new Observable.OnSubscribe<List<String>>() {
               @Override
               public void call(Subscriber<? super List<String>> subscriber) {
                   subscriber.onNext(webParsing.getURLs(url));
                   subscriber.onCompleted();
               }
   }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}

Observable<String> queryTitle(String url) {
   WebParsing webParsing = new WebParsing();
   return Observable.create(new Observable.OnSubscribe<String>() {
       @Override
       public void call(Subscriber<? super String> subscriber) {
           subscriber.onNext(webParsing.getTitle(url));
           subscriber.onCompleted();
       }
   }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}

Первый Observable будет порождать список URL ссылок, найденных на сайте, второй будет порождать Title. Разберем пример перового метода подробно и построчно:
  1. Observable<List > queryURLs(String url) — строка объявляет Observable метод, который принимает в виде входного параметра ссылку на сайт для парсинга и возвращает результат парсинга в виде списка ссылок <List> с указанного сайта;
    WebParsing webParsing = new WebParsing() — создает переменную для доступа к нашим функциям парсинга;
    return Observable.create — создает Observable, возвращающего список ссылок;
    new Observable.OnSubscribe<List>() — строка объявляет интерфейс OnSubscribe с одним методом (см. ниже), который вызовется при подписке;
    public void call(Subscriber<? super List> subscriber) — перегружает метод call, который будет вызываться после подписки Subscriber;
    subscriber.onNext(webParsing.getURLs(url)) — вызывает метод onNext для передачи данных Subscriber всякий раз, когда порождаются данные. Этот метод принимает в качестве параметра объект, испускаемый Observable;
    subscriber.onCompleted() — Observable вызывает метод onCompleted() после того, как вызывает onNext в последний раз, если не было обнаружено никаких ошибок;
    subscribeOn(Schedulers.io()) — метод subscribeOn подписывает всех Observable выше по цепочке на планировщик Schedulers.io();
    observeOn(AndroidSchedulers.mainThread()) — метод observeOn позволяет получить результат в основном потоке приложения.

5. Запускаем первое реактивное приложение


Итак, Observables созданы, реализуем простейший пример на основе первого выше метода, который будет выводить список ссылок сайта:
public void example0(final TextView textView, String url) {
   queryURLs(url)
           .subscribe(new Action1<List<String>>() {
               @Override
               public void call(List<String> urls) {
                   for (String url: urls) {
                       String string = (String) textView.getText();
                       textView.setText(string + url + "\n\n");
                   }
               }
           });
}

Обернем наш реализуемый пример в класс MainExample и вызовем в MainActivity:
public class MainActivity extends AppCompatActivity {
   TextView textView;
   @Override
   protected void onCreate(Bundle savedInstanceState) {
       super.onCreate(savedInstanceState);
       setContentView(R.layout.activity_main);
       textView = (TextView) findViewById(R.id.textView);
       MainExample mainExample = new MainExample();
       mainExample.example0(textView, "https://yandex.ru/");
   }
}

6. Наращиваем реактивность — использование операторов


Observable может трансформировать выходные данные с помощью операторов и они могут быть использованы в промежутке между Observable и Subscriber для манипуляции с данными. Операторов в RxJava очень много, поэтому для начала рассмотрим наиболее востребованные.
И начнем с того, что избавимся от цикла в подписчике и заставим наблюдателя последовательно испускать данные полученного массива ссылок, и поможет в этом нам оператор from():
public void example1(final TextView textView, String url) {
   queryURLs(url)
           .subscribe(new Action1<List<String>>() {
               @Override
               public void call(List<String> urls) {
                   Observable.from(urls)
                           .subscribe(new Action1<String>() {
                               @Override
                               public void call(String url) {
                                   String string = (String) textView.getText();
                                   textView.setText(string + url + "\n\n");
                               }
                           });
               }
           });
}

Выглядит не совсем красиво и немного запутанно, поэтому применим следующий оператор flatMap(), который принимает на вход данные, излучаемые одним Observable, и возвращает данные, излучаемые другим Observable, подменяя таким образом один Observable на другой:
public void example2(final TextView textView, String url) {
   queryURLs(url)
           .flatMap(new Func1<List<String>, Observable<String>>() {
               @Override
               public Observable<String> call(List<String> urls) {
                   return Observable.from(urls);
               }
           })
           .subscribe(new Action1<String>() {
                               @Override
                               public void call(String url) {
                                   String string = (String) textView.getText();
                                   textView.setText(string + url + "\n\n");
                               }
                           });
}

На следующем шаге еще разгрузим наш Subscriber и воспользуемся оператором map(), через который можно преобразовывать один элемент данных в другой. Оператор map() также может преобразовывать данные и порождать данные необходимого нам типа, отличного от исходного. В нашем случае наблюдатель будет формировать список строк, а подписчик только выведет их на экран:
public void example3(final TextView textView, String url) {
   queryURLs(url)
           .flatMap(new Func1<List<String>, Observable<String>>() {
               @Override
               public Observable<String> call(List<String> urls) {
                   return Observable.from(urls);
               }
           })
           .map(new Func1<String, String>() {
               @Override
               public String call(String url) {
                   return textView.getText() + url + "\n\n";
               }
           })
           .subscribe(new Action1<String>() {
               @Override
               public void call(String url) {
                   textView.setText(url);
               }
           });
}

Основные возможности мы рассмотрели и сейчас пришло время воспользоваться лямбдами, чтобы упростить наш код:
queryURLs(url)
       .flatMap(urls -> Observable.from(urls))
       .map(url1 -> textView.getText() + url1 + "\n\n")
       .subscribe(url1 -> {
           textView.setText(url1);
       });

или еще проще:
queryURLs(url)
       .flatMap(Observable::from)
       .map(url1 -> textView.getText() + url1 + "\n\n")
       .subscribe(textView::setText);

Сравним конструкцию выше с получившимся кодом и ощутим мощь и простоту лямбда-выражений.

7. Увеличиваем мощности


На следующем шаге усложним нашу обработку и воспользуемся оператором flatMap(), чтобы подключить второй подготовленный метод queryTitle(), также возвращающий наблюдателя. Этот метод возвращает Title сайта по ссылке на сайт. Создадим пример, в котором будем формировать и выводить список заголовков сайтов по ссылкам, найденным на веб-странице, т.е. вместо полученного списка ссылок на сайты в предыдущем примере выведем заголовки (Title) этих сайтов:
public void example4(final TextView textView, String url) {
   queryURLs(url)
           .flatMap(new Func1<List<String>, Observable<String>>() {
               @Override
               public Observable<String> call(List<String> urls) {
                   return Observable.from(urls);
               }
           })
           .flatMap(new Func1<String, Observable<String>>() {
               @Override
               public Observable<String> call(String url) {
                   return queryTitle(url);
               }
           })
           .subscribe(new Action1<String>() {
               @Override
               public void call(String title) {
                   textView.setText(title);
               }
           });
}

или в сокращенном виде:
queryURLs(url)
       .flatMap(Observable::from)
       .flatMap(this::queryTitle)
       .subscribe(textView::setText);

добавляем map() для формирования списка заголовков:
queryURLs(url)
       .flatMap(Observable::from)
       .flatMap(this::queryTitle)
       .map(url1 -> textView.getText() + url1 + "\n\n")
       .subscribe(textView::setText);

с помощью оператора filter() отфильтровываем пустые строки со значением null:
queryURLs(url)
       .flatMap(Observable::from)
       .flatMap(this::queryTitle)
       .filter(title -> title != null)
       .map(url1 -> textView.getText() + url1 + "\n\n")
       .subscribe(textView::setText);

с помощью оператора take() возьмем только первые 7 заголовков:
queryURLs(url)
       .flatMap(Observable::from)
       .flatMap(this::queryTitle)
       .filter(title -> title != null)
       .take(7)
       .map(url1 -> textView.getText() + url1 + "\n\n")
       .subscribe(textView::setText);

Последний пример показал, что объединение множества методов плюс использование большого количества доступных операторов плюс лямбда-выражения и мы получаем буквально из нескольких строк мощный обработчик потоков различных данных.

Все примеры, приведенные в статье выложены здесь.

Источники:


  1. Официальная документация
  2. Грокаем* RxJava, часть первая: основы
  3. Getting Started With ReactiveX on Android
  4. RxJava — Tutorial
  5. Getting Started with RxJava and Android
  6. Reactive Programming with RxJava in Android
  7. Party tricks with RxJava, RxAndroid & Retrolambda
Share post

Similar posts

AdBlock has stolen the banner, but banners are not teeth — they will be back

More
Ads

Comments 23

    0
    Подключаем поддержку лямбда-выражений — используем новые возможности языка Java 8 на платформе Android N. Чтобы использовать возможности языка Java 8 также необходимо подключить и новый компилятор Jack, для чего добавьте в файл build.gradle

    Доступны ли лямбды на предыдущих версиях? Или придется использовать retrolambda?

      0
      В принципе, retrolambda уже можно и не использовать. Ставим поддержку SDK API 24 и в проекте устанавливаем minSdkVersion на необходимую нам версию и все должно работать. У меня, например, все отлично работает на телефоне с API 17.
      +1
      Я бы не рекомендовал новичкам учиться по этой статье, даже если забыть, что есть Retrofit, и рассматривать это как просто как базовый пример.
      Масса недочетов, плохой код-стайл и беспорядок в терминах и понятиях.
        +1
        А можно конкретизировать ваши замечания? Доработаю статью.
          0
          Я не силён в Android, но:
          1) getURLs и getTitles у вас могут null вернуть. Тогда все методы в MainExample.java, которые вызывают queryURLs, при попытке прогнать полученный список String url: urls выбросят исключение.
          2) Не уверен, что Jsoup нормально хендлит повороты экрана и т.п. вещи связанные с жизненным циклом Android приложений.
            0
            1) getURLs и getTitles у вас могут null вернуть. Тогда все методы в MainExample.java, которые вызывают queryURLs, при попытке прогнать полученный список String url: urls выбросят исключение.

            Может быть не совсем красиво получилось, но здесь специально сделал возврат null, чтобы потом показать использование .filter(title -> title != null)

            2) Не уверен, что Jsoup нормально хендлит повороты экрана и т.п. вещи связанные с жизненным циклом Android приложений.

            Jsoup никак не связан с поворотом экрана, в данном простом примере при повороте экрана происходит новая загрузка данных.

            +1
            Как уже написали выше, у вас проблемы с методами getURLs и getTitles. Следовало бы поймать exception внутри Observable и вернуть его подписчику в onError, чтобы он решил, что с ним делать.

            — Observable queryURLs(String url) — строка объявляет метод, который порождает строку ссылки на сайт для парсинга и возвращающего список ссылок.
            Если уж решили зачем-то вдаваться в такие мелочи, выражайтесь корректно.

            — new Observable.OnSubscribe() — интерфейс OnSubscribe создает подписчика
            Интерфейс OnSubscribe ничего не создает, это просто интерфейс с 1 методом, который вызовется при подписке.

            — subscribeOn(Schedulers.io()) — метод subscribeOn запускает наш код в дополнительном потоке;
            Не совсем так. Этот метод подписывает всех Observable выше по цепочке на определенный планировщик. Повторный вызов метода ниже по цепочке (с другим планировщиком) не даст никакого результата, например.

            Названия методов Example0...1 и т.д. с заглавной буквы.

            Статьи в списке источников куда полезнее.
            Добавил бы еще это, Rx далеко не заканчивается на сетевых запросах:
            Пагинация 1
            Пагинация 2
            Shake detector
            Доклад Артема Зинатулина

            И просто десятки статей всяких блоггеров, не знаю почему вдруг «хороших материалов мало»
              +1
              И да, так себе идея создавать Observable, который сразу подписан на какие-то планировщики —
              .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
              

              На каком планировщике обработать результат, пусть решает сам подписчик.
              Кроме того, у вас все операторы под queryURLs(...) выполняются на планировщике с Looper'ом, то есть каждый из них постит результат в очередь событий. Хотя можно было бы это сделать только для метода subscribe.
                0
                Но здесь ведь работа с сетью, которая без вариантов требует отдельного потока, поэтому сразу и подписываю на отдельный поток.
                  0
                  Ну, допустим.
                  Но observeOn(...) не стоит там вызывать, уже объяснил почему.
                    0
                    Хорошо, подправлю.
                +1
                Как уже написали выше, у вас проблемы с методами getURLs и getTitles. Следовало бы поймать exception внутри Observable и вернуть его подписчику в onError, чтобы он решил, что с ним делать.

                Придумал может не совсем корректный пример с возвратом null, чтобы потом показать использование .filter(title -> title != null)
                А exception решил пока не обрабатывать по аналогии статьи “Грокаем* RxJava, часть первая: основы”, чтобы упростить пример.

                — Observable queryURLs(String url) — строка объявляет метод, который порождает строку ссылки на сайт для парсинга и возвращающего список ссылок.
                Если уж решили зачем-то вдаваться в такие мелочи, выражайтесь корректно.

                Переделал:
                — строка объявляет Observable метод, который принимает в виде входного параметра ссылку на сайт для парсинга и возвращает результат парсинга в виде списка ссылок <List> с указанного сайта.

                — new Observable.OnSubscribe() — интерфейс OnSubscribe создает подписчика
                Интерфейс OnSubscribe ничего не создает, это просто интерфейс с 1 методом, который вызовется при подписке.

                Переделал:
                — строка объявляет интерфейс OnSubscribe с одним методом (см. ниже), который вызовется при подписке.

                — subscribeOn(Schedulers.io()) — метод subscribeOn запускает наш код в дополнительном потоке;
                Не совсем так. Этот метод подписывает всех Observable выше по цепочке на определенный планировщик. Повторный вызов метода ниже по цепочке (с другим планировщиком) не даст никакого результата, например.

                Переделал:
                — метод subscribeOn подписывает всех Observable выше по цепочке на планировщик Schedulers.io().

                Названия методов Example0...1 и т.д. с заглавной буквы.

                Сам не понимаю, почему назвал с заглавной буквы и почему потом не бросилось в глаза. Все соответственно исправил.

                Статьи в списке источников куда полезнее.
                Добавил бы еще это, Rx далеко не заканчивается на сетевых запросах:
                Пагинация 1
                Пагинация 2
                Shake detector
                Доклад Артема Зинатулина

                Огромное спасибо за ссылки на статьи, большинство из них мне не попались при поиске.
            0
            Было бы не плохо увидеть комментарии профессионалов с конкретными замечаниями и пожеланиями по доработке и развитию статьи. Хочется создать полезный материал для ИТ-сообщества. Тема не простая и хороших материалов мало.
              0
              Нужно стараться делать все вызовы pure.
              Оператор map модифицирующий внешний контекст или захватывающий textView неприемлем.
              Если где — то кроме subscribe есть closure, значит, что-то пошло не так, нужно пересматривать решение.

              Зачем .flatMap(Observable::from)? Ради того, чтобы взять 7 первых ссылок?
              queryTitle должен принимать список url и возвращать список title и не нужен будет flatMap, не нужно будет проверять на null.

              Вы отсеиваете уже проделанную работу queryTitle, на которую было потрачено процессорное время,
              если вы хотите брать 7 первых ссылок то надо фильтровать их на входе, или передавать ограничение в качестве параметра queryTitle.

              subscribeOn и observeOn лучше оставить на усмотрение вызывающей стороне, так она сможет решить сама где она хочет производить парсинг, а где обновление UI.
              В крайнем случае subscribeOn можно вызвать внутри реализации api, если ваш контракт подразумевает только асинхронное выполнение.

              Что — то вроде того:
              queryURLs(url)
                     .map(this::queryTitle)
                     .subscribeOn(Schedulers.io())
                     .observeOn(AndroidSchedulers.mainThread())
                     .subscribe(titles -> textView.setText(Joiner.on("\n\n").join(titles));
              


              У вас же, queryURLs делает работу на background потоке и потом пушит результат в ui, а следом queryTitle опять начинает выполнять загрузку и парсинг в своем background потоке.
              Это приводит к ненужным пушам в looper ui потока, которому и так есть чем заняться.
              Поэтому лучше оставлять такие решения вызывающей стороне.

              Надеюсь не очень скомкано получилось.
                0
                Спасибо за хороший и развернутый комментарий!
                Получилось так, потому что увлекся демонстрацией возможностей операторов по аналогии с прочитанными статьями и получилось в ущерб самому примеру.
              0

              Пожалуйста, хватит рекомендовать использовать Observable.create(). Вы не делаете поддержку backpressure, не проверяете, что подписчик уже отписался и не обрабатываете ошибки (хотя тут RxJava спасет вас сама, но тем не менее).


              Используйте Observable.fromAsync() если вам нужно конвертнуть callback api в реактивный (пока не рекомендую для библиотек, но рекомендую для приложений, api еще не стабильный).

                0
                Single, не?
                  0

                  Callback api, как правило, кидает больше одного события — поэтому Observable, можно и Single, если он подходит.

                  0
                  Если не затруднит, можно ли пример с Observable.fromAsync()?
                    0

                    Он есть прямо в javadoc у оператора

                  0
                  Как же нелепо выглядит rx на джаве 7.
                  На джаве 8 или с retrolambda, rx начинает выглядеть нелепо, когда нужно добавить свой кастомный оператор.
                  Когда android разработчики начнут использовать Kotlin?
                    0
                    А зачем? Ретролямбы/лямбдыJava8 вполне себе справляются с чистотой кода. А переходить на новый язык ради «модного веяния» — это для вейперов.
                      0
                      Ну, на мой взгляд, он просто лучше. Если вы переживаете насколько он production ready в его защиту могу сказать — половина андройд студии написана на нем, a в 3 версии gradle будет его использовать в качестве скриптового языка вместо groovy.
                      Что касается rx, Java 8 и retro справляются ровно до тех пор пока вы используете стандартные операторы rx — сказывается отсутствие extension методов. Если вы хотите покинуть rx парадигму или наоборот в нее перейти, также возникают проблемы.

                      Пока все хорошо:
                      observable.map(x->x.getProp()).distinct();
                      

                      Но, если нам нужны свои кастомные операторы:
                      ObservableOperators.anotherOperator(ObservableOperators.custom(observable.map(x->x.getProp()).distinct()), 10)
                      

                      Вместо:
                      observable.map(x->x.getProp()).distinct().custom().anotherOperator(10);
                      

                      На самом деле это далеко не все. Чего стоят inline лямбды не напрягающие GC. И много чего еще…
                      Основная мысль, которую я пытаюсь донести, android — frontend разработка, текущая тенденция — тонкий, легкий клиент. Мы не пишем супернагруженные суперпараллельные вундеркластеры, так почему бы не писать наш тонкий (как правило) клиент лаконично и красиво.

                  Only users with full accounts can post comments. Log in, please.