From e5fd69c0ec80cde40e3111fe580b8cb43ffabcda Mon Sep 17 00:00:00 2001
From: "shiy.fnst" <shiy.fnst@fujitsu.com>
Date: Wed, 8 Jun 2022 11:10:21 +0800
Subject: [PATCH] Fix partition map cache issues.

1. Fix the wrong struct type used in logicalrep_partmap_invalidate_cb().
2. Check whether the entry is valid in logicalrep_partition_open() and rebuild
   the entry if not.
3. Invalidate the partition map cache when the publisher sends a new relation
   mapping.
4. Fix the column map build for a dropped column in a partition.

Author: Shi yu, Hou Zhijie
---
 src/backend/replication/logical/relation.c | 120 +++++++++++++++++++----------
 src/backend/replication/logical/worker.c   |   6 ++
 src/include/replication/logicalrelation.h  |   1 +
 src/test/subscription/t/013_partition.pl   |  71 ++++++++++++++++-
 4 files changed, 154 insertions(+), 44 deletions(-)

diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 80fb561..34c2f53 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -148,6 +148,30 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
 }
 
 /*
+ * Update remote relation information in the relation map entry.
+ */
+static void
+logicalrep_update_remoterel(LogicalRepRelMapEntry *entry,
+							LogicalRepRelation *remoterel)
+{
+	int			i;
+
+	entry->remoterel.remoteid = remoterel->remoteid;
+	entry->remoterel.nspname = pstrdup(remoterel->nspname);
+	entry->remoterel.relname = pstrdup(remoterel->relname);
+	entry->remoterel.natts = remoterel->natts;
+	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
+	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
+	for (i = 0; i < remoterel->natts; i++)
+	{
+		entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
+		entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+	}
+	entry->remoterel.replident = remoterel->replident;
+	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+}
+
+/*
  * Add new entry or update existing entry in the relation map cache.
  *
  * Called when new relation mapping is sent by the publisher to update
@@ -159,7 +183,6 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
 	MemoryContext oldctx;
 	LogicalRepRelMapEntry *entry;
 	bool		found;
-	int			i;
 
 	if (LogicalRepRelMap == NULL)
 		logicalrep_relmap_init();
@@ -177,19 +200,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
 
 	/* Make cached copy of the data */
 	oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
-	entry->remoterel.remoteid = remoterel->remoteid;
-	entry->remoterel.nspname = pstrdup(remoterel->nspname);
-	entry->remoterel.relname = pstrdup(remoterel->relname);
-	entry->remoterel.natts = remoterel->natts;
-	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
-	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
-	for (i = 0; i < remoterel->natts; i++)
-	{
-		entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
-		entry->remoterel.atttyps[i] = remoterel->atttyps[i];
-	}
-	entry->remoterel.replident = remoterel->replident;
-	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+	logicalrep_update_remoterel(entry, remoterel);
 	MemoryContextSwitchTo(oldctx);
 }
 
@@ -451,7 +462,7 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
 static void
 logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 {
-	LogicalRepRelMapEntry *entry;
+	LogicalRepPartMapEntry *entry;
 
 	/* Just to be sure. */
 	if (LogicalRepPartMap == NULL)
@@ -464,11 +475,11 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 		hash_seq_init(&status, LogicalRepPartMap);
 
 		/* TODO, use inverse lookup hashtable? */
-		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+		while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
 		{
-			if (entry->localreloid == reloid)
+			if (entry->relmapentry.localreloid == reloid)
 			{
-				entry->localrelvalid = false;
+				entry->relmapentry.localrelvalid = false;
 				hash_seq_term(&status);
 				break;
 			}
@@ -481,8 +492,42 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 
 		hash_seq_init(&status, LogicalRepPartMap);
 
-		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
-			entry->localrelvalid = false;
+		while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+			entry->relmapentry.localrelvalid = false;
+	}
+}
+
+/*
+ * Invalidate the entries in the partition map that refer to remoterel.
+ *
+ * Called when new relation mapping is sent by the publisher to update our
+ * expected view of incoming data from said publisher.
+ *
+ * Note that we don't update the remoterel information in the entry here;
+ * it is filled in again by logicalrep_partition_open, which saves
+ * unnecessary work.
+ */
+void
+logicalrep_partmap_invalidate(LogicalRepRelation *remoterel)
+{
+	HASH_SEQ_STATUS status;
+	LogicalRepPartMapEntry *part_entry;
+	LogicalRepRelMapEntry *entry;
+
+	if (LogicalRepPartMap == NULL)
+		return;
+
+	hash_seq_init(&status, LogicalRepPartMap);
+	while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+	{
+		entry = &part_entry->relmapentry;
+
+		if (entry->remoterel.remoteid != remoterel->remoteid)
+			continue;
+
+		logicalrep_relmap_free_entry(entry);
+
+		memset(entry, 0, sizeof(LogicalRepRelMapEntry));
 	}
 }
 
@@ -534,7 +579,6 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	Oid			partOid = RelationGetRelid(partrel);
 	AttrMap    *attrmap = root->attrmap;
 	bool		found;
-	int			i;
 	MemoryContext oldctx;
 
 	if (LogicalRepPartMap == NULL)
@@ -545,31 +589,23 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 														(void *) &partOid,
 														HASH_ENTER, &found);
 
-	if (found)
-		return &part_entry->relmapentry;
+	entry = &part_entry->relmapentry;
 
-	memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+	if (found && entry->localrelvalid)
+		return entry;
 
 	/* Switch to longer-lived context. */
 	oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
 
-	part_entry->partoid = partOid;
-
-	/* Remote relation is copied as-is from the root entry. */
-	entry = &part_entry->relmapentry;
-	entry->remoterel.remoteid = remoterel->remoteid;
-	entry->remoterel.nspname = pstrdup(remoterel->nspname);
-	entry->remoterel.relname = pstrdup(remoterel->relname);
-	entry->remoterel.natts = remoterel->natts;
-	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
-	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
-	for (i = 0; i < remoterel->natts; i++)
+	if (!found)
 	{
-		entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
-		entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+		memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+		part_entry->partoid = partOid;
 	}
-	entry->remoterel.replident = remoterel->replident;
-	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+
+	/* Remote relation is copied as-is from the root entry. */
+	if (!entry->remoterel.remoteid)
+		logicalrep_update_remoterel(entry, remoterel);
 
 	entry->localrel = partrel;
 	entry->localreloid = partOid;
@@ -594,7 +630,11 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 		{
 			AttrNumber	root_attno = map->attnums[attno];
 
-			entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
+			/* 0 means it's a dropped attribute */
+			if (root_attno == 0)
+				entry->attrmap->attnums[attno] = -1;
+			else
+				entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
 		}
 	}
 	else
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fc210a9..81ce2e9 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1562,6 +1562,12 @@ apply_handle_relation(StringInfo s)
 
 	rel = logicalrep_read_rel(s);
 	logicalrep_relmap_update(rel);
+
+	/*
+	 * Also invalidate all entries in the partition map that refer to
+	 * remoterel.
+	 */
+	logicalrep_partmap_invalidate(rel);
 }
 
 /*
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 7bf8cd2..71a1fa7 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -38,6 +38,7 @@ typedef struct LogicalRepRelMapEntry
 } LogicalRepRelMapEntry;
 
 extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
+extern void logicalrep_partmap_invalidate(LogicalRepRelation *remoterel);
 
 extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
 												  LOCKMODE lockmode);
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index e7f4a94..daff675 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -800,9 +800,72 @@ ok( $logfile =~
 	  qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/,
 	'delete target row is missing in tab2_1');
 
-# No need for this until more tests are added.
-# $node_subscriber1->append_conf('postgresql.conf',
-# 	"log_min_messages = warning");
-# $node_subscriber1->reload;
+$node_subscriber1->append_conf('postgresql.conf',
+	"log_min_messages = warning");
+$node_subscriber1->reload;
+
+# Test the case where the target table on the subscriber is a partitioned
+# table, and check that changes are replicated correctly after changing the
+# table's schema on the publisher and on the subscriber.
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	CREATE TABLE tab5 (a int NOT NULL, b int);
+	CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
+	ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;});
+
+$node_subscriber2->safe_psql(
+	'postgres', q{
+	CREATE TABLE tab5 (a int NOT NULL, b int, c int) PARTITION BY LIST (a);
+	CREATE TABLE tab5_1 PARTITION OF tab5 DEFAULT;
+	CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
+	ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;
+	ALTER TABLE tab5_1 REPLICA IDENTITY USING INDEX tab5_1_a_idx;});
+
+$node_subscriber2->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Populate the partition map cache
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 2 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b FROM tab5 ORDER BY 1");
+is($result, qq(2|1), 'updates of tab5 replicated correctly');
+
+# Change the column order of table on publisher
+$node_publisher->safe_psql(
+	'postgres', q{
+	ALTER TABLE tab5 DROP COLUMN b, ADD COLUMN c INT;
+	ALTER TABLE tab5 ADD COLUMN b INT;});
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET c = 1 WHERE a = 2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b, c FROM tab5 ORDER BY 1");
+is($result, qq(2||1), 'updates of tab5 replicated correctly after altering table on publisher');
+
+# Change the column order of partition on subscriber
+$node_subscriber2->safe_psql(
+	'postgres', q{
+	ALTER TABLE tab5 DETACH PARTITION tab5_1;
+	ALTER TABLE tab5_1 DROP COLUMN b;
+	ALTER TABLE tab5_1 ADD COLUMN b int;
+	ALTER TABLE tab5 ATTACH PARTITION tab5_1 DEFAULT});
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 3 WHERE a = 2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b, c FROM tab5 ORDER BY 1");
+is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on subscriber');
 
 done_testing();
-- 
2.7.2.windows.1

