From 48e1c968d96096cb4f20604889738050e9e3c565 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 5 Jan 2026 09:58:23 -0800
Subject: [PATCH v1] pg_upgrade: Optimize replication slot caught-up check
 logic.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
---
 src/bin/pg_upgrade/info.c | 159 ++++++++++++++++++++++++++------------
 1 file changed, 111 insertions(+), 48 deletions(-)

diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 2de0ee4d030..568e81e7776 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -29,8 +29,8 @@ static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static char *get_old_cluster_logical_slot_infos_query(void);
 static void process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg);
+static void process_old_cluter_logical_slot_catchup_infos(DbInfo *dbinfo, PGresult *res, void *arg);
 
 
 /*
@@ -307,11 +307,77 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
 	if (cluster == &old_cluster &&
 		GET_MAJOR_VERSION(cluster->major_version) > 1600)
 	{
-		logical_slot_infos_query = get_old_cluster_logical_slot_infos_query();
+		const char *slot_info_query = "SELECT slot_name, plugin, two_phase, failover, "
+			"invalidation_reason IS NOT NULL as invalid "
+			"FROM pg_catalog.pg_replication_slots "
+			"WHERE slot_type = 'logical' AND "
+			"database = current_database() AND "
+			"temporary IS FALSE;";
+
+		/*
+		 * Fetch the logical replication slot information.
+		 *
+		 * The temporary slots are explicitly ignored while checking because
+		 * such slots cannot exist after the upgrade. During the upgrade,
+		 * clusters are started and stopped several times causing any
+		 * temporary slots to be removed.
+		 */
 		upgrade_task_add_step(task,
-							  logical_slot_infos_query,
+							  slot_info_query,
 							  process_old_cluster_logical_slot_infos,
 							  true, NULL);
+
+		if (!user_opts.live_check)
+		{
+			/*
+			 * Check whether slots have consumed all WAL records.
+			 *
+			 * The determination of whether a slot is caught up is performed
+			 * by an upgrade function. This considers the slot caught up if no
+			 * decodable changes are found. See
+			 * binary_upgrade_logical_slot_has_caught_up().
+			 *
+			 * Note that we cannot guarantee that the slot is caught up during
+			 * a live_check, as new WAL records could be generated
+			 * concurrently.
+			 *
+			 * We optimize this check to avoid reading the same WAL stream
+			 * multiple times: execute the caught-up check only for the slot
+			 * with the minimum confirmed_flush_lsn, and apply the same result
+			 * to all other slots in the same database. This way, we check at
+			 * most one logical slot per database.
+			 *
+			 * Note that we don't distinguish slots based on their output
+			 * plugin. If a plugin applies replication origin filters, we
+			 * might get a false positive (i.e., erroneously considering a
+			 * slot caught up). However, such cases are very rare, and the
+			 * impact of a false positive is minimal.
+			 */
+			const char *slot_caughtup_info_query =
+				"WITH check_caught_up AS ( "
+				"  SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name) AS caught_up "
+				"  FROM pg_replication_slots AS s "
+				"  WHERE database = current_database() AND "
+				"    slot_type = 'logical' AND "
+				"    temporary IS FALSE AND "
+				"    invalidation_reason IS NULL "
+				"  ORDER BY confirmed_flush_lsn ASC "
+				"  LIMIT 1"
+				") "
+				"SELECT slot_name, "
+				"  CASE WHEN invalidation_reason IS NULL THEN c.caught_up "
+				"    ELSE FALSE "
+				"  END as caught_up "
+				"FROM pg_replication_slots AS s, check_caught_up AS c "
+				"WHERE s.database = current_database() AND "
+				"  s.slot_type = 'logical' AND "
+				"  s.temporary IS FALSE";
+
+			upgrade_task_add_step(task,
+								  slot_caughtup_info_query,
+								  process_old_cluter_logical_slot_catchup_infos,
+								  true, NULL);
+		}
 	}
 
 	upgrade_task_run(task, cluster);
@@ -676,48 +742,7 @@ process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg)
 }
 
 /*
- * get_old_cluster_logical_slot_infos_query()
- *
- * Returns the query for retrieving the logical slot information for all the
- * logical replication slots in the database, for use by
- * get_db_rel_and_slot_infos()'s UpgradeTask.  The status of each logical slot
- * is checked in check_old_cluster_for_valid_slots().
- */
-static char *
-get_old_cluster_logical_slot_infos_query(void)
-{
-	/*
-	 * Fetch the logical replication slot information. The check whether the
-	 * slot is considered caught up is done by an upgrade function. This
-	 * regards the slot as caught up if we don't find any decodable changes.
-	 * See binary_upgrade_logical_slot_has_caught_up().
-	 *
-	 * Note that we can't ensure whether the slot is caught up during
-	 * live_check as the new WAL records could be generated.
-	 *
-	 * We intentionally skip checking the WALs for invalidated slots as the
-	 * corresponding WALs could have been removed for such slots.
-	 *
-	 * The temporary slots are explicitly ignored while checking because such
-	 * slots cannot exist after the upgrade. During the upgrade, clusters are
-	 * started and stopped several times causing any temporary slots to be
-	 * removed.
-	 */
-	return psprintf("SELECT slot_name, plugin, two_phase, failover, "
-					"%s as caught_up, invalidation_reason IS NOT NULL as invalid "
-					"FROM pg_catalog.pg_replication_slots "
-					"WHERE slot_type = 'logical' AND "
-					"database = current_database() AND "
-					"temporary IS FALSE;",
-					user_opts.live_check ? "FALSE" :
-					"(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
-					"ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
-					"END)");
-}
-
-/*
- * Callback function for processing results of the query returned by
- * get_old_cluster_logical_slot_infos_query(), which is used for
+ * Callback function for processing results of the query, which is used for
  * get_db_rel_and_slot_infos()'s UpgradeTask.  This function stores the logical
  * slot information for later use.
  */
@@ -736,7 +761,6 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg)
 		int			i_plugin;
 		int			i_twophase;
 		int			i_failover;
-		int			i_caught_up;
 		int			i_invalid;
 
 		slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots);
@@ -745,7 +769,6 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg)
 		i_plugin = PQfnumber(res, "plugin");
 		i_twophase = PQfnumber(res, "two_phase");
 		i_failover = PQfnumber(res, "failover");
-		i_caught_up = PQfnumber(res, "caught_up");
 		i_invalid = PQfnumber(res, "invalid");
 
 		for (int slotnum = 0; slotnum < num_slots; slotnum++)
@@ -756,8 +779,14 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg)
 			curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
 			curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
 			curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0);
-			curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0);
 			curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0);
+
+			/*
+			 * Set false by default. This field will be updated in a separate
+			 * task, process_old_cluter_logical_slot_catchup_infos, if we're
+			 * not doing live_check.
+			 */
+			curr->caught_up = false;
 		}
 	}
 
@@ -765,6 +794,40 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg)
 	dbinfo->slot_arr.nslots = num_slots;
 }
 
+/*
+ * Callback function for processing results of the query, which is used for
+ * get_db_rel_and_slot_infos()'s UpgradeTask. This function updates the caught_up
+ * field for each slot information.
+ */
+static void
+process_old_cluter_logical_slot_catchup_infos(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+	int			num_slots = PQntuples(res);
+
+	AssertVariableIsOfType(&process_old_cluter_logical_slot_catchup_infos,
+						   UpgradeTaskProcessCB);
+	Assert(num_slots == dbinfo->slot_arr.nslots);
+
+	for (int i = 0; i < num_slots; i++)
+	{
+		char	   *slotname;
+		bool		caught_up;
+
+		slotname = PQgetvalue(res, i, PQfnumber(res, "slot_name"));
+		caught_up = (strcmp(PQgetvalue(res, i, PQfnumber(res, "caught_up")), "t") == 0);
+
+		for (int slotnum = 0; slotnum < dbinfo->slot_arr.nslots; slotnum++)
+		{
+			LogicalSlotInfo *s = &(dbinfo->slot_arr.slots[slotnum]);
+
+			if (strcmp(s->slotname, slotname) == 0)
+			{
+				s->caught_up = caught_up;
+				break;
+			}
+		}
+	}
+}
 
 /*
  * count_old_cluster_logical_slots()
-- 
2.47.3

