Распределенная обработка графов со Spark GraphX

image

«Simplicity is prerequisite for reliability» by Edsger Dijkstra

Пролог


Графы — столь наглядная и проста для понимания структура данных, еще со времен Леонарда Эйлера заставляла ломать умы человечества над разнородными задачами, вроде того как можно пройти по всем семи мостам Кёнигсберга, не проходя ни по одному из них дважды или как разъездному посреднику, найти самый выгодный маршрут.

Со времен Эйлера много чего поменялось: появились транзисторы, языки программирования и распределенные вычисления. Именно последние из этого списка кардинально упростили хранение и обработку графов. Собственно, это то, о чем и пойдет речь в этой статье.

Если вы не знакомы с базовыми концепциями Apache Spark такими как RDD, Driver program, Worker node etc., то прежде чем продолжить чтение этой статьи, я бы рекомендовал вам прочитать документацию от Databricks.

Как по мне, лучший способ разобраться с какой-либо технологией это попробовать что-то на ней написать. В этой статье мы сделаем анализ подобия «социальной сети» используя базовые понятия теории графов.

Пишем код


Способ хранения нашей «социальной сети» я выбрал предельно простой и наглядный: tsv файлы на диске, естественно это могли бы быть файлы любого другого формата как Parquet, Avro. Место хранения файлов не принципиально это могло бы быть HDFS или S3, даже если нам надо что-либо поменять, то Spark SQL сделает за нас основную работу. Структура сети будет иметь следующий вид: первый файл это пара Id пользователя и его имя, второй файл Id пользователя и список его пиров. Apache Spark поддерживает следующие языки программирования Java, Scala и Python в качестве API. Я выбрал второй.

Сразу хочу ответить на популярный вопрос о том стоит ли использовать Spark GraphX для хранения графов когда у вас много операций insert/update — ответ нет, все операции изменения RDD вынуждают менять целый RDD в кластере, что не является оптимальным решением, для этого случая подходят специальные NoSql решение такие Neo4J, Titanium или даже Cassandra, Hbase. Ничто не мешает вам использовать Spark GraphX вместе с ними именно для обработки графов, загружая сами данные с БД, например, по шедулеру или в event driven стиле.

Ну что ж, давайте приступим к написанию кода. Сначала нам нужно загрузить граф в память, берем исходные файлы и вытаскиваем нужные вершины и ребра (здесь показаны основные моменты, ссылку на полный листинг с исходным кодом сможете найти в конце статьи):

def verts: RDD[(VertexId, String)] = sc.textFile(USER_NAMES)
                                       .flatMap(InputDataFlow.parseNames)

def edges: RDD[Edge[PartitionID]] = sc.textFile(USER_GRAPH)
                                       .flatMap(InputDataFlow.makeEdges)

Pregel


Основным механизмом итерации графа в GraphX является алгоритм Pregel. Алгоритм разработан компанией Google, модель Pregel использует передачу сообщений между вершинами в графе. Передача сообщений благодаря последовательности итераций, называемых супер шагами (supersteps) является основной идеей, положенной в этот алгоритм. Также основную идею можно описать так: ”think like a vertex”, то есть состояние текущей вершины зависит только от состояния его соседей.

Pregel становится крайне необходимым в том случае когда решения задачи с обычным MapReduce становится крайне затруднительным процессом. Занимательно, что имя Pregel происходит от названия реки, которая охватила семь мостов Кенигсберга.

Основным примитивом, для обхода графа является триплет (triplet) — он состоит со следующих компонентов: текущая вершина (a source vertex), вершина в которую переходим (a destination vertex) и ребро между ними (an edge connecting) — здесь все понятно: откуда переходим, куда переходим и по какому пути переходим. Также для Pregel надо указать дефолтные расстояние между вершинами, как правило, это PositiveInfinity, UDF (user defined function) функцию для каждой вершины, чтобы обработать входящее сообщение и посчитать следующею вершину, и UDF для слияния двух входящих сообщения, эта функция должна быть коммутативной и ассоциативной. Так как Scala является функциональным языком, то две последние функции будут представлены как, два лямбда выражения.

Когда мы уже разобрали основные компоненты Pregel, стоит прийти к практике. Первый алгоритм, который реализуем будет алгоритм Дейкстры для поиска кратчайшего пути от произвольной вершины до всех остальных.

def dijkstraShortestPath[VT](graph: GenericGraph[VT], sourceId: VertexId) = {
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (_, dist, newDist) => math.min(dist, newDist),
      triplet => {
        //Distance accumulator
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },
      (a, b) => math.min(a, b)
    )
    sssp.vertices.sortByKey(ascending = true).collect.mkString("\n")
  }

Здесь все очевидно: начинаем от заданной вершины, используем функцию минимума, чтобы определить минимальное расстояние на каждом шаге. Первая функция, которая используется в Pregel, сохраняет кратчайшее расстояние между входящим сообщением и текущей вершиной. Вторая функция распространяет сообщения соседям при этом сохраняя расстояние. Последняя функция — это аналог Reduce стадии — выбирает минимальное значение в случае нескольких входящих сообщений. Дальше просто формируем удобный вывод графа.

Degree of separation


Уверен, что многие читатели этой статьи слышали о теории шести рукопожатий (Six degrees of separation) — это недоказанная теория, согласно которой любые два человека разделены не более чем пятью уровнями общих знакомых, то есть максимум 6 рукопожатий нужно для того, чтобы соединить двух произвольных человек на Земле. В терминах теории графов это звучит так: диаметр графа знакомств не превышает 6 для любых двух человек на Земле.

Начнем написания кода со следующего, нам понадобится поиск в ширину на графе для поиска контактов указанной вершины, для этого нужно модифицировать код алгоритма Дейкстры:

def getBFS(root: VertexId) = {
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == root) 0.0 else Double.PositiveInfinity)

    val bfs = initialGraph.pregel(Double.PositiveInfinity, maxIterations = 10)(
      (_, attr, msg) => math.min(attr, msg),
      triplet => {
        if (triplet.srcAttr != Double.PositiveInfinity) {
          Iterator((triplet.dstId, triplet.srcAttr + 1))
        } else {
          Iterator.empty
        }
      },
      (a, b) => math.min(a, b)).cache()
    bfs
  }

Все очень похоже тому, что было выше, но мы уже указываем количество итераций — для вашего графа это может быть другим числом — 10 для моего графа я получил эмпирическим путем. Дальше выполняем join с именами пользователей и берем 100 первых значений для произвольного пользователя:

def degreeOfSeparation(root: VertexId): Array[(VertexId, DegreeOfSeparation)] = {
    getBFS(root).vertices.join(verts).take(100)
  }

Сейчас мы ищем degree of separation от заданной вершины до всех остальных, также можно сделать поиск degree of separation для двух произвольных вершин:

def degreeOfSeparationTwoUser(firstUser: VertexId, secondUser: VertexId) = {
    getBFS(firstUser)
      .vertices
      .filter { case (vertexId, _) => vertexId == secondUser }
      .collect.map { case (_, degree) => degree }
  }

Spark GraphX с коробки дает возможность получить множество информации о графе, например, получить компоненту связности графа (сonnected component):

def getMostConnectedUsers(amount: Int): Array[(VertexId, ConnectedUser)] = {
    graph.degrees.join(verts)
      .sortBy({ case (_, (userName, _)) => userName }, ascending = false)
      .take(amount)
  }

Или же получить такую метрику как количество треугольников в графе (triangle count):

def socialGraphTriangleCount = graph.triangleCount()

Page Rank


Алгоритм PageRank появился благодаря на тот момент аспирантам Стэнфорда Ларри Пейджу и Сергею Брину. Для каждой вершины графа алгоритм назначает важность среди всех остальных. Например если пользователь Twitter имеет большое количество подписок от других пользователей, то он будет иметь высокий рейтинг, следовательно, его можно будет легко найти в поисковой системе.
  
GraphX имеет статичную и динамичную версию реализации PageRank. Статичная версия имеет фиксированное количество итераций, в то время как динамическая версия будет работать до того момента пока рейтинг не начнет сходится к заданному значению.

Для нашего графа это будет иметь следующий вид:


def dynamicRanks(socialGraph: SocialGraph, tolerance: Double) = 
  socialGraph.graph.pageRank(tol = tolerance).vertices

def staticRanks(socialGraph: SocialGraph, tolerance: Double) = 
  socialGraph.graph.staticPageRank(numIter = 20).vertices

Заключение


Внимательный читатель заметил, что тема этой статьи это распределенная обработка графов, но во время написания кода мы ничего не сделали, чтобы обработка была действительно распределенной. И тут следует вспомнить цитату Эдсгера Дейкстры в самом начале. Spark кардинально упрощает нам жизнь беря на себя бремя и тяготы распределенных вычислений. Написания кода, который будут выполнения на распределенном кластере становится не такой уж и тяжелой задачей, как это могло показаться в начал. И тут даже есть несколько опций для менеджмента ресурсов кластера: Hadoop YARN, Apache Mesos (лично мой любимый вариант) и с недавних пор есть поддержка Kubernetes. Весь исходный код, который был разобран в этой статье, можно найти на гитхабе.
Поделиться публикацией

Похожие публикации

Комментарии 0

Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

Самое читаемое