From 165c9a9c5ecf300c2be1b79e2e480807416b2fae Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmunro@postgresql.org>
Date: Mon, 9 Dec 2019 17:22:07 +1300
Subject: [PATCH 3/5] Add WalRcvGetWriteRecPtr() (new definition).

A later patch will read received WAL to prefetch referenced blocks,
without waiting for the data to be flushed to disk.  To do that,
it needs to be able to see the write pointer advancing in shared
memory.

The function formerly bearing name was recently renamed to
WalRcvGetFlushRecPtr(), which better described what it does.
---
 src/backend/replication/walreceiver.c      |  5 +++++
 src/backend/replication/walreceiverfuncs.c | 10 ++++++++++
 src/include/replication/walreceiver.h      |  9 +++++++++
 3 files changed, 24 insertions(+)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2ab15c3cbb..88a51ba35f 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -244,6 +244,8 @@ WalReceiverMain(void)
 
 	SpinLockRelease(&walrcv->mutex);
 
+	pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
+
 	/* Arrange to clean up at walreceiver exit */
 	on_shmem_exit(WalRcvDie, 0);
 
@@ -985,6 +987,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 
 		LogstreamResult.Write = recptr;
 	}
+
+	/* Update shared-memory status */
+	pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
 }
 
 /*
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 9bce63b534..14e9a6245a 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -310,6 +310,16 @@ GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 	return recptr;
 }
 
+/*
+ * Returns the last+1 byte position that walreceiver has written.
+ * This returns a recently written value without taking a lock.
+ */
+XLogRecPtr
+GetWalRcvWriteRecPtr(void)
+{
+	return pg_atomic_read_u64(&WalRcv->writtenUpto);
+}
+
 /*
  * Returns the replication apply delay in ms or -1
  * if the apply delay info is not available
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 147b374a26..1e8f304dc4 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -16,6 +16,7 @@
 #include "access/xlogdefs.h"
 #include "getaddrinfo.h"		/* for NI_MAXHOST */
 #include "pgtime.h"
+#include "port/atomics.h"
 #include "replication/logicalproto.h"
 #include "replication/walsender.h"
 #include "storage/latch.h"
@@ -83,6 +84,13 @@ typedef struct
 	XLogRecPtr	receivedUpto;
 	TimeLineID	receivedTLI;
 
+	/*
+	 * Same as above, but advanced after writing and before flushing, without
+	 * the need to acquire the spin lock.  Data can be read by another process
+	 * up to this point, but shouldn't be used for data integrity purposes.
+	 */
+	pg_atomic_uint64 writtenUpto;
+
 	/*
 	 * latestChunkStart is the starting byte position of the current "batch"
 	 * of received WAL.  It's actually the same as the previous value of
@@ -323,6 +331,7 @@ extern bool WalRcvRunning(void);
 extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 								 const char *conninfo, const char *slotname);
 extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
+extern XLogRecPtr GetWalRcvWriteRecPtr(void);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
 extern void WalRcvForceReply(void);
-- 
2.23.0

