From 5826ad95c980f4543517cb7014af21bd9a1aa917 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 31 Jan 2024 07:25:22 +0000
Subject: [PATCH v22 1/4] Add WALReadFromBuffers().

Allows reading directly from WAL buffers without a lock, avoiding the
need to wait for WAL flushing and read from the filesystem.

For now, the only caller is physical replication, but we can consider
expanding it to other callers as needed.
---
 src/backend/access/transam/xlog.c       | 106 ++++++++++++++++++++++++
 src/backend/access/transam/xlogreader.c |   3 -
 src/backend/replication/walsender.c     |  12 ++-
 src/include/access/xlog.h               |   3 +
 4 files changed, 120 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 478377c4a2..0d87a66c59 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1705,6 +1705,112 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
 	return cachedPos + ptr % XLOG_BLCKSZ;
 }
 
+/*
+ * Read WAL directly from WAL buffers, if available.
+ *
+ * This function reads 'count' bytes of WAL from WAL buffers into 'buf'
+ * starting at location 'startptr' and returns total bytes read.
+ *
+ * The bytes read may be fewer than requested if any of the WAL buffers in the
+ * requested range have been evicted.
+ *
+ * This function returns immediately if the requested data is not from the
+ * current timeline, or if the server is in recovery.
+ */
+Size
+WALReadFromBuffers(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
+{
+	XLogRecPtr	ptr = startptr;
+	Size		nbytes = count;
+	Size		ntotal = 0;
+	char	   *dst = buf;
+
+	if (RecoveryInProgress() ||
+		tli != GetWALInsertionTimeLine())
+		return ntotal;
+
+	Assert(!XLogRecPtrIsInvalid(startptr));
+
+	/*
+	 * Loop through the buffers without a lock. For each buffer, atomically
+	 * read and verify the end pointer, then copy the data out, and finally
+	 * re-read and re-verify the end pointer.
+	 *
+	 * Once a page is evicted, it never returns to the WAL buffers, so if the
+	 * end pointer matches the expected end pointer before and after we copy
+	 * the data, then the right page must have been present during the data
+	 * copy. Read barriers are necessary to ensure that the data copy actually
+	 * happens between the two verification steps.
+	 *
+	 * If the verification fails, we simply terminate the loop and return with
+	 * the data that had been already copied out successfully.
+	 */
+	while (nbytes > 0)
+	{
+		XLogRecPtr	expectedEndPtr;
+		XLogRecPtr	endptr;
+		int			idx;
+		const char *page;
+		const char *data;
+		Size		nread;
+
+		idx = XLogRecPtrToBufIdx(ptr);
+		expectedEndPtr = ptr;
+		expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+
+		/*
+		 * First verification step: check that the correct page is present in
+		 * the WAL buffers.
+		 */
+		endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
+		if (expectedEndPtr != endptr)
+			break;
+
+		/*
+		 * We found WAL buffer page containing given XLogRecPtr. Get starting
+		 * address of the page and a pointer to the right location of given
+		 * XLogRecPtr in that page.
+		 */
+		page = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
+		data = page + ptr % XLOG_BLCKSZ;
+
+		/*
+		 * Ensure that the data copy and the first verification step are not
+		 * reordered.
+		 */
+		pg_read_barrier();
+
+		/* how much is available on this page to read? */
+		nread = Min(nbytes, XLOG_BLCKSZ - (data - page));
+
+		/* data copy */
+		memcpy(dst, data, nread);
+
+		/*
+		 * Ensure that the data copy and the second verification step are not
+		 * reordered.
+		 */
+		pg_read_barrier();
+
+		/*
+		 * Second verification step: check that the page we read from wasn't
+		 * evicted while we were copying the data.
+		 */
+		endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
+		if (expectedEndPtr != endptr)
+			break;
+
+		dst += nread;
+		ptr += nread;
+		ntotal += nread;
+		nbytes -= nread;
+	}
+
+	Assert(ntotal <= count);
+
+	return ntotal;
+}
+
 /*
  * Converts a "usable byte position" to XLogRecPtr. A usable byte position
  * is the position starting from the beginning of WAL, excluding all WAL
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7190156f2f..74a6b11866 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1500,9 +1500,6 @@ err:
  *
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
  */
 bool
 WALRead(XLogReaderState *state,
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 77c8baa32a..0551f0f2d8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2966,6 +2966,7 @@ XLogSendPhysical(void)
 	Size		nbytes;
 	XLogSegNo	segno;
 	WALReadError errinfo;
+	Size		rbytes;
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -3181,7 +3182,16 @@ XLogSendPhysical(void)
 	enlargeStringInfo(&output_message, nbytes);
 
 retry:
-	if (!WALRead(xlogreader,
+	/* Attempt to read WAL from WAL buffers first. */
+	rbytes = WALReadFromBuffers(&output_message.data[output_message.len],
+								startptr, nbytes, xlogreader->seg.ws_tli);
+	output_message.len += rbytes;
+	startptr += rbytes;
+	nbytes -= rbytes;
+
+	/* Now read the remaining WAL from WAL file. */
+	if (nbytes > 0 &&
+		!WALRead(xlogreader,
 				 &output_message.data[output_message.len],
 				 startptr,
 				 nbytes,
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 301c5fa11f..6d5de9812c 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -252,6 +252,9 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern Size WALReadFromBuffers(char *buf, XLogRecPtr startptr, Size count,
+							   TimeLineID tli);
+
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
  */
-- 
2.34.1

