From 8a8a0c7da5d4c36a71ff868ba5531c1e9a1a0b27 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 14 Sep 2023 06:01:40 +0000
Subject: [PATCH v37 2/2] Read all WAL records ahead of confirmed_flush_lsn

---
 contrib/pg_walinspect/pg_walinspect.c         | 94 -------------------
 doc/src/sgml/ref/pgupgrade.sgml               |  5 +-
 src/backend/access/transam/xlogutils.c        | 92 ++++++++++++++++++
 src/backend/utils/adt/pg_upgrade_support.c    | 87 +++++++++++++++++
 src/bin/pg_upgrade/check.c                    |  8 +-
 src/bin/pg_upgrade/controldata.c              | 39 --------
 src/bin/pg_upgrade/info.c                     |  6 +-
 src/bin/pg_upgrade/pg_upgrade.h               |  1 -
 .../t/003_logical_replication_slots.pl        | 20 ++++
 src/include/access/xlogutils.h                |  3 +
 src/include/catalog/pg_proc.dat               |  6 ++
 11 files changed, 216 insertions(+), 145 deletions(-)

diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 796a74f322..49f4f92e98 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -40,8 +40,6 @@ PG_FUNCTION_INFO_V1(pg_get_wal_stats_till_end_of_wal);
 
 static void ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn);
 static XLogRecPtr GetCurrentLSN(void);
-static XLogReaderState *InitXLogReaderState(XLogRecPtr lsn);
-static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
 static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
 							 bool *nulls, uint32 ncols);
 static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
@@ -84,98 +82,6 @@ GetCurrentLSN(void)
 	return curr_lsn;
 }
 
-/*
- * Initialize WAL reader and identify first valid LSN.
- */
-static XLogReaderState *
-InitXLogReaderState(XLogRecPtr lsn)
-{
-	XLogReaderState *xlogreader;
-	ReadLocalXLogPageNoWaitPrivate *private_data;
-	XLogRecPtr	first_valid_record;
-
-	/*
-	 * Reading WAL below the first page of the first segments isn't allowed.
-	 * This is a bootstrap WAL page and the page_read callback fails to read
-	 * it.
-	 */
-	if (lsn < XLOG_BLCKSZ)
-		ereport(ERROR,
-				(errmsg("could not read WAL at LSN %X/%X",
-						LSN_FORMAT_ARGS(lsn))));
-
-	private_data = (ReadLocalXLogPageNoWaitPrivate *)
-		palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
-
-	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
-									XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
-											   .segment_open = &wal_segment_open,
-											   .segment_close = &wal_segment_close),
-									private_data);
-
-	if (xlogreader == NULL)
-		ereport(ERROR,
-				(errcode(ERRCODE_OUT_OF_MEMORY),
-				 errmsg("out of memory"),
-				 errdetail("Failed while allocating a WAL reading processor.")));
-
-	/* first find a valid recptr to start from */
-	first_valid_record = XLogFindNextRecord(xlogreader, lsn);
-
-	if (XLogRecPtrIsInvalid(first_valid_record))
-		ereport(ERROR,
-				(errmsg("could not find a valid record after %X/%X",
-						LSN_FORMAT_ARGS(lsn))));
-
-	return xlogreader;
-}
-
-/*
- * Read next WAL record.
- *
- * By design, to be less intrusive in a running system, no slot is allocated
- * to reserve the WAL we're about to read. Therefore this function can
- * encounter read errors for historical WAL.
- *
- * We guard against ordinary errors trying to read WAL that hasn't been
- * written yet by limiting end_lsn to the flushed WAL, but that can also
- * encounter errors if the flush pointer falls in the middle of a record. In
- * that case we'll return NULL.
- */
-static XLogRecord *
-ReadNextXLogRecord(XLogReaderState *xlogreader)
-{
-	XLogRecord *record;
-	char	   *errormsg;
-
-	record = XLogReadRecord(xlogreader, &errormsg);
-
-	if (record == NULL)
-	{
-		ReadLocalXLogPageNoWaitPrivate *private_data;
-
-		/* return NULL, if end of WAL is reached */
-		private_data = (ReadLocalXLogPageNoWaitPrivate *)
-			xlogreader->private_data;
-
-		if (private_data->end_of_wal)
-			return NULL;
-
-		if (errormsg)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read WAL at %X/%X: %s",
-							LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
-		else
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read WAL at %X/%X",
-							LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
-	}
-
-	return record;
-}
-
 /*
  * Output values that make up a row describing caller's WAL record.
  *
diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 4e2281bae4..2588d6d7b8 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -418,10 +418,7 @@ make prefix=/usr/local/pgsql.new install
      </listitem>
      <listitem>
       <para>
-       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>confirmed_flush_lsn</structfield>
-       of all slots on the old cluster must be the same as the latest
-       checkpoint location. This ensures that all the data has been replicated
-       before the upgrade.
+       The old cluster has replicated all the changes to subscribers.
       </para>
      </listitem>
      <listitem>
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 43f7b31205..e2cabfef32 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -1048,3 +1048,95 @@ WALReadRaiseError(WALReadError *errinfo)
 						errinfo->wre_req)));
 	}
 }
+
+/*
+ * Allocate a WAL reader and position it on the first valid record from lsn.
+ */
+XLogReaderState *
+InitXLogReaderState(XLogRecPtr lsn)
+{
+	XLogReaderState *xlogreader;
+	ReadLocalXLogPageNoWaitPrivate *private_data;
+	XLogRecPtr	first_valid_record;
+
+	/*
+	 * Reading WAL below the first page of the first segments isn't allowed.
+	 * This is a bootstrap WAL page and the page_read callback fails to read
+	 * it.
+	 */
+	if (lsn < XLOG_BLCKSZ)
+		ereport(ERROR,
+				(errmsg("could not read WAL at LSN %X/%X",
+						LSN_FORMAT_ARGS(lsn))));
+
+	private_data = (ReadLocalXLogPageNoWaitPrivate *)
+		palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
+
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									private_data);
+
+	if (xlogreader == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+
+	/* first find a valid recptr to start from; fail if there is none */
+	first_valid_record = XLogFindNextRecord(xlogreader, lsn);
+
+	if (XLogRecPtrIsInvalid(first_valid_record))
+		ereport(ERROR,
+				(errmsg("could not find a valid record after %X/%X",
+						LSN_FORMAT_ARGS(lsn))));
+
+	return xlogreader;			/* reader and private_data are the caller's */
+}
+
+/*
+ * Read next WAL record.
+ *
+ * Returns the record, or NULL if XLogReadRecord() fails and the reader's
+ * private data has end_of_wal set. Any other failure is raised as an ERROR.
+ *
+ * No slot is allocated to reserve the WAL we're about to read (to be less
+ * intrusive in a running system), so reads of historical WAL can fail. We
+ * guard against reading WAL that hasn't been written yet by limiting reads
+ * to the flushed WAL, but the flush pointer may fall in the middle of a
+ * record; in that case we'll return NULL.
+ */
+XLogRecord *
+ReadNextXLogRecord(XLogReaderState *xlogreader)
+{
+	XLogRecord *record;
+	char	   *errormsg;
+
+	record = XLogReadRecord(xlogreader, &errormsg);
+
+	if (record == NULL)
+	{
+		ReadLocalXLogPageNoWaitPrivate *private_data;
+
+		/* return NULL, if end of WAL is reached */
+		private_data = (ReadLocalXLogPageNoWaitPrivate *)
+			xlogreader->private_data;
+
+		if (private_data->end_of_wal)
+			return NULL;
+
+		if (errormsg)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read WAL at %X/%X: %s",
+							LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read WAL at %X/%X",
+							LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
+	}
+
+	return record;
+}
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 0186636d9f..02078920f3 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -11,14 +11,22 @@
 
 #include "postgres.h"
 
+#include "access/heapam_xlog.h"
+#include "access/rmgr.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
 #include "catalog/binary_upgrade.h"
 #include "catalog/heap.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_control.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
 #include "miscadmin.h"
+#include "storage/standbydefs.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
+#include "utils/pg_lsn.h"
 
 
 #define CHECK_IS_BINARY_UPGRADE									\
@@ -29,6 +37,9 @@ do {															\
 				 errmsg("function can only be called when server is in binary upgrade mode"))); \
 } while (0)
 
+#define CHECK_WAL_RECORD(rmgrid, info, expected_rmgrid, expected_info) \
+	((rmgrid) == (expected_rmgrid) && (info) == (expected_info))
+
 Datum
 binary_upgrade_set_next_pg_tablespace_oid(PG_FUNCTION_ARGS)
 {
@@ -261,3 +272,79 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
 
 	PG_RETURN_VOID();
 }
+
+/*
+ * Check that only ignorable WAL records exist from start_lsn onwards.
+ *
+ * pg_upgrade passes confirmed_flush_lsn of each logical slot to verify that
+ * all changes were replicated to the subscriber. Some WAL records may be
+ * inserted after logical walsenders exit, so such types are ignored.
+ *
+ * Returns true if start_lsn is not behind the flush pointer, or if the first
+ * record read is XLOG_CHECKPOINT_SHUTDOWN and every later one is among:
+ *		- XLOG_CHECKPOINT_SHUTDOWN
+ *		- XLOG_CHECKPOINT_ONLINE
+ *		- XLOG_RUNNING_XACTS
+ *		- XLOG_FPI_FOR_HINT
+ *		- XLOG_HEAP2_PRUNE
+ */
+Datum
+binary_upgrade_validate_wal_record_types_after_lsn(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	  start_lsn = PG_GETARG_LSN(0);
+	XLogRecPtr	  curr_lsn = GetFlushRecPtr(NULL);
+	XLogReaderState *xlogreader;
+	bool			initial_record = true;
+	bool			result = true;
+
+	CHECK_IS_BINARY_UPGRADE;
+
+	/* Quick exit if the given lsn is larger than current one */
+	if (start_lsn >= curr_lsn)
+		PG_RETURN_BOOL(true);
+
+	xlogreader = InitXLogReaderState(start_lsn);
+
+	/* Read records till end of WAL */
+	while (result && ReadNextXLogRecord(xlogreader))
+	{
+		RmgrIds		   rmid;
+		uint8		   info;
+
+		/*
+		 * XXX: check the type of WAL. Currently XLOG info is directly
+		 * extracted, but it may be better to use the descriptor instead.
+		 */
+		rmid = XLogRecGetRmid(xlogreader);
+		info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
+
+		if (initial_record)
+		{
+			/* Initial record must be XLOG_CHECKPOINT_SHUTDOWN */
+			if (!CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID,
+								  XLOG_CHECKPOINT_SHUTDOWN))
+				result = false;
+
+			initial_record = false;
+			continue;
+		}
+
+		/*
+		 * XXX: There is a possibility that following records may be
+		 * generated during the upgrade.
+		 */
+		if (!CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_CHECKPOINT_SHUTDOWN) &&
+			!CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_CHECKPOINT_ONLINE) &&
+			!CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_FPI_FOR_HINT) &&
+			!CHECK_WAL_RECORD(rmid, info, RM_STANDBY_ID, XLOG_RUNNING_XACTS) &&
+			!CHECK_WAL_RECORD(rmid, info, RM_HEAP2_ID, XLOG_HEAP2_PRUNE))
+			result = false;
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	pfree(xlogreader->private_data);
+	XLogReaderFree(xlogreader);
+
+	PG_RETURN_BOOL(result);
+}
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index b1424fdf9c..df1ce67fc0 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1480,8 +1480,8 @@ check_new_cluster_logical_replication_slots(void)
  * Following points are checked:
  *
  *	- All logical replication slots are usable.
- *	- All logical replication slots consumed all WALs, except a
- *	  CHECKPOINT_SHUTDOWN record.
+ *	- All logical replication slots consumed all WALs, except some acceptable
+ *	  types.
  */
 static void
 check_old_cluster_for_valid_slots(bool live_check)
@@ -1521,8 +1521,8 @@ check_old_cluster_for_valid_slots(bool live_check)
 			}
 
 			/*
-			 * Do additional checks to ensure that confirmed_flush LSN of all
-			 * the slots is the same as the latest checkpoint location.
+			 * Do additional checks to ensure that all logical replication
+			 * slots have reached the current WAL position.
 			 *
 			 * Note: This can be satisfied only when the old cluster has been
 			 * shut down, so we skip this for live checks.
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index f8f823e2be..4beb65ab22 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -169,45 +169,6 @@ get_control_data(ClusterInfo *cluster, bool live_check)
 				}
 				got_cluster_state = true;
 			}
-
-			else if ((p = strstr(bufin, "Latest checkpoint location:")) != NULL)
-			{
-				/*
-				 * Read the latest checkpoint location if the cluster is PG17
-				 * or later. This is used for upgrading logical replication
-				 * slots. Currently, we need it only for the old cluster but
-				 * for simplicity chose not to have additional checks.
-				 */
-				if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
-				{
-					char	   *slash = NULL;
-					uint32		upper_lsn,
-								lower_lsn;
-
-					p = strchr(p, ':');
-
-					if (p == NULL || strlen(p) <= 1)
-						pg_fatal("%d: controldata retrieval problem", __LINE__);
-
-					p++;		/* remove ':' char */
-
-					p = strpbrk(p, "01234567890ABCDEF");
-
-					if (p == NULL || strlen(p) <= 1)
-						pg_fatal("%d: controldata retrieval problem", __LINE__);
-
-					/*
-					 * The upper and lower part of LSN must be read separately
-					 * because it is stored as in %X/%X format.
-					 */
-					upper_lsn = strtoul(p, &slash, 16);
-					lower_lsn = strtoul(++slash, NULL, 16);
-
-					/* And combine them */
-					cluster->controldata.chkpnt_latest =
-						((uint64) upper_lsn << 32) | lower_lsn;
-				}
-			}
 		}
 
 		rc = pclose(output);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index f7b0deca87..5d25d1604e 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -647,12 +647,12 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
 	 * removed.
 	 */
 	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, "
-							"(confirmed_flush_lsn = '%X/%X') as caught_up, conflicting as invalid "
+							"pg_catalog.binary_upgrade_validate_wal_record_types_after_lsn(confirmed_flush_lsn) as caught_up, "
+							"conflicting as invalid "
 							"FROM pg_catalog.pg_replication_slots "
 							"WHERE slot_type = 'logical' AND "
 							"database = current_database() AND "
-							"temporary IS FALSE;",
-							LSN_FORMAT_ARGS(old_cluster.controldata.chkpnt_latest));
+							"temporary IS FALSE;");
 
 	num_slots = PQntuples(res);
 
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index f5ce6c3b4d..8a7f56831e 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -246,7 +246,6 @@ typedef struct
 	bool		date_is_int;
 	bool		float8_pass_by_value;
 	uint32		data_checksum_version;
-	XLogRecPtr	chkpnt_latest;
 } ControlData;
 
 /*
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index 01cb04ca12..b91fb2f88f 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -169,6 +169,26 @@ $subscriber->wait_for_subscription_sync($old_publisher, 'sub');
 $subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
 $old_publisher->stop;
 
+# Dry run, successful check is expected. This is not a live check, so a
+# shutdown checkpoint record would be inserted. We want to test that
+# binary_upgrade_validate_wal_record_types_after_lsn() skips that record so
+# that the upcoming pg_upgrade would succeed.
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,        '--check'
+	],
+	'run of pg_upgrade --check for old cluster');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade --check success");
+
 # Actual run, successful upgrade is expected
 command_ok(
 	[
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5b77b11f50..1cf31aa24f 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -115,4 +115,7 @@ extern void XLogReadDetermineTimeline(XLogReaderState *state,
 
 extern void WALReadRaiseError(WALReadError *errinfo);
 
+extern XLogReaderState *InitXLogReaderState(XLogRecPtr lsn);
+extern XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
+
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..f3d843222b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11370,6 +11370,12 @@
   proname => 'binary_upgrade_set_next_pg_tablespace_oid', provolatile => 'v',
   proparallel => 'u', prorettype => 'void', proargtypes => 'oid',
   prosrc => 'binary_upgrade_set_next_pg_tablespace_oid' },
+{ oid => '8046', descr => 'for use by pg_upgrade',
+  proname => 'binary_upgrade_validate_wal_record_types_after_lsn',
+  provolatile => 's', prorettype => 'bool',
+  proargtypes => 'pg_lsn',
+  proargnames => '{start_lsn}',
+  prosrc => 'binary_upgrade_validate_wal_record_types_after_lsn' },
 
 # conversion functions
 { oid => '4302',
-- 
2.27.0

