From 8d0b799dbde9ef1b4d651c827759c56092ef4221 Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Mon, 13 Jun 2022 14:06:05 +0800
Subject: [PATCH] Fix partition map cache invalidation on subscriber

There are several issues in the subscriber's code that prevent the
partition map cache from being properly rebuilt.

When invalidating an entry in the callback function
logicalrep_partmap_invalidate_cb(), we cast the hash entry to the wrong
type. Fix it by using the correct entry type.

Besides, we lack a validity check at the beginning of
logicalrep_partition_open(). Add this check and rebuild the entry if it's
no longer valid.

In addition, when building the partition's column map, we missed the check
for dropped columns, which causes a cache lookup error. Fix this by
ignoring dropped columns.

---
 src/backend/replication/logical/relation.c | 62 ++++++++++++++++++------------
 src/test/subscription/t/013_partition.pl   | 57 +++++++++++++++++++++++++--
 2 files changed, 90 insertions(+), 29 deletions(-)

diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 80fb561..c1de920 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -451,7 +451,7 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
 static void
 logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 {
-	LogicalRepRelMapEntry *entry;
+	LogicalRepPartMapEntry *entry;
 
 	/* Just to be sure. */
 	if (LogicalRepPartMap == NULL)
@@ -464,11 +464,11 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 		hash_seq_init(&status, LogicalRepPartMap);
 
 		/* TODO, use inverse lookup hashtable? */
-		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+		while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
 		{
-			if (entry->localreloid == reloid)
+			if (entry->relmapentry.localreloid == reloid)
 			{
-				entry->localrelvalid = false;
+				entry->relmapentry.localrelvalid = false;
 				hash_seq_term(&status);
 				break;
 			}
@@ -481,8 +481,8 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 
 		hash_seq_init(&status, LogicalRepPartMap);
 
-		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
-			entry->localrelvalid = false;
+		while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+			entry->relmapentry.localrelvalid = false;
 	}
 }
 
@@ -534,7 +534,6 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	Oid			partOid = RelationGetRelid(partrel);
 	AttrMap    *attrmap = root->attrmap;
 	bool		found;
-	int			i;
 	MemoryContext oldctx;
 
 	if (LogicalRepPartMap == NULL)
@@ -545,31 +544,40 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 														(void *) &partOid,
 														HASH_ENTER, &found);
 
-	if (found)
-		return &part_entry->relmapentry;
+	entry = &part_entry->relmapentry;
 
-	memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+	if (found && entry->localrelvalid)
+		return entry;
 
 	/* Switch to longer-lived context. */
 	oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
 
-	part_entry->partoid = partOid;
+	if (!found)
+	{
+		memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+		part_entry->partoid = partOid;
+	}
 
-	/* Remote relation is copied as-is from the root entry. */
-	entry = &part_entry->relmapentry;
-	entry->remoterel.remoteid = remoterel->remoteid;
-	entry->remoterel.nspname = pstrdup(remoterel->nspname);
-	entry->remoterel.relname = pstrdup(remoterel->relname);
-	entry->remoterel.natts = remoterel->natts;
-	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
-	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
-	for (i = 0; i < remoterel->natts; i++)
+	if (!entry->remoterel.remoteid)
 	{
-		entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
-		entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+		int	i;
+
+		/* Remote relation is copied as-is from the root entry. */
+		entry = &part_entry->relmapentry;
+		entry->remoterel.remoteid = remoterel->remoteid;
+		entry->remoterel.nspname = pstrdup(remoterel->nspname);
+		entry->remoterel.relname = pstrdup(remoterel->relname);
+		entry->remoterel.natts = remoterel->natts;
+		entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
+		entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
+		for (i = 0; i < remoterel->natts; i++)
+		{
+			entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
+			entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+		}
+		entry->remoterel.replident = remoterel->replident;
+		entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
 	}
-	entry->remoterel.replident = remoterel->replident;
-	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
 
 	entry->localrel = partrel;
 	entry->localreloid = partOid;
@@ -594,7 +602,11 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 		{
 			AttrNumber	root_attno = map->attnums[attno];
 
-			entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
+			/* 0 means it's a dropped attribute */
+			if (root_attno == 0)
+				entry->attrmap->attnums[attno] = -1;
+			else
+				entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
 		}
 	}
 	else
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index e7f4a94..85dd7f5 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -800,9 +800,58 @@ ok( $logfile =~
 	  qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/,
 	'delete target row is missing in tab2_1');
 
-# No need for this until more tests are added.
-# $node_subscriber1->append_conf('postgresql.conf',
-# 	"log_min_messages = warning");
-# $node_subscriber1->reload;
+$node_subscriber1->append_conf('postgresql.conf',
+	"log_min_messages = warning");
+$node_subscriber1->reload;
+
+# Test the case that target table on subscriber is a partitioned table and
+# check that the changes are replicated correctly after changing the schema of
+# table on subscriber.
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	CREATE TABLE tab5 (a int NOT NULL, b int);
+	CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
+	ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;});
+
+$node_subscriber2->safe_psql(
+	'postgres', q{
+	CREATE TABLE tab5 (a int NOT NULL, b int, c int) PARTITION BY LIST (a);
+	CREATE TABLE tab5_1 PARTITION OF tab5 DEFAULT;
+	CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
+	ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;
+	ALTER TABLE tab5_1 REPLICA IDENTITY USING INDEX tab5_1_a_idx;});
+
+$node_subscriber2->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Make partition map cache
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 2 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b FROM tab5 ORDER BY 1");
+is($result, qq(2|1), 'updates of tab5 replicated correctly');
+
+# Change the column order of partition on subscriber
+$node_subscriber2->safe_psql(
+	'postgres', q{
+	ALTER TABLE tab5 DETACH PARTITION tab5_1;
+	ALTER TABLE tab5_1 DROP COLUMN b;
+	ALTER TABLE tab5_1 ADD COLUMN b int;
+	ALTER TABLE tab5 ATTACH PARTITION tab5_1 DEFAULT});
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 3 WHERE a = 2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b, c FROM tab5 ORDER BY 1");
+is($result, qq(3|1|), 'updates of tab5 replicated correctly after altering table on subscriber');
 
 done_testing();
-- 
2.7.2.windows.1

