From 6e35ac0a298817c534e15e4d4bee5b9fb9f9e830 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v10] Handle stopSignaled during sync function call.

This patch attempts to fix the following issues:

1) It implements the promotion-related handling needed for the slot sync
SQL function pg_sync_replication_slots(), similar to the slot sync worker.
pg_sync_replication_slots() now checks the 'stopSignaled' flag before
proceeding with slot sync; ShutDownSlotSync(), on the other hand, checks
the 'syncing' flag instead of 'SlotSyncCtx->pid' to ensure that the slot
sync machinery is shut down. It also changes the registration order of the
slot-sync cleanup callbacks, so that we first release the replication slot,
then set the pid to InvalidPid, and finally set 'syncing' to false.
Setting 'syncing' to false must be the last step of cleanup so that the
shutdown flow during promotion can reliably exit its wait loop on seeing
'syncing' as false.

2) This patch fixes another issue in the slot sync worker, where
SignalHandlerForShutdownRequest() needs to be registered *before* setting
SlotSyncCtx->pid; otherwise the slot sync worker could miss a SIGINT sent
by the startup process (ShutDownSlotSync) if it arrives before the worker
has registered SignalHandlerForShutdownRequest(). For consistency, the
registration of all signal handlers is moved to before the point where we
set the worker's pid.

3) This patch fixes another issue in ShutDownSlotSync(), where we performed
kill(SlotSyncCtx->pid) after releasing the spinlock. It is possible that,
after we release the spinlock and before we perform the kill, the slot sync
worker has errored out and set SlotSyncCtx->pid to InvalidPid (-1), and
thus kill(-1) could result in abnormal process kills on some platforms. Now
we capture SlotSyncCtx->pid in a local variable under the spinlock and
later use that to perform the kill.
---
 doc/src/sgml/func.sgml                     |   4 +
 src/backend/replication/logical/slotsync.c | 272 ++++++++++++++-------
 2 files changed, 186 insertions(+), 90 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 92a0f49e6a..e3e22645ed 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29349,6 +29349,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         standby server. Temporary synced slots, if any, cannot be used for
         logical decoding and must be dropped after promotion. See
         <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
+        Note that this function cannot be executed if <link
+        linkend="guc-sync-replication-slots"><varname>sync_replication_slots</varname></link>
+        is enabled and the slotsync worker is already running to perform the
+        synchronization of slots.
        </para>
 
        <caution>
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index cb39adcd0e..7fb4733504 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -79,10 +79,11 @@
  * and also sets stopSignaled=true to handle the race condition when the
  * postmaster has not noticed the promotion yet and thus may end up restarting
  * the slot sync worker. If stopSignaled is set, the worker will exit in such a
- * case. Note that we don't need to reset this variable as after promotion the
- * slot sync worker won't be restarted because the pmState changes to PM_RUN from
- * PM_HOT_STANDBY and we don't support demoting primary without restarting the
- * server. See MaybeStartSlotSyncWorker.
+ * case. The SQL function pg_sync_replication_slots() will also error out if
+ * this flag is set. Note that we don't need to reset this variable as after
+ * promotion the slot sync worker won't be restarted because the pmState
+ * changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
+ * primary without restarting the server. See MaybeStartSlotSyncWorker.
  *
  * The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
  * overwrites.
@@ -92,9 +93,6 @@
  * is expected (e.g., slot sync GUCs change), slot sync worker will reset
  * last_start_time before exiting, so that postmaster can start the worker
  * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
  */
 typedef struct SlotSyncCtxStruct
 {
@@ -807,20 +805,6 @@ synchronize_slots(WalReceiverConn *wrconn)
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	if (SlotSyncCtx->syncing)
-	{
-		SpinLockRelease(&SlotSyncCtx->mutex);
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("cannot synchronize replication slots concurrently"));
-	}
-
-	SlotSyncCtx->syncing = true;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	syncing_slots = true;
-
 	/* The syscache access in walrcv_exec() needs a transaction env. */
 	if (!IsTransactionState())
 	{
@@ -937,12 +921,6 @@ synchronize_slots(WalReceiverConn *wrconn)
 	if (started_tx)
 		CommitTransactionCommand();
 
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	SlotSyncCtx->syncing = false;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	syncing_slots = false;
-
 	return some_slot_updated;
 }
 
@@ -1190,6 +1168,19 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
 		slotsync_reread_config();
 }
 
+/*
+ * Exit-time connection cleanup callback for the slotsync worker.
+ *
+ * Disconnects the WalReceiverConn passed via 'arg'; 'code' is unused.
+ */
+static void
+slotsync_worker_disconnect(int code, Datum arg)
+{
+	WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+
+	walrcv_disconnect(wrconn);
+}
+
 /*
  * Cleanup function for slotsync worker.
  *
@@ -1199,7 +1190,20 @@ static void
 slotsync_worker_onexit(int code, Datum arg)
 {
 	SpinLockAcquire(&SlotSyncCtx->mutex);
+
 	SlotSyncCtx->pid = InvalidPid;
+
+	/*
+	 * If syncing_slots is true, it indicates that the process errored out
+	 * without resetting the flag. So, we need to clean up shared memory and
+	 * reset the flag here.
+	 */
+	if (syncing_slots)
+	{
+		SlotSyncCtx->syncing = false;
+		syncing_slots = false;
+	}
+
 	SpinLockRelease(&SlotSyncCtx->mutex);
 }
 
@@ -1242,6 +1246,64 @@ wait_for_slot_activity(bool some_slot_updated)
 		ResetLatch(MyLatch);
 }
 
+/*
+ * Error out if promotion or another sync is in progress; otherwise mark a sync
+ * as in progress and advertise worker_pid (InvalidPid for the SQL function).
+ */
+static void
+check_and_set_sync_info(pid_t worker_pid)
+{
+	SpinLockAcquire(&SlotSyncCtx->mutex);
+
+	/* The worker pid must not be already assigned in SlotSyncCtx */
+	Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
+
+	/*
+	 * Emit an error if startup process signaled the slot sync machinery to
+	 * stop. See comments atop SlotSyncCtxStruct.
+	 */
+	if (SlotSyncCtx->stopSignaled)
+	{
+		SpinLockRelease(&SlotSyncCtx->mutex);
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
+	}
+
+	if (SlotSyncCtx->syncing)
+	{
+		SpinLockRelease(&SlotSyncCtx->mutex);
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("cannot synchronize replication slots concurrently"));
+	}
+
+	SlotSyncCtx->syncing = true;
+
+	/*
+	 * Advertise worker_pid (InvalidPid for the SQL function) so that the
+	 * startup process can signal the slot sync worker on promotion.
+	 */
+	SlotSyncCtx->pid = worker_pid;
+
+	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	syncing_slots = true;
+}
+
+/*
+ * Clear SlotSyncCtx->syncing (under the spinlock) as well as syncing_slots.
+ */
+static void
+reset_syncing_flag(void)
+{
+	SpinLockAcquire(&SlotSyncCtx->mutex);
+	SlotSyncCtx->syncing = false;
+	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	syncing_slots = false;
+}
+
 /*
  * The main loop of our worker process.
  *
@@ -1272,52 +1334,23 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 	InitProcess();
 
 	/*
-	 * Early initialization.
+	 * Register slotsync_worker_onexit() before we register
+	 * ReplicationSlotShmemExit() in BaseInit(), to ensure that during the
+	 * exit of the slot sync worker, ReplicationSlotShmemExit() is called
+	 * first, followed by slotsync_worker_onexit(). The startup process during
+	 * promotion invokes ShutDownSlotSync() which waits for slot sync to
+	 * finish and it does that by checking the 'syncing' flag. Thus worker
+	 * must be done with the slots' release and cleanup before it marks itself
+	 * as finished syncing.
 	 */
-	BaseInit();
-
-	Assert(SlotSyncCtx != NULL);
-
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	Assert(SlotSyncCtx->pid == InvalidPid);
-
-	/*
-	 * Startup process signaled the slot sync worker to stop, so if meanwhile
-	 * postmaster ended up starting the worker again, exit.
-	 */
-	if (SlotSyncCtx->stopSignaled)
-	{
-		SpinLockRelease(&SlotSyncCtx->mutex);
-		proc_exit(0);
-	}
-
-	/* Advertise our PID so that the startup process can kill us on promotion */
-	SlotSyncCtx->pid = MyProcPid;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	ereport(LOG, errmsg("slot sync worker started"));
-
-	/* Register it as soon as SlotSyncCtx->pid is initialized. */
 	before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
 
-	/* Setup signal handling */
-	pqsignal(SIGHUP, SignalHandlerForConfigReload);
-	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
-	pqsignal(SIGTERM, die);
-	pqsignal(SIGFPE, FloatExceptionHandler);
-	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
-	pqsignal(SIGUSR2, SIG_IGN);
-	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGCHLD, SIG_DFL);
-
 	/*
-	 * Establishes SIGALRM handler and initialize timeout module. It is needed
-	 * by InitPostgres to register different timeouts.
+	 * Early initialization.
 	 */
-	InitializeTimeouts();
+	BaseInit();
 
-	/* Load the libpq-specific functions */
-	load_file("libpqwalreceiver", false);
+	Assert(SlotSyncCtx != NULL);
 
 	/*
 	 * If an exception is encountered, processing resumes here.
@@ -1350,6 +1383,29 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 	/* We can now handle ereport(ERROR) */
 	PG_exception_stack = &local_sigjmp_buf;
 
+	/* Setup signal handling */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+	pqsignal(SIGTERM, die);
+	pqsignal(SIGFPE, FloatExceptionHandler);
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+	pqsignal(SIGUSR2, SIG_IGN);
+	pqsignal(SIGPIPE, SIG_IGN);
+	pqsignal(SIGCHLD, SIG_DFL);
+
+	check_and_set_sync_info(MyProcPid);
+
+	ereport(LOG, errmsg("slot sync worker started"));
+
+	/*
+	 * Establishes SIGALRM handler and initialize timeout module. It is needed
+	 * by InitPostgres to register different timeouts.
+	 */
+	InitializeTimeouts();
+
+	/* Load the libpq-specific functions */
+	load_file("libpqwalreceiver", false);
+
 	/*
 	 * Unblock signals (they were blocked when the postmaster forked us)
 	 */
@@ -1402,13 +1458,13 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 				errmsg("could not connect to the primary server: %s", err));
 
 	/*
-	 * Register the failure callback once we have the connection.
+	 * Register the disconnection callback.
 	 *
-	 * XXX: This can be combined with previous such cleanup registration of
+	 * XXX: This can be combined with previous cleanup registration of
 	 * slotsync_worker_onexit() but that will need the connection to be made
 	 * global and we want to avoid introducing global for this purpose.
 	 */
-	before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+	before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
 
 	/*
 	 * Using the specified primary server connection, check that we are not a
@@ -1457,8 +1513,8 @@ update_synced_slots_inactive_since(void)
 	if (!StandbyMode)
 		return;
 
-	/* The slot sync worker mustn't be running by now */
-	Assert(SlotSyncCtx->pid == InvalidPid);
+	/* The slot sync worker or SQL function mustn't be running by now */
+	Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
@@ -1471,6 +1527,9 @@ update_synced_slots_inactive_since(void)
 		{
 			Assert(SlotIsLogical(s));
 
+			/* The slot must not be acquired by any process */
+			Assert(s->active_pid == 0);
+
 			/* Use the same inactive_since time for all the slots. */
 			if (now == 0)
 				now = GetCurrentTimestamp();
@@ -1486,25 +1545,39 @@ update_synced_slots_inactive_since(void)
 
 /*
  * Shut down the slot sync worker.
+ *
+ * This function sends a signal to shut down the slot sync worker, if
+ * required. It also waits until the slot sync worker has exited or
+ * pg_sync_replication_slots() has finished.
  */
 void
 ShutDownSlotSync(void)
 {
+	pid_t		worker_pid;
+
 	SpinLockAcquire(&SlotSyncCtx->mutex);
 
 	SlotSyncCtx->stopSignaled = true;
 
-	if (SlotSyncCtx->pid == InvalidPid)
+	/*
+	 * Return if neither the slot sync worker is running nor the function
+	 * pg_sync_replication_slots() is executing.
+	 */
+	if (!SlotSyncCtx->syncing)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
 		update_synced_slots_inactive_since();
 		return;
 	}
+
+	worker_pid = SlotSyncCtx->pid;
+
 	SpinLockRelease(&SlotSyncCtx->mutex);
 
-	kill(SlotSyncCtx->pid, SIGINT);
+	if (worker_pid != InvalidPid)
+		kill(worker_pid, SIGINT);
 
-	/* Wait for it to die */
+	/* Wait for slot sync to end */
 	for (;;)
 	{
 		int			rc;
@@ -1522,8 +1595,8 @@ ShutDownSlotSync(void)
 
 		SpinLockAcquire(&SlotSyncCtx->mutex);
 
-		/* Is it gone? */
-		if (SlotSyncCtx->pid == InvalidPid)
+		/* Ensure that no process is syncing the slots. */
+		if (!SlotSyncCtx->syncing)
 			break;
 
 		SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1601,26 +1674,37 @@ SlotSyncShmemInit(void)
 }
 
 /*
- * Error cleanup callback for slot synchronization.
+ * Error cleanup callback for slot sync SQL function.
  */
 static void
 slotsync_failure_callback(int code, Datum arg)
 {
 	WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
 
-	if (syncing_slots)
-	{
-		/*
-		 * If syncing_slots is true, it indicates that the process errored out
-		 * without resetting the flag. So, we need to clean up shared memory
-		 * and reset the flag here.
-		 */
-		SpinLockAcquire(&SlotSyncCtx->mutex);
-		SlotSyncCtx->syncing = false;
-		SpinLockRelease(&SlotSyncCtx->mutex);
+	/*
+	 * We need to do slots cleanup here just like WalSndErrorCleanup() does.
+	 *
+	 * The startup process during promotion invokes ShutDownSlotSync() which
+	 * waits for slot sync to finish and it does that by checking the
+	 * 'syncing' flag. Thus the SQL function must be done with slots' release
+	 * and cleanup to avoid any dangling temporary slots or active slots
+	 * before it marks itself as finished syncing.
+	 */
 
-		syncing_slots = false;
-	}
+	/* Make sure active replication slots are released */
+	if (MyReplicationSlot != NULL)
+		ReplicationSlotRelease();
+
+	/* Also cleanup all the temporary slots. */
+	ReplicationSlotCleanup();
+
+	/*
+	 * If syncing_slots is still set, the process errored out without
+	 * resetting the flag. So, we need to clean up shared memory and reset the
+	 * flag here.
+	 */
+	if (syncing_slots)
+		reset_syncing_flag();
 
 	walrcv_disconnect(wrconn);
 }
@@ -1634,9 +1718,17 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
 {
 	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 	{
+		check_and_set_sync_info(InvalidPid);
+
 		validate_remote_info(wrconn);
 
 		synchronize_slots(wrconn);
+
+		/* Cleanup the temporary slots */
+		ReplicationSlotCleanup();
+
+		/* We are done with sync, so reset sync flag */
+		reset_syncing_flag();
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 }
-- 
2.34.1

