From 4e8fbce7accd6872c09ff0da02bfd0d7ae51b1a4 Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Thu, 19 Mar 2026 17:34:55 +0900
Subject: [PATCH v2 1/2] Fix slotsync worker busy loop causing repeated logical
 decoding logs.

Previously, the slotsync worker could enter a busy loop and emit four logical
log messages every 200 ms, even when both the primary and standby were idle.

This happened because the worker incorrectly treated certain cases as
successful slot updates, causing it to use the minimum sleep interval and
repeatedly restart slot syncing.

This commit fixes this by ensuring the worker does not treat such cases as
updates, allowing it to sleep normally and avoid excessive log output.
---
 src/backend/replication/logical/logical.c  | 32 ++++++++++++++++++----
 src/backend/replication/logical/slotsync.c |  7 +++--
 src/backend/replication/slotfuncs.c        |  2 +-
 src/include/replication/logical.h          |  5 ++--
 4 files changed, 35 insertions(+), 11 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 603a2b94d05..9a3b0020040 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1806,18 +1806,22 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 
 /*
  * Handle a consumer's confirmation having received all changes up to lsn.
+ *
+ * Return TRUE if the local slot is updated.
  */
-void
+bool
 LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 {
+	bool		updated_confirmed = false;
+	bool		updated_xmin = false;
+	bool		updated_restart = false;
+
 	Assert(XLogRecPtrIsValid(lsn));
 
 	/* Do an unlocked check for candidate_lsn first. */
 	if (XLogRecPtrIsValid(MyReplicationSlot->candidate_xmin_lsn) ||
 		XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_valid))
 	{
-		bool		updated_xmin = false;
-		bool		updated_restart = false;
 		XLogRecPtr	restart_lsn pg_attribute_unused();
 
 		SpinLockAcquire(&MyReplicationSlot->mutex);
@@ -1837,7 +1841,10 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		 * LSN. See similar case handling in CreateDecodingContext.
 		 */
 		if (lsn > MyReplicationSlot->data.confirmed_flush)
+		{
 			MyReplicationSlot->data.confirmed_flush = lsn;
+			updated_confirmed = true;
+		}
 
 		/* if we're past the location required for bumping xmin, do so */
 		if (XLogRecPtrIsValid(MyReplicationSlot->candidate_xmin_lsn) &&
@@ -1920,10 +1927,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		 * for the details.
 		 */
 		if (lsn > MyReplicationSlot->data.confirmed_flush)
+		{
 			MyReplicationSlot->data.confirmed_flush = lsn;
+			updated_confirmed = true;
+		}
 
 		SpinLockRelease(&MyReplicationSlot->mutex);
 	}
+
+	return updated_confirmed || updated_xmin || updated_restart;
 }
 
 /*
@@ -2089,10 +2101,14 @@ LogicalReplicationSlotCheckPendingWal(XLogRecPtr end_of_wal,
  *
  * *found_consistent_snapshot will be true if the initial decoding snapshot has
  * been built; Otherwise, it will be false.
+ *
+ * *updated_xmin_or_lsn will be true if the local slot is updated; Otherwise,
+ * it will be false.
  */
 XLogRecPtr
 LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
-									bool *found_consistent_snapshot)
+									bool *found_consistent_snapshot,
+									bool *updated_xmin_or_lsn)
 {
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner;
@@ -2103,6 +2119,9 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
 	if (found_consistent_snapshot)
 		*found_consistent_snapshot = false;
 
+	if (updated_xmin_or_lsn)
+		*updated_xmin_or_lsn = false;
+
 	PG_TRY();
 	{
 		/*
@@ -2174,7 +2193,10 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
 
 		if (XLogRecPtrIsValid(ctx->reader->EndRecPtr))
 		{
-			LogicalConfirmReceivedLocation(moveto);
+			bool		slot_updated = LogicalConfirmReceivedLocation(moveto);
+
+			if (updated_xmin_or_lsn)
+				*updated_xmin_or_lsn = slot_updated;
 
 			/*
 			 * If only the confirmed_flush LSN has changed the slot won't get
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index e75db69e3f6..3e70cfb0a3e 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -315,13 +315,16 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 			slot->data.confirmed_flush = remote_slot->confirmed_lsn;
 			slot->data.catalog_xmin = remote_slot->catalog_xmin;
 			SpinLockRelease(&slot->mutex);
+
+			updated_xmin_or_lsn = true;
 		}
 		else
 		{
 			bool		found_consistent_snapshot;
 
 			LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
-												&found_consistent_snapshot);
+												&found_consistent_snapshot,
+												&updated_xmin_or_lsn);
 
 			/* Sanity check */
 			if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
@@ -349,8 +352,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 				skip_reason = SS_SKIP_NO_CONSISTENT_SNAPSHOT;
 			}
 		}
-
-		updated_xmin_or_lsn = true;
 	}
 
 	/* Update slot sync skip stats */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 9f5e4f998fe..b6846bb313e 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -524,7 +524,7 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 static XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
-	return LogicalSlotAdvanceAndCheckSnapState(moveto, NULL);
+	return LogicalSlotAdvanceAndCheckSnapState(moveto, NULL, NULL);
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index bc9d4ece672..0208f9ca6a2 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -140,7 +140,7 @@ extern void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn,
 									   TransactionId xmin);
 extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
-extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
+extern bool LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
 extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
@@ -151,6 +151,7 @@ extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 extern XLogRecPtr LogicalReplicationSlotCheckPendingWal(XLogRecPtr end_of_wal,
 														XLogRecPtr scan_cutoff_lsn);
 extern XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
-													  bool *found_consistent_snapshot);
+													  bool *found_consistent_snapshot,
+													  bool *updated_xmin_or_lsn);
 
 #endif
-- 
2.51.2

