From c957627ebbeb640fc39bbdd7f1f79f729ea7cddc Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 18 Aug 2023 11:57:37 +0000
Subject: [PATCH v28 3/3] pg_upgrade: Add check function for logical
 replication slots

To prevent data loss, pg_upgrade will fail if the old node has slots with the
status 'lost', or with unconsumed WAL records.

Author: Hayato Kuroda
Reviewed-by: Wang Wei, Vignesh C, Peter Smith, Hou Zhijie
---
 src/bin/pg_upgrade/check.c                    | 87 ++++++++++++++++++
 src/bin/pg_upgrade/controldata.c              | 38 ++++++++
 src/bin/pg_upgrade/pg_upgrade.h               |  2 +
 .../t/003_logical_replication_slots.pl        | 91 +++++++++++++++++--
 4 files changed, 210 insertions(+), 8 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 4056b7a7a9..a013366280 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -9,6 +9,7 @@
 
 #include "postgres_fe.h"
 
+#include "access/xlogdefs.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_collation.h"
 #include "fe_utils/string_utils.h"
@@ -31,6 +32,7 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(void);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_new_cluster_logical_replication_slots(void);
+static void check_old_cluster_for_valid_slots(bool live_check);
 
 
 /*
@@ -108,6 +110,13 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
 
+	/*
+	 * Logical replication slots can be migrated since PG17. See comments atop
+	 * get_old_cluster_logical_slot_infos().
+	 */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) >= 1700)
+		check_old_cluster_for_valid_slots(live_check);
+
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the
 	 * on-disk format for existing data.
@@ -1471,3 +1480,81 @@ check_new_cluster_logical_replication_slots(void)
 
 	check_ok();
 }
+
+/*
+ * check_old_cluster_for_valid_slots()
+ *
+ * Make sure logical replication slots can be migrated to new cluster.
+ * Following points are checked:
+ *
+ *	- All logical replication slots are usable.
+ *	- All logical replication slots consumed all WALs, except a
+ *	  CHECKPOINT_SHUTDOWN record.
+ */
+static void
+check_old_cluster_for_valid_slots(bool live_check)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	PGresult   *res;
+	DbInfo	   *active_db = &old_cluster.dbarr.dbs[0];
+	PGconn	   *conn;
+
+	/* Quick exit if the cluster does not have logical slots. */
+	if (count_old_cluster_logical_slots() == 0)
+		return;
+
+	conn = connectToServer(&old_cluster, active_db->db_name);
+
+	prep_status("Checking for logical replication slots");
+
+	/* Check there are no logical replication slots with a 'lost' state. */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE wal_status = 'lost' AND "
+							"temporary IS FALSE;");
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" is in 'lost' state.",
+			   PQgetvalue(res, i, i_slotname));
+
+	PQclear(res);
+
+	if (ntups)
+		pg_fatal("One or more logical replication slots with a state of 'lost' were detected.");
+
+	/*
+	 * Do additional checks if a live check is not required. This requires
+	 * that confirmed_flush_lsn of all the slots is the same as the latest
+	 * checkpoint location, but it would be satisfied only when the server has
+	 * been shut down.
+	 */
+	if (!live_check)
+	{
+		res = executeQueryOrDie(conn,
+								"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+								"WHERE confirmed_flush_lsn != '%X/%X' AND temporary IS FALSE;",
+								LSN_FORMAT_ARGS(old_cluster.controldata.chkpnt_latest));
+
+		ntups = PQntuples(res);
+		i_slotname = PQfnumber(res, "slot_name");
+
+		for (i = 0; i < ntups; i++)
+			pg_log(PG_WARNING,
+				   "\nWARNING: logical replication slot \"%s\" has not consumed WALs yet",
+				   PQgetvalue(res, i, i_slotname));
+
+		PQclear(res);
+
+		if (ntups)
+			pg_fatal("One or more logical replication slots still have unconsumed WAL records.");
+	}
+
+	PQfinish(conn);
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index 4beb65ab22..808156ec09 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -169,6 +169,44 @@ get_control_data(ClusterInfo *cluster, bool live_check)
 				}
 				got_cluster_state = true;
 			}
+
+			else if ((p = strstr(bufin, "Latest checkpoint location:")) != NULL)
+			{
+				/*
+				 * Read the latest checkpoint location if the cluster is PG17
+				 * or later. This is used for upgrading logical replication
+				 * slots.
+				 */
+				if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
+				{
+					char	   *slash = NULL;
+					uint32		upper_lsn,
+								lower_lsn;
+
+					p = strchr(p, ':');
+
+					if (p == NULL || strlen(p) <= 1)
+						pg_fatal("%d: controldata retrieval problem", __LINE__);
+
+					p++;		/* remove ':' char */
+
+					p = strpbrk(p, "01234567890ABCDEF");
+
+					if (p == NULL || strlen(p) <= 1)
+						pg_fatal("%d: controldata retrieval problem", __LINE__);
+
+					/*
+					 * The upper and lower part of LSN must be read separately
+					 * because it is stored as in %X/%X format.
+					 */
+					upper_lsn = strtoul(p, &slash, 16);
+					lower_lsn = strtoul(++slash, NULL, 16);
+
+					/* And combine them */
+					cluster->controldata.chkpnt_latest =
+						((uint64) upper_lsn << 32) | lower_lsn;
+				}
+			}
 		}
 
 		rc = pclose(output);
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index dae92ef6c0..e72318f500 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -10,6 +10,7 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 
+#include "access/xlogdefs.h"
 #include "common/relpath.h"
 #include "libpq-fe.h"
 
@@ -242,6 +243,7 @@ typedef struct
 	bool		date_is_int;
 	bool		float8_pass_by_value;
 	uint32		data_checksum_version;
+	XLogRecPtr	chkpnt_latest;
 } ControlData;
 
 /*
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index ae87c33708..640964c4e1 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -22,6 +22,10 @@ $old_publisher->init(allows_streaming => 'logical');
 my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
 $new_publisher->init(allows_streaming => 'replica');
 
+# Initialize subscriber cluster
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
+
 my $bindir = $new_publisher->config_data('--bindir');
 
 # ------------------------------
@@ -65,13 +69,19 @@ $old_publisher->start;
 $old_publisher->safe_psql('postgres',
 	"SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);"
 );
+
+# 2. Consume WAL records to avoid another type of upgrade failure. It will be
+#	 tested in subsequent cases.
+$old_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL);"
+);
 $old_publisher->stop;
 
-# 2. max_replication_slots is set to smaller than the number of slots (2)
+# 3. max_replication_slots is set to smaller than the number of slots (2)
 #	 present on the old cluster
 $new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
 
-# 3. wal_level is set correctly on the new cluster
+# 4. wal_level is set correctly on the new cluster
 $new_publisher->append_conf('postgresql.conf', "wal_level = 'logical'");
 
 # pg_upgrade will fail because the new cluster has insufficient max_replication_slots
@@ -95,7 +105,7 @@ ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # ------------------------------
-# TEST: Successful upgrade
+# TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records
 
 # Preparations for the subsequent test:
 # 1. Remove the slot 'test_slot2', leaving only 1 slot remaining on the old
@@ -106,10 +116,57 @@ $old_publisher->safe_psql('postgres',
 	"SELECT * FROM pg_drop_replication_slot('test_slot2');"
 );
 
-# 2. Consume WAL records
+# 2. Generate extra WAL records. Because these WAL records do not get consumed
+#	 it will cause the upcoming pg_upgrade test to fail.
+$old_publisher->safe_psql('postgres',
+	"CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;"
+);
+$old_publisher->stop;
+
+# pg_upgrade will fail because the slot still has unconsumed WAL records
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old cluster with idle replication slots');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+# Remove the remained slot
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT * FROM pg_drop_replication_slot('test_slot1');"
+);
+
+# ------------------------------
+# TEST: Successful upgrade
+
+# Preparations for the subsequent test:
+# 1. Setup logical replication
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
 $old_publisher->safe_psql('postgres',
-	"SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL)"
+	"CREATE PUBLICATION pub FOR ALL TABLES;"
 );
+$subscriber->start;
+$subscriber->safe_psql(
+	'postgres', qq[
+	CREATE TABLE tbl (a int);
+	CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub WITH (two_phase = 'true')
+]);
+$subscriber->wait_for_subscription_sync($old_publisher, 'sub');
+
+# 2. Temporarily disable the subscription
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
 $old_publisher->stop;
 
 # Actual run, successful upgrade is expected
@@ -129,11 +186,29 @@ command_ok(
 ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ removed after pg_upgrade success");
 
-# Check that the slot 'test_slot1' has migrated to the new cluster
+# Check that the slot 'sub' has migrated to the new cluster
 $new_publisher->start;
 my $result = $new_publisher->safe_psql('postgres',
 	"SELECT slot_name, two_phase FROM pg_replication_slots");
-is($result, qq(test_slot1|t), 'check the slot exists on new cluster');
-$new_publisher->stop;
+is($result, qq(sub|t), 'check the slot exists on new cluster');
+
+# Update the connection
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql(
+	'postgres', qq[
+	ALTER SUBSCRIPTION sub CONNECTION '$new_connstr';
+	ALTER SUBSCRIPTION sub ENABLE;
+]);
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are replicated to the subscriber');
+
+# Clean up
+$subscriber->stop();
+$new_publisher->stop();
 
 done_testing();
-- 
2.27.0

