From ee20c06e7ecd7e7d1f9d182fc3abecd7d91a93e5 Mon Sep 17 00:00:00 2001
From: Ashutosh Sharma <ashu.coek88@gmail.com>
Date: Wed, 18 Mar 2026 10:14:18 +0000
Subject: [PATCH] Add FIRST N and ANY N syntax support to
 synchronized_standby_slots

Extend synchronized_standby_slots to support the same priority and quorum
syntax as synchronous_standby_names:

FIRST N (slot1, slot2, ...) - priority mode
ANY N (slot1, slot2, ...) - quorum mode

This allows logical replication availability to align with the commit
durability guarantees provided by synchronous replication when using
priority or quorum configurations.

Implementation details

The GUC parser reuses the existing syncrep_yyparse infrastructure to parse
FIRST N and ANY N syntax. The behavior differs slightly from
synchronous_standby_names in the following ways:

Plain list syntax (slot1, slot2)
In synchronized_standby_slots, this requires all listed slots to catch up.
In synchronous_standby_names, it is equivalent to FIRST 1.
This behavior is preserved for backward compatibility and because logical
replication has different availability semantics.

FIRST N (priority mode)

Selects the first N slots according to the configured priority
Skips unavailable slots (missing, invalidated, or inactive)
Waits for valid but lagging slots
ANY N (quorum mode)

Allows any N slots that have caught up
Skips problematic slots (unavailable or lagging)
Maximizes availability when N-of-M durability is sufficient
StandbySlotsHaveCaughtup() implements these semantics by distinguishing
between two slot states:

Unavailable slots: missing, logical, invalidated, or inactive
Lagging slots: valid and active but not yet caught up

A comprehensive test suite validates the behavior of all three modes,
including fail-fast behavior, skip semantics, and GUC validation.

Author: Satya Narlapuram <satyanarlapuram@gmail.com>
Author: Ashutosh Sharma <ashu.coek88@gmail.com>
Reviewed-by: Shveta Malik <shveta.malik@gmail.com>
Reviewed-by: Ajin Cherian <itsajin@gmail.com>
---
 doc/src/sgml/config.sgml                      |  79 ++-
 src/backend/replication/slot.c                | 506 ++++++++++++++----
 .../053_synchronized_standby_slots_quorum.pl  | 340 ++++++++++++
 3 files changed, 798 insertions(+), 127 deletions(-)
 create mode 100644 src/test/recovery/t/053_synchronized_standby_slots_quorum.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 8cdd826fbd3..a9829dd7b1d 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4964,17 +4964,76 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </term>
       <listitem>
        <para>
-        A comma-separated list of streaming replication standby server slot names
-        that logical WAL sender processes will wait for. Logical WAL sender processes
-        will send decoded changes to plugins only after the specified replication
-        slots confirm receiving WAL. This guarantees that logical replication
+        Specifies the streaming replication standby slots that logical WAL
+        sender processes must wait on before delivering decoded changes. This
+        parameter uses the same syntax as
+        <xref linkend="guc-synchronous-standby-names"/>:
+<synopsis>
+[FIRST] <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">slot_name</replaceable> [, ...] )
+ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">slot_name</replaceable> [, ...] )
+<replaceable class="parameter">slot_name</replaceable> [, ...]
+</synopsis>
+        where <replaceable class="parameter">num_sync</replaceable> is
+        the number of physical replication slots that must confirm WAL
+        receipt before logical decoding proceeds,
+        and <replaceable class="parameter">slot_name</replaceable>
+        is the name of a physical replication slot.
+        <replaceable class="parameter">num_sync</replaceable>
+        must be an integer value greater than zero and must not exceed the
+        number of listed slots.
+       </para>
+       <para>
+        The keyword <literal>FIRST</literal>, coupled with
+        <replaceable class="parameter">num_sync</replaceable>, specifies
+        priority-based semantics. Logical decoding will wait for the first
+        <replaceable class="parameter">num_sync</replaceable> available
+        physical slots in priority order (the order they appear in the list).
+        If a slot is missing, logical, invalidated, or inactive, it will be
+        skipped. However, if a slot exists and is valid and active but has
+        not yet caught up, the system will wait for it rather than skipping
+        to lower-priority slots.
+       </para>
+       <para>
+        A plain comma-separated list without a keyword specifies that
+        <emphasis>all</emphasis> listed physical slots must confirm WAL
+        receipt. This differs from <xref linkend="guc-synchronous-standby-names"/>
+        where a simple list means <literal>FIRST 1</literal>. For
+        <varname>synchronized_standby_slots</varname>, requiring all slots
+        provides safer failover semantics by default.
+       </para>
+       <para>
+        The keyword <literal>ANY</literal>, coupled with
+        <replaceable class="parameter">num_sync</replaceable>, specifies
+        quorum-based semantics. Logical decoding proceeds once at least
+        <replaceable class="parameter">num_sync</replaceable> of the listed
+        slots have caught up. Missing, logical, invalidated, or inactive
+        slots are skipped when determining candidates, and lagging slots
+        simply do not count toward the required number until they catch up,
+        i.e., there is no priority ordering.
+        For example, a setting of <literal>ANY 1 (sb1_slot, sb2_slot)</literal>
+        will allow logical decoding to proceed as soon as either physical slot
+        has confirmed WAL receipt. This is useful in conjunction with
+        quorum-based synchronous replication
+        (<literal>synchronous_standby_names = 'ANY ...'</literal>), so that
+        logical decoding availability matches the commit durability guarantee.
+       </para>
+       <para>
+        <literal>FIRST</literal> and <literal>ANY</literal> are case-insensitive.
+        If these keywords are used as the name of a replication slot,
+        the <replaceable class="parameter">slot_name</replaceable> must
+        be double-quoted.
+       </para>
+       <para>
+        The use of <varname>synchronized_standby_slots</varname> guarantees
+        that logical replication
         failover slots do not consume changes until those changes are received
-        and flushed to corresponding physical standbys. If a
+        and flushed to the required physical standbys. If a
         logical replication connection is meant to switch to a physical standby
         after the standby is promoted, the physical replication slot for the
         standby should be listed here. Note that logical replication will not
-        proceed if the slots specified in the
-        <varname>synchronized_standby_slots</varname> do not exist or are invalidated.
+        proceed if the required number of physical slots specified in
+        <varname>synchronized_standby_slots</varname> do not exist or are
+        invalidated.
         Additionally, the replication management functions
         <link linkend="pg-replication-slot-advance">
         <function>pg_replication_slot_advance</function></link>,
@@ -4982,9 +5041,9 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
         <function>pg_logical_slot_get_changes</function></link>, and
         <link linkend="pg-logical-slot-peek-changes">
         <function>pg_logical_slot_peek_changes</function></link>,
-        when used with logical failover slots, will block until all
-        physical slots specified in <varname>synchronized_standby_slots</varname> have
-        confirmed WAL receipt.
+        when used with logical failover slots, will block until the required
+        physical slots specified in <varname>synchronized_standby_slots</varname>
+        have confirmed WAL receipt.
        </para>
        <para>
         The standbys corresponding to the physical replication slots in
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a9092fc2382..026a6ce6a5a 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -50,6 +50,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/slotsync.h"
 #include "replication/slot.h"
+#include "replication/syncrep.h"
 #include "replication/walsender_private.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -90,11 +91,18 @@ typedef struct ReplicationSlotOnDisk
  * Note: this must be a flat representation that can be held in a single chunk
  * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
  * synchronized_standby_slots GUC.
+ *
+ * The layout mirrors SyncRepConfigData so that the same quorum/priority
+ * semantics can be expressed.  The syncrep_method field uses the
+ * SYNC_REP_PRIORITY and SYNC_REP_QUORUM constants from syncrep.h.
  */
 typedef struct
 {
-	/* Number of slot names in the slot_names[] */
-	int			nslotnames;
+	int			config_size;	/* total size of this struct, in bytes */
+	int			num_sync;		/* number of slots that must confirm WAL
+								 * receipt before logical decoding proceeds */
+	uint8		syncrep_method;	/* SYNC_REP_PRIORITY or SYNC_REP_QUORUM */
+	int			nslotnames;		/* number of slot names that follow */
 
 	/*
 	 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
@@ -102,6 +110,28 @@ typedef struct
 	char		slot_names[FLEXIBLE_ARRAY_MEMBER];
 } SyncStandbySlotsConfigData;
 
+/*
+ * State of a replication slot specified in synchronized_standby_slots GUC.
+ */
+typedef enum
+{
+	SS_SLOT_NOT_FOUND,		/* slot does not exist */
+	SS_SLOT_LOGICAL,		/* slot is logical, not physical */
+	SS_SLOT_INVALIDATED,	/* slot has been invalidated */
+	SS_SLOT_INACTIVE,		/* slot is inactive (standby not connected) */
+	SS_SLOT_LAGGING			/* slot exists and is active but has not caught up */
+} SyncStandbySlotsState;
+
+/*
+ * Information about a synchronized standby slot's state.
+ */
+typedef struct
+{
+	const char *slot_name;		/* name of the slot */
+	SyncStandbySlotsState state;	/* state of the slot */
+	XLogRecPtr	restart_lsn;	/* current restart_lsn (valid for SS_SLOT_LAGGING) */
+} SyncStandbySlotsStateInfo;
+
 /*
  * Lookup table for slot invalidation causes.
  */
@@ -181,6 +211,7 @@ static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
 static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
+static bool IsPrioritySyncStandbySlotsSyntax(const char *value);
 
 /*
  * Report shared-memory space needed by ReplicationSlotsShmemInit.
@@ -2947,94 +2978,183 @@ GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
 }
 
 /*
- * A helper function to validate slots specified in GUC synchronized_standby_slots.
+ * Return true if value starts with explicit FIRST syntax:
  *
- * The rawname will be parsed, and the result will be saved into *elemlist.
+ *   FIRST N (...)
+ *
+ * This is used to distinguish explicit FIRST from simple list syntax whose
+ * first slot name may start with "first".
  */
 static bool
-validate_sync_standby_slots(char *rawname, List **elemlist)
+IsPrioritySyncStandbySlotsSyntax(const char *value)
 {
-	/* Verify syntax and parse string into a list of identifiers */
-	if (!SplitIdentifierString(rawname, ',', elemlist))
-	{
-		GUC_check_errdetail("List syntax is invalid.");
+	const char *p = value;
+
+	/* Skip leading whitespace */
+	while (*p && isspace((unsigned char) *p))
+		p++;
+
+	/* Must start with FIRST keyword */
+	if (pg_strncasecmp(p, "FIRST", 5) != 0)
 		return false;
-	}
 
-	/* Iterate the list to validate each slot name */
-	foreach_ptr(char, name, *elemlist)
-	{
-		int			err_code;
-		char	   *err_msg = NULL;
-		char	   *err_hint = NULL;
+	p += 5;
 
-		if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
-												 &err_msg, &err_hint))
-		{
-			GUC_check_errcode(err_code);
-			GUC_check_errdetail("%s", err_msg);
-			if (err_hint != NULL)
-				GUC_check_errhint("%s", err_hint);
-			return false;
-		}
-	}
+	/* FIRST must be followed by whitespace, then a positive integer */
+	if (!isspace((unsigned char) *p))
+		return false;
 
-	return true;
+	while (*p && isspace((unsigned char) *p))
+		p++;
+
+	if (!isdigit((unsigned char) *p))
+		return false;
+
+	while (*p && isdigit((unsigned char) *p))
+		p++;
+
+	/* Explicit FIRST syntax then requires a parenthesized member list */
+	while (*p && isspace((unsigned char) *p))
+		p++;
+
+	return (*p == '(');
 }
 
 /*
  * GUC check_hook for synchronized_standby_slots
+ *
+ * This reuses the syncrep_yyparse/syncrep_scanner infrastructure that is
+ * also used for synchronous_standby_names, so the same syntax is accepted:
+ *
+ *   slot1, slot2                   -- wait for ALL listed slots
+ *   ANY N (slot1, slot2, ...)      -- wait for any N-of-M (quorum)
+ *   FIRST N (slot1, slot2, ...)    -- wait for first N in priority order
+ *
+ * Note: Simple list syntax is interpreted as "wait for ALL" for this GUC,
+ * unlike synchronous_standby_names where it means "FIRST 1".
+ *
+ * After parsing, we additionally validate every name as a legal replication
+ * slot name.
  */
 bool
 check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
 {
-	char	   *rawname;
-	char	   *ptr;
-	List	   *elemlist;
-	int			size;
-	bool		ok;
-	SyncStandbySlotsConfigData *config;
-
-	if ((*newval)[0] == '\0')
-		return true;
+	if (*newval != NULL && (*newval)[0] != '\0')
+	{
+		yyscan_t	scanner;
+		int			parse_rc;
+		SyncStandbySlotsConfigData *config;
+		const char *mname;
+
+		/* Result of parsing is returned in one of these two variables */
+		SyncRepConfigData *syncrep_parse_result = NULL;
+		char	   *syncrep_parse_error_msg = NULL;
+
+		/* Parse the synchronized standby slots configuration */
+		syncrep_scanner_init(*newval, &scanner);
+		parse_rc = syncrep_yyparse(&syncrep_parse_result,
+								   &syncrep_parse_error_msg,
+								   scanner);
+		syncrep_scanner_finish(scanner);
+
+		if (parse_rc != 0 || syncrep_parse_result == NULL)
+		{
+			GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
+			if (syncrep_parse_error_msg)
+				GUC_check_errdetail("%s", syncrep_parse_error_msg);
+			else
+				GUC_check_errdetail("\"%s\" parser failed.",
+									"synchronized_standby_slots");
+			return false;
+		}
+
+		if (syncrep_parse_result->num_sync <= 0)
+		{
+			GUC_check_errmsg("number of synchronized standby slots (%d) must be greater than zero",
+							 syncrep_parse_result->num_sync);
+			return false;
+		}
 
-	/* Need a modifiable copy of the GUC string */
-	rawname = pstrdup(*newval);
+		/* Reject num_sync > nmembers — it can never be satisfied */
+		if (syncrep_parse_result->num_sync > syncrep_parse_result->nmembers)
+		{
+			GUC_check_errmsg("number of synchronized standby slots (%d) must not exceed the number of listed slots (%d)",
+							 syncrep_parse_result->num_sync,
+							 syncrep_parse_result->nmembers);
+			return false;
+		}
 
-	/* Now verify if the specified slots exist and have correct type */
-	ok = validate_sync_standby_slots(rawname, &elemlist);
+		/*
+		 * When using simple list syntax (e.g., "slot1, slot2"), the parser
+		 * returns num_sync=1 and SYNC_REP_PRIORITY. We interpret this as
+		 * "wait for ALL slots" by setting num_sync to nmembers.
+		 *
+		 * This differs from synchronous_standby_names where "standby1, standby2"
+		 * means "wait for first 1 standby". For synchronized_standby_slots,
+		 * requiring all slots provides safer failover semantics by default.
+		 */
+		if (syncrep_parse_result->num_sync == 1 &&
+			syncrep_parse_result->syncrep_method == SYNC_REP_PRIORITY &&
+			syncrep_parse_result->nmembers > 1)
+		{
+			/*
+			 * Distinguish a simple list that starts with "first" (for
+			 * example, "firstslot, secondslot") from explicit
+			 * "FIRST N (...)" syntax by checking whether the value starts
+			 * with the FIRST keyword (after leading whitespace).
+			 */
+			if (!IsPrioritySyncStandbySlotsSyntax(*newval))
+				syncrep_parse_result->num_sync = syncrep_parse_result->nmembers;
+		}
 
-	if (!ok || elemlist == NIL)
-	{
-		pfree(rawname);
-		list_free(elemlist);
-		return ok;
-	}
+		/* validate every member name as a slot name */
+		mname = syncrep_parse_result->member_names;
 
-	/* Compute the size required for the SyncStandbySlotsConfigData struct */
-	size = offsetof(SyncStandbySlotsConfigData, slot_names);
-	foreach_ptr(char, slot_name, elemlist)
-		size += strlen(slot_name) + 1;
+		for (int i = 0; i < syncrep_parse_result->nmembers; i++)
+		{
+			int			err_code;
+			char	   *err_msg = NULL;
+			char	   *err_hint = NULL;
 
-	/* GUC extra value must be guc_malloc'd, not palloc'd */
-	config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
-	if (!config)
-		return false;
+			if (!ReplicationSlotValidateNameInternal(mname, false, &err_code,
+													 &err_msg, &err_hint))
+			{
+				GUC_check_errcode(err_code);
+				GUC_check_errdetail("%s", err_msg);
+				if (err_hint != NULL)
+					GUC_check_errhint("%s", err_hint);
+				return false;
+			}
 
-	/* Transform the data into SyncStandbySlotsConfigData */
-	config->nslotnames = list_length(elemlist);
+			mname += strlen(mname) + 1;
+		}
 
-	ptr = config->slot_names;
-	foreach_ptr(char, slot_name, elemlist)
-	{
-		strcpy(ptr, slot_name);
-		ptr += strlen(slot_name) + 1;
-	}
+		/*
+		 * Build SyncStandbySlotsConfigData from the parsed SyncRepConfigData.
+		 * Since the structures have identical layout, we can use the same
+		 * config_size.
+		 */
+		config = (SyncStandbySlotsConfigData *)
+			guc_malloc(LOG, syncrep_parse_result->config_size);
+		if (!config)
+			return false;
+
+		config->config_size = syncrep_parse_result->config_size;
+		config->num_sync = syncrep_parse_result->num_sync;
+		config->syncrep_method = syncrep_parse_result->syncrep_method;
+		config->nslotnames = syncrep_parse_result->nmembers;
 
-	*extra = config;
+		/* Copy all slot names in one operation */
+		memcpy(config->slot_names,
+			   syncrep_parse_result->member_names,
+			   syncrep_parse_result->config_size -
+			   offsetof(SyncRepConfigData, member_names));
+
+		*extra = config;
+	}
+	else
+		*extra = NULL;
 
-	pfree(rawname);
-	list_free(elemlist);
 	return true;
 }
 
@@ -3083,18 +3203,116 @@ SlotExistsInSyncStandbySlots(const char *slot_name)
 }
 
 /*
- * Return true if the slots specified in synchronized_standby_slots have caught up to
- * the given WAL location, false otherwise.
+ * Report problem states for synchronized standby slots that prevented the
+ * catch-up requirement from being met.
+ */
+static void
+ReportUnavailableSyncStandbySlots(SyncStandbySlotsStateInfo *slot_states,
+								  int num_slot_states, int elevel,
+								  XLogRecPtr wait_for_lsn)
+{
+	for (int i = 0; i < num_slot_states; i++)
+	{
+		const char *slot_name = slot_states[i].slot_name;
+
+		switch (slot_states[i].state)
+		{
+			case SS_SLOT_NOT_FOUND:
+				ereport(elevel,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
+							   slot_name, "synchronized_standby_slots"),
+						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
+								  slot_name),
+						errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
+								slot_name, "synchronized_standby_slots"));
+				break;
+
+			case SS_SLOT_LOGICAL:
+				ereport(elevel,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
+							   slot_name, "synchronized_standby_slots"),
+						errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
+								  slot_name),
+						errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
+								slot_name, "synchronized_standby_slots"));
+				break;
+
+			case SS_SLOT_INVALIDATED:
+				ereport(elevel,
+						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
+							   slot_name, "synchronized_standby_slots"),
+						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
+								  slot_name),
+						errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
+								slot_name, "synchronized_standby_slots"));
+				break;
+
+			case SS_SLOT_INACTIVE:
+				ereport(elevel,
+						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
+							   slot_name, "synchronized_standby_slots"),
+						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
+								  slot_name),
+						errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
+								slot_name, "synchronized_standby_slots"));
+				break;
+
+			case SS_SLOT_LAGGING:
+				ereport(DEBUG1,
+						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("replication slot \"%s\" specified in parameter \"%s\" has not caught up",
+							   slot_name, "synchronized_standby_slots"),
+						errdetail("The slot's restart_lsn %X/%X is behind the required %X/%X.",
+								  LSN_FORMAT_ARGS(slot_states[i].restart_lsn),
+								  LSN_FORMAT_ARGS(wait_for_lsn)));
+				break;
+
+			default:
+				/* Should not happen */
+				Assert(false);
+				break;
+		}
+	}
+}
+
+/*
+ * Return true if the required standby slots have caught up to the given WAL
+ * location, false otherwise.
+ *
+ * The behavior depends on the synchronized_standby_slots configuration:
+ *
+ *   Simple list (e.g., "slot1, slot2"):
+ *     ALL slots must have caught up. Returns false otherwise.
  *
- * The elevel parameter specifies the error level used for logging messages
- * related to slots that do not exist, are invalidated, or are inactive.
+ *   FIRST N (e.g., "FIRST 2 (slot1, slot2, slot3)"):
+ *     Wait for the first N available slots in priority order. Skips over
+ *     missing/invalid/inactive/logical slots to find N available ones.
+ *     If a valid slot is
+ *     lagging, waits for it (does not skip to lower priority slots).
+ *
+ *   ANY N (e.g., "ANY 2 (slot1, slot2, slot3)"):
+ *     Wait for any N available slots. Skips
+ *     missing/invalid/inactive/logical/lagging slots
+ *     to find N slots that have caught up.
+ *
+ * The elevel parameter specifies the error level used for reporting issues
+ * related to the slots specified in synchronized_standby_slots when the
+ * catch-up requirement is not met.
  */
 bool
 StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 {
 	const char *name;
 	int			caught_up_slot_num = 0;
+	int			required;
 	XLogRecPtr	min_restart_lsn = InvalidXLogRecPtr;
+	bool		wait_for_all;
+	SyncStandbySlotsStateInfo *slot_states;
+	int			num_slot_states = 0;
 
 	/*
 	 * Don't need to wait for the standbys to catch up if there is no value in
@@ -3118,12 +3336,42 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 		ss_oldest_flush_lsn >= wait_for_lsn)
 		return true;
 
+	/*
+	 * Determine how many slots are required and whether we're in "wait for
+	 * ALL" mode versus "wait for N-of-M" mode.
+	 *
+	 * wait_for_all = true means we need ALL slots to be ready (simple
+	 * list syntax like "slot1, slot2"). In this mode, we stop checking
+	 * on the first slot with a problem state or the first lagging slot.
+	 *
+	 * wait_for_all = false means we select N from M candidates (FIRST N or
+	 * ANY N syntax). In this mode, we can skip over slots with problem states.
+	 * In FIRST N mode, we only skip problem slots and wait for the lagging slot
+	 * with higher priority. In ANY N mode, we skip problem slots and lagging
+	 * slots to find any N that have caught up.
+	 */
+	required = synchronized_standby_slots_config->num_sync;
+	wait_for_all = (required == synchronized_standby_slots_config->nslotnames);
+
+	/*
+	 * Allocate array to track slot states. Size it to the total number of
+	 * configured slots since in the worst case all could have problem states.
+	 */
+	slot_states = (SyncStandbySlotsStateInfo *)
+		palloc(sizeof(SyncStandbySlotsStateInfo) * synchronized_standby_slots_config->nslotnames);
+
 	/*
 	 * To prevent concurrent slot dropping and creation while filtering the
 	 * slots, take the ReplicationSlotControlLock outside of the loop.
 	 */
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
+	/*
+	 * Iterate through configured slots, checking their state and tracking
+	 * how many have caught up. Problem states (missing, logical, invalidated,
+	 * inactive) are recorded for deferred reporting - messages are only
+	 * emitted if the catch-up requirement isn't met.
+	 */
 	name = synchronized_standby_slots_config->slot_names;
 	for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
 	{
@@ -3134,35 +3382,28 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 
 		slot = SearchNamedReplicationSlot(name, false);
 
-		/*
-		 * If a slot name provided in synchronized_standby_slots does not
-		 * exist, report a message and exit the loop.
-		 */
 		if (!slot)
 		{
-			ereport(elevel,
-					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
-						   name, "synchronized_standby_slots"),
-					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
-							  name),
-					errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
-							name, "synchronized_standby_slots"));
-			break;
+			/* Record Slot State */
+			slot_states[num_slot_states].slot_name = name;
+			slot_states[num_slot_states].state = SS_SLOT_NOT_FOUND;
+			num_slot_states++;
+
+			if (wait_for_all)
+				break;
+			goto next_slot;
 		}
 
-		/* Same as above: if a slot is not physical, exit the loop. */
 		if (SlotIsLogical(slot))
 		{
-			ereport(elevel,
-					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
-						   name, "synchronized_standby_slots"),
-					errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
-							  name),
-					errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
-							name, "synchronized_standby_slots"));
-			break;
+			/* Record Slot State */
+			slot_states[num_slot_states].slot_name = name;
+			slot_states[num_slot_states].state = SS_SLOT_LOGICAL;
+			num_slot_states++;
+
+			if (wait_for_all)
+				break;
+			goto next_slot;
 		}
 
 		SpinLockAcquire(&slot->mutex);
@@ -3173,33 +3414,45 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 
 		if (invalidated)
 		{
-			/* Specified physical slot has been invalidated */
-			ereport(elevel,
-					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
-						   name, "synchronized_standby_slots"),
-					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
-							  name),
-					errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
-							name, "synchronized_standby_slots"));
-			break;
+			/* Record Slot State */
+			slot_states[num_slot_states].slot_name = name;
+			slot_states[num_slot_states].state = SS_SLOT_INVALIDATED;
+			num_slot_states++;
+
+			if (wait_for_all)
+				break;
+			goto next_slot;
+		}
+
+		if (inactive)
+		{
+			/* Record Slot State */
+			slot_states[num_slot_states].slot_name = name;
+			slot_states[num_slot_states].state = SS_SLOT_INACTIVE;
+			num_slot_states++;
+
+			if (wait_for_all)
+				break;
+			goto next_slot;
 		}
 
 		if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
 		{
-			/* Log a message if no active_pid for this physical slot */
-			if (inactive)
-				ereport(elevel,
-						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
-							   name, "synchronized_standby_slots"),
-						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
-								  name),
-						errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
-								name, "synchronized_standby_slots"));
+			/*
+			 * Slot exists, is valid, and has an active connection, but
+			 * hasn't caught up yet (it's lagging).
+			 * In ALL mode: must wait for it.
+			 * In FIRST N (priority) mode: must wait for this higher priority slot.
+			 * In ANY N (quorum) mode: can skip and use another slot.
+			 */
+			slot_states[num_slot_states].slot_name = name;
+			slot_states[num_slot_states].state = SS_SLOT_LAGGING;
+			slot_states[num_slot_states].restart_lsn = restart_lsn;
+			num_slot_states++;
 
-			/* Continue if the current slot hasn't caught up. */
-			break;
+			if (wait_for_all || synchronized_standby_slots_config->syncrep_method == SYNC_REP_PRIORITY)
+				break;
+			goto next_slot;
 		}
 
 		Assert(restart_lsn >= wait_for_lsn);
@@ -3210,17 +3463,35 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 
 		caught_up_slot_num++;
 
+		/*
+		 * Stop processing if the required number of slots have caught up.
+		 * In priority mode (FIRST N), this ensures we select the first N
+		 * slots in sequential order. In quorum mode (ANY N), we still
+		 * process in order for efficiency, stopping once we find any N.
+		 */
+		if (caught_up_slot_num >= required)
+			break;
+
+next_slot:
 		name += strlen(name) + 1;
 	}
 
 	LWLockRelease(ReplicationSlotControlLock);
 
 	/*
-	 * Return false if not all the standbys have caught up to the specified
-	 * WAL location.
+	 * If the required number of slots have not caught up, report any
+	 * recorded problem states and return false.
+	 *
+	 * We only emit messages when the requirement is not met to avoid
+	 * misleading messages in quorum/priority mode where other slots may
+	 * have satisfied the condition despite some slots having issues.
 	 */
-	if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
+	if (caught_up_slot_num < required)
+	{
+		ReportUnavailableSyncStandbySlots(slot_states, num_slot_states, elevel, wait_for_lsn);
+		pfree(slot_states);
 		return false;
+	}
 
 	/* The ss_oldest_flush_lsn must not retreat. */
 	Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
@@ -3228,6 +3499,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 
 	ss_oldest_flush_lsn = min_restart_lsn;
 
+	pfree(slot_states);
 	return true;
 }
 
diff --git a/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl
new file mode 100644
index 00000000000..a6cd9413e23
--- /dev/null
+++ b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl
@@ -0,0 +1,340 @@
+
+# Copyright (c) 2024-2026, PostgreSQL Global Development Group
+
+# Test synchronized_standby_slots with different syntax modes:
+# - Plain list (ALL mode): slot1, slot2
+# - ANY N (quorum mode): ANY N (slot1, slot2, ...)
+# - FIRST N (priority mode): FIRST N (slot1, slot2, ...)
+#
+# Setup: a 3-node cluster with one primary, two physical standbys, and a
+# logical decoding client using a failover-enabled slot.
+#
+#               | ----> standby1 (primary_slot_name = sb1_slot)
+# primary ------|
+#               | ----> standby2 (primary_slot_name = sb2_slot)
+#
+#   synchronous_standby_names = 'ANY 1 (standby1, standby2)'
+#
+# Test scenarios:
+#
+# A) Plain list 'sb1_slot, sb2_slot' (ALL mode)
+#    - Works when all slots are available
+#    - Blocks immediately if ANY slot is unavailable
+#
+# B) ANY 1 (sb1_slot, sb2_slot) (quorum mode)
+#    - Proceeds when at least N slots have caught up
+#    - Skips missing/invalid/logical/inactive/lagging slots to find N available
+#
+# C) FIRST N (sb1_slot, sb2_slot) (priority mode)
+#    - Selects first N slots in priority order (list order)
+#    - Skips missing/invalid/logical/inactive slots but waits for valid lagging slots
+#    - FIRST 1 works with one slot down (unlike plain list)
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# ---------------------------------------------------------------------------
+# 1.  Create a primary with logical replication level, autovacuum off
+# ---------------------------------------------------------------------------
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+$primary->append_conf(
+	'postgresql.conf', qq{
+autovacuum = off
+});
+$primary->start;
+
+# Physical replication slots for the two standbys
+$primary->safe_psql('postgres',
+	"SELECT pg_create_physical_replication_slot('sb1_slot');");
+$primary->safe_psql('postgres',
+	"SELECT pg_create_physical_replication_slot('sb2_slot');");
+
+# ---------------------------------------------------------------------------
+# 2.  Create standby1 and standby2 from a fresh backup
+# ---------------------------------------------------------------------------
+my $backup_name = 'base_backup';
+$primary->backup($backup_name);
+
+my $connstr = $primary->connstr;
+
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup(
+	$primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$standby1->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb1_slot'
+primary_conninfo = '$connstr dbname=postgres'
+));
+
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup(
+	$primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$standby2->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb2_slot'
+primary_conninfo = '$connstr dbname=postgres'
+));
+
+$standby1->start;
+$standby2->start;
+
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+# ---------------------------------------------------------------------------
+# 3.  Create a logical failover slot on the primary
+# ---------------------------------------------------------------------------
+$primary->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('logical_failover', 'test_decoding', false, false, true);"
+);
+
+# ---------------------------------------------------------------------------
+# 4.  Configure quorum sync rep with ALL-mode synchronized_standby_slots
+# ---------------------------------------------------------------------------
+$primary->append_conf(
+	'postgresql.conf', qq{
+synchronous_standby_names = 'ANY 1 (standby1, standby2)'
+synchronized_standby_slots = 'sb1_slot, sb2_slot'
+});
+$primary->reload;
+
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+# ---------------------------------------------------------------------------
+# 5.  Confirm that quorum sync rep is active for both standbys
+# ---------------------------------------------------------------------------
+is( $primary->safe_psql(
+		'postgres',
+		q{SELECT count(*) FROM pg_stat_replication WHERE sync_state = 'quorum';}
+	),
+	'2',
+	'both standbys are in quorum sync state');
+
+##################################################
+# PART A: Plain list (ALL mode) blocks when any slot is unavailable
+##################################################
+
+$standby1->stop;
+
+# Commit succeeds since standby2 satisfies the quorum.
+my $emit_lsn = $primary->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(true, 'qtest', 'all_mode_blocks');"
+);
+like($emit_lsn, qr/^[0-9A-F]+\/[0-9A-F]+$/,
+	'synchronous commit succeeds with quorum (standby2 alive)');
+
+$primary->wait_for_replay_catchup($standby2);
+
+my $log_offset = -s $primary->logfile;
+
+my $bg = $primary->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$bg->query_until(
+	qr/decode_start/, q(
+   \echo decode_start
+   SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL);
+));
+
+# Wait for the primary to log a warning about sb1_slot not being active.
+$primary->wait_for_log(
+	qr/replication slot \"sb1_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/,
+	$log_offset);
+
+pass('plain list (ALL mode): logical decoding blocked by unavailable sb1_slot');
+
+# Unblock by clearing synchronized_standby_slots.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
+$bg->quit;
+
+# Consume the change so the slot is clean for the next test.
+$primary->safe_psql('postgres',
+	q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);});
+
+##################################################
+# PART B: ANY mode (quorum) — logical decoding proceeds with N-of-M slots
+##################################################
+
+# Switch synchronized_standby_slots to quorum mode: need only 1 of 2 slots.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+	"'ANY 1 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+# standby1 is still down; standby2 is up.
+
+# Emit another transactional message — commits via quorum.
+$primary->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(true, 'qtest', 'quorum_mode_works');"
+);
+$primary->wait_for_replay_catchup($standby2);
+
+# In quorum mode, logical decoding should NOT block because sb2_slot has
+# caught up and 1-of-2 is sufficient.
+my $decoded = $primary->safe_psql('postgres',
+	q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+	  WHERE data LIKE '%quorum_mode_works%';});
+is($decoded, '1',
+	'ANY mode: logical decoding proceeds with only sb2_slot caught up');
+
+##################################################
+# PART C: Re-check plain list (ALL mode) works when both standbys are up
+##################################################
+
+# Bring standby1 back.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
+# Switch to plain list (ALL mode) with both slots.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+	"'sb1_slot, sb2_slot'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(true, 'qtest', 'both_caught_up');"
+);
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+my $decoded_bc = $primary->safe_psql('postgres',
+	q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+	  WHERE data LIKE '%both_caught_up%';});
+is($decoded_bc, '1',
+	'plain list: works when all standbys are up');
+
+##################################################
+# PART D: Verify FIRST N priority semantics
+##################################################
+
+# FIRST N should:
+# 1. Select first N slots in priority order (list order)
+# 2. Skip unavailable slots to find N available ones
+# 3. Wait for valid but lagging slots (not skip to lower priority)
+
+# Test FIRST 2 (sb1_slot, sb2_slot) with both up — should wait for both.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+	"'FIRST 2 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(true, 'qtest', 'first_2_both_up');"
+);
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+my $decoded_e2 = $primary->safe_psql('postgres',
+	q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+	  WHERE data LIKE '%first_2_both_up%';});
+is($decoded_e2, '1',
+	'FIRST 2: decoding works when all required slots are up');
+
+# Test FIRST 1 (sb1_slot, sb2_slot) with sb1_slot unavailable.
+$standby1->stop;
+
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+	"'FIRST 1 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(true, 'qtest', 'first_1_skip_unavailable');"
+);
+$primary->wait_for_replay_catchup($standby2);
+
+# FIRST 1 should skip sb1_slot (unavailable) and use sb2_slot.
+my $decoded_e1 = $primary->safe_psql('postgres',
+	q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+	  WHERE data LIKE '%first_1_skip_unavailable%';});
+is($decoded_e1, '1',
+	'FIRST 1: skips unavailable first slot, uses second slot');
+
+##################################################
+# PART E: Plain list with first-prefixed slot name still means ALL mode
+##################################################
+
+# Create a slot name starting with "first_" for parser disambiguation checks.
+$primary->safe_psql('postgres',
+	"SELECT pg_create_physical_replication_slot('first_slot');");
+
+# If simple-list syntax starts with a slot name like "first_slot", it must
+# still be treated as ALL mode (not as explicit FIRST N syntax).
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+	"'first_slot, sb2_slot'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(true, 'qtest', 'first_prefix_all_mode_blocks');"
+);
+$primary->wait_for_replay_catchup($standby2);
+
+$log_offset = -s $primary->logfile;
+
+$bg = $primary->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$bg->query_until(
+	qr/decode_start/, q(
+   \echo decode_start
+   SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL);
+));
+
+# Plain list must require all listed slots; first_slot is intentionally inactive.
+$primary->wait_for_log(
+	qr/replication slot \"first_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/,
+	$log_offset);
+
+pass('plain list with first-prefixed slot name blocks in ALL mode');
+
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
+$bg->quit;
+
+# Consume the change for the next test.
+$primary->safe_psql('postgres',
+	q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);});
+
+##################################################
+# PART F: Verify GUC validation rejects bad values
+##################################################
+
+my ($result, $stdout, $stderr);
+
+# N exceeds number of listed slots
+($result, $stdout, $stderr) = $primary->psql('postgres',
+	"ALTER SYSTEM SET synchronized_standby_slots = 'ANY 3 (sb1_slot, sb2_slot)';");
+like($stderr, qr/ERROR/,
+	'GUC rejects ANY N when N > number of listed slots');
+
+# Missing closing parenthesis
+($result, $stdout, $stderr) = $primary->psql('postgres',
+	"ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (sb1_slot, sb2_slot';");
+like($stderr, qr/ERROR/,
+	'GUC rejects malformed ANY syntax');
+
+# Invalid slot name
+($result, $stdout, $stderr) = $primary->psql('postgres',
+	"ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (INVALID_UPPER)';");
+like($stderr, qr/ERROR/,
+	'GUC rejects invalid slot name in ANY syntax');
+
+# ---------------------------------------------------------------------------
+# Cleanup
+# ---------------------------------------------------------------------------
+$primary->safe_psql('postgres',
+	"SELECT pg_drop_replication_slot('logical_failover');");
+
+done_testing();
-- 
2.43.0

