Как стать автором
Обновить

Потоковая обработка на go1.18

Время на прочтение7 мин
Количество просмотров4.9K

В версии 1.18 языка Go появились генерики, дающие возможность писать обобщенный код, то есть код, не зависящий от конкретного типа данных. Например можно написать код, обрабатывающий потоки данных - применить к ним одну и ту же функцию, отфильтровать, просуммировать и т.д., не завязываясь на конкретные типы данных. Так как я вижу потенциал в парадигме поточной обработки с помощью итераторов/стримов и теперь есть возможность реализовать ее в Go, я решил сделать это.

Эта идея появилась у меня после данной публикации. В ней рассказывается о библиотеке для языка Go для обработки данных с использованием генериков. Я попробовал эту библиотеку в своем проекте и столкнулся с следующими недостатками:

  • настойчивое использование IO[A], который по сути представляет собой func()(A,error) то есть любую функцию, которая может вернуть ошибку. При написании кода это создает неудобства, потому что нужно всё заворачивать в этот IO и код превращается в жонглирование io.Map и io.FlatMap. Видимо это вдохновлено библиотеками с чисто функциональным подходом. Сделано это было, чтобы была функциональная чистота, что в итоге не очень (как по мне) ложится на процедурность языка Go.

  • Stream[A] это обертка над стейт-машиной. Стейт-машина в данном случае это структура, в которой либо ничего нет (конец потока), либо значение и продолжение (следующее состояние) машины. В итоге это создает две проблемы. Первая: чтобы создать стрим, нужно создать его продолжение, продолжение продолжения и тд. Единственный адекватный способ этим пользоваться, который я нашел - это рекурсия. Вкупе с отсутствием Tail call optimization это приводит к второй проблеме.

  • Работа с стримами переполняет стек. Модель потока как стейт-машины была построена так, что переход к новому состоянию - вызов функции. Поэтому, чтобы обойти стрим целиком, надо уйти в рекурсию на глубину, равную длине потока.

  • Построение типа Either[A,B]. По моему скромному мнению, это вообще тип, которого не хватает в Go и который разумно реализован в языке Rust. В библиотеке этот тип это структура, содержащая и элемент типа A и элемент типа B . Плюс булевое поле как способ идентификации какой из полей действителен на самом деле. Важно то, что по сути тип Either[A,B], семантика которого элемент типа A ИЛИ B, представляется в языке как пара из элемента типа A И элемента типа B.

  • Не совсем очевидный интерфейс. Хотя это скорее я не до конца разобрался, что и почему. Так или иначе, я считаю, что построил интерфейс более простой и прямолинейный, не потеряв в общности.

Дизайн библиотеки

Если предыдущая работа была вдохновлена библиотеками для языка Scala cats-effect
и fs2, то я вдохновлялся реализацией итераторов и неопределенных вычислений в
языке Rust. Основные идеи и решения указаных выше недостатков следующие:

  • Поток Stream это все, что угодно, что реализует метод Next() Option[A],
    где Option[A] - или элемент типа A(Some) или ничего (None). Другими
    словами Either[A, Unit]. Пока в стриме есть элементы, метод возвращает
    элементы стрима, как только он заканчивается, он должен возвращать None.

  • Either[A, B] это структура из указателя на элемент типа A и указателя на элемент типа B с следующим инвариантом: только один из указателей не nil. С одной стороны это ближе к семантике один элемент или другой, потому что другого элемента фактически нет. С другой, это приводит к копированию элемента при разыменовании.

  • Отдельно стоит выделить две спецификации структуры Either[A, B]. Первая, это Option[A]. Изначально он был реализован как Either[A, Unit], затем я решил изменить его просто на *A: nil - значения нет, не nil - значение есть.

  • Вторая спецификация это Result[A], который на самом деле просто Either[A, error]. Изначально я хотел была идея сделать этот тип интерфейсом, типа такого:

    type Result[A any] interface {
      Map[B any](func(A) B) Result[B]
      FlatMap[B any](func(A) Result[B]) Result[B]
    }

но это просто невозможно, в силу того, что методы (по крайней мере на момент версии 1.18 языка Go) не могут иметь генерики. По семантике, Result[A] - это результат недетерминированного вычисления: либо результат, либо ошибка. С соответствующим монадическим интерфейсом.

  • Плюс я решил убрать ненужные функции для работы с ресурсами, Pipe, Sink, утилиты для слайсов, так как они не нужны и загрязняют интерфейс почем зря. Чтобы сделать все те же операции над слайсами их можно преобразовать в стрим, применить преобразование, собрать в слайс.

Устройство потоков

(Везде далее слово стрим обозначает то же, что и поток данных выше, а так же Stream[A])

Стрим это любая структура, которая реализует интерфейс стрима:

 type Stream[A] interface {
   Next() Option[A]
 }

Работает это следующим образом: чтобы получить следующий элемент стрима, надо вызвать метод Next. Если стрим не закончился, то он отдает очередной элемент, иначе отдает None. Поведение похоже на чтение из каналов, за исключением того, что вместо булевого флага используется тип Option. Функции для работы со стримами можно разделить на три отдельных части:

  • создание стримов из чего-то, что не является изначально стримом: слайса, канала, генератора и т.д.

  • обработка стримов, то есть создание одного или нескольких стримов из одного или нескольких стримов. При использовании этих функций, как и при использований функций создания стрима, никакой работы не происходит, просто создается структура, реализующая интерфейс стрима. При применении функций из этой категории, старым стримом обычно лучше не пользоваться, так как новый стрим будет активно его же менять.

  • функции, которые разрушают стрим (в том смысле, что после них стрим ни для чего не получиться использовать) и создают что-то, что стримом не является: сбор в слайс, вычисление длины, суммирование всех элементов, печать элементов на консоль и т.д.

Первые две категории функций строят потоки по приведенной ниже схеме. Аналогично можно определить свои стримы в пользовательском коде. Разберем шаги определения стрима на примере создания стрима из слайса (функция FromSlice) и функции, которая создает стрим из элементов-результатов применения функции к элементам исходного стрима (функция Map)

  1. Определяются структуры "реализации" стримов, которые содержат состояние стрима, необходимое для отдачи следующего элемента:

    type fromSliceImpl[A any] struct {
       data []A // original slice
       idx int // index of current element, len(data) if stream ended
    }
    
    type mapImpl[A, B any] struct {
       Stream[A] // original stream
       fn func(A) B // function to apply to every element
    }
  2. Определяем для этих структур интерфейс Stream[A], для этого достаточно реализовать упомянутый выше метод Next:

    func (xs *fromSliceImpl[A]) Next() Option[A] {
      // if stream ended
      if xs.idx == len(xs.data) {
        return None
      }
      // advance stream by one element
      xs.idx++
      // return next element
      return Some(xs.data[xs.idx - 1])
    }
    
    func (xs *mapImpl[A, B]) Next() Option[B] {
      // take next element from source stream
      x := xs.Stream.Next()
      // apply function or do nothing and return resulting Option
      return Map(x, xs.fn)
    }
  1. Реализуем функции для работы со стримами, работа которых - просто создать структуру-реализацию стрима:

    func FromSlice[A any](xs []A) Stream[A] {
      return &fromSliceImpl[A]{xs, 0}
    }
    
    func Map[A, B any](xs Stream[A], fn func(A) B) {
      return &mapImpl[A, B]{xs, fn}
    } 

Из указанного примера можно выделить особенности такой модели стримов:

  • Стримы это просто обертки над данными, которых достаточно, чтобы дать следующий элемент. Стримы, которые являются результатом обработки другого стрима, это обертки над исходным стримом плюс дополнительные данные. По сути это паттерн Декоратор из ООП.

  • Итерация по стриму осуществляется простым циклом (если не пользоваться функциями, которые разрушают стрим и реализуют этот цикл внутри себя) без каких-либо рекурсий. Есть дополнительная нагрузка (по сравнению с сырыми циклами с вычислениями) на вызовы функций Next, но рекурсивных вызовов Next ровно столько, сколько стримов вложено друг в друга. По моему опыту это не слишком большое число, так что переполнение стека нам не грозит.

  • Легко реализовать свой стрим по вышеуказанной модели для чтения потока данных из внешнего API или потока данных с бэкенда или чего-нибудь другого.

  • Стримы в некотором смысле ленивы. Более точно, элементы будут вычисляться только при вызове функции Next, а это происходит либо при ручном вызове пользователем, либо в функциях третьей категории, например при суммировании элементов стрима.

Параллельная обработка

Отдельный вопрос, который вроде бы не вписывается в модель стримов выше, это то, как распараллеливать обработку. Раз элементы можно получать только по одному, то и обрабатывать можно только по одному, так?

В качестве решения предлагается разделить исходный стрим на несколько стримов, параллельно обработать их и затем соединить в один.

Для разделения элементов можно воспользоваться функциями разделения одного стрима на несколько:

  • Scatter - разделить стрим на N стримов с случайным распределением элементов по ним. Самый простой и эффективный способ, так как использует просто пересылку по каналам в отдельных горутинах. Но не гарантирует какие элементы и в каком количестве в какой стрим попадут.

  • ScatterEvenly - разделить стрим на N стримов, отдавая им элементы по очереди в порядке Round robin. Таким образом, каждый стрим получит поровну элементов.

  • ScatterRoute - разделить стрим на N+1 стримов по первому выполнившемуся из N предикатов. Если ни один предикат не выполнился для элемента, он идет в N+1 -ый стрим "дефолтный" стрим. Эту функцию можно использовать для определения кастомной нагрузки на выходные стримы и вообще для разделения стрима для разной обработки.

  • ScatterCopy - скопировать стрим в N стримов. В отличие от предыдущих функций, в которых каждый элемент исходного стрима попадет ровно в один выходной стрим, ScatterCopy пошлет элемент в каждый из N стримов. Для этого он поддерживает внутренний буффер, который необходимо защищать мьютексом, поэтому это очень нагруженный метод. Можно сказать этот метод больше предназначен для копирования стримов и лучше стараться обходиться без него.

За счет того, что внутри функций Scatter* пересылка в результирующие стримы происходит в горутинах с помощью каналов, их обработка автоматически распаралелена.

Собрать результаты из нескольких параллельно обработанных стримов можно с помощью функции Gather, которая собирает элементы из N стримов в один в случайном порядке.

Пример кода с параллельной обработкой:

sourceStream := Take(nats(), 100) // first 100 numbers
pool := Scatter(sourceStream, 10) // split into 10 streams
// process every stream
for i, stream := range pool {
  pool[i] = Map(stream, func(id int) int {
    // long processing task
    time.Sleep(10 * time.Millisecond)
    return id
  })
}
got := Gather(pool)

Каждый из 100 элементов обрабатывается 10 мс, и если обрабатывать их последовательно, то это займет 1000 мс. С разделением на 10 стримов с помощью Scatter это займет немногим больше 100 мс.

Заключение

Код того, что получилось можно посмотреть здесь.

Итоговая библиотека:

  • содержит примитивы функционального программирования и вспомогательные
    структуры: Pair, Either, Option, Set, Counter

  • содержит примитивы и методы для обработки данных с помощью абстракции
    потоков

  • простая для использования и расширения

  • может быть встроена постепенно за счет преобразований в/из стандартные типы данных, а именно стримы могут оперировать с слайсами и каналами, а тип Result с представлением неопределённого результата в Go в виде (A, error).

Теги:
Хабы:
Всего голосов 7: ↑6 и ↓1+5
Комментарии6

Публикации