From 757211818ac8e39a24d990b28c2ab755065b1eab Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 21 Feb 2024 11:11:44 +0000
Subject: [PATCH v12 3/3] Add WAL Copy pointer, representing data copied to WAL
 buffers

---
 src/backend/access/transam/xlog.c | 26 +++++++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4e7720f71a..888d441f02 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -325,6 +325,7 @@ static bool doPageWrites;
 
 typedef struct XLogwrtAtomic
 {
+	pg_atomic_uint64 Copy;		/* last byte + 1 copied to WAL buffers */
 	pg_atomic_uint64 Write;		/* last byte + 1 written out */
 	pg_atomic_uint64 Flush;		/* last byte + 1 flushed */
 } XLogwrtAtomic;
@@ -1495,6 +1496,7 @@ static XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64		bytepos;
+	XLogRecPtr	copyptr;
 	XLogRecPtr	reservedUpto;
 	XLogRecPtr	finishedUpto;
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1503,6 +1505,12 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	if (MyProc == NULL)
 		elog(PANIC, "cannot wait without a PGPROC structure");
 
+	/* check if there's any work to do */
+	pg_read_barrier();
+	copyptr = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Copy);
+	if (upto <= copyptr)
+		return copyptr;
+
 	/* Read the current insert position */
 	SpinLockAcquire(&Insert->insertpos_lck);
 	bytepos = Insert->CurrBytePos;
@@ -1582,6 +1590,9 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 		if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
 			finishedUpto = insertingat;
 	}
+
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Copy, finishedUpto);
+
 	return finishedUpto;
 }
 
@@ -1723,13 +1734,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 {
 	char	   *pdst = dstbuf;
 	XLogRecPtr	recptr = startptr;
+	XLogRecPtr	copyptr;
 	Size		nbytes = count;
 
 	if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
 		return 0;
 
 	Assert(!XLogRecPtrIsInvalid(startptr));
-	Assert(startptr + count <= LogwrtResult.Write);
+
+	/*
+	 * Caller should ensure that the requested data has been copied to WAL
+	 * buffers before we try to read it.
+	 */
+	pg_read_barrier();
+	copyptr = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Copy);
+	if (startptr + count > copyptr)
+		ereport(WARNING,
+				(errmsg("request to read past end of generated WAL; request %X/%X, current position %X/%X",
+						LSN_FORMAT_ARGS(startptr + count), LSN_FORMAT_ARGS(copyptr))));
 
 	/*
 	 * Loop through the buffers without a lock. For each buffer, atomically
@@ -4912,6 +4934,7 @@ XLOGShmemInit(void)
 	XLogCtl->InstallXLogFileSegmentActive = false;
 	XLogCtl->WalWriterSleeping = false;
 
+	pg_atomic_init_u64(&XLogCtl->LogwrtResult.Copy, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->LogwrtResult.Write, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->LogwrtResult.Flush, InvalidXLogRecPtr);
 
@@ -5936,6 +5959,7 @@ StartupXLOG(void)
 	 * because no other process can be reading or writing WAL yet.
 	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Copy, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
-- 
2.34.1

