Как стать автором
Обновить

Рецепты PostgreSQL: асинхронные уведомления в… реплике!?

Время на прочтение16 мин
Количество просмотров1.7K

Для приготовления асинхронных уведомлений listen/notify в реплике нам понадобится postgres. Как говорится в документации:

Транзакции, запущенные в режиме горячего резерва, никогда не получают ID транзакции и не могут быть записаны в журнал предзаписи. Поэтому при попытке выполнить следующие действия возникнут ошибки:

LISTEN, NOTIFY

Поэтому берём файл async.c из исходников, переименовываем в нём все публичные методы (не static-функции), удаляем связь с транзакциями и добавляем обработку сигнала SIGUSR1, чтобы получилось так:

src/backend/commands/async.c
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 5739d2b40f..9f62d4ca6b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -1,3 +1,5 @@
+#include <include.h>
+
 /*-------------------------------------------------------------------------
  *
  * async.c
@@ -46,7 +48,7 @@
  *	  to. In case there is a match it delivers the notification event to its
  *	  frontend.  Non-matching events are simply skipped.
  *
- * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
+ * 4. The NOTIFY statement (routine Async_Notify_My) stores the notification in
  *	  a backend-local list which will not be processed until transaction end.
  *
  *	  Duplicate notifications from the same transaction are sent out as one
@@ -56,7 +58,7 @@
  *	  that has been sent, it can easily add some unique string into the extra
  *	  payload parameter.
  *
- *	  When the transaction is ready to commit, PreCommit_Notify() adds the
+ *	  When the transaction is ready to commit, PreCommit_Notify_My() adds the
  *	  pending notifications to the head of the queue. The head pointer of the
  *	  queue always points to the next free position and a position is just a
  *	  page number and the offset in that page. This is done before marking the
@@ -67,7 +69,7 @@
  *	  Once we have put all of the notifications into the queue, we return to
  *	  CommitTransaction() which will then do the actual transaction commit.
  *
- *	  After commit we are called another time (AtCommit_Notify()). Here we
+ *	  After commit we are called another time (AtCommit_Notify_My()). Here we
  *	  make the actual updates to the effective listen state (listenChannels).
  *
  *	  Finally, after we are out of the transaction altogether, we check if
@@ -171,7 +173,7 @@ typedef struct AsyncQueueEntry
 {
 	int			length;			/* total allocated length of entry */
 	Oid			dboid;			/* sender's database OID */
-	TransactionId xid;			/* sender's XID */
+//	TransactionId xid;			/* sender's XID */
 	int32		srcPid;			/* sender's PID */
 	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
 } AsyncQueueEntry;
@@ -414,14 +416,16 @@ typedef struct NotificationHash
 
 static NotificationList *pendingNotifies = NULL;
 
+static pqsigfunc pg_async_signal_original = NULL;
+
 /*
- * Inbound notifications are initially processed by HandleNotifyInterrupt(),
+ * Inbound notifications are initially processed by HandleNotifyInterruptMy(),
  * called from inside a signal handler. That just sets the
  * notifyInterruptPending flag and sets the process
- * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
+ * latch. ProcessNotifyInterruptMy() will then be called whenever it's safe to
  * actually deal with the interrupt.
  */
-volatile sig_atomic_t notifyInterruptPending = false;
+//volatile sig_atomic_t notifyInterruptPending = false;
 
 /* True if we've registered an on_shmem_exit cleanup */
 static bool unlistenExitRegistered = false;
@@ -436,7 +440,7 @@ static bool backendHasSentNotifications = false;
 static bool backendTryAdvanceTail = false;
 
 /* GUC parameter */
-bool		Trace_notify = false;
+//bool		Trace_notify = false;
 
 /* local function prototypes */
 static int	asyncQueuePageDiff(int p, int q);
@@ -469,6 +473,12 @@ static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
 
+static void pg_async_signal(SIGNAL_ARGS) {
+    HandleNotifyInterruptMy();
+    if (notifyInterruptPending) ProcessNotifyInterruptMy();
+    pg_async_signal_original(postgres_signal_arg);
+}
+
 /*
  * Compute the difference between two queue page numbers (i.e., p - q),
  * accounting for wraparound.
@@ -509,11 +519,11 @@ asyncQueuePagePrecedes(int p, int q)
  * Report space needed for our shared memory area
  */
 Size
-AsyncShmemSize(void)
+AsyncShmemSizeMy(void)
 {
 	Size		size;
 
-	/* This had better match AsyncShmemInit */
+	/* This had better match AsyncShmemInitMy */
 	size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
 	size = add_size(size, offsetof(AsyncQueueControl, backend));
 
@@ -526,7 +536,7 @@ AsyncShmemSize(void)
  * Initialize our shared memory area
  */
 void
-AsyncShmemInit(void)
+AsyncShmemInitMy(void)
 {
 	bool		found;
 	Size		size;
@@ -585,7 +595,7 @@ AsyncShmemInit(void)
  *	  SQL function to send a notification event
  */
 Datum
-pg_notify(PG_FUNCTION_ARGS)
+pg_notify_my(PG_FUNCTION_ARGS)
 {
 	const char *channel;
 	const char *payload;
@@ -601,16 +611,16 @@ pg_notify(PG_FUNCTION_ARGS)
 		payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
 
 	/* For NOTIFY as a statement, this is checked in ProcessUtility */
-	PreventCommandDuringRecovery("NOTIFY");
+//	PreventCommandDuringRecovery("NOTIFY");
 
-	Async_Notify(channel, payload);
+	Async_Notify_My(channel, payload);
 
 	PG_RETURN_VOID();
 }
 
 
 /*
- * Async_Notify
+ * Async_Notify_My
  *
  *		This is executed by the SQL notify command.
  *
@@ -619,7 +629,7 @@ pg_notify(PG_FUNCTION_ARGS)
  *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  */
 void
-Async_Notify(const char *channel, const char *payload)
+Async_Notify_My(const char *channel, const char *payload)
 {
 	int			my_level = GetCurrentTransactionNestLevel();
 	size_t		channel_len;
@@ -631,7 +641,7 @@ Async_Notify(const char *channel, const char *payload)
 		elog(ERROR, "cannot send notifications from a parallel worker");
 
 	if (Trace_notify)
-		elog(DEBUG1, "Async_Notify(%s)", channel);
+		elog(DEBUG1, "Async_Notify_My(%s)", channel);
 
 	channel_len = channel ? strlen(channel) : 0;
 	payload_len = payload ? strlen(payload) : 0;
@@ -679,7 +689,7 @@ Async_Notify(const char *channel, const char *payload)
 		/*
 		 * First notify event in current (sub)xact. Note that we allocate the
 		 * NotificationList in TopTransactionContext; the nestingLevel might
-		 * get changed later by AtSubCommit_Notify.
+		 * get changed later by AtSubCommit_Notify_My.
 		 */
 		notifies = (NotificationList *)
 			MemoryContextAlloc(TopTransactionContext,
@@ -725,7 +735,7 @@ queue_listen(ListenActionKind action, const char *channel)
 	int			my_level = GetCurrentTransactionNestLevel();
 
 	/*
-	 * Unlike Async_Notify, we don't try to collapse out duplicates. It would
+	 * Unlike Async_Notify_My, we don't try to collapse out duplicates. It would
 	 * be too complicated to ensure we get the right interactions of
 	 * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
 	 * would be any performance benefit anyway in sane applications.
@@ -745,7 +755,7 @@ queue_listen(ListenActionKind action, const char *channel)
 		/*
 		 * First action in current sub(xact). Note that we allocate the
 		 * ActionList in TopTransactionContext; the nestingLevel might get
-		 * changed later by AtSubCommit_Notify.
+		 * changed later by AtSubCommit_Notify_My.
 		 */
 		actions = (ActionList *)
 			MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
@@ -761,29 +771,29 @@ queue_listen(ListenActionKind action, const char *channel)
 }
 
 /*
- * Async_Listen
+ * Async_Listen_My
  *
  *		This is executed by the SQL listen command.
  */
 void
-Async_Listen(const char *channel)
+Async_Listen_My(const char *channel)
 {
 	if (Trace_notify)
-		elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
+		elog(DEBUG1, "Async_Listen_My(%s,%d)", channel, MyProcPid);
 
 	queue_listen(LISTEN_LISTEN, channel);
 }
 
 /*
- * Async_Unlisten
+ * Async_Unlisten_My
  *
  *		This is executed by the SQL unlisten command.
  */
 void
-Async_Unlisten(const char *channel)
+Async_Unlisten_My(const char *channel)
 {
 	if (Trace_notify)
-		elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
+		elog(DEBUG1, "Async_Unlisten_My(%s,%d)", channel, MyProcPid);
 
 	/* If we couldn't possibly be listening, no need to queue anything */
 	if (pendingActions == NULL && !unlistenExitRegistered)
@@ -793,15 +803,15 @@ Async_Unlisten(const char *channel)
 }
 
 /*
- * Async_UnlistenAll
+ * Async_UnlistenAll_My
  *
  *		This is invoked by UNLISTEN * command, and also at backend exit.
  */
 void
-Async_UnlistenAll(void)
+Async_UnlistenAll_My(void)
 {
 	if (Trace_notify)
-		elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
+		elog(DEBUG1, "Async_UnlistenAll_My(%d)", MyProcPid);
 
 	/* If we couldn't possibly be listening, no need to queue anything */
 	if (pendingActions == NULL && !unlistenExitRegistered)
@@ -818,7 +828,7 @@ Async_UnlistenAll(void)
  * change within a transaction.
  */
 Datum
-pg_listening_channels(PG_FUNCTION_ARGS)
+pg_listening_channels_my(PG_FUNCTION_ARGS)
 {
 	FuncCallContext *funcctx;
 
@@ -858,13 +868,13 @@ Async_UnlistenOnExit(int code, Datum arg)
 }
 
 /*
- * AtPrepare_Notify
+ * AtPrepare_Notify_My
  *
  *		This is called at the prepare phase of a two-phase
  *		transaction.  Save the state for possible commit later.
  */
 void
-AtPrepare_Notify(void)
+AtPrepare_Notify_My(void)
 {
 	/* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
 	if (pendingActions || pendingNotifies)
@@ -874,7 +884,7 @@ AtPrepare_Notify(void)
 }
 
 /*
- * PreCommit_Notify
+ * PreCommit_Notify_My
  *
  *		This is called at transaction commit, before actually committing to
  *		clog.
@@ -889,7 +899,7 @@ AtPrepare_Notify(void)
  *		we can still throw error if we run out of queue space.
  */
 void
-PreCommit_Notify(void)
+PreCommit_Notify_My(void)
 {
 	ListCell   *p;
 
@@ -897,7 +907,7 @@ PreCommit_Notify(void)
 		return;					/* no relevant statements in this xact */
 
 	if (Trace_notify)
-		elog(DEBUG1, "PreCommit_Notify");
+		elog(DEBUG1, "PreCommit_Notify_My");
 
 	/* Preflight for any pending listen/unlisten actions */
 	if (pendingActions != NULL)
@@ -932,7 +942,7 @@ PreCommit_Notify(void)
 		 * so cheap if we don't, and we'd prefer not to do that work while
 		 * holding NotifyQueueLock.
 		 */
-		(void) GetCurrentTransactionId();
+//		(void) GetCurrentTransactionId();
 
 		/*
 		 * Serialize writers by acquiring a special lock that we hold till
@@ -951,7 +961,7 @@ PreCommit_Notify(void)
 		 * used by the flatfiles mechanism.)
 		 */
 		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
-						 AccessExclusiveLock);
+						 RowExclusiveLock);
 
 		/* Now push the notifications into the queue */
 		backendHasSentNotifications = true;
@@ -984,14 +994,14 @@ PreCommit_Notify(void)
 }
 
 /*
- * AtCommit_Notify
+ * AtCommit_Notify_My
  *
  *		This is called at transaction commit, after committing to clog.
  *
  *		Update listenChannels and clear transaction-local state.
  */
 void
-AtCommit_Notify(void)
+AtCommit_Notify_My(void)
 {
 	ListCell   *p;
 
@@ -1003,7 +1013,7 @@ AtCommit_Notify(void)
 		return;
 
 	if (Trace_notify)
-		elog(DEBUG1, "AtCommit_Notify");
+		elog(DEBUG1, "AtCommit_Notify_My");
 
 	/* Perform any pending listen/unlisten actions */
 	if (pendingActions != NULL)
@@ -1036,7 +1046,7 @@ AtCommit_Notify(void)
 }
 
 /*
- * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
+ * Exec_ListenPreCommit --- subroutine for PreCommit_Notify_My
  *
  * This function must make sure we are ready to catch any incoming messages.
  */
@@ -1131,7 +1141,7 @@ Exec_ListenPreCommit(void)
 }
 
 /*
- * Exec_ListenCommit --- subroutine for AtCommit_Notify
+ * Exec_ListenCommit --- subroutine for AtCommit_Notify_My
  *
  * Add the channel to the list of channels we are listening on.
  */
@@ -1155,10 +1165,12 @@ Exec_ListenCommit(const char *channel)
 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
 	listenChannels = lappend(listenChannels, pstrdup(channel));
 	MemoryContextSwitchTo(oldcontext);
+
+	if (!pg_async_signal_original) pg_async_signal_original = pqsignal(SIGUSR1, pg_async_signal);
 }
 
 /*
- * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenCommit --- subroutine for AtCommit_Notify_My
  *
  * Remove the specified channel name from listenChannels.
  */
@@ -1186,10 +1198,15 @@ Exec_UnlistenCommit(const char *channel)
 	 * We do not complain about unlistening something not being listened;
 	 * should we?
 	 */
+
+	if (!list_length(listenChannels) && pg_async_signal_original) {
+		pqsignal(SIGUSR1, pg_async_signal_original);
+		pg_async_signal_original = NULL;
+	}
 }
 
 /*
- * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify_My
  *
  *		Unlisten on all channels for this backend.
  */
@@ -1201,10 +1218,15 @@ Exec_UnlistenAllCommit(void)
 
 	list_free_deep(listenChannels);
 	listenChannels = NIL;
+
+	if (pg_async_signal_original) {
+		pqsignal(SIGUSR1, pg_async_signal_original);
+		pg_async_signal_original = NULL;
+	}
 }
 
 /*
- * ProcessCompletedNotifies --- send out signals and self-notifies
+ * ProcessCompletedNotifiesMy --- send out signals and self-notifies
  *
  * This is called from postgres.c just before going idle at the completion
  * of a transaction.  If we issued any notifications in the just-completed
@@ -1213,10 +1235,10 @@ Exec_UnlistenAllCommit(void)
  * Also, if we filled enough queue pages with new notifies, try to advance
  * the queue tail pointer.
  *
- * The reason that this is not done in AtCommit_Notify is that there is
+ * The reason that this is not done in AtCommit_Notify_My is that there is
  * a nonzero chance of errors here (for example, encoding conversion errors
  * while trying to format messages to our frontend).  An error during
- * AtCommit_Notify would be a PANIC condition.  The timing is also arranged
+ * AtCommit_Notify_My would be a PANIC condition.  The timing is also arranged
  * to ensure that a transaction's self-notifies are delivered to the frontend
  * before it gets the terminating ReadyForQuery message.
  *
@@ -1227,8 +1249,9 @@ Exec_UnlistenAllCommit(void)
  * NOTE: we are outside of any transaction here.
  */
 void
-ProcessCompletedNotifies(void)
+ProcessCompletedNotifiesMy(void)
 {
+	bool idle = !IsTransactionOrTransactionBlock();
 	MemoryContext caller_context;
 
 	/* Nothing to do if we didn't send any notifications */
@@ -1249,12 +1272,13 @@ ProcessCompletedNotifies(void)
 	caller_context = CurrentMemoryContext;
 
 	if (Trace_notify)
-		elog(DEBUG1, "ProcessCompletedNotifies");
+		elog(DEBUG1, "ProcessCompletedNotifiesMy");
 
 	/*
 	 * We must run asyncQueueReadAllNotifications inside a transaction, else
 	 * bad things happen if it gets an error.
 	 */
+	if (idle)
 	StartTransactionCommand();
 
 	/* Send signals to other backends */
@@ -1275,6 +1299,7 @@ ProcessCompletedNotifies(void)
 		asyncQueueAdvanceTail();
 	}
 
+	if (idle)
 	CommitTransactionCommand();
 
 	MemoryContextSwitchTo(caller_context);
@@ -1431,7 +1456,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
 	entryLength = QUEUEALIGN(entryLength);
 	qe->length = entryLength;
 	qe->dboid = MyDatabaseId;
-	qe->xid = GetCurrentTransactionId();
+//	qe->xid = GetCurrentTransactionId();
 	qe->srcPid = MyProcPid;
 	memcpy(qe->data, n->data, channellen + payloadlen + 2);
 }
@@ -1567,7 +1592,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
  * occupied.
  */
 Datum
-pg_notification_queue_usage(PG_FUNCTION_ARGS)
+pg_notification_queue_usage_my(PG_FUNCTION_ARGS)
 {
 	double		usage;
 
@@ -1749,7 +1774,7 @@ SignalBackends(void)
 }
 
 /*
- * AtAbort_Notify
+ * AtAbort_Notify_My
  *
  *	This is called at transaction abort.
  *
@@ -1757,10 +1782,10 @@ SignalBackends(void)
  *	executed if the transaction got committed.
  */
 void
-AtAbort_Notify(void)
+AtAbort_Notify_My(void)
 {
 	/*
-	 * If we LISTEN but then roll back the transaction after PreCommit_Notify,
+	 * If we LISTEN but then roll back the transaction after PreCommit_Notify_My,
 	 * we have registered as a listener but have not made any entry in
 	 * listenChannels.  In that case, deregister again.
 	 */
@@ -1772,12 +1797,12 @@ AtAbort_Notify(void)
 }
 
 /*
- * AtSubCommit_Notify() --- Take care of subtransaction commit.
+ * AtSubCommit_Notify_My() --- Take care of subtransaction commit.
  *
  * Reassign all items in the pending lists to the parent transaction.
  */
 void
-AtSubCommit_Notify(void)
+AtSubCommit_Notify_My(void)
 {
 	int			my_level = GetCurrentTransactionNestLevel();
 
@@ -1844,10 +1869,10 @@ AtSubCommit_Notify(void)
 }
 
 /*
- * AtSubAbort_Notify() --- Take care of subtransaction abort.
+ * AtSubAbort_Notify_My() --- Take care of subtransaction abort.
  */
 void
-AtSubAbort_Notify(void)
+AtSubAbort_Notify_My(void)
 {
 	int			my_level = GetCurrentTransactionNestLevel();
 
@@ -1882,15 +1907,15 @@ AtSubAbort_Notify(void)
 }
 
 /*
- * HandleNotifyInterrupt
+ * HandleNotifyInterruptMy
  *
  *		Signal handler portion of interrupt handling. Let the backend know
  *		that there's a pending notify interrupt. If we're currently reading
  *		from the client, this will interrupt the read and
- *		ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
+ *		ProcessClientReadInterrupt() will call ProcessNotifyInterruptMy().
  */
 void
-HandleNotifyInterrupt(void)
+HandleNotifyInterruptMy(void)
 {
 	/*
 	 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
@@ -1905,18 +1930,18 @@ HandleNotifyInterrupt(void)
 }
 
 /*
- * ProcessNotifyInterrupt
+ * ProcessNotifyInterruptMy
  *
  *		This is called if we see notifyInterruptPending set, just before
  *		transmitting ReadyForQuery at the end of a frontend command, and
  *		also if a notify signal occurs while reading from the frontend.
- *		HandleNotifyInterrupt() will cause the read to be interrupted
+ *		HandleNotifyInterruptMy() will cause the read to be interrupted
  *		via the process's latch, and this routine will get called.
  *		If we are truly idle (ie, *not* inside a transaction block),
  *		process the incoming notifies.
  */
 void
-ProcessNotifyInterrupt(void)
+ProcessNotifyInterruptMy(void)
 {
 	if (IsTransactionOrTransactionBlock())
 		return;					/* not really idle */
@@ -1999,7 +2024,7 @@ asyncQueueReadAllNotifications(void)
 	 * before we see them.
 	 *----------
 	 */
-	snapshot = RegisterSnapshot(GetLatestSnapshot());
+//	snapshot = RegisterSnapshot(GetLatestSnapshot());
 
 	/*
 	 * It is possible that we fail while trying to send a message to our
@@ -2078,7 +2103,7 @@ asyncQueueReadAllNotifications(void)
 	PG_END_TRY();
 
 	/* Done with snapshot */
-	UnregisterSnapshot(snapshot);
+//	UnregisterSnapshot(snapshot);
 }
 
 /*
@@ -2126,6 +2151,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 		/* Ignore messages destined for other databases */
 		if (qe->dboid == MyDatabaseId)
 		{
+#if 0
 			if (XidInMVCCSnapshot(qe->xid, snapshot))
 			{
 				/*
@@ -2153,6 +2179,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 			}
 			else if (TransactionIdDidCommit(qe->xid))
 			{
+#endif
 				/* qe->data is the null-terminated channel name */
 				char	   *channel = qe->data;
 
@@ -2161,8 +2188,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 					/* payload follows channel name */
 					char	   *payload = qe->data + strlen(channel) + 1;
 
-					NotifyMyFrontEnd(channel, payload, qe->srcPid);
+					NotifyMyFrontEndMy(channel, payload, qe->srcPid);
 				}
+#if 0
 			}
 			else
 			{
@@ -2171,6 +2199,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 				 * ignore its notifications.
 				 */
 			}
+#endif
 		}
 
 		/* Loop back if we're not at end of page */
@@ -2271,6 +2300,7 @@ static void
 ProcessIncomingNotify(void)
 {
 	/* We *must* reset the flag */
+	bool idle = !IsTransactionOrTransactionBlock();
 	notifyInterruptPending = false;
 
 	/* Do nothing else if we aren't actively listening */
@@ -2286,10 +2316,12 @@ ProcessIncomingNotify(void)
 	 * We must run asyncQueueReadAllNotifications inside a transaction, else
 	 * bad things happen if it gets an error.
 	 */
+	if (idle)
 	StartTransactionCommand();
 
 	asyncQueueReadAllNotifications();
 
+	if (idle)
 	CommitTransactionCommand();
 
 	/*
@@ -2307,7 +2339,7 @@ ProcessIncomingNotify(void)
  * Send NOTIFY message to my front end.
  */
 void
-NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
+NotifyMyFrontEndMy(const char *channel, const char *payload, int32 srcPid)
 {
 	if (whereToSendOutput == DestRemote)
 	{

Теперь делаем расширение для функций

pg_async--1.0.sql
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pg_async" to load this file. \quit

-- pg_listen, pg_notify and pg_unlisten take arguments that default to NULL and
-- their C implementations test PG_ARGISNULL() themselves.  They are therefore
-- NOT declared STRICT: a STRICT function is never called when an argument is
-- NULL (the result is simply NULL), which would turn a NULL call into a silent
-- no-op instead of letting the C code handle it.
CREATE FUNCTION pg_listen(channel pg_catalog.text default null) RETURNS pg_catalog.void AS 'MODULE_PATHNAME', 'pg_async_listen' LANGUAGE C;
-- Lists the channels the current session is listening on, one row per channel.
CREATE FUNCTION pg_listening_channels() RETURNS setof pg_catalog.text STRICT AS 'MODULE_PATHNAME', 'pg_async_listening_channels' LANGUAGE C;
-- Fraction of the notification queue currently occupied.
CREATE FUNCTION pg_notification_queue_usage() RETURNS pg_catalog.float8 STRICT AS 'MODULE_PATHNAME', 'pg_async_notification_queue_usage' LANGUAGE C;
CREATE FUNCTION pg_notify(channel pg_catalog.text default null, payload pg_catalog.text default null) RETURNS pg_catalog.void AS 'MODULE_PATHNAME', 'pg_async_notify' LANGUAGE C;
-- Function form of UNLISTEN *.
CREATE FUNCTION pg_unlisten_all() RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_unlisten_all' LANGUAGE C;
CREATE FUNCTION pg_unlisten(channel pg_catalog.text default null) RETURNS pg_catalog.void AS 'MODULE_PATHNAME', 'pg_async_unlisten' LANGUAGE C;

Здесь к стандартным pg_listening_channels, pg_notification_queue_usage и pg_notify добавлены новые удобные функции pg_listen, pg_unlisten и pg_unlisten_all, дополняющие соответствующие команды LISTEN, UNLISTEN и UNLISTEN *.

Делаем реализацию этих функций, вызывая на ведущем оригинальные функции, а на реплике функции из изменённого скопированного файла async.c:

pg_async.c
/*
 * EXTENSION(function) opens the definition of an SQL-callable C function.
 * It expands to a prototype, the PG_FUNCTION_INFO_V1() registration and the
 * function header, so the macro invocation must be followed directly by the
 * braced function body.
 */
#define EXTENSION(function) Datum (function)(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(function); Datum (function)(PG_FUNCTION_ARGS)

/*
 * pg_async_listen --- function form of LISTEN.
 *
 * A NULL argument is mapped to the empty channel name.  When the transaction
 * is not read-only the original Async_Listen() is used; otherwise the renamed
 * copy Async_Listen_My() is called.
 */
EXTENSION(pg_async_listen) {
    const char *name = "";

    if (!PG_ARGISNULL(0))
        name = text_to_cstring(PG_GETARG_TEXT_PP(0));

    if (XactReadOnly)
        Async_Listen_My(name);
    else
        Async_Listen(name);

    PG_RETURN_VOID();
}

/*
 * pg_async_listening_channels --- forward the call, with the caller's fcinfo,
 * to pg_listening_channels() when the transaction is not read-only and to
 * pg_listening_channels_my() otherwise; the callee's Datum is returned as is.
 */
EXTENSION(pg_async_listening_channels) {
    if (XactReadOnly)
        return pg_listening_channels_my(fcinfo);

    return pg_listening_channels(fcinfo);
}

/*
 * pg_async_notification_queue_usage --- forward the call to
 * pg_notification_queue_usage() when the transaction is not read-only and to
 * pg_notification_queue_usage_my() otherwise; the callee's Datum is returned
 * unchanged.
 */
EXTENSION(pg_async_notification_queue_usage) {
    if (XactReadOnly)
        return pg_notification_queue_usage_my(fcinfo);

    return pg_notification_queue_usage(fcinfo);
}

/*
 * pg_async_notify --- forward the call, arguments included, to pg_notify()
 * when the transaction is not read-only and to pg_notify_my() otherwise.
 */
EXTENSION(pg_async_notify) {
    if (XactReadOnly)
        return pg_notify_my(fcinfo);

    return pg_notify(fcinfo);
}

/*
 * pg_async_unlisten_all --- function form of UNLISTEN *.
 *
 * Calls Async_UnlistenAll() when the transaction is not read-only and
 * Async_UnlistenAll_My() otherwise.
 */
EXTENSION(pg_async_unlisten_all) {
    if (XactReadOnly)
        Async_UnlistenAll_My();
    else
        Async_UnlistenAll();

    PG_RETURN_VOID();
}

/*
 * pg_async_unlisten --- function form of UNLISTEN.
 *
 * A NULL argument is mapped to the empty channel name.  When the transaction
 * is not read-only the original Async_Unlisten() is used; otherwise the
 * renamed copy Async_Unlisten_My() is called.
 */
EXTENSION(pg_async_unlisten) {
    const char *name = "";

    if (!PG_ARGISNULL(0))
        name = text_to_cstring(PG_GETARG_TEXT_PP(0));

    if (XactReadOnly)
        Async_Unlisten_My(name);
    else
        Async_Unlisten(name);

    PG_RETURN_VOID();
}

Также регистрируем хуки на выполнение команд, на транзакции и на разделяемую память:

pg_async.c
/* Value of ProcessUtility_hook found at load time; NULL if none was installed */
static ProcessUtility_hook_type pg_async_ProcessUtility_hook_original = NULL;
/* Value of shmem_startup_hook found at load time; NULL if none was installed */
static shmem_startup_hook_type pg_async_shmem_startup_hook_original = NULL;

void _PG_init(void);

/*
 * Module load callback.
 *
 * Does nothing unless the library is being loaded while
 * process_shared_preload_libraries_in_progress is set.  Otherwise it reserves
 * add-in shared memory for the copied async queue, registers the
 * (sub)transaction callbacks, and installs the ProcessUtility and
 * shmem-startup hooks after remembering their previous values.
 */
void
_PG_init(void)
{
    if (!process_shared_preload_libraries_in_progress)
        return;

    RequestAddinShmemSpace(AsyncShmemSizeMy());

    RegisterSubXactCallback(pg_async_SubXactCallback, NULL);
    RegisterXactCallback(pg_async_XactCallback, NULL);

    pg_async_shmem_startup_hook_original = shmem_startup_hook;
    shmem_startup_hook = pg_async_shmem_startup_hook;

    pg_async_ProcessUtility_hook_original = ProcessUtility_hook;
    ProcessUtility_hook = pg_async_ProcessUtility_hook;
}

void _PG_fini(void);

/*
 * Module unload callback: unregister the (sub)transaction callbacks and put
 * the saved hook values back into ProcessUtility_hook and shmem_startup_hook.
 */
void
_PG_fini(void)
{
    UnregisterXactCallback(pg_async_XactCallback, NULL);
    UnregisterSubXactCallback(pg_async_SubXactCallback, NULL);

    shmem_startup_hook = pg_async_shmem_startup_hook_original;
    ProcessUtility_hook = pg_async_ProcessUtility_hook_original;
}

В хуке на разделяемую память регистрируем её из изменённого скопированного файла async.c:

pg_async.c
static void pg_async_shmem_startup_hook(void) {
    if (pg_async_shmem_startup_hook_original) pg_async_shmem_startup_hook_original();
    LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
    AsyncShmemInitMy();
    LWLockRelease(AddinShmemInitLock);
}

В хуке на транзакции на реплике вызываем соответствующие функции из изменённого скопированного файла async.c:

pg_async.c
/*
 * Transaction callback.
 *
 * Only acts when XactReadOnly is set; in that case it maps the transaction
 * event onto the matching routine of the copied async.c:
 *   ABORT      -> AtAbort_Notify_My()
 *   COMMIT     -> AtCommit_Notify_My(), then ProcessCompletedNotifiesMy()
 *   PRE_COMMIT -> PreCommit_Notify_My()
 *   PREPARE    -> AtPrepare_Notify_My()
 * All other events are ignored.  The 'arg' pointer is unused.
 */
static void
pg_async_XactCallback(XactEvent event, void *arg)
{
    if (!XactReadOnly)
        return;

    if (event == XACT_EVENT_ABORT)
    {
        AtAbort_Notify_My();
    }
    else if (event == XACT_EVENT_COMMIT)
    {
        AtCommit_Notify_My();
        ProcessCompletedNotifiesMy();
    }
    else if (event == XACT_EVENT_PRE_COMMIT)
    {
        PreCommit_Notify_My();
    }
    else if (event == XACT_EVENT_PREPARE)
    {
        AtPrepare_Notify_My();
    }
}

В хуке на выполнение команд на реплике для команд LISTEN, UNLISTEN и NOTIFY вызываем соответствующие функции из изменённого скопированного файла async.c:

pg_async.c
static void CheckRestrictedOperation(const char *cmdname) {
    if (InSecurityRestrictedOperation()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("cannot execute %s within security-restricted operation", cmdname)));
}

/*
 * Hand the statement to the next ProcessUtility implementation in the chain:
 * the hook that was installed before ours if there is one, otherwise
 * standard_ProcessUtility().
 */
static void pg_async_ProcessUtility_next(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc) {
    if (pg_async_ProcessUtility_hook_original)
        pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc);
    else
        standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc);
}

/*
 * ProcessUtility hook.
 *
 * When XactReadOnly is not set, every statement is passed down the chain
 * untouched.  When it is set, LISTEN, NOTIFY and UNLISTEN are served by the
 * renamed copies from async.c (LISTEN and UNLISTEN are first rejected inside
 * a security-restricted operation) and followed by CommandCounterIncrement();
 * any other utility statement is passed down the chain.
 *
 * The parameters are those of the ProcessUtility hook interface and are
 * forwarded unchanged when the statement is delegated.
 */
static void pg_async_ProcessUtility_hook(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc) {
    Node *parsetree = pstmt->utilityStmt;

    if (!XactReadOnly) {
        /* "return <void expression>;" is not valid ISO C, so call and return separately */
        pg_async_ProcessUtility_next(pstmt, queryString, context, params, queryEnv, dest, qc);
        return;
    }
    check_stack_depth();
    switch (nodeTag(parsetree)) {
        case T_ListenStmt: {
            ListenStmt *stmt = (ListenStmt *)parsetree;
            CheckRestrictedOperation("LISTEN");
            Async_Listen_My(stmt->conditionname);
        } break;
        case T_NotifyStmt: {
            NotifyStmt *stmt = (NotifyStmt *)parsetree;
            Async_Notify_My(stmt->conditionname, stmt->payload);
        } break;
        case T_UnlistenStmt: {
            UnlistenStmt *stmt = (UnlistenStmt *)parsetree;
            CheckRestrictedOperation("UNLISTEN");
            /* a NULL condition name means UNLISTEN * */
            if (stmt->conditionname)
                Async_Unlisten_My(stmt->conditionname);
            else
                Async_UnlistenAll_My();
        } break;
        default:
            pg_async_ProcessUtility_next(pstmt, queryString, context, params, queryEnv, dest, qc);
            return;
    }
    CommandCounterIncrement();
}

→ Всё это можно посмотреть в репозитории

Теги:
Хабы:
Всего голосов 4: ↑4 и ↓0+4
Комментарии0

Публикации