From 95ba60dd3afdc134329f3b017588264103f85985 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 27 Jan 2024 05:48:30 +0000
Subject: [PATCH v21] Add XLogReadFromBuffers().

Allows reading WAL directly from WAL buffers without taking a lock, avoiding
the need to wait for the WAL to be flushed and then read it back from the
filesystem.

For now, the only caller is physical replication, but we can consider
expanding it to other callers as needed.
---
 src/backend/access/transam/xlog.c       | 106 ++++++++++++++++++++++++
 src/backend/access/transam/xlogreader.c |   3 -
 src/backend/replication/walsender.c     |   8 ++
 src/include/access/xlog.h               |   3 +
 4 files changed, 117 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 478377c4a2..eea50bea3c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1705,6 +1705,112 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
 	return cachedPos + ptr % XLOG_BLCKSZ;
 }
 
+/*
+ * Read WAL directly from WAL buffers, if available.
+ *
+ * Copies up to 'count' bytes of WAL, starting at location 'startptr', from
+ * the WAL buffers into 'buf' and returns the number of bytes actually copied.
+ *
+ * The result may be less than 'count' (possibly zero): copying stops at the
+ * first page in the requested range that is not present in the WAL buffers.
+ *
+ * Returns 0 without reading anything if the server is in recovery or if 'tli'
+ * is not the current WAL insertion timeline.
+ */
+Size
+XLogReadFromBuffers(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
+{
+	XLogRecPtr	ptr = startptr;	/* next WAL location to read */
+	Size		nbytes = count;	/* bytes still to be read */
+	Size		ntotal = 0;	/* bytes copied and verified so far */
+	char	   *dst = buf;	/* next write position in 'buf' */
+
+	if (RecoveryInProgress() ||
+		tli != GetWALInsertionTimeLine())
+		return ntotal;
+
+	Assert(!XLogRecPtrIsInvalid(startptr));
+
+	/*
+	 * Loop through the buffers without a lock. For each buffer, atomically
+	 * read and verify the end pointer, then copy the data out, and finally
+	 * re-read and re-verify the end pointer.
+	 *
+	 * Once a page is evicted, it never returns to the WAL buffers, so if the
+	 * end pointer matches the expected end pointer before and after we copy
+	 * the data, then the right page must have been present during the data
+	 * copy. Read barriers are necessary to ensure that the data copy actually
+	 * happens between the two verification steps.
+	 *
+	 * If either verification step fails, we stop and return only the bytes
+	 * copied from pages that passed both verification steps.
+	 */
+	while (nbytes > 0)
+	{
+		XLogRecPtr	expectedEndPtr;	/* 'ptr' rounded up to next page boundary */
+		XLogRecPtr	endptr;	/* end pointer currently stored for the slot */
+		int			idx;	/* WAL buffer slot that 'ptr' maps to */
+		const char *page;	/* start of that slot's page */
+		const char *data;	/* address of 'ptr' within the page */
+		Size		nread;	/* bytes to copy from this page */
+
+		idx = XLogRecPtrToBufIdx(ptr);
+		expectedEndPtr = ptr;
+		expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+
+		/*
+		 * First verification step: check that the correct page is present in
+		 * the WAL buffers.
+		 */
+		endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
+		if (expectedEndPtr != endptr)
+			break;
+
+		/*
+		 * We found the WAL buffer page containing 'ptr'. Compute the starting
+		 * address of that page and the address within it that corresponds to
+		 * 'ptr'.
+		 */
+		page = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
+		data = page + ptr % XLOG_BLCKSZ;
+
+		/*
+		 * Ensure that the data copy and the first verification step are not
+		 * reordered.
+		 */
+		pg_read_barrier();
+
+		/* copy at most what remains on this page, or what is still wanted */
+		nread = Min(nbytes, XLOG_BLCKSZ - (data - page));
+
+		/* copy the data; it only counts if the second verification passes */
+		memcpy(dst, data, nread);
+
+		/*
+		 * Ensure that the data copy and the second verification step are not
+		 * reordered.
+		 */
+		pg_read_barrier();
+
+		/*
+		 * Second verification step: check that the page we read from wasn't
+		 * evicted while we were copying the data.
+		 */
+		endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
+		if (expectedEndPtr != endptr)
+			break;
+
+		dst += nread;
+		ptr += nread;
+		ntotal += nread;
+		nbytes -= nread;
+	}
+
+	Assert(ntotal <= count);
+
+	return ntotal;
+}
+
 /*
  * Converts a "usable byte position" to XLogRecPtr. A usable byte position
  * is the position starting from the beginning of WAL, excluding all WAL
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7190156f2f..74a6b11866 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1500,9 +1500,6 @@ err:
  *
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
  */
 bool
 WALRead(XLogReaderState *state,
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index aa80f3de20..7efe9ad010 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2910,6 +2910,7 @@ XLogSendPhysical(void)
 	Size		nbytes;
 	XLogSegNo	segno;
 	WALReadError errinfo;
+	Size		rbytes;
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -3125,6 +3126,13 @@ XLogSendPhysical(void)
 	enlargeStringInfo(&output_message, nbytes);
 
 retry:
+	/* Read from WAL buffers, if available. */
+	rbytes = XLogReadFromBuffers(&output_message.data[output_message.len],
+								 startptr, nbytes, xlogreader->seg.ws_tli);
+	output_message.len += rbytes;
+	startptr += rbytes;
+	nbytes -= rbytes;
+
 	if (!WALRead(xlogreader,
 				 &output_message.data[output_message.len],
 				 startptr,
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 301c5fa11f..f8c281c799 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -252,6 +252,9 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern Size XLogReadFromBuffers(char *buf, XLogRecPtr startptr, Size count,
+								TimeLineID tli);
+
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
  */
-- 
2.34.1

