Распараллеливание длительных операций

    Мне часто приходится сталкиваться с задачами, требующими от базы данных очень большой производительности при обработке больших массивов данных. Сегодня я расскажу об очень простом, но действенном приеме, который может вас выручить, если база уже не поспевает за тем количеством данных, которые скапливаются и должны быть обработаны. Метод не зависит от базы данных, но по привычке публикую в блог PostgreSQL, и пример будет именно на ней. Давайте сразу перейдем к примеру.

    Сферический конь


    Предположим, мы говорим о простейшем биллинге (понятно, что метод применим не только для биллинга, но с ним это будет выглядеть достаточно наглядно). Таблица, в которой скапливаются данные о звонках абонентов будет в нашем случае иметь такой формат:
    CREATE TABLE billing.calls
    (
    call_id BIGINT,
    call_time TIMESTAMP,
    subscriber_id INTEGER,
    duration INTERVAL
    );


    * This source code was highlighted with Source Code Highlighter.

    Вот так вот все простенько. Данные о звонках ложатся в эту табличку с безумной скоростью, и должны быть за разумное время протарифицированы.

    Для тарификации у нас есть функция БД с такой сигнатурой:
    FUNCTION calculate(IN subscriber_id INTEGER, IN duration INTERVAL, OUT status_code text) RETURNS void

    * This source code was highlighted with Source Code Highlighter.

    Тарификацию мы проводим запуская раз в 5 минут вот такой запрос:
    SELECT calculate(subscriber_id, duration) FROM billing.calls;

    * This source code was highlighted with Source Code Highlighter.

    И в один прекрасный момент мы понимаем, что этот запрос просто не успевает выполниться за 5 минут. За это время набегает еще больше данных, потом еще, и вот мы сидим и ждем ночи, когда поток чуть-чуть ослабнет, и очередь наконец разгребется. Такая вот перспектива. Надо сказать, что сидели и ждали мы не в одиночку. Вместе с нами просиживали 3 (к примеру) оставшихся ядра нашего сервера, пока одно корпело над запросом. PostgreSQL, к сожалению, не умеет распараллеливать запросы сам, но в нашем случае этого и не нужно. Намного лучшие результаты даст очень простая и очевидная уловка. Создаем индекс по функции «остаток от деления subscriber_id на 4»:
    CREATE INDEX billing.calls_subscriber_id_mod_idx ON billing.calls USING btree ((subscriber_id % 4));

    * This source code was highlighted with Source Code Highlighter.

    А теперь запускаем в четыре потока (например четыре разных джоба):
    SELECT calculate(subscriber_id, duration) FROM billing.calls WHERE subscriber_id % 4 = @mod;

    * This source code was highlighted with Source Code Highlighter.

    где mod равен 0,1,2 или 3 (для каждого потока свой).

    В результате


    Этот прием решает проблемы с блокировками, которые могут возникнуть, если двум разным потокам попадется звонок одного абонента. Также, в параллель эти джобы будут отрабатывать быстрее, чем если бы мы уповали на распараллеливание самой БД (если у нас не постгре, а оракл, например).

    Метод применим для любой базы, поддерживающей индекс по функциям (Oracle, Postgresql). В случае MSSQL можно создать calculated column и индекс по ней. В MySQL поддержки функциональных индексов нет, но, в качестве обходного пути, можно создать новый столбец с индексом по нему, и обновлять его триггером.
    Share post

    Similar posts

    AdBlock has stolen the banner, but banners are not teeth — they will be back

    More
    Ads

    Comments 65

      +2
      Как говорят: «дёшево и сердито». Хорошее решение всегда простое. Спасибо за заметку.
        +1
        Спасибо за топик. Сам работаю c хранилищами данных, так что интересно узнать, как там с OLTP-системами люди работают, для расширения кругозора, так сказать :)

        Еще маленькое лирическое отступление, а то сферический конь получился хромым :)
        Все-таки должно быть поле, например, billed (false или true), которое заполняется по-умолчанию false и переводится в true внутри функции.
        Тогда функция должна работать только по тем записям, где billed = false. Ошибочка небольшая и вообще топик не о том, но она способствует некоторому непониманию.
          +1
          Да, вы правы, так даже нагляднее. Индекс тогда выглядит как:
          CREATE INDEX billing.calls_subscriber_id_mod_idx ON billing.calls USING btree (subscriber_id % 4, billed);

          * This source code was highlighted with Source Code Highlighter.

          И выборка идет еще шустрее.
            0
            Не знаю, как это реализовано в Postgres, но в оракле я бы стал в данном случае использовать индекс по (billed, subscriber_id % 4), таким образом, чтобы range scan шел сначала по всем записям, у которых billed = 1, а затем уже делил на шарды по модулю.
              +1
              Тьфу, то есть по остатку от деления. И в оракле это будет не subscriber_id%4, а mod(subscriber_id, 4).
                +1
                Да, вы правы, спешил когда писал.
                  +2
                  В постгре еще можно partial индекс создать, тогда вообще проблем никаких:
                  CREATE INDEX billing.calls_subscriber_id_mod_idx ON billing.calls USING btree (subscriber_id % 4) WHERE billed = FALSE;

                  * This source code was highlighted with Source Code Highlighter.
              +1
              На самом деле этот прием я подглядел когда-то давно именно в биллинге (есть такой замечательный биллинг — БИС от петер-сервис, его Мегафон использует). У них отложенное применение скидок, и по таблице вызовов они идут в 8 или 16 потоков используя похожий индекс по полю % 8 или 16.
              0
              Хочу уточнить. Постгрес точно будет использовать индекс в случае запроса?

              SELECT calculate(subscriber_id, duration) FROM billing.calls WHERE subscriber_id % 4 = @mod;

              * This source code was highlighted with Source Code Highlighter.


              Самому проверить пока негде, а идея нравится. Думаю, дать ей шанс :)
                0
                Думаю, там имелось ввиду выражение call_id % 4, имеющее равномерное распределение.
                Если брать subscriber_id % 4, то в каждом «шарде» будет разное число записей, зависящее от трафика конкретных абонентов.
                  +1
                  Нет, имелось в виду именно subscriber_id % 4. Смысл в том, чтобы один абонент обрабатывался всегда в одном потоке — иначе мы возможно встрянем на блокировке, например, когда два потока возьмутся баланс одному абоненту обновлять.
                    0
                    Понял. В принципе, если у нас тысячи/миллионы абонентов и мы разделим их на 4 равных по количеству «лагеря», то и суммарный трафик у них будет примерно одинаков. Зато действительно удастся избежать лишних блокировок и потерянных обновлений.
                      0
                      Ага, когда у вас каждый поток тарифицирует по 100 тысяч вызовов за раз, встрять на блокировке на каком-то несчастном абоненте никак нельзя.
                  0
                  Да, использует.
                    0
                    Да использует. Но толку от него никакого.

                    Без индекса:
                     Seq Scan on a  (cost=0.00..186284.08 rows=47345 width=8) (actual time=0.025..9414.434 rows=2500000 loops=1)
                       Filter: ((num % 4) = 0)
                     Total runtime: 12837.127 ms
                    (3 rows)
                    


                    С индексом:
                     Bitmap Heap Scan on a  (cost=944.32..47774.81 rows=47345 width=8) (actual time=10434.219..21909.066 rows=2500000 loops=1)
                       Recheck Cond: ((num % 4) = 0)
                       ->  Bitmap Index Scan on a_idx  (cost=0.00..932.48 rows=47345 width=0) (actual time=10423.375..10423.375 rows=2500000 loops=1)
                             Index Cond: ((num % 4) = 0)
                     Total runtime: 25484.070 ms
                    

                0
                спасибо! попробуем применить на практике
                  +1
                  Действительно, вызывает некоторое сомнение эффективность такого индекса
                    0
                    Сейчас стоит задача на постгресе — сделать апдейт 500 000 записей (информация о книгах) из одной таблицы импорта в 3 таблицы живых данных. На девелоперской машине занимает около 15 мин, а на VPS больше часа. Ваш пост открыл мне глаза на то как можно легко и изящно ускорить процесс импорта. Спасибо.
                      +3
                      pg_try_advisory_lock() как раз для таких задач.
                      dklab.ru/chicken/nablas/53.html
                        0
                        Обалденно, спасибо большое, не знал об этом)
                        +1
                        Подправьте в статье
                        CREATE INDEX billing.calls_subscriber_id_mod_idx ON billing.calls USING btree ((subscriber_id % 4));

                        по крайней мере Postgres 8.4.1 так требует
                          +1
                          Подправил, спасибо
                          0
                          имхо btree индекс с таким плохим распределением не будет хорошо работать, возможно даже без него будет лучше — попробуйте

                          Если кстати процесса всего 4 и очень хочется индекс, то эффективнее будет сделать четыре индекса вида CREATE INDEX calls_idx$1 ON billing.calls WHERE id%4=$1
                            0
                            CREATE INDEX calls_idx$1 ON billing.calls (id) WHERE id%4=$1
                              0
                              Да, тоже правильно. Можно еще bitmap индекс создать. А вообще если мы каждый раз чешем по всей таблице, то индекс выигрыш даст минимальный, а вот если появляется поле billed (как выше в комментариях), то ситуация меняется. В примере billed нет чтобы не отвлекать внимание от сути.
                                +1
                                если чесать по всей таблице, то sequence scan сильно дешевле перебора по индексу из-за отсутствия random seek. В идеале постгресс сам это понимает и не пользует индекс, на практике за этим надо следить, особенно на больших таблицах и/или функциональных индексах
                                  0
                                  Не, если потоков 16, то выигрыш есть:).
                            0
                            А вы уверены, что они у вас будут выбираться по индексу?
                            При большом кол-ве строк и большой выборке база скорее всего будет делать seq scan и фильтровать. Хотя может зависеть от конкретных чисел.

                            Но это никак не повлияет на распределение задач.
                              0
                              Выше в комментариях идет дискуссия на этот счет. Более жизнеспособный пример habrahabr.ru/blogs/postgresql/76309/#comment_2217268, при котором индекс дает преимущество. Если идти по всей таблице в 4 потока — выигрыша по сути нет. Если в 16 — есть. Так что все зависит от задачи.
                              0
                              А если просто сделать N-таблиц, которые будут выглядеть как одна billing.calls (VIEW)?
                                0
                                Нет, это плохо. А если вам 4-х потоков уже не хватает, а нужно 8? Замучаетесь таблицы пересоздавать.
                                  0
                                  В PostgreSQL можно использовать partitions.
                                    0
                                    Наверное не так в первый раз сказал. Такой вариант возможен и применим. Его минус — ребалансировка если вам нужно увеличить количество потоков (легко обходится если вы сразу создадите большее количество секций, чем используется) + накладные расходы связанные с самим секционированием.
                                    0
                                    > А если вам 4-х потоков уже не хватает, а нужно 8?

                                    И тут-то вы лёгким движением руки превратите 4-хъ-ядерник в 8-и? :-)

                                    Вообще-то, ничто не мешает заранее подумать об этом, и раскидывать на большее кол-во таблиц, а просто менять схему выборки, в зависимости от действительного кол-ва ядер.
                                      0
                                      Да-да, я выше ответил также. Только лучше использовать секционирование, а не VIEW и несколько таблиц. Но это все очень накладно.
                                  0
                                  решение кривое, т.к. если сервер не успевает обрабатывать FIFO то на нескольких процессах он должен la сервера поднять и затормозить работу сервера. Плюс ко всему пойди потом разберись если один потом будет сваливаться на ошибке на кривой строке и остальные три будут работать нормально.

                                  Такая задача решается мониторингом длины очереди и контролем при запуске скрипта того что старый завершен.
                                  Для этого можно использовать flock() и lock-pid файлы.
                                    0
                                    и кстати проблема возникнет опять когда объем данных вырастет в 4 раза :)
                                      0
                                      Добавим еще 4 ядра и сделаем 8 потоков:). Только не спрашивайте, что будет если объем вырастет еще раз в 5:).
                                        0
                                        еще на всякий случай как минимум одно ядро надо свободным оставлять.
                                      +1
                                      Я вам об одном сферическом коне, а вы мне о своем:).

                                      >>решение кривое, т.к. если сервер не успевает обрабатывать FIFO
                                      В примере не было FIFO, но мне не жалко, пусть будет.

                                      >>то на нескольких процессах он должен la сервера поднять и затормозить работу сервера.
                                      Угу, три ядра у нас простаивало, а теперь они заняты работой и усиленно тормозят работу сервера, да.

                                      >>Такая задача решается мониторингом длины очереди и контролем при запуске скрипта того что старый завершен.
                                      В примере так и делается. Просто когда скрипт перестает справляться, очередь начинает накапливаться.
                                        0
                                        а триггером нельзя калькуляцию сделать?
                                          0
                                          плюс еще можно калькуляцию делать не одним запросом для всех, а для каждого свой запрос, тогда субд сама все раскидает как сумеет.
                                            0
                                            … я про асинхронный вызов запросов :)
                                              0
                                              ээ, а как вы себе это представляете?
                                                0
                                                Представляют так — сейчас бд у вас принимает запросы синхронно, друг за другом. А можно накидать ей кучу запросов без ожидания их завершения и пусть сама разгребает по мере своих способностей.
                                                Можно использовать например драйвер developer.postgresql.org/pgdocs/postgres/libpq-async.html
                                                  0
                                                  Каких конкретно запросов накидать?
                                                    0
                                                    SELECT calculate(subscriber_id, duration) FROM billing.calls WHERE subscriber_id = 1
                                                      0
                                                      А абонентов у вас 5 миллионов. Откуда знаете какой звонил за последние 5 минут?
                                                        +1
                                                        SELECT DISCTINCT subscriber_id FROM billing.calls

                                                        foreach result as item
                                                        SELECT calculate(subscriber_id, duration) FROM billing.calls WHERE subscriber_id = %item%
                                                          0
                                                          Это в один поток. Я сейчас молчу про потери при лишнем seq scan'е и сортировке таблицы. А если захотите асинхронно по каждому пустить — придется вытаскивать результат SELECT DISTINCT на клиента. А потом еще затрачивать время на открытие нового connection к базе для запуска очередного запроса.
                                                            0
                                                            нет.

                                                            distinct я указал для понимания, есть масса способов его обойти enable_seqscan = off.

                                                            открывается pconnect, узнаете из индекса который уже в памяти все id клиентов, в той же tcp сессии накидываете запросы SELECT calculate(subscriber_id, duration) FROM billing.calls WHERE subscriber_id = %item% по количеству клиентов и закрываете соединение. скрипт завершается. база начинает работать. т.к. каждый запрос оформлен отдельно, субд сама решает где у нее свободное ядро и кладет запрос на него.
                                                              0
                                                              вы изначально скриптом пытаетесь решить задачу которую должна решать субд короче.
                                                                0
                                                                Я пытаюсь быстро решить поставленную задачу. А вот что вы пытаетесь сделать для меня до сих пор загадка.

                                                                >>в той же tcp сессии накидываете
                                                                Для каждого запроса вам в любом случае придется открывать новое соединение с БД. Ну никак без этого — одно соединение = выполнение в 1 поток. Так что это заглохнет сразу же. Вы можете сейчас мне что-то ответить, но лучше объясните — какой смысл во всех этих телодвижениях, когда проблема решается примерно за 1 минуту созданием одного индекса и трех дополнительных джобов?
                                                                  0
                                                                  Зачем для каждого запроса открывать новое соединение я не пойму. Преимущество асинхронного подхода в том что т.к. скрипту не нужны результаты выполнения запросов, он может сразу отдать все запросы (по запросу на клиента) бд, а бд сама все сделает как надо. Делается это все в ту же минуту, огорода никакого нет, зависимости от количества ядер нет.

                                                                  В mysql аналог примерно такой
                                                                  INSERT DELAYED INTO result (uid, summa) SELECT uid, sum(mins) FROM log WHERE uid=$userId GROUP BY uid

                                                                  Скрипт отдает задание БД и завершается почти мгновенно.
                                                                    0
                                                                    Не бывает запроса без соединения с БД, хоть какого-нибудь. Вы по ссылке которую мне дали ходили? developer.postgresql.org/pgdocs/postgres/libpq-async.html

                                                                    int PQsendQuery(PGconn *conn, const char *command);
                                                                    Что такое *conn по вашему? На каждое новое соединение форкается новый процесс.
                                                                      0
                                                                      вы про персистент-коннекты слышали?
                                                                        0
                                                                        И что? Договаривайте уж.
                                                                          0
                                                                          так сказал же уже несколько раз выше — установили соединение, делаете запрос, зачем отконнекчиваться — делаете еще запрос, и так пока не закончатся запросы, потом закрываете соединение.
                                                                            0
                                                                            Вы понимаете, что если вы новое соединение создавать не будете, то запросы будут выполняться синхронно?
                                                                              0
                                                                              да, действительно вы правы — в исходниках увидел что асинхронность в либе кривая. Тогда надо смотреть на цену локального коннекта. Можно конечно сделать пачки по uid, но тогда получится нечто среднее между вашим и моим вариантом.
                                                                              а асинхронные коммиты смотрели как работают — они тоже в одной нити будут выполнятся если пачку селектов засунуть в один коммит и если каждый селект обернуть асинхронним коммитом?
                                                                              developer.postgresql.org/pgdocs/postgres/wal-async-commit.html

                                                                                0
                                                                                Цена коннекта большая — форкается новый процесс. Я руководствуюсь простым правилом — если есть простое решение, я использую его. Вне зависимости от результата, вы предлагаете очень сильно все усложнить.
                                                                                  0
                                                                                  мне архитектура такого подхода к обработке инфы не нравится. мне нравится модульность. то что скрипт ждет выполнение каждой команды мне кажется совершенно лишним. я бы, вероятно, сделал на mysql с insert delayed, или, что скорее всего такую задачу решил бы вообще не на реляционной субд.
                                                                                    0
                                                                                    В случае биллинга задача тарификации вызова далеко не единственная, так что без реляционной субд обойтись никак не получится. А так — решение всегда зависит от задачи, а не наоборот.
                                            0
                                            Это замедлит вставку, а при ошибке, вставка вообще не пройдет. Если вставлять быстро (COPY в postgres), то триггер игнорируется. Вообщем-то с триггерами я всегда руководствуюсь простым правилом — если есть возможность избежать использования триггеров — выбирай ее всегда.

                                      Only users with full accounts can post comments. Log in, please.