company_banner

Impala для Python-разработчика на примере определения фрода при анализе трафика в маркетинговой платформе

    Всем привет.

    Как известно, есть множество различных систем хранения. Большинство из них рассчитаны на определенный объем данных. Если данных больше, то система хранения начинает вести себя непредсказуемо. Этих проблем лишены системы на базе Hadoop, основанные на файловой системе HDFS. Подобные не слишком часто используются в веб-разработке, но незаменимы для анализа данных и построения отчетов.

    Если нужно предоставить пользователям (менеджерам компании) аналитику за несколько лет относительно кликов пользователей на рекламных объявлениях, измеряющихся в сотнях миллионов в месяц, отговорка в духе “нуу... наш postgres не умеет нормально работать с такими объемами” не работает. Без правильной системы хранения тут не обойтись. Можно было использовать, в том числе, Clickhouse, Snowflake и т.д., но в компании уже была инфраструктура и готовые решения на Impala и Hive. Первая быстрее и работает с памятью. Второй – медленнее и работает с диском. Никто не любит ждать, поэтому была выбрана Impala.

    Недостатков подобных систем не избежать:

    • Доступ только на добавление данных и чтение. Никаких update, delete, etc (за исключением Kudu, о чем речь чуть ниже).

    • Гигабайты… нет, терабайты памяти.

    • Как в python нету ORM для Impala? Какое еще ODBC?

    • Запросы к таблицам из 1000 и 1 000 000 строк занимают сравнимое время.

    • Никаких индексов и прочих способов оптимизации производительности.

    • Тормозит? То, что время ответа измеряется в секундах и десятках секунд, хорошо для построения аналитических отчетов, но так себе для получения информации в real-time’е.

    • Чтобы хватало памяти, необходимо партиционировать данные по какому-то параметру, благо что всегда есть дата. Перезапись данных так же возможна или в рамках всей таблицы, или в рамках партиции.

    В нашем распоряжении оказался кластер из 30 нод с ограничениями в 90 ГБ памяти на ноде и 1 ТБ суммарно. Для работы с данными существует хороший инструмент – Hue ("Хью") – довольно удобный веб-интерфейс для запросов.

    Работа с Impala для python-разработчика сильно отличается от работы с традиционными БД. Драйверов всего 2 – impyla и cloudera odbc. С первым когда-то давно был плохой опыт, плюс у него куча открытых issue на гитхабе. Решено было использовать второй. Для этого нужно качать сам коннектор с сайта, настраивать его, и затем использовать pyodbc. Т.к. наше приложение асинхронное, вместо pyodbc у нас aioodbc – обертка вокруг pyodbc.

    def _convert_timestamp(value):
        unpacked = struct.unpack('6Hxxxx', value)
        if unpacked == (1970, 1, 1, 0, 0, 0):
            return None
        return datetime(*unpacked)
     
     
    class Impala:
        def __init__(self):
            self.dsn = (
                f'DRIVER={{{cfg.impala_driver}}};'
                f'HOST={{{cfg.impala_host}}};'
                f'PORT={{{cfg.impala_port}}};'
                f'SCHEMA={{{cfg.impala_default_schema}}};'
                f'UID={{{cfg.impala_uid}}};'
                f'PWD={{{cfg.impala_pwd}}};'
                f'AUTHMECH=3;'
                f'USESASL=1;'
                f'SSL=0;'
            )
     
            self.pool: Optional[Pool] = None
     
        async def connect(self):
            self.pool: Pool = await create_pool(
                minsize=cfg.impala_min_poolsize,
                maxsize=cfg.impala_max_poolsize,
                dsn=self.dsn,
                after_created=self.after_created,
                autocommit=True,
                pool_recycle=55,
            )
     
            async with self.pool.acquire() as connection:
                async with connection.cursor() as cursor:
                    cursor: Cursor
     
                    result = await cursor.execute('select 1')
                    await result.fetchone()
     
        async def disconnect(self):
            if self.pool is not None:
                self.pool.close()
                await self.pool.wait_closed()
     
        @staticmethod
        async def after_created(connection):
            # Driver return zero date values incorrectly
            connection.add_output_converter(SQL_TYPE_TIMESTAMP, _convert_timestamp)
     
            connection.setdecoding(SQL_CHAR, encoding='utf-8')
            connection.setdecoding(SQL_WCHAR, encoding='utf-8')
            connection.setdecoding(SQL_WMETADATA, encoding='utf-16')
            connection.setencoding(encoding='utf-8')
     
        async def _run_with_retries(
            self,
            query,
            *params,
            retries=0,
            max_retries=cfg.impala_max_retries,
        ):
            try:
                async with self.pool.acquire() as connection:
                    async with connection.cursor() as cursor:
                        result = await cursor.execute(query, *params)
                        return await getattr(result, 'execute').__call__()
            except Error as e:
                if retries < max_retries:
                    retries += 1
                    return await self._run_with_retries(
                        query,
                        *params,
                        retries=retries,
                        max_retries=max_retries,
                    )
                raise
     
        async def execute(self, query, *params):
            return await self._run_with_retries(query, *params)
     
        @staticmethod
        def to_python(data: Union[List[Row], Row]) -> Union[List[dict], dict]:
            if isinstance(data, List):
                return list(map(Impala.to_python, data))
     
            return {info[0]: data[i] for i, info in enumerate(data.cursor_description)}

    Связка из самой Impala и ее драйвера по стабильности сильно уступает обычным БД, поэтому retry-и – наше все. Только после этого всё заработало более-менее корректно.

    Про ОРМ говорить не будем – его нет. Может, это и к лучшему, т.к. сложные запросы в ОРМ выглядят очень нечитаемо, писать raw-sql гораздо приятнее, и код лучше читается. Но отсюда следует и другой недостаток – отсутствие инструментария для миграций схемы таблиц. Пока что мы меняем таблицы вручную, но планируем написать свой маленький инструмент (или кто-то подскажет в комментариях уже готовое решение?)

    SELECT-запросы к Impala немного отличаются от запросов к обычным реляционным БД:

    • Любой запрос делает full scan, т.к. индексов нету, и выгружает запрашиваемые данные в память. Поэтому желательно всегда использовать партиции и фильтры по ним. По сути, одна партиция – один каталог с данными на диске. Меньше партиций выгружено – меньше памяти использовано.

    • Есть несколько форматов хранения данных в Impala, каждый со своими преимуществами и недостатками. Т.к. у нас таблицы с большим количеством колонок, одинаковых для всех записей, мы используем Parquet – он позволяет читать только необходимые колонки и экономить память. Подробнее про Parquet можно почитать тут: https://www.bigdataschool.ru/blog/apache-parquet-avro-spark-big-data.html

    • При добавлении или изменений данных в таблицах Impala перекладывает данные на диске, но при этом, чтобы она впоследствии знала, где и какие данные лежат, нужно обязательно собирать статистику (COMPUTE STATS), иначе анализатор будет работать очень медленно.

    Иногда без обновления существующих данных либо их удаления (привет, GDPR) не обойтись. Impala в базовом варианте так не умеет, кроме перезаписи партиции целиком. Тут нам поможет KUDU. Это движок хранения данных, который совмещает в себе возможность перезаписи данных по primary key и удаления отдельных строк, а также быстрый поиск по столбцам. Для использования KUDU обязательно нужно указать primary key (одно или несколько полей), по которым можно будет делать DELETE и UPSERT, а сами данные должны иметь четкую структуру – жестко заданы поля в таблице и их типы. При этом, KUDU не поддерживает некоторые типы полей (CHAR, DATE, etc.), и, как оказалось на практике, не очень хорошо работает с большими объемами данных. Мы решили мириться со всеми недостатками, чтобы использовать его возможности. Что важно: делать запросы к KUDU можно напрямую из Impala. После этого возможности Impala уже приближаются к возможностям реляционных БД, и пользоваться ими гораздо удобнее.

    Имея эти возможности, попробуем реализовать следующее. Компания платит за рекламу поставщикам трафика (Google, Yandex и др.). Есть несколько моделей оплаты: за клики пользователей по баннеру, за регистрации пользователей, за показы баннеров и т.д. Естественно, всякие недоброжелатели могут делать (и делают) накрутки событий, иначе говоря, фрод. Наша задача – определить такие накрученные события и не платить за них.

    Основной датасет, click_event – данные по рекламным кликам и событиям, к которым привели эти клики (регистрации, логины в игру, платежи и т.д.).

    Есть много правил, определяющих фродовые события, у нас их сейчас несколько десятков. Общая архитектура фичи такова: раз в сутки собирается датасет с фродом для каждого правила. Затем запускается запрос, который аггрегирует всех фродовых пользователей в один датасет. И потом в отдельный датасет складываются все события всех фродовых пользователей – это и есть нужные данные для анализа.

    Пример правила определения фрода: если несколько пользователей зарегистрировались с одного IP-адреса, то это фрод.

    UPSERT INTO {self.model.get_tablename()} (
        id,
        rule_run_id,
        player_id,
        event_id,
        ip_hash,
        regs_cnt,
        analysed_start_day,
        analysed_end_day,
        created_at
    )
    WITH same_ip_regs AS (
        SELECT
            CE.campaign_id,
            CE.campaign_sub_id,
            CE.player_id,
            CE.event_id,
            CE.ip_hash ip_address,
            count(CE.event_id) OVER (PARTITION BY campaign_id, campaign_sub_id, CE.ip_hash) cnt
        FROM click_event CE
        WHERE
            TO_DATE(CE.event_dt) = '{self.config.process_from}' AND
            CE.event_name='registration' AND
            CE.ip_hash != 'unknown'
    )
    SELECT UUID() id, '{rule_run_id}', player_id, event_id, ip_address, cnt, '{self.config.process_from}', '{self.config.process_until}', now() created_at
    FROM same_ip_regs
    WHERE cnt>1

    Далее аггрегируем всех найденных отдельными правилами фрод-пользователей:

    def generate_rules_select_sql(self):
        rules_select_sql = []
    
        for rule in all_rules:
            rule_sql = f'''
                SELECT player_id, '{rule["slug"]}' rule_slug, rule_run_id
                FROM {rule["tablename"]}
                WHERE created_at >= date_add(now(), -{self.config.process_lookback})
                GROUP BY player_id, rule_slug, rule_run_id
            '''
            rules_select_sql.append(rule_sql)
    
        return ' UNION ALL '.join(rules_select_sql)
     
    async def run_etl(self):
        rules_data_sql = self.generate_rules_select_sql()
    
        insert_sql = f'''
            INSERT INTO fraud_user
            (
                id,
                player_id,
                rule_slug,
                rule_run_id,
                created_at,
                load_day
            )
            WITH all_users AS (
                {rules_data_sql}
            )
            SELECT UUID() id, au.player_id, au.rule_slug, au.rule_run_id,
                now() created_at, to_date(now()) load_day
            FROM all_users au
            LEFT ANTI JOIN fraud_user fu
                USING(rule_run_id)
        '''
    
        await impala.execute(insert_sql)

    И, наконец, складываем все фродовые события:

    INSERT INTO fraud_event
    (
        id,
        event_id,
        player_id,
        rule_slug,
        rule_run_id,
        created_at,
        load_day
    )
    SELECT UUID() id,
        fn.event_id,
        fn.player_id,
        fu.rule_slug,
        fu.rule_run_id,
        now() created_at,
        to_date(now()) load_day
    FROM fraud_user fu
    INNER JOIN raw_event fn
        ON fn.player_id = fu.player_id
    LEFT ANTI JOIN fraud_event fe
        ON fe.event_id = fn.event_id AND fe.rule_run_id = fu.rule_run_id
    WHERE fn.dt>= date_add(to_date(now()), -{self.config.process_lookback_for_events})
        AND fu.load_day>=date_add(to_date(now()), -{self.config.process_lookback_for_users})

    Итого: проанализировав сотни тысяч событий за последние несколько дней, за несколько минут получили список фродовых событий, за которые не стоит платить поставщикам трафика. Часто эти данные анализируются вручную: аналитик смотрит, какие фрод-события найдены, какие их параметры, и все ли отработало так, как ожидалось. Если что-то не так – корректирует настройки. Impala позволяет отобразить данные в каких угодно разрезах, что позволяет сделать конструктор запросов. Пример получения списка событий с разбивкой по кампаниям и странам:

    WITH
    // Список фрод-эвентов, отфильтрованный по нужным правилам
    filtered_fraud_event AS (
        SELECT * FROM fraud_event
        WHERE fraud_event.rule_slug IN (<list_of_required_rules>)
    ),
    // Список фрод-эвентов, отфильтрованный по нужным правилам, без дублей
    fraud_events_wo_dups AS (
        SELECT * FROM (
            SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_id) AS rn FROM filtered_fraud_event
        ) AS t
        WHERE t.rn=1
    ),
    // Список фрод-эвентов, отфильтрованный по нужным правилам, без дублей в рамках каждого правила
    fraud_events_wo_dups_by_rules AS (
        SELECT * FROM (
            SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id, rule_slug ORDER BY event_id) AS rn FROM filtered_fraud_event
        ) AS t
        WHERE t.rn=1
    ),
    // Список событий с кампаниями, отфильтрованный по датам, эвентам и кампаниям
    augmented_click_event AS (
        SELECT campaign.full_name, click_event.country, click_event.event_id, click_event.event_name
        FROM click_event
        LEFT JOIN campaign ON campaign.id=click_event.campaign_id
        WHERE 
            click_event.event_dt <= '{self.config.report_to}' 
            AND click_event.event_dt >= '{self.config.report_from}' 
            AND click_event.event_name IN ('registration') 
            AND click_event.campaign_id IN (<list_of_required_campaigns>)
    )
     
    // Считаются нужные метрики. Записи с layer=1 - сами эвенты, по типам. С layer=2 - фродовые эвенты по типам правил. С layer=3 - фродовые эвенты по типам эвентов.
    // В следующем запросе отдельно выбираются данные по каждому из layer-ов и объединяются в один датасет для удобства оперирования.
    SELECT
        ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 1 THEN raw_data.event_count ELSE 0 END)) AS 'event_count',
        ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 1 and raw_data.event_name='registration' THEN raw_data.event_count ELSE 0 END)) AS 'event_count.registration',
        ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 3 THEN raw_data.fraud_event_count ELSE 0 END)) AS 'fraud_event_count',
        ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 3 and raw_data.event_name='registration' THEN raw_data.fraud_event_count ELSE 0 END)) AS 'fraud_event_count.registration',
        ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 2 and raw_data.rule_slug='<rule_name>' THEN raw_data.fraud_event_count ELSE 0 END)) AS 'fraud_rule_count.<rule_name>',
        ...<the_same_block_for_each_fraud_rule>...
     
        raw_data.country, raw_data.full_name
    FROM (
        SELECT 
            augmented_click_event.country, 
            augmented_click_event.event_name,
            augmented_click_event.full_name,
            null AS rule_slug,
            COUNT(augmented_click_event.event_id) AS event_count,
            0 AS fraud_event_count,
            1 AS layer
        FROM augmented_click_event
        GROUP BY augmented_click_event.country, augmented_click_event.event_name, augmented_click_event.full_name
     
        UNION ALL
     
        SELECT 
            augmented_click_event.country, 
            augmented_click_event.event_name, 
            augmented_click_event.full_name,
            null AS rule_slug,
            0 AS event_count,
            count(fraud_events_wo_dups.event_id) AS fraud_event_count,
            3 AS layer
        FROM augmented_click_event
        LEFT JOIN fraud_events_wo_dups
        ON fraud_events_wo_dups.event_id=augmented_click_event.event_id
        WHERE fraud_events_wo_dups.event_id IS NOT NULL
        GROUP BY augmented_click_event.country, augmented_click_event.event_name, augmented_click_event.full_name
     
        UNION ALL
     
        SELECT 
            augmented_click_event.country, 
            augmented_click_event.event_name, 
            augmented_click_event.full_name,
            fraud_events_wo_dups_by_rules.rule_slug AS rule_slug,
            0 AS event_count,
            count(fraud_events_wo_dups_by_rules.event_id) AS fraud_event_count,
            2 AS layer
        FROM augmented_click_event
        LEFT JOIN fraud_events_wo_dups_by_rules
        ON fraud_events_wo_dups_by_rules.event_id=augmented_click_event.event_id
        WHERE 
            fraud_events_wo_dups_by_rules.event_id IS NOT NULL 
            AND fraud_events_wo_dups_by_rules.rule_slug IN (<list_of_required_rules>)
        GROUP BY 
            augmented_click_event.country, 
            augmented_click_event.event_name, 
            augmented_click_event.full_name, 
            fraud_events_wo_dups_by_rules.rule_slug
    ) AS raw_data
     
    GROUP BY raw_data.country, raw_data.full_name
    ORDER BY raw_data.country, raw_data.full_name

    В результате получаем красивую табличку, понятную менеджеру:

    Wargaming
    Компания

    Комментарии 2

      0
      бегите от куду. оно не масштабируется. там ограничение около 1000 таблетов на узел. учитывая что каждый таблет требует минимум 2 копии то реально лимит 333 таблетов на узел. это мало учитывая что в бигдате таблицы с сотнями партиций норма.
        0

        Именно поэтому мы использовали куду настолько мало, насколько это возможно.

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое