From bfa242f68ebdbab65c3ff196b3285f643b0cb96a Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Sun, 11 Jan 2026 15:57:16 +0800
Subject: [PATCH v4] Add WALRCV_CONNECTING and WALRCV_CONNECTED states to
 walreceiver

Previously, walreceiver set status='streaming' early in startup before
receiving any WAL, making it unreliable for health monitoring.

Introduce two intermediate states:

- WALRCV_CONNECTING: Walreceiver enters CONNECTING on startup
- WALRCV_CONNECTED: Walreceiver transitions from CONNECTING to
  CONNECTED after START_REPLICATION succeeded, awaiting first WAL record

The final transition from CONNECTED to STREAMING occurs when startup
successfully applies the first record, confirming end-to-end data flow.
This allows monitoring tools to distinguish connection establishment
from active WAL streaming.
---
 src/backend/access/transam/xlogrecovery.c  | 18 ++++++++++++++
 src/backend/replication/walreceiver.c      | 20 ++++++++++++---
 src/backend/replication/walreceiverfuncs.c | 29 +++++++++++++++++++++-
 src/include/replication/walreceiver.h      |  4 +++
 4 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 0b5f871abe7..e56585f3f29 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -250,6 +250,9 @@ static XLogSource currentSource = XLOG_FROM_ANY;
 static bool lastSourceFailed = false;
 static bool pendingWalRcvRestart = false;
 
+/* Guard to update walreceiver state only once per streaming session. */
+static bool walrcv_streaming_set = false;
+
 /*
  * These variables track when we last obtained some WAL data to process,
  * and where we got it from.  (XLogReceiptSource is initially the same as
@@ -1842,6 +1845,18 @@ PerformWalRecovery(void)
 			 */
 			ApplyWalRecord(xlogreader, record, &replayTLI);
 
+			/*
+			 * If we are reading from the stream and successfully applied
+			 * the first WAL record, transition to STREAMING state. This
+			 * confirms end-to-end data flow: the record was received,
+			 * parsed, and applied without error.
+			 */
+			if (!walrcv_streaming_set && readSource == XLOG_FROM_STREAM)
+			{
+				if (WalRcvSetStreaming())
+					walrcv_streaming_set = true;
+			}
+
 			/* Exit loop if we reached inclusive recovery target */
 			if (recoveryStopsAfter(xlogreader))
 			{
@@ -3702,6 +3717,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 * one can hope...
 					 */
 
+					/* Reset our "streaming active" guard flag */
+					walrcv_streaming_set = false;
+
 					/*
 					 * We should be able to move to XLOG_FROM_STREAM only in
 					 * standby mode.
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index a41453530a1..b97dafa4a44 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -206,6 +206,8 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 			break;
 
 		case WALRCV_WAITING:
+		case WALRCV_CONNECTING:
+		case WALRCV_CONNECTED:
 		case WALRCV_STREAMING:
 		case WALRCV_RESTARTING:
 		default:
@@ -215,7 +217,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	}
 	/* Advertise our PID so that the startup process can kill us */
 	walrcv->pid = MyProcPid;
-	walrcv->walRcvState = WALRCV_STREAMING;
+	walrcv->walRcvState = WALRCV_CONNECTING;
 
 	/* Fetch information required to start streaming */
 	walrcv->ready_to_display = false;
@@ -395,6 +397,11 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 							   LSN_FORMAT_ARGS(startpoint), startpointTLI));
 			first_stream = false;
 
+			SpinLockAcquire(&walrcv->mutex);
+			if (walrcv->walRcvState == WALRCV_CONNECTING)
+				walrcv->walRcvState = WALRCV_CONNECTED;
+			SpinLockRelease(&walrcv->mutex);
+
 			/* Initialize LogstreamResult and buffers for processing messages */
 			LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
 			initStringInfo(&reply_message);
@@ -650,7 +657,8 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
 	SpinLockAcquire(&walrcv->mutex);
 	state = walrcv->walRcvState;
-	if (state != WALRCV_STREAMING)
+	if (state != WALRCV_STREAMING && state != WALRCV_CONNECTED &&
+		state != WALRCV_CONNECTING)
 	{
 		SpinLockRelease(&walrcv->mutex);
 		if (state == WALRCV_STOPPING)
@@ -689,7 +697,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 			 */
 			*startpoint = walrcv->receiveStart;
 			*startpointTLI = walrcv->receiveStartTLI;
-			walrcv->walRcvState = WALRCV_STREAMING;
+			walrcv->walRcvState = WALRCV_CONNECTING;
 			SpinLockRelease(&walrcv->mutex);
 			break;
 		}
@@ -792,6 +800,8 @@ WalRcvDie(int code, Datum arg)
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
 	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+		   walrcv->walRcvState == WALRCV_CONNECTING ||
+		   walrcv->walRcvState == WALRCV_CONNECTED ||
 		   walrcv->walRcvState == WALRCV_RESTARTING ||
 		   walrcv->walRcvState == WALRCV_STARTING ||
 		   walrcv->walRcvState == WALRCV_WAITING ||
@@ -1391,6 +1401,10 @@ WalRcvGetStateString(WalRcvState state)
 			return "stopped";
 		case WALRCV_STARTING:
 			return "starting";
+		case WALRCV_CONNECTING:
+			return "connecting";
+		case WALRCV_CONNECTED:
+			return "connected";
 		case WALRCV_STREAMING:
 			return "streaming";
 		case WALRCV_WAITING:
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index da8794cba7c..c9b6ed6e874 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -179,12 +179,37 @@ WalRcvStreaming(void)
 	}
 
 	if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
-		state == WALRCV_RESTARTING)
+		state == WALRCV_RESTARTING || state == WALRCV_CONNECTING ||
+		state == WALRCV_CONNECTED)
 		return true;
 	else
 		return false;
 }
 
+/*
+ * Transition from CONNECTED to STREAMING state.
+ *
+ * This is called by the startup process after the first WAL record from
+ * the walreceiver is successfully applied, confirming end-to-end data
+ * flow: the record was received, parsed, and applied without error.
+ */
+bool
+WalRcvSetStreaming(void)
+{
+	WalRcvData *walrcv = WalRcv;
+	bool		set = false;
+
+	SpinLockAcquire(&walrcv->mutex);
+	if (walrcv->walRcvState == WALRCV_CONNECTED)
+	{
+		walrcv->walRcvState = WALRCV_STREAMING;
+		set = true;
+	}
+	SpinLockRelease(&walrcv->mutex);
+
+	return set;
+}
+
 /*
  * Stop walreceiver (if running) and wait for it to die.
  * Executed by the Startup process.
@@ -211,6 +236,8 @@ ShutdownWalRcv(void)
 			stopped = true;
 			break;
 
+		case WALRCV_CONNECTING:
+		case WALRCV_CONNECTED:
 		case WALRCV_STREAMING:
 		case WALRCV_WAITING:
 		case WALRCV_RESTARTING:
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index f3ad00fb6f3..70159ceb032 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -47,6 +47,9 @@ typedef enum
 	WALRCV_STOPPED,				/* stopped and mustn't start up again */
 	WALRCV_STARTING,			/* launched, but the process hasn't
 								 * initialized yet */
+	WALRCV_CONNECTING,			/* connection starting, but not established yet */
+	WALRCV_CONNECTED,			/* replication connection established, but no WAL
+								 * streamed yet */
 	WALRCV_STREAMING,			/* walreceiver is streaming */
 	WALRCV_WAITING,				/* stopped streaming, waiting for orders */
 	WALRCV_RESTARTING,			/* asked to restart streaming */
@@ -492,6 +495,7 @@ extern void WalRcvForceReply(void);
 /* prototypes for functions in walreceiverfuncs.c */
 extern Size WalRcvShmemSize(void);
 extern void WalRcvShmemInit(void);
+extern bool WalRcvSetStreaming(void);
 extern void ShutdownWalRcv(void);
 extern bool WalRcvStreaming(void);
 extern bool WalRcvRunning(void);
-- 
2.51.0

