From 1e379feaa715626c7e2744bdb764bedb0fc3cab7 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Mon, 18 Nov 2024 16:13:26 +0530
Subject: [PATCH v63 1/2] Enhance replication slot error handling, slot
 invalidation, and inactive_since setting logic

In ReplicationSlotAcquire(), raise an error for invalid slots if the
caller specifies error_if_invalid=true.

Add check if the slot is already acquired, then mark it invalidated directly.

Ensure same inactive_since time for all slots in update_synced_slots_inactive_since()
and RestoreSlotFromDisk().
---
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/logical/slotsync.c    |  13 +-
 src/backend/replication/slot.c                | 161 ++++++++++++------
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           |   4 +-
 src/backend/utils/adt/pg_upgrade_support.c    |   2 +-
 src/include/replication/slot.h                |   3 +-
 src/test/recovery/t/019_replslot_limit.pl     |   2 +-
 8 files changed, 127 insertions(+), 62 deletions(-)

diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 0148ec3678..ca53caac2f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f6945af1d4..692527b984 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
@@ -1508,7 +1508,7 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 static void
 update_synced_slots_inactive_since(void)
 {
-	TimestampTz now = 0;
+	TimestampTz now;
 
 	/*
 	 * We need to update inactive_since only when we are promoting standby to
@@ -1523,6 +1523,9 @@ update_synced_slots_inactive_since(void)
 	/* The slot sync worker or SQL function mustn't be running by now */
 	Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
 
+	/* Use same inactive_since time for all slots */
+	now = GetCurrentTimestamp();
+
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (int i = 0; i < max_replication_slots; i++)
@@ -1537,10 +1540,6 @@ update_synced_slots_inactive_since(void)
 			/* The slot must not be acquired by any process */
 			Assert(s->active_pid == 0);
 
-			/* Use the same inactive_since time for all the slots. */
-			if (now == 0)
-				now = GetCurrentTimestamp();
-
 			SpinLockAcquire(&s->mutex);
 			s->inactive_since = now;
 			SpinLockRelease(&s->mutex);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index b30e0473e1..0330dcf57d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -163,6 +163,7 @@ static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
 static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
+static void RaiseSlotInvalidationError(ReplicationSlot *slot);
 
 /*
  * Report shared-memory space needed by ReplicationSlotsShmemInit.
@@ -535,9 +536,12 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -615,6 +619,13 @@ retry:
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	/*
+	 * An error is raised if error_if_invalid is true and the slot is found to
+	 * be invalid.
+	 */
+	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+		RaiseSlotInvalidationError(s);
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -785,7 +796,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +823,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 	Assert(MyReplicationSlot == NULL);
 	Assert(failover || two_phase);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, false);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -1557,7 +1568,8 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 /*
  * Helper for InvalidateObsoleteReplicationSlots
  *
- * Acquires the given slot and mark it invalid, if necessary and possible.
+ * Acquires the given slot unless already owned, and mark it invalid
+ * if necessary and possible.
  *
  * Returns whether ReplicationSlotControlLock was released in the interim (and
  * in that case we're not holding the lock at return, otherwise we are).
@@ -1600,10 +1612,10 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 
 		/*
 		 * Check if the slot needs to be invalidated. If it needs to be
-		 * invalidated, and is not currently acquired, acquire it and mark it
-		 * as having been invalidated.  We do this with the spinlock held to
-		 * avoid race conditions -- for example the restart_lsn could move
-		 * forward, or the slot could be dropped.
+		 * invalidated, and is already ours, mark it as invalidated;
+		 * otherwise, acquire it first and then mark it as invalidated. We do
+		 * this with the spinlock held to avoid race conditions -- for example
+		 * the restart_lsn could move forward, or the slot could be dropped.
 		 */
 		SpinLockAcquire(&s->mutex);
 
@@ -1676,11 +1688,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		active_pid = s->active_pid;
 
 		/*
-		 * If the slot can be acquired, do so and mark it invalidated
-		 * immediately.  Otherwise we'll signal the owning process, below, and
-		 * retry.
+		 * If the slot can be acquired, do so and mark it as invalidated. If
+		 * the slot is already ours, mark it as invalidated. Otherwise, we'll
+		 * signal the owning process below and retry.
 		 */
-		if (active_pid == 0)
+		if (active_pid == 0 ||
+			(MyReplicationSlot == s && active_pid == MyProcPid))
 		{
 			MyReplicationSlot = s;
 			s->active_pid = MyProcPid;
@@ -1695,20 +1708,56 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 
 			/* Let caller know */
 			*invalidated = true;
-		}
 
-		SpinLockRelease(&s->mutex);
+			SpinLockRelease(&s->mutex);
 
-		/*
-		 * The logical replication slots shouldn't be invalidated as GUC
-		 * max_slot_wal_keep_size is set to -1 during the binary upgrade. See
-		 * check_old_cluster_for_valid_slots() where we ensure that no
-		 * invalidated before the upgrade.
-		 */
-		Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
+			/*
+			 * Ensure that no logical slots are invalidated during binary
+			 * upgrade mode. This is guaranteed because, in binary upgrade
+			 * mode, the GUC max_slot_wal_keep_size is set to -1, and
+			 * check_old_cluster_for_valid_slots() verifies that no slots are
+			 * invalidated before the upgrade begins.
+			 */
+			if (IsBinaryUpgrade)
+				Assert(!(*invalidated && SlotIsLogical(s)));
+
+			/*
+			 * We hold the slot now and have already invalidated it; flush it
+			 * to ensure that state persists.
+			 *
+			 * Don't want to hold ReplicationSlotControlLock across file
+			 * system operations, so release it now but be sure to tell caller
+			 * to restart from scratch.
+			 */
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
 
-		if (active_pid != 0)
+			/* Make sure the invalidated state persists across server restart */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+			ReplicationSlotRelease();
+
+			ReportSlotInvalidation(invalidation_cause, false, active_pid,
+								   slotname, restart_lsn,
+								   oldestLSN, snapshotConflictHorizon);
+
+			/* done with this slot for now */
+			break;
+		}
+		else					/* Some other process owns the slot */
 		{
+			SpinLockRelease(&s->mutex);
+
+			/*
+			 * Ensure that no logical slots are invalidated during binary
+			 * upgrade mode. This is guaranteed because, in binary upgrade
+			 * mode, the GUC max_slot_wal_keep_size is set to -1, and
+			 * check_old_cluster_for_valid_slots() verifies that no slots are
+			 * invalidated before the upgrade begins.
+			 */
+			if (IsBinaryUpgrade)
+				Assert(!(*invalidated && SlotIsLogical(s)));
+
 			/*
 			 * Prepare the sleep on the slot's condition variable before
 			 * releasing the lock, to close a possible race condition if the
@@ -1761,31 +1810,6 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 			continue;
 		}
-		else
-		{
-			/*
-			 * We hold the slot now and have already invalidated it; flush it
-			 * to ensure that state persists.
-			 *
-			 * Don't want to hold ReplicationSlotControlLock across file
-			 * system operations, so release it now but be sure to tell caller
-			 * to restart from scratch.
-			 */
-			LWLockRelease(ReplicationSlotControlLock);
-			released_lock = true;
-
-			/* Make sure the invalidated state persists across server restart */
-			ReplicationSlotMarkDirty();
-			ReplicationSlotSave();
-			ReplicationSlotRelease();
-
-			ReportSlotInvalidation(invalidation_cause, false, active_pid,
-								   slotname, restart_lsn,
-								   oldestLSN, snapshotConflictHorizon);
-
-			/* done with this slot for now */
-			break;
-		}
 	}
 
 	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
@@ -2208,6 +2232,7 @@ RestoreSlotFromDisk(const char *name)
 	bool		restored = false;
 	int			readBytes;
 	pg_crc32c	checksum;
+	TimestampTz now;
 
 	/* no need to lock here, no concurrent access allowed yet */
 
@@ -2368,6 +2393,9 @@ RestoreSlotFromDisk(const char *name)
 						NameStr(cp.slotdata.name)),
 				 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
 
+	/* Use same inactive_since time for all slots */
+	now = GetCurrentTimestamp();
+
 	/* nothing can be active yet, don't lock anything */
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -2400,7 +2428,7 @@ RestoreSlotFromDisk(const char *name)
 		 * slot from the disk into memory. Whoever acquires the slot i.e.
 		 * makes the slot active will reset it.
 		 */
-		slot->inactive_since = GetCurrentTimestamp();
+		slot->inactive_since = now;
 
 		restored = true;
 		break;
@@ -2793,3 +2821,40 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
 
 	ConditionVariableCancelSleep();
 }
+
+/*
+ * Raise an error based on the slot's invalidation cause.
+ */
+static void
+RaiseSlotInvalidationError(ReplicationSlot *slot)
+{
+	StringInfo	err_detail = makeStringInfo();
+
+	Assert(slot->data.invalidated != RS_INVAL_NONE);
+
+	switch (slot->data.invalidated)
+	{
+		case RS_INVAL_WAL_REMOVED:
+			appendStringInfoString(err_detail, _("This slot has been invalidated because the required WAL has been removed."));
+			break;
+
+		case RS_INVAL_HORIZON:
+			appendStringInfoString(err_detail, _("This slot has been invalidated because the required rows have been removed."));
+			break;
+
+		case RS_INVAL_WAL_LEVEL:
+			/* translator: %s is a GUC variable name */
+			appendStringInfo(err_detail, _("This slot has been invalidated because \"%s\" is insufficient for slot."),
+							 "wal_level");
+			break;
+
+		case RS_INVAL_NONE:
+			pg_unreachable();
+	}
+
+	ereport(ERROR,
+			errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			errmsg("can no longer get changes from replication slot \"%s\"",
+				   NameStr(slot->data.name)),
+			errdetail_internal("%s", err_detail->data));
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 977146789f..8be4b8c65b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index a0782b1bbf..3df5bd7b2a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 9a10907d05..d44f8c262b 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, true);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index bf62b36ad0..47ebdaecb6 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
 								 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool error_if_invalid);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index ae2ad5c933..66ac7c40f1 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -234,7 +234,7 @@ my $failed = 0;
 for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 {
 	if ($node_standby->log_contains(
-			"requested WAL segment [0-9A-F]+ has already been removed",
+			"This slot has been invalidated because the required WAL has been removed",
 			$logstart))
 	{
 		$failed = 1;
-- 
2.34.1

