Pull to refresh

Рецепты PostgreSQL: отправка писем с отчётом о доставке

Reading time5 min
Views6.6K

Для приготовления отправки электронных писем с отчётом о доставке (а точнее с подтверждением о принятии или отклонении писем почтовым сервером получаетеля) нам понадобится сам postgres и его плагины планировщик асинхронных задач и cURL, а также локальный почтовый сервер, gawk и его плагин. Можно также воспользоваться docker-compose.

Итак, для начала, компилируем и устанавливаем плагины планировщик асинхронных задач и cURL.

CREATE EXTENSION IF NOT EXISTS pg_curl;

Далее,

CREATE TABLE email ( -- создаём таблицу для писем
    id bigserial NOT NULL PRIMARY KEY, -- первичный ключ
    timestamp timestamp without time zone NOT NULL DEFAULT now(), -- дата-время создания письмя
    subject text NOT NULL, -- тема письма
    sender text NOT NULL, -- отправитель письма
    recipient text[] NOT NULL, -- массив получателей письма
    body text NOT NULL, -- тело письма
    message_id integer, -- идентификатор письма (из локального почтового сервера)
    result text[] NOT NULL -- массив результатов (статусов) письма (для каждого получателя)
);

а также

CREATE FUNCTION email_trigger() RETURNS trigger -- создаём триггерную функцию
    LANGUAGE plpgsql -- на языке plpgsql
    AS $_$ <<local>> declare -- с локальными переменными
    input text; -- текст асинхронной задачи
    recipient text; -- получатель письма
begin
    if tg_when = 'BEFORE' then -- перед
    		-- вставкой или обновлением (когда массив получателей обновился)
        if tg_op = 'INSERT' or (tg_op = 'UPDATE' and new.recipient is distinct from old.recipient) then
        		-- заполняем/обнуляем массив результатов в соответствии с массивом получателей
            new.result = array_fill('new'::text, array[array_length(new.recipient, 1)]);
        end if;
    elsif tg_when = 'AFTER' then -- после
        if tg_op = 'INSERT' then -- вставки
        		-- генерируем текст задачи
            local.input = format($format$select send(%1$L)$format$, new.id);
            -- и добавляем новую задачу с указанным текстом
            -- и датой-временем из письма (что позволяет сделать отложенную отправку),
            insert into task (input, plan) values (local.input, new.timestamp);
        end if;
    end if;
    -- и завершаем триггер
    return case when tg_op = 'DELETE' then old else new end;
end;$_$;
-- затем создаем триггер перед
CREATE TRIGGER email_after_trigger AFTER INSERT OR UPDATE OR DELETE ON email FOR EACH ROW EXECUTE PROCEDURE email_trigger();
-- и после
CREATE TRIGGER email_before_trigger BEFORE INSERT OR UPDATE OR DELETE ON email FOR EACH ROW EXECUTE PROCEDURE email_trigger();

и конечно

CREATE FUNCTION send(email_id bigint) RETURNS text -- создаём функцию отправки писем
    LANGUAGE plpgsql -- на языке plpgsql
    AS $$ <<local>> declare -- с локальными переменными
    email email; -- письмо
    headers text; -- заголовки
    recipient text; -- получатель
begin
		-- задаём адрес нашего локального почтового сервера
    perform curl_easy_setopt_url('smtp://smtp:25');
    -- загружаем письмо в локальную переменную
    select * from email where id = send.email_id into local.email;
    -- для всех получателей письма
    foreach local.recipient in array local.email.recipient loop
    		-- задаём получаетеля
        perform curl_recipient_append(local.recipient);
        -- и ещё раз
        perform curl_header_append('To', local.recipient);
    end loop;
    -- задаём тему письма
    perform curl_header_append('Subject', local.email.subject);
    -- задаём отправителя письма
    perform curl_header_append('From', local.email.sender);
    -- тут ещё раз можно задать отправителя письма, но в этом случае
    -- отправителю будет приходить уведомление о недоставке
    --perform curl_easy_setopt_mail_from(local.email.sender);
    perform curl_mime_data(local.email.body, type:='text/plain; charset=utf-8', code:='base64');
    -- задаём таймаут
    perform curl_easy_setopt_timeout(10);
    -- устанавливаем массив статусов в соответствии с массивом получателей
    update email as e set result = array_fill('sent'::text, array[array_length(e.recipient, 1)]) where id = send.email_id;
    -- выполняем отправку
    perform curl_easy_perform();
    -- получаем заголовки отправки
    local.headers = curl_easy_getinfo_header_in();
    begin
    		-- вычисляем идентификатор письма
        update email set message_id = ('x'||(regexp_match(local.headers, E'250 2.0.0 (\\w+) Message accepted for delivery'))[1])::bit(28)::int where id = send.email_id;
        -- при ошибке - предупреждаем об этом
        exception when others then raise warning 'ERROR: % - %', sqlstate, sqlerrm;
    end;
    -- возвращаем заголовки
    return local.headers;
end;$$;

Теперь перейдём к настройке нашего локального почтового сервера. Файл конфигурации smtpd.conf

# задаём процедуру
proc "smtpd.awk" "/usr/bin/gawk -l /usr/local/lib/gawk/pgsql -f /etc/smtpd/smtpd.awk" user smtpd group smtpd
# при выполнении фильтра
filter "smtpd.awk" proc "smtpd.awk"
# слушаем везде с заданным выше фильтром
listen on 0.0.0.0 filter "smtpd.awk"
# и действием
action "relay" relay filter "smtpd.awk"
# для всеговыполняем заданное выше действие
match from any for any action "relay"

и собсвенно сама процедура smtpd.awk

function connect() { # функция соединения с базой
		# создаём подключение
    conn = pg_connect("application_name=smtp target_session_attrs=read-write")
    if (!conn) { # если не получилось
        print("!pg_connect") > "/dev/stderr" # сообщаем об этом
        exit 1 # и выходим
    }
    # создаём подготовленный оператор для обновления массива результатов с заданными параметрами
    rcpt = pg_prepare(conn, "UPDATE email SET result[array_position(recipient, $3)] = $2 WHERE message_id = ('x'||$1)::bit(28)::int")
    if (!rcpt) { # если не получилось
        print(pg_errormessage(conn)) > "/dev/stderr" # сообщаем об этом
        exit 1 # и выходим
    }
    # а также для обновления неудачей
    rollback = pg_prepare(conn, "UPDATE email SET result = array_fill('permfail'::text, array[array_length(recipient, 1)]) WHERE array_length(recipient, 1) = 1 and message_id = ('x'||$1)::bit(28)::int")
    if (!rollback) { # если не получилось
        print(pg_errormessage(conn)) > "/dev/stderr" # сообщаем об этом
        exit 1 # и выходим
    }
}
BEGIN { # вначале
    FS = "|" # задаём разделитель
    OFS = FS # и ещё раз
    _ = FS # и ещё
    connect() # и подключаемся к базе
}
"config|subsystem|smtp-out" == $0 { # по команде настройки
		# регистрируем колбэк на получателя
    print("register|report|smtp-out|tx-rcpt")
    # и колбэк на ошибку
    print("register|report|smtp-out|tx-rollback")
    next # переходим к следующей команде
}
"config|ready" == $0 { # по команде готовности
    print("register|ready") # сообщаем о готовности
    fflush() # сбрасываем буферы
    next # и переходим к следующей команде
}
"report|smtp-out|tx-rcpt" == $1_$4_$5 { # по команде получателя
		# создаём массив агрументов для подготовленного выше оператора
    val[1] = $7 # идентификатор письма
    val[2] = $8 # результат получателя
    val[3] = $9 # получатель письма
    res = pg_execprepared(conn, rcpt, 3, val) # выполняем подготовленный выше оператор
    if (res == "ERROR BADCONN PGRES_FATAL_ERROR") { # при ошибке (отключения от базы)
        connect() # ещё раз подключаемся к базе
        res = pg_execprepared(conn, rcpt, 3, val) # и снова выполняем
    }
    if (res != "OK 1") { # если неудачно
        print(pg_errormessage(conn)) > "/dev/stderr" # сообщаем об этом
    }
    pg_clear(res) # освобождаем рузультат
    delete val # и аргументы
    next # затем переходим к следующей команде
}
"report|smtp-out|tx-rollback" == $1_$4_$5 { # по команде ошибки
		# создаём массив агрументов для второго подготовленного выше оператора
    val[1] = $7 # идентификатор письма
    res = pg_execprepared(conn, rollback, 1, val) # выполняем подготовленный выше оператор
    if (res == "ERROR BADCONN PGRES_FATAL_ERROR") { # при ошибке (отключения от базы)
        connect() # ещё раз подключаемся к базе
        res = pg_execprepared(conn, rollback, 1, val) # и снова выполняем
    }
    if (res != "OK 1") { # если неудачно
        print(pg_errormessage(conn)) > "/dev/stderr" # сообщаем об этом
    }
    pg_clear(res) # освобождаем рузультат
    delete val # и аргументы
    next # затем переходим к следующей команде
}
END { # в конце
    pg_disconnect(conn) # отключаемся от базы
}

Ну а собственно для отправки письма используем команду

insert into email (subject, sender, recipient, body)
values ('subject', 'sender@mail.com', '{recipient1@mail.com,recipient2@mail.com}', 'body');
Tags:
Hubs:
Rating0
Comments7

Articles