Поиск под капотом. Облачная индексация

  • Tutorial

В прошлой статье я рассказал о том, каким образом поисковая система может узнать о том, что существует та или иная веб-страница, и сохранить ее себе в хранилище. Но узнать о том, что веб-страница существует, — это только самое начало. Гораздо более важно за доли секунды успеть найти те страницы, которые содержат ключевые слова, введенные пользователем. О том, как это работает, я и расскажу в сегодняшней статье, проиллюстрировав свой рассказ «учебной» реализацией, которая тем не менее спроектирована таким образом, чтобы иметь возможность масштабироваться до размеров индексирования всего Интернета и учитывать современное состояние технологий анализа больших объемов данных.



Заодно у меня получилось рассмотреть основные функции и методы Apache Spark, так что данную статью можно рассматривать еще и как небольшой туториал по спарку.


Формулировка задачи


Более формальная постановка задачи, которую я сегодня разберу: имеется хранилище, содержащее набор веб-страниц, скачанных из Интернета краулером. Необходимо спроектировать механизм, который позволит за доли секунды указывать ссылки на все веб-страницы из этого хранилища, включающие все ключевые слова, содержащиеся в пользовательском запросе. Этот механизм должен быть:


  1. масштабируемым по количеству данных — потенциально мы должны иметь возможность обработать весь Интернет;
  2. масштабируемым по количеству запросов в секунду: у «взрослых» поисковых систем, таких как «Яндекс» и Google, количество поисковых запросов может достигать десятков тысяч запросов в секунду.

Несколько важных ограничений разбираемой сегодня задачи:


  1. В рамках этой статьи я не буду пытаться упорядочить найденные страницы. Движок будет возвращать просто множество. Задача упорядочивания, или, более правильно, ранжирования, — отдельная важная задача, которую я разберу в следующих статьях.
  2. Моя имплементация будет подразумевать наличие всех слов из поискового запроса. Современные поисковые системы позволяют исправлять опечатки, искать по синонимам и т. д., но в конце концов это все равно сводится к нескольким запросам «по всем» словам и объединению или пересечению их результатов.

Давайте теперь разберем, как решить задачу в рамках поставленных ограничений.


Инвертированный индекс


Рассмотрим следующую структуру данных: словарь, ключами которого будут слова из нашего языка, значениями — множества веб-страниц, на которых это слово встречается:



Такая структура данных называется инвертированным индексом, и она является ключевой для работы поисковой системы. Настолько ключевой, что, например, «Яндекс» даже называется в честь нее (yandex — это не что иное, как yet another index).


В реальности этот словарь будет иметь размер намного больший, чем в приведенном примере: количество элементов в нем будет равно количеству разнообразных слов на веб-страницах, а максимальный размер множества для одного элемента — все веб-страницы в индексируемой части Интернета.


Допустим, мы смогли построить такую структуру данных. В этом случае поиск веб-страниц, содержащих слова из запроса, будет происходить следующим образом:


  1. Разбиваем запрос на слова.
  2. По каждому слову обращаемся в обратный индекс и извлекаем множество веб-страниц.
  3. Результат — это пересечение всех множеств, извлеченных в пункте 1.

Например, если мы ищем все веб-страницы по запросу «алгоритм визуализация», а обратный индекс соответствует приведенному в таблице, то результирующее множество будет содержать всего лишь одну веб-страницу — habr.ru/post/325422/, так как только она содержится в пересечении множеств для слов «алгоритм» и «визуализация».


Для того чтобы построить такую структуру данных, можно воспользоваться подходом MapReduce. Об этом подходе у меня есть отдельная статья, но основная идея заключается в следующем:


  1. На первом этапе (map-шаг) можно исходные объекты (в нашем случае документы) преобразовать в пары ключ-значение (ключами будут слова, а значениями — URL документа).
  2. Пары ключ-значение автоматически группируются по ключам (шаг shuffle).
  3. Обработать все значения по заданному ключу (шаг reduce). В нашем случае — сохранить в обратный индекс.

Ниже в разделе с имплементацией я покажу, как реализовать описанный алгоритм, используя популярный open source инструмент для работы с большими данными — apache spark.


Чуть-чуть NLP


Под аббревиатурой NLP в области анализа данных понимают компьютерную обработку естественного языка (Natural Language Processing, не путать с псевдонаучным нейролингвистическим программированием). При работе с поисковой системой не избежать хотя бы отдаленного столкновения с обработкой языка, поэтому нам понадобятся некоторые понятия и инструменты из этой области. В качестве библиотеки работы с естественным языком я буду использовать популярную библиотеку для python NLTK.


Токенизация


Первое понятие из области NLP, которое нам понадобится, — это токенизация. При описании работы с реверсивным индексом я пользовался понятием слово. Однако слово — это не очень хороший термин для разработчика поисковой системы, так как на веб-страницах встречается много разнообразных наборов символов, не являющихся словом в прямом смысле этого слова (например, masha545 или 31337). Поэтому мы будем вместо этого пользоваться токенами. В библиотеке NLTK есть специальный модуль для выделения токенов: nltk.tokenize. Там есть разнообразные способы разбить текст на токены. Мы воспользуемся самым простым способом выделить токены — токенизацией по регекспу:


#code
from nltk.tokenize import RegexpTokenizer
tokenizer = RegexpTokenizer(r'[а-яёa-z0-9]+')
text = "Съешь ещё этих мягких французских булок, да выпей же чаю."
tokenizer.tokenize(text.lower())

#result
['съешь', 'ещё', 'этих', 'мягких', 'французских', 'булок', 'да', 'выпей', 'же', 'чаю']

Лемматизация


Многие языки, и русский в особенности, богаты на формы слов. Понятно, что, когда мы ищем слово «компьютер», мы ожидаем, что найдутся страницы, содержащие слово «компьютера», «компьютеров» и так далее. Для этого все токены нужно привести в так называемую «нормальную форму». Это можно сделать при помощи разных инструментов. Например, на github есть библиотека pymystem, являющаяся оберткой над библиотекой, разработанной яндексом. Я для простоты воспользуюсь методом стемминга — отброса незначащего окончания — и использую для этого стеммер русского языка, входящий в библиотеку nltk:


#code
from nltk.stem.snowball import RussianStemmer
stemmer = RussianStemmer()
tokens = ['съешь', 'ещё', 'этих', 'мягких', 'французских', 'булок', 'да', 'выпей', 'же', 'чаю']
stemmed_tokens = [stemmer.stem(token) for token in tokens]
print(stemmed_tokens)

#result
['съеш', 'ещ', 'эт', 'мягк', 'французск', 'булок', 'да', 'вып', 'же', 'ча']

Отсечение незначимых слов


В обратном индексе для каждого слова мы храним множество URL-страниц, в которых это слово опубликовано. Проблема заключается в том, что некоторые слова (например, предлог «в») встречаются практически на каждой веб-странице. При этом информативность наличия или отсутствия таких слов на странице очень маленькая. Поэтому, для того чтобы не хранить огромные массивы в индексе и не делать лишнюю работу, такие слова мы будем просто игнорировать.


Чтобы определить множество слов, которые будем игнорировать, можно для каждого слова подсчитать долю веб-страниц, на которых это слово встречается, и поставить некоторую границу по этой частоте. Расчет частотности слов — это классическая задача в парадигме map-reduce, подробно об этом можно почитать в моих статьях.



Я для своей реализации вычислял не просто частоту слов, а некоторое ее монотонное преобразование — Inverse Document Frequency (IDF), которое нам в дальнейшем пригодится еще и для ранжирования документов. Методом «пристального взгляда» я определил, что подходящая константа для отсечения слов будет примерно равна 1.12, между словами «за» и «код» (слово «код» очень часто встречается на хабре).


Индексируем



Архитектура моего учебного поискового движка


Apache Spark


Документов, которые я индексирую, довольно много — несколько миллионов. Для их обработки необходим инструментарий для работы с большими данными. Я выбрал apache spark, который является одним из самых популярных фреймворков на сегодняшний день. Так как я использую amazon web services для своей имплементации, я воспользовался дистрибутивом спарка, входящим в состав elastic map reduce. Apache Spark имеет несколько вариантов представления датасетов. Один из основных — так называемый Resilient Distributed Dataset (RDD) — по сути представляет собой распределенный массив данных, которые можно обрабатывать параллельно. Я буду использовать его для своей реализации (хотя есть и другие API для работы со спарком, которые в некоторых случаях могут быть быстрее, см например Dataframe API)


Так как данные в нашем случае у нас хранятся на объектном хранилище амазона (S3), то сначала спарку необходимо сообщить необходимую информацию для работы с этим хранилищем:


#sc-это spark context, он содержит параметры подключения к спарку

def init_aws_spark(sc, config):
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", config.AWS_ACCESS_KEY)
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", config.AWS_SECRET_KEY)

Дальше можно создать RDD из данных, хранящихся на S3 (которые туда сохранил краулер), и заодно сразу распарсить документы из json-формата:


rdd = sc.textFile("s3a://minicrawl/habrahabr.ru/*",)
jsons_rdd = rdd.map(lambda doc: json.loads(doc))

Тут мы применили одну из базовых функций спарка — map, которая применяет функцию ко всем элементам массива, делая это параллельно на всех узлах кластера.


Далее мы несколько раз применим эту функцию для предварительной обработки текста:


Очистка от HTML-разметки


Тут ничего особо интересного, использую библиотеку lxml для парсинга html-а и удаления разметки:


import copy
import lxml.etree as etree

def stringify_children(node):
    if str(node.tag).lower() in {'script', 'style'}:
        return []
    from lxml.etree import tostring
    parts = [node.text] 
    for element in node.getchildren():
        parts += stringify_children(element)
    return ''.join(filter(None, parts)).lower()

def get_tree(html):
    parser = etree.HTMLParser()
    tree   = etree.parse(StringIO(html), parser)
    return tree.getroot()

def remove_tags(html):
    tree = get_tree(html)
    return stringify_children(tree)

def get_text(doc):
    res = copy.deepcopy(doc)
    res['html'] = res['text']
    res['text'] = remove_tags(res['html'])
    return res

clean_text_rdd = jsons_rdd.map(get_text).cache()

Токенизация и стемминг



from nltk.tokenize import RegexpTokenizer
from nltk.stem.snowball import RussianStemmer
def tokenize(doc):
    tokenizer = RegexpTokenizer(r'[а-яёa-z0-9]+')
    res = copy.deepcopy(doc)
    tokens  = tokenizer.tokenize(res['text'])
    res['tokens'] = list(filter(lambda x: len(x) < 15, tokens))
    return res

def stem(doc):
    stemmer = RussianStemmer()
    res = copy.deepcopy(doc)
    res['stemmed'] = [stemmer.stem(token) for token in res['tokens']]
    return res

stemmed_docs = clean_text_rdd.map(tokenize).map(stem).cache()

Функция cache(), вызванная после функции map(), подсказывает, что этот датасет необходимо закешировать. Если этого не сделать, при многократном использовании спарк будет его рассчитывать заново.


Фильтрация высокочастотных слов на spark


Как я писал, фильтровать будем слова, для которых мера IDF меньше чем 1.12. Для этого нам сначала надо посчитать частоты всех слов. Это прямо классическая задача на анализ больших данных:


def get_words(doc):
    return [(word, 1) for word in set(doc['stemmed'])]

word_counts = stemmed_docs.flatMap(get_words)\
            .reduceByKey(lambda x, y: x+y)

Здесь используются две интересные функции спарка:


  1. flatMap — работает аналогично map, но возвращает не одно значение для одного значения входного датасета, а несколько. В нашем случае — возвращаем пару ключ-значение (<слово>, 1) для каждого слова, хотя бы раз входящего в документ.
  2. reduceByKey — позволяет обработать все значения для одного ключа. В нашем случае просуммировать.

Далее рассчитаем IDF для всех токенов:


doc_count = stemmed_docs.count()
def get_idf(doc_count, doc_with_word_count):
    return math.log(doc_count/doc_with_word_count)

idf = word_counts.mapValues(lambda word_count: get_idf(doc_count, word_count))

Получим список высокочастотных стоп-слов:


idf_border = 1.12
stop_words_list =idf.filter(lambda x: x[1] < idf_border).keys().collect()
stop_words = set(sop_words)

Тут используются функции спарка:


  • filter() — оставляет в датасете только элементы, подходящие под определенный критерий;
  • keys() — оставляет только ключи в датасете (есть аналогичная функция values(), которая оставляет только значения);
  • collect() — собирает распределенный датасет в локальный список. После этого над ним больше нельзя выполнять спарсковские функции.

У меня получилось 50 стоп-слов, среди которых присутствуют как очевидные: "в", "о", "не", так и менее очевидные, но логичные для хабра: "хабрахабр", "мобильн", "песочниц", "поддержк", "регистрац".


Построение обратного индекса


Задача построить датасет типа слово -> множество URL очень похожа на задачу посчитать количество документов, в которых встречается слово, с одним различием: мы будем не прибавлять единицу каждый раз, а добавлять новый URL в множество.


def token_urls(doc):
    res = []
    for token in set(doc['stemmed']):
        if token not in stop_words:
            res.append((token, doc['url']))
    return res

index_rdd = stemmed_docs.flatMap(token_urls)\
       .aggregateByKey(set(),\
                       lambda x, y: x.union({y}),
                       lambda x, y: x.union(y))

Тут в дополнение к уже использованной ранее функции flatMap используется еще и функция aggregateByKey, которая очень похожа на reduceByKey, но принимает три параметра:


  • пустой объект-аккумулятор, в котором будет накапливаться результат;
  • функцию, которая добавляет в аккумулятор одно значение;
  • функцию, которая может слить два аккумулятора в один. Значения для одного ключа могут агрегироваться параллельно, эта функция нужна для объединения частично агрегированных результатов.
    Дальше осталось только сохранить обратный индекс. Для того чтобы его сохранить, нам подойдет любое распределенное key-value хранилище. Я выбрал aerospike— он быстрый, хорошо распределяется. Записывать в значение для токена прямо сериализованное множество url-ов:

В общем, все просто:


import pickle
storage = LazyAerospike(config.AEROSPIKE_ADDRESS)
results = index_rdd.map(lambda x: storage.put(x[0], pickle.dumps(x[1]))).collect()

Тут я использую pickle — стандартный питоновский способ сериализации почти любых объектов. Также использую небольшую обертку над стандартным клиентом aerospike, которая позволяет инициализировать соединение с базой данных в момент первой записи или чтения. Это нужно, так как spark не может распараллелить подключение к базе данных по всем узлам кластера, приходится каждый раз подключаться заново.


class LazyAerospike(object):
    import aerospike 
    def __init__(self, addr, namespace='test', table='index'):
        self.addr = addr
        self.connection = None
        self.namespace = namespace
        self.table = table

    def check_connection(self):
         if self.connection is None:
            config = {
                'hosts': [ (self.addr, 3000) ]
            }
            self.connection = self.aerospike.Client(config).connect()

    def _get_full_key(self, key):
        return (self.namespace, self.table, key)

    def put(self, key, value):
        self.check_connection()
        key_full = self._get_full_key(key)
        self.connection.put(key_full, {'value': value})
        return True

    def get(self, key):
        self.check_connection()   
        key_full = self._get_full_key(key)
        value =  self.connection.get(key_full)
        return value[2]['value']

API для извлечения данных


Осталось написать функцию, которая будет исполняться во время пользовательского запроса. С ней все просто: разбиваем запрос на токены, извлекаем множества URL-ов для каждого токена и пересекаем их:


def index_get_urls(keyword):
    raw_urls = storage.get(keyword)
    return pickle.loads(raw_urls)

def search(query):
    stemmer = RussianStemmer()
    tokenizer = RegexpTokenizer(r'[а-яёa-z0-9]+')
    keywords_all = [stemmer.stem(token) \
            for token in  tokenizer.tokenize(query)]
    keywords = list(filter(lambda token: \
             token not in stop_words, keywords_all))
    if len(keywords) == 0:
        return []
    result_set= index_get_urls(keywords[0])
    for keyword in keywords[1:]:
        result_set=\
            result_set.intersection(index_get_urls(keyword))
    return result_set

Запускаем и убеждаемся, что все работает как надо (тут я запускал на небольшом семпле):


Заключение


Настоящая поисковая система, конечно, устроена гораздо сложнее. Например, я храню множества очень неоптимально, достаточно было бы вместо самих url-ов хранить только их id-шники. Тем не менее архитектура получилась распределенная и потенциально может действительно работать на большой части Интернета и под большими нагрузками.


В продакшене вам, скорее всего, не нужно будет реализовывать поисковую систему руками — лучше воспользоваться готовыми решениями, например таким, как ElasticSearch.


Однако понимание основ того, как работает обратный индекс, может помочь его правильно настроить и использовать, ну и для решения похожих задач может оказаться очень полезным.


В этой статье я не затронул самую интересную, на мой взгляд, часть поиска — ранжирование. О нем речь пойдет в следующих статьях.


Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.

Написать отдельную статью о том как развернуть spark+jupyter в облаке amazon?

  • 82,1%Да23
  • 17,9%Нет, и так все понятно5
  • +18
  • 4,6k
  • 6
Поделиться публикацией
AdBlock похитил этот баннер, но баннеры не зубы — отрастут

Подробнее
Реклама

Комментарии 6

    +1
    Если честно, не очень понятна цель статьи. Мы пытаемся сделать реальный поиск, который будет лучше современных решений по полнотекстовому поиску, или мы играемся со Спарком на примере этой задачи? Если второе — то ок, если первое — ну, такое: перебить в одиночку современные индустриальные разработки вряд ли получится.
      +1
      Спасибо за комментарий. Свои цели я описывал в предыдущей статье, в ваших терминах это «поиграться со спарком». Конечно же я не планирую в одиночку перебить индустриальные разработки, а на примере задачи продемонстрировать разные технологии и алгоритмы.
      0
      Мне кажется, автор слегка жульничает :) всю работу по распараллеливанию запросов за вас сделает aerospike, причем вы даже не задумываетесь, что у него внутри. И фактически вы сделали работу только по построению индекса. И да, попытка добавить сюда например исправление опечаток (любой нечеткий поиск) весьма вероятно приведет к изменению всей архитектуры.
        0
        Я сделал работу только по построению индекса, так как в этом была цель этого шага:) Aerospike тут выполняет роль которую может любое другое key-value хранилище построенное на этих принципах, например dynamodb. Работа key-value хранилищ это интересная область, но для моей задачи это просто инструмент (так же как например spark).

        По поводу исправления ошибок и например обработки синонимов — на самом деле архитектура не меняется от этого. Просто в момент извлечения данных из индекса необходимо будет извлечь большее количество документов, а уже потом правильным образом их отсортировать (об этом в следующей статье)
          0
          Понятно, что в одном посте всего не рассказать, так что это не претензия.

          >любое другое key-value хранилище

          На каких «этих»? Тут в моем понимании дело в том, что с ростом объема данных и индексов вы просто обязаны их распределять, и данные, и работу, по узлам кластера. И я не знаю универсального автоматического способа это сделать, не вникая в то, что у нас за данные. Ну то есть — хранилище распределяет данные по кластеру на основе ключа, а ключ строите вы. Я не особо большой спец в этом вопросе, но мне кажется, что такого хранилища, которое позволяло бы ни о чем не думать на реально больших объемах не существует.

          И еще я имел в виду, что для нечеткого поиска обычно приходится что-то делать при построении индекса, а не только при поиске по нему. Так что если не архитектура в целом, что вид индекса или индексов вполне может поменяться. И если у вас не было изначально скажем N-грамм, то при поиске им взяться негде, и о дополнительных индексах нужно очевидно подумать заранее.
            0
            Принцип — это в первую очередь Distributed Hash Table. Примеры других технологий — Apache Cassandra, Amazon Dynamo Db и так далее.

            По поводу N-Gram — возможно что-то и понадобится поменять при построении индекса, однако предусмотреть все невозможно. Архитектурно и идейно получится все равно то же самое: обратный индекс построенный при помощши технологий больших данных.

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое