From 3f89c4894552a7c2282308f4d4ff921949264fa2 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v9] Handle stopSignaled during sync function call.

This patch attempts to fix the following issues:

1) It implements the promotion-related handling needed for the slot sync
SQL function pg_sync_replication_slots(), similar to the slot sync worker.
pg_sync_replication_slots() now checks the 'stopSignaled' flag before
proceeding with slot sync; ShutDownSlotSync(), on the other hand, checks
the 'syncing' flag instead of 'SlotSyncCtx->pid' to ensure that the slot
sync machinery is shut down. It also changes the order in which the
slot-sync cleanup callbacks are registered, so that we first release the
replication slot, then set the pid to InvalidPid, and finally set
'syncing' to false. Setting 'syncing' to false must be the last step of
cleanup so that the shutdown flow during promotion can reliably exit its
wait loop on seeing 'syncing' as false.

2) This patch fixes another issue in the slot sync worker:
SignalHandlerForShutdownRequest() needs to be registered *before* setting
SlotSyncCtx->pid; otherwise the slot sync worker could fail to handle a
SIGINT sent by the startup process (ShutDownSlotSync) if it arrives before
the worker has registered SignalHandlerForShutdownRequest(). To be
consistent, the registration of all signal handlers is moved to a point
before we set the worker's pid.

3) This patch fixes another issue in ShutDownSlotSync(), where we perform
kill(SlotSyncCtx->pid) after releasing the spinlock. It is possible that,
after we release the spinlock and before we perform the kill, the slot
sync worker has errored out and set SlotSyncCtx->pid to InvalidPid (-1),
so kill(-1) could result in abnormal process kills on some platforms. Now
we capture SlotSyncCtx->pid in a local variable under the spinlock and
later use that to perform the kill.
---
 doc/src/sgml/func.sgml                     |   4 +
 src/backend/replication/logical/slotsync.c | 264 ++++++++++++++-------
 2 files changed, 182 insertions(+), 86 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 8dfb42ad4d..65a5fa6bb2 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29348,6 +29348,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         standby server. Temporary synced slots, if any, cannot be used for
         logical decoding and must be dropped after promotion. See
         <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
+        Note that this function cannot be executed if
+        <link linkend="guc-sync-replication-slots"><varname>sync_replication_slots</varname></link>
+        is enabled and the slotsync worker is already running to perform the
+        synchronization of slots.
        </para>
 
        <caution>
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bda0de52db..32a262d613 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -92,9 +92,6 @@
  * is expected (e.g., slot sync GUCs change), slot sync worker will reset
  * last_start_time before exiting, so that postmaster can start the worker
  * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
  */
 typedef struct SlotSyncCtxStruct
 {
@@ -807,20 +804,6 @@ synchronize_slots(WalReceiverConn *wrconn)
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	if (SlotSyncCtx->syncing)
-	{
-		SpinLockRelease(&SlotSyncCtx->mutex);
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("cannot synchronize replication slots concurrently"));
-	}
-
-	SlotSyncCtx->syncing = true;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	syncing_slots = true;
-
 	/* The syscache access in walrcv_exec() needs a transaction env. */
 	if (!IsTransactionState())
 	{
@@ -937,12 +920,6 @@ synchronize_slots(WalReceiverConn *wrconn)
 	if (started_tx)
 		CommitTransactionCommand();
 
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	SlotSyncCtx->syncing = false;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	syncing_slots = false;
-
 	return some_slot_updated;
 }
 
@@ -1190,6 +1167,19 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
 		slotsync_reread_config();
 }
 
+/*
+ * Connection cleanup function for slotsync worker.
+ *
+ * Called on slotsync worker exit.
+ */
+static void
+slotsync_worker_disconnect(int code, Datum arg)
+{
+	WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+
+	walrcv_disconnect(wrconn);
+}
+
 /*
  * Cleanup function for slotsync worker.
  *
@@ -1199,7 +1189,20 @@ static void
 slotsync_worker_onexit(int code, Datum arg)
 {
 	SpinLockAcquire(&SlotSyncCtx->mutex);
+
 	SlotSyncCtx->pid = InvalidPid;
+
+	/*
+	 * If syncing_slots is true, it indicates that the process errored out
+	 * without resetting the flag. So, we need to clean up shared memory and
+	 * reset the flag here.
+	 */
+	if (syncing_slots)
+	{
+		SlotSyncCtx->syncing = false;
+		syncing_slots = false;
+	}
+
 	SpinLockRelease(&SlotSyncCtx->mutex);
 }
 
@@ -1242,6 +1245,63 @@ wait_for_slot_activity(bool some_slot_updated)
 		ResetLatch(MyLatch);
 }
 
+/*
+ * Check stopSignaled and syncing flags. Emit an error if promotion has
+ * already set stopSignaled or if this is a concurrent sync call. Otherwise,
+ * set the 'syncing' flag and pid info.
+ */
+static void
+check_flags_and_set_sync_info(pid_t worker_pid)
+{
+	SpinLockAcquire(&SlotSyncCtx->mutex);
+
+	/* The worker pid must not be already assigned in SlotSyncCtx */
+	Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
+
+	/*
+	 * Startup process signaled the slot sync machinery to stop, so if
+	 * meanwhile postmaster ended up starting the worker again or user has
+	 * invoked pg_sync_replication_slots(), error out.
+	 */
+	if (SlotSyncCtx->stopSignaled)
+	{
+		SpinLockRelease(&SlotSyncCtx->mutex);
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
+	}
+
+	if (SlotSyncCtx->syncing)
+	{
+		SpinLockRelease(&SlotSyncCtx->mutex);
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("cannot synchronize replication slots concurrently"));
+	}
+
+	SlotSyncCtx->syncing = true;
+
+	/* Advertise our PID so that the startup process can kill us on promotion */
+	SlotSyncCtx->pid = worker_pid;
+
+	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	syncing_slots = true;
+}
+
+/*
+ * Clear the shared 'syncing' flag (under the spinlock) and the local copy.
+ */
+static void
+reset_syncing_flag(void)
+{
+	SpinLockAcquire(&SlotSyncCtx->mutex);
+	SlotSyncCtx->syncing = false;
+	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	syncing_slots = false;
+}
+
 /*
  * The main loop of our worker process.
  *
@@ -1272,52 +1332,23 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 	InitProcess();
 
 	/*
-	 * Early initialization.
+	 * Register slotsync_worker_onexit() before we register
+	 * ReplicationSlotShmemExit() in BaseInit(), to ensure that during exit of
+	 * slot sync worker, ReplicationSlotShmemExit() is called first, followed
+	 * by slotsync_worker_onexit(). Startup process during promotion calls
+	 * ShutDownSlotSync() which waits for slot sync to finish and it does that
+	 * by checking the 'syncing' flag. Thus it is important that worker should
+	 * be done with slots' release and cleanup before it actually marks itself
+	 * as finished syncing.
 	 */
-	BaseInit();
-
-	Assert(SlotSyncCtx != NULL);
-
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	Assert(SlotSyncCtx->pid == InvalidPid);
-
-	/*
-	 * Startup process signaled the slot sync worker to stop, so if meanwhile
-	 * postmaster ended up starting the worker again, exit.
-	 */
-	if (SlotSyncCtx->stopSignaled)
-	{
-		SpinLockRelease(&SlotSyncCtx->mutex);
-		proc_exit(0);
-	}
-
-	/* Advertise our PID so that the startup process can kill us on promotion */
-	SlotSyncCtx->pid = MyProcPid;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	ereport(LOG, errmsg("slot sync worker started"));
-
-	/* Register it as soon as SlotSyncCtx->pid is initialized. */
 	before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
 
-	/* Setup signal handling */
-	pqsignal(SIGHUP, SignalHandlerForConfigReload);
-	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
-	pqsignal(SIGTERM, die);
-	pqsignal(SIGFPE, FloatExceptionHandler);
-	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
-	pqsignal(SIGUSR2, SIG_IGN);
-	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGCHLD, SIG_DFL);
-
 	/*
-	 * Establishes SIGALRM handler and initialize timeout module. It is needed
-	 * by InitPostgres to register different timeouts.
+	 * Early initialization.
 	 */
-	InitializeTimeouts();
+	BaseInit();
 
-	/* Load the libpq-specific functions */
-	load_file("libpqwalreceiver", false);
+	Assert(SlotSyncCtx != NULL);
 
 	/*
 	 * If an exception is encountered, processing resumes here.
@@ -1350,6 +1381,29 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 	/* We can now handle ereport(ERROR) */
 	PG_exception_stack = &local_sigjmp_buf;
 
+	/* Setup signal handling */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+	pqsignal(SIGTERM, die);
+	pqsignal(SIGFPE, FloatExceptionHandler);
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+	pqsignal(SIGUSR2, SIG_IGN);
+	pqsignal(SIGPIPE, SIG_IGN);
+	pqsignal(SIGCHLD, SIG_DFL);
+
+	check_flags_and_set_sync_info(MyProcPid);
+
+	ereport(LOG, errmsg("slot sync worker started"));
+
+	/*
+	 * Establishes SIGALRM handler and initialize timeout module. It is needed
+	 * by InitPostgres to register different timeouts.
+	 */
+	InitializeTimeouts();
+
+	/* Load the libpq-specific functions */
+	load_file("libpqwalreceiver", false);
+
 	/*
 	 * Unblock signals (they were blocked when the postmaster forked us)
 	 */
@@ -1402,13 +1456,13 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 				errmsg("could not connect to the primary server: %s", err));
 
 	/*
-	 * Register the failure callback once we have the connection.
+	 * Register the disconnection callback.
 	 *
-	 * XXX: This can be combined with previous such cleanup registration of
+	 * XXX: This can be combined with previous cleanup registration of
 	 * slotsync_worker_onexit() but that will need the connection to be made
 	 * global and we want to avoid introducing global for this purpose.
 	 */
-	before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+	before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
 
 	/*
 	 * Using the specified primary server connection, check that we are not a
@@ -1457,8 +1511,8 @@ update_synced_slots_inactive_since(void)
 	if (!StandbyMode)
 		return;
 
-	/* The slot sync worker mustn't be running by now */
-	Assert(SlotSyncCtx->pid == InvalidPid);
+	/* The slot sync worker or SQL function mustn't be running by now */
+	Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
@@ -1471,6 +1525,9 @@ update_synced_slots_inactive_since(void)
 		{
 			Assert(SlotIsLogical(s));
 
+			/* The slot must not be acquired by any process */
+			Assert(s->active_pid == 0);
+
 			/* Use the same inactive_since time for all the slots. */
 			if (now == 0)
 				now = GetCurrentTimestamp();
@@ -1486,25 +1543,39 @@ update_synced_slots_inactive_since(void)
 
 /*
  * Shut down the slot sync worker.
+ *
+ * This sends a signal to shut down the slot sync worker. It also waits
+ * until the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
  */
 void
 ShutDownSlotSync(void)
 {
+	pid_t		worker_pid;
+
 	SpinLockAcquire(&SlotSyncCtx->mutex);
 
 	SlotSyncCtx->stopSignaled = true;
 
-	if (SlotSyncCtx->pid == InvalidPid)
+	/*
+	 * Return if neither the slot sync worker is running nor the function
+	 * pg_sync_replication_slots() is executing.
+	 */
+	if (!SlotSyncCtx->syncing)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
 		update_synced_slots_inactive_since();
 		return;
 	}
+
+	worker_pid = SlotSyncCtx->pid;
+
 	SpinLockRelease(&SlotSyncCtx->mutex);
 
-	kill(SlotSyncCtx->pid, SIGINT);
+	if (worker_pid != InvalidPid)
+		kill(worker_pid, SIGINT);
 
-	/* Wait for it to die */
+	/* Wait for slot sync to end */
 	for (;;)
 	{
 		int			rc;
@@ -1522,8 +1593,11 @@ ShutDownSlotSync(void)
 
 		SpinLockAcquire(&SlotSyncCtx->mutex);
 
-		/* Is it gone? */
-		if (SlotSyncCtx->pid == InvalidPid)
+		/*
+		 * Confirm that both the worker and the function
+		 * pg_sync_replication_slots() are done.
+		 */
+		if (!SlotSyncCtx->syncing)
 			break;
 
 		SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1601,26 +1675,36 @@ SlotSyncShmemInit(void)
 }
 
 /*
- * Error cleanup callback for slot synchronization.
+ * Error cleanup callback for slot sync SQL function.
  */
 static void
 slotsync_failure_callback(int code, Datum arg)
 {
 	WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
 
-	if (syncing_slots)
-	{
-		/*
-		 * If syncing_slots is true, it indicates that the process errored out
-		 * without resetting the flag. So, we need to clean up shared memory
-		 * and reset the flag here.
-		 */
-		SpinLockAcquire(&SlotSyncCtx->mutex);
-		SlotSyncCtx->syncing = false;
-		SpinLockRelease(&SlotSyncCtx->mutex);
+	/*
+	 * We need to do slots cleanup here just like WalSndErrorCleanup() does.
+	 *
+	 * Startup process during promotion calls ShutDownSlotSync() which waits
+	 * for slot sync to finish and it does that by checking the 'syncing'
+	 * flag. Thus it is important that SQL function should be done with slots'
+	 * release and cleanup before it actually marks itself as finished
+	 * syncing.
+	 */
+	/* Make sure active replication slots are released */
+	if (MyReplicationSlot != NULL)
+		ReplicationSlotRelease();
 
-		syncing_slots = false;
-	}
+	/* Also cleanup all the temporary slots. */
+	ReplicationSlotCleanup();
+
+	/*
+	 * If syncing_slots is true, it indicates that the process errored out
+	 * without resetting the flag. So, we need to clean up shared memory and
+	 * reset the flag here.
+	 */
+	if (syncing_slots)
+		reset_syncing_flag();
 
 	walrcv_disconnect(wrconn);
 }
@@ -1634,9 +1718,17 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
 {
 	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 	{
+		check_flags_and_set_sync_info(InvalidPid);
+
 		validate_remote_info(wrconn);
 
 		synchronize_slots(wrconn);
+
+		/* Cleanup the temporary slots */
+		ReplicationSlotCleanup();
+
+		/* We are done with sync, so reset sync flag */
+		reset_syncing_flag();
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 }
-- 
2.34.1

