From 1f154f4d84a413a6fa490b28c6f2a67a8a697647 Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal.oss@gmail.com>
Date: Thu, 1 Aug 2024 12:16:24 +0530
Subject: [PATCH v7] Distribute invalidatons if change in catalog tables

Distribute invalidations to inprogress transactions if the current
committed transaction change any catalog table.
---
 .../replication/logical/reorderbuffer.c       |  36 +++++
 src/backend/replication/logical/snapbuild.c   |  26 +++-
 src/include/replication/reorderbuffer.h       |   7 +
 src/test/subscription/t/100_bugs.pl           | 131 ++++++++++++++++++
 4 files changed, 197 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 00a8327e77..28694370ea 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -5308,3 +5308,39 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Get a list of invalidation messages in current committed transaction
+ */
+List *
+GetInvalidationMsg(ReorderBuffer *rb, XLogRecPtr lsn, TransactionId xid)
+{
+	List	   *invalmsgs = NIL;
+	ReorderBufferTXN *txn;
+	ReorderBufferIterTXNState *volatile iterstate = NULL;
+	ReorderBufferChange *change;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+	ReorderBufferIterTXNInit(rb, txn, &iterstate);
+
+	while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+	{
+		if (change->action == REORDER_BUFFER_CHANGE_INVALIDATION)
+		{
+			InvalidationMsg *invalmsg = (InvalidationMsg *) palloc(sizeof(InvalidationMsg));
+
+			invalmsg->nmsgs = change->data.inval.ninvalidations;
+			invalmsg->msgs = (SharedInvalidationMessage *) palloc(sizeof(SharedInvalidationMessage) * invalmsg->nmsgs);
+			memcpy(invalmsg->msgs, change->data.inval.invalidations, sizeof(SharedInvalidationMessage) * invalmsg->nmsgs);
+
+			invalmsgs = lappend(invalmsgs, invalmsg);
+		}
+	}
+
+	/* clean up the iterator */
+
+	ReorderBufferIterTXNFinish(rb, iterstate);
+	iterstate = NULL;
+
+	return invalmsgs;
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index ae676145e6..d79b380699 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -300,7 +300,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
 
 static void SnapBuildSnapIncRefcount(Snapshot snap);
 
-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, List *invalmsgs);
 
 static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
 												 uint32 xinfo);
@@ -867,7 +867,7 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
  * contents).
  */
 static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, List *invalidmsgs)
 {
 	dlist_iter	txn_i;
 	ReorderBufferTXN *txn;
@@ -913,6 +913,19 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 		SnapBuildSnapIncRefcount(builder->snapshot);
 		ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
 								 builder->snapshot);
+
+		/*
+		 * Add invalidation messages to the reorder buffer of inprogress
+		 * transactions except the current committed transaction
+		 */
+		if (txn->xid != xid)
+		{
+			foreach_ptr(InvalidationMsg, invalmsg, invalidmsgs)
+			{
+				ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+											  invalmsg->nmsgs, invalmsg->msgs);
+			}
+		}
 	}
 }
 
@@ -1156,6 +1169,8 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 	/* if there's any reason to build a historic snapshot, do so now */
 	if (needs_snapshot)
 	{
+		List	   *invalmsgs;
+
 		/*
 		 * If we haven't built a complete snapshot yet there's no need to hand
 		 * it out, it wouldn't (and couldn't) be used anyway.
@@ -1184,8 +1199,13 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		/* refcount of the snapshot builder for the new snapshot */
 		SnapBuildSnapIncRefcount(builder->snapshot);
 
+		/* get invalidation messages from reorder buffer */
+		invalmsgs = GetInvalidationMsg(builder->reorder, lsn, xid);
+
 		/* add a new catalog snapshot to all currently running transactions */
-		SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+		SnapBuildDistributeNewCatalogSnapshot(builder, lsn, xid, invalmsgs);
+
+		list_free_deep(invalmsgs);
 	}
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 851a001c8b..7e2d5d9661 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -664,6 +664,11 @@ struct ReorderBuffer
 	int64		totalBytes;		/* total amount of data decoded */
 };
 
+typedef struct InvalidationMsg
+{
+	uint32		nmsgs;
+	SharedInvalidationMessage *msgs;
+}			InvalidationMsg;
 
 extern ReorderBuffer *ReorderBufferAllocate(void);
 extern void ReorderBufferFree(ReorderBuffer *rb);
@@ -740,4 +745,6 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
 
 extern void StartupReorderBuffer(void);
 
+extern List *GetInvalidationMsg(ReorderBuffer *rb, XLogRecPtr lsn, TransactionId xid);
+
 #endif
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index cb36ca7b16..82497c9d11 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -487,6 +487,137 @@ $result =
 is( $result, qq(2|f
 3|t), 'check replicated update on subscriber');
 
+# Clean up
+$node_publisher->safe_psql('postgres', qq(DROP  PUBLICATION pub1;));
+$node_subscriber->safe_psql('postgres', qq(DROP  SUBSCRIPTION sub1;));
+
+# The bug was that the incremental data synchronization was being skipped when
+# a new table is added to the publication in presence of a concurrent active
+# transaction performing the DML on the same table.
+
+# Initial setup.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	CREATE TABLE tab_conc(a int);
+	CREATE SCHEMA sch3;
+	CREATE TABLE sch3.tab_conc(a int);
+	CREATE SCHEMA sch4;
+	CREATE TABLE sch4.tab_conc(a int);
+	CREATE PUBLICATION regress_pub1;
+));
+
+$node_subscriber->safe_psql(
+	'postgres', qq(
+	CREATE TABLE tab_conc(a int);
+	CREATE SCHEMA sch3;
+	CREATE TABLE sch3.tab_conc(a int);
+	CREATE SCHEMA sch4;
+	CREATE TABLE sch4.tab_conc(a int);
+	CREATE SUBSCRIPTION regress_sub1 CONNECTION '$publisher_connstr' PUBLICATION regress_pub1;
+));
+
+# Bump the query timeout to avoid false negatives on slow test systems.
+my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;
+
+# Initiate 3 background sessions.
+my $background_psql1 = $node_publisher->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+$background_psql1->set_query_timer_restart();
+
+my $background_psql2 = $node_publisher->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+
+$background_psql2->set_query_timer_restart();
+
+my $background_psql3 = $node_publisher->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+$background_psql3->set_query_timer_restart();
+
+# Maintain an active transaction with the table that will be added to the
+# publication.
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO tab_conc VALUES (1);
+]);
+
+# Maintain an active transaction with a schema table that will be added to the
+# publication.
+$background_psql2->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO sch3.tab_conc VALUES (1);
+]);
+
+# Add the table to the publication using background_psql, as the alter
+# publication operation will wait for the lock and can only be completed after
+$background_psql3->query_until(qr//,
+	"ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc, TABLES IN SCHEMA sch4, sch3;\n"
+);
+
+# Complete the transaction on the tables, so that ALTER PUBLICATION can proceed
+$background_psql1->query_safe(qq[COMMIT]);
+$background_psql2->query_safe(qq[COMMIT]);
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab_conc VALUES (2);
+	INSERT INTO sch3.tab_conc VALUES (2);
+));
+
+# Refresh the publication.
+$node_subscriber->safe_psql('postgres',
+	'ALTER SUBSCRIPTION regress_sub1 REFRESH PUBLICATION');
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub1');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2),
+	'Ensure that the data from the tab_conc table is synchronized to the subscriber after the subscription is refreshed'
+);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM sch3.tab_conc");
+is( $result, qq(1
+2),
+	'Ensure that the data from the sch3.tab_conc table is synchronized to the subscriber after the subscription is refreshed'
+);
+
+# Perform an insert.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab_conc VALUES (3);
+	INSERT INTO sch3.tab_conc VALUES (3);
+));
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the insert is replicated to the subscriber.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2
+3),
+	'Verify that the incremental data for table tab_conc added after table synchronization is replicated to the subscriber'
+);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM sch3.tab_conc");
+is( $result, qq(1
+2
+3),
+	'Verify that the incremental data for table sch3.tab_conc added after table synchronization is replicated to the subscriber'
+);
+
+$background_psql1->quit;
+$background_psql2->quit;
+$background_psql3->quit;
+
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
 
-- 
2.34.1

