Привет, Хабр.
Сегодня разбираем COPY в PostgreSQL. Это рабочая лошадка для массовой загрузки и выгрузки данных.
Что делает COPY и чем он отличается от INSERT
COPY
переносит данные между таблицей и файлом или потоками STDIN/STDOUT. Вариант COPY FROM
загружает, COPY TO
выгружает. Умеет в форматы text
, csv
, binary
.
Поддерживает параметры ON_ERROR
, FREEZE
, HEADER
и HEADER MATCH
, FORCE_*
, ENCODING
, WHERE
, а также запуск внешних программ через PROGRAM
. Это раза в два быстрее любого батчевого INSERT
при равных условиях и заметно проще в эксплуатации.
Минимальные конструкции, которыми пользуемся:
-- выгрузка таблицы в CSV в поток
COPY public.events TO STDOUT WITH (FORMAT csv, HEADER true);
-- загрузка из CSV из потока
COPY public.events FROM STDIN WITH (FORMAT csv, HEADER true);
-- загрузка из файла на сервере (нужны права)
COPY public.events FROM '/var/lib/postgresql/import.csv' WITH (FORMAT csv, HEADER true);
-- через внешнюю программу (распаковка на лету)
COPY public.events FROM PROGRAM 'gzip -dc /data/import.csv.gz' WITH (FORMAT csv, HEADER true);
-- выгрузка результата запроса, не всей таблицы
COPY (
SELECT id, payload, created_at
FROM public.events
WHERE created_at >= DATE '2025-01-01'
) TO STDOUT WITH (FORMAT csv, HEADER true);
COPY против \copy в psql
Важно разделять два мира. COPY ... FROM/TO 'filename'
и COPY ... PROGRAM
работают на стороне сервера и требуют специальных ролей. Файлы должны быть доступны именно серверу. Метакоманда \copy
из psql
это обёртка вокруг COPY ... FROM STDIN/TO STDOUT
, и файлы читаются/пишутся на стороне клиента. Если нет серверных прав или файл лежит у вас локально, то используем уже \copy
. Если нужно максимальное быстродействие и файлы уже на сервере,то тут уже нужен серверный COPY
.
Пример для psql
:
-- клиентская выгрузка в сжатый файл
\copy (SELECT * FROM public.events) TO PROGRAM 'gzip > /tmp/events.csv.gz' WITH (FORMAT csv, HEADER true)
-- клиентская загрузка
\copy public.events FROM '/home/user/import.csv' WITH (FORMAT csv, HEADER true)
Обработка ошибок при загрузке
По дефолту COPY FROM
завершится на первой проблемной строке. В ситуациях «загружаем всё, что валидно, остальное в карантин» помогает ON_ERROR ignore
:
COPY public.events FROM STDIN
WITH (
FORMAT csv,
HEADER true,
ON_ERROR ignore,
LOG_VERBOSITY verbose
);
При ignore
все некорректные строки будут пропущены, в конце придёт NOTICE
с количеством пропусков. С LOG_VERBOSITY verbose
сервер дополнительно пишет, какая строка и какая колонка упали на конверсии. Это удобно, но не превращаем это в норму на постоянной основе: для чистых пайплайнов лучше staging-таблицы с текстовыми колонками, грузить туда, а затем явной валидацией и приведение типов вносить в целевую структуру.
Пример staging-потока:
-- сырой слой
CREATE UNLOGGED TABLE staging.events_raw (
id_text text,
payload_text text,
created_at_text text
);
-- без ограничений, чтобы грузить максимально быстро
COPY staging.events_raw FROM STDIN WITH (FORMAT csv, HEADER true);
-- чистовой слой
INSERT INTO public.events (id, payload, created_at)
SELECT
id_text::bigint,
payload_text::jsonb,
created_at_text::timestamptz
FROM staging.events_raw
WHERE id_text ~ '^\d+$'
AND created_at_text IS NOT NULL
ON CONFLICT (id) DO UPDATE
SET payload = EXCLUDED.payload,
created_at = EXCLUDED.created_at;
Где смотреть прогресс
Во время работы COPY
сервер публикует состояние в pg_stat_progress_copy
.
SELECT pid, datname, relid::regclass AS relation, command, type, bytes_processed, bytes_total,
tuples_processed, tuples_excluded, error_count
FROM pg_stat_progress_copy;
tuples_excluded
растёт, если используется WHERE
и строки отбрасываются. error_count
полезен при ON_ERROR ignore
.
Форматы: когда какой
Три режима покрывают почти все случаи.
Текстовый формат даёт табуляцию как разделитель и \N
как NULL. Он есть, но редко нужен: CSV удобнее и предсказуемее, а бинарный быстрее.
CSV — рабочий стандарт для интеграций. Пара нюансов:
HEADER true
добавляет заголовок на выгрузке и пропускает первую строку на загрузке. Вход можно усилитьHEADER MATCH
, тогда заголовок обязан совпасть с реальными именами колонок и порядком.Пустая строка и NULL различаются. Пустая строка это
""
, NULL это пусто без кавычек, либо строка из параметраNULL '...'
.Для явного управления кавычками и экранированием используем
QUOTE
,ESCAPE
, а для принудительного заключения некоторых колонокFORCE_QUOTE (col1, col2)
.
Бинарный формат выгоден для потока Postgres → Postgres одной версии, например при переносе больших объёмов между кластерами в одной инфраструктуре. Версии должны совпадать, типы колонок совместимы, данные не перемещаем между архитектурами, где отличается порядок байтов.
Примеры CSV-выгрузки и загрузки с тонкостями:
-- выгружаем с принудительными кавычками для текстовых колонок
COPY public.users (id, email, country)
TO STDOUT WITH (FORMAT csv, HEADER true, FORCE_QUOTE (email, country));
-- грузим, требуя строгого совпадения заголовка
COPY public.users (id, email, country)
FROM STDIN WITH (FORMAT csv, HEADER match);
FREEZE: когда да, когда нет
COPY FROM ... WITH (FREEZE true)
замораживает строки сразу, как после VACUUM FREEZE
. Это ускоряет первичную загрузку новой или только что очищенной таблицы и уменьшает давление на автovacuum. Условия там строгие: таблица должна быть создана или опустошена в текущей транзакции, без открытых курсоров и конкурирующих снимков. На секционированных таблицах FREEZE
сейчас не применяется.
Сценарий:
BEGIN;
CREATE TABLE public.import_users (LIKE public.users INCLUDING ALL);
COPY public.import_users FROM PROGRAM 'gzip -dc /data/users.csv.gz'
WITH (FORMAT csv, HEADER true, FREEZE true);
ANALYZE public.import_users;
COMMIT;
WHERE прямо в COPY
На загрузке можно отбрасывать строки по условию.
COPY public.events (id, payload, created_at) FROM STDIN
WITH (FORMAT csv, HEADER true)
WHERE id IS NOT NULL AND created_at >= DATE '2025-01-01';
Количество исключённых строк будет видно в pg_stat_progress_copy
.
Безопасность
Если используете COPY ... FROM/TO 'filename'
или PROGRAM
, нужны привилегии суперпользователя или членство в ролях pg_read_server_files
, pg_write_server_files
, pg_execute_server_program
.
Это осознанное ограничение: сервер получает доступ к файловой системе и shell. В PROGRAM
команда запускается через оболочку, поэтому недоверенный ввод встраивать нельзя. Для таких сценариев держите фиксированные строки, белые списки и внимательное экранирование.
Ещё два момента. Для COPY TO
путь обязан быть абсолютным. Для COPY FROM
это рекомендация, но лучше не полагаться на рабочую директорию кластера. И не забываем про права на таблицы: для COPY TO
нужно право SELECT
на колонки, для COPY FROM
право INSERT
.
Кодировки и форматы дат
Если файл не в кодировке клиента, задаем ENCODING '...'
в COPY
. Для переносимой выгрузки я перед выгрузкой ставим ISO-формат дат:
SET DateStyle TO ISO, YMD;
COPY (SELECT * FROM public.events WHERE created_at >= DATE '2025-01-01')
TO STDOUT WITH (FORMAT csv, HEADER true, ENCODING 'UTF8');
На CSV все символы значимы, включая пробелы. Если источник дополняет строки пробелами до фиксированной ширины, чистим файл до загрузки. В CSV значение может включать переносы строк, если оно в кавычках.
COPY из кода: Python, Go, Rust
Python, psycopg3
Нормальная потоковая работа делается через cursor.copy(...)
. COPY не поддерживает параметры, поэтому SQL-строку формируем сами из белого списка идентификаторов.
import psycopg
from psycopg.rows import dict_row
def copy_csv_to_table(dsn: str, table: str, csv_path: str):
if table not in {"public_events", "public_users"}:
raise ValueError("table not allowed")
copy_sql = f"COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true)"
with psycopg.connect(dsn, autocommit=False) as conn:
with conn.cursor(row_factory=dict_row) as cur, open(csv_path, "rb") as f:
with cur.copy(copy_sql) as cp:
while chunk := f.read(256 * 1024):
cp.write(chunk)
conn.commit()
def copy_table_to_csv(dsn: str, table: str, out_path: str):
if table not in {"public_events", "public_users"}:
raise ValueError("table not allowed")
copy_sql = f"COPY {table} TO STDOUT WITH (FORMAT csv, HEADER true)"
with psycopg.connect(dsn, autocommit=True) as conn:
with conn.cursor() as cur, open(out_path, "wb") as f:
with cur.copy(copy_sql) as cp:
for data in cp:
f.write(data)
def copy_between(dsn_src: str, dsn_dst: str, src_query: str, dst_table: str):
# перенос Postgres → Postgres в бинарном формате при совпадающих версиях
sql_out = f"COPY ({src_query}) TO STDOUT WITH (FORMAT binary)"
sql_in = f"COPY {dst_table} FROM STDIN WITH (FORMAT binary)"
with psycopg.connect(dsn_src) as s, psycopg.connect(dsn_dst) as d:
with s.cursor() as cs, d.cursor() as cd:
with cs.copy(sql_out) as cp_out, cd.copy(sql_in) as cp_in:
for data in cp_out:
cp_in.write(data)
d.commit()
Замечания по безопасности: никаких пользовательских фрагментов в строке COPY
, только whitelisting таблиц и колонок. Пакуем данные крупными блоками, на маленьких кусках будет лишняя системная возня.
Go, pgx
В pgx
есть CopyFrom
, который забирает источник строк и шлёт их по протоколу COPY. Это удобнее, чем самому форматировать CSV, и быстрее, чем INSERT
.
package bulk
import (
"context"
"encoding/csv"
"io"
"strconv"
"github.com/jackc/pgx/v5"
)
func CopyRows(ctx context.Context, conn *pgx.Conn, table string, columns []string, rows [][]any) (int64, error) {
// columns вроде []string{"id", "payload", "created_at"}
return conn.CopyFrom(ctx, pgx.Identifier{table}, columns, pgx.CopyFromRows(rows))
}
func CopyCSV(ctx context.Context, conn *pgx.Conn, table string, columns []string, r io.Reader) (int64, error) {
dec := csv.NewReader(r)
// пропускаем заголовок
if _, err := dec.Read(); err != nil {
return 0, err
}
const batch = 5000
buf := make([][]any, 0, batch)
var total int64
flush := func() error {
if len(buf) == 0 {
return nil
}
n, err := conn.CopyFrom(ctx, pgx.Identifier{table}, columns, pgx.CopyFromRows(buf))
total += n
buf = buf[:0]
return err
}
for {
rec, err := dec.Read()
if err == io.EOF {
break
}
if err != nil {
return total, err
}
id, _ := strconv.ParseInt(rec[0], 10, 64)
row := []any{id, rec[1], rec[2]} // пример
buf = append(buf, row)
if len(buf) >= batch {
if err := flush(); err != nil {
return total, err
}
}
}
if err := flush(); err != nil {
return total, err
}
return total, nil
}
Здесь не генерируем CSV сами. pgx
упакует всё в протокол COPY
.
Rust, tokio-postgres
У клиента есть copy_out
и copy_in
. Работает потоками, без промежуточных файлов.
use tokio_postgres::{NoTls, Error};
use futures_util::{StreamExt, SinkExt};
use bytes::Bytes;
pub async fn copy_out_csv(conn_str: &str, table: &str) -> Result<Vec<u8>, Error> {
let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await?;
tokio::spawn(async move { let _ = connection.await; });
let sql = format!("COPY {} TO STDOUT WITH (FORMAT csv, HEADER true)", table);
let mut stream = client.copy_out(&*sql).await?;
let mut out = Vec::new();
while let Some(chunk) = stream.next().await {
out.extend_from_slice(&chunk?);
}
Ok(out)
}
pub async fn copy_in_csv(conn_str: &str, table: &str, data: &[u8]) -> Result<(), Error> {
let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await?;
tokio::spawn(async move { let _ = connection.await; });
let sql = format!("COPY {} FROM STDIN WITH (FORMAT csv, HEADER true)", table);
let mut sink = client.copy_in(&*sql).await?;
sink.send(Bytes::from(data.to_vec())).await?;
sink.close().await?;
Ok(())
}
COPY
закрывает большинство задач массовых загрузок и выгрузок в Postgres. Если использовать его с оглядкой на права, форматы, кодировки и план обслуживания таблицы, получаем стабильные и быстрые пайплайны. В спорных местах проще выдержать staging, в горячих работать потоками без диска, в инфраструктурных переносах просто использовать бинарный формат при совпадающих версиях.
Приглашаем вас пройти вступительное тестирование по теме «Базы данных». Тестирование поможет объективно оценить текущий уровень знаний и навыков, необходимых для эффективного усвоения материала курса.
Курс «Базы данных» подробно рассматривает современные подходы к работе с системами управления базами данных, включая эффективные методы загрузки и выгрузки данных, управление таблицами и транзакциями, а также практическое применение SQL. Приглашаем вас присоединиться к обучению, чтобы получить системные знания и навыки в этой области.