From c0f892fbde509b16cfd4dee03a770ee9899f0e63 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 12 Apr 2021 10:31:52 +0900
Subject: [PATCH v7 2/5] Use HTAB for replication slot statistics.

Previously, we used an array to store stats for replication slots.
But this had two problems in cases where a drop-slot-stats message
is lost: 1) the stats for the new slot are not recorded and 2) we
write beyond the end of the array when, after restarting, the number
of slots whose stats are stored in the stats file exceeds
max_replication_slots.

This commit changes to use an HTAB for replication slot statistics,
resolving both problems. Instead, we have pgstat_vacuum_stat() check
whether the slot for each stats entry in the stats collector still
exists, and send a drop-slot-stats message if it does not.
---
 src/backend/catalog/system_views.sql      |  26 ++-
 src/backend/postmaster/pgstat.c           | 261 +++++++++++-----------
 src/backend/replication/logical/logical.c |   2 +-
 src/backend/replication/slot.c            |  23 +-
 src/backend/utils/adt/pgstatfuncs.c       | 127 ++++++-----
 src/include/catalog/pg_proc.dat           |  14 +-
 src/include/pgstat.h                      |   8 +-
 src/include/replication/slot.h            |   2 +-
 src/test/regress/expected/rules.out       |   4 +-
 9 files changed, 247 insertions(+), 220 deletions(-)

diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 451db2ee0a..1bceb25571 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -866,18 +866,6 @@ CREATE VIEW pg_stat_replication AS
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
 
-CREATE VIEW pg_stat_replication_slots AS
-    SELECT
-            s.slot_name,
-            s.spill_txns,
-            s.spill_count,
-            s.spill_bytes,
-            s.stream_txns,
-            s.stream_count,
-            s.stream_bytes,
-            s.stats_reset
-    FROM pg_stat_get_replication_slots() AS s;
-
 CREATE VIEW pg_stat_slru AS
     SELECT
             s.name,
@@ -982,6 +970,20 @@ CREATE VIEW pg_replication_slots AS
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
+CREATE VIEW pg_stat_replication_slots AS
+    SELECT
+            s.slot_name,
+            s.spill_txns,
+            s.spill_count,
+            s.spill_bytes,
+            s.stream_txns,
+            s.stream_count,
+            s.stream_bytes,
+            s.stats_reset
+    FROM pg_replication_slots as r,
+        LATERAL pg_stat_get_replication_slot(slot_name) as s
+    WHERE r.datoid IS NOT NULL; -- excluding physical slots
+
 CREATE VIEW pg_stat_database AS
     SELECT
             D.oid AS datid,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 666ce95d08..142567e654 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -106,6 +106,7 @@
 #define PGSTAT_DB_HASH_SIZE		16
 #define PGSTAT_TAB_HASH_SIZE	512
 #define PGSTAT_FUNCTION_HASH_SIZE	512
+#define PGSTAT_REPLSLOT_HASH_SIZE	32
 
 
 /* ----------
@@ -278,8 +279,7 @@ static PgStat_ArchiverStats archiverStats;
 static PgStat_GlobalStats globalStats;
 static PgStat_WalStats walStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
-static PgStat_ReplSlotStats *replSlotStats;
-static int	nReplSlotStats;
+static HTAB *replSlotStats = NULL;
 static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
 
 /*
@@ -319,8 +319,8 @@ static void backend_read_statsfile(void);
 static bool pgstat_write_statsfile_needed(void);
 static bool pgstat_db_requested(Oid databaseid);
 
-static int	pgstat_replslot_index(const char *name, bool create_it);
-static void pgstat_reset_replslot(int i, TimestampTz ts);
+static PgStat_ReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
+static void pgstat_reset_replslot(PgStat_ReplSlotEntry *slotstats, TimestampTz ts);
 
 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
 static void pgstat_send_funcstats(void);
@@ -1109,6 +1109,24 @@ pgstat_vacuum_stat(void)
 	/* Clean up */
 	hash_destroy(htab);
 
+	/*
+	 * Check all replication slot entries in the stats hash table. We do this
+	 * only when replSlotStats has more than max_replication_slots entries,
+	 * i.e., when there are stats for already-dropped slots, to avoid frequent
+	 * calls to SearchNamedReplicationSlot(), which acquires an LWLock.
+	 */
+	if (replSlotStats && hash_get_num_entries(replSlotStats) > max_replication_slots)
+	{
+		PgStat_ReplSlotEntry *slotentry;
+
+		hash_seq_init(&hstat, replSlotStats);
+		while ((slotentry = (PgStat_ReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
+				pgstat_report_replslot_drop(NameStr(slotentry->slotname));
+		}
+	}
+
 	/*
 	 * Lookup our own database entry; if not found, nothing more to do.
 	 */
@@ -1516,30 +1534,6 @@ pgstat_reset_replslot_counter(const char *name)
 
 	if (name)
 	{
-		ReplicationSlot *slot;
-
-		/*
-		 * Check if the slot exists with the given name. It is possible that by
-		 * the time this message is executed the slot is dropped but at least
-		 * this check will ensure that the given name is for a valid slot.
-		 */
-		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-		slot = SearchNamedReplicationSlot(name);
-		LWLockRelease(ReplicationSlotControlLock);
-
-		if (!slot)
-			ereport(ERROR,
-					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					 errmsg("replication slot \"%s\" does not exist",
-							name)));
-
-		/*
-		 * Nothing to do for physical slots as we collect stats only for
-		 * logical slots.
-		 */
-		if (SlotIsPhysical(slot))
-			return;
-
 		namestrcpy(&msg.m_slotname, name);
 		msg.clearall = false;
 	}
@@ -1813,7 +1807,7 @@ pgstat_report_tempfile(size_t filesize)
  * ----------
  */
 void
-pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
+pgstat_report_replslot(const PgStat_ReplSlotEntry *repSlotStat)
 {
 	PgStat_MsgReplSlot msg;
 
@@ -2870,17 +2864,19 @@ pgstat_fetch_slru(void)
  * pgstat_fetch_replslot() -
  *
  *	Support function for the SQL-callable pgstat* functions. Returns
- *	a pointer to the replication slot statistics struct and sets the
- *	number of entries in nslots_p.
+ *	a pointer to the replication slot statistics struct.
  * ---------
  */
-PgStat_ReplSlotStats *
-pgstat_fetch_replslot(int *nslots_p)
+PgStat_ReplSlotEntry *
+pgstat_fetch_replslot(NameData slotname)
 {
+	PgStat_ReplSlotEntry *slotent = NULL;
+
 	backend_read_statsfile();
 
-	*nslots_p = nReplSlotStats;
-	return replSlotStats;
+	slotent = pgstat_get_replslot_entry(slotname, false);
+
+	return slotent;
 }
 
 /*
@@ -3652,7 +3648,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
 	int			rc;
-	int			i;
 
 	elog(DEBUG2, "writing stats file \"%s\"", statfile);
 
@@ -3742,11 +3737,17 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	/*
 	 * Write replication slot stats struct
 	 */
-	for (i = 0; i < nReplSlotStats; i++)
+	if (replSlotStats)
 	{
-		fputc('R', fpout);
-		rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
-		(void) rc;				/* we'll check for error with ferror */
+		PgStat_ReplSlotEntry *slotentry;
+
+		hash_seq_init(&hstat, replSlotStats);
+		while ((slotentry = (PgStat_ReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			fputc('R', fpout);
+			rc = fwrite(slotentry, sizeof(PgStat_ReplSlotEntry), 1, fpout);
+			(void) rc;				/* we'll check for error with ferror */
+		}
 	}
 
 	/*
@@ -3973,12 +3974,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 	dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
 						 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
-	/* Allocate the space for replication slot statistics */
-	replSlotStats = MemoryContextAllocZero(pgStatLocalContext,
-										   max_replication_slots
-										   * sizeof(PgStat_ReplSlotStats));
-	nReplSlotStats = 0;
-
 	/*
 	 * Clear out global, archiver, WAL and SLRU statistics so they start from
 	 * zero in case we can't load an existing statsfile.
@@ -4003,12 +3998,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 	for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
 		slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
 
-	/*
-	 * Set the same reset timestamp for all replication slots too.
-	 */
-	for (i = 0; i < max_replication_slots; i++)
-		replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
-
 	/*
 	 * Try to open the stats file. If it doesn't exist, the backends simply
 	 * return zero for anything and the collector simply starts from scratch
@@ -4195,21 +4184,27 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 				break;
 
 				/*
-				 * 'R'	A PgStat_ReplSlotStats struct describing a replication
+				 * 'R'	A PgStat_ReplSlotEntry struct describing a replication
 				 * slot follows.
 				 */
 			case 'R':
-				if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
-					!= sizeof(PgStat_ReplSlotStats))
+			{
+				PgStat_ReplSlotEntry slotstats;
+				PgStat_ReplSlotEntry *slotent;
+
+				if (fread(&slotstats, 1, sizeof(PgStat_ReplSlotEntry), fpin)
+					!= sizeof(PgStat_ReplSlotEntry))
 				{
 					ereport(pgStatRunningInCollector ? LOG : WARNING,
 							(errmsg("corrupted statistics file \"%s\"",
 									statfile)));
-					memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
 					goto done;
 				}
-				nReplSlotStats++;
+
+				slotent = pgstat_get_replslot_entry(slotstats.slotname, true);
+				memcpy(slotent, &slotstats, sizeof(PgStat_ReplSlotEntry));
 				break;
+			}
 
 			case 'E':
 				goto done;
@@ -4422,7 +4417,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 	PgStat_ArchiverStats myArchiverStats;
 	PgStat_WalStats myWalStats;
 	PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
-	PgStat_ReplSlotStats myReplSlotStats;
+	PgStat_ReplSlotEntry myReplSlotStats;
 	PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
 	FILE	   *fpin;
 	int32		format_id;
@@ -4551,12 +4546,12 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 				break;
 
 				/*
-				 * 'R'	A PgStat_ReplSlotStats struct describing a replication
+				 * 'R'	A PgStat_ReplSlotEntry struct describing a replication
 				 * slot follows.
 				 */
 			case 'R':
-				if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
-					!= sizeof(PgStat_ReplSlotStats))
+				if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotEntry), fpin)
+					!= sizeof(PgStat_ReplSlotEntry))
 				{
 					ereport(pgStatRunningInCollector ? LOG : WARNING,
 							(errmsg("corrupted statistics file \"%s\"",
@@ -4763,7 +4758,6 @@ pgstat_clear_snapshot(void)
 	pgStatLocalContext = NULL;
 	pgStatDBHash = NULL;
 	replSlotStats = NULL;
-	nReplSlotStats = 0;
 
 	/*
 	 * Historically the backend_status.c facilities lived in this file, and
@@ -5187,20 +5181,26 @@ static void
 pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 								 int len)
 {
-	int			i;
-	int			idx = -1;
+	PgStat_ReplSlotEntry *slotent;
 	TimestampTz ts;
 
+	/* Return if we don't have replication slot statistics */
+	if (replSlotStats == NULL)
+		return;
+
 	ts = GetCurrentTimestamp();
 	if (msg->clearall)
 	{
-		for (i = 0; i < nReplSlotStats; i++)
-			pgstat_reset_replslot(i, ts);
+		HASH_SEQ_STATUS sstat;
+
+		hash_seq_init(&sstat, replSlotStats);
+		while ((slotent = (PgStat_ReplSlotEntry *) hash_seq_search(&sstat)) != NULL)
+			pgstat_reset_replslot(slotent, ts);
 	}
 	else
 	{
 		/* Get the index of replication slot statistics to reset */
-		idx = pgstat_replslot_index(NameStr(msg->m_slotname), false);
+		slotent = pgstat_get_replslot_entry(msg->m_slotname, false);
 
 		/*
 		 * Nothing to do if the given slot entry is not found.  This could
@@ -5208,11 +5208,11 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 		 * corresponding statistics entry is also removed before receiving the
 		 * reset message.
 		 */
-		if (idx < 0)
+		if (!slotent)
 			return;
 
 		/* Reset the stats for the requested replication slot */
-		pgstat_reset_replslot(idx, ts);
+		pgstat_reset_replslot(slotent, ts);
 	}
 }
 
@@ -5530,44 +5530,27 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
 static void
 pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 {
-	int			idx;
-
-	/*
-	 * Get the index of replication slot statistics.  On dropping, we don't
-	 * create the new statistics.
-	 */
-	idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop);
-
-	/*
-	 * The slot entry is not found or there is no space to accommodate the new
-	 * entry.  This could happen when the message for the creation of a slot
-	 * reached before the drop message even though the actual operations
-	 * happen in reverse order.  In such a case, the next update of the
-	 * statistics for the same slot will create the required entry.
-	 */
-	if (idx < 0)
-		return;
-
-	/* it must be a valid replication slot index */
-	Assert(idx < nReplSlotStats);
-
 	if (msg->m_drop)
 	{
 		/* Remove the replication slot statistics with the given name */
-		if (idx < nReplSlotStats - 1)
-			memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
-				   sizeof(PgStat_ReplSlotStats));
-		nReplSlotStats--;
+		if (replSlotStats != NULL)
+			(void) hash_search(replSlotStats, (void *) NameStr(msg->m_slotname),
+							   HASH_REMOVE, NULL);
 	}
 	else
 	{
+		PgStat_ReplSlotEntry *slotent;
+
+		slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
+		Assert(slotent);
+
 		/* Update the replication slot statistics */
-		replSlotStats[idx].spill_txns += msg->m_spill_txns;
-		replSlotStats[idx].spill_count += msg->m_spill_count;
-		replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
-		replSlotStats[idx].stream_txns += msg->m_stream_txns;
-		replSlotStats[idx].stream_count += msg->m_stream_count;
-		replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
+		slotent->spill_txns += msg->m_spill_txns;
+		slotent->spill_count += msg->m_spill_count;
+		slotent->spill_bytes += msg->m_spill_bytes;
+		slotent->stream_txns += msg->m_stream_txns;
+		slotent->stream_count += msg->m_stream_count;
+		slotent->stream_bytes += msg->m_stream_bytes;
 	}
 }
 
@@ -5745,57 +5728,77 @@ pgstat_db_requested(Oid databaseid)
 }
 
 /* ----------
- * pgstat_replslot_index
+ * pgstat_get_replslot_entry
  *
- * Return the index of entry of a replication slot with the given name, or
- * -1 if the slot is not found.
+ * Return the entry of replication slot stats with the given name. Return
+ * NULL if not found and the caller didn't request to create it.
  *
  * create_it tells whether to create the new slot entry if it is not found.
  * ----------
  */
-static int
-pgstat_replslot_index(const char *name, bool create_it)
+static PgStat_ReplSlotEntry *
+pgstat_get_replslot_entry(NameData name, bool create_it)
 {
-	int			i;
+	PgStat_ReplSlotEntry *slotent;
+	bool	found;
 
-	Assert(nReplSlotStats <= max_replication_slots);
-	for (i = 0; i < nReplSlotStats; i++)
+	/*
+	 * Create the replication slot stats hash table if we don't have
+	 * it already.
+	 */
+	if (replSlotStats == NULL)
 	{
-		if (namestrcmp(&replSlotStats[i].slotname, name) == 0)
-			return i;			/* found */
+		HASHCTL		hash_ctl;
+
+		hash_ctl.keysize = sizeof(NameData);
+		hash_ctl.entrysize = sizeof(PgStat_ReplSlotEntry);
+		hash_ctl.hcxt = pgStatLocalContext;
+
+		replSlotStats = hash_create("Replication slots hash",
+									PGSTAT_REPLSLOT_HASH_SIZE,
+									&hash_ctl,
+									HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 	}
 
-	/*
-	 * The slot is not found.  We don't want to register the new statistics if
-	 * the list is already full or the caller didn't request.
-	 */
-	if (i == max_replication_slots || !create_it)
-		return -1;
+	slotent = (PgStat_ReplSlotEntry *) hash_search(replSlotStats,
+												   (void *) &name,
+												   create_it ? HASH_ENTER : HASH_FIND,
+												   &found);
 
-	/* Register new slot */
-	memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
-	namestrcpy(&replSlotStats[nReplSlotStats].slotname, name);
+	if (!slotent)
+	{
+		/* not found */
+		Assert(!create_it && !found);
+		return NULL;
+	}
+
+	/* initialize the entry */
+	if (create_it && !found)
+	{
+		memset(slotent, 0, sizeof(PgStat_ReplSlotEntry));
+		namestrcpy(&(slotent->slotname), NameStr(name));
+	}
 
-	return nReplSlotStats++;
+	return slotent;
 }
 
 /* ----------
  * pgstat_reset_replslot
  *
- * Reset the replication slot stats at index 'i'.
+ * Reset the given replication slot stats.
  * ----------
  */
 static void
-pgstat_reset_replslot(int i, TimestampTz ts)
+pgstat_reset_replslot(PgStat_ReplSlotEntry *slotent, TimestampTz ts)
 {
 	/* reset only counters. Don't clear slot name */
-	replSlotStats[i].spill_txns = 0;
-	replSlotStats[i].spill_count = 0;
-	replSlotStats[i].spill_bytes = 0;
-	replSlotStats[i].stream_txns = 0;
-	replSlotStats[i].stream_count = 0;
-	replSlotStats[i].stream_bytes = 0;
-	replSlotStats[i].stat_reset_timestamp = ts;
+	slotent->spill_txns = 0;
+	slotent->spill_count = 0;
+	slotent->spill_bytes = 0;
+	slotent->stream_txns = 0;
+	slotent->stream_count = 0;
+	slotent->stream_bytes = 0;
+	slotent->stat_reset_timestamp = ts;
 }
 
 /*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 68e210ce12..d66846faab 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1773,7 +1773,7 @@ void
 UpdateDecodingStats(LogicalDecodingContext *ctx)
 {
 	ReorderBuffer *rb = ctx->reorder;
-	PgStat_ReplSlotStats repSlotStat;
+	PgStat_ReplSlotEntry repSlotStat;
 
 	/*
 	 * Nothing to do if we haven't spilled or streamed anything since the last
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index f61b163f78..f75e7e95f9 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -329,8 +329,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 */
 	if (SlotIsLogical(slot))
 	{
-		PgStat_ReplSlotStats repSlotStat;
-		MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats));
+		PgStat_ReplSlotEntry repSlotStat;
+		MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotEntry));
 		namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
 		pgstat_report_replslot(&repSlotStat);
 	}
@@ -349,17 +349,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
  * Search for the named replication slot.
  *
  * Return the replication slot if found, otherwise NULL.
- *
- * The caller must hold ReplicationSlotControlLock in shared mode.
  */
 ReplicationSlot *
-SearchNamedReplicationSlot(const char *name)
+SearchNamedReplicationSlot(const char *name, bool need_lock)
 {
 	int			i;
 	ReplicationSlot	*slot = NULL;
 
-	Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
-								LW_SHARED));
+	if (need_lock)
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -372,6 +370,9 @@ SearchNamedReplicationSlot(const char *name)
 		}
 	}
 
+	if (need_lock)
+		LWLockRelease(ReplicationSlotControlLock);
+
 	return slot;
 }
 
@@ -416,7 +417,7 @@ retry:
 	 * Search for the slot with the specified name if the slot to acquire is
 	 * not given. If the slot is not found, we either return -1 or error out.
 	 */
-	s = slot ? slot : SearchNamedReplicationSlot(name);
+	s = slot ? slot : SearchNamedReplicationSlot(name, false);
 	if (s == NULL || !s->in_use)
 	{
 		LWLockRelease(ReplicationSlotControlLock);
@@ -712,7 +713,11 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	 * and create messages while holding ReplicationSlotAllocationLock to
 	 * reduce that possibility. If the messages reached in reverse, we would
 	 * lose one statistics update message. But the next update message will
-	 * create the statistics for the replication slot.
+	 * create the statistics for the replication slot. In case the message
+	 * for dropping the old slot gets lost and a slot with the same name is
+	 * created, the stats will be accumulated into the old slot's entry since
+	 * we use the slot name as the key. In that case, the user can reset the
+	 * particular stats by pg_stat_reset_replication_slot().
 	 */
 	if (SlotIsLogical(slot))
 		pgstat_report_replslot_drop(NameStr(slot->data.name));
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 521ba73614..569abce27e 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -24,6 +24,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
+#include "replication/slot.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
@@ -2207,8 +2208,32 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
 	char	   *target = NULL;
 
 	if (!PG_ARGISNULL(0))
+	{
+		ReplicationSlot *slot;
+
 		target = text_to_cstring(PG_GETARG_TEXT_PP(0));
 
+		/*
+		 * Check if the slot exists with the given name. It is possible that by
+		 * the time this message is executed the slot is dropped but at least
+		 * this check will ensure that the given name is for a valid slot.
+		 */
+		slot = SearchNamedReplicationSlot(target, true);
+
+		if (!slot)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("replication slot \"%s\" does not exist",
+							target)));
+
+		/*
+		 * Nothing to do for physical slots as we collect stats only for
+		 * logical slots.
+		 */
+		if (SlotIsPhysical(slot))
+			PG_RETURN_VOID();
+	}
+
 	pgstat_reset_replslot_counter(target);
 
 	PG_RETURN_VOID();
@@ -2280,71 +2305,61 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
 
-/* Get the statistics for the replication slots */
+/* Get the statistics for the replication slot */
 Datum
-pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
+pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
 #define PG_STAT_GET_REPLICATION_SLOT_COLS 8
-	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	text *slotname_text = PG_GETARG_TEXT_P(0);
+	NameData slotname;
 	TupleDesc	tupdesc;
-	Tuplestorestate *tupstore;
-	MemoryContext per_query_ctx;
-	MemoryContext oldcontext;
-	PgStat_ReplSlotStats *slotstats;
-	int			nstats;
-	int			i;
+	Datum		values[10];
+	bool		nulls[10];
+	PgStat_ReplSlotEntry *slotent;
 
-	/* check to see if caller supports us returning a tuplestore */
-	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("set-valued function called in context that cannot accept a set")));
-	if (!(rsinfo->allowedModes & SFRM_Materialize))
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("materialize mode required, but it is not allowed in this context")));
-
-	/* Build a tuple descriptor for our result type */
-	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-		elog(ERROR, "return type must be a row type");
-
-	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
-	oldcontext = MemoryContextSwitchTo(per_query_ctx);
-
-	tupstore = tuplestore_begin_heap(true, false, work_mem);
-	rsinfo->returnMode = SFRM_Materialize;
-	rsinfo->setResult = tupstore;
-	rsinfo->setDesc = tupdesc;
-
-	MemoryContextSwitchTo(oldcontext);
-
-	slotstats = pgstat_fetch_replslot(&nstats);
-	for (i = 0; i < nstats; i++)
-	{
-		Datum		values[PG_STAT_GET_REPLICATION_SLOT_COLS];
-		bool		nulls[PG_STAT_GET_REPLICATION_SLOT_COLS];
-		PgStat_ReplSlotStats *s = &(slotstats[i]);
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
 
-		MemSet(values, 0, sizeof(values));
-		MemSet(nulls, 0, sizeof(nulls));
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_REPLICATION_SLOT_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "slot_name",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "spill_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "spill_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "spill_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "stream_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stream_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "stats_reset",
+					   TIMESTAMPTZOID, -1, 0);
+	BlessTupleDesc(tupdesc);
 
-		values[0] = CStringGetTextDatum(NameStr(s->slotname));
-		values[1] = Int64GetDatum(s->spill_txns);
-		values[2] = Int64GetDatum(s->spill_count);
-		values[3] = Int64GetDatum(s->spill_bytes);
-		values[4] = Int64GetDatum(s->stream_txns);
-		values[5] = Int64GetDatum(s->stream_count);
-		values[6] = Int64GetDatum(s->stream_bytes);
+	namestrcpy(&slotname, text_to_cstring(slotname_text));
+	slotent = pgstat_fetch_replslot(slotname);
 
-		if (s->stat_reset_timestamp == 0)
-			nulls[7] = true;
-		else
-			values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
+	if (!slotent)
+		PG_RETURN_NULL();
 
-		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
-	}
+	values[0] = CStringGetTextDatum(NameStr(slotent->slotname));
+	values[1] = Int64GetDatum(slotent->spill_txns);
+	values[2] = Int64GetDatum(slotent->spill_count);
+	values[3] = Int64GetDatum(slotent->spill_bytes);
+	values[4] = Int64GetDatum(slotent->stream_txns);
+	values[5] = Int64GetDatum(slotent->stream_count);
+	values[6] = Int64GetDatum(slotent->stream_bytes);
 
-	tuplestore_donestoring(tupstore);
+	if (slotent->stat_reset_timestamp == 0)
+		nulls[7] = true;
+	else
+		values[7] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
-	return (Datum) 0;
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f4957653ae..6c9521603f 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5311,14 +5311,14 @@
   proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
   prosrc => 'pg_stat_get_wal_receiver' },
-{ oid => '8595', descr => 'statistics: information about replication slots',
-  proname => 'pg_stat_get_replication_slots', prorows => '10',
+{ oid => '8595', descr => 'statistics: information about replication slot',
+  proname => 'pg_stat_get_replication_slot', prorows => '1',
   proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
-  prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
-  prosrc => 'pg_stat_get_replication_slots' },
+  prorettype => 'record', proargtypes => 'text',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
+  prosrc => 'pg_stat_get_replication_slot' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 8e11215058..c2d192fc55 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -915,7 +915,7 @@ typedef struct PgStat_SLRUStats
 /*
  * Replication slot statistics kept in the stats collector
  */
-typedef struct PgStat_ReplSlotStats
+typedef struct PgStat_ReplSlotEntry
 {
 	NameData	slotname;
 	PgStat_Counter spill_txns;
@@ -925,7 +925,7 @@ typedef struct PgStat_ReplSlotStats
 	PgStat_Counter stream_count;
 	PgStat_Counter stream_bytes;
 	TimestampTz stat_reset_timestamp;
-} PgStat_ReplSlotStats;
+} PgStat_ReplSlotEntry;
 
 
 /*
@@ -1027,7 +1027,7 @@ extern void pgstat_report_recovery_conflict(int reason);
 extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
-extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat);
+extern void pgstat_report_replslot(const PgStat_ReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
@@ -1125,7 +1125,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
 extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
-extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
+extern PgStat_ReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
 extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 1ad5e6c50d..357068403a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -223,7 +223,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
-extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 186e6c966c..eb741b9ff1 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2069,7 +2069,9 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_count,
     s.stream_bytes,
     s.stats_reset
-   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
+   FROM pg_replication_slots r,
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset)
+  WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,
-- 
2.27.0

