From 324a925b133700faa3c8ff162e5337677067c6ec Mon Sep 17 00:00:00 2001
From: Mahendra Singh Thalor <mahi6run@gmail.com>
Date: Tue, 26 Oct 2021 12:22:55 -0700
Subject: [PATCH v4] Start WAL receiver before startup process replays
 existing WAL

If WAL receiver is started only after startup process finishes replaying
WAL already available in pg_wal, synchronous replication is impacted
adversely.  Consider a temporary network outage causing streaming
replication connection to break.  This leads to exit of WAL receiver
process.  If the startup process has fallen behind, it may take a long
time to finish replaying WAL and then start walreceiver again to
re-establish streaming replication.  Commits on master will have to wait
all this while for the standby to flush WAL up to commit LSN.

This experience can be alleviated if replication connection is
re-established as soon as it is found to be disconnected.  The patch
attempts to do so by starting WAL receiver as soon as consistent state
is reached.

The start point to request streaming from is set to the beginning of the
most recently flushed WAL segment.  To determine this, the startup
process scans the first page of segments, starting from the segment
currently being read, one file at a time.

A new GUC, wal_receiver_start_condition, controls the new behavior.
When set to 'consistency', the new behavior takes effect.  The default
value is 'catchup', which keeps the current behavior.

A TAP test is added to demonstrate the problem and validate the fix.

Note: rebased on commit 5fedf7417b69295294b154a219edd8a26eaa6ab6
---
 src/backend/access/transam/xlog.c             | 120 ++++++++++-
 src/backend/replication/walreceiver.c         |   1 +
 src/backend/replication/walreceiverfuncs.c    |  20 +-
 src/backend/utils/misc/guc.c                  |  17 ++
 src/include/replication/walreceiver.h         |   7 +
 src/test/recovery/t/018_replay_lag_syncrep.pl | 192 ++++++++++++++++++
 6 files changed, 346 insertions(+), 11 deletions(-)
 create mode 100644 src/test/recovery/t/018_replay_lag_syncrep.pl

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f547efd294..77b1da91fd 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5817,6 +5817,98 @@ CleanupAfterArchiveRecovery(TimeLineID EndOfLogTLI, XLogRecPtr EndOfLog)
 	}
 }
 
+static int
+XLogReadFirstPage(XLogRecPtr targetPagePtr, char *readBuf)
+{
+	int fd;
+	XLogSegNo segno;
+	char xlogfname[MAXFNAMELEN];
+
+	XLByteToSeg(targetPagePtr, segno, wal_segment_size);
+	XLogFileName(xlogfname, ThisTimeLineID, segno, wal_segment_size);
+	elog(DEBUG3, "reading first page of segment %lu", segno);
+	fd = XLogFileReadAnyTLI(segno, LOG, XLOG_FROM_PG_WAL);
+	if (fd == -1)
+		return -1;
+
+	/* Seek to the beginning, we want to check if the first page is valid */
+	if (lseek(fd, (off_t) 0, SEEK_SET) < 0)
+	{
+		close(fd);
+		elog(ERROR, "could not seek XLOG file %s, segment %lu: %m",
+			 xlogfname, segno);
+	}
+
+	if (read(fd, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+	{
+		close(fd);
+		elog(ERROR, "could not read from XLOG file %s, segment %lu: %m",
+			 xlogfname, segno);
+	}
+
+	close(fd);
+	return XLOG_BLCKSZ;
+}
+
+/*
+ * Find the LSN that points to the beginning of the segment file most recently
+ * flushed by WAL receiver.  It is used as the start point by the new instance
+ * of WAL receiver.
+ *
+ * The XLogReaderState abstraction is not suited for this purpose.  The
+ * interface it offers is XLogReadRecord, which is not suited to read a
+ * specific page from WAL.
+ */
+static XLogRecPtr
+GetLastLSN(XLogRecPtr lsn)
+{
+	XLogSegNo lastValidSegNo;
+	char readBuf[XLOG_BLCKSZ];
+
+	XLByteToSeg(lsn, lastValidSegNo, wal_segment_size);
+	/*
+	 * We know that lsn falls in a valid segment.  Start searching from the
+	 * segment that follows it.
+	 */
+	XLogSegNoOffsetToRecPtr(lastValidSegNo+1, 0, wal_segment_size, lsn);
+
+	elog(LOG, "scanning WAL for last valid segment, starting from %X/%X",
+		 (uint32) (lsn >> 32), (uint32) lsn);
+
+	while (XLogReadFirstPage(lsn, readBuf) == XLOG_BLCKSZ)
+	{
+		/*
+		 * Validate the page header; it must be a long header because we are
+		 * inspecting the first page in a segment file.  The big if condition
+		 * is modelled on XLogReaderValidatePageHeader.
+		 */
+		XLogLongPageHeader longhdr = (XLogLongPageHeader) readBuf;
+		if ((longhdr->std.xlp_info & XLP_LONG_HEADER) == 0 ||
+			(longhdr->std.xlp_magic != XLOG_PAGE_MAGIC) ||
+			((longhdr->std.xlp_info & ~XLP_ALL_FLAGS) != 0) ||
+			(longhdr->xlp_sysid != ControlFile->system_identifier) ||
+			(longhdr->xlp_seg_size != wal_segment_size) ||
+			(longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ) ||
+			(longhdr->std.xlp_pageaddr != lsn) ||
+			(longhdr->std.xlp_tli != ThisTimeLineID))
+		{
+			break;
+		}
+		XLByteToSeg(lsn, lastValidSegNo, wal_segment_size);
+		XLogSegNoOffsetToRecPtr(lastValidSegNo+1, 0, wal_segment_size, lsn);
+	}
+
+	/*
+	 * The last valid segment number is the one previous to the segment just
+	 * found to be invalid (or missing).
+	 */
+	XLogSegNoOffsetToRecPtr(lastValidSegNo, 0, wal_segment_size, lsn);
+
+	elog(LOG, "last valid segment number = %lu", lastValidSegNo);
+
+	return lsn;
+}
+
 /*
  * Extract timestamp from WAL record.
  *
@@ -7534,6 +7626,28 @@ StartupXLOG(void)
 				/* Handle interrupt signals of startup process */
 				HandleStartupProcInterrupts();
 
+				/*
+				 * Start WAL receiver without waiting for startup process to
+				 * finish replay, so that streaming replication is established
+				 * at the earliest.  When the replication is configured to be
+				 * synchronous this would unblock commits waiting for WAL to
+				 * be written and/or flushed by synchronous standby.
+				 */
+				if (StandbyModeRequested &&
+					reachedConsistency &&
+					wal_receiver_start_condition == WAL_RCV_START_AT_CONSISTENCY &&
+					!WalRcvStreaming())
+				{
+					XLogRecPtr startpoint = GetLastLSN(record->xl_prev);
+					elog(LOG, "starting WAL receiver, startpoint %X/%X",
+						 (uint32) (startpoint >> 32), (uint32) startpoint);
+					RequestXLogStreaming(ThisTimeLineID,
+										 startpoint,
+										 PrimaryConnInfo,
+										 PrimarySlotName,
+										 wal_receiver_create_temp_slot);
+				}
+
 				/*
 				 * Pause WAL replay, if requested by a hot-standby session via
 				 * SetRecoveryPause().
@@ -12756,12 +12870,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 			case XLOG_FROM_ARCHIVE:
 			case XLOG_FROM_PG_WAL:
 
-				/*
-				 * WAL receiver must not be running when reading WAL from
-				 * archive or pg_wal.
-				 */
-				Assert(!WalRcvStreaming());
-
 				/* Close any old file we might have open. */
 				if (readFile >= 0)
 				{
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b90e5ca98e..8cc9fdba6f 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -89,6 +89,7 @@
 int			wal_receiver_status_interval;
 int			wal_receiver_timeout;
 bool		hot_standby_feedback;
+int 		wal_receiver_start_condition;
 
 /* libpqwalreceiver connection */
 static WalReceiverConn *wrconn = NULL;
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 6f0acbfdef..1df022e8c2 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -261,10 +261,6 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 
 	SpinLockAcquire(&walrcv->mutex);
 
-	/* It better be stopped if we try to restart it */
-	Assert(walrcv->walRcvState == WALRCV_STOPPED ||
-		   walrcv->walRcvState == WALRCV_WAITING);
-
 	if (conninfo != NULL)
 		strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
 	else
@@ -287,12 +283,26 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 		walrcv->is_temp_slot = create_temp_slot;
 	}
 
+	/*
+	 * We used to assert that the WAL receiver is either in WALRCV_STOPPED or
+	 * in WALRCV_WAITING state.
+	 *
+	 * Such an assertion is not possible, now that this function is called by
+	 * startup process on two occasions.  One is just before starting to
+	 * replay WAL when starting up.  And the other is when it has finished
+	 * replaying all WAL in pg_xlog directory.  If the standby is starting up
+	 * after clean shutdown, there is not much WAL to be replayed and both
+	 * calls to this funcion can occur in quick succession.  By the time the
+	 * second request to start streaming is made, the WAL receiver can be in
+	 * any state.  We therefore cannot make any assertion on the state here.
+	 */
+
 	if (walrcv->walRcvState == WALRCV_STOPPED)
 	{
 		launch = true;
 		walrcv->walRcvState = WALRCV_STARTING;
 	}
-	else
+	else if (walrcv->walRcvState == WALRCV_WAITING)
 		walrcv->walRcvState = WALRCV_RESTARTING;
 	walrcv->startTime = now;
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e91d5a3cfd..b2de8039c7 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -247,6 +247,12 @@ static ConfigVariable *ProcessConfigFileInternal(GucContext context,
  * NOTE! Option values may not contain double quotes!
  */
 
+const struct config_enum_entry wal_rcv_start_options[] = {
+	{"catchup", WAL_RCV_START_AT_CATCHUP, true},
+	{"consistency", WAL_RCV_START_AT_CONSISTENCY, true},
+	{NULL, 0, false}
+};
+
 static const struct config_enum_entry bytea_output_options[] = {
 	{"escape", BYTEA_OUTPUT_ESCAPE, false},
 	{"hex", BYTEA_OUTPUT_HEX, false},
@@ -5007,6 +5013,17 @@ static struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"wal_receiver_start_condition", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("When to start WAL receiver."),
+			NULL,
+		},
+		&wal_receiver_start_condition,
+		WAL_RCV_START_AT_CATCHUP,
+		wal_rcv_start_options,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0b607ed777..362b681ae5 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -24,10 +24,17 @@
 #include "storage/spin.h"
 #include "utils/tuplestore.h"
 
+typedef enum
+{
+	WAL_RCV_START_AT_CATCHUP, /* start a WAL receiver  after replaying all WAL files */
+	WAL_RCV_START_AT_CONSISTENCY /* start a WAL receiver once consistency has been reached */
+} WalRcvStartCondition;
+
 /* user-settable parameters */
 extern int	wal_receiver_status_interval;
 extern int	wal_receiver_timeout;
 extern bool hot_standby_feedback;
+extern int  wal_receiver_start_condition;
 
 /*
  * MAXCONNINFO: maximum size of a connection string.
diff --git a/src/test/recovery/t/018_replay_lag_syncrep.pl b/src/test/recovery/t/018_replay_lag_syncrep.pl
new file mode 100644
index 0000000000..e82d8a0a64
--- /dev/null
+++ b/src/test/recovery/t/018_replay_lag_syncrep.pl
@@ -0,0 +1,192 @@
+# Test impact of replay lag on synchronous replication.
+#
+# Replay lag is induced using recovery_min_apply_delay GUC.  Two ways
+# of breaking replication connection are covered - killing walsender
+# and restarting standby.  The test expects that replication
+# connection is restored without being affected due to replay lag.
+# This is validated by performing commits on master after replication
+# connection is disconnected and checking that they finish within a
+# few seconds.
+
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+
+# Query checking sync_priority and sync_state of each standby
+my $check_sql =
+  "SELECT application_name, sync_priority, sync_state FROM pg_stat_replication ORDER BY application_name;";
+
+# Check that sync_state of a standby is expected (waiting till it is).
+# If $setting is given, synchronous_standby_names is set to it and
+# the configuration file is reloaded before the test.
+sub test_sync_state
+{
+	my ($self, $expected, $msg, $setting) = @_;
+
+	if (defined($setting))
+	{
+		$self->safe_psql('postgres',
+						 "ALTER SYSTEM SET synchronous_standby_names = '$setting';");
+		$self->reload;
+	}
+
+	ok($self->poll_query_until('postgres', $check_sql, $expected), $msg);
+	return;
+}
+
+# Start a standby and check that it is registered within the WAL sender
+# array of the given primary.  This polls the primary's pg_stat_replication
+# until the standby is confirmed as registered.
+sub start_standby_and_wait
+{
+	my ($master, $standby) = @_;
+	my $master_name  = $master->name;
+	my $standby_name = $standby->name;
+	my $query =
+	  "SELECT count(1) = 1 FROM pg_stat_replication WHERE application_name = '$standby_name'";
+
+	$standby->start;
+
+	print("### Waiting for standby \"$standby_name\" on \"$master_name\"\n");
+	$master->poll_query_until('postgres', $query);
+	return;
+}
+
+# Initialize master node
+my $node_master = get_new_node('master');
+my @extra = (q[--wal-segsize], q[1]);
+$node_master->init(allows_streaming => 1, extra => \@extra);
+$node_master->start;
+my $backup_name = 'master_backup';
+
+# Setup physical replication slot for streaming replication
+$node_master->safe_psql('postgres',
+	q[SELECT pg_create_physical_replication_slot('phys_slot', true, false);]);
+
+# Take backup
+$node_master->backup($backup_name);
+
+# Create standby linking to master
+my $node_standby = get_new_node('standby');
+$node_standby->init_from_backup($node_master, $backup_name,
+								has_streaming => 1);
+$node_standby->append_conf('postgresql.conf',
+						   q[primary_slot_name = 'phys_slot']);
+# Enable debug logging in standby
+$node_standby->append_conf('postgresql.conf',
+						   q[log_min_messages = debug5]);
+# Enable early WAL receiver startup
+$node_standby->append_conf('postgresql.conf',
+						   q[wal_receiver_start_condition = 'consistency']);
+
+start_standby_and_wait($node_master, $node_standby);
+
+# Make standby synchronous
+test_sync_state(
+	$node_master,
+	qq(standby|1|sync),
+	'standby is synchronous',
+	'standby');
+
+# Slow down WAL replay by inducing 10 seconds sleep before replaying
+# a commit WAL record.
+$node_standby->safe_psql('postgres',
+						 'ALTER SYSTEM set recovery_min_apply_delay TO 10000;');
+$node_standby->reload;
+
+# Commit some transactions on master to induce replay lag in standby.
+$node_master->safe_psql('postgres', 'CREATE TABLE replay_lag_test(a int);');
+$node_master->safe_psql(
+	'postgres',
+	'insert into replay_lag_test values (101);');
+$node_master->safe_psql(
+	'postgres',
+	'insert into replay_lag_test values (102);');
+$node_master->safe_psql(
+	'postgres',
+	'insert into replay_lag_test values (103);');
+
+# Obtain WAL sender PID and kill it.
+my $walsender_pid = $node_master->safe_psql(
+	'postgres',
+	q[select active_pid from pg_get_replication_slots() where slot_name = 'phys_slot']);
+
+# Kill walsender, so that the replication connection breaks.
+kill 'SIGTERM', $walsender_pid;
+
+# The replication connection should be re-established much earlier than
+# what it takes to finish replay.  Try to commit a transaction with a
+# timeout of recovery_min_apply_delay + 2 seconds.  The timeout should
+# not be hit.
+my $timed_out = 0;
+$node_master->safe_psql(
+	'postgres',
+	'insert into replay_lag_test values (1);',
+	timeout => 12,
+	timed_out => \$timed_out);
+
+is($timed_out, 0, 'insert after WAL receiver restart');
+
+my $replay_lag = $node_master->safe_psql(
+	'postgres',
+	'select flush_lsn - replay_lsn from pg_stat_replication');
+print("replay lag after WAL receiver restart: $replay_lag\n");
+ok($replay_lag > 0, 'replication resumes in spite of replay lag');
+
+# Break the replication connection by restarting standby.
+$node_standby->restart;
+
+# Like in previous test, the replication connection should be
+# re-established before pending WAL replay is finished.  Try to commit
+# a transaction with recovery_min_apply_delay + 2 second timeout.  The
+# timeout should not be hit.
+$timed_out = 0;
+$node_master->safe_psql(
+	'postgres',
+	'insert into replay_lag_test values (2);',
+	timeout => 12,
+	timed_out => \$timed_out);
+
+is($timed_out, 0, 'insert after standby restart');
+$replay_lag = $node_master->safe_psql(
+	'postgres',
+	'select flush_lsn - replay_lsn from pg_stat_replication');
+print("replay lag after standby restart: $replay_lag\n");
+ok($replay_lag > 0, 'replication starts in spite of replay lag');
+
+# Reset the delay so that the replay process is no longer slowed down.
+$node_standby->safe_psql('postgres', 'ALTER SYSTEM set recovery_min_apply_delay to 0;');
+$node_standby->reload;
+
+# Switch to a new WAL file and see if things work well.
+$node_master->safe_psql(
+	'postgres',
+	'select pg_switch_wal();');
+
+# Transactions should work fine on master.
+$timed_out = 0;
+$node_master->safe_psql(
+	'postgres',
+	'insert into replay_lag_test values (3);',
+	timeout => 1,
+	timed_out => \$timed_out);
+
+# Wait for standby to replay all WAL.
+$node_master->wait_for_catchup('standby', 'replay',
+							   $node_master->lsn('insert'));
+
+# Standby should also have identical content.
+my $count_sql = q[select count(*) from replay_lag_test;];
+my $expected = q[6];
+ok($node_standby->poll_query_until('postgres', $count_sql, $expected), 'standby query');
+
+# Test that promotion followed by query works.
+$node_standby->promote;
+$node_master->stop;
+$node_standby->safe_psql('postgres', 'insert into replay_lag_test values (4);');
+
+$expected = q[7];
+ok($node_standby->poll_query_until('postgres', $count_sql, $expected),
+   'standby query after promotion');
-- 
2.17.1

