From f2d31bdf99240288349a8a3acebc78391c8275f4 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Mon, 3 Mar 2025 11:54:27 +0530
Subject: [PATCH v4] Fix logical replication breakage after ALTER SUBSCRIPTION
 ... SET PUBLICATION

Altering a subscription with `ALTER SUBSCRIPTION ... SET PUBLICATION`
could cause logical replication to break under certain conditions. When
the apply worker restarts after executing SET PUBLICATION, it continues
using the existing replication slot and origin. If the replication origin
was not updated before the restart, the WAL start location could point to
a position prior to the existence of the specified publication, causing
the apply worker to restart repeatedly and report errors.

This patch skips loading any publication that does not exist yet. Once
the missing publication is created, it is loaded and the relation entry
is rebuilt.

Discussion: https://www.postgresql.org/message-id/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 74 ++++++++++++++++++---
 1 file changed, 66 insertions(+), 8 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7d464f656aa..64c31cc2758 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames);
+static List *LoadPublications(List *pubnames, bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_repl_origin(LogicalDecodingContext *ctx,
@@ -1762,9 +1762,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet. 'skipped'
+ * will be true if we find any publication from the given list that doesn't
+ * exist.
  */
 static List *
-LoadPublications(List *pubnames)
+LoadPublications(List *pubnames, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1772,9 +1776,15 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+		{
+			elog(DEBUG1, "skipped loading publication: %s", pubname);
+			*skipped = true;
+		}
 	}
 
 	return result;
@@ -2026,6 +2036,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	bool		found;
 	MemoryContext oldctx;
 	Oid			relid = RelationGetRelid(relation);
+	bool		publications_updated = false;
 
 	Assert(RelationSyncCache != NULL);
 
@@ -2053,8 +2064,48 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
-	/* Validate the entry */
-	if (!entry->replicate_valid)
+	/*
+	 * If the cached publication list is invalid, check whether any
+	 * publication missing from that list now exists.  This optimization
+	 * ensures that the next block, which queries the system tables and builds
+	 * the relation entry, runs only if a new publication was created.
+	 */
+	if (!publications_valid && data->publications)
+	{
+		bool		skipped_pub = false;
+		List	   *publications;
+
+		publications = LoadPublications(data->publication_names, &skipped_pub);
+
+		/* Check if any new publications have been created. */
+		foreach_ptr(Publication, pub1, publications)
+		{
+			bool match = false;
+			foreach_ptr(Publication, pub2, data->publications)
+			{
+				if (strcmp(pub1->name, pub2->name) == 0)
+				{
+					match = true;
+					break;
+				}
+			}
+
+			if (!match)
+			{
+				publications_updated = true;
+				break;
+			}
+		}
+
+		list_free(publications);
+	}
+
+	/*
+	 * Validate the entry only if the entry is not valid or in case a new
+	 * publication has been added.
+	 */
+	if (!entry->replicate_valid ||
+		(!publications_valid && publications_updated))
 	{
 		Oid			schemaId = get_rel_namespace(relid);
 		List	   *pubids = GetRelationPublications(relid);
@@ -2071,6 +2122,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
+		bool		skipped_pub = false;
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -2078,9 +2130,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			MemoryContextReset(data->pubctx);
 
 			oldctx = MemoryContextSwitchTo(data->pubctx);
-			data->publications = LoadPublications(data->publication_names);
+			data->publications = LoadPublications(data->publication_names, &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
-			publications_valid = true;
+
+			/*
+			 * We don't consider the publications to be valid till we have
+			 * information of all the publications.
+			 */
+			if (!skipped_pub)
+				publications_valid = true;
 		}
 
 		/*
-- 
2.43.0

