From 832496b6175514eb8a00203f376809f3688a26b5 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Thu, 26 Mar 2026 14:41:41 +1100
Subject: [PATCH v12 3/3] Synchronize sequences directly in REFRESH SEQUENCES
 command.

The ALTER SUBSCRIPTION ... REFRESH SEQUENCES command currently sets all
sequence states in pg_subscription_rel to INIT and relies on the sequence sync
worker to perform the actual synchronization and update states to READY.

With the recent change making the sequence sync worker long-lived, most
sequences are now synchronized in the background, reducing the need for
REFRESH SEQUENCES. However, the command remains necessary for sequences that
haven't been synchronized.

This commit enhances REFRESH SEQUENCES to synchronize sequences directly within
the command itself, eliminating the overhead of launching a worker and updating
catalog entries unnecessarily.
---
 doc/src/sgml/logical-replication.sgml         |   5 +-
 src/backend/commands/subscriptioncmds.c       |  27 ++--
 .../replication/logical/sequencesync.c        | 148 +++++++++++++-----
 src/include/replication/logicalworker.h       |   5 +
 src/test/subscription/t/036_sequences.pl      |  49 ++++++
 5 files changed, 176 insertions(+), 58 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 3865f617816..e3472cc952e 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1791,8 +1791,9 @@ Publications:
 
   <para>
    A <firstterm>sequence synchronization worker</firstterm> will be started
-   after executing any of the above subscriber commands. The worker will
-   remain running for the life of the subscription, periodically
+   after executing <command>CREATE SUBSCRIPTION</command> or
+   <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command> command. The
+   worker will remain running for the life of the subscription, periodically
    synchronizing all published sequences.
   </para>
   <para>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 7375e214cb4..bf4fef91f56 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1288,25 +1288,20 @@ AlterSubscription_refresh_seq(Subscription *sub)
 
 	PG_TRY();
 	{
-		List	   *subrel_states;
-
 		check_publications_origin_sequences(wrconn, sub->publications, true,
 											sub->origin, NULL, 0, sub->name);
 
-		/* Get local sequence list. */
-		subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
-		foreach_ptr(SubscriptionRelState, subrel, subrel_states)
-		{
-			Oid			relid = subrel->relid;
-
-			UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
-									   InvalidXLogRecPtr, false);
-			ereport(DEBUG1,
-					errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
-									get_namespace_name(get_rel_namespace(relid)),
-									get_rel_name(relid),
-									sub->name));
-		}
+		/*
+		 * Stop the sequencesync worker to prevent concurrent updates. This
+		 * avoids a race condition where the sequence value could be updated
+		 * by this command and then immediately moved backward by a
+		 * concurrently running worker. Stopping the worker is safe even if it
+		 * attempts to restart, as it will wait on the subscription lock
+		 * already held by this ALTER SUBSCRIPTION command.
+		 */
+		logicalrep_worker_stop(WORKERTYPE_SEQUENCESYNC, sub->oid, InvalidOid);
+
+		AlterSubSyncSequences(wrconn, sub->oid, sub->name, sub->runasowner);
 	}
 	PG_FINALLY();
 	{
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index d12bd31f09d..bead305156d 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -217,7 +217,7 @@ get_sequences_string(List *seqindexes, List *seqinfos, StringInfo buf)
  */
 static void
 report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
-					   List *missing_seqs_idx, List *seqinfos)
+					   List *missing_seqs_idx, List *seqinfos, char *subname)
 {
 	StringInfo	seqstr;
 
@@ -263,7 +263,7 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
 	ereport(ERROR,
 			errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 			errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
-				   MySubscription->name));
+				   subname));
 }
 
 /*
@@ -420,10 +420,9 @@ check_seq_privileges_and_drift(LogicalRepSequenceInfo *seqinfo,
  */
 static CopySeqResult
 copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel,
-			  bool update_lsn)
+			  Oid subid, bool run_as_owner, bool update_lsn)
 {
 	UserContext ucxt;
-	bool		run_as_owner = MySubscription->runasowner;
 	Oid			seqoid = seqinfo->localrelid;
 	CopySeqResult result;
 	bool		need_lsn_update = false;
@@ -467,8 +466,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel,
 		XLogRecPtr	local_page_lsn;
 
 		/* Get the local page LSN for comparison with the remote value */
-		(void) GetSubscriptionRelState(MySubscription->oid,
-									   RelationGetRelid(sequence_rel),
+		(void) GetSubscriptionRelState(subid, RelationGetRelid(sequence_rel),
 									   &local_page_lsn);
 
 		need_lsn_update = (local_page_lsn != seqinfo->page_lsn);
@@ -480,7 +478,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel,
 	 * cycle (update_lsn is true).
 	 */
 	if (seqinfo->relstate == SUBREL_STATE_INIT || need_lsn_update)
-		UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
+		UpdateSubscriptionRelState(subid, seqoid, SUBREL_STATE_READY,
 								   seqinfo->page_lsn, false);
 
 	return COPYSEQ_SUCCESS;
@@ -499,7 +497,8 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel,
  * Returns true/false if any sequences were actually copied.
  */
 static bool
-copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
+copy_sequences(WalReceiverConn *conn, List *seqinfos, Oid subid, char *subname,
+			   bool runasowner, bool update_lsn)
 {
 	int			cur_batch_base_index = 0;
 	int			n_seqinfos = list_length(seqinfos);
@@ -528,11 +527,16 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
 		int			batch_no_drift = 0;
 		int			batch_missing_count;
 		Relation	sequence_rel = NULL;
+		bool		started_tx = false;
 
 		WalRcvExecResult *res;
 		TupleTableSlot *slot;
 
-		StartTransactionCommand();
+		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+			started_tx = true;
+		}
 
 		for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
 		{
@@ -624,14 +628,15 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
 													&seqinfo, &seqidx, seqinfos);
 
 			if (sync_status == COPYSEQ_ALLOWED)
-				sync_status = copy_sequence(seqinfo, sequence_rel, update_lsn);
+				sync_status = copy_sequence(seqinfo, sequence_rel, subid,
+											runasowner, update_lsn);
 
 			switch (sync_status)
 			{
 				case COPYSEQ_SUCCESS:
 					elog(DEBUG1,
 						 "logical replication synchronization has updated sequence \"%s.%s\" in subscription \"%s\"",
-						 seqinfo->nspname, seqinfo->seqname, MySubscription->name);
+						 seqinfo->nspname, seqinfo->seqname, subname);
 					batch_succeeded_count++;
 					sequence_copied = true;
 					break;
@@ -704,13 +709,17 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
 
 		elog(DEBUG1,
 			 "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped, %d no drift",
-			 MySubscription->name,
+			 subname,
 			 (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
 			 batch_size, batch_succeeded_count, batch_mismatched_count,
 			 batch_insuffperm_count, batch_missing_count, batch_skipped_count, batch_no_drift);
 
-		/* Commit this batch, and prepare for next batch */
-		CommitTransactionCommand();
+		/*
+		 * Commit this batch if started a transaction, and prepare for next
+		 * batch.
+		 */
+		if (started_tx)
+			CommitTransactionCommand();
 
 		if (batch_missing_count)
 		{
@@ -735,7 +744,7 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
 
 	/* Report mismatches, permission issues, or missing sequences */
 	report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
-						   missing_seqs_idx, seqinfos);
+						   missing_seqs_idx, seqinfos, subname);
 
 	return sequence_copied;
 }
@@ -751,37 +760,28 @@ invalidate_syncing_sequence_infos(Datum arg, SysCacheIdentifier cacheid,
 }
 
 /*
- * Get the list of sequence information for the current subscription.
+ * Get the list of sequence information for the given subscription from
+ * catalog.
  *
- * Return cached sequence states if valid; otherwise fetches them from the
- * catalog, caches the result, and return it.
+ * All entries in the returned list are allocated in the specified memory
+ * context.
  */
 static List *
-fetch_sequence_infos(void)
+fetch_sequences_from_catalog(Oid subid, MemoryContext ctx)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	ScanKeyData skey[1];
 	SysScanDesc scan;
-	Oid			subid = MyLogicalRepWorker->subid;
-	List	   *tmp_seqinfos = NIL;
+	List	   *seqinfos = NIL;
+	bool		started_tx = false;
 
-	if (sequence_infos_valid)
-		return sequence_infos;
-
-	/* Free the existing invalid cache entries */
-	foreach_ptr(LogicalRepSequenceInfo, seqinfo, sequence_infos)
+	if (!IsTransactionState())
 	{
-		pfree(seqinfo->nspname);
-		pfree(seqinfo->seqname);
-		pfree(seqinfo);
+		StartTransactionCommand();
+		started_tx = true;
 	}
 
-	list_free(sequence_infos);
-	sequence_infos = NIL;
-
-	StartTransactionCommand();
-
 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
 
 	/* Scan for all sequences belonging to this subscription */
@@ -822,14 +822,14 @@ fetch_sequence_infos(void)
 
 		Assert(relstate == SUBREL_STATE_INIT || relstate == SUBREL_STATE_READY);
 
-		/* Cache the information in a permanent memory context */
-		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+		/* Cache the information in the given memory context */
+		oldctx = MemoryContextSwitchTo(ctx);
 		seq = palloc0_object(LogicalRepSequenceInfo);
 		seq->localrelid = subrel->srrelid;
 		seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
 		seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
 		seq->relstate = relstate;
-		tmp_seqinfos = lappend(tmp_seqinfos, seq);
+		seqinfos = lappend(seqinfos, seq);
 		MemoryContextSwitchTo(oldctx);
 
 		table_close(sequence_rel, NoLock);
@@ -839,10 +839,38 @@ fetch_sequence_infos(void)
 	systable_endscan(scan);
 	table_close(rel, AccessShareLock);
 
-	sequence_infos = tmp_seqinfos;
-	sequence_infos_valid = true;
+	if (started_tx)
+		CommitTransactionCommand();
 
-	CommitTransactionCommand();
+	return seqinfos;
+}
+
+/*
+ * Get the list of sequence information for the current subscription.
+ *
+ * Return cached sequence states if valid; otherwise fetches them from the
+ * catalog, caches the result, and return it.
+ */
+static List *
+fetch_sequence_infos(void)
+{
+	if (sequence_infos_valid)
+		return sequence_infos;
+
+	/* Free the existing invalid cache entries */
+	foreach_ptr(LogicalRepSequenceInfo, seqinfo, sequence_infos)
+	{
+		pfree(seqinfo->nspname);
+		pfree(seqinfo->seqname);
+		pfree(seqinfo);
+	}
+
+	list_free(sequence_infos);
+	sequence_infos = NIL;
+
+	sequence_infos = fetch_sequences_from_catalog(MySubscription->oid,
+												  CacheMemoryContext);
+	sequence_infos_valid = true;
 
 	return sequence_infos;
 }
@@ -947,6 +975,9 @@ start_sequence_sync(void)
 			seqinfos = fetch_sequence_infos();
 
 			sequence_copied = copy_sequences(LogRepWorkerWalRcvConn, seqinfos,
+											 MySubscription->oid,
+											 MySubscription->name,
+											 MySubscription->runasowner,
 											 update_lsn);
 
 			MemoryContextReset(SequenceSyncContext);
@@ -1016,3 +1047,40 @@ SequenceSyncWorkerMain(Datum main_arg)
 
 	FinishSyncWorker();
 }
+
+/*
+ * Wrapper for LogicalRepSyncSequences to synchronize all sequences of a
+ * subscription from outside the sequencesync worker.
+ */
+void
+AlterSubSyncSequences(WalReceiverConn *conn, Oid subid, char *subname,
+					  bool runasowner)
+{
+	List *seqinfos;
+
+	Assert(!SequenceSyncContext);
+
+	/*
+	 * Fetch sequences directly from the catalog rather than using the
+	 * sequence cache, which is maintained only for the sequence sync
+	 * worker.
+	 */
+	seqinfos = fetch_sequences_from_catalog(subid, CurrentMemoryContext);
+
+	PG_TRY();
+	{
+		/*
+		 * Use the current memory context for synchronization. Since this should
+		 * be short-lived command context that will be cleaned up automatically,
+		 * we can simply assign it as the synchronization context.
+		 */
+		SequenceSyncContext = CurrentMemoryContext;
+
+		(void) copy_sequences(conn, seqinfos, subid, subname, runasowner, true);
+	}
+	PG_FINALLY();
+	{
+		SequenceSyncContext = NULL;
+	}
+	PG_END_TRY();
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 7d748a28da8..73afd7853d0 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,6 +14,8 @@
 
 #include <signal.h>
 
+#include "replication/walreceiver.h"
+
 extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 
 extern void ApplyWorkerMain(Datum main_arg);
@@ -31,4 +33,7 @@ extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
 
 extern void AtEOXact_LogicalRepWorkers(bool isCommit);
 
+extern void AlterSubSyncSequences(WalReceiverConn *conn, Oid subid,
+								  char *subname, bool runasowner);
+
 #endif							/* LOGICALWORKER_H */
diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl
index af190713b2b..8d25ac40ce0 100644
--- a/src/test/subscription/t/036_sequences.pl
+++ b/src/test/subscription/t/036_sequences.pl
@@ -176,4 +176,53 @@ $node_subscriber->wait_for_log(
 	qr/WARNING: ( [A-Z0-9]+:)? missing sequence on publisher \("public.regress_s4"\)/,
 	$log_offset);
 
+##########
+# ALTER SUBSCRIPTION ... REFRESH SEQUENCES synchronizes sequences online,
+# eliminating the need to launch a sequencesync worker.
+##########
+
+# Reduce max_logical_replication_workers to disallow sequence worker from running
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_logical_replication_workers = 0));
+$node_subscriber->restart;
+
+# Verify there is no logical replication apply worker running
+$result = $node_subscriber->safe_psql(
+	'postgres',
+	"SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'");
+
+is($result, '0', 'no logical replication worker is running');
+
+# Increment sequence on publisher
+$node_publisher->safe_psql('postgres',
+	qq(SELECT nextval('regress_s1');));
+
+# The command should fail due to missing sequence ('regress_s4')
+my ($cmdret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;");
+
+like(
+	$stderr,
+	qr/WARNING:  missing sequence on publisher \("public.regress_s4"\)/,
+	"output the wanring for the missing sequence regress_s4");
+
+like(
+	$stderr,
+	qr/ERROR:  logical replication sequence synchronization failed for subscription \"regress_seq_sub\"/,
+	"the command failed due to the missing sequence regress_s4");
+
+# Refresh the publication to remove the missing sequence
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION;");
+
+# Sync the sequence regress_s1
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;");
+
+# Get the current sequence value on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	qq(SELECT last_value FROM regress_s1;));
+
+is($result, '201', 'sequence regress_s1 is synced now');
+
 done_testing();
-- 
2.47.3

