Вдохновившись недавней публикацией «Персистентное декартово дерево по неявному ключу», решил написать про реализацию персистентной очереди. Те, кто подумал сейчас, что раз обычная очередь — структура тривиальная, то и её персистентный вариант должен быть очень простым, ошиблись, получающаяся реализация как минимум не проще, чем для вышеуказанного дерева.
Реализовывать мы будем, естественно, так называемую полную персистентность — это значит, что промежуточные версии доступны не только в режиме read-only и мы можем в любой момент времени добавлять и извлекать из них элементы. Кроме того, нам, конечно, хотелось бы иметь такую же асимптотику времени работы и дополнительной памяти, как и для неперсистентного варианта, а именно O(n), где n — суммарное количество операций, совершаемых с нашей очередью. К слову, если ослабить требования до O(n log n), то очередь тривиально эмулируется с помощью персистентного декартового дерева по неявному ключу.
Примем следующий простой интерфейс работы с нашей структурой данных: получающиеся в результате запросов версии очередей будем нумеровать целыми неотрицательными числами, изначально имеется лишь пустая очередь под номером 0. Пусть у нас есть N версий очередей (включая исходную пустую), тогда они занумерованы числами от 0 до N-1 и мы можем выполнить следующие четыре запроса:
Как известно, персистентный стек — очень простая структура данных и асимптотика у него та, что нам и нужна. Кроме того, мы знаем, что очередь можно имитировать с помощью двух стеков. Возникает очевидная идея: сделаем эти два стека персистентными и задача решена! К сожалению, такой простой подход не сработает. Все дело в том, что при данной имитации у нас не всякая операция имеет асимптотику O(1): если при pop’е стек, из которого мы достаем элементы, оказывается пустым, то мы перекладываем в него все элементы из другого стека. В случае с обычным очередью каждый элемент перекладывается только один раз, поэтому суммарная асимптотика остается O(n), но в персистентном случае каждый элемент может принадлежать многим очередям и соответственно быть переложен несколько раз. Пусть, например, q — номер очереди, у которой этот стек пустой, тогда после push(q, 1) и push(q, 2) у полученных очередей этот стек останется пустым и при pop’е из них каждый элемент очереди q будет переложен по два раза. Организовать же какое-либо совместное использование переложенных элементов невозможно в силу того, что самые нижние элементы у полученных перекладыванием стеков будут различны (1 и 2 соотвественно), а персистентный стек устроен таким образом, что каждый элемент хранит указатель на элемент, лежащий ниже его, поэтому в любом случае мы будем вынуждены иметь 2 копии предпоследнего элемента, которые будут указывать на соответствующий последний, а значит и 2 копии предпредпоследнего, чтобы указывать на нужный предпоследний и так далее по цепочке.
Тем не менее существует алгоритм, основанный на такой идее (имитации очереди с помощью двух стеков), гарантирующий, что стек, из которого мы достаем при pop’e, никогда не окажется пустым и использующий для обеспечения этого 6 стеков. Мой же алгоритм эту идею в явном виде не использует, хотя при общие моменты, конечно, найти можно.
Попытаемся использовать ту же структуру, что и при реализации персистентного стека: будем хранить добавляемые элементы в виде дерева (точнее говоря, набора деревьев — леса), в каждой вершине будет лежать добавленный элемент и указатель на предшествующий ему в очереди (точнее, на соответствующую этому предшествующему элементу вершину дерева). Говоря в дальнейшем элемент очереди, я часто буду подразумевать именно соответствующую элементу вершину.
Непустая очередь в таком случае может быть представлена парой указателей на первый и последний элемент очереди, причем вершина, соответствующая первому элементу, обязательно будет предком вершины, соответствующей последнему (ну или совпадать с ней в случае очереди из одного элемента). Пример такой структуры и отображения в ней очереди:

Заметим, что операция push просто создает новую вершину, устанавливая ей в качестве родителя последний элемент исходной очереди (либо она становится новым корнем, если добавляли в пустую), а операция pop наш лес вообще не трогает, просто сдвигает указатель на первый элемент. Тем самым, после создания вершина остается неизменной до конца работы и можно не переживать, что новые операции могут каким-либо образом испортить уже существующие очереди.
Наша единственная проблема при таком подходе – это реализация операции pop (остальные три — тривиальны), непонятно, как определить на какую вершину сдвинуть указатель на первый элемент, ведь для вершины мы храним лишь указатель на родителя, но не на сыновей (коих у одной вершины может быть несколько штук, и, даже если их все хранить, совершенно неясно, какой из них соответствует следующему элементу нашей очереди). Поэтому для каждой очереди, в которой больше двух элементов, будем дополнительно хранить указатель на элемент некоторого односвязного списка, такого, что элемент списка по нашему указателю содержит указатель на второй элемент нашей очереди, а элементы, лежащие ниже его по списку, указатели на третий, четвертый и так далее соответственно. Имея такой указатель для исходной очереди, при выполнении pop'а нам не составит теперь труда определить новую голову очереди, а указатель на такой элемент списка для получившейся очереди – это просто указатель на следующий элемент в данном списке.
Однако, по причинам, аналогичным тем, по которым эмуляция очереди двумя стеками не работает, добиться того, чтобы для каждой очереди ниже по списку лежали абсолютно все её промежуточные вершины, у нас не выйдет, поэтому, пускай, в нем будут лежать только сколько-то первых из них, а мы, когда этот список начнет становиться слишком маленьким, будем строить новый список, продвигаясь шаг за шагом (то есть увеличивая строящийся список на 1 элемент с каждой операцией push или pop) с конца очереди в начало, идя по указателям на родителей вершин нашего дерева и стараясь, чтобы к моменту, когда старый список закончится, у нас уже был новый наготове.
Говоря более формально, для каждой очереди помимо указателей на начало и конец и указателя на элемент уже готового списка будем хранить ещё и указатель на элемент строящегося списка (в случае, если мы ничего не строим, он будет равен 0) и вычислять его значение для новой очереди (полученной после применения к исходной push’a или pop’a) по следующему простому алгоритму:
При этом ни сам исходный элемент списка, ни указатель на него, хранящийся для исходной очереди, мы менять не будем, и вообще, у нас все данные (вершины дерева, элементы списков, данные, хранящиеся для очередей) будут иммутабельны (то есть после создания никогда не будут менять своего значения). Изображение того, как это всё устроено и что происходит при push’e или pop’e, можно увидеть на схеме (сделать её более понятной не получилось, сколько ни старался):

Соответственно, если при достройке списка мы доходим до первого промежуточного элемента, то конструирование для этой очереди завершено (при этом какие-то другие очереди могут продолжать достраивать этот список, но так как старые элементы никогда не меняются, нам это никак не навредит). В таком случае на место указателя на элемент старого списка записываем указатель на элемент нового, а указатель на конструируемый список присваиваем нулю.
Из описания алгоритма видно, что при выполнении каждого запроса мы создаем не более одной новой вершины дерева и не более одного нового элемента списка и тратим на выполнение O(1) времени, то есть асимптотика та, которой мы и добивались, и осталось лишь обсудить правильный критерий.
Будем действовать следующим образом: пока размер нашего списка больше либо равен 1/2 от количества промежуточных вершин (то есть вершин без первой и последней), мы ничего не предпринимаем, как только станет меньше, начинаем конструировать новый. Заметим, что если промежуточных вершин нет, то мы имеем неравенство 0 ≥ 0 и ничего не делаем, то есть в особый случай это выделять не нужно. Докажем, что при таком подходе мы никогда не окажемся в ситуации, когда старый список кончился, а новый ещё не готов.
Для начала докажем следующий инвариант: если мы конструируем новый список, то на каждом шаге 2k + l = s, где k — количество элементов в старом списке, l — в конструируемом (после уже выполненной на этом шагу достройки), s — суммарное количество промежуточных элементов. Доказывать будем методом математической индукции. Рассмотрим первый шаг, когда мы только начали конструирование (l = 1). Заметим, что для новой очереди 2k – s < 0 (наш критерий), однако для исходной 2kold – sold ≥ 0. Посмотрим, как такое могло случиться: если наша операция push, то k = kold, а s = sold + 1, а если pop, то k = kold – 1 и s = sold – 1. Как нетрудно заметить, в обоих случаях 2k – s меньше 2kold – sold ровно на единицу и, так как обе разности — целые числа, значит, вторая из них 0, а первая -1, следовательно, 2k + 1 = s, а наше l как раз единица и есть. База индукции доказана. Докажем переход: пусть на каком-то шаге конструирования 2kold + lold = sold. Тогда l = lold + 1 (достроили список на 1 элемент), в случае push’a: k = kold, s = sold + 1, в случае pop’a: k = kold – 1, s = sold – 1. В обоих случаях равенство выполняется, а значит утверждение доказано.
Предположим теперь, что мы делаем операцию pop, а у исходной очереди нужный нам список пустой (при push’е мы этот список никак не используем). Тогда может быть одно из двух:
Осталось лишь доказать, что, когда мы заканчиваем конструирование, условие отсутствия конструирования выполняется, то есть длина получившегося списка не меньше 1/2 от общего количества промежуточных вершин, иными словами, что в момент завершения 2l ≥ s или, что равносильно, l ≥ s – l. Но что такое s – l? Это количество промежуточных вершин, которые не содержатся в нашем списке. Нетрудно понять, что оно равно количеству операций push, произошедших в процессе конструирования. Однако при каждой такой операции количество элементов в нашем списке также увеличивалось на 1 (возможна ситуация, когда на предыдущем шаге элемент конструируемого списка соответствует третьему по счету элементу очереди и мы делаем pop, тогда для новой очереди элемент списка начинает соответствовать первому промежуточному элементу сразу без достройки, но такое возможно только на последнем шаге конструирования и только при операции pop). Поэтому l ≥ количества таких push’ей (не трудно понять, что даже строго больше), что нам и требовалось доказать.
Для наглядности, продемонстрируем, что происходит в двух крайних случаях: когда мы все время выполняем pop и когда все время push'им:

Напоследок приведу реализацию вышеописанной структуры данных на языке С++ (при реализации используются нововведения С++11). Стоит воспринимать этот код в качестве прототипа, я старался написать его как можно проще и короче, жертвуя всем остальным в угоду этому.
Типы TreeNode, Queue и QueueIntermediateTreeNodeList соотвествуют вершине дерева, очереди и элементу односвязного списка из нашего алгоритма. Булевы переменные в Queue используются для корректного освобождения памяти в деструкторе нашего класса: вершину дерева и элемент списка удаляет та очередь, которая их и создавала (как уже было отмечено выше, при каждом запросе создается не больше одной вершины и не больше одного элемента списка). Вершина дерева создается только при операции push и указатель на неё записывается в last, соответственно, own_last указывает, записана ли в last новая или старая вершина. Созданный же при конструировании элемент списка почти всегда записывается в constructing_intermediate_list, кроме случая, когда он был создан и конструирование на этом же шаге закончилось, в таком случае он переносится в known_intermediate_list, а constructing_intermediate_list становится нулем, и вторая булева переменная own_known_intermediate_list служит как раз для идентификации такого случая.
Второй возможно неочевидный момент — это закрытая функция-член manage_intermediate_lists, которая выглядит следующим образом:
Эта функция реализует вышеописанный алгоритм работы с указателем на строящийся список (обратите внимание, первые три параметра передаются по неконстантной ссылке, причем для первых двух предполагается, что переменные по ссылк�� инициализированы нужными значениями перед вызовом). Важно перед достройкой списка сначала проверять, а не стал ли старый элемент первым промежуточным, потому что, как уже отмечалось, в случае pop'а такой вариант возможен. Может быть кто-то заметил, что мы всегда проверяем на наш критерий начала конструирования:
Во-первых, как видно на схеме, при совершении нескольких различных операций c одной и той же очередью, для которой идет конструирование (на схеме это операции pop(Q) и push(Q, 11)), создается несколько абсолютно идентичных между собой элементов списка (они указывают на один и тот же следующий элемент и одну и ту же вершину дерева — на схеме это два голубых кружочка с цифрой 7). Мы могли бы попробовать «склеить» такие элементы в один, используя, например, структуру, которая при достраивании говорила бы нам, а не сделал ли уже это кто-то перед нами (в качестве такой структуры можно использовать банальный вектор или двусвязный список). Очевидно, такая оптимизация будет хорошо работать (экономить много памяти) в случае сильно разветвленных деревьев.
Во-вторых, у нас есть ещё одно дублирование: когда элементы готового и строящегося списка указывают на одну и ту же вершину дерева (речь идет о зелёно-оранжевых элементах на этой картинке). Конечно, полностью такого дублирования нам не избежать, потому что у списков вполне может быть одинаковое начало и отличающиеся друг от друга продолжения, однако мы могли бы, когда первый из строящихся списков достигнет конца готового (а не начала очереди) просто «подклеить» его к этому концу, тем самым сэкономив немножко памяти. Для оставшихся списков (соответствующих этому готовому, разумеется) все равно придется дублировать. Тем самым, данная оптимизация лучше всего будут себя проявлять, когда у нас на деревьях много длинных прямых веток.
Наконец в-третьих, можно попробовать поиграть с константами. Например, можно достраивать за шаг не по 1 элементу, а по 2, тогда начинать конструирование достаточно при k < 1/3s. Возможно, это будет как-то полезно.
На этом всё, спасибо всем, кто дочитал.
Постановка задачи
Реализовывать мы будем, естественно, так называемую полную персистентность — это значит, что промежуточные версии доступны не только в режиме read-only и мы можем в любой момент времени добавлять и извлекать из них элементы. Кроме того, нам, конечно, хотелось бы иметь такую же асимптотику времени работы и дополнительной памяти, как и для неперсистентного варианта, а именно O(n), где n — суммарное количество операций, совершаемых с нашей очередью. К слову, если ослабить требования до O(n log n), то очередь тривиально эмулируется с помощью персистентного декартового дерева по неявному ключу.
Примем следующий простой интерфейс работы с нашей структурой данных: получающиеся в результате запросов версии очередей будем нумеровать целыми неотрицательными числами, изначально имеется лишь пустая очередь под номером 0. Пусть у нас есть N версий очередей (включая исходную пустую), тогда они занумерованы числами от 0 до N-1 и мы можем выполнить следующие четыре запроса:
- empty(query_id) — проверяет пуста ли очередь c заданным номером;
- front(query_id) — возвращает первый элемент очереди, сама же очередь при этом никак не изменяется и никаких новых очередей не создается. Операция допустима, если очередь не пуста;
- push(query_id, new_element) — создает новую очередь под номером N, получающуюся дописыванием в конец к исходной нового элемента. Старая версия при этом по-прежнему доступна под номером query_id;
- pop(query_id) — создает новую очередь под номером N, получающуюся извлечением из исходной первого элемента. Исходная очередь все также доступна под старым номером. В случае, если исходная очередь была пуста, новая также будет пустой.
Имитация очереди с помощью стеков
For every problem there is a solution which is simple, fast, and wrong.
— The First Law of Online Judges
Как известно, персистентный стек — очень простая структура данных и асимптотика у него та, что нам и нужна. Кроме того, мы знаем, что очередь можно имитировать с помощью двух стеков. Возникает очевидная идея: сделаем эти два стека персистентными и задача решена! К сожалению, такой простой подход не сработает. Все дело в том, что при данной имитации у нас не всякая операция имеет асимптотику O(1): если при pop’е стек, из которого мы достаем элементы, оказывается пустым, то мы перекладываем в него все элементы из другого стека. В случае с обычным очередью каждый элемент перекладывается только один раз, поэтому суммарная асимптотика остается O(n), но в персистентном случае каждый элемент может принадлежать многим очередям и соответственно быть переложен несколько раз. Пусть, например, q — номер очереди, у которой этот стек пустой, тогда после push(q, 1) и push(q, 2) у полученных очередей этот стек останется пустым и при pop’е из них каждый элемент очереди q будет переложен по два раза. Организовать же какое-либо совместное использование переложенных элементов невозможно в силу того, что самые нижние элементы у полученных перекладыванием стеков будут различны (1 и 2 соотвественно), а персистентный стек устроен таким образом, что каждый элемент хранит указатель на элемент, лежащий ниже его, поэтому в любом случае мы будем вынуждены иметь 2 копии предпоследнего элемента, которые будут указывать на соответствующий последний, а значит и 2 копии предпредпоследнего, чтобы указывать на нужный предпоследний и так далее по цепочке.
Тем не менее существует алгоритм, основанный на такой идее (имитации очереди с помощью двух стеков), гарантирующий, что стек, из которого мы достаем при pop’e, никогда не окажется пустым и использующий для обеспечения этого 6 стеков. Мой же алгоритм эту идею в явном виде не использует, хотя при общие моменты, конечно, найти можно.
Описание алгоритма
Попытаемся использовать ту же структуру, что и при реализации персистентного стека: будем хранить добавляемые элементы в виде дерева (точнее говоря, набора деревьев — леса), в каждой вершине будет лежать добавленный элемент и указатель на предшествующий ему в очереди (точнее, на соответствующую этому предшествующему элементу вершину дерева). Говоря в дальнейшем элемент очереди, я часто буду подразумевать именно соответствующую элементу вершину.
Непустая очередь в таком случае может быть представлена парой указателей на первый и последний элемент очереди, причем вершина, соответствующая первому элементу, обязательно будет предком вершины, соответствующей последнему (ну или совпадать с ней в случае очереди из одного элемента). Пример такой структуры и отображения в ней очереди:

Заметим, что операция push просто создает новую вершину, устанавливая ей в качестве родителя последний элемент исходной очереди (либо она становится новым корнем, если добавляли в пустую), а операция pop наш лес вообще не трогает, просто сдвигает указатель на первый элемент. Тем самым, после создания вершина остается неизменной до конца работы и можно не переживать, что новые операции могут каким-либо образом испортить уже существующие очереди.
Наша единственная проблема при таком подходе – это реализация операции pop (остальные три — тривиальны), непонятно, как определить на какую вершину сдвинуть указатель на первый элемент, ведь для вершины мы храним лишь указатель на родителя, но не на сыновей (коих у одной вершины может быть несколько штук, и, даже если их все хранить, совершенно неясно, какой из них соответствует следующему элементу нашей очереди). Поэтому для каждой очереди, в которой больше двух элементов, будем дополнительно хранить указатель на элемент некоторого односвязного списка, такого, что элемент списка по нашему указателю содержит указатель на второй элемент нашей очереди, а элементы, лежащие ниже его по списку, указатели на третий, четвертый и так далее соответственно. Имея такой указатель для исходной очереди, при выполнении pop'а нам не составит теперь труда определить новую голову очереди, а указатель на такой элемент списка для получившейся очереди – это просто указатель на следующий элемент в данном списке.
Однако, по причинам, аналогичным тем, по которым эмуляция очереди двумя стеками не работает, добиться того, чтобы для каждой очереди ниже по списку лежали абсолютно все её промежуточные вершины, у нас не выйдет, поэтому, пускай, в нем будут лежать только сколько-то первых из них, а мы, когда этот список начнет становиться слишком маленьким, будем строить новый список, продвигаясь шаг за шагом (то есть увеличивая строящийся список на 1 элемент с каждой операцией push или pop) с конца очереди в начало, идя по указателям на родителей вершин нашего дерева и стараясь, чтобы к моменту, когда старый список закончится, у нас уже был новый наготове.
Говоря более формально, для каждой очереди помимо указателей на начало и конец и указателя на элемент уже готового списка будем хранить ещё и указатель на элемент строящегося списка (в случае, если мы ничего не строим, он будет равен 0) и вычислять его значение для новой очереди (полученной после применения к исходной push’a или pop’a) по следующему простому алгоритму:
- если у исходной очереди данный указатель был равен 0, проверим, не пора ли нам начать строить новый список (критерий такой проверки будет описан чуть ниже):
- если пора, то создаем элемент списка с нулевым указателем на следующий по списку (тем самым этот созданный элемент будет концом конструируемого списка), а указатель на вершину ставим на предпоследний элемент нашей очереди (родителя последнего элемента),
- если ещё не время, то указатель оставляем равным нулю.
- В случае же ненулевого исходного указателя, достроим наш список вверх на один элемент: создадим новый элемент, указывающий на исходный в качестве следующего по списку и на родителя соответствующей этому исходному элементу вершины.
При этом ни сам исходный элемент списка, ни указатель на него, хранящийся для исходной очереди, мы менять не будем, и вообще, у нас все данные (вершины дерева, элементы списков, данные, хранящиеся для очередей) будут иммутабельны (то есть после создания никогда не будут менять своего значения). Изображение того, как это всё устроено и что происходит при push’e или pop’e, можно увидеть на схеме (сделать её более понятной не получилось, сколько ни старался):

Соответственно, если при достройке списка мы доходим до первого промежуточного элемента, то конструирование для этой очереди завершено (при этом какие-то другие очереди могут продолжать достраивать этот список, но так как старые элементы никогда не меняются, нам это никак не навредит). В таком случае на место указателя на элемент старого списка записываем указатель на элемент нового, а указатель на конструируемый список присваиваем нулю.
Из описания алгоритма видно, что при выполнении каждого запроса мы создаем не более одной новой вершины дерева и не более одного нового элемента списка и тратим на выполнение O(1) времени, то есть асимптотика та, которой мы и добивались, и осталось лишь обсудить правильный критерий.
Критерий проверки на начало конструирования
Будем действовать следующим образом: пока размер нашего списка больше либо равен 1/2 от количества промежуточных вершин (то есть вершин без первой и последней), мы ничего не предпринимаем, как только станет меньше, начинаем конструировать новый. Заметим, что если промежуточных вершин нет, то мы имеем неравенство 0 ≥ 0 и ничего не делаем, то есть в особый случай это выделять не нужно. Докажем, что при таком подходе мы никогда не окажемся в ситуации, когда старый список кончился, а новый ещё не готов.
Для начала докажем следующий инвариант: если мы конструируем новый список, то на каждом шаге 2k + l = s, где k — количество элементов в старом списке, l — в конструируемом (после уже выполненной на этом шагу достройки), s — суммарное количество промежуточных элементов. Доказывать будем методом математической индукции. Рассмотрим первый шаг, когда мы только начали конструирование (l = 1). Заметим, что для новой очереди 2k – s < 0 (наш критерий), однако для исходной 2kold – sold ≥ 0. Посмотрим, как такое могло случиться: если наша операция push, то k = kold, а s = sold + 1, а если pop, то k = kold – 1 и s = sold – 1. Как нетрудно заметить, в обоих случаях 2k – s меньше 2kold – sold ровно на единицу и, так как обе разности — целые числа, значит, вторая из них 0, а первая -1, следовательно, 2k + 1 = s, а наше l как раз единица и есть. База индукции доказана. Докажем переход: пусть на каком-то шаге конструирования 2kold + lold = sold. Тогда l = lold + 1 (достроили список на 1 элемент), в случае push’a: k = kold, s = sold + 1, в случае pop’a: k = kold – 1, s = sold – 1. В обоих случаях равенство выполняется, а значит утверждение доказано.
Предположим теперь, что мы делаем операцию pop, а у исходной очереди нужный нам список пустой (при push’е мы этот список никак не используем). Тогда может быть одно из двух:
- либо для исходной очереди мы конструирование нового списка не начинали, тогда у этой очереди 2k ≥ s,
k = 0 => s = 0, а значит, это очередь размером 0, 1 или 2 и знания её последнего элемента для совершения pop’a достаточно, - либо начинали, тогда l ≠ 0, в силу инварианта 2k + l = s, k = 0 => l = s ≠ 0 => конструируемый список был непуст и содержал все промежуточные элементы, а значит, мы должны были произвести на том шаге замену и получить непустой список.
Осталось лишь доказать, что, когда мы заканчиваем конструирование, условие отсутствия конструирования выполняется, то есть длина получившегося списка не меньше 1/2 от общего количества промежуточных вершин, иными словами, что в момент завершения 2l ≥ s или, что равносильно, l ≥ s – l. Но что такое s – l? Это количество промежуточных вершин, которые не содержатся в нашем списке. Нетрудно понять, что оно равно количеству операций push, произошедших в процессе конструирования. Однако при каждой такой операции количество элементов в нашем списке также увеличивалось на 1 (возможна ситуация, когда на предыдущем шаге элемент конструируемого списка соответствует третьему по счету элементу очереди и мы делаем pop, тогда для новой очереди элемент списка начинает соответствовать первому промежуточному элементу сразу без достройки, но такое возможно только на последнем шаге конструирования и только при операции pop). Поэтому l ≥ количества таких push’ей (не трудно понять, что даже строго больше), что нам и требовалось доказать.
Для наглядности, продемонстрируем, что происходит в двух крайних случаях: когда мы все время выполняем pop и когда все время push'им:

Реализация на С++
Напоследок приведу реализацию вышеописанной структуры данных на языке С++ (при реализации используются нововведения С++11). Стоит воспринимать этот код в качестве прототипа, я старался написать его как можно проще и короче, жертвуя всем остальным в угоду этому.
persistent_queue.h
#ifndef PERSISTENT_QUEUE_H #define PERSISTENT_QUEUE_H #include <cstdlib> #include <vector> using Element_type = int; class Persistent_queue { public: Persistent_queue(); ~Persistent_queue(); Persistent_queue(const Persistent_queue&) = delete; Persistent_queue& operator =(const Persistent_queue&) = delete; using Queue_id = size_t; static const Queue_id start_queue_id = 0; Queue_id push(Queue_id queue_id, const Element_type& value); Queue_id pop(Queue_id queue_id); const Element_type& front(Queue_id queue_id); size_t size(Queue_id queue_id); bool empty(Queue_id queue_id); private: struct TreeNode; struct QueueIntermediateTreeNodeList; struct Queue { const TreeNode* const first; const TreeNode* const last; const size_t size; const QueueIntermediateTreeNodeList* const known_intermediate_list; const QueueIntermediateTreeNodeList* const constructing_intermediate_list; const bool own_last; const bool own_known_intermediate_list; Queue(const TreeNode* const first, const TreeNode* const last, const size_t size, const QueueIntermediateTreeNodeList* const known_intermediate_list, const QueueIntermediateTreeNodeList* const constructing_intermediate_list, const bool own_last, const bool own_known_intermediate_list); ~Queue() = default; Queue(Queue&& source) = default; // needed for vector reallocation Queue(const Queue&) = delete; Queue& operator =(const Queue&) = delete; }; std::vector<Queue> queues_; Queue_id register_new_queue(const TreeNode* const first, const TreeNode* const last, const size_t size, const QueueIntermediateTreeNodeList* const known_intermediate_list, const QueueIntermediateTreeNodeList* const constructing_intermediate_list, const bool own_last, const bool own_known_intermediate_list); void manage_intermediate_lists(const QueueIntermediateTreeNodeList*& known_intermediate_list, const QueueIntermediateTreeNodeList*& constructing_intermediate_list, bool& own_known_intermediate_list, const TreeNode* const first, const TreeNode* const last, const size_t size); }; #endif // PERSISTENT_QUEUE_H
persistent_queue.cpp
#include "persistent_queue.h" #include <cassert> struct Persistent_queue::TreeNode { const TreeNode* const parent; const Element_type element; TreeNode(const TreeNode* const parent, const Element_type& value) : parent(parent), element(value) {} ~TreeNode() = default; TreeNode(const TreeNode&) = delete; TreeNode& operator =(const TreeNode&) = delete; }; struct Persistent_queue::QueueIntermediateTreeNodeList { const Persistent_queue::TreeNode* const front; const QueueIntermediateTreeNodeList* const next; const size_t size; QueueIntermediateTreeNodeList(const Persistent_queue::TreeNode* const front, const QueueIntermediateTreeNodeList* const tail_list) : front(front), next(tail_list), size{tail_list ? tail_list->size + 1 : 1} { assert(front); } ~QueueIntermediateTreeNodeList() = default; QueueIntermediateTreeNodeList(const QueueIntermediateTreeNodeList&) = delete; QueueIntermediateTreeNodeList& operator =(const QueueIntermediateTreeNodeList&) = delete; }; Persistent_queue::Queue::Queue( const Persistent_queue::TreeNode* const first, const Persistent_queue::TreeNode* const last, const size_t size, const Persistent_queue::QueueIntermediateTreeNodeList* const known_intermediate_list, const Persistent_queue::QueueIntermediateTreeNodeList* const constructing_intermediate_list, const bool own_last, const bool own_known_intermediate_list ) : first(first), last(last), size(size), known_intermediate_list(known_intermediate_list) , constructing_intermediate_list(constructing_intermediate_list) , own_last(own_last), own_known_intermediate_list(own_known_intermediate_list) { // Some asserts if (size == 0) { assert(first == nullptr); assert(last == nullptr); } else { assert(first); assert(last); if (size > 1) assert(last->parent); } if (size <= 2) { assert(known_intermediate_list == nullptr); assert(constructing_intermediate_list == nullptr); if (size == 1) assert(first == last); if (size == 2) assert(first == last->parent); } else { assert(known_intermediate_list); assert(first == known_intermediate_list->front->parent); } } size_t Persistent_queue::size(const Persistent_queue::Queue_id queue_id) { return queues_.at(queue_id).size; } bool Persistent_queue::empty(const Queue_id queue_id) { return size(queue_id) == 0; } const Element_type& Persistent_queue::front(const Persistent_queue::Queue_id queue_id) { assert(!empty(queue_id)); return queues_.at(queue_id).first->element; } Persistent_queue::Queue_id Persistent_queue::register_new_queue(const TreeNode* const first, const TreeNode* const last, const size_t size, const QueueIntermediateTreeNodeList* const known_intermediate_list, const QueueIntermediateTreeNodeList* const constructing_intermediate_list, const bool own_last, const bool own_known_intermediate_list) { queues_.emplace_back(first, last, size, known_intermediate_list, constructing_intermediate_list, own_last, own_known_intermediate_list); return queues_.size() - 1; } Persistent_queue::Persistent_queue() { register_new_queue(nullptr, nullptr, 0, nullptr, nullptr, false, false); } Persistent_queue::~Persistent_queue() { for (const auto& q : queues_) { if (q.own_last) delete q.last; if (q.own_known_intermediate_list) delete q.known_intermediate_list; delete q.constructing_intermediate_list; } } Persistent_queue::Queue_id Persistent_queue::push(const Persistent_queue::Queue_id queue_id, const Element_type& value) { const auto& queue_for_push = queues_.at(queue_id); const size_t size = queue_for_push.size + 1; const bool own_last = true; const TreeNode* first; const TreeNode* last; if (queue_for_push.size == 0) { first = last = new TreeNode(nullptr, value); } else { first = queue_for_push.first; last = new TreeNode(queue_for_push.last, value); } bool own_known_intermediate_list; const QueueIntermediateTreeNodeList* known_intermediate_list = queue_for_push.known_intermediate_list; const QueueIntermediateTreeNodeList* constructing_intermediate_list = queue_for_push.constructing_intermediate_list; manage_intermediate_lists(known_intermediate_list, constructing_intermediate_list, own_known_intermediate_list, first, last, size); return register_new_queue(first, last, size, known_intermediate_list, constructing_intermediate_list, own_last, own_known_intermediate_list); } Persistent_queue::Queue_id Persistent_queue::pop(const Persistent_queue::Queue_id queue_id) { const auto& queue_for_pop = queues_.at(queue_id); const bool own_last = false; const TreeNode* first; const TreeNode* last; size_t size; const QueueIntermediateTreeNodeList* known_intermediate_list; if (queue_for_pop.size <= 1) { first = last = nullptr; size = 0; known_intermediate_list = nullptr; } else { last = queue_for_pop.last; size = queue_for_pop.size - 1; if (queue_for_pop.size == 2) { first = queue_for_pop.last; known_intermediate_list = nullptr; } else { assert(queue_for_pop.known_intermediate_list != nullptr); first = queue_for_pop.known_intermediate_list->front; known_intermediate_list = queue_for_pop.known_intermediate_list->next; } } bool own_known_intermediate_list; const QueueIntermediateTreeNodeList* constructing_intermediate_list = queue_for_pop.constructing_intermediate_list; manage_intermediate_lists(known_intermediate_list, constructing_intermediate_list, own_known_intermediate_list, first, last, size); return register_new_queue(first, last, size, known_intermediate_list, constructing_intermediate_list, own_last, own_known_intermediate_list); } void Persistent_queue::manage_intermediate_lists( const Persistent_queue::QueueIntermediateTreeNodeList*& known_intermediate_list, const Persistent_queue::QueueIntermediateTreeNodeList*& constructing_intermediate_list, bool& own_known_intermediate_list, const Persistent_queue::TreeNode* const first, const Persistent_queue::TreeNode* const last, const size_t size) { own_known_intermediate_list = false; const size_t intermediate_nodes_count = (size > 2 ? size - 2 : 0); size_t known_intermediate_list_size = (known_intermediate_list ? known_intermediate_list->size : 0); if (2*known_intermediate_list_size < intermediate_nodes_count) { auto try_to_replace_known_to_constructing = [&](){ if (constructing_intermediate_list && constructing_intermediate_list->front->parent == first) { known_intermediate_list = constructing_intermediate_list; known_intermediate_list_size = constructing_intermediate_list->size; constructing_intermediate_list = nullptr; return true; } return false; }; if (!try_to_replace_known_to_constructing()) { const auto adding_node = (constructing_intermediate_list ? constructing_intermediate_list->front->parent : last->parent); constructing_intermediate_list = new QueueIntermediateTreeNodeList( adding_node, constructing_intermediate_list); if (try_to_replace_known_to_constructing()) own_known_intermediate_list = true; } } // Check invariants if (2*known_intermediate_list_size >= intermediate_nodes_count) assert(constructing_intermediate_list == nullptr); const size_t constructing_intermediate_list_size = (constructing_intermediate_list ? constructing_intermediate_list->size : 0); const auto invariant_sum = 2*known_intermediate_list_size + constructing_intermediate_list_size; assert(invariant_sum >= intermediate_nodes_count); if (constructing_intermediate_list) assert(invariant_sum == intermediate_nodes_count); }
Типы TreeNode, Queue и QueueIntermediateTreeNodeList соотвествуют вершине дерева, очереди и элементу односвязного списка из нашего алгоритма. Булевы переменные в Queue используются для корректного освобождения памяти в деструкторе нашего класса: вершину дерева и элемент списка удаляет та очередь, которая их и создавала (как уже было отмечено выше, при каждом запросе создается не больше одной вершины и не больше одного элемента списка). Вершина дерева создается только при операции push и указатель на неё записывается в last, соответственно, own_last указывает, записана ли в last новая или старая вершина. Созданный же при конструировании элемент списка почти всегда записывается в constructing_intermediate_list, кроме случая, когда он был создан и конструирование на этом же шаге закончилось, в таком случае он переносится в known_intermediate_list, а constructing_intermediate_list становится нулем, и вторая булева переменная own_known_intermediate_list служит как раз для идентификации такого случая.
Второй возможно неочевидный момент — это закрытая функция-член manage_intermediate_lists, которая выглядит следующим образом:
void Persistent_queue::manage_intermediate_lists( const Persistent_queue::QueueIntermediateTreeNodeList*& known_intermediate_list, const Persistent_queue::QueueIntermediateTreeNodeList*& constructing_intermediate_list, bool& own_known_intermediate_list, const Persistent_queue::TreeNode* const first, const Persistent_queue::TreeNode* const last, const size_t size) { own_known_intermediate_list = false; const size_t intermediate_nodes_count = (size > 2 ? size - 2 : 0); size_t known_intermediate_list_size = (known_intermediate_list ? known_intermediate_list->size : 0); if (2*known_intermediate_list_size < intermediate_nodes_count) { auto try_to_replace_known_to_constructing = [&](){ if (constructing_intermediate_list && constructing_intermediate_list->front->parent == first) { known_intermediate_list = constructing_intermediate_list; known_intermediate_list_size = constructing_intermediate_list->size; constructing_intermediate_list = nullptr; return true; } return false; }; if (!try_to_replace_known_to_constructing()) { const auto adding_node = (constructing_intermediate_list ? constructing_intermediate_list->front->parent : last->parent); constructing_intermediate_list = new QueueIntermediateTreeNodeList( adding_node, constructing_intermediate_list); if (try_to_replace_known_to_constructing()) own_known_intermediate_list = true; } } // Check invariants }
Эта функция реализует вышеописанный алгоритм работы с указателем на строящийся список (обратите внимание, первые три параметра передаются по неконстантной ссылке, причем для первых двух предполагается, что переменные по ссылк�� инициализированы нужными значениями перед вызовом). Важно перед достройкой списка сначала проверять, а не стал ли старый элемент первым промежуточным, потому что, как уже отмечалось, в случае pop'а такой вариант возможен. Может быть кто-то заметил, что мы всегда проверяем на наш критерий начала конструирования:
не выделяя отдельно случай, когда конструирование уже идет. Дело в том, что во время всего процесса конструирования наш критерий будет оставаться верным, это очевидным образом следует из нашего инварианта конструирования: 2k + l = s, l > 0 => 2k < s.if (2*known_intermediate_list_size < intermediate_nodes_count) {
Возможные оптимизации
Во-первых, как видно на схеме, при совершении нескольких различных операций c одной и той же очередью, для которой идет конструирование (на схеме это операции pop(Q) и push(Q, 11)), создается несколько абсолютно идентичных между собой элементов списка (они указывают на один и тот же следующий элемент и одну и ту же вершину дерева — на схеме это два голубых кружочка с цифрой 7). Мы могли бы попробовать «склеить» такие элементы в один, используя, например, структуру, которая при достраивании говорила бы нам, а не сделал ли уже это кто-то перед нами (в качестве такой структуры можно использовать банальный вектор или двусвязный список). Очевидно, такая оптимизация будет хорошо работать (экономить много памяти) в случае сильно разветвленных деревьев.
Во-вторых, у нас есть ещё одно дублирование: когда элементы готового и строящегося списка указывают на одну и ту же вершину дерева (речь идет о зелёно-оранжевых элементах на этой картинке). Конечно, полностью такого дублирования нам не избежать, потому что у списков вполне может быть одинаковое начало и отличающиеся друг от друга продолжения, однако мы могли бы, когда первый из строящихся списков достигнет конца готового (а не начала очереди) просто «подклеить» его к этому концу, тем самым сэкономив немножко памяти. Для оставшихся списков (соответствующих этому готовому, разумеется) все равно придется дублировать. Тем самым, данная оптимизация лучше всего будут себя проявлять, когда у нас на деревьях много длинных прямых веток.
Наконец в-третьих, можно попробовать поиграть с константами. Например, можно достраивать за шаг не по 1 элементу, а по 2, тогда начинать конструирование достаточно при k < 1/3s. Возможно, это будет как-то полезно.
На этом всё, спасибо всем, кто дочитал.
