From 4e5fb7119ff3ecc8448fd13278b62add1e132dba Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Wed, 13 Jan 2021 17:23:08 +0530
Subject: [PATCH v1 2/2] Invalidate relation map cache in subscriber syscache
 invalidation callbacks

Currently, in logical replication, relation map cache in the
subscriber is not getting invalidated when anything changes in
pg_subscription_rel or pg_subscription catalogues. So we end up
not reading the latest system catalogues always in logicalrep_rel_open
using GetSubscriptionRelState.

To fix this, invalidate the relation map cache entries in
invalidate_syncing_table_states and subscription_change_cb which
are invalidation callbacks for pg_subscription_rel and pg_subscription
catalogues respectively.
---
 src/backend/replication/logical/relation.c  | 26 +++++++++++++++++++++
 src/backend/replication/logical/tablesync.c |  3 +++
 src/backend/replication/logical/worker.c    |  3 +++
 src/include/replication/logicalrelation.h   |  1 +
 4 files changed, 33 insertions(+)

diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index e861c0ff80..88ac772444 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -96,6 +96,32 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
 	}
 }
 
+/*
+ * Invalidate relation map cache whenever syscache of pg_subscription_rel or
+ * pg_subscription gets changed.
+ */
+void
+logicalrep_relmap_invalidate(void)
+{
+	LogicalRepRelMapEntry *entry;
+	HASH_SEQ_STATUS status;
+
+	/* Just to be sure. */
+	if (LogicalRepRelMap == NULL)
+		return;
+
+	/*
+	 * There is no way to find the cache entry for which the syscache has been
+	 * changed, so we mark all the cache entries state as unknown. Because of
+	 * this, in logicalrep_rel_open the cache entry state will be read from
+	 * the system catalogue using GetSubscriptionRelState.
+	 */
+	hash_seq_init(&status, LogicalRepRelMap);
+
+	while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+		entry->state = SUBREL_STATE_UNKNOWN;
+}
+
 /*
  * Initialize the relation map cache.
  */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 863d196fd7..6b29173a5c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -257,6 +257,9 @@ void
 invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 {
 	table_states_valid = false;
+
+	/* Invalidate relation map cache. */
+	logicalrep_relmap_invalidate();
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 1b1d70ed68..e291697ad7 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2486,6 +2486,9 @@ static void
 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	MySubscriptionValid = false;
+
+	/* Invalidate relation map cache. */
+	logicalrep_relmap_invalidate();
 }
 
 /*
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 3f0b3deefb..f5e70b75b0 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -49,4 +49,5 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
 extern char *logicalrep_typmap_gettypname(Oid remoteid);
 
+extern void logicalrep_relmap_invalidate(void);
 #endif							/* LOGICALRELATION_H */
-- 
2.25.1

