From 49a9d1627098440a32c75727860551db3c20b733 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@2ndquadrant.com>
Date: Wed, 9 Mar 2022 18:10:56 +0100
Subject: [PATCH 1/3] fixup: publish_as_relid

Make sure to determine the top-most ancestor listed in any publication.
Otherwise we might end up with different values depending on the order
of publications (as listed in subscription).
---
 src/backend/catalog/pg_publication.c        | 21 +++++++++-
 src/backend/commands/publicationcmds.c      |  2 +-
 src/backend/replication/pgoutput/pgoutput.c | 43 +++++++++++++++++++--
 src/include/catalog/pg_publication.h        |  3 +-
 4 files changed, 62 insertions(+), 7 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 25998fbb39b..789b895db89 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -277,16 +277,21 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
 
 /*
  * Returns the relid of the topmost ancestor that is published via this
- * publication if any, otherwise returns InvalidOid.
+ * publication if any and set its ancestor level to ancestor_level,
+ * otherwise returns InvalidOid.
+ *
+ * The ancestor_level value allows us to compare the results for multiple
+ * publications, and decide which value is higher up.
  *
  * Note that the list of ancestors should be ordered such that the topmost
  * ancestor is at the end of the list.
  */
 Oid
-GetTopMostAncestorInPublication(Oid puboid, List *ancestors)
+GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
 {
 	ListCell   *lc;
 	Oid			topmost_relid = InvalidOid;
+	int			level = 0;
 
 	/*
 	 * Find the "topmost" ancestor that is in this publication.
@@ -297,13 +302,25 @@ GetTopMostAncestorInPublication(Oid puboid, List *ancestors)
 		List	   *apubids = GetRelationPublications(ancestor);
 		List	   *aschemaPubids = NIL;
 
+		level++;
+
 		if (list_member_oid(apubids, puboid))
+		{
 			topmost_relid = ancestor;
+
+			if (ancestor_level)
+				*ancestor_level = level;
+		}
 		else
 		{
 			aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
 			if (list_member_oid(aschemaPubids, puboid))
+			{
 				topmost_relid = ancestor;
+
+				if (ancestor_level)
+					*ancestor_level = level;
+			}
 		}
 
 		list_free(apubids);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 16b8661a1b7..a7b74dc60ad 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -323,7 +323,7 @@ contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors,
 	 */
 	if (pubviaroot && relation->rd_rel->relispartition)
 	{
-		publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors);
+		publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
 
 		if (!OidIsValid(publish_as_relid))
 			publish_as_relid = relid;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ea57a0477f0..104432fb3a6 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1748,6 +1748,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		List	   *schemaPubids = GetSchemaPublications(schemaId);
 		ListCell   *lc;
 		Oid			publish_as_relid = relid;
+		int			publish_ancestor_level = 0;
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
@@ -1815,11 +1816,28 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
 
+			/*
+			 * Under what relid should we publish changes in this publication?
+			 * We'll use the top-most relid across all publications. Also track
+			 * the ancestor level for this publication.
+			 */
+			Oid			pub_relid = relid;
+			int			ancestor_level = 0;
+
+			/*
+			 * If this is a FOR ALL TABLES publication, pick the partition root
+			 * and set the ancestor level accordingly.
+			 */
 			if (pub->alltables)
 			{
 				publish = true;
 				if (pub->pubviaroot && am_partition)
-					publish_as_relid = llast_oid(get_partition_ancestors(relid));
+				{
+					List	   *ancestors = get_partition_ancestors(relid);
+
+					pub_relid = llast_oid(ancestors);
+					ancestor_level = list_length(ancestors);
+				}
 			}
 
 			if (!publish)
@@ -1835,16 +1853,21 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				if (am_partition)
 				{
 					Oid			ancestor;
+					int			level;
 					List	   *ancestors = get_partition_ancestors(relid);
 
 					ancestor = GetTopMostAncestorInPublication(pub->oid,
-															   ancestors);
+															   ancestors,
+															   &level);
 
 					if (ancestor != InvalidOid)
 					{
 						ancestor_published = true;
 						if (pub->pubviaroot)
-							publish_as_relid = ancestor;
+						{
+							pub_relid = ancestor;
+							ancestor_level = level;
+						}
 					}
 				}
 
@@ -1868,6 +1891,20 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 
 				rel_publications = lappend(rel_publications, pub);
+
+				/*
+				 * We want to publish the changes as the top-most ancestor
+				 * across all publications. So we need to check if the
+				 * already calculated level is higher than the new one. If
+				 * yes, we can ignore the new value (as it's a child).
+				 * Otherwise the new value is an ancestor, so we keep it.
+				 */
+				if (publish_ancestor_level > ancestor_level)
+					continue;
+
+				/* The new value is an ancestor, so let's keep it. */
+				publish_as_relid = pub_relid;
+				publish_ancestor_level = ancestor_level;
 			}
 		}
 
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index ba72e62e614..fe773cf9b7d 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -134,7 +134,8 @@ extern List *GetAllSchemaPublicationRelations(Oid puboid,
 extern List *GetPubPartitionOptionRelations(List *result,
 											PublicationPartOpt pub_partopt,
 											Oid relid);
-extern Oid	GetTopMostAncestorInPublication(Oid puboid, List *ancestors);
+extern Oid	GetTopMostAncestorInPublication(Oid puboid, List *ancestors,
+											int *ancestor_level);
 
 extern bool is_publishable_relation(Relation rel);
 extern bool is_schema_publication(Oid pubid);
-- 
2.34.1

