From 5b6b2ebc60100d6d062bd837aa30f5943d4212cc Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 31 Jan 2024 07:27:11 +0000
Subject: [PATCH v22 2/4] Allow WALReadFromBuffers() to wait for in-progress
 insertions

---
 src/backend/access/transam/xlog.c | 43 ++++++++++++++++++++++++-------
 1 file changed, 33 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0d87a66c59..d82557886e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -698,7 +698,7 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 									  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 							  XLogRecPtr *PrevPtr);
-static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
+static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto, bool emitLog);
 static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
@@ -1494,7 +1494,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
  * to make room for a new one, which in turn requires WALWriteLock.
  */
 static XLogRecPtr
-WaitXLogInsertionsToFinish(XLogRecPtr upto)
+WaitXLogInsertionsToFinish(XLogRecPtr upto, bool emitLog)
 {
 	uint64		bytepos;
 	XLogRecPtr	reservedUpto;
@@ -1521,9 +1521,10 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	 */
 	if (upto > reservedUpto)
 	{
-		ereport(LOG,
-				(errmsg("request to flush past end of generated WAL; request %X/%X, current position %X/%X",
-						LSN_FORMAT_ARGS(upto), LSN_FORMAT_ARGS(reservedUpto))));
+		if (emitLog)
+			ereport(LOG,
+					(errmsg("request to flush past end of generated WAL; request %X/%X, current position %X/%X",
+							LSN_FORMAT_ARGS(upto), LSN_FORMAT_ARGS(reservedUpto))));
 		upto = reservedUpto;
 	}
 
@@ -1712,7 +1713,11 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
  * starting at location 'startptr' and returns total bytes read.
  *
  * The bytes read may be fewer than requested if any of the WAL buffers in the
- * requested range have been evicted.
+ * requested range have been evicted, or if the last requested byte is beyond
+ * the current insert position.
+ *
+ * If reading beyond the current write position, this function will wait for
+ * concurrent inserters to finish. Otherwise, it does not wait at all.
  *
  * This function returns immediately if the requested data is not from the
  * current timeline, or if the server is in recovery.
@@ -1724,6 +1729,7 @@ WALReadFromBuffers(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
 	Size		nbytes = count;
 	Size		ntotal = 0;
 	char	   *dst = buf;
+	XLogRecPtr	upto = startptr + count;
 
 	if (RecoveryInProgress() ||
 		tli != GetWALInsertionTimeLine())
@@ -1731,6 +1737,23 @@ WALReadFromBuffers(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
 
 	Assert(!XLogRecPtrIsInvalid(startptr));
 
+	/*
+	 * Caller requested very recent WAL data. Wait for any in-progress WAL
+	 * insertions to WAL buffers to finish.
+	 *
+	 * Most callers will have already updated LogwrtResult when determining
+	 * how far to read, but it's OK if it's out of date. XXX: is it worth
+	 * taking a spinlock to update LogwrtResult and check again before calling
+	 * WaitXLogInsertionsToFinish()?
+	 */
+	if (upto > LogwrtResult.Write)
+	{
+		XLogRecPtr	writtenUpto = WaitXLogInsertionsToFinish(upto, false);
+
+		upto = Min(upto, writtenUpto);
+		nbytes = upto - startptr;
+	}
+
 	/*
 	 * Loop through the buffers without a lock. For each buffer, atomically
 	 * read and verify the end pointer, then copy the data out, and finally
@@ -2001,7 +2024,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 				 */
 				LWLockRelease(WALBufMappingLock);
 
-				WaitXLogInsertionsToFinish(OldPageRqstPtr);
+				WaitXLogInsertionsToFinish(OldPageRqstPtr, true);
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
@@ -2795,7 +2818,7 @@ XLogFlush(XLogRecPtr record)
 		 * Before actually performing the write, wait for all in-flight
 		 * insertions to the pages we're about to write to finish.
 		 */
-		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
+		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr, true);
 
 		/*
 		 * Try to get the write lock. If we can't get it immediately, wait
@@ -2846,7 +2869,7 @@ XLogFlush(XLogRecPtr record)
 			 * We're only calling it again to allow insertpos to be moved
 			 * further forward, not to actually wait for anyone.
 			 */
-			insertpos = WaitXLogInsertionsToFinish(insertpos);
+			insertpos = WaitXLogInsertionsToFinish(insertpos, true);
 		}
 
 		/* try to write/flush later additions to XLOG as well */
@@ -3025,7 +3048,7 @@ XLogBackgroundFlush(void)
 	START_CRIT_SECTION();
 
 	/* now wait for any in-progress insertions to finish and get write lock */
-	WaitXLogInsertionsToFinish(WriteRqst.Write);
+	WaitXLogInsertionsToFinish(WriteRqst.Write, true);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 	LogwrtResult = XLogCtl->LogwrtResult;
 	if (WriteRqst.Write > LogwrtResult.Write ||
-- 
2.34.1

