From b0ac6625ca925b7f8981d99c5e85e4758297054c Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v11] Handle stopSignaled during sync function call.

This patch attempts to fix below issues:

1) It implements promotion related handling needed for slot sync
SQL function pg_sync_replication_slots() similar to slot sync worker.
pg_sync_replication_slots() now checks 'stopSignaled' flag before
proceeding with slot sync; ShutDownSlotSync() OTOH checks 'syncing'
flag instead of 'SlotSyncCtx->pid' to ensure slot sync machinery
is shut down. It also changes the order of slot-sync cleanup, so
that we first release the replication slot, then set pid to invalid
one and syncing to false. Setting 'syncing' to false needs to be
performed as the last step of cleanup so that shutdown flow during
promotion can reliably exit from wait-loop on seeing 'syncing' as
false.

2) This patch fixes another issue in slot sync worker where
SignalHandlerForShutdownRequest() needs to be registered *before* setting
SlotSyncCtx->pid, otherwise the slotsync worker could miss to handle
SIGINT sent by the startup process(ShutDownSlotSync) if it is sent before
worker could register SignalHandlerForShutdownRequest(). To be consistent, all
signal handlers' registration is moved to a prior location before we set
worker's pid.

3) This patch fixes another issue in ShutDownSlotSync() where we perform
kill(SlotSyncCtx->pid) after releasing spin-lock. There are chances that
after we release spin-lock and before we perform kill, slot-sync worker
has error-ed out and has set SlotSyncCtx->pid to InvalidPid (-1) and thus
kill(-1) could result in abnormal process kills on some platforms. Now,
we capture SlotSyncCtx->pid in a local variable under spin-lock and later
use that to perform kill.
---
 doc/src/sgml/func.sgml                     |   4 +
 src/backend/replication/logical/slotsync.c | 282 ++++++++++++++-------
 src/backend/replication/slot.c             |  12 +-
 src/backend/replication/walsender.c        |   2 +-
 src/backend/tcop/postgres.c                |   2 +-
 src/include/replication/slot.h             |   2 +-
 6 files changed, 206 insertions(+), 98 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 92a0f49e6a..e3e22645ed 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29349,6 +29349,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         standby server. Temporary synced slots, if any, cannot be used for
         logical decoding and must be dropped after promotion. See
         <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
+        Note that this function cannot be executed if
+        <link linkend="guc-sync-replication-slots"><varname>
+        sync_replication_slots</varname></link> is enabled and the slotsync
+        worker is already running to perform the synchronization of slots.
        </para>
 
        <caution>
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index cb39adcd0e..578cfce896 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -79,10 +79,11 @@
  * and also sets stopSignaled=true to handle the race condition when the
  * postmaster has not noticed the promotion yet and thus may end up restarting
  * the slot sync worker. If stopSignaled is set, the worker will exit in such a
- * case. Note that we don't need to reset this variable as after promotion the
- * slot sync worker won't be restarted because the pmState changes to PM_RUN from
- * PM_HOT_STANDBY and we don't support demoting primary without restarting the
- * server. See MaybeStartSlotSyncWorker.
+ * case. The SQL function pg_sync_replication_slots() will also error out if
+ * this flag is set. Note that we don't need to reset this variable as after
+ * promotion the slot sync worker won't be restarted because the pmState
+ * changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
+ * primary without restarting the server. See MaybeStartSlotSyncWorker.
  *
  * The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
  * overwrites.
@@ -92,9 +93,6 @@
  * is expected (e.g., slot sync GUCs change), slot sync worker will reset
  * last_start_time before exiting, so that postmaster can start the worker
  * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
  */
 typedef struct SlotSyncCtxStruct
 {
@@ -807,20 +805,6 @@ synchronize_slots(WalReceiverConn *wrconn)
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	if (SlotSyncCtx->syncing)
-	{
-		SpinLockRelease(&SlotSyncCtx->mutex);
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("cannot synchronize replication slots concurrently"));
-	}
-
-	SlotSyncCtx->syncing = true;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	syncing_slots = true;
-
 	/* The syscache access in walrcv_exec() needs a transaction env. */
 	if (!IsTransactionState())
 	{
@@ -937,12 +921,6 @@ synchronize_slots(WalReceiverConn *wrconn)
 	if (started_tx)
 		CommitTransactionCommand();
 
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	SlotSyncCtx->syncing = false;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	syncing_slots = false;
-
 	return some_slot_updated;
 }
 
@@ -1190,6 +1168,19 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
 		slotsync_reread_config();
 }
 
+/*
+ * Connection cleanup function for slotsync worker.
+ *
+ * Called on slotsync worker exit.
+ */
+static void
+slotsync_worker_disconnect(int code, Datum arg)
+{
+	WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+
+	walrcv_disconnect(wrconn);
+}
+
 /*
  * Cleanup function for slotsync worker.
  *
@@ -1198,8 +1189,38 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
 static void
 slotsync_worker_onexit(int code, Datum arg)
 {
+	/*
+	 * We need to do slots cleanup here just like WalSndErrorCleanup() does.
+	 *
+	 * The startup process during promotion invokes ShutDownSlotSync() which
+	 * waits for slot sync to finish and it does that by checking the
+	 * 'syncing' flag. Thus the slot sync worker must be done with slots'
+	 * release and cleanup to avoid any dangling temporary slots or active
+	 * slots before it marks itself as finished syncing.
+	 */
+
+	/* Make sure active replication slots are released */
+	if (MyReplicationSlot != NULL)
+		ReplicationSlotRelease();
+
+	/* Also cleanup the temporary slots. */
+	ReplicationSlotCleanup(false);
+
 	SpinLockAcquire(&SlotSyncCtx->mutex);
+
 	SlotSyncCtx->pid = InvalidPid;
+
+	/*
+	 * If syncing_slots is true, it indicates that the process errored out
+	 * without resetting the flag. So, we need to clean up shared memory and
+	 * reset the flag here.
+	 */
+	if (syncing_slots)
+	{
+		SlotSyncCtx->syncing = false;
+		syncing_slots = false;
+	}
+
 	SpinLockRelease(&SlotSyncCtx->mutex);
 }
 
@@ -1242,6 +1263,64 @@ wait_for_slot_activity(bool some_slot_updated)
 		ResetLatch(MyLatch);
 }
 
+/*
+ * Emit an error if a promotion or a concurrent sync call is in progress.
+ * Otherwise, advertise that a sync is in progress.
+ */
+static void
+check_and_set_sync_info(pid_t worker_pid)
+{
+	SpinLockAcquire(&SlotSyncCtx->mutex);
+
+	/* The worker pid must not be already assigned in SlotSyncCtx */
+	Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
+
+	/*
+	 * Emit an error if startup process signaled the slot sync machinery to
+	 * stop. See comments atop SlotSyncCtxStruct.
+	 */
+	if (SlotSyncCtx->stopSignaled)
+	{
+		SpinLockRelease(&SlotSyncCtx->mutex);
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
+	}
+
+	if (SlotSyncCtx->syncing)
+	{
+		SpinLockRelease(&SlotSyncCtx->mutex);
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("cannot synchronize replication slots concurrently"));
+	}
+
+	SlotSyncCtx->syncing = true;
+
+	/*
+	 * Advertise the required PID so that the startup process can kill the
+	 * slot sync worker on promotion.
+	 */
+	SlotSyncCtx->pid = worker_pid;
+
+	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	syncing_slots = true;
+}
+
+/*
+ * Reset syncing flag.
+ */
+static void
+reset_syncing_flag()
+{
+	SpinLockAcquire(&SlotSyncCtx->mutex);
+	SlotSyncCtx->syncing = false;
+	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	syncing_slots = false;
+};
+
 /*
  * The main loop of our worker process.
  *
@@ -1278,47 +1357,6 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 
 	Assert(SlotSyncCtx != NULL);
 
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	Assert(SlotSyncCtx->pid == InvalidPid);
-
-	/*
-	 * Startup process signaled the slot sync worker to stop, so if meanwhile
-	 * postmaster ended up starting the worker again, exit.
-	 */
-	if (SlotSyncCtx->stopSignaled)
-	{
-		SpinLockRelease(&SlotSyncCtx->mutex);
-		proc_exit(0);
-	}
-
-	/* Advertise our PID so that the startup process can kill us on promotion */
-	SlotSyncCtx->pid = MyProcPid;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	ereport(LOG, errmsg("slot sync worker started"));
-
-	/* Register it as soon as SlotSyncCtx->pid is initialized. */
-	before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
-
-	/* Setup signal handling */
-	pqsignal(SIGHUP, SignalHandlerForConfigReload);
-	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
-	pqsignal(SIGTERM, die);
-	pqsignal(SIGFPE, FloatExceptionHandler);
-	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
-	pqsignal(SIGUSR2, SIG_IGN);
-	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGCHLD, SIG_DFL);
-
-	/*
-	 * Establishes SIGALRM handler and initialize timeout module. It is needed
-	 * by InitPostgres to register different timeouts.
-	 */
-	InitializeTimeouts();
-
-	/* Load the libpq-specific functions */
-	load_file("libpqwalreceiver", false);
-
 	/*
 	 * If an exception is encountered, processing resumes here.
 	 *
@@ -1350,6 +1388,32 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 	/* We can now handle ereport(ERROR) */
 	PG_exception_stack = &local_sigjmp_buf;
 
+	/* Setup signal handling */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+	pqsignal(SIGTERM, die);
+	pqsignal(SIGFPE, FloatExceptionHandler);
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+	pqsignal(SIGUSR2, SIG_IGN);
+	pqsignal(SIGPIPE, SIG_IGN);
+	pqsignal(SIGCHLD, SIG_DFL);
+
+	check_and_set_sync_info(MyProcPid);
+
+	ereport(LOG, errmsg("slot sync worker started"));
+
+	/* Register it as soon as SlotSyncCtx->pid is initialized. */
+	before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
+
+	/*
+	 * Establishes SIGALRM handler and initialize timeout module. It is needed
+	 * by InitPostgres to register different timeouts.
+	 */
+	InitializeTimeouts();
+
+	/* Load the libpq-specific functions */
+	load_file("libpqwalreceiver", false);
+
 	/*
 	 * Unblock signals (they were blocked when the postmaster forked us)
 	 */
@@ -1402,13 +1466,13 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 				errmsg("could not connect to the primary server: %s", err));
 
 	/*
-	 * Register the failure callback once we have the connection.
+	 * Register the disconnection callback.
 	 *
-	 * XXX: This can be combined with previous such cleanup registration of
+	 * XXX: This can be combined with previous cleanup registration of
 	 * slotsync_worker_onexit() but that will need the connection to be made
 	 * global and we want to avoid introducing global for this purpose.
 	 */
-	before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+	before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
 
 	/*
 	 * Using the specified primary server connection, check that we are not a
@@ -1457,8 +1521,8 @@ update_synced_slots_inactive_since(void)
 	if (!StandbyMode)
 		return;
 
-	/* The slot sync worker mustn't be running by now */
-	Assert(SlotSyncCtx->pid == InvalidPid);
+	/* The slot sync worker or SQL function mustn't be running by now */
+	Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
@@ -1471,6 +1535,9 @@ update_synced_slots_inactive_since(void)
 		{
 			Assert(SlotIsLogical(s));
 
+			/* The slot must not be acquired by any process */
+			Assert(s->active_pid == 0);
+
 			/* Use the same inactive_since time for all the slots. */
 			if (now == 0)
 				now = GetCurrentTimestamp();
@@ -1486,25 +1553,39 @@ update_synced_slots_inactive_since(void)
 
 /*
  * Shut down the slot sync worker.
+ *
+ * This function sends signal to shutdown slot sync worker, if required. It
+ * also waits till the slot sync worker has exited or
+ * pg_sync_replication_slots() has finished.
  */
 void
 ShutDownSlotSync(void)
 {
+	pid_t		worker_pid;
+
 	SpinLockAcquire(&SlotSyncCtx->mutex);
 
 	SlotSyncCtx->stopSignaled = true;
 
-	if (SlotSyncCtx->pid == InvalidPid)
+	/*
+	 * Return if neither the slot sync worker is running nor the function
+	 * pg_sync_replication_slots() is executing.
+	 */
+	if (!SlotSyncCtx->syncing)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
 		update_synced_slots_inactive_since();
 		return;
 	}
+
+	worker_pid = SlotSyncCtx->pid;
+
 	SpinLockRelease(&SlotSyncCtx->mutex);
 
-	kill(SlotSyncCtx->pid, SIGINT);
+	if (worker_pid != InvalidPid)
+		kill(worker_pid, SIGINT);
 
-	/* Wait for it to die */
+	/* Wait for slot sync to end */
 	for (;;)
 	{
 		int			rc;
@@ -1522,8 +1603,8 @@ ShutDownSlotSync(void)
 
 		SpinLockAcquire(&SlotSyncCtx->mutex);
 
-		/* Is it gone? */
-		if (SlotSyncCtx->pid == InvalidPid)
+		/* Ensure that no process is syncing the slots. */
+		if (!SlotSyncCtx->syncing)
 			break;
 
 		SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1601,26 +1682,37 @@ SlotSyncShmemInit(void)
 }
 
 /*
- * Error cleanup callback for slot synchronization.
+ * Error cleanup callback for slot sync SQL function.
  */
 static void
 slotsync_failure_callback(int code, Datum arg)
 {
 	WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
 
-	if (syncing_slots)
-	{
-		/*
-		 * If syncing_slots is true, it indicates that the process errored out
-		 * without resetting the flag. So, we need to clean up shared memory
-		 * and reset the flag here.
-		 */
-		SpinLockAcquire(&SlotSyncCtx->mutex);
-		SlotSyncCtx->syncing = false;
-		SpinLockRelease(&SlotSyncCtx->mutex);
+	/*
+	 * We need to do slots cleanup here just like WalSndErrorCleanup() does.
+	 *
+	 * The startup process during promotion invokes ShutDownSlotSync() which
+	 * waits for slot sync to finish and it does that by checking the
+	 * 'syncing' flag. Thus the SQL function must be done with slots' release
+	 * and cleanup to avoid any dangling temporary slots or active slots
+	 * before it marks itself as finished syncing.
+	 */
 
-		syncing_slots = false;
-	}
+	/* Make sure active replication slots are released */
+	if (MyReplicationSlot != NULL)
+		ReplicationSlotRelease();
+
+	/* Also cleanup the synced temporary slots. */
+	ReplicationSlotCleanup(true);
+
+	/*
+	 * The set syncing_slots indicates that the process errored out without
+	 * resetting the flag. So, we need to clean up shared memory and reset the
+	 * flag here.
+	 */
+	if (syncing_slots)
+		reset_syncing_flag();
 
 	walrcv_disconnect(wrconn);
 }
@@ -1634,9 +1726,17 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
 {
 	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 	{
+		check_and_set_sync_info(InvalidPid);
+
 		validate_remote_info(wrconn);
 
 		synchronize_slots(wrconn);
+
+		/* Cleanup the synced temporary slots */
+		ReplicationSlotCleanup(true);
+
+		/* We are done with sync, so reset sync flag */
+		reset_syncing_flag();
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 }
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index cebf44bb0f..aa4ea387da 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -237,7 +237,7 @@ ReplicationSlotShmemExit(int code, Datum arg)
 		ReplicationSlotRelease();
 
 	/* Also cleanup all the temporary slots. */
-	ReplicationSlotCleanup();
+	ReplicationSlotCleanup(false);
 }
 
 /*
@@ -736,10 +736,13 @@ ReplicationSlotRelease(void)
 }
 
 /*
- * Cleanup all temporary slots created in current session.
+ * Cleanup temporary slots created in current session.
+ *
+ * Cleanup only synced temporary slots if 'synced_only' is true, else
+ * cleanup all temporary slots.
  */
 void
-ReplicationSlotCleanup(void)
+ReplicationSlotCleanup(bool synced_only)
 {
 	int			i;
 
@@ -755,7 +758,8 @@ restart:
 			continue;
 
 		SpinLockAcquire(&s->mutex);
-		if (s->active_pid == MyProcPid)
+		if ((s->active_pid == MyProcPid &&
+			 (!synced_only || s->data.synced)))
 		{
 			Assert(s->data.persistency == RS_TEMPORARY);
 			SpinLockRelease(&s->mutex);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9bf7c67f37..c623b07cf0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -336,7 +336,7 @@ WalSndErrorCleanup(void)
 	if (MyReplicationSlot != NULL)
 		ReplicationSlotRelease();
 
-	ReplicationSlotCleanup();
+	ReplicationSlotCleanup(false);
 
 	replication_active = false;
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 76f48b13d2..2dff28afce 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4410,7 +4410,7 @@ PostgresMain(const char *dbname, const char *username)
 			ReplicationSlotRelease();
 
 		/* We also want to cleanup temporary slots on error. */
-		ReplicationSlotCleanup();
+		ReplicationSlotCleanup(false);
 
 		jit_reset_after_error();
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7b937d1a0c..1bc80960ef 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -247,7 +247,7 @@ extern void ReplicationSlotAlter(const char *name, bool failover);
 
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
-extern void ReplicationSlotCleanup(void);
+extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
 extern void ReplicationSlotMarkDirty(void);
 
-- 
2.34.1

