From a33e9d43de7321bf2777c99f3eda56ac1b579f10 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Apr 2023 05:49:34 +0000
Subject: [PATCH v23 2/3] pg_upgrade: Allow to replicate logical replication
 slots to new node

This commit allows nodes with logical replication slots to be upgraded. While
reading information from the old cluster, a list of logical replication slots is
fetched. At the later part of upgrading, pg_upgrade revisits the list and
restores slots by executing pg_create_logical_replication_slots() on the new
cluster.

Note that slot restoration must be done after the final pg_resetwal command
during the upgrade because pg_resetwal will remove WALs that are required by
the slots. Due to ths restriction, the timing of restoring replication slots is
different from other objects.

The significant advantage of this commit is that it makes it easy to continue
logical replication even after upgrading the publisher node. Previously,
pg_upgrade allowed copying publications to a new node. With this new commit,
adjusting the connection string to the new publisher will cause the apply
worker on the subscriber to connect to the new publisher automatically. This
enables seamless continuation of logical replication, even after an upgrade.

Author: Hayato Kuroda
Co-authored-by: Hou Zhijie
Reviewed-by: Peter Smith, Julien Rouhaud, Vignesh C, Wang Wei, Masahiko Sawada
---
 doc/src/sgml/ref/pgupgrade.sgml               |  63 ++++++++
 src/bin/pg_upgrade/Makefile                   |   3 +
 src/bin/pg_upgrade/check.c                    |  74 ++++++++++
 src/bin/pg_upgrade/function.c                 |  18 ++-
 src/bin/pg_upgrade/info.c                     | 123 +++++++++++++++-
 src/bin/pg_upgrade/meson.build                |   1 +
 src/bin/pg_upgrade/pg_upgrade.c               |  78 ++++++++++
 src/bin/pg_upgrade/pg_upgrade.h               |  19 +++
 .../t/003_logical_replication_slots.pl        | 139 ++++++++++++++++++
 src/tools/pgindent/typedefs.list              |   3 +
 10 files changed, 517 insertions(+), 4 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_logical_replication_slots.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..7d7a114f53 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -360,6 +360,69 @@ make prefix=/usr/local/pgsql.new install
     </para>
    </step>
 
+   <step>
+    <title>Prepare for publisher upgrades</title>
+
+    <para>
+     <application>pg_upgrade</application> attempts to migrate logical
+     replication slots. This helps avoid the need for manually defining the
+     same replication slots on the new publisher.
+    </para>
+
+    <para>
+     Before you start upgrading the publisher cluster, ensure that the
+     subscription is temporarily disabled, by executing
+     <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... DISABLE</command></link>.
+     After the upgrade is complete, then re-enable the subscription. Note that
+     if the new cluser uses different port number from old one,
+     <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... CONNECTION</command></link>
+     command must be also executed on subscriber.
+    </para>
+
+    <para>
+     There are some prerequisites for <application>pg_upgrade</application> to
+     be able to upgrade the replication slots. If these are not met an error
+     will be reported.
+    </para>
+
+    <itemizedlist>
+     <listitem>
+      <para>
+       All slots on the old cluster must be usable, i.e., there are no slots
+       whose <structfield>wal_status</structfield> is <literal>lost</literal> (see
+       <xref linkend="view-pg-replication-slots"/>).
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       <structfield>confirmed_flush_lsn</structfield> (see <xref linkend="view-pg-replication-slots"/>)
+       of all slots on old cluster must be same as latest checkpoint location.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The output plugins referenced by the slots on the old cluster must be
+       installed on the new PostgreSQL executable directory.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must have
+       <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+       configured to value larger than the existing slots on the old cluster.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must have
+       <link linkend="guc-wal-level"><varname>wal_level</varname></link> as
+       <literal>logical</literal>.
+      </para>
+     </listitem>
+    </itemizedlist>
+
+   </step>
+
    <step>
     <title>Stop both servers</title>
 
diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index 5834513add..815d1a7ca1 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -3,6 +3,9 @@
 PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
 PGAPPICON = win32
 
+# required for 003_logical_replication_slots.pl
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/bin/pg_upgrade
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 64024e3b9e..d899986d79 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -30,6 +30,7 @@ static void check_for_jsonb_9_4_usage(ClusterInfo *cluster);
 static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
+static void check_for_logical_replication_slots(ClusterInfo *cluster);
 
 
 /*
@@ -89,6 +90,9 @@ check_and_dump_old_cluster(bool live_check)
 	/* Extract a list of databases and tables from the old cluster */
 	get_db_and_rel_infos(&old_cluster);
 
+	/* Extract a list of logical replication slots */
+	get_logical_slot_infos(&old_cluster);
+
 	init_tablespaces();
 
 	get_loadable_libraries();
@@ -189,6 +193,8 @@ check_new_cluster(void)
 {
 	get_db_and_rel_infos(&new_cluster);
 
+	check_for_logical_replication_slots(&new_cluster);
+
 	check_new_cluster_is_empty();
 
 	check_loadable_libraries();
@@ -1402,3 +1408,71 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
 	else
 		check_ok();
 }
+
+/*
+ * check_for_logical_replication_slots()
+ *
+ * Make sure there are no logical replication slots transactions because
+ * required WALs will be removed by subsequent pg_resetwal commands. Also,
+ * verify the parameter settings necessary for creating logical replication
+ * slots if required.
+ */
+static void
+check_for_logical_replication_slots(ClusterInfo *cluster)
+{
+	PGresult   *res;
+	PGconn	   *conn;
+	int			nslots;
+
+	/* logical replication slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(cluster->major_version) < 1700)
+		return;
+
+	conn = connectToServer(cluster, "template1");
+
+	prep_status("Checking for logical replication slots");
+
+	res = executeQueryOrDie(conn, "SELECT slot_name "
+								  "FROM pg_catalog.pg_replication_slots "
+								  "WHERE slot_type = 'logical' AND "
+								  "temporary IS FALSE;");
+
+	if (PQntuples(res))
+		pg_fatal("New cluster must not have logical replication slot, but found \"%s\"",
+				 PQgetvalue(res, 0, 0));
+
+	PQclear(res);
+
+	nslots = count_logical_slots(&old_cluster);
+
+	/*
+	 * Do additional checks when the logical replication slots have on the old
+	 * cluster.
+	 */
+	if (nslots)
+	{
+		int			max_replication_slots;
+		char	   *wal_level;
+
+		res = executeQueryOrDie(conn, "SHOW max_replication_slots;");
+		max_replication_slots = atoi(PQgetvalue(res, 0, 0));
+
+		if (nslots > max_replication_slots)
+			pg_fatal("max_replication_slots must be greater than or equal to existing logical "
+					 "replication slots on old cluster.");
+
+		PQclear(res);
+
+		res = executeQueryOrDie(conn, "SHOW wal_level;");
+		wal_level = PQgetvalue(res, 0, 0);
+
+		if (strcmp(wal_level, "logical") != 0)
+			pg_fatal("wal_level must be \"logical\", but is set to \"%s\"",
+					wal_level);
+
+		PQclear(res);
+	}
+	PQfinish(conn);
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index dc8800c7cd..baf51d2eb1 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -46,7 +46,12 @@ library_name_compare(const void *p1, const void *p2)
 /*
  * get_loadable_libraries()
  *
- *	Fetch the names of all old libraries containing C-language functions.
+ *	Fetch the names of all old libraries:
+ *	1. Name of library files containing C-language functions (for non-built-in
+ *	   functions), and
+ *	2. Shared object (library) names containing the logical replication output
+ *	   plugins
+ *
  *	We will later check that they all exist in the new installation.
  */
 void
@@ -66,14 +71,21 @@ get_loadable_libraries(void)
 		PGconn	   *conn = connectToServer(&old_cluster, active_db->db_name);
 
 		/*
-		 * Fetch all libraries containing non-built-in C functions in this DB.
+		 * Fetch all libraries containing non-built-in C functions or referred
+		 * to by logical replication slots in this DB.
 		 */
 		ress[dbnum] = executeQueryOrDie(conn,
 										"SELECT DISTINCT probin "
 										"FROM pg_catalog.pg_proc "
 										"WHERE prolang = %u AND "
 										"probin IS NOT NULL AND "
-										"oid >= %u;",
+										"oid >= %u "
+										"UNION "
+										"SELECT DISTINCT plugin "
+										"FROM pg_catalog.pg_replication_slots "
+										"WHERE wal_status <> 'lost' AND "
+										"database = current_database() AND "
+										"temporary IS FALSE;",
 										ClanguageId,
 										FirstNormalObjectId);
 		totaltups += PQntuples(ress[dbnum]);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index aa5faca4d6..6c2478ccbd 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -26,6 +26,7 @@ static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
+static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
 
 
 /*
@@ -394,7 +395,7 @@ get_db_infos(ClusterInfo *cluster)
 	i_spclocation = PQfnumber(res, "spclocation");
 
 	ntups = PQntuples(res);
-	dbinfos = (DbInfo *) pg_malloc(sizeof(DbInfo) * ntups);
+	dbinfos = (DbInfo *) pg_malloc0(sizeof(DbInfo) * ntups);
 
 	for (tupnum = 0; tupnum < ntups; tupnum++)
 	{
@@ -600,6 +601,105 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	dbinfo->rel_arr.nrels = num_rels;
 }
 
+/*
+ * get_logical_slot_infos_per_db()
+ *
+ * gets the LogicalSlotInfos for all the logical replication slots of the database
+ * referred to by "dbinfo".
+ */
+static void
+get_logical_slot_infos_per_db(ClusterInfo *cluster, DbInfo *dbinfo)
+{
+	PGconn	   *conn = connectToServer(cluster,
+									   dbinfo->db_name);
+	PGresult   *res;
+	LogicalSlotInfo *slotinfos = NULL;
+
+	int			num_slots;
+
+	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase "
+							"FROM pg_catalog.pg_replication_slots "
+							"WHERE wal_status <> 'lost' AND "
+							"database = current_database() AND "
+							"temporary IS FALSE;");
+
+	num_slots = PQntuples(res);
+
+	if (num_slots)
+	{
+		int			slotnum;
+		int			i_slotname;
+		int			i_plugin;
+		int			i_twophase;
+
+		slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots);
+
+		i_slotname = PQfnumber(res, "slot_name");
+		i_plugin = PQfnumber(res, "plugin");
+		i_twophase = PQfnumber(res, "two_phase");
+
+		for (slotnum = 0; slotnum < num_slots; slotnum++)
+		{
+			LogicalSlotInfo *curr = &slotinfos[slotnum];
+
+			curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
+			curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
+			curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
+		}
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	dbinfo->slot_arr.slots = slotinfos;
+	dbinfo->slot_arr.nslots = num_slots;
+}
+
+/*
+ * get_logical_slot_infos()
+ *
+ * Higher level routine to generate LogicalSlotInfoArr for all databases.
+ */
+void
+get_logical_slot_infos(ClusterInfo *cluster)
+{
+	int			dbnum;
+
+	if (cluster == &old_cluster)
+		pg_log(PG_VERBOSE, "\nsource databases:");
+	else
+		pg_log(PG_VERBOSE, "\ntarget databases:");
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *pDbInfo = &cluster->dbarr.dbs[dbnum];
+
+		get_logical_slot_infos_per_db(cluster, pDbInfo);
+
+		if (log_opts.verbose)
+		{
+			pg_log(PG_VERBOSE, "Database: \"%s\"", pDbInfo->db_name);
+			print_slot_infos(&pDbInfo->slot_arr);
+		}
+	}
+}
+
+/*
+ * count_logical_slots()
+ *
+ * Sum up and return the number of logical replication slots for all databases.
+ */
+int
+count_logical_slots(ClusterInfo *cluster)
+{
+	int			dbnum;
+	int			slot_count = 0;
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+		slot_count += cluster->dbarr.dbs[dbnum].slot_arr.nslots;
+
+	return slot_count;
+}
 
 static void
 free_db_and_rel_infos(DbInfoArr *db_arr)
@@ -610,6 +710,12 @@ free_db_and_rel_infos(DbInfoArr *db_arr)
 	{
 		free_rel_infos(&db_arr->dbs[dbnum].rel_arr);
 		pg_free(db_arr->dbs[dbnum].db_name);
+
+		/*
+		 * Logical replication slots must not exist on the new cluster before
+		 * doing create_logical_replication_slots().
+		 */
+		Assert(db_arr->dbs[dbnum].slot_arr.nslots == 0);
 	}
 	pg_free(db_arr->dbs);
 	db_arr->dbs = NULL;
@@ -660,3 +766,18 @@ print_rel_infos(RelInfoArr *rel_arr)
 			   rel_arr->rels[relnum].reloid,
 			   rel_arr->rels[relnum].tablespace);
 }
+
+static void
+print_slot_infos(LogicalSlotInfoArr *slot_arr)
+{
+	int			slotnum;
+
+	for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+	{
+		LogicalSlotInfo *slot_info = &slot_arr->slots[slotnum];
+		pg_log(PG_VERBOSE, "slotname: \"%s\", plugin: \"%s\", two_phase: %d",
+			   slot_info->slotname,
+			   slot_info->plugin,
+			   slot_info->two_phase);
+	}
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..228f29b688 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_logical_replication_slots.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 4562dafcff..713c67d7bd 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -59,6 +59,7 @@ static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0, bool *live_check);
+static void create_logical_replication_slots(void);
 
 ClusterInfo old_cluster,
 			new_cluster;
@@ -188,6 +189,19 @@ main(int argc, char **argv)
 			  new_cluster.pgdata);
 	check_ok();
 
+	/*
+	 * Create logical replication slots.
+	 *
+	 * Note: This must be done after doing pg_resetwal command because
+	 * pg_resetwal would remove required WALs.
+	 */
+	if (count_logical_slots(&old_cluster))
+	{
+		start_postmaster(&new_cluster, true);
+		create_logical_replication_slots();
+		stop_postmaster(false);
+	}
+
 	if (user_opts.do_sync)
 	{
 		prep_status("Sync data directory to disk");
@@ -860,3 +874,67 @@ set_frozenxids(bool minmxid_only)
 
 	check_ok();
 }
+
+/*
+ * create_logical_replication_slots()
+ *
+ * Similar to create_new_objects() but only restores logical replication slots.
+ */
+static void
+create_logical_replication_slots(void)
+{
+	int			dbnum;
+
+	prep_status_progress("Restoring logical replication slots in the new cluster");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+		LogicalSlotInfoArr *slot_arr = &old_db->slot_arr;
+		PGconn     *conn;
+		PQExpBuffer query;
+		int			slotnum;
+		char		log_file_name[MAXPGPATH];
+
+		/* Skip this database if there are no slots */
+		if (slot_arr->nslots == 0)
+			continue;
+
+		conn = connectToServer(&new_cluster, old_db->db_name);
+		query = createPQExpBuffer();
+
+		snprintf(log_file_name, sizeof(log_file_name),
+				 DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+
+		for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		{
+			LogicalSlotInfo *slot_info = &slot_arr->slots[slotnum];
+
+			/*
+			 * Constructs query for creating logical replication slots.
+			 *
+			 * XXX: For simplification, pg_create_logical_replication_slot() is
+			 * used. Is it sufficient?
+			 */
+			appendPQExpBuffer(query, "SELECT pg_catalog.pg_create_logical_replication_slot(");
+			appendStringLiteralConn(query, slot_info->slotname, conn);
+			appendPQExpBuffer(query, ", ");
+			appendStringLiteralConn(query, slot_info->plugin, conn);
+			appendPQExpBuffer(query, ", false, %s);",
+							  slot_info->two_phase ? "true" : "false");
+
+			PQclear(executeQueryOrDie(conn, "%s", query->data));
+
+			resetPQExpBuffer(query);
+		}
+
+		PQfinish(conn);
+
+		destroyPQExpBuffer(query);
+	}
+
+	end_progress_output();
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 3eea0139c7..2dac266537 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -150,6 +150,22 @@ typedef struct
 	int			nrels;
 } RelInfoArr;
 
+/*
+ * Structure to store logical replication slot information
+ */
+typedef struct
+{
+	char	   *slotname;		/* slot name */
+	char	   *plugin;			/* plugin */
+	bool		two_phase;		/* can the slot decode 2PC? */
+} LogicalSlotInfo;
+
+typedef struct
+{
+	int			nslots;			/* number of logical slot infos */
+	LogicalSlotInfo *slots;		/* array of logical slot infos */
+} LogicalSlotInfoArr;
+
 /*
  * The following structure represents a relation mapping.
  */
@@ -176,6 +192,7 @@ typedef struct
 	char		db_tablespace[MAXPGPATH];	/* database default tablespace
 											 * path */
 	RelInfoArr	rel_arr;		/* array of all user relinfos */
+	LogicalSlotInfoArr slot_arr;	/* array of all LogicalSlotInfo */
 } DbInfo;
 
 /*
@@ -400,6 +417,8 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db,
 							  DbInfo *new_db, int *nmaps, const char *old_pgdata,
 							  const char *new_pgdata);
 void		get_db_and_rel_infos(ClusterInfo *cluster);
+void		get_logical_slot_infos(ClusterInfo *cluster);
+int			count_logical_slots(ClusterInfo *cluster);
 
 /* option.c */
 
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
new file mode 100644
index 0000000000..4df7edd594
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -0,0 +1,139 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Tests for upgrading replication slots
+
+use strict;
+use warnings;
+
+use File::Path qw(rmtree);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize old cluster
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
+$old_publisher->start;
+
+# Initialize new cluster
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 'replica');
+
+my $bindir = $new_publisher->config_data('--bindir');
+
+$old_publisher->stop;
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when new cluster wal_level is not 'logical'
+
+# Create a slot on old cluster
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding', false, true);"
+);
+$old_publisher->stop;
+
+# Cause a failure at the start of pg_upgrade because wal_level is replica
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade where the new cluster has the wrong wal_level');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when max_replication_slots on new cluster is
+#		too low
+
+# Preparations for the subsequent test.
+# 1. Create an unnecessary slot on the old cluster
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);"
+);
+$old_publisher->stop;
+
+# 2. max_replication_slots is set to smaller than the number of slots (2)
+#	 present on the old cluster
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
+
+# 3. New cluster wal_level is set correctly
+$new_publisher->append_conf('postgresql.conf', "wal_level = 'logical'");
+
+# Cause a failure at the start of pg_upgrade because the new cluster has
+# insufficient max_replication_slots
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade where the new cluster has insufficient max_replication_slots');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------
+# TEST: Successful upgrade
+
+# Preparations for the subsequent test.
+# 1. Remove an unnecessary slot
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT * FROM pg_drop_replication_slot('test_slot2');"
+);
+
+# 2. Consume WAL records
+$old_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL)"
+);
+$old_publisher->stop;
+
+# Actual run, successful upgrade is expected
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old cluster');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+$new_publisher->start;
+my $result = $new_publisher->safe_psql('postgres',
+	"SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(test_slot1|t), 'check the slot exists on new cluster');
+$new_publisher->stop;
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 51b7951ad8..0071efef1c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1501,7 +1501,10 @@ LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
 LogicalRepWorkerType
+LogicalReplicationSlotInfo
 LogicalRewriteMappingData
+LogicalSlotInfo
+LogicalSlotInfoArr
 LogicalTape
 LogicalTapeSet
 LsnReadQueue
-- 
2.27.0

