From a47ce432122759561063da8a70f208a314a3423b Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Fri, 15 Jun 2018 22:01:53 +1200
Subject: [PATCH] Move RecoveryLockList into a hash table.

Standbys frequently need to release all locks held by a given xid.
Instead of searching one big list linearly, let's create one list
per xid and put them in a hash table, so we can find what we need
in O(1) time.

Author: Thomas Munro
Diagnosed-by: Andres Freund
Reviewed-by: Andres Freund, Tom Lane, Robert Haas
Discussion: https://postgr.es/m/CAEepm%3D1mL0KiQ2KJ4yuPpLGX94a4Ns_W6TL4EGRouxWibu56pA%40mail.gmail.com
---
 src/backend/storage/ipc/standby.c | 175 ++++++++++++++++--------------
 1 file changed, 92 insertions(+), 83 deletions(-)

diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 1eacc5ec143..147784c4b67 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -29,6 +29,8 @@
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/standby.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
@@ -38,7 +40,7 @@ int			vacuum_defer_cleanup_age;
 int			max_standby_archive_delay = 30 * 1000;
 int			max_standby_streaming_delay = 30 * 1000;
 
-static List *RecoveryLockList;
+static HTAB *RecoveryLockLists;
 
 static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 									   ProcSignalReason reason);
@@ -46,6 +48,14 @@ static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
 
+/*
+ * Keep track of all the locks owned by a given transaction.
+ */
+typedef struct RecoveryLockListsEntry
+{
+	TransactionId	xid;
+	List		   *locks;
+} RecoveryLockListsEntry;
 
 /*
  * InitRecoveryTransactionEnvironment
@@ -63,6 +73,19 @@ void
 InitRecoveryTransactionEnvironment(void)
 {
 	VirtualTransactionId vxid;
+	HASHCTL			hash_ctl;
+
+	/*
+	 * Initialize the hash table for tracking the list of locks held by each
+	 * transaction.
+	 */
+	memset(&hash_ctl, 0, sizeof(hash_ctl));
+	hash_ctl.keysize = sizeof(TransactionId);
+	hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
+	RecoveryLockLists = hash_create("RecoveryLockLists",
+									64,
+									&hash_ctl,
+									HASH_ELEM | HASH_BLOBS);
 
 	/*
 	 * Initialize shared invalidation management for Startup process, being
@@ -107,6 +130,10 @@ ShutdownRecoveryTransactionEnvironment(void)
 	/* Release all locks the tracked transactions were holding */
 	StandbyReleaseAllLocks();
 
+	/* Destroy the hash table of locks. */
+	hash_destroy(RecoveryLockLists);
+	RecoveryLockLists = NULL;
+
 	/* Cleanup our VirtualTransaction */
 	VirtualXactLockTableCleanup();
 }
@@ -586,8 +613,8 @@ StandbyLockTimeoutHandler(void)
  * We only keep track of AccessExclusiveLocks, which are only ever held by
  * one transaction on one relation.
  *
- * We keep a single dynamically expandible list of locks in local memory,
- * RecoveryLockList, so we can keep track of the various entries made by
+ * We keep a hash table of lists of locks in local memory keyed by xid,
+ * RecoveryLockLists, so we can keep track of the various entries made by
  * the Startup process's virtual xid in the shared lock table.
  *
  * List elements use type xl_standby_lock, since the WAL record type exactly
@@ -601,8 +628,10 @@ StandbyLockTimeoutHandler(void)
 void
 StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 {
+	RecoveryLockListsEntry *entry;
 	xl_standby_lock *newlock;
 	LOCKTAG		locktag;
+	bool		found;
 
 	/* Already processed? */
 	if (!TransactionIdIsValid(xid) ||
@@ -616,11 +645,19 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 	/* dbOid is InvalidOid when we are locking a shared relation. */
 	Assert(OidIsValid(relOid));
 
+	/* Create a new list for this xid, if we don't have one already. */
+	entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
+	if (!found)
+	{
+		entry->xid = xid;
+		entry->locks = NIL;
+	}
+
 	newlock = palloc(sizeof(xl_standby_lock));
 	newlock->xid = xid;
 	newlock->dbOid = dbOid;
 	newlock->relOid = relOid;
-	RecoveryLockList = lappend(RecoveryLockList, newlock);
+	entry->locks = lappend(entry->locks, newlock);
 
 	SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
 
@@ -628,46 +665,48 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 }
 
 static void
-StandbyReleaseLocks(TransactionId xid)
+StandbyReleaseLockList(List *locks)
 {
-	ListCell   *cell,
-			   *prev,
-			   *next;
-
-	/*
-	 * Release all matching locks and remove them from list
-	 */
-	prev = NULL;
-	for (cell = list_head(RecoveryLockList); cell; cell = next)
+	while (locks)
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+		xl_standby_lock *lock = (xl_standby_lock *) linitial(locks);
+		LOCKTAG		locktag;
+		elog(trace_recovery(DEBUG4),
+			 "releasing recovery lock: xid %u db %u rel %u",
+			 lock->xid, lock->dbOid, lock->relOid);
+		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
+		if (!LockRelease(&locktag, AccessExclusiveLock, true))
+		{
+			elog(LOG,
+				 "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
+				 lock->xid, lock->dbOid, lock->relOid);
+			Assert(false);
+		}
+		pfree(lock);
+		locks = list_delete_first(locks);
+	}
+}
 
-		next = lnext(cell);
+static void
+StandbyReleaseLocks(TransactionId xid)
+{
+	RecoveryLockListsEntry *entry;
 
-		if (!TransactionIdIsValid(xid) || lock->xid == xid)
+	if (TransactionIdIsValid(xid))
+	{
+		if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
 		{
-			LOCKTAG		locktag;
-
-			elog(trace_recovery(DEBUG4),
-				 "releasing recovery lock: xid %u db %u rel %u",
-				 lock->xid, lock->dbOid, lock->relOid);
-			SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-			if (!LockRelease(&locktag, AccessExclusiveLock, true))
-				elog(LOG,
-					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-					 lock->xid, lock->dbOid, lock->relOid);
-
-			RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
-			pfree(lock);
+			StandbyReleaseLockList(entry->locks);
+			hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
 		}
-		else
-			prev = cell;
 	}
+	else
+		StandbyReleaseAllLocks();
 }
 
 /*
  * Release locks for a transaction tree, starting at xid down, from
- * RecoveryLockList.
+ * RecoveryLockLists.
  *
  * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
  * to remove any AccessExclusiveLocks requested by a transaction.
@@ -689,30 +728,16 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
 void
 StandbyReleaseAllLocks(void)
 {
-	ListCell   *cell,
-			   *prev,
-			   *next;
-	LOCKTAG		locktag;
+	HASH_SEQ_STATUS	status;
+	RecoveryLockListsEntry *entry;
 
 	elog(trace_recovery(DEBUG2), "release all standby locks");
 
-	prev = NULL;
-	for (cell = list_head(RecoveryLockList); cell; cell = next)
+	hash_seq_init(&status, RecoveryLockLists);
+	while ((entry = hash_seq_search(&status)))
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
-
-		next = lnext(cell);
-
-		elog(trace_recovery(DEBUG4),
-			 "releasing recovery lock: xid %u db %u rel %u",
-			 lock->xid, lock->dbOid, lock->relOid);
-		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-		if (!LockRelease(&locktag, AccessExclusiveLock, true))
-			elog(LOG,
-				 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-				 lock->xid, lock->dbOid, lock->relOid);
-		RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
-		pfree(lock);
+		StandbyReleaseLockList(entry->locks);
+		hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
 	}
 }
 
@@ -724,41 +749,25 @@ StandbyReleaseAllLocks(void)
 void
 StandbyReleaseOldLocks(TransactionId oldxid)
 {
-	ListCell   *cell,
-			   *prev,
-			   *next;
-	LOCKTAG		locktag;
+	HASH_SEQ_STATUS	status;
+	RecoveryLockListsEntry *entry;
 
-	prev = NULL;
-	for (cell = list_head(RecoveryLockList); cell; cell = next)
+	hash_seq_init(&status, RecoveryLockLists);
+	while ((entry = hash_seq_search(&status)))
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
-		bool		remove = false;
-
-		next = lnext(cell);
+		Assert(TransactionIdIsValid(entry->xid));
 
-		Assert(TransactionIdIsValid(lock->xid));
+		/* Skip if prepared transaction. */
+		if (StandbyTransactionIdIsPrepared(entry->xid))
+			continue;
 
-		if (StandbyTransactionIdIsPrepared(lock->xid))
-			remove = false;
-		else if (TransactionIdPrecedes(lock->xid, oldxid))
-			remove = true;
+		/* Skip if >= oldxid. */
+		if (!TransactionIdPrecedes(entry->xid, oldxid))
+			continue;
 
-		if (remove)
-		{
-			elog(trace_recovery(DEBUG4),
-				 "releasing recovery lock: xid %u db %u rel %u",
-				 lock->xid, lock->dbOid, lock->relOid);
-			SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-			if (!LockRelease(&locktag, AccessExclusiveLock, true))
-				elog(LOG,
-					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-					 lock->xid, lock->dbOid, lock->relOid);
-			RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
-			pfree(lock);
-		}
-		else
-			prev = cell;
+		/* Remove all locks and hash table entry. */
+		StandbyReleaseLockList(entry->locks);
+		hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
 	}
 }
 
-- 
2.17.0

