From bbc589de76cd37ffa6f7969f07b4f750a7911846 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 2 Dec 2022 08:46:23 +0000
Subject: [PATCH v2] Make insertingAt 64-bit atomic

WAL insertion lwlock's insertingAt value is currently read/cleared
with the help of lwlock's wait list lock to avoid torn-free reads
as it is of type XLogRecPtr. This wait list lock can become a point
of contention on a highly concurrent write workloads.

Therefore, make insertingAt a 64-bit atomic which inherently
provides torn-free reads and writes.
---
 src/backend/access/transam/xlog.c |  4 +--
 src/backend/storage/lmgr/lwlock.c | 44 +++++++++++--------------------
 src/include/storage/lwlock.h      |  6 ++---
 3 files changed, 20 insertions(+), 34 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c153c32a77..26e841ef6c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -376,7 +376,7 @@ typedef struct XLogwrtResult
 typedef struct
 {
 	LWLock		lock;
-	XLogRecPtr	insertingAt;
+	pg_atomic_uint64	insertingAt;
 	XLogRecPtr	lastImportantAt;
 } WALInsertLock;
 
@@ -4602,7 +4602,7 @@ XLOGShmemInit(void)
 	for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
 	{
 		LWLockInitialize(&WALInsertLocks[i].l.lock, LWTRANCHE_WAL_INSERT);
-		WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr;
+		pg_atomic_init_u64(&WALInsertLocks[i].l.insertingAt, InvalidXLogRecPtr);
 		WALInsertLocks[i].l.lastImportantAt = InvalidXLogRecPtr;
 	}
 
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index a5ad36ca78..f7556dcb7d 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -1546,9 +1546,8 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
  * *result is set to true if the lock was free, and false otherwise.
  */
 static bool
-LWLockConflictsWithVar(LWLock *lock,
-					   uint64 *valptr, uint64 oldval, uint64 *newval,
-					   bool *result)
+LWLockConflictsWithVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 oldval,
+					   uint64 *newval, bool *result)
 {
 	bool		mustwait;
 	uint64		value;
@@ -1570,14 +1569,7 @@ LWLockConflictsWithVar(LWLock *lock,
 
 	*result = false;
 
-	/*
-	 * Read value using the lwlock's wait list lock, as we can't generally
-	 * rely on atomic 64 bit reads/stores.  TODO: On platforms with a way to
-	 * do atomic 64 bit reads/writes the spinlock should be optimized away.
-	 */
-	LWLockWaitListLock(lock);
-	value = *valptr;
-	LWLockWaitListUnlock(lock);
+	value = pg_atomic_read_u64(valptr);
 
 	if (value != oldval)
 	{
@@ -1606,7 +1598,8 @@ LWLockConflictsWithVar(LWLock *lock,
  * in shared mode, returns 'true'.
  */
 bool
-LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
+LWLockWaitForVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 oldval,
+				 uint64 *newval)
 {
 	PGPROC	   *proc = MyProc;
 	int			extraWaits = 0;
@@ -1734,29 +1727,29 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
  * LWLockUpdateVar - Update a variable and wake up waiters atomically
  *
  * Sets *valptr to 'val', and wakes up all processes waiting for us with
- * LWLockWaitForVar().  Setting the value and waking up the processes happen
- * atomically so that any process calling LWLockWaitForVar() on the same lock
- * is guaranteed to see the new value, and act accordingly.
+ * LWLockWaitForVar().  It first sets the value atomically and then wakes up
+ * the waiting processes so that any process calling LWLockWaitForVar() on the
+ * same lock is guaranteed to see the new value, and act accordingly.
  *
  * The caller must be holding the lock in exclusive mode.
  */
 void
-LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
+LWLockUpdateVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
 {
 	proclist_head wakeup;
 	proclist_mutable_iter iter;
 
 	PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
 
+	/* Update the lock's value atomically first. */
+	pg_atomic_write_u64(valptr, val);
+
 	proclist_init(&wakeup);
 
 	LWLockWaitListLock(lock);
 
 	Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
 
-	/* Update the lock's value */
-	*valptr = val;
-
 	/*
 	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
 	 * up. They are always in the front of the queue.
@@ -1872,17 +1865,10 @@ LWLockRelease(LWLock *lock)
  * LWLockReleaseClearVar - release a previously acquired lock, reset variable
  */
 void
-LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
+LWLockReleaseClearVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
 {
-	LWLockWaitListLock(lock);
-
-	/*
-	 * Set the variable's value before releasing the lock, that prevents race
-	 * a race condition wherein a new locker acquires the lock, but hasn't yet
-	 * set the variables value.
-	 */
-	*valptr = val;
-	LWLockWaitListUnlock(lock);
+	/* Update the lock's value atomically */
+	pg_atomic_write_u64(valptr, val);
 
 	LWLockRelease(lock);
 }
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index a494cb598f..4227e59298 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -125,14 +125,14 @@ extern bool LWLockAcquire(LWLock *lock, LWLockMode mode);
 extern bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode);
 extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode);
 extern void LWLockRelease(LWLock *lock);
-extern void LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val);
+extern void LWLockReleaseClearVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val);
 extern void LWLockReleaseAll(void);
 extern bool LWLockHeldByMe(LWLock *lock);
 extern bool LWLockAnyHeldByMe(LWLock *lock, int nlocks, size_t stride);
 extern bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode);
 
-extern bool LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval);
-extern void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val);
+extern bool LWLockWaitForVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 oldval, uint64 *newval);
+extern void LWLockUpdateVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val);
 
 extern Size LWLockShmemSize(void);
 extern void CreateLWLocks(void);
-- 
2.34.1

