Pull to refresh

PostgreSQL под капотом. Часть 4. Цикл бэкэнда

Reading time21 min
Views5.1K

Приветствую.

Продолжаем изучать работу PostgreSQL под капотом. Сегодня рассмотрим работу главного цикла бэкэнда. Продолжаем с места где остановились — прямо перед главным циклом. Файл входной точки располагается в src/backend/tcop/postgres.c

Обработчик исключений

Для обработки исключений используется setjmp

Перед началом цикла настраивается обработчик исключений и сохраняется точка стека вызовов для возврата при исключениях.

Обработка исключений работает следующим образом:

  • Сбрасываются флаги (например, DoingCommandRead)

  • Сбрасываются таймауты

  • Откатывается текущая транзакция

  • Библиотека libpq откатывается

  • Сбрасываются слоты репликации

  • Контекст памяти сбрасывается

  • Пользователю отправляется сообщение об ошибке

if (sigsetjmp(local_sigjmp_buf, 1) != 0)
{
    // Обработка исключения
}

// Сохранение стека для возврата
PG_exception_stack = &local_sigjmp_buf;

// ...

for (;;)
{
  // Главный цикл
}
setjmp

setjmp — стандартная C библиотека позволяющая делать нелокальные переходы по стеку. Что это значит? Это значит, что можно вернуться вверх по стеку к нужной функции, в отличие от goto, где переход возможен только в пределах текущей фукнции. Работу с ней можно описать следующим кодом:

jmp_buf *global_buf;

void doSomeStaff();

// Всегда будет возвращать 1
int main() {
  jmp_buf buf;
  if (setjmp(&buf) != 0) {
    printf("Вернулись обратно!")
    return 1;
  }

  global_buf = &buf;
  doSomeStaff();
  return 0;
}

void doSomeStaff() {
  printf("Внутри doSomeStaff");
  longjmp(global_buf, 1);
}

В начале создается переменная типа jmp_buf, хранящая контекст стека, для восстановления. Создаются локальная, для восстановления в случае неполадок, и глобальная, для доступа к адресу возврата остальных функций, переменные.

После вызывается setjmp, которому передается созданная переменная. Когда нужно будет к ней вернуться, вызывается longjmp, которому передается та же переменная и код, который будет возвращен setjmp.

Замечание: setjmp при первом вызове возвращает 0, а в остальных случаях, возвращаться будут коды переданные longjmp. Таким образом, мы можем вечно получать 0 из setjmp, если будем передавать код 0 в longjmp

С помощью setjmp/longjmp можно создать свою систему обработки исключений — SJLJ. Существуют и другие способы обработки исключений, например, SEH или DWARF. Их краткое описание можно найти здесь.

Как можно реализовать свой обработчик исключений можно посмотреть в этой статье

P. S. существуют sigsetjmp/siglongjmp функции. Они работают аналогично, но позволяют учитывать сигналы. Например, сохранять маску игнорирования. Именно этот вариант используется в Postgres, т.к. в нем много работы с сигналами.

Обработка исключений в Postgres

В Postgres есть своя инфраструктура для обработки ошибок. В src/include/utils/elog.h определяются макросы для try/catch/finally конструкций:

#define PG_TRY()  \
  do { \
      sigjmp_buf *_save_exception_stack = PG_exception_stack; \
      ErrorContextCallback *_save_context_stack = error_context_stack; \
      sigjmp_buf _local_sigjmp_buf; \
      bool _do_rethrow = false; \
      if (sigsetjmp(_local_sigjmp_buf, 0) == 0) \
      { \
          PG_exception_stack = &_local_sigjmp_buf

#define PG_CATCH()	\
  } \
  else \
  { \
      PG_exception_stack = _save_exception_stack; \
      error_context_stack = _save_context_stack

#define PG_FINALLY() \
  } \
  else \
      _do_rethrow = true; \
  { \
      PG_exception_stack = _save_exception_stack; \
      error_context_stack = _save_context_stack

#define PG_END_TRY()  \
    } \
      if (_do_rethrow) \
              PG_RE_THROW(); \
      PG_exception_stack = _save_exception_stack; \
      error_context_stack = _save_context_stack; \
  } while (0)

Для хранения информации о возврате из‑за исключений используется глобальная переменная PG_exception_stack (src/backend/utils/error/elog.c), которая инициализируется перед началом главного цикла.

Использовать try/catch/finally можно так

PG_TRY();
{
  // Ошибка
  if (!IsValidValue(1)) 
  {
    ereport(ERROR,
            (errcode(ERRCODE_SOME_ERRCODE),
            errmsg("value is not valid")));
  }
}
PG_CATCH();
{
    GlobalValue = old_value;
}
PG_END_TRY();

// ===================

PG_TRY();
{
  fwrite("Hello, world!", sizeof(char), 13, handle)
}
PG_FINALLY();
{
    fclose(handle);
}
PG_END_TRY();

Исключение эмулируется путем логирования с уровнем ERROR

Но одновременно нельзя использовать PG_CATCH и PG_FINALLY: после разворачивания макросов создается if/else конструкция, причем if — PG_TRY, а else — есть в PG_CATCH и в PG_FINALLY — при использовании catch/finally вместе получится if с 2 else.

После настройки обработчика исключений, мы входим в цикл.

Инициализация/сброс переменных

В начале каждой итерации настраиваются локальные переменные и окружение.

Перед началом принятия и обработки пакета инициализируются переменные:

  • MessageContext — контекст памяти при обработке входящего сообщения.

  • input_message — хранение строки входящего запроса.

StringInfo - хранение строк переменной длины

Для хранения строк переменной длины существует свой тип данных — StringInfo. Объявлен в src/include/lib/stringinfo.h

typedef struct StringInfoData
{
    // Буфер, хранящий строку
	char	   *data;
  
	// Длина строки
    int			len;
	
    // Максимально возможная длина строки, 
    // т.к. память была аллоцирована через palloc
    int			maxlen; 
	
    // Переменная для нужд других функций. 
    // Например, отслеживание текущий позиции считывания
    int			cursor;
} StringInfoData;

typedef StringInfoData *StringInfo;

Этот тип используется во всем коде проекта:

  • Буфер plain строки запроса.

  • Выставление названия приложения.

  • Выполнение запросов внутри приложения (например, для обновления материализованного приложения).

И в других местах

Например, при выполнении EXPLAIN результат записывается в переменную StringInfo (src/backend/commands/explain.c):

// 1584 строка
if (es->format == EXPLAIN_FORMAT_TEXT)
{
    appendStringInfo(es->str, "  (cost=%.2f..%.2f rows=%.0f width=%d)",
                     plan->startup_cost, plan->total_cost,
                     plan->plan_rows, plan->plan_width);
}

Сигнализация клиенту о готовности принимать запрос

Пакет ReadyForQuery бэкэнд отправляет, когда хочет уведомить фронтэнд о готовности принимать запросы. Чтобы понимать, нужно ли отправлять этот пакет используется флаг send_ready_for_query.

После инициализации (на первом шаге) он будет выставлен, но затем может быть снят

В начале каждой итерации проверяется этот флаг. Если он установлен:

  1. Отправляется статистика сборщику статистики.

  2. Обновляется заголовок программы соответствующим образом.

  3. Инициализируются таймеры таймаутов.

  4. Отправляются измененные GUC настройки.

  5. Отправляется ReadyForQuery пакет.

  6. Флаг send_ready_for_query снимается.

Для первых 3 шагов поведение/параметры зависят от состояния транзакции:

  • Простой в транзакции

  • Прерванная транзакция

  • Остальные

Модуль xact - управление транзакциями

За управление транзакциями отвечает модуль xact — src/backend/access/transam/xact.c.

Состояние транзакции описывается структурой TransactionStateData

typedef struct TransactionStateData
{
	FullTransactionId fullTransactionId;	/* my FullTransactionId */
	SubTransactionId subTransactionId;	/* my subxact ID */
	char	   *name;			/* savepoint name, if any */
	int			savepointLevel; /* savepoint level */
	TransState	state;			/* low-level state */
	TBlockState blockState;		/* high-level state */
	int			nestingLevel;	/* transaction nesting depth */
	int			gucNestLevel;	/* GUC context nesting depth */
	MemoryContext curTransactionContext;	/* my xact-lifetime context */
	ResourceOwner curTransactionOwner;	/* my query resources */
	TransactionId *childXids;	/* subcommitted child XIDs, in XID order */
	int			nChildXids;		/* # of subcommitted child XIDs */
	int			maxChildXids;	/* allocated size of childXids[] */
	Oid			prevUser;		/* previous CurrentUserId setting */
	int			prevSecContext; /* previous SecurityRestrictionContext */
	bool		prevXactReadOnly;	/* entry-time xact r/o state */
	bool		startedInRecovery;	/* did we start in recovery? */
	bool		didLogXid;		/* has xid been included in WAL record? */
	int			parallelModeLevel;	/* Enter/ExitParallelMode counter */
	bool		chain;			/* start a new block after this one */
	bool		assigned;		/* assigned to top-level XID */
	struct TransactionStateData *parent;	/* back link to parent */
} TransactionStateData;

Верхнеуровневая транзакция хранится в переменной TopTransactionStateData, а текущая в CurrentTransactionState

/*
 * CurrentTransactionState always points to the current transaction state
 * block.  It will point to TopTransactionStateData when not in a
 * transaction at all, or when in a top-level transaction.
 */
static TransactionStateData TopTransactionStateData = {
	.state = TRANS_DEFAULT,
	.blockState = TBLOCK_DEFAULT,
	.assigned = false,
};

static TransactionState CurrentTransactionState = &TopTransactionStateData;

Состояние транзакции описывается 2 перечислениями:

  • TBlockState - общее описание состояние транзакции без деталей

/*
 *	transaction states - transaction state from server perspective
 */
typedef enum TransState
{
	TRANS_DEFAULT,				/* idle */
	TRANS_START,				/* transaction starting */
	TRANS_INPROGRESS,			/* inside a valid transaction */
	TRANS_COMMIT,				/* commit in progress */
	TRANS_ABORT,				/* abort in progress */
	TRANS_PREPARE				/* prepare in progress */
} TransState;
  • TransState - детализированное состояние транзакции

/*
 *	transaction block states - transaction state of client queries
 *
 * Note: the subtransaction states are used only for non-topmost
 * transactions; the others appear only in the topmost transaction.
 */
typedef enum TBlockState
{
	/* not-in-transaction-block states */
	TBLOCK_DEFAULT,				/* idle */
	TBLOCK_STARTED,				/* running single-query transaction */

	/* transaction block states */
	TBLOCK_BEGIN,				/* starting transaction block */
	TBLOCK_INPROGRESS,			/* live transaction */
	TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN */
	TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */
	TBLOCK_END,					/* COMMIT received */
	TBLOCK_ABORT,				/* failed xact, awaiting ROLLBACK */
	TBLOCK_ABORT_END,			/* failed xact, ROLLBACK received */
	TBLOCK_ABORT_PENDING,		/* live xact, ROLLBACK received */
	TBLOCK_PREPARE,				/* live xact, PREPARE received */

	/* subtransaction states */
	TBLOCK_SUBBEGIN,			/* starting a subtransaction */
	TBLOCK_SUBINPROGRESS,		/* live subtransaction */
	TBLOCK_SUBRELEASE,			/* RELEASE received */
	TBLOCK_SUBCOMMIT,			/* COMMIT received while TBLOCK_SUBINPROGRESS */
	TBLOCK_SUBABORT,			/* failed subxact, awaiting ROLLBACK */
	TBLOCK_SUBABORT_END,		/* failed subxact, ROLLBACK received */
	TBLOCK_SUBABORT_PENDING,	/* live subxact, ROLLBACK received */
	TBLOCK_SUBRESTART,			/* live subxact, ROLLBACK TO received */
	TBLOCK_SUBABORT_RESTART		/* failed subxact, ROLLBACK TO received */
} TBlockState;

Для управления транзакцией используются функции:

  • StartTransactionCommand — старт транзакции

  • CommitTransactionCommand — коммит транзакции

  • AbortCurrentTransaction — прервать транзакцию

При отправке клиенту ReadyForQuery пакета было сказано, что первые шаги выполняются в зависимости от состояния транзакции. Конкретно, использовались функции:

  • IsTransactionOrTransactionBlock — Находимся ли мы в транзакции

bool
IsTransactionOrTransactionBlock(void)
{
	TransactionState s = CurrentTransactionState;

	if (s->blockState == TBLOCK_DEFAULT)
		return false;

	return true;
}
  • IsAbortedTransactionBlockState — текущая транзакция прервана

bool
IsAbortedTransactionBlockState(void)
{
	TransactionState s = CurrentTransactionState;

	if (s->blockState == TBLOCK_ABORT ||
		s->blockState == TBLOCK_SUBABORT)
		return true;

	return false;
}

Сами первые 3 шага выглядят следующим образом:

if (IsAbortedTransactionBlockState())
{
    set_ps_display("idle in transaction (aborted)");
    pgstat_report_activity(STATE_IDLEINTRANSACTION_ABORTED, NULL);

    /* Start the idle-in-transaction timer */
    if (IdleInTransactionSessionTimeout > 0)
    {
        idle_in_transaction_timeout_enabled = true;
        enable_timeout_after(IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
                             IdleInTransactionSessionTimeout);
    }
}
else if (IsTransactionOrTransactionBlock())
{
    set_ps_display("idle in transaction");
    pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);

    /* Start the idle-in-transaction timer */
    if (IdleInTransactionSessionTimeout > 0)
    {
        idle_in_transaction_timeout_enabled = true;
        enable_timeout_after(IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
                             IdleInTransactionSessionTimeout);
    }
}
else
{
    /*
     * Process incoming notifies (including self-notifies), if
     * any, and send relevant messages to the client.  Doing it
     * here helps ensure stable behavior in tests: if any notifies
     * were received during the just-finished transaction, they'll
     * be seen by the client before ReadyForQuery is.
     */
    if (notifyInterruptPending)
        ProcessNotifyInterrupt(false);

    pgstat_report_stat(false);

    set_ps_display("idle");
    pgstat_report_activity(STATE_IDLE, NULL);

    /* Start the idle-session timer */
    if (IdleSessionTimeout > 0)
    {
        idle_session_timeout_enabled = true;
        enable_timeout_after(IDLE_SESSION_TIMEOUT,
                             IdleSessionTimeout);
    }
}

Чтение запроса

Так как во время чтения и парсинга пришедшего пакета могут прийти асинхронные сигналы. Чтобы учитывать текущее состояние принятия пакета при обработке сигналов, существует флаг DoingCommandRead. Перед началом получения пакета этот флаг выставляется, а после затирается.

Дальше начинается само чтение запроса.

Для получения запроса используется функция ReadCommand. При работе через сокеты внутри происходит делегирование выполнения функции SocketBackend.

Вначале читается первый байт. На его основании определяются максимальный размер буфера записи и тип режима работы: Simple Query или Extended Query режимы.

Когда чтение началось отменить запрос нельзя, поэтому процесс чтения текущего запроса оборачивается макросами HOLD_CANCEL_INTERRUPTS и RESUME_CANCEL_INTERRUPTS. Грубо говоря, они размечают область, при входе в которую не нужно выполнять команды на отмену запросов.

/* ----------------
 *	SocketBackend()		Is called for frontend-backend connections
 *
 *	Returns the message type code, and loads message body data into inBuf.
 *
 *	EOF is returned if the connection is lost.
 * ----------------
 */
static int
SocketBackend(StringInfo inBuf)
{
	int			qtype;
	int			maxmsglen;

	/*
	 * Get message type code from the frontend.
	 */
	HOLD_CANCEL_INTERRUPTS();
	pq_startmsgread();
	qtype = pq_getbyte();

	if (qtype == EOF)			/* frontend disconnected */
	{
		if (IsTransactionState())
			ereport(COMMERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("unexpected EOF on client connection with an open transaction")));
		else
		{
			/*
			 * Can't send DEBUG log messages to client at this point. Since
			 * we're disconnecting right away, we don't need to restore
			 * whereToSendOutput.
			 */
			whereToSendOutput = DestNone;
			ereport(DEBUG1,
					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
					 errmsg_internal("unexpected EOF on client connection")));
		}
		return qtype;
	}

	/*
	 * Validate message type code before trying to read body; if we have lost
	 * sync, better to say "command unknown" than to run out of memory because
	 * we used garbage as a length word.  We can also select a type-dependent
	 * limit on what a sane length word could be.  (The limit could be chosen
	 * more granularly, but it's not clear it's worth fussing over.)
	 *
	 * This also gives us a place to set the doing_extended_query_message flag
	 * as soon as possible.
	 */
	switch (qtype)
	{
		case 'Q':				/* simple query */
			maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
			doing_extended_query_message = false;
			break;

		case 'F':				/* fastpath function call */
			maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
			doing_extended_query_message = false;
			break;

		case 'X':				/* terminate */
			maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
			doing_extended_query_message = false;
			ignore_till_sync = false;
			break;

		case 'B':				/* bind */
		case 'P':				/* parse */
			maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
			doing_extended_query_message = true;
			break;

		case 'C':				/* close */
		case 'D':				/* describe */
		case 'E':				/* execute */
		case 'H':				/* flush */
			maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
			doing_extended_query_message = true;
			break;

		case 'S':				/* sync */
			maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
			/* stop any active skip-till-Sync */
			ignore_till_sync = false;
			/* mark not-extended, so that a new error doesn't begin skip */
			doing_extended_query_message = false;
			break;

		case 'd':				/* copy data */
			maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
			doing_extended_query_message = false;
			break;

		case 'c':				/* copy done */
		case 'f':				/* copy fail */
			maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
			doing_extended_query_message = false;
			break;

		default:

			/*
			 * Otherwise we got garbage from the frontend.  We treat this as
			 * fatal because we have probably lost message boundary sync, and
			 * there's no good way to recover.
			 */
			ereport(FATAL,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("invalid frontend message type %d", qtype)));
			maxmsglen = 0;		/* keep compiler quiet */
			break;
	}

	/*
	 * In protocol version 3, all frontend messages have a length word next
	 * after the type code; we can read the message contents independently of
	 * the type.
	 */
	if (pq_getmessage(inBuf, maxmsglen))
		return EOF;				/* suitable message already logged */
	RESUME_CANCEL_INTERRUPTS();

	return qtype;
}
Получение запроса от интерактивного бэкэнда

В случае, если был запущен интерактивный бэкэнд, то строка запроса получается не из сокета, а от пользователя напрямую. В этом случае в функции ReadCommand вместо SocketBackend вызывается InteractiveBackend.

Внутри эта функция последовательно читает байты из STDIN и добавляет его в результирующую строку, пока не получит сегмент разделителя запросов (EOF, новая строка, двойная новая строка)

static int
InteractiveBackend(StringInfo inBuf)
{
	int			c;				/* character read from getc() */

	/*
	 * display a prompt and obtain input from the user
	 */
	printf("backend> ");
	fflush(stdout);

	resetStringInfo(inBuf);

	/*
	 * Read characters until EOF or the appropriate delimiter is seen.
	 */
	while ((c = interactive_getc()) != EOF)
	{
		if (c == '\n')
		{
			if (UseSemiNewlineNewline)
			{
				/*
				 * In -j mode, semicolon followed by two newlines ends the
				 * command; otherwise treat newline as regular character.
				 */
				if (inBuf->len > 1 &&
					inBuf->data[inBuf->len - 1] == '\n' &&
					inBuf->data[inBuf->len - 2] == ';')
				{
					/* might as well drop the second newline */
					break;
				}
			}
			else
			{
				/*
				 * In plain mode, newline ends the command unless preceded by
				 * backslash.
				 */
				if (inBuf->len > 0 &&
					inBuf->data[inBuf->len - 1] == '\\')
				{
					/* discard backslash from inBuf */
					inBuf->data[--inBuf->len] = '\0';
					/* discard newline too */
					continue;
				}
				else
				{
					/* keep the newline character, but end the command */
					appendStringInfoChar(inBuf, '\n');
					break;
				}
			}
		}

		/* Not newline, or newline treated as regular character */
		appendStringInfoChar(inBuf, (char) c);
	}

	/* No input before EOF signal means time to quit. */
	if (c == EOF && inBuf->len == 0)
		return EOF;

	/*
	 * otherwise we have a user query so process it.
	 */

	/* Add '\0' to make it look the same as message case. */
	appendStringInfoChar(inBuf, (char) '\0');

	/*
	 * if the query echo flag was given, print the query..
	 */
	if (EchoQuery)
		printf("statement: %s\n", inBuf->data);
	fflush(stdout);

	return 'Q';
}

После завершения принятия сообщения сбрасываются таймауты ожиданий:

  • IDLE_IN_TRANSACTION_SESSION_TIMEOUT — таймаут ожидания в транзакции;

  • IDLE_SESSION_TIMEOUT — таймаут ожидания в сессии.

Проверка внешних прерываний (сигналы)

Во время чтения команды от пользователя была заблокирована обработка сигналов. Теперь мы можем проверить наличие пришедших сигналов. За обработку сигналов отвечает функция ProcessInterrupts().

Проверка необходимости обработки сигналов

Для увеличения производительности делается предположение, что во время чтения пришедшего пакета не возникло никаких сигналов.

Для проверки существования ожидающих обработки сигналов используется макрос INTERRUPTS_PEDNDING_CONDITION (src/include/miscadmin.h)

/* Test whether an interrupt is pending */
#ifndef WIN32
#define INTERRUPTS_PENDING_CONDITION() \
	(unlikely(InterruptPending))
#else
#define INTERRUPTS_PENDING_CONDITION() \
	(unlikely(UNBLOCKED_SIGNAL_QUEUE()) ? pgwin32_dispatch_queued_signals() : 0, \
	 unlikely(InterruptPending))
#endif

Можно заметить, что условия помечены атрибутом unlikely. Он сигнализирует компилятору о том, что какое‑либо условие скорее всего не будет выполнено. Это позволяет компилятору генерироать более оптимизированный код.

unlikely — тоже макрос. Вместе с likely пределяется в src/include/c.h

/*
 * Hints to the compiler about the likelihood of a branch. Both likely() and
 * unlikely() return the boolean value of the contained expression.
 *
 * These should only be used sparingly, in very hot code paths. It's very easy
 * to mis-estimate likelihoods.
 */
#if __GNUC__ >= 3
#define likely(x)	__builtin_expect((x) != 0, 1)
#define unlikely(x) __builtin_expect((x) != 0, 0)
#else
#define likely(x)	((x) != 0)
#define unlikely(x) ((x) != 0)
#endif

В начале ProcessInterrupts имеется проверка на нахождение в особенных секциях.

  1. CritSectionCount — количество исполняемых критических секций. Например, во время построение Gist индекса, мы входим в такую критическую секцию (src/backend/access/gist/gistbuild.c)

IndexBuildResult *
gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
  // ...
  START_CRIT_SECTION();
  
  GISTInitBuffer(buffer, F_LEAF);
  
  MarkBufferDirty(buffer);
  PageSetLSN(page, GistBuildLSN);
  
  UnlockReleaseBuffer(buffer);
  
  END_CRIT_SECTION();
  // ...
}
  1. InterruptHoldoffCount — количество секций, в которых исполнение обработчиков нежелательно. Например, обработчик SIGALRM входит в такую секцию

static void
handle_sig_alarm(SIGNAL_ARGS)
{
	/*
	 * Bump the holdoff counter, to make sure nothing we call will process
	 * interrupts directly. No timeout handler should do that, but these
	 * failures are hard to debug, so better be sure.
	 */
	HOLD_INTERRUPTS();

	// ...

	RESUME_INTERRUPTS();
}

В начале ProcessInterrupts проверяется, что обе мы не находимся ни в одной из перечисленных секций.

Внутри функции проверяется, что нужно:

  • Завершить процесс

  • Проверить подключение к клиенту

  • Провести восстановление после конфликта

  • Отменить запрос

  • Обработать превышения таймаутов ожидания

  • Выполнить параллельные запросы

  • Вывести в лог описание текущего контекста памяти

Отдельно проверяются изменения GUC конфигурации. При их изменении, они отсылаются клиенту.

Обработка входного запроса

Когда пакет получен и все подготовки завершены, мы готовы обрабатывать запрос. На основании первого байта пакета определяются следующие шаги.

Клиент-серверный протокол

Для коммуникации фронтэнда и бэкэнда используется собственный клиент‑серверный протокол. Для каждой стороны определены сообщения, которые она может послать и которыми должна отвечать на сообщения другой стороны.

Например, на сообщение фронтэнда BIND, бэкэнд должен ответить BindComplete или ErrorResponse.

Формат сообщений имеет следующий вид:

  1. Первый байт — тип пакета. В документации описывается типом char. Например, для Bind первый байт — «B»

  2. 4 байтное число (int32) — размер сообщения, включая само число размера, т. е. 4 + кол‑во байтов полезной нагрузки

  3. Сама полезная нагрузка сообщения — для каждого сообщения своя: может быть простой строкой, а может быть комбинацией чисел и строк

Описание форматов всех сообщений описано в документации: https://www.postgresql.org/docs/15/protocol‑message‑formats.html

При обычной работе обычно используются 2 режима работы: Simple Query и Extended Query

Simple Query

Документация: https://www.postgresql.org/docs/current/protocol‑flow.html#id-1.10.6.7.4

Первый байт — «Q». Представляет из себя простую строку — без параметров или какой‑либо защиты. При ее обработке запрос представляется простой C‑строкой (вот здесь интерполяция строк и играет злую шутку).

Может содержать в себе несколько выражений, разделенных ;. Например, INSERT и SELECT могут передаться в одном сообщении.

Extended Query

Документация: https://www.postgresql.org/docs/current/protocol‑flow.html#PROTOCOL‑FLOW‑EXT‑QUERY

Этот режим можно описать конвейером сообщений:

  1. PARSE — подготовить запрос, заранее распарсив его и создав именованное (или безымянный, хранящийся только до следующего безымянного) выражение. В SQL может быть представлен через PREPARE выражение.

  1. BIND — создать портал (именованный или безымянный), подставив в него переданные аргументы запроса.

  1. EXECUTE — получить следующую порция результатов выполнения портала (следующие строки).

Больше о самом протоколе в документации: https://www.postgresql.org/docs/15/protocol.html

Simple Query

Сообщение Simple Query просто исполняется. Но если процесс — WALSender, то выполнение происходит только в случае, если сообщение репликационное.

case 'Q':			/* simple query */
  {
      const char *query_string;
  
      /* Set statement_timestamp() */
      SetCurrentStatementStartTimestamp();
  
      query_string = pq_getmsgstring(&input_message);
      pq_getmsgend(&input_message);
  
      if (am_walsender)
      {
          if (!exec_replication_command(query_string))
              exec_simple_query(query_string);
      }
      else
          exec_simple_query(query_string);
  
      send_ready_for_query = true;
  }
  break;
Идентификация репликационных сообщений

Для определения того, что пакет содержит запрос для синхронизации, используется функция объявленная в src/include/replication/walsender_private.h

extern void replication_scanner_init(const char *query_string);
extern void replication_scanner_finish(void);
extern bool replication_scanner_is_replication_command(void);

Но их реализации в исходном коде не найти. Она генерируется с помощью пары LEX (лексический анализатор) и YACC (парсер)

LEX генерирует токены из исходного кода, а YACC создает из пришедших токенов синтаксическое дерево. Файл с лексическим анализатором для репликации располагается в src/backend/replication/repl_scanner.l. Сама функция, используемая для идентификации репликационной команды:

/*
 * Check to see if the first token of a command is a WalSender keyword.
 *
 * To keep repl_scanner.l minimal, we don't ask it to know every construct
 * that the core lexer knows.  Therefore, we daren't lex more than the
 * first token of a general SQL command.  That will usually look like an
 * IDENT token here, although some other cases are possible.
 */
bool
replication_scanner_is_replication_command(void)
{
	int			first_token = replication_yylex();

	switch (first_token)
	{
		case K_IDENTIFY_SYSTEM:
		case K_BASE_BACKUP:
		case K_START_REPLICATION:
		case K_CREATE_REPLICATION_SLOT:
		case K_DROP_REPLICATION_SLOT:
		case K_TIMELINE_HISTORY:
		case K_SHOW:
			/* Yes; push back the first token so we can parse later. */
			repl_pushed_back_token = first_token;
			return true;
		default:
			/* Nope; we don't bother to push back the token. */
			return false;
	}
}

Parse

В случае подготовленного запроса нужно не только почитать строку запроса, но и получить все параметры. Так как типы в Postgres являются объектами, то и типы параметров передаются в виде идентификаторов объектов.

case 'P':			/* parse */
    {
        const char *stmt_name;
        const char *query_string;
        int			numParams;
        Oid		   *paramTypes = NULL;
    
        forbidden_in_wal_sender(firstchar);
    
        /* Set statement_timestamp() */
        SetCurrentStatementStartTimestamp();
    
        stmt_name = pq_getmsgstring(&input_message);
        query_string = pq_getmsgstring(&input_message);
        numParams = pq_getmsgint(&input_message, 2);
        if (numParams > 0)
        {
            paramTypes = (Oid *) palloc(numParams * sizeof(Oid));
            for (int i = 0; i < numParams; i++)
                paramTypes[i] = pq_getmsgint(&input_message, 4);
        }
        pq_getmsgend(&input_message);
    
        exec_parse_message(query_string, stmt_name,
                           paramTypes, numParams);
    }
    break;

Также можно заметить функцию forbidden_in_wal_sender. Эта функция инициирует ошибку, если WALSender получил недопустимую команду - он работает только в режиме Simple Query.

/*
 * Throw an error if we're a WAL sender process.
 *
 * This is used to forbid anything else than simple query protocol messages
 * in a WAL sender process.  'firstchar' specifies what kind of a forbidden
 * message was received, and is used to construct the error message.
 */
static void
forbidden_in_wal_sender(char firstchar)
{
	if (am_walsender)
	{
		if (firstchar == 'F')
			ereport(ERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("fastpath function calls not supported in a replication connection")));
		else
			ereport(ERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("extended query protocol not supported in a replication connection")));
	}
}

Bind

Парсинг BIND пакета (как описывается в комментарии) довольно затратный. Поэтому практически вся логика вынесена в отдельную функцию.

case 'B':			/* bind */
    forbidden_in_wal_sender(firstchar);
  
    /* Set statement_timestamp() */
    SetCurrentStatementStartTimestamp();
  
    /*
     * this message is complex enough that it seems best to put
     * the field extraction out-of-line
     */
    exec_bind_message(&input_message);
    break;

Execute

Если на вход пришел EXECUTE пакет, то из него получаются название портала для выполнения и количество строк для возврата. Они передаются самой функции выполнения.

case 'E':			/* execute */
    {
        const char *portal_name;
        int			max_rows;
  
        forbidden_in_wal_sender(firstchar);
  
        /* Set statement_timestamp() */
        SetCurrentStatementStartTimestamp();
  
        portal_name = pq_getmsgstring(&input_message);
        max_rows = pq_getmsgint(&input_message, 4);
        pq_getmsgend(&input_message);
  
        exec_execute_message(portal_name, max_rows);
    }
    break;

Fastpath function call

В отличие от предыдущих пакетов, бизнес логика FASTPATH частично вынесена наружу. Именно: старт транзакции и переключение контекста памяти.

case 'F':			/* fastpath function call */
    forbidden_in_wal_sender(firstchar);
  
    /* Set statement_timestamp() */
    SetCurrentStatementStartTimestamp();
  
    /* Report query to various monitoring facilities. */
    pgstat_report_activity(STATE_FASTPATH, NULL);
    set_ps_display("<FASTPATH>");
  
    /* start an xact for this function invocation */
    start_xact_command();
  
    /*
     * Note: we may at this point be inside an aborted
     * transaction.  We can't throw error for that until we've
     * finished reading the function-call message, so
     * HandleFunctionRequest() must check for it after doing so.
     * Be careful not to do anything that assumes we're inside a
     * valid transaction here.
     */
  
    /* switch back to message context */
    MemoryContextSwitchTo(MessageContext);
  
    HandleFunctionRequest(&input_message);
  
    /* commit the function-invocation transaction */
    finish_xact_command();
  
    send_ready_for_query = true;
    break;

P.S. В комментарии сказано не добавлять код, предполагающий его исполнение в корректном состоянии (не прерванной транзакции), хотя этот комментарий нужно вынести из switch’а, т.к. подобное относится ко всей итерации цикла.

Close

В Расширенном запросе можно создавать порталы и именованные запросы. Они хранятся в памяти, поэтому их стоит закрывать. Для этого используется пакет CLOSE и одноименная SQL команда.  

В пакете передаются тип объекта для закрытия и его название. 

case 'C':			/* close */
    {
        int			close_type;
        const char *close_target;
  
        forbidden_in_wal_sender(firstchar);
  
        close_type = pq_getmsgbyte(&input_message);
        close_target = pq_getmsgstring(&input_message);
        pq_getmsgend(&input_message);
  
        switch (close_type)
        {
            case 'S':
                if (close_target[0] != '\0')
                    DropPreparedStatement(close_target, false);
                else
                {
                    /* special-case the unnamed statement */
                    drop_unnamed_stmt();
                }
                break;
            case 'P':
                {
                    Portal		portal;
  
                    portal = GetPortalByName(close_target);
                    if (PortalIsValid(portal))
                        PortalDrop(portal, false);
                }
                break;
            default:
                ereport(ERROR,
                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
                         errmsg("invalid CLOSE message subtype %d",
                                close_type)));
                break;
        }
  
        if (whereToSendOutput == DestRemote)
            pq_putemptymessage('3');	/* CloseComplete */
    }
    break;

Describe

От бэкэнда можно получить описание порталов и именованных выражений:

  • Портал — описание возвращаемых данных (название, тип данных и т. д.).

  • Выражения — описание передаваемых параметров.

Так же как и в CLOSE, в DESCRIBE пакете передаются тип объекта для описания и его название

case 'D':			/* describe */
    {
        int			describe_type;
        const char *describe_target;
  
        forbidden_in_wal_sender(firstchar);
  
        /* Set statement_timestamp() (needed for xact) */
        SetCurrentStatementStartTimestamp();
  
        describe_type = pq_getmsgbyte(&input_message);
        describe_target = pq_getmsgstring(&input_message);
        pq_getmsgend(&input_message);
  
        switch (describe_type)
        {
            case 'S':
                exec_describe_statement_message(describe_target);
                break;
            case 'P':
                exec_describe_portal_message(describe_target);
                break;
            default:
                ereport(ERROR,
                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
                         errmsg("invalid DESCRIBE message subtype %d",
                                describe_type)));
                break;
        }
    }
    break;

Flush

В процессе работы во внутреннем буфере могла накопиться информация. Для немедленного сброса буфера отправки, определена команда FLUSH

case 'H':			/* flush */
    pq_getmsgend(&input_message);
    if (whereToSendOutput == DestRemote)
        pq_flush();
    break;

Sync

SYNC пакет используется для коммита транзакции. Логика работы простая — просто вызывается функция коммита

case 'S':			/* sync */
    pq_getmsgend(&input_message);
    finish_xact_command();
    send_ready_for_query = true;
    break;

Terminate

Закрыть соединения можно 2 способами: оборвать соединение и послать пакет закрытия соединения. Оба этих случая учтены.

    /*
     * 'X' means that the frontend is closing down the socket. EOF
     * means unexpected loss of frontend connection. Either way,
     * perform normal shutdown.
     */
case EOF:

    /* for the statistics collector */
    pgStatSessionEndCause = DISCONNECT_CLIENT_EOF;

    /* FALLTHROUGH */

case 'X':

    /*
     * Reset whereToSendOutput to prevent ereport from attempting
     * to send any more messages to client.
     */
    if (whereToSendOutput == DestRemote)
        whereToSendOutput = DestNone;

    /*
     * NOTE: if you are tempted to add more code here, DON'T!
     * Whatever you had in mind to do should be set up as an
     * on_proc_exit or on_shmem_exit callback, instead. Otherwise
     * it will fail to be called during other backend-shutdown
     * scenarios.
     */
    proc_exit(0);

Дополнительно

Также в процессе разработки обнаружился случай, когда клиент продолжает посылать пакеты команд связанными с копированием. Такие пакеты просто игнорируются.

case 'd':			/* copy data */
case 'c':			/* copy done */
case 'f':			/* copy fail */

    /*
     * Accept but ignore these messages, per protocol spec; we
     * probably got here because a COPY failed, and the frontend
     * is still sending data.
     */
    break;

Также обрабатывается неизвестный тип пакета.

default:
    ereport(FATAL,
            (errcode(ERRCODE_PROTOCOL_VIOLATION),
             errmsg("invalid frontend message type %d",
                    firstchar)));

Конец

На этом итерация заканчивается и начинается новая (если не было исключений)

Tags:
Hubs:
If this publication inspired you and you want to support the author, do not hesitate to click on the button
Total votes 7: ↑7 and ↓0+7
Comments1

Articles