From 0f066cefc8520af93ece7ea10618d037005af5dc Mon Sep 17 00:00:00 2001
From: "shiy.fnst" <shiy.fnst@fujitsu.com>
Date: Wed, 8 Jun 2022 11:10:21 +0800
Subject: [PATCH v2 1/2] Fix partition map cache issues.

1. Fix the bad structure in logicalrep_partmap_invalidate_cb().
2. Check whether the entry is valid in logicalrep_partition_open().
3. Update partition map cache when the publisher send new relation mapping.

Author: Shi yu, Hou Zhijie
---
 src/backend/replication/logical/relation.c | 120 ++++++++++++++-------
 src/test/subscription/t/013_partition.pl   |  48 ++++++++-
 2 files changed, 125 insertions(+), 43 deletions(-)

diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 80fb561a9a..85712fb0f4 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -147,6 +147,66 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
 		pfree(entry->attrmap);
 }
 
+/*
+ * Update remote relation information in the relation map entry.
+ */
+static void
+logicalrep_update_remoterel(LogicalRepRelMapEntry *entry,
+							LogicalRepRelation *remoterel)
+{
+	int			i;
+
+	entry->remoterel.remoteid = remoterel->remoteid;
+	entry->remoterel.nspname = pstrdup(remoterel->nspname);
+	entry->remoterel.relname = pstrdup(remoterel->relname);
+	entry->remoterel.natts = remoterel->natts;
+	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
+	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
+	for (i = 0; i < remoterel->natts; i++)
+	{
+		entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
+		entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+	}
+	entry->remoterel.replident = remoterel->replident;
+	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+}
+
+/*
+ * Invalidate the existing entry in the partition map.
+ *
+ * Called when new relation mapping is sent by the publisher to update our
+ * expected view of incoming data from said publisher.
+ */
+static void
+logicalrep_partmap_invalidate(LogicalRepRelation *remoterel)
+{
+	MemoryContext oldctx;
+	HASH_SEQ_STATUS status;
+	LogicalRepPartMapEntry *part_entry;
+	LogicalRepRelMapEntry *entry;
+
+	if (LogicalRepPartMap == NULL)
+		return;
+
+	hash_seq_init(&status, LogicalRepPartMap);
+	while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+	{
+		entry = &part_entry->relmapentry;
+
+		if (entry->remoterel.remoteid != remoterel->remoteid)
+			continue;
+
+		logicalrep_relmap_free_entry(entry);
+
+		memset(entry, 0, sizeof(LogicalRepRelMapEntry));
+
+		/* Make cached copy of the data */
+		oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
+		logicalrep_update_remoterel(entry, remoterel);
+		MemoryContextSwitchTo(oldctx);
+	}
+}
+
 /*
  * Add new entry or update existing entry in the relation map cache.
  *
@@ -159,7 +219,6 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
 	MemoryContext oldctx;
 	LogicalRepRelMapEntry *entry;
 	bool		found;
-	int			i;
 
 	if (LogicalRepRelMap == NULL)
 		logicalrep_relmap_init();
@@ -177,20 +236,11 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
 
 	/* Make cached copy of the data */
 	oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
-	entry->remoterel.remoteid = remoterel->remoteid;
-	entry->remoterel.nspname = pstrdup(remoterel->nspname);
-	entry->remoterel.relname = pstrdup(remoterel->relname);
-	entry->remoterel.natts = remoterel->natts;
-	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
-	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
-	for (i = 0; i < remoterel->natts; i++)
-	{
-		entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
-		entry->remoterel.atttyps[i] = remoterel->atttyps[i];
-	}
-	entry->remoterel.replident = remoterel->replident;
-	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+	logicalrep_update_remoterel(entry, remoterel);
 	MemoryContextSwitchTo(oldctx);
+
+	/* Invalidate the corresponding partition map as well */
+	logicalrep_partmap_invalidate(remoterel);
 }
 
 /*
@@ -451,7 +501,7 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
 static void
 logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 {
-	LogicalRepRelMapEntry *entry;
+	LogicalRepPartMapEntry *entry;
 
 	/* Just to be sure. */
 	if (LogicalRepPartMap == NULL)
@@ -464,11 +514,11 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 		hash_seq_init(&status, LogicalRepPartMap);
 
 		/* TODO, use inverse lookup hashtable? */
-		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+		while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
 		{
-			if (entry->localreloid == reloid)
+			if (entry->relmapentry.localreloid == reloid)
 			{
-				entry->localrelvalid = false;
+				entry->relmapentry.localrelvalid = false;
 				hash_seq_term(&status);
 				break;
 			}
@@ -481,8 +531,8 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 
 		hash_seq_init(&status, LogicalRepPartMap);
 
-		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
-			entry->localrelvalid = false;
+		while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+			entry->relmapentry.localrelvalid = false;
 	}
 }
 
@@ -534,7 +584,6 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	Oid			partOid = RelationGetRelid(partrel);
 	AttrMap    *attrmap = root->attrmap;
 	bool		found;
-	int			i;
 	MemoryContext oldctx;
 
 	if (LogicalRepPartMap == NULL)
@@ -545,31 +594,24 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 														(void *) &partOid,
 														HASH_ENTER, &found);
 
-	if (found)
-		return &part_entry->relmapentry;
+	entry = &part_entry->relmapentry;
 
-	memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+	if (found && entry->localrelvalid)
+		return entry;
 
 	/* Switch to longer-lived context. */
 	oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
 
-	part_entry->partoid = partOid;
-
-	/* Remote relation is copied as-is from the root entry. */
-	entry = &part_entry->relmapentry;
-	entry->remoterel.remoteid = remoterel->remoteid;
-	entry->remoterel.nspname = pstrdup(remoterel->nspname);
-	entry->remoterel.relname = pstrdup(remoterel->relname);
-	entry->remoterel.natts = remoterel->natts;
-	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
-	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
-	for (i = 0; i < remoterel->natts; i++)
+	if (!found)
 	{
-		entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
-		entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+		memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+
+		part_entry->partoid = partOid;
+
+		/* Remote relation is copied as-is from the root entry. */
+		logicalrep_update_remoterel(entry, remoterel);
+
 	}
-	entry->remoterel.replident = remoterel->replident;
-	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
 
 	entry->localrel = partrel;
 	entry->localreloid = partOid;
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index e7f4a94f19..b2183e0232 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -800,9 +800,49 @@ ok( $logfile =~
 	  qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/,
 	'delete target row is missing in tab2_1');
 
-# No need for this until more tests are added.
-# $node_subscriber1->append_conf('postgresql.conf',
-# 	"log_min_messages = warning");
-# $node_subscriber1->reload;
+$node_subscriber1->append_conf('postgresql.conf',
+	"log_min_messages = warning");
+$node_subscriber1->reload;
+
+# Test the case that target table in subscriber is a partitioned table.
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	CREATE TABLE tab5 (a int NOT NULL, b int);
+	CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
+	ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;});
+
+$node_subscriber2->safe_psql(
+	'postgres', q{
+	CREATE TABLE tab5 (a int NOT NULL, b int, c int) PARTITION BY LIST (a);
+	CREATE TABLE tab5_1 PARTITION OF tab5 DEFAULT;
+	CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
+	ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;
+	ALTER TABLE tab5_1 REPLICA IDENTITY USING INDEX tab5_1_a_idx;});
+
+$node_subscriber2->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+# Make cache.
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 2 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b FROM tab5 ORDER BY 1");
+is($result, qq(2|1), 'updates of tab5 replicated correctly');
+
+# Alter table on publisher.
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab5 ADD COLUMN c INT");
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET c = 1 WHERE a = 2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b, c FROM tab5 ORDER BY 1");
+is($result, qq(2|1|1), 'updates of tab5 replicated correctly after altering table on publisher');
 
 done_testing();
-- 
2.18.4

