From 1e885ef66a8c2bd4e0c4cafb90e39bcc6a3d3542 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 18 Aug 2023 11:57:37 +0000
Subject: [PATCH v23 3/3] pg_upgrade: Add check function for logical
 replication slots

In order to prevent data loss, pg_upgrade will fail if the old node has slots with
the status 'lost', or with unconsumed WAL records.

Author: Hayato Kuroda
Reviewed-by: Wang Wei, Vignesh C, Peter Smith, Hou Zhijie
---
 src/bin/pg_upgrade/check.c                    | 99 ++++++++++++++++++-
 src/bin/pg_upgrade/controldata.c              | 37 +++++++
 src/bin/pg_upgrade/info.c                     | 13 ++-
 src/bin/pg_upgrade/pg_upgrade.h               |  7 +-
 .../t/003_logical_replication_slots.pl        | 91 +++++++++++++++--
 5 files changed, 234 insertions(+), 13 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index d899986d79..a5956e0464 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -9,6 +9,7 @@
 
 #include "postgres_fe.h"
 
+#include "access/xlogdefs.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_collation.h"
 #include "fe_utils/string_utils.h"
@@ -91,7 +92,7 @@ check_and_dump_old_cluster(bool live_check)
 	get_db_and_rel_infos(&old_cluster);
 
 	/* Extract a list of logical replication slots */
-	get_logical_slot_infos(&old_cluster);
+	get_logical_slot_infos(&old_cluster, live_check);
 
 	init_tablespaces();
 
@@ -1476,3 +1477,99 @@ check_for_logical_replication_slots(ClusterInfo *cluster)
 
 	check_ok();
 }
+
+/*
+ * Verify that all logical replication slots are usable.
+ */
+void
+check_for_lost_slots(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	PGresult   *res;
+	DbInfo	   *active_db = &cluster->dbarr.dbs[0];
+	PGconn	   *conn;
+
+	/* logical slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(cluster->major_version) < 1700)
+		return;
+
+	conn = connectToServer(cluster, active_db->db_name);
+
+	prep_status("Checking wal_status for logical replication slots");
+
+	/* Check there are no logical replication slots with a 'lost' state. */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE wal_status = 'lost' AND "
+							"temporary IS FALSE;");
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+	{
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" is in 'lost' state.",
+			   PQgetvalue(res, i, i_slotname));
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (ntups)
+		pg_fatal("One or more logical replication slots with a state of 'lost' were detected.");
+
+	check_ok();
+}
+
+/*
+ * Verify that all logical replication slots consumed all WALs, except a
+ * CHECKPOINT_SHUTDOWN record.
+ */
+void
+check_for_confirmed_flush_lsn(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	PGresult   *res;
+	DbInfo	   *active_db = &cluster->dbarr.dbs[0];
+	PGconn	   *conn;
+
+	/* logical slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(cluster->major_version) < 1700)
+		return;
+
+	conn = connectToServer(cluster, active_db->db_name);
+
+	prep_status("Checking confirmed_flush_lsn for logical replication slots");
+
+	/*
+	 * Check that all logical replication slots have reached the latest
+	 * checkpoint position (SHUTDOWN_CHECKPOINT record).
+	 */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE confirmed_flush_lsn != '%X/%X' AND temporary IS FALSE;",
+							LSN_FORMAT_ARGS(old_cluster.controldata.chkpnt_latest));
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+	{
+		pg_log(PG_WARNING,
+				"\nWARNING: logical replication slot \"%s\" has not consumed WALs yet",
+				PQgetvalue(res, i, i_slotname));
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (ntups)
+		pg_fatal("One or more logical replication slots still have unconsumed WAL records.");
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index 4beb65ab22..248d5dbc03 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -169,6 +169,43 @@ get_control_data(ClusterInfo *cluster, bool live_check)
 				}
 				got_cluster_state = true;
 			}
+
+			else if ((p = strstr(bufin, "Latest checkpoint location:")) != NULL)
+			{
+				/*
+				 * Gather latest checkpoint location if the cluster is newer or
+				 * equal to 17. This is used for upgrading logical replication
+				 * slots.
+				 */
+				if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
+				{
+					char *slash = NULL;
+					uint64 upper_lsn, lower_lsn;
+
+					p = strchr(p, ':');
+
+					if (p == NULL || strlen(p) <= 1)
+						pg_fatal("%d: controldata retrieval problem", __LINE__);
+
+					p++;			/* remove ':' char */
+
+					p = strpbrk(p, "01234567890ABCDEF");
+
+					if (p == NULL || strlen(p) <= 1)
+						pg_fatal("%d: controldata retrieval problem", __LINE__);
+
+					/*
+					 * Upper and lower part of LSN must be read separately
+					 * because it is reported as %X/%X format.
+					 */
+					upper_lsn = strtoul(p, &slash, 16);
+					lower_lsn = strtoul(++slash, NULL, 16);
+
+					/* And combine them */
+					cluster->controldata.chkpnt_latest =
+										(upper_lsn << 32) | lower_lsn;
+				}
+			}
 		}
 
 		rc = pclose(output);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 6c2478ccbd..d1f5a6a09a 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -661,9 +661,10 @@ get_logical_slot_infos_per_db(ClusterInfo *cluster, DbInfo *dbinfo)
  * Higher level routine to generate LogicalSlotInfoArr for all databases.
  */
 void
-get_logical_slot_infos(ClusterInfo *cluster)
+get_logical_slot_infos(ClusterInfo *cluster, bool live_check)
 {
 	int			dbnum;
+	int			slot_count = 0;
 
 	if (cluster == &old_cluster)
 		pg_log(PG_VERBOSE, "\nsource databases:");
@@ -675,6 +676,7 @@ get_logical_slot_infos(ClusterInfo *cluster)
 		DbInfo	   *pDbInfo = &cluster->dbarr.dbs[dbnum];
 
 		get_logical_slot_infos_per_db(cluster, pDbInfo);
+		slot_count += pDbInfo->slot_arr.nslots;
 
 		if (log_opts.verbose)
 		{
@@ -682,6 +684,15 @@ get_logical_slot_infos(ClusterInfo *cluster)
 			print_slot_infos(&pDbInfo->slot_arr);
 		}
 	}
+
+	/* Do additional checks if slots are found */
+	if (slot_count)
+	{
+		check_for_lost_slots(cluster);
+
+		if (!live_check)
+			check_for_confirmed_flush_lsn(cluster);
+	}
 }
 
 /*
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 2dac266537..6df948ac73 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -10,6 +10,7 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 
+#include "access/xlogdefs.h"
 #include "common/relpath.h"
 #include "libpq-fe.h"
 
@@ -242,6 +243,8 @@ typedef struct
 	bool		date_is_int;
 	bool		float8_pass_by_value;
 	uint32		data_checksum_version;
+
+	XLogRecPtr	chkpnt_latest;
 } ControlData;
 
 /*
@@ -366,6 +369,8 @@ void		output_completion_banner(char *deletion_script_file_name);
 void		check_cluster_versions(void);
 void		check_cluster_compatibility(bool live_check);
 void		create_script_for_old_cluster_deletion(char **deletion_script_file_name);
+void		check_for_lost_slots(ClusterInfo *cluster);
+void		check_for_confirmed_flush_lsn(ClusterInfo *cluster);
 
 
 /* controldata.c */
@@ -417,7 +422,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db,
 							  DbInfo *new_db, int *nmaps, const char *old_pgdata,
 							  const char *new_pgdata);
 void		get_db_and_rel_infos(ClusterInfo *cluster);
-void		get_logical_slot_infos(ClusterInfo *cluster);
+void		get_logical_slot_infos(ClusterInfo *cluster, bool live_check);
 int			count_logical_slots(ClusterInfo *cluster);
 
 /* option.c */
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index 4df7edd594..bde801fd2b 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -17,15 +17,16 @@ my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
 # Initialize old cluster
 my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
 $old_publisher->init(allows_streaming => 'logical');
-$old_publisher->start;
 
 # Initialize new cluster
 my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
 $new_publisher->init(allows_streaming => 'replica');
 
-my $bindir = $new_publisher->config_data('--bindir');
+# Initialize subscriber cluster
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
 
-$old_publisher->stop;
+my $bindir = $new_publisher->config_data('--bindir');
 
 # ------------------------------
 # TEST: Confirm pg_upgrade fails when new cluster wal_level is not 'logical'
@@ -67,13 +68,18 @@ $old_publisher->start;
 $old_publisher->safe_psql('postgres',
 	"SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);"
 );
+
+# 2. Consume WAL records
+$old_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL);"
+);
 $old_publisher->stop;
 
-# 2. max_replication_slots is set to smaller than the number of slots (2)
+# 3. max_replication_slots is set to smaller than the number of slots (2)
 #	 present on the old cluster
 $new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
 
-# 3. New cluster wal_level is set correctly
+# 4. New cluster wal_level is set correctly
 $new_publisher->append_conf('postgresql.conf', "wal_level = 'logical'");
 
 # Cause a failure at the start of pg_upgrade because the new cluster has
@@ -98,7 +104,8 @@ ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # ------------------------------
-# TEST: Successful upgrade
+# TEST: Confirm pg_upgrade fails when the slot still have unconsumed WAL
+#		records
 
 # Preparations for the subsequent test.
 # 1. Remove an unnecessary slot
@@ -107,10 +114,58 @@ $old_publisher->safe_psql('postgres',
 	"SELECT * FROM pg_drop_replication_slot('test_slot2');"
 );
 
-# 2. Consume WAL records
+# 2. Generate extra WAL records
+$old_publisher->safe_psql('postgres',
+	"CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;"
+);
+$old_publisher->stop;
+
+# Cause a failure at the start of pg_upgrade because the slot still have
+# unconsumed WAL records
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old cluster with idle replication slots');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------
+# TEST: Successful upgrade
+
+# Preparations for the subsequent test.
+# 1. Remove the remained slot
+$old_publisher->start;
 $old_publisher->safe_psql('postgres',
-	"SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL)"
+	"SELECT * FROM pg_drop_replication_slot('test_slot1');"
 );
+
+# 2. Setup logical replication
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+$old_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR ALL TABLES;"
+);
+$subscriber->start;
+$subscriber->safe_psql(
+	'postgres', qq[
+	CREATE TABLE tbl (a int);
+	CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub WITH (two_phase = 'true')
+]);
+$subscriber->wait_for_subscription_sync($old_publisher, 'sub');
+
+# 3. Disable the subscription once
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
 $old_publisher->stop;
 
 # Actual run, successful upgrade is expected
@@ -133,7 +188,23 @@ ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
 $new_publisher->start;
 my $result = $new_publisher->safe_psql('postgres',
 	"SELECT slot_name, two_phase FROM pg_replication_slots");
-is($result, qq(test_slot1|t), 'check the slot exists on new cluster');
-$new_publisher->stop;
+is($result, qq(sub|t), 'check the slot exists on new cluster');
+
+# Update the connection
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub CONNECTION '$new_connstr'");
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are shipped to subscriber');
+
+# Clean up
+$subscriber->stop();
+$new_publisher->stop();
 
 done_testing();
-- 
2.27.0

