From 60a5beb355ef2c9c2392e0229af0a24293882f50 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sun, 8 Sep 2024 11:33:05 +0000
Subject: [PATCH v45] Add inactive_timeout based replication slot invalidation

Till now, postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
this GUC is a bit tricky. Because the amount of WAL a database
generates, and the allocated storage for instance will vary
greatly in production, making it difficult to pin down a
one-size-fits-all value.

It is often easy for users to set a timeout of say 1 or 2 or n
days, after which all the inactive slots get invalidated. This
commit introduces a GUC named replication_slot_inactive_timeout.
When set, postgres invalidates slots (during non-shutdown
checkpoints) that are inactive for longer than this amount of
time.

Note that the inactive timeout invalidation mechanism is not
applicable for slots on the standby server that are being synced
from the primary server (i.e., standby slots having 'synced' field
'true'). Synced slots are always considered to be inactive because
they don't perform logical decoding to produce changes.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila
Reviewed-by: Ajin Cherian, Shveta Malik, Peter Smith
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CA%2BTgmoZTbaaEjSZUG1FL0mzxAdN3qmXksO3O9_PZhEuXTkVnRQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/202403260841.5jcv7ihniccy%40alvherre.pgsql
---
 doc/src/sgml/config.sgml                      |  35 ++
 doc/src/sgml/system-views.sgml                |   7 +
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/logical/slotsync.c    |   8 +-
 src/backend/replication/slot.c                | 171 ++++++++--
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           |   4 +-
 src/backend/utils/adt/pg_upgrade_support.c    |   2 +-
 src/backend/utils/misc/guc_tables.c           |  12 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/replication/slot.h                |  24 +-
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/050_invalidate_slots.pl   | 321 ++++++++++++++++++
 13 files changed, 560 insertions(+), 30 deletions(-)
 create mode 100644 src/test/recovery/t/050_invalidate_slots.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 0aec11f443..27b2285da1 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4556,6 +4556,41 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-replication-slot-inactive-timeout" xreflabel="replication_slot_inactive_timeout">
+      <term><varname>replication_slot_inactive_timeout</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>replication_slot_inactive_timeout</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots that are inactive for longer than this
+        amount of time. If this value is specified without units,
+        it is taken as seconds. A value of zero (which is default) disables
+        the inactive timeout invalidation mechanism. This parameter can only
+        be set in the <filename>postgresql.conf</filename> file or on the
+        server command line.
+       </para>
+
+       <para>
+        Slot invalidation due to inactivity timeout occurs during checkpoint.
+        The duration of slot inactivity is calculated using the slot's
+        <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>inactive_since</structfield>
+        value.
+       </para>
+
+       <para>
+        Note that the inactive timeout invalidation mechanism is not
+        applicable for slots on the standby server that are being synced
+        from primary server (i.e., standby slots having
+        <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>synced</structfield>
+        value <literal>true</literal>).
+        Synced slots are always considered to be inactive because they don't
+        perform logical decoding to produce changes.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 634a4c0fab..5633429eef 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2618,6 +2618,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           perform logical decoding.  It is set only for logical slots.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>inactive_timeout</literal> means that the slot has been
+          inactive for longer than the amount of time specified by the
+          <xref linkend="guc-replication-slot-inactive-timeout"/> parameter.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index b4dd5cce75..56fc1a45a9 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f9649eec1a..6cc7a739ec 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -448,7 +448,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -667,7 +667,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
@@ -1543,9 +1543,7 @@ update_synced_slots_inactive_since(void)
 			if (now == 0)
 				now = GetCurrentTimestamp();
 
-			SpinLockAcquire(&s->mutex);
-			s->inactive_since = now;
-			SpinLockRelease(&s->mutex);
+			ReplicationSlotSetInactiveSince(s, &now, true);
 		}
 	}
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 0a03776156..d92b92bfed 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -107,10 +107,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_WAL_REMOVED] = "wal_removed",
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
+	[RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -140,6 +141,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 /* GUC variables */
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
+int			replication_slot_inactive_timeout = 0;
 
 /*
  * This GUC lists streaming replication standby server slot names that
@@ -159,6 +161,14 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
+static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
+										   ReplicationSlot *s,
+										   XLogRecPtr oldestLSN,
+										   Oid dboid,
+										   TransactionId snapshotConflictHorizon,
+										   bool *invalidated);
+static inline bool SlotInactiveTimeoutCheckAllowed(ReplicationSlot *s);
+
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
@@ -535,9 +545,12 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot has been
+ * invalidated previously.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -615,6 +628,21 @@ retry:
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	/*
+	 * An error is raised if error_if_invalid is true and the slot has been
+	 * invalidated previously.
+	 */
+	if (error_if_invalid && s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT)
+	{
+		Assert(s->inactive_since > 0);
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("can no longer get changes from replication slot \"%s\"",
+						NameStr(s->data.name)),
+				 errdetail("This slot has been invalidated because it was inactive for longer than the amount of time specified by \"%s\".",
+						   "replication_slot_inactive_timeout.")));
+	}
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -703,16 +731,12 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
-		slot->inactive_since = now;
+		ReplicationSlotSetInactiveSince(slot, &now, false);
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
 	else
-	{
-		SpinLockAcquire(&slot->mutex);
-		slot->inactive_since = now;
-		SpinLockRelease(&slot->mutex);
-	}
+		ReplicationSlotSetInactiveSince(slot, &now, true);
 
 	MyReplicationSlot = NULL;
 
@@ -785,7 +809,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +836,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 	Assert(MyReplicationSlot == NULL);
 	Assert(failover || two_phase);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, true);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -1501,7 +1525,8 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 					   NameData slotname,
 					   XLogRecPtr restart_lsn,
 					   XLogRecPtr oldestLSN,
-					   TransactionId snapshotConflictHorizon)
+					   TransactionId snapshotConflictHorizon,
+					   TimestampTz inactive_since)
 {
 	StringInfoData err_detail;
 	bool		hint = false;
@@ -1531,6 +1556,15 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
 			break;
+
+		case RS_INVAL_INACTIVE_TIMEOUT:
+			Assert(inactive_since > 0);
+			appendStringInfo(&err_detail,
+							 _("The slot has been inactive since %s for longer than the amount of time specified by \"%s\"."),
+							 timestamptz_to_str(inactive_since),
+							 "replication_slot_inactive_timeout");
+			break;
+
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1547,6 +1581,28 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 	pfree(err_detail.data);
 }
 
+/*
+ * Is this replication slot allowed for inactive timeout invalidation check?
+ *
+ * Check if inactive timeout invalidation mechanism is disabled or slot is
+ * currently being used or server is in recovery mode or slot on standby is
+ * currently being synced from the primary.
+ *
+ * Note that the inactive timeout invalidation mechanism is not
+ * applicable for slots on the standby server that are being synced
+ * from the primary server (i.e., standby slots having 'synced' field 'true').
+ * Synced slots are always considered to be inactive because they don't
+ * perform logical decoding to produce changes.
+ */
+static inline bool
+SlotInactiveTimeoutCheckAllowed(ReplicationSlot *s)
+{
+	return (replication_slot_inactive_timeout > 0 &&
+			s->inactive_since > 0 &&
+			!RecoveryInProgress() &&
+			!s->data.synced);
+}
+
 /*
  * Helper for InvalidateObsoleteReplicationSlots
  *
@@ -1574,6 +1630,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 	TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
 	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
 	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
+	TimestampTz inactive_since = 0;
 
 	for (;;)
 	{
@@ -1581,6 +1638,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		NameData	slotname;
 		int			active_pid = 0;
 		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
+		TimestampTz now = 0;
 
 		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
@@ -1591,6 +1649,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			break;
 		}
 
+		if (cause == RS_INVAL_INACTIVE_TIMEOUT &&
+			SlotInactiveTimeoutCheckAllowed(s))
+		{
+			/*
+			 * We get the current time beforehand to avoid system call while
+			 * holding the spinlock.
+			 */
+			now = GetCurrentTimestamp();
+		}
+
 		/*
 		 * Check if the slot needs to be invalidated. If it needs to be
 		 * invalidated, and is not currently acquired, acquire it and mark it
@@ -1644,6 +1712,28 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						invalidation_cause = cause;
 					break;
+				case RS_INVAL_INACTIVE_TIMEOUT:
+
+					if (!SlotInactiveTimeoutCheckAllowed(s))
+						break;
+
+					/*
+					 * Check if the slot needs to be invalidated due to
+					 * replication_slot_inactive_timeout GUC.
+					 */
+					if (TimestampDifferenceExceeds(s->inactive_since, now,
+												   replication_slot_inactive_timeout * 1000))
+					{
+						invalidation_cause = cause;
+						inactive_since = s->inactive_since;
+
+						/*
+						 * Invalidation due to inactive timeout implies that
+						 * no one is using the slot.
+						 */
+						Assert(s->active_pid == 0);
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1669,11 +1759,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		active_pid = s->active_pid;
 
 		/*
-		 * If the slot can be acquired, do so and mark it invalidated
-		 * immediately.  Otherwise we'll signal the owning process, below, and
-		 * retry.
+		 * If the slot can be acquired, do so and mark it as invalidated. If
+		 * the slot is already ours, mark it as invalidated. Otherwise, we'll
+		 * signal the owning process below and retry.
 		 */
-		if (active_pid == 0)
+		if (active_pid == 0 ||
+			(MyReplicationSlot == s &&
+			 active_pid == MyProcPid))
 		{
 			MyReplicationSlot = s;
 			s->active_pid = MyProcPid;
@@ -1728,7 +1820,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			{
 				ReportSlotInvalidation(invalidation_cause, true, active_pid,
 									   slotname, restart_lsn,
-									   oldestLSN, snapshotConflictHorizon);
+									   oldestLSN, snapshotConflictHorizon,
+									   inactive_since);
 
 				if (MyBackendType == B_STARTUP)
 					(void) SendProcSignal(active_pid,
@@ -1774,7 +1867,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 
 			ReportSlotInvalidation(invalidation_cause, false, active_pid,
 								   slotname, restart_lsn,
-								   oldestLSN, snapshotConflictHorizon);
+								   oldestLSN, snapshotConflictHorizon,
+								   inactive_since);
 
 			/* done with this slot for now */
 			break;
@@ -1797,6 +1891,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_INACTIVE_TIMEOUT: inactive timeout occurs
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
@@ -1849,7 +1944,8 @@ restart:
 }
 
 /*
- * Flush all replication slots to disk.
+ * Flush all replication slots to disk. Also, invalidate obsolete slots during
+ * non-shutdown checkpoint.
  *
  * It is convenient to flush dirty replication slots at the time of checkpoint.
  * Additionally, in case of a shutdown checkpoint, we also identify the slots
@@ -1907,6 +2003,38 @@ CheckPointReplicationSlots(bool is_shutdown)
 		SaveSlotToPath(s, path, LOG);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
+
+	if (!is_shutdown)
+	{
+		elog(DEBUG1, "performing replication slot invalidation checks");
+
+		/*
+		 * NB: We will make another pass over replication slots for
+		 * invalidation checks to keep the code simple. Testing shows that
+		 * there is no noticeable overhead (when compared with wal_removed
+		 * invalidation) even if we were to do inactive_timeout invalidation
+		 * of thousands of replication slots here. If it is ever proven that
+		 * this assumption is wrong, we will have to perform the invalidation
+		 * checks in the above for loop with the following changes:
+		 *
+		 * - Acquire ControlLock lock once before the loop.
+		 *
+		 * - Call InvalidatePossiblyObsoleteSlot for each slot.
+		 *
+		 * - Handle the cases in which ControlLock gets released just like
+		 * InvalidateObsoleteReplicationSlots does.
+		 *
+		 * - Avoid saving slot info to disk two times for each invalidated
+		 * slot.
+		 *
+		 * XXX: Should we move inactive_timeout invalidation check closer to
+		 * wal_removed in CreateCheckPoint and CreateRestartPoint?
+		 */
+		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT,
+										   0,
+										   InvalidOid,
+										   InvalidTransactionId);
+	}
 }
 
 /*
@@ -2201,6 +2329,7 @@ RestoreSlotFromDisk(const char *name)
 	bool		restored = false;
 	int			readBytes;
 	pg_crc32c	checksum;
+	TimestampTz now = 0;
 
 	/* no need to lock here, no concurrent access allowed yet */
 
@@ -2388,12 +2517,16 @@ RestoreSlotFromDisk(const char *name)
 		slot->in_use = true;
 		slot->active_pid = 0;
 
+		/* Use the same inactive_since time for all the slots. */
+		if (now == 0)
+			now = GetCurrentTimestamp();
+
 		/*
 		 * Set the time since the slot has become inactive after loading the
 		 * slot from the disk into memory. Whoever acquires the slot i.e.
 		 * makes the slot active will reset it.
 		 */
-		slot->inactive_since = GetCurrentTimestamp();
+		ReplicationSlotSetInactiveSince(slot, &now, false);
 
 		restored = true;
 		break;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index c7bfbb15e0..b1b7b075bd 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -540,7 +540,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c5f1009f37..61a0e38715 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -844,7 +844,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1462,7 +1462,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index c54b08fe18..82956d58d3 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -299,7 +299,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, false);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 686309db58..5e27cd3270 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3028,6 +3028,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"replication_slot_inactive_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the amount of time a replication slot can remain inactive before "
+						 "it will be invalidated."),
+			NULL,
+			GUC_UNIT_S
+		},
+		&replication_slot_inactive_timeout,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 667e0dc40a..deca3a4aeb 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -335,6 +335,7 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
+#replication_slot_inactive_timeout = 0	# in seconds; 0 disables
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 45582cf9d8..27c4f107e5 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -56,6 +56,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* inactive slot timeout has occurred */
+	RS_INVAL_INACTIVE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -224,6 +226,24 @@ typedef struct ReplicationSlotCtlData
 	ReplicationSlot replication_slots[1];
 } ReplicationSlotCtlData;
 
+/*
+ * Set slot's inactive_since property unless it was previously invalidated due
+ * to inactive timeout.
+ */
+static inline void
+ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz *now,
+								bool acquire_lock)
+{
+	if (acquire_lock)
+		SpinLockAcquire(&s->mutex);
+
+	if (s->data.invalidated != RS_INVAL_INACTIVE_TIMEOUT)
+		s->inactive_since = *now;
+
+	if (acquire_lock)
+		SpinLockRelease(&s->mutex);
+}
+
 /*
  * Pointers to shared memory
  */
@@ -233,6 +253,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *synchronized_standby_slots;
+extern PGDLLIMPORT int replication_slot_inactive_timeout;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
@@ -249,7 +270,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
 								 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool error_if_invalid);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 712924c2fa..c45a5106f4 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -52,6 +52,7 @@ tests += {
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
       't/043_wal_replay_wait.pl',
+      't/050_invalidate_slots.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
new file mode 100644
index 0000000000..669d6ccc7a
--- /dev/null
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -0,0 +1,321 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test for replication slots invalidation
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# =============================================================================
+# Testcase start
+#
+# Invalidate streaming standby slot and logical failover slot on primary due to
+# inactive timeout. Also, check logical failover slot synced to standby from
+# primary doesn't invalidate on its own, but gets the invalidated state from the
+# primary.
+
+# Initialize primary
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+
+# Avoid unpredictability
+$primary->append_conf(
+	'postgresql.conf', qq{
+checkpoint_timeout = 1h
+autovacuum = off
+});
+$primary->start;
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create standby
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+my $connstr = $primary->connstr;
+$standby1->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb_slot1'
+primary_conninfo = '$connstr dbname=postgres'
+));
+
+# Create sync slot on primary
+$primary->psql('postgres',
+	q{SELECT pg_create_logical_replication_slot('sync_slot1', 'test_decoding', false, false, true);}
+);
+
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb_slot1', immediately_reserve := true);
+]);
+
+$standby1->start;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby1);
+
+# Sync primary slot to standby
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that logical failover slot is created on standby
+is( $standby1->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 1 FROM pg_replication_slots
+		  WHERE slot_name = 'sync_slot1' AND synced AND NOT temporary;}
+	),
+	"t",
+	'logical slot sync_slot1 has synced as true on standby');
+
+my $logstart = -s $primary->logfile;
+my $inactive_timeout = 1;
+
+# Set timeout so that next checkpoint will invalidate inactive slot
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '${inactive_timeout}s';
+]);
+$primary->reload;
+
+# Check for logical failover slot to become inactive on primary. Note that
+# nobody has acquired slot yet, so it must get invalidated due to
+# inactive timeout.
+check_for_slot_invalidation($primary, 'sync_slot1', $logstart,
+	$inactive_timeout);
+
+# Sync primary slot to standby. Note that primary slot has already been
+# invalidated due to inactive timeout. Standby must just sync inavalidated
+# state.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+$standby1->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'sync_slot1' AND
+		invalidation_reason = 'inactive_timeout';
+])
+  or die
+  "Timed out while waiting for sync_slot1 invalidation to be synced on standby";
+
+# Make standby slot on primary inactive and check for invalidation
+$standby1->stop;
+check_for_slot_invalidation($primary, 'sb_slot1', $logstart,
+	$inactive_timeout);
+
+# Testcase end
+# =============================================================================
+
+# =============================================================================
+# Testcase start
+# Synced slot mustn't get invalidated on standby on its own due to inactive
+# timeout.
+
+# Disable inactive timeout on primary
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '0';
+]);
+$primary->reload;
+
+# Create standby
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+$connstr = $primary->connstr;
+$standby2->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb_slot2'
+primary_conninfo = '$connstr dbname=postgres'
+));
+
+# Create sync slot on primary
+$primary->psql('postgres',
+	q{SELECT pg_create_logical_replication_slot('sync_slot2', 'test_decoding', false, false, true);}
+);
+
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb_slot2', immediately_reserve := true);
+]);
+
+$standby2->start;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby2);
+
+
+$standby2->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '${inactive_timeout}s';
+]);
+$standby2->reload;
+
+# Sync primary slot to standby
+$standby2->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that logical failover slot is created on standby
+is( $standby2->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 1 FROM pg_replication_slots
+		  WHERE slot_name = 'sync_slot2' AND synced AND NOT temporary;}
+	),
+	"t",
+	'logical slot sync_slot2 has synced as true on standby');
+
+$logstart = -s $standby2->logfile;
+
+# Give enough time
+sleep($inactive_timeout+1);
+
+# Despite inactive timeout being set, synced slot won't get invalidated on its
+# own on standby. So, we must not see invalidation message in server log.
+$standby2->safe_psql('postgres', "CHECKPOINT");
+ok( !$standby2->log_contains(
+		"invalidating obsolete replication slot \"sync_slot2\"",
+		$logstart),
+	'check that syned sync_slot2 has not been invalidated on the standby'
+);
+
+$standby2->stop;
+
+# Testcase end
+# =============================================================================
+
+# =============================================================================
+# Testcase start
+# Invalidate logical subscriber slot due to inactive timeout.
+
+my $publisher = $primary;
+
+# Prepare for test
+$publisher->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '0';
+]);
+$publisher->reload;
+
+# Create subscriber
+my $subscriber = PostgreSQL::Test::Cluster->new('sub');
+$subscriber->init;
+$subscriber->start;
+
+# Create tables
+$publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Insert some data
+$publisher->safe_psql('postgres',
+	"INSERT INTO test_tbl VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
+$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$publisher->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_logical_replication_slot(slot_name := 'lsub1_slot', plugin := 'pgoutput');
+]);
+
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (slot_name = 'lsub1_slot', create_slot = false)"
+);
+
+$subscriber->wait_for_subscription_sync($publisher, 'sub');
+
+my $result =
+  $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");
+
+is($result, qq(5), "check initial copy was done");
+
+$publisher->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO ' ${inactive_timeout}s';
+]);
+$publisher->reload;
+
+$logstart = -s $publisher->logfile;
+
+# Make subscriber slot on publisher inactive and check for invalidation
+$subscriber->stop;
+check_for_slot_invalidation($publisher, 'lsub1_slot', $logstart,
+	$inactive_timeout);
+
+# Testcase end
+# =============================================================================
+
+# Check for slot to first become inactive and then get invalidated
+sub check_for_slot_invalidation
+{
+	my ($node, $slot, $offset, $inactive_timeout) = @_;
+	my $name = $node->name;
+
+	# Wait for slot to become inactive
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE slot_name = '$slot' AND active = 'f' AND
+				  inactive_since IS NOT NULL;
+	])
+	  or die
+	  "Timed out while waiting for slot $slot to become inactive on node $name";
+
+	trigger_slot_invalidation($node, $slot, $offset, $inactive_timeout);
+
+	# Wait for invalidation reason to be set
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE slot_name = '$slot' AND
+			invalidation_reason = 'inactive_timeout';
+	])
+	  or die
+	  "Timed out while waiting for invalidation reason of slot $slot to be set on node $name";
+
+	# Check that invalidated slot cannot be acquired
+	my ($result, $stdout, $stderr);
+
+	($result, $stdout, $stderr) = $node->psql(
+		'postgres', qq[
+			SELECT pg_replication_slot_advance('$slot', '0/1');
+	]);
+
+	ok( $stderr =~
+		  /can no longer get changes from replication slot "$slot"/,
+		"detected error upon trying to acquire invalidated slot $slot on node $name"
+	  )
+	  or die
+	  "could not detect error upon trying to acquire invalidated slot $slot on node $name";
+}
+
+# Trigger slot invalidation and confirm it in server log
+sub trigger_slot_invalidation
+{
+	my ($node, $slot, $offset, $inactive_timeout) = @_;
+	my $name = $node->name;
+	my $invalidated = 0;
+
+	# Give enough time to avoid multiple checkpoints
+	sleep($inactive_timeout+1);
+
+	for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+	{
+		$node->safe_psql('postgres', "CHECKPOINT");
+		if ($node->log_contains(
+				"invalidating obsolete replication slot \"$slot\"",
+				$offset))
+		{
+			$invalidated = 1;
+			last;
+		}
+		usleep(100_000);
+	}
+	ok($invalidated,
+		"check that slot $slot invalidation has been logged on node $name"
+	);
+}
+
+done_testing();
-- 
2.43.0

