From 76b71019e9067d96559639c299384222abb1651e Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Thu, 25 Jan 2024 08:19:01 +0000
Subject: [PATCH v20] Add XLogReadFromBuffers().

Allows reading directly from WAL buffers without a lock, avoiding the
need to wait for WAL flushing and read from the filesystem.

For now, the only caller is physical replication, but we can consider
expanding it to other callers as needed.

Author: Bharath Rupireddy
---
 src/backend/access/transam/xlog.c       | 147 ++++++++++++++++++++++--
 src/backend/access/transam/xlogreader.c |   3 -
 src/backend/replication/walsender.c     |   8 ++
 src/include/access/xlog.h               |   3 +
 4 files changed, 149 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 478377c4a2..4940e8ca29 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -698,7 +698,7 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 									  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 							  XLogRecPtr *PrevPtr);
-static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
+static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto, bool emitLog);
 static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
@@ -1494,7 +1494,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
  * to make room for a new one, which in turn requires WALWriteLock.
  */
 static XLogRecPtr
-WaitXLogInsertionsToFinish(XLogRecPtr upto)
+WaitXLogInsertionsToFinish(XLogRecPtr upto, bool emitLog)
 {
 	uint64		bytepos;
 	XLogRecPtr	reservedUpto;
@@ -1521,9 +1521,10 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	 */
 	if (upto > reservedUpto)
 	{
-		ereport(LOG,
-				(errmsg("request to flush past end of generated WAL; request %X/%X, current position %X/%X",
-						LSN_FORMAT_ARGS(upto), LSN_FORMAT_ARGS(reservedUpto))));
+		if (emitLog)
+			ereport(LOG,
+					(errmsg("request to flush past end of generated WAL; request %X/%X, current position %X/%X",
+							LSN_FORMAT_ARGS(upto), LSN_FORMAT_ARGS(reservedUpto))));
 		upto = reservedUpto;
 	}
 
@@ -1705,6 +1706,134 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
 	return cachedPos + ptr % XLOG_BLCKSZ;
 }
 
+/*
+ * Read WAL directly from WAL buffers, if available.
+ *
+ * This function reads 'count' bytes of WAL from WAL buffers into 'buf'
+ * starting at location 'startptr' and returns total bytes read.
+ *
+ * The bytes read may be fewer than requested if any of the WAL buffers in the
+ * requested range have been evicted, or if the last requested byte is beyond
+ * the current insert position.
+ *
+ * If reading beyond the current write position, this function will wait for
+ * concurrent inserters to finish. Otherwise, it does not wait at all.
+ *
+ * This function returns immediately if the requested data is not from the
+ * current timeline, or if the server is in recovery.
+ */
+Size
+XLogReadFromBuffers(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
+{
+	XLogRecPtr	ptr = startptr;
+	XLogRecPtr	upto = startptr + count;
+	Size		nbytes = count;
+	Size		ntotal = 0;
+	char	   *dst = buf;
+
+	if (RecoveryInProgress() ||
+		tli != GetWALInsertionTimeLine())
+		return ntotal;
+
+	Assert(!XLogRecPtrIsInvalid(startptr));
+
+	/*
+	 * Caller requested very recent WAL data. Wait for any in-progress WAL
+	 * insertions to WAL buffers to finish.
+	 *
+	 * Most callers will have already updated LogwrtResult when determining
+	 * how far to read, but it's OK if it's out of date. XXX: is it worth
+	 * taking a spinlock to update LogwrtResult and check again before calling
+	 * WaitXLogInsertionsToFinish()?
+	 */
+	if (upto > LogwrtResult.Write)
+	{
+		XLogRecPtr	writtenUpto = WaitXLogInsertionsToFinish(upto, false);
+
+		upto = Min(upto, writtenUpto);
+		nbytes = upto - startptr;
+	}
+
+	/*
+	 * Loop through the buffers without a lock. For each buffer, atomically
+	 * read and verify the end pointer, then copy the data out, and finally
+	 * re-read and re-verify the end pointer.
+	 *
+	 * Once a page is evicted, it never returns to the WAL buffers, so if the
+	 * end pointer matches the expected end pointer before and after we copy
+	 * the data, then the right page must have been present during the data
+	 * copy. Read barriers are necessary to ensure that the data copy actually
+	 * happens between the two verification steps.
+	 *
+	 * If the verification fails, we simply terminate the loop and return with
+	 * the data that had been already copied out successfully.
+	 */
+	while (nbytes > 0)
+	{
+		XLogRecPtr	expectedEndPtr;
+		XLogRecPtr	endptr;
+		int			idx;
+		const char *page;
+		const char *data;
+		Size		nread;
+
+		idx = XLogRecPtrToBufIdx(ptr);
+		expectedEndPtr = ptr;
+		expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+
+		/*
+		 * First verification step: check that the correct page is present in
+		 * the WAL buffers.
+		 */
+		endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
+		if (expectedEndPtr != endptr)
+			break;
+
+		/*
+		 * We found WAL buffer page containing given XLogRecPtr. Get starting
+		 * address of the page and a pointer to the right location of given
+		 * XLogRecPtr in that page.
+		 */
+		page = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
+		data = page + ptr % XLOG_BLCKSZ;
+
+		/*
+		 * Ensure that the data copy and the first verification step are not
+		 * reordered.
+		 */
+		pg_read_barrier();
+
+		/* how much is available on this page to read? */
+		nread = Min(nbytes, XLOG_BLCKSZ - (data - page));
+
+		/* data copy */
+		memcpy(dst, data, nread);
+
+		/*
+		 * Ensure that the data copy and the second verification step are not
+		 * reordered.
+		 */
+		pg_read_barrier();
+
+		/*
+		 * Second verification step: check that the page we read from wasn't
+		 * evicted while we were copying the data.
+		 */
+		endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
+		if (expectedEndPtr != endptr)
+			break;
+
+		dst += nread;
+		ptr += nread;
+		ntotal += nread;
+		nbytes -= nread;
+	}
+
+	Assert(ntotal <= count);
+
+	return ntotal;
+}
+
 /*
  * Converts a "usable byte position" to XLogRecPtr. A usable byte position
  * is the position starting from the beginning of WAL, excluding all WAL
@@ -1895,7 +2024,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 				 */
 				LWLockRelease(WALBufMappingLock);
 
-				WaitXLogInsertionsToFinish(OldPageRqstPtr);
+				WaitXLogInsertionsToFinish(OldPageRqstPtr, true);
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
@@ -2689,7 +2818,7 @@ XLogFlush(XLogRecPtr record)
 		 * Before actually performing the write, wait for all in-flight
 		 * insertions to the pages we're about to write to finish.
 		 */
-		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
+		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr, true);
 
 		/*
 		 * Try to get the write lock. If we can't get it immediately, wait
@@ -2740,7 +2869,7 @@ XLogFlush(XLogRecPtr record)
 			 * We're only calling it again to allow insertpos to be moved
 			 * further forward, not to actually wait for anyone.
 			 */
-			insertpos = WaitXLogInsertionsToFinish(insertpos);
+			insertpos = WaitXLogInsertionsToFinish(insertpos, true);
 		}
 
 		/* try to write/flush later additions to XLOG as well */
@@ -2919,7 +3048,7 @@ XLogBackgroundFlush(void)
 	START_CRIT_SECTION();
 
 	/* now wait for any in-progress insertions to finish and get write lock */
-	WaitXLogInsertionsToFinish(WriteRqst.Write);
+	WaitXLogInsertionsToFinish(WriteRqst.Write, true);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 	LogwrtResult = XLogCtl->LogwrtResult;
 	if (WriteRqst.Write > LogwrtResult.Write ||
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7190156f2f..74a6b11866 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1500,9 +1500,6 @@ err:
  *
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
  */
 bool
 WALRead(XLogReaderState *state,
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 087031e9dc..95ba656a06 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2910,6 +2910,7 @@ XLogSendPhysical(void)
 	Size		nbytes;
 	XLogSegNo	segno;
 	WALReadError errinfo;
+	Size		rbytes;
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -3125,6 +3126,13 @@ XLogSendPhysical(void)
 	enlargeStringInfo(&output_message, nbytes);
 
 retry:
+	/* Read from WAL buffers, if available. */
+	rbytes = XLogReadFromBuffers(&output_message.data[output_message.len],
+								 startptr, nbytes, xlogreader->seg.ws_tli);
+	output_message.len += rbytes;
+	startptr += rbytes;
+	nbytes -= rbytes;
+
 	if (!WALRead(xlogreader,
 				 &output_message.data[output_message.len],
 				 startptr,
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 301c5fa11f..f8c281c799 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -252,6 +252,9 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern Size XLogReadFromBuffers(char *buf, XLogRecPtr startptr, Size count,
+								TimeLineID tli);
+
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
  */
-- 
2.34.1

