From 4c9c11ee6e07c799ef5cd2845a248d8a70e8d09e Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Thu, 26 Mar 2026 14:26:52 +1100
Subject: [PATCH v12 2/3] Cache sequence information in the sequence sync
 worker.

Previously, the sequence sync worker would fetch sequence metadata from
the catalog each time it needed to synchronize sequences. This could be
inefficient when many sequences are involved, as the worker would need
to repeatedly open and scan pg_subscription_rel.

To improve this, introduce a cache for sequence information in the sequence sync
worker. The cache is populated on first use and kept across synchronization
cycles. It is invalidated when pg_subscription_rel is modified, ensuring that
changes to subscription relations are reflected promptly.
---
 .../replication/logical/sequencesync.c        | 93 +++++++++++++------
 1 file changed, 66 insertions(+), 27 deletions(-)

diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index 18321d3b316..d12bd31f09d 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -99,6 +99,19 @@ typedef enum CopySeqResult
 
 static MemoryContext SequenceSyncContext = NULL;
 
+/*
+ * Cached list of sequence information (LogicalRepSequenceInfo) for the current
+ * subscription. The cache is invalidated when pg_subscription_rel is modified.
+ *
+ * Note: To avoid the cost of searching for a specific sequence on relcache
+ * invalidation, we do not invalidate the cache immediately when a sequence is
+ * altered (e.g., renamed or moved to another namespace). Instead, we validate
+ * the sequence name and namespace when next attempting to sync it, at which
+ * point we verify the local sequence state.
+ */
+static List *sequence_infos = NIL;
+static bool sequence_infos_valid = false;
+
 /*
  * Apply worker determines whether a sequence sync worker is needed.
  *
@@ -500,6 +513,9 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
 
 #define MAX_SEQUENCES_SYNC_PER_BATCH 100
 
+	if (seqinfos == NIL)
+		return false;
+
 	while (cur_batch_base_index < n_seqinfos)
 	{
 		Oid			seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
@@ -725,24 +741,44 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
 }
 
 /*
- * Identifies sequences that require synchronization and initiates the
- * synchronization process.
+ * Callback from syscache invalidation.
+ */
+static void
+invalidate_syncing_sequence_infos(Datum arg, SysCacheIdentifier cacheid,
+								  uint32 hashvalue)
+{
+	sequence_infos_valid = false;
+}
+
+/*
+ * Get the list of sequence information for the current subscription.
  *
- * Returns true if sequences have been updated.
+ * Return cached sequence states if valid; otherwise fetches them from the
+ * catalog, caches the result, and return it.
  */
-static bool
-LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
+static List *
+fetch_sequence_infos(void)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	ScanKeyData skey[1];
 	SysScanDesc scan;
 	Oid			subid = MyLogicalRepWorker->subid;
-	bool		sequence_copied = false;
-	List	   *seqinfos = NIL;
-	MemoryContext oldctx;
+	List	   *tmp_seqinfos = NIL;
+
+	if (sequence_infos_valid)
+		return sequence_infos;
 
-	Assert(SequenceSyncContext);
+	/* Free the existing invalid cache entries */
+	foreach_ptr(LogicalRepSequenceInfo, seqinfo, sequence_infos)
+	{
+		pfree(seqinfo->nspname);
+		pfree(seqinfo->seqname);
+		pfree(seqinfo);
+	}
+
+	list_free(sequence_infos);
+	sequence_infos = NIL;
 
 	StartTransactionCommand();
 
@@ -763,6 +799,7 @@ LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
 		LogicalRepSequenceInfo *seq;
 		Relation	sequence_rel;
 		char		relstate;
+		MemoryContext oldctx;
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -785,17 +822,14 @@ LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
 
 		Assert(relstate == SUBREL_STATE_INIT || relstate == SUBREL_STATE_READY);
 
-		/*
-		 * Worker needs to process sequences across transaction boundary, so
-		 * allocate them under SequenceSyncContext.
-		 */
-		oldctx = MemoryContextSwitchTo(SequenceSyncContext);
+		/* Cache the information in a permanent memory context */
+		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
 		seq = palloc0_object(LogicalRepSequenceInfo);
 		seq->localrelid = subrel->srrelid;
 		seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
 		seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
 		seq->relstate = relstate;
-		seqinfos = lappend(seqinfos, seq);
+		tmp_seqinfos = lappend(tmp_seqinfos, seq);
 		MemoryContextSwitchTo(oldctx);
 
 		table_close(sequence_rel, NoLock);
@@ -805,18 +839,12 @@ LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
 	systable_endscan(scan);
 	table_close(rel, AccessShareLock);
 
-	CommitTransactionCommand();
-
-	/*
-	 * Exit early if no catalog entries found, likely due to concurrent drops.
-	 */
-	if (!seqinfos)
-		return false;
+	sequence_infos = tmp_seqinfos;
+	sequence_infos_valid = true;
 
-	/* Process sequences */
-	sequence_copied = copy_sequences(conn, seqinfos, update_lsn);
+	CommitTransactionCommand();
 
-	return sequence_copied;
+	return sequence_infos;
 }
 
 /*
@@ -831,6 +859,14 @@ start_sequence_sync(void)
 {
 	Assert(am_sequencesync_worker());
 
+	/*
+	 * Setup callback for syscache so that we know when something changes in
+	 * the subscription relation state.
+	 */
+	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+								  invalidate_syncing_sequence_infos,
+								  (Datum) 0);
+
 	PG_TRY();
 	{
 		char	   *err;
@@ -876,6 +912,7 @@ start_sequence_sync(void)
 			bool		sequence_copied = false;
 			MemoryContext oldctx;
 			bool		update_lsn;
+			List	   *seqinfos;
 			TimestampTz now = GetCurrentTimestamp();
 
 			CHECK_FOR_INTERRUPTS();
@@ -907,8 +944,10 @@ start_sequence_sync(void)
 			/*
 			 * Synchronize all sequences (both READY and INIT states).
 			 */
-			sequence_copied = LogicalRepSyncSequences(LogRepWorkerWalRcvConn,
-													  update_lsn);
+			seqinfos = fetch_sequence_infos();
+
+			sequence_copied = copy_sequences(LogRepWorkerWalRcvConn, seqinfos,
+											 update_lsn);
 
 			MemoryContextReset(SequenceSyncContext);
 			MemoryContextSwitchTo(oldctx);
-- 
2.47.3

