From 233e24f097d18de6e38b9eb6e2b971b144f205d3 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Apr 2023 05:49:34 +0000
Subject: [PATCH v31 2/2] pg_upgrade: Allow to replicate logical replication
 slots to new node

This commit allows nodes with logical replication slots to be upgraded. While
reading information from the old cluster, a list of logical replication slots is
fetched. At the later part of upgrading, pg_upgrade revisits the list and
restores slots by executing pg_create_logical_replication_slots() on the new
cluster. Migration of logical replication slots is only supported when the old
cluster is version 17.0 or later.

If the old node has slots whose status is 'lost' or which have unconsumed WAL
records, pg_upgrade fails. These checks are needed to prevent data loss.

Note that slot restoration must be done after the final pg_resetwal command
during the upgrade because pg_resetwal will remove WALs that are required by
the slots. Due to this restriction, the timing of restoring replication slots is
different from other objects.

The significant advantage of this commit is that it makes it easy to continue
logical replication even after upgrading the publisher node. Previously,
pg_upgrade allowed copying publications to a new node. With this new commit,
adjusting the connection string to the new publisher will cause the apply
worker on the subscriber to connect to the new publisher automatically. This
enables seamless continuation of logical replication, even after an upgrade.

Author: Hayato Kuroda
Co-authored-by: Hou Zhijie
Reviewed-by: Peter Smith, Julien Rouhaud, Vignesh C, Wang Wei, Masahiko Sawada,
             Dilip Kumar
---
 doc/src/sgml/ref/pgupgrade.sgml               |  70 +++++-
 src/bin/pg_upgrade/Makefile                   |   3 +
 src/bin/pg_upgrade/check.c                    | 152 +++++++++--
 src/bin/pg_upgrade/controldata.c              |  72 ++++++
 src/bin/pg_upgrade/function.c                 | 149 ++++++++++-
 src/bin/pg_upgrade/info.c                     | 178 ++++++++++++-
 src/bin/pg_upgrade/meson.build                |   1 +
 src/bin/pg_upgrade/pg_upgrade.c               | 109 +++++++-
 src/bin/pg_upgrade/pg_upgrade.h               |  25 +-
 .../t/003_logical_replication_slots.pl        | 238 ++++++++++++++++++
 src/include/access/xlog.h                     |  13 -
 src/include/access/xlogdefs.h                 |  13 +
 src/tools/pgindent/typedefs.list              |   5 +
 13 files changed, 981 insertions(+), 47 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_logical_replication_slots.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..bef107295c 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -360,6 +360,71 @@ make prefix=/usr/local/pgsql.new install
     </para>
    </step>
 
+   <step>
+    <title>Prepare for publisher upgrades</title>
+
+    <para>
+     <application>pg_upgrade</application> attempts to migrate logical
+     replication slots. This helps avoid the need for manually defining the
+     same replication slots on the new publisher. Migration of logical
+     replication slots is only supported when the old cluster is version 17.0
+     or later.
+    </para>
+
+    <para>
+     Before you start upgrading the publisher cluster, ensure that the
+     subscription is temporarily disabled, by executing
+     <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... DISABLE</command></link>.
+     Re-enable the subscription after the upgrade.
+    </para>
+
+    <para>
+     There are some prerequisites for <application>pg_upgrade</application> to
+     be able to upgrade the replication slots. If these are not met an error
+     will be reported.
+    </para>
+
+    <itemizedlist>
+     <listitem>
+      <para>
+       All slots on the old cluster must be usable, i.e., there are no slots
+       whose
+       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>wal_status</structfield>
+       is <literal>lost</literal>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>confirmed_flush_lsn</structfield>
+       of all slots on the old cluster must be the same as the latest
+       checkpoint location.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The output plugins referenced by the slots on the old cluster must be
+       installed in the new PostgreSQL executable directory.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must have
+       <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+       configured to a value greater than or equal to the number of slots
+       present in the old cluster.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must have
+       <link linkend="guc-wal-level"><varname>wal_level</varname></link> as
+       <literal>logical</literal>.
+      </para>
+     </listitem>
+    </itemizedlist>
+
+   </step>
+
    <step>
     <title>Stop both servers</title>
 
@@ -629,8 +694,9 @@ rsync --archive --delete --hard-links --size-only --no-inc-recursive /vol1/pg_tb
        Configure the servers for log shipping.  (You do not need to run
        <function>pg_backup_start()</function> and <function>pg_backup_stop()</function>
        or take a file system backup as the standbys are still synchronized
-       with the primary.)  Replication slots are not copied and must
-       be recreated.
+       with the primary.)  Replication slots on the old standby are not copied.
+       Only logical slots on the primary are migrated to the new standby,
+       and other slots must be recreated.
       </para>
      </step>
 
diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index 5834513add..815d1a7ca1 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -3,6 +3,9 @@
 PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
 PGAPPICON = win32
 
+# required for 003_logical_replication_slots.pl
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/bin/pg_upgrade
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 56e313f562..490196b616 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -9,6 +9,7 @@
 
 #include "postgres_fe.h"
 
+#include "access/xlogdefs.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_collation.h"
 #include "fe_utils/string_utils.h"
@@ -30,6 +31,8 @@ static void check_for_jsonb_9_4_usage(ClusterInfo *cluster);
 static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(void);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
+static void check_new_cluster_logical_replication_slots(void);
+static void check_old_cluster_for_valid_slots(bool live_check);
 
 
 /*
@@ -104,6 +107,13 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
 
+	/*
+	 * Logical replication slots can be migrated since PG17. See comments atop
+	 * get_old_cluster_logical_slot_infos().
+	 */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) >= 1700)
+		check_old_cluster_for_valid_slots(live_check);
+
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the
 	 * on-disk format for existing data.
@@ -189,6 +199,8 @@ check_new_cluster(void)
 {
 	get_db_and_rel_infos(&new_cluster);
 
+	check_new_cluster_logical_replication_slots();
+
 	check_new_cluster_is_empty();
 
 	check_loadable_libraries();
@@ -232,27 +244,6 @@ report_clusters_compatible(void)
 }
 
 
-void
-issue_warnings_and_set_wal_level(void)
-{
-	/*
-	 * We unconditionally start/stop the new server because pg_resetwal -o set
-	 * wal_level to 'minimum'.  If the user is upgrading standby servers using
-	 * the rsync instructions, they will need pg_upgrade to write its final
-	 * WAL record showing wal_level as 'replica'.
-	 */
-	start_postmaster(&new_cluster, true);
-
-	/* Reindex hash indexes for old < 10.0 */
-	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 906)
-		old_9_6_invalidate_hash_indexes(&new_cluster, false);
-
-	report_extension_updates(&new_cluster);
-
-	stop_postmaster(false);
-}
-
-
 void
 output_completion_banner(char *deletion_script_file_name)
 {
@@ -1402,3 +1393,122 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
 	else
 		check_ok();
 }
+
+/*
+ * check_new_cluster_logical_replication_slots()
+ *
+ * Make sure there are no logical replication slots on the new cluster and that
+ * the parameter settings necessary for creating slots are sufficient.
+ */
+static void
+check_new_cluster_logical_replication_slots(void)
+{
+	PGresult   *res;
+	PGconn	   *conn;
+	int			nslots = 0;
+	int			nlost_slots;
+	int			max_replication_slots;
+	char	   *wal_level;
+
+	/* Logical slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
+		return;
+
+	nslots = count_old_cluster_logical_slots();
+
+	/* Quick return if there are no logical slots to be migrated. */
+	if (nslots == 0)
+		return;
+
+	conn = connectToServer(&new_cluster, "template1");
+
+	prep_status("Checking for logical replication slots");
+
+	res = executeQueryOrDie(conn, "SELECT count(*) "
+							"FROM pg_catalog.pg_replication_slots "
+							"WHERE slot_type = 'logical' AND "
+							"temporary IS FALSE;");
+
+	nlost_slots = atoi(PQgetvalue(res, 0, 0));
+
+	if (nlost_slots)
+	{
+		if (nlost_slots == 1)
+			pg_fatal("New cluster must not have logical replication slots but found a slot.");
+		else
+			pg_fatal("New cluster must not have logical replication slots but found %d slots",
+					 nlost_slots);
+	}
+
+	PQclear(res);
+
+	res = executeQueryOrDie(conn, "SHOW max_replication_slots;");
+	max_replication_slots = atoi(PQgetvalue(res, 0, 0));
+
+	if (nslots > max_replication_slots)
+		pg_fatal("max_replication_slots (%d) must be greater than or equal to the number of "
+				 "logical replication slots (%d) on the old cluster.",
+				 max_replication_slots, nslots);
+
+	PQclear(res);
+
+	res = executeQueryOrDie(conn, "SHOW wal_level;");
+	wal_level = PQgetvalue(res, 0, 0);
+
+	if (strcmp(wal_level, "logical") != 0)
+		pg_fatal("wal_level must be \"logical\", but is set to \"%s\"",
+				 wal_level);
+
+	PQclear(res);
+	PQfinish(conn);
+
+	check_ok();
+}
+
+/*
+ * check_old_cluster_for_valid_slots()
+ *
+ * Make sure logical replication slots can be migrated to new cluster.
+ * Following points are checked:
+ *
+ *	- All logical replication slots are usable.
+ *	- All logical replication slots consumed all WALs, except a
+ *	  CHECKPOINT_SHUTDOWN record.
+ */
+static void
+check_old_cluster_for_valid_slots(bool live_check)
+{
+	int			dbnum;
+
+	prep_status("Checking for valid logical replication slots");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		int			slotnum;
+		LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
+
+		for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		{
+			LogicalSlotInfo *slot = &slot_arr->slots[slotnum];
+
+			/* Is the slot still usable? */
+			if (slot->wal_status == WALAVAIL_REMOVED)
+				pg_fatal("Logical replication slot \"%s\" is in 'lost' state.",
+						 slot->slotname);
+
+			/*
+			 * Do additional checks to ensure that confirmed_flush LSN of all
+			 * the slots is the same as the latest checkpoint location.
+			 *
+			 * Note: This can be satisfied only when the old cluster has been
+			 * shut down, so we skip this for live checks.
+			 */
+			if (!live_check &&
+				(slot->confirmed_flush != old_cluster.controldata.chkpnt_latest))
+				pg_fatal("logical replication slot \"%s\" has not consumed WALs yet",
+						 slot->slotname);
+		}
+	}
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index 4beb65ab22..fc08d55244 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -169,6 +169,38 @@ get_control_data(ClusterInfo *cluster, bool live_check)
 				}
 				got_cluster_state = true;
 			}
+
+			else if ((p = strstr(bufin, "Latest checkpoint location:")) != NULL)
+			{
+				/*
+				 * Read the latest checkpoint location if the cluster is PG17
+				 * or later. This is used for upgrading logical replication
+				 * slots. Currently, we need it only for the old cluster but
+				 * for simplicity we chose not to add additional checks.
+				 */
+				if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
+				{
+					bool		have_error = false;
+
+					p = strchr(p, ':');
+
+					if (p == NULL || strlen(p) <= 1)
+						pg_fatal("%d: controldata retrieval problem", __LINE__);
+
+					p++;		/* remove ':' char */
+
+					p = strpbrk(p, "01234567890ABCDEF");
+
+					if (p == NULL || strlen(p) <= 1)
+						pg_fatal("%d: controldata retrieval problem", __LINE__);
+
+					cluster->controldata.chkpnt_latest =
+						strtoLSN(p, &have_error);
+
+					if (have_error)
+						pg_fatal("%d: controldata retrieval problem", __LINE__);
+				}
+			}
 		}
 
 		rc = pclose(output);
@@ -732,3 +764,43 @@ disable_old_cluster(void)
 		   "started once the new cluster has been started.",
 		   old_cluster.pgdata);
 }
+
+/*
+ * Convert String to XLogRecPtr.
+ *
+ * This function is a client-side port of pg_lsn_in_internal(), which cannot
+ * be called from client binaries.
+ */
+XLogRecPtr
+strtoLSN(const char *str, bool *have_error)
+{
+	int			len1,
+				len2;
+	uint32		id,
+				off;
+	XLogRecPtr	result;
+
+	Assert(have_error != NULL);
+	*have_error = false;
+
+	/* Sanity check input format. */
+	len1 = strspn(str, "0123456789abcdefABCDEF");
+	if (len1 < 1 || str[len1] != '/')
+	{
+		*have_error = true;
+		return InvalidXLogRecPtr;
+	}
+	len2 = strspn(str + len1 + 1, "0123456789abcdefABCDEF");
+	if (len2 < 1)
+	{
+		*have_error = true;
+		return InvalidXLogRecPtr;
+	}
+
+	/* Decode result. */
+	id = (uint32) strtoul(str, NULL, 16);
+	off = (uint32) strtoul(str + len1 + 1, NULL, 16);
+	result = ((uint64) id << 32) | off;
+
+	return result;
+}
diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index dc8800c7cd..b00c7a8e41 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -11,8 +11,28 @@
 
 #include "access/transam.h"
 #include "catalog/pg_language_d.h"
+#include "fe_utils/string_utils.h"
 #include "pg_upgrade.h"
 
+/*
+ * Structures for listing up unique output plugins.
+ *
+ * XXX: these are not needed if we remove the function get_output_plugins().
+ * See comments atop it.
+ */
+typedef struct plugin_list_head
+{
+	struct plugin_list *head;
+	int			length;
+} plugin_list_head;
+
+typedef struct plugin_list
+{
+	int			dbnum;
+	char	   *plugin;
+	struct plugin_list *next;
+} plugin_list;
+
 /*
  * qsort comparator for pointers to library names
  *
@@ -43,10 +63,103 @@ library_name_compare(const void *p1, const void *p2)
 }
 
 
+/* Fetch list's length */
+static int
+plugin_list_length(plugin_list_head *listhead)
+{
+	return listhead ? listhead->length : 0;
+}
+
+/* Has the given plugin already been listed? */
+static bool
+is_plugin_unique(plugin_list_head *listhead, const char *plugin)
+{
+	plugin_list *point;
+
+	/* Quick return if the head is NULL */
+	if (listhead == NULL)
+		return true;
+
+	/* Seek the plugin list */
+	for (point = listhead->head; point; point = point->next)
+	{
+		if (strcmp(point->plugin, plugin) == 0)
+			return false;
+	}
+
+	return true;
+}
+
+/* Add an item to a plugin_list. */
+static void
+add_plugin_list_item(plugin_list_head **listhead, int dbnum, const char *plugin)
+{
+	plugin_list *newentry = (plugin_list *) pg_malloc(sizeof(plugin_list));
+	plugin_list *oldentry;
+
+	newentry->dbnum = dbnum;
+	newentry->plugin = pg_strdup(plugin);
+
+	/* Initialize the header if not yet */
+	if (*listhead == NULL)
+		*listhead = (plugin_list_head *) pg_malloc0(sizeof(plugin_list_head));
+
+	/*
+	 * Set the new entry as the head of the list. We do not have to consider
+	 * the ordering because they are sorted later.
+	 */
+	oldentry = (*listhead)->head;
+	(*listhead)->head = newentry;
+	newentry->next = oldentry;
+
+	/* Increment the list for plugin_list_length() */
+	(*listhead)->length++;
+}
+
+/*
+ * Load the list of unique output plugins.
+ *
+ * XXX: Currently, we extract the list of unique output plugins, but this may
+ * be overkill. The list is used for two purposes - 1) to allocate the minimal
+ * memory for the library list and 2) to skip storing duplicated plugin names.
+ * However, the consumer check_loadable_libraries() can avoid double checks for
+ * the same library. The above means that we can arrange output plugins without
+ * considering their uniqueness, so that we can remove this function.
+ */
+static plugin_list_head *
+get_output_plugins(void)
+{
+	plugin_list_head *head = NULL;
+	int			dbnum;
+
+	/* Quick return if there are no logical slots to be migrated. */
+	if (count_old_cluster_logical_slots() == 0)
+		return NULL;
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
+		int			slotnum;
+
+		for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		{
+			LogicalSlotInfo *slot = &slot_arr->slots[slotnum];
+
+			/* Add to the list if the plugin has not been listed yet */
+			if (is_plugin_unique(head, slot->plugin))
+				add_plugin_list_item(&head, dbnum, slot->plugin);
+		}
+	}
+
+	return head;
+}
+
 /*
  * get_loadable_libraries()
  *
- *	Fetch the names of all old libraries containing C-language functions.
+ *	Fetch the names of all old libraries containing either C-language functions
+ *	or are corresponding to logical replication output plugins.
+ *
  *	We will later check that they all exist in the new installation.
  */
 void
@@ -55,6 +168,7 @@ get_loadable_libraries(void)
 	PGresult  **ress;
 	int			totaltups;
 	int			dbnum;
+	plugin_list_head *output_plugins = get_output_plugins();
 
 	ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
 	totaltups = 0;
@@ -76,12 +190,22 @@ get_loadable_libraries(void)
 										"oid >= %u;",
 										ClanguageId,
 										FirstNormalObjectId);
+
 		totaltups += PQntuples(ress[dbnum]);
 
 		PQfinish(conn);
 	}
 
-	os_info.libraries = (LibraryInfo *) pg_malloc(totaltups * sizeof(LibraryInfo));
+	/*
+	 * Allocate a minimal memory for extensions and logical replication output
+	 * plugins.
+	 *
+	 * XXX: As mentioned in comments atop get_output_plugins(), we may not
+	 * have to consider the uniqueness of entries. If so, we can use
+	 * count_old_cluster_logical_slots() instead of plugin_list_length().
+	 */
+	os_info.libraries = (LibraryInfo *) pg_malloc(
+												  (totaltups + plugin_list_length(output_plugins)) * sizeof(LibraryInfo));
 	totaltups = 0;
 
 	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
@@ -89,6 +213,7 @@ get_loadable_libraries(void)
 		PGresult   *res = ress[dbnum];
 		int			ntups;
 		int			rowno;
+		plugin_list *point;
 
 		ntups = PQntuples(res);
 		for (rowno = 0; rowno < ntups; rowno++)
@@ -101,6 +226,26 @@ get_loadable_libraries(void)
 			totaltups++;
 		}
 		PQclear(res);
+
+		/*
+		 * If the old cluster has logical replication slots, plugins used by
+		 * them must also be stored. It must be done only once, so do it at
+		 * dbnum == 0 case.
+		 */
+		if (output_plugins == NULL)
+			continue;
+
+		if (dbnum != 0)
+			continue;
+
+		for (point = output_plugins->head; point; point = point->next)
+		{
+			os_info.libraries[totaltups].name = pg_strdup(point->plugin);
+			os_info.libraries[totaltups].dbnum = point->dbnum;
+
+			totaltups++;
+		}
+
 	}
 
 	pg_free(ress);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index aa5faca4d6..f8afcdb533 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -26,6 +26,8 @@ static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
+static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
+static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
 
 
 /*
@@ -283,7 +285,18 @@ get_db_and_rel_infos(ClusterInfo *cluster)
 	get_db_infos(cluster);
 
 	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-		get_rel_infos(cluster, &cluster->dbarr.dbs[dbnum]);
+	{
+		DbInfo	   *pDbInfo = &cluster->dbarr.dbs[dbnum];
+
+		get_rel_infos(cluster, pDbInfo);
+
+		/*
+		 * If we are reading the old_cluster, get info about its logical
+		 * replication slots.
+		 */
+		if (cluster == &old_cluster)
+			get_old_cluster_logical_slot_infos(pDbInfo);
+	}
 
 	if (cluster == &old_cluster)
 		pg_log(PG_VERBOSE, "\nsource databases:");
@@ -394,7 +407,7 @@ get_db_infos(ClusterInfo *cluster)
 	i_spclocation = PQfnumber(res, "spclocation");
 
 	ntups = PQntuples(res);
-	dbinfos = (DbInfo *) pg_malloc(sizeof(DbInfo) * ntups);
+	dbinfos = (DbInfo *) pg_malloc0(sizeof(DbInfo) * ntups);
 
 	for (tupnum = 0; tupnum < ntups; tupnum++)
 	{
@@ -600,6 +613,135 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	dbinfo->rel_arr.nrels = num_rels;
 }
 
+/*
+ * Helper function for get_old_cluster_logical_slot_infos()
+ */
+static WALAvailability
+GetWALAvailabilityByString(const char *str)
+{
+	WALAvailability status = WALAVAIL_INVALID_LSN;
+
+	if (strcmp(str, "reserved") == 0)
+		status = WALAVAIL_RESERVED;
+	else if (strcmp(str, "extended") == 0)
+		status = WALAVAIL_EXTENDED;
+	else if (strcmp(str, "unreserved") == 0)
+		status = WALAVAIL_UNRESERVED;
+	else if (strcmp(str, "lost") == 0)
+		status = WALAVAIL_REMOVED;
+
+	return status;
+}
+
+/*
+ * get_old_cluster_logical_slot_infos()
+ *
+ * gets the LogicalSlotInfos for all the logical replication slots of the
+ * database referred to by "dbinfo".
+ *
+ * Note: This function will not do anything if the old cluster is pre-PG17.
+ * This is because before that the logical slots are not saved at shutdown, so
+ * there is no guarantee that the latest confirmed_flush_lsn is saved to disk
+ * which can lead to data loss. It is still not guaranteed for manually created
+ * slots in PG17, so subsequent checks done in
+ * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
+ * are included.
+ */
+static void
+get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	LogicalSlotInfo *slotinfos = NULL;
+	int			num_slots;
+
+	/* Logical slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
+		return;
+
+	conn = connectToServer(&old_cluster, dbinfo->db_name);
+
+	/*
+	 * The temporary slots are expressly ignored while checking because such
+	 * slots cannot exist after the upgrade. During the upgrade, clusters are
+	 * started and stopped several times causing any temporary slots to be
+	 * removed.
+	 */
+	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, confirmed_flush_lsn, wal_status "
+							"FROM pg_catalog.pg_replication_slots "
+							"WHERE slot_type = 'logical' AND "
+							"database = current_database() AND "
+							"temporary IS FALSE;");
+
+	num_slots = PQntuples(res);
+
+	if (num_slots)
+	{
+		int			slotnum;
+		int			i_slotname;
+		int			i_plugin;
+		int			i_twophase;
+		int			i_confirmed_flush;
+		int			i_wal_status;
+
+		slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots);
+
+		i_slotname = PQfnumber(res, "slot_name");
+		i_plugin = PQfnumber(res, "plugin");
+		i_twophase = PQfnumber(res, "two_phase");
+		i_confirmed_flush = PQfnumber(res, "confirmed_flush_lsn");
+		i_wal_status = PQfnumber(res, "wal_status");
+
+		for (slotnum = 0; slotnum < num_slots; slotnum++)
+		{
+			LogicalSlotInfo *curr = &slotinfos[slotnum];
+			bool		have_error = false;
+
+			curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
+			curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
+			curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
+			curr->confirmed_flush = strtoLSN(
+											 PQgetvalue(res,
+														slotnum,
+														i_confirmed_flush),
+											 &have_error);
+			curr->wal_status = GetWALAvailabilityByString(
+														  PQgetvalue(res,
+																	 slotnum,
+																	 i_wal_status));
+
+			Assert(!have_error);
+		}
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	dbinfo->slot_arr.slots = slotinfos;
+	dbinfo->slot_arr.nslots = num_slots;
+}
+
+
+/*
+ * count_old_cluster_logical_slots()
+ *
+ * Sum up and return the number of logical replication slots for all databases.
+ *
+ * Note: this function always returns 0 if the old_cluster is PG16 and prior
+ * because we gather slot information only for cluster versions greater than or
+ * equal to PG17. See get_old_cluster_logical_slot_infos().
+ */
+int
+count_old_cluster_logical_slots(void)
+{
+	int			dbnum;
+	int			slot_count = 0;
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+		slot_count += old_cluster.dbarr.dbs[dbnum].slot_arr.nslots;
+
+	return slot_count;
+}
 
 static void
 free_db_and_rel_infos(DbInfoArr *db_arr)
@@ -610,6 +752,12 @@ free_db_and_rel_infos(DbInfoArr *db_arr)
 	{
 		free_rel_infos(&db_arr->dbs[dbnum].rel_arr);
 		pg_free(db_arr->dbs[dbnum].db_name);
+
+		/*
+		 * Logical replication slots must not exist on the new cluster before
+		 * create_logical_replication_slots().
+		 */
+		Assert(db_arr->dbs[dbnum].slot_arr.nslots == 0);
 	}
 	pg_free(db_arr->dbs);
 	db_arr->dbs = NULL;
@@ -642,8 +790,11 @@ print_db_infos(DbInfoArr *db_arr)
 
 	for (dbnum = 0; dbnum < db_arr->ndbs; dbnum++)
 	{
-		pg_log(PG_VERBOSE, "Database: \"%s\"", db_arr->dbs[dbnum].db_name);
-		print_rel_infos(&db_arr->dbs[dbnum].rel_arr);
+		DbInfo	   *pDbInfo = &db_arr->dbs[dbnum];
+
+		pg_log(PG_VERBOSE, "Database: \"%s\"", pDbInfo->db_name);
+		print_rel_infos(&pDbInfo->rel_arr);
+		print_slot_infos(&pDbInfo->slot_arr);
 	}
 }
 
@@ -660,3 +811,22 @@ print_rel_infos(RelInfoArr *rel_arr)
 			   rel_arr->rels[relnum].reloid,
 			   rel_arr->rels[relnum].tablespace);
 }
+
+static void
+print_slot_infos(LogicalSlotInfoArr *slot_arr)
+{
+	int			slotnum;
+
+	for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+	{
+		LogicalSlotInfo *slot_info = &slot_arr->slots[slotnum];
+
+		if (slotnum == 0)
+			pg_log(PG_VERBOSE, "Logical replication slots within the database:");
+
+		pg_log(PG_VERBOSE, "slotname: \"%s\", plugin: \"%s\", two_phase: %d",
+			   slot_info->slotname,
+			   slot_info->plugin,
+			   slot_info->two_phase);
+	}
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..228f29b688 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_logical_replication_slots.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 4562dafcff..f81b1d5cc8 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -59,6 +59,8 @@ static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0, bool *live_check);
+static void create_logical_replication_slots(void);
+static void setup_new_cluster(void);
 
 ClusterInfo old_cluster,
 			new_cluster;
@@ -188,6 +190,10 @@ main(int argc, char **argv)
 			  new_cluster.pgdata);
 	check_ok();
 
+	create_script_for_old_cluster_deletion(&deletion_script_file_name);
+
+	setup_new_cluster();
+
 	if (user_opts.do_sync)
 	{
 		prep_status("Sync data directory to disk");
@@ -197,10 +203,6 @@ main(int argc, char **argv)
 		check_ok();
 	}
 
-	create_script_for_old_cluster_deletion(&deletion_script_file_name);
-
-	issue_warnings_and_set_wal_level();
-
 	pg_log(PG_REPORT,
 		   "\n"
 		   "Upgrade Complete\n"
@@ -860,3 +862,102 @@ set_frozenxids(bool minmxid_only)
 
 	check_ok();
 }
+
+/*
+ * create_logical_replication_slots()
+ *
+ * Similar to create_new_objects() but only restores logical replication slots.
+ */
+static void
+create_logical_replication_slots(void)
+{
+	int			dbnum;
+
+	prep_status_progress("Restoring logical replication slots in the new cluster");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+		LogicalSlotInfoArr *slot_arr = &old_db->slot_arr;
+		PGconn	   *conn;
+		PQExpBuffer query;
+		int			slotnum;
+		char		log_file_name[MAXPGPATH];
+
+		/* Skip this database if there are no slots */
+		if (slot_arr->nslots == 0)
+			continue;
+
+		conn = connectToServer(&new_cluster, old_db->db_name);
+		query = createPQExpBuffer();
+
+		snprintf(log_file_name, sizeof(log_file_name),
+				 DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+
+		for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		{
+			LogicalSlotInfo *slot_info = &slot_arr->slots[slotnum];
+
+			/* Construct a query for creating logical replication slots. */
+			appendPQExpBuffer(query, "SELECT pg_catalog.pg_create_logical_replication_slot(");
+			appendStringLiteralConn(query, slot_info->slotname, conn);
+			appendPQExpBuffer(query, ", ");
+			appendStringLiteralConn(query, slot_info->plugin, conn);
+			appendPQExpBuffer(query, ", false, %s);",
+							  slot_info->two_phase ? "true" : "false");
+
+			PQclear(executeQueryOrDie(conn, "%s", query->data));
+
+			resetPQExpBuffer(query);
+		}
+
+		PQfinish(conn);
+
+		destroyPQExpBuffer(query);
+	}
+
+	end_progress_output();
+	check_ok();
+}
+
+/*
+ *	setup_new_cluster()
+ *
+ * Starts a new cluster for updating the wal_level in the control file, then
+ * does final setups. Logical slots are also created here.
+ */
+static void
+setup_new_cluster(void)
+{
+	/*
+	 * We unconditionally start/stop the new server because pg_resetwal -o set
+	 * wal_level to 'minimum'.  If the user is upgrading standby servers using
+	 * the rsync instructions, they will need pg_upgrade to write its final
+	 * WAL record showing wal_level as 'replica'.
+	 */
+	start_postmaster(&new_cluster, true);
+
+	/*
+	 * If the old cluster has logical slots, migrate them to a new cluster.
+	 *
+	 * Note: This must be done after doing the pg_resetwal command because
+	 * pg_resetwal would remove required WALs.
+	 */
+	if (count_old_cluster_logical_slots())
+		create_logical_replication_slots();
+
+	/*
+	 * Reindex hash indexes for old < 10.0. It is safe to use "else if" here:
+	 * count_old_cluster_logical_slots() always returns 0 for clusters older
+	 * than PG17, so the branch above is never taken for such old clusters.
+	 * See comments atop count_old_cluster_logical_slots().
+	 */
+	else if (GET_MAJOR_VERSION(old_cluster.major_version) <= 906)
+		old_9_6_invalidate_hash_indexes(&new_cluster, false);
+
+	report_extension_updates(&new_cluster);
+
+	stop_postmaster(false);
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 7afa96716e..73d8a05b96 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -10,6 +10,7 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 
+#include "access/xlogdefs.h"
 #include "common/relpath.h"
 #include "libpq-fe.h"
 
@@ -150,6 +151,24 @@ typedef struct
 	int			nrels;
 } RelInfoArr;
 
+/*
+ * Structure to store logical replication slot information
+ */
+typedef struct
+{
+	char	   *slotname;		/* slot name */
+	char	   *plugin;			/* plugin */
+	bool		two_phase;		/* can the slot decode 2PC? */
+	XLogRecPtr	confirmed_flush;	/* confirmed_flush_lsn of the slot */
+	WALAvailability wal_status; /* status of the slot */
+} LogicalSlotInfo;
+
+typedef struct
+{
+	int			nslots;			/* number of logical slot infos */
+	LogicalSlotInfo *slots;		/* array of logical slot infos */
+} LogicalSlotInfoArr;
+
 /*
  * The following structure represents a relation mapping.
  */
@@ -176,6 +195,7 @@ typedef struct
 	char		db_tablespace[MAXPGPATH];	/* database default tablespace
 											 * path */
 	RelInfoArr	rel_arr;		/* array of all user relinfos */
+	LogicalSlotInfoArr slot_arr;	/* array of all LogicalSlotInfo */
 } DbInfo;
 
 /*
@@ -225,6 +245,7 @@ typedef struct
 	bool		date_is_int;
 	bool		float8_pass_by_value;
 	uint32		data_checksum_version;
+	XLogRecPtr	chkpnt_latest;
 } ControlData;
 
 /*
@@ -344,7 +365,6 @@ void		output_check_banner(bool live_check);
 void		check_and_dump_old_cluster(bool live_check);
 void		check_new_cluster(void);
 void		report_clusters_compatible(void);
-void		issue_warnings_and_set_wal_level(void);
 void		output_completion_banner(char *deletion_script_file_name);
 void		check_cluster_versions(void);
 void		check_cluster_compatibility(bool live_check);
@@ -400,6 +420,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db,
 							  DbInfo *new_db, int *nmaps, const char *old_pgdata,
 							  const char *new_pgdata);
 void		get_db_and_rel_infos(ClusterInfo *cluster);
+int			count_old_cluster_logical_slots(void);
 
 /* option.c */
 
@@ -471,3 +492,5 @@ void		parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr
 										  char *old_pgdata, char *new_pgdata,
 										  char *old_tablespace);
 bool		reap_child(bool wait_for_child);
+
+XLogRecPtr	strtoLSN(const char *str, bool *have_error);
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
new file mode 100644
index 0000000000..7c6cc6d04b
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -0,0 +1,238 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Tests for upgrading replication slots
+
+use strict;
+use warnings;
+
+use File::Path qw(rmtree);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize old cluster
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
+
+# Initialize new cluster
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 'replica');
+
+# Initialize subscriber cluster
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
+
+my $bindir = $new_publisher->config_data('--bindir');
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when new cluster wal_level is not 'logical'
+
+# Preparations for the subsequent test:
+# 1. Create a slot on the old cluster
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding', false, true);"
+);
+$old_publisher->stop;
+
+# pg_upgrade will fail because the new cluster wal_level is 'replica'
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade where the new cluster has the wrong wal_level');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when max_replication_slots on a new cluster is
+#		too low
+
+# Preparations for the subsequent test:
+# 1. Create a second slot on the old cluster
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);"
+);
+
+# 2. Consume WAL records to avoid another type of upgrade failure. It will be
+#	 tested in subsequent cases.
+$old_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL);"
+);
+$old_publisher->stop;
+
+# 3. max_replication_slots is set to smaller than the number of slots (2)
+#	 present on the old cluster
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
+
+# 4. wal_level is set correctly on the new cluster
+$new_publisher->append_conf('postgresql.conf', "wal_level = 'logical'");
+
+# pg_upgrade will fail because the new cluster has insufficient max_replication_slots
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade where the new cluster has insufficient max_replication_slots');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records
+
+# Preparations for the subsequent test:
+# 1. Remove the slot 'test_slot2', leaving only 1 slot remaining on the old
+#	 cluster, so the new cluster config max_replication_slots=1 will now be
+#	 enough.
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT * FROM pg_drop_replication_slot('test_slot2');"
+);
+
+# 2. Generate extra WAL records. Because these WAL records do not get consumed,
+#	 they will cause the upcoming pg_upgrade test to fail.
+$old_publisher->safe_psql('postgres',
+	"CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;"
+);
+$old_publisher->stop;
+
+# pg_upgrade will fail because the slot still has unconsumed WAL records
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old cluster with idle replication slots');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------
+# TEST: Successful --check command
+
+# Preparations for the subsequent test:
+# 1. Start the cluster. --check works well when an old cluster is running.
+$old_publisher->start;
+
+# Actual run, successful --check command is expected
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,        '--check'
+	],
+	'run of pg_upgrade --check for new instance');
+ok(!-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade --check success");
+
+# Remove the remaining slot
+$old_publisher->safe_psql('postgres',
+	"SELECT * FROM pg_drop_replication_slot('test_slot1');"
+);
+
+# ------------------------------
+# TEST: Successful upgrade
+
+# Preparations for the subsequent test:
+# 1. Setup logical replication
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+$old_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR ALL TABLES;"
+);
+$subscriber->start;
+$subscriber->safe_psql(
+	'postgres', qq[
+	CREATE TABLE tbl (a int);
+	CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub WITH (two_phase = 'true')
+]);
+$subscriber->wait_for_subscription_sync($old_publisher, 'sub');
+
+# 2. Temporarily disable the subscription
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+$old_publisher->stop;
+
+# Actual run, successful upgrade is expected
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old cluster');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+# Check that the slot 'sub' has migrated to the new cluster
+$new_publisher->start;
+my $result = $new_publisher->safe_psql('postgres',
+	"SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(sub|t), 'check the slot exists on new cluster');
+
+# Update the connection
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql(
+	'postgres', qq[
+	ALTER SUBSCRIPTION sub CONNECTION '$new_connstr';
+	ALTER SUBSCRIPTION sub ENABLE;
+]);
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are replicated to the subscriber');
+
+# Clean up
+$subscriber->stop();
+$new_publisher->stop();
+
+done_testing();
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 48ca852381..b3ecbdf0d8 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -177,19 +177,6 @@ typedef struct CheckpointStatsData
 
 extern PGDLLIMPORT CheckpointStatsData CheckpointStats;
 
-/*
- * GetWALAvailability return codes
- */
-typedef enum WALAvailability
-{
-	WALAVAIL_INVALID_LSN,		/* parameter error */
-	WALAVAIL_RESERVED,			/* WAL segment is within max_wal_size */
-	WALAVAIL_EXTENDED,			/* WAL segment is reserved by a slot or
-								 * wal_keep_size */
-	WALAVAIL_UNRESERVED,		/* no longer reserved, but not removed yet */
-	WALAVAIL_REMOVED			/* WAL segment has been removed */
-} WALAvailability;
-
 struct XLogRecData;
 struct XLogReaderState;
 
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index fe794c7740..b0ae6f151e 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -64,6 +64,19 @@ typedef uint32 TimeLineID;
  */
 typedef uint16 RepOriginId;
 
+/*
+ * Availability of WAL files claimed by replication slots.
+ */
+typedef enum WALAvailability
+{
+	WALAVAIL_INVALID_LSN,		/* parameter error */
+	WALAVAIL_RESERVED,			/* WAL segment is within max_wal_size */
+	WALAVAIL_EXTENDED,			/* WAL segment is reserved by a slot or
+								 * wal_keep_size */
+	WALAVAIL_UNRESERVED,		/* no longer reserved, but not removed yet */
+	WALAVAIL_REMOVED			/* WAL segment has been removed */
+} WALAvailability;
+
 /*
  * This chunk of hackery attempts to determine which file sync methods
  * are available on the current platform, and to choose an appropriate
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 49a33c0387..9028b33423 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1501,7 +1501,10 @@ LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
 LogicalRepWorkerType
+LogicalReplicationSlotInfo
 LogicalRewriteMappingData
+LogicalSlotInfo
+LogicalSlotInfoArr
 LogicalTape
 LogicalTapeSet
 LsnReadQueue
@@ -3612,6 +3615,8 @@ pltcl_proc_desc
 pltcl_proc_key
 pltcl_proc_ptr
 pltcl_query_desc
+plugin_list
+plugin_list_head
 pointer
 polymorphic_actuals
 pos_trgm
-- 
2.27.0

