From 2000834724f56e80e32a273408f595e3479aa784 Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Thu, 22 Jan 2026 16:57:40 +0800
Subject: [PATCH v6] Add WALRCV_CONNECTING state to walreceiver

Previously, walreceiver set its state to WALRCV_STREAMING immediately
at startup, before actually establishing a replication connection. This
was misleading for monitoring, as pg_stat_wal_receiver would show
streaming even while the connection was still being established.

Introduce WALRCV_CONNECTING state to accurately reflect the period
between walreceiver startup and successful START_REPLICATION. The
transition to WALRCV_STREAMING now occurs only after
walrcv_startstreaming() returns successfully.

Update pg_stat_wal_receiver documentation to describe the new state as well.
---
 doc/src/sgml/monitoring.sgml               |  6 +++++
 src/backend/replication/walreceiver.c      | 27 ++++++++++++++++++----
 src/backend/replication/walreceiverfuncs.c |  3 ++-
 src/include/replication/walreceiver.h      |  2 ++
 4 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 88450facebd..b77d189a500 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1751,6 +1751,12 @@ description | Waiting for a newly initialized WAL file to reach durable storage
           but is not yet initialized.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>connecting</literal>: WAL receiver is connecting to the
+          upstream server, replication has not yet started.
+         </para>
+        </listitem>
         <listitem>
          <para>
           <literal>stopping</literal>: WAL receiver has been requested to
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index a41453530a1..d0e8db5ecc0 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -161,6 +161,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	TimeLineID	primaryTLI;
 	bool		first_stream;
 	WalRcvData *walrcv;
+	WalRcvState state;
 	TimestampTz now;
 	char	   *err;
 	char	   *sender_host = NULL;
@@ -180,7 +181,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	Assert(walrcv != NULL);
 
 	/*
-	 * Mark walreceiver as running in shared memory.
+	 * Mark walreceiver as connecting in shared memory.
 	 *
 	 * Do this as early as possible, so that if we fail later on, we'll set
 	 * state to STOPPED. If we die before this, the startup process will keep
@@ -205,6 +206,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 			/* The usual case */
 			break;
 
+		case WALRCV_CONNECTING:
 		case WALRCV_WAITING:
 		case WALRCV_STREAMING:
 		case WALRCV_RESTARTING:
@@ -215,7 +217,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	}
 	/* Advertise our PID so that the startup process can kill us */
 	walrcv->pid = MyProcPid;
-	walrcv->walRcvState = WALRCV_STREAMING;
+	walrcv->walRcvState = WALRCV_CONNECTING;
 
 	/* Fetch information required to start streaming */
 	walrcv->ready_to_display = false;
@@ -385,6 +387,20 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 		options.proto.physical.startpointTLI = startpointTLI;
 		if (walrcv_startstreaming(wrconn, &options))
 		{
+			/* Connection established, switch to streaming state */
+			SpinLockAcquire(&walrcv->mutex);
+			state = walrcv->walRcvState;
+			if (state == WALRCV_CONNECTING)
+				walrcv->walRcvState = WALRCV_STREAMING;
+			SpinLockRelease(&walrcv->mutex);
+
+			if (state != WALRCV_CONNECTING)
+			{
+				if (state == WALRCV_STOPPING)
+					proc_exit(0);
+				elog(FATAL, "unexpected walreceiver state");
+			}
+
 			if (first_stream)
 				ereport(LOG,
 						errmsg("started streaming WAL from primary at %X/%08X on timeline %u",
@@ -650,7 +666,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
 	SpinLockAcquire(&walrcv->mutex);
 	state = walrcv->walRcvState;
-	if (state != WALRCV_STREAMING)
+	if (state != WALRCV_STREAMING && state != WALRCV_CONNECTING)
 	{
 		SpinLockRelease(&walrcv->mutex);
 		if (state == WALRCV_STOPPING)
@@ -689,7 +705,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 			 */
 			*startpoint = walrcv->receiveStart;
 			*startpointTLI = walrcv->receiveStartTLI;
-			walrcv->walRcvState = WALRCV_STREAMING;
+			walrcv->walRcvState = WALRCV_CONNECTING;
 			SpinLockRelease(&walrcv->mutex);
 			break;
 		}
@@ -792,6 +808,7 @@ WalRcvDie(int code, Datum arg)
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
 	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+		   walrcv->walRcvState == WALRCV_CONNECTING ||
 		   walrcv->walRcvState == WALRCV_RESTARTING ||
 		   walrcv->walRcvState == WALRCV_STARTING ||
 		   walrcv->walRcvState == WALRCV_WAITING ||
@@ -1391,6 +1408,8 @@ WalRcvGetStateString(WalRcvState state)
 			return "stopped";
 		case WALRCV_STARTING:
 			return "starting";
+		case WALRCV_CONNECTING:
+			return "connecting";
 		case WALRCV_STREAMING:
 			return "streaming";
 		case WALRCV_WAITING:
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index da8794cba7c..42e3e170bc0 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -179,7 +179,7 @@ WalRcvStreaming(void)
 	}
 
 	if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
-		state == WALRCV_RESTARTING)
+		state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
 		return true;
 	else
 		return false;
@@ -211,6 +211,7 @@ ShutdownWalRcv(void)
 			stopped = true;
 			break;
 
+		case WALRCV_CONNECTING:
 		case WALRCV_STREAMING:
 		case WALRCV_WAITING:
 		case WALRCV_RESTARTING:
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index f3ad00fb6f3..4387821469b 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -47,6 +47,8 @@ typedef enum
 	WALRCV_STOPPED,				/* stopped and mustn't start up again */
 	WALRCV_STARTING,			/* launched, but the process hasn't
 								 * initialized yet */
+	WALRCV_CONNECTING,			/* connecting to upstream server, replication
+								 * not yet started */
 	WALRCV_STREAMING,			/* walreceiver is streaming */
 	WALRCV_WAITING,				/* stopped streaming, waiting for orders */
 	WALRCV_RESTARTING,			/* asked to restart streaming */
-- 
2.51.0

