From 3ac0cdd9fef5bbaa3cdd152e3128a8e4747208a1 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Apr 2023 05:49:34 +0000
Subject: [PATCH v2] pg_upgrade: Add --include-replication-slots option

This commit introduces a new pg_upgrade option called "--include-replication-slots".
This allows nodes with logical replication slots to be upgraded. The commit can
be divided into two parts: one for pg_dump and another for pg_upgrade.

For pg_dump this commit includes a new option called "--replication-slots-only".
This option can be used to dump replication slots. When this option is specified,
the slot_name, plugin, and two_phase parameters are extracted from pg_replication_slots.
An SQL file is then generated which executes pg_create_logical_replication_slot()
with the extracted parameters.

For pg_upgrade, when '--include-replication-slots' is specified, it executes pg_dump
with the new "--replication-slots-only" option and restores from the dump. Apart
from restoring schema, pg_resetwal must not be called after restoring replication
slots. This is because the command discards WAL files and starts from a new segment,
even if they are required by replication slots. This leads to an ERROR: "requested
WAL segment XXX has already been removed". To avoid this, replication slots are
restored at a different time than other objects, after running pg_resetwal.

The significant advantage of this commit is that it makes it easy to continue
logical replication even after upgrading the publisher node. Previously, pg_upgrade
allowed copying publications to a new node. With this new commit, adjusting the
connection string to the new publisher will cause the apply worker on the subscriber
to connect to the new publisher automatically. This enables seamless continuation
of logical replication, even after an upgrade.

Author: Hayato Kuroda
Reviewed-by: Peter Smith, Julien Rouhaud
---
 doc/src/sgml/ref/pg_dump.sgml                 |  10 ++
 doc/src/sgml/ref/pgupgrade.sgml               |  11 ++
 src/bin/pg_dump/pg_backup.h                   |   1 +
 src/bin/pg_dump/pg_dump.c                     | 141 ++++++++++++++++++
 src/bin/pg_dump/pg_dump.h                     |  15 +-
 src/bin/pg_dump/pg_dump_sort.c                |   4 +
 src/bin/pg_upgrade/dump.c                     |  23 +++
 src/bin/pg_upgrade/meson.build                |   1 +
 src/bin/pg_upgrade/option.c                   |   6 +
 src/bin/pg_upgrade/pg_upgrade.c               |  61 ++++++++
 src/bin/pg_upgrade/pg_upgrade.h               |   2 +
 .../pg_upgrade/t/003_logical_replication.pl   |  89 +++++++++++
 12 files changed, 363 insertions(+), 1 deletion(-)
 create mode 100644 src/bin/pg_upgrade/t/003_logical_replication.pl

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index e81e35c13b..2cd4fd10b0 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -1206,6 +1206,16 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--replication-slots-only</option></term>
+      <listitem>
+       <para>
+        Dump only replication slots; not the schema (data definitions), nor
+        data. This is mainly used when upgrading nodes.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
        <term><option>-?</option></term>
        <term><option>--help</option></term>
diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..6505b0fd34 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -240,6 +240,17 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--include-replication-slots</option></term>
+      <listitem>
+       <para>
+        Upgrade replication slots. Only logical replication slots are currently
+        supported, and temporary slots are ignored. Note that pg_upgrade does
+        not check the installation of plugins.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-?</option></term>
       <term><option>--help</option></term>
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index aba780ef4b..8a6f25cf2c 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -187,6 +187,7 @@ typedef struct _dumpOptions
 	int			use_setsessauth;
 	int			enable_row_security;
 	int			load_via_partition_root;
+	int			slot_only;
 
 	/* default, if no "inclusion" switches appear, is to dump everything */
 	bool		include_everything;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7a504dfe25..78c7102d3e 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -328,6 +328,9 @@ static void setupDumpWorker(Archive *AH);
 static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
 static bool forcePartitionRootLoad(const TableInfo *tbinfo);
 
+static void getReplicationSlots(Archive *fout);
+static void dumpReplicationSlot(Archive *fout,
+								const ReplicationSlotInfo * slotinfo);
 
 int
 main(int argc, char **argv)
@@ -431,6 +434,7 @@ main(int argc, char **argv)
 		{"table-and-children", required_argument, NULL, 12},
 		{"exclude-table-and-children", required_argument, NULL, 13},
 		{"exclude-table-data-and-children", required_argument, NULL, 14},
+		{"replication-slots-only", no_argument, NULL, 15},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -657,6 +661,11 @@ main(int argc, char **argv)
 										  optarg);
 				break;
 
+			case 15:			/* dump only replication slot(s) */
+				dopt.slot_only = true;
+				dopt.include_everything = false;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -714,6 +723,11 @@ main(int argc, char **argv)
 	if (dopt.do_nothing && dopt.dump_inserts == 0)
 		pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
 
+	if (dopt.slot_only && dopt.dataOnly)
+		pg_fatal("options --replication-slots-only and -a/--data-only cannot be used together");
+	if (dopt.slot_only && dopt.schemaOnly)
+		pg_fatal("options --replication-slots-only and -s/--schema-only cannot be used together");
+
 	/* Identify archive format to emit */
 	archiveFormat = parseArchiveFormat(format, &archiveMode);
 
@@ -876,6 +890,16 @@ main(int argc, char **argv)
 			pg_fatal("no matching extensions were found");
 	}
 
+	/*
+	 * If dump replication-slots-only was requested, dump only them and skip
+	 * everything else.
+	 */
+	if (dopt.slot_only)
+	{
+		getReplicationSlots(fout);
+		goto dump;
+	}
+
 	/*
 	 * Dumping LOs is the default for dumps where an inclusion switch is not
 	 * used (an "include everything" dump).  -B can be used to exclude LOs
@@ -936,6 +960,8 @@ main(int argc, char **argv)
 	if (!dopt.no_security_labels)
 		collectSecLabels(fout);
 
+dump:
+
 	/* Lastly, create dummy objects to represent the section boundaries */
 	boundaryObjs = createBoundaryObjects();
 
@@ -1119,6 +1145,7 @@ help(const char *progname)
 	printf(_("  --no-unlogged-table-data     do not dump unlogged table data\n"));
 	printf(_("  --on-conflict-do-nothing     add ON CONFLICT DO NOTHING to INSERT commands\n"));
 	printf(_("  --quote-all-identifiers      quote all identifiers, even if not key words\n"));
+	printf(_("  --replication-slots-only     dump only replication slots, no schema or data\n"));
 	printf(_("  --rows-per-insert=NROWS      number of rows per INSERT; implies --inserts\n"));
 	printf(_("  --section=SECTION            dump named section (pre-data, data, or post-data)\n"));
 	printf(_("  --serializable-deferrable    wait until the dump can run without anomalies\n"));
@@ -10252,6 +10279,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
 		case DO_SUBSCRIPTION:
 			dumpSubscription(fout, (const SubscriptionInfo *) dobj);
 			break;
+		case DO_REPLICATION_SLOT:
+			dumpReplicationSlot(fout, (const ReplicationSlotInfo *) dobj);
+			break;
 		case DO_PRE_DATA_BOUNDARY:
 		case DO_POST_DATA_BOUNDARY:
 			/* never dumped, nothing to do */
@@ -18227,6 +18257,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
 			case DO_PUBLICATION_REL:
 			case DO_PUBLICATION_TABLE_IN_SCHEMA:
 			case DO_SUBSCRIPTION:
+			case DO_REPLICATION_SLOT:
 				/* Post-data objects: must come after the post-data boundary */
 				addObjectDependency(dobj, postDataBound->dumpId);
 				break;
@@ -18488,3 +18519,113 @@ appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
 	if (!res)
 		pg_log_warning("could not parse %s array", "reloptions");
 }
+
+/*
+ * getReplicationSlots
+ *	  get information about replication slots
+ */
+static void
+getReplicationSlots(Archive *fout)
+{
+	PGresult   *res;
+	ReplicationSlotInfo *slotinfo;
+	PQExpBuffer query;
+	DumpOptions *dopt = fout->dopt;
+
+	int			i_slotname;
+	int			i_plugin;
+	int			i_twophase;
+	int			i,
+				ntups;
+
+	/* Check whether we should dump or not */
+	if (fout->remoteVersion < 160000 || !dopt->slot_only)
+		return;
+
+	query = createPQExpBuffer();
+
+	resetPQExpBuffer(query);
+
+	/*
+	 * Get replication slots.
+	 *
+	 * XXX: Which information must be extracted from old node? Currently three
+	 * attributes are extracted because they are used by
+	 * pg_create_logical_replication_slot().
+	 * XXX: Do we have to support physical slots?
+	 */
+	appendPQExpBufferStr(query,
+						 "SELECT r.slot_name, r.plugin, r.two_phase "
+						 "FROM pg_replication_slots r "
+						 "WHERE r.database = current_database() AND temporary = false "
+						 "AND wal_status IN ('reserved', 'extended');");
+
+	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+	ntups = PQntuples(res);
+
+	i_slotname = PQfnumber(res, "slot_name");
+	i_plugin = PQfnumber(res, "plugin");
+	i_twophase = PQfnumber(res, "two_phase");
+
+	slotinfo = pg_malloc(ntups * sizeof(ReplicationSlotInfo));
+
+	for (i = 0; i < ntups; i++)
+	{
+		slotinfo[i].dobj.objType = DO_REPLICATION_SLOT;
+
+		slotinfo[i].dobj.catId.tableoid = InvalidOid;
+		slotinfo[i].dobj.catId.oid = InvalidOid;
+		AssignDumpId(&slotinfo[i].dobj);
+
+		slotinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_slotname));
+
+		slotinfo[i].plugin = pg_strdup(PQgetvalue(res, i, i_plugin));
+		slotinfo[i].twophase = pg_strdup(PQgetvalue(res, i, i_twophase));
+
+		/* FIXME: force dumping */
+		slotinfo[i].dobj.dump = DUMP_COMPONENT_ALL;
+	}
+	PQclear(res);
+
+	destroyPQExpBuffer(query);
+}
+
+/*
+ * dumpReplicationSlot
+ *	  write down a script for pg_restore command
+ */
+static void
+dumpReplicationSlot(Archive *fout, const ReplicationSlotInfo * slotinfo)
+{
+	DumpOptions *dopt = fout->dopt;
+
+	if (!dopt->slot_only)
+		return;
+
+	if (slotinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+	{
+		PQExpBuffer query = createPQExpBuffer();
+		char	   *slotname = pg_strdup(slotinfo->dobj.name);
+
+		/*
+		 * XXX: For simplification, pg_create_logical_replication_slot() is
+		 * used. Is it sufficient?
+		 */
+		appendPQExpBuffer(query, "SELECT pg_create_logical_replication_slot('%s', ",
+						  slotname);
+		appendStringLiteralAH(query, slotinfo->plugin, fout);
+		appendPQExpBuffer(query, ", ");
+		appendStringLiteralAH(query, slotinfo->twophase, fout);
+		appendPQExpBuffer(query, ");");
+
+		ArchiveEntry(fout, slotinfo->dobj.catId, slotinfo->dobj.dumpId,
+					 ARCHIVE_OPTS(.tag = slotname,
+								  .description = "REPLICATION SLOT",
+								  .section = SECTION_POST_DATA,
+								  .createStmt = query->data));
+
+		pfree(slotname);
+		destroyPQExpBuffer(query);
+	}
+}
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index ed6ce41ad7..e59cfdd8fa 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -82,7 +82,8 @@ typedef enum
 	DO_PUBLICATION,
 	DO_PUBLICATION_REL,
 	DO_PUBLICATION_TABLE_IN_SCHEMA,
-	DO_SUBSCRIPTION
+	DO_SUBSCRIPTION,
+	DO_REPLICATION_SLOT
 } DumpableObjectType;
 
 /*
@@ -666,6 +667,18 @@ typedef struct _SubscriptionInfo
 	char	   *subpasswordrequired;
 } SubscriptionInfo;
 
+/*
+ * The ReplicationSlotInfo struct is used to represent replication slots.
+ * XXX: add more attrbutes if needed
+ */
+typedef struct _ReplicationSlotInfo
+{
+	DumpableObject dobj;
+	char	   *plugin;
+	char	   *slottype;
+	char	   *twophase;
+}			ReplicationSlotInfo;
+
 /*
  *	common utility functions
  */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 8266c117a3..4280283f0d 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -1497,6 +1497,10 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
 			snprintf(buf, bufsize,
 					 "SUBSCRIPTION (ID %d OID %u)",
 					 obj->dumpId, obj->catId.oid);
+		case DO_REPLICATION_SLOT:
+			snprintf(buf, bufsize,
+					 "REPLICATION SLOT (ID %d NAME %s)",
+					 obj->dumpId, obj->name);
 			return;
 		case DO_PRE_DATA_BOUNDARY:
 			snprintf(buf, bufsize,
diff --git a/src/bin/pg_upgrade/dump.c b/src/bin/pg_upgrade/dump.c
index 6c8c82dca8..f8d0c6ddde 100644
--- a/src/bin/pg_upgrade/dump.c
+++ b/src/bin/pg_upgrade/dump.c
@@ -59,6 +59,29 @@ generate_old_dump(void)
 						   log_opts.dumpdir,
 						   sql_file_name, escaped_connstr.data);
 
+		/*
+		 * Dump replication slots if needed.
+		 *
+		 * XXX We cannot dump replication slots at the same time as the schema
+		 * dump because we need to separate the timing of restoring
+		 * replication slots and other objects. Replication slots, in
+		 * particular, should not be restored before executing the pg_resetwal
+		 * command because it will remove WALs that are required by the slots.
+		 */
+		if (user_opts.include_slots)
+		{
+			char		slots_file_name[MAXPGPATH];
+
+			snprintf(slots_file_name, sizeof(slots_file_name), DB_DUMP_SLOTS_FILE_MASK, old_db->db_oid);
+			parallel_exec_prog(log_file_name, NULL,
+							   "\"%s/pg_dump\" %s --replication-slots-only "
+							   "--quote-all-identifiers --binary-upgrade %s "
+							   "--file=\"%s/%s\" %s",
+							   new_cluster.bindir, cluster_conn_opts(&old_cluster),
+							   log_opts.verbose ? "--verbose" : "",
+							   log_opts.dumpdir,
+							   slots_file_name, escaped_connstr.data);
+		}
 		termPQExpBuffer(&escaped_connstr);
 	}
 
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..7f5d48b7e1 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_logical_replication.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 8869b6b60d..d8d9f69b47 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -57,6 +57,7 @@ parseCommandLine(int argc, char *argv[])
 		{"verbose", no_argument, NULL, 'v'},
 		{"clone", no_argument, NULL, 1},
 		{"copy", no_argument, NULL, 2},
+		{"include-replication-slots", no_argument, NULL, 3},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -199,6 +200,10 @@ parseCommandLine(int argc, char *argv[])
 				user_opts.transfer_mode = TRANSFER_MODE_COPY;
 				break;
 
+			case 3:
+				user_opts.include_slots = true;
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
 						os_info.progname);
@@ -289,6 +294,7 @@ usage(void)
 	printf(_("  -V, --version                 display version information, then exit\n"));
 	printf(_("  --clone                       clone instead of copying files to new cluster\n"));
 	printf(_("  --copy                        copy files to new cluster (default)\n"));
+	printf(_("  --include-replication-slots   upgrade replication slots\n"));
 	printf(_("  -?, --help                    show this help, then exit\n"));
 	printf(_("\n"
 			 "Before running pg_upgrade you must:\n"
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 75bab0a04c..04bf1d867a 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -59,6 +59,7 @@ static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0, bool *live_check);
+static void create_replication_slots(void);
 
 ClusterInfo old_cluster,
 			new_cluster;
@@ -188,6 +189,19 @@ main(int argc, char **argv)
 			  new_cluster.pgdata);
 	check_ok();
 
+	/*
+	 * Create replication slots if requested.
+	 *
+	 * Note: This must be done after doing pg_resetwal command because the
+	 * command will remove required WALs.
+	 */
+	if (user_opts.include_slots)
+	{
+		start_postmaster(&new_cluster, true);
+		create_replication_slots();
+		stop_postmaster(false);
+	}
+
 	if (user_opts.do_sync)
 	{
 		prep_status("Sync data directory to disk");
@@ -860,3 +874,50 @@ set_frozenxids(bool minmxid_only)
 
 	check_ok();
 }
+
+/*
+ * create_replication_slots()
+ *
+ * Similar to create_new_objects() but only restores replication slots.
+ */
+static void
+create_replication_slots(void)
+{
+	int			dbnum;
+
+	prep_status_progress("Restoring replication slots in the new cluster");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		char		slots_file_name[MAXPGPATH],
+					log_file_name[MAXPGPATH];
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+
+		snprintf(slots_file_name, sizeof(slots_file_name),
+				 DB_DUMP_SLOTS_FILE_MASK, old_db->db_oid);
+		snprintf(log_file_name, sizeof(log_file_name),
+				 DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		parallel_exec_prog(log_file_name,
+						   NULL,
+						   "\"%s/psql\" %s --echo-queries --set ON_ERROR_STOP=on "
+						   "--no-psqlrc --dbname %s -f \"%s/%s\"",
+						   new_cluster.bindir,
+						   cluster_conn_opts(&new_cluster),
+						   old_db->db_name,
+						   log_opts.dumpdir,
+						   slots_file_name);
+	}
+
+	/* reap all children */
+	while (reap_child(true) == true)
+		;
+
+	end_progress_output();
+	check_ok();
+
+	/* update new_cluster info now that we have objects in the databases */
+	get_db_and_rel_infos(&new_cluster);
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 3eea0139c7..1aa41f68bc 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -29,6 +29,7 @@
 /* contains both global db information and CREATE DATABASE commands */
 #define GLOBALS_DUMP_FILE	"pg_upgrade_dump_globals.sql"
 #define DB_DUMP_FILE_MASK	"pg_upgrade_dump_%u.custom"
+#define DB_DUMP_SLOTS_FILE_MASK	"pg_upgrade_dump_%u_slots.sql"
 
 /*
  * Base directories that include all the files generated internally, from the
@@ -304,6 +305,7 @@ typedef struct
 	transferMode transfer_mode; /* copy files or link them? */
 	int			jobs;			/* number of processes/threads to use */
 	char	   *socketdir;		/* directory to use for Unix sockets */
+	bool		include_slots;	/* true -> dump and restore replication slots */
 } UserOpts;
 
 typedef struct
diff --git a/src/bin/pg_upgrade/t/003_logical_replication.pl b/src/bin/pg_upgrade/t/003_logical_replication.pl
new file mode 100644
index 0000000000..13ddde3d5f
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_logical_replication.pl
@@ -0,0 +1,89 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+# Tests for logical replication, especially for upgrading publisher
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes.
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize publisher node
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
+$old_publisher->start;
+
+# Initialize subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
+$subscriber->start;
+
+$old_publisher->safe_psql('postgres',
+	"CREATE TABLE tbl AS SELECT generate_series(1,10) AS a");
+$subscriber->safe_psql('postgres', "CREATE TABLE tbl (a int)");
+
+# Setup logical replication
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+$old_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR ALL TABLES");
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub");
+
+# Wait for initial table sync to finish
+$subscriber->wait_for_subscription_sync($old_publisher, 'sub');
+
+my $result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(10), 'check initial rows on subscriber');
+
+# Preparations for upgrading publisher
+$old_publisher->stop;
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 'logical');
+
+my $bindir = $new_publisher->config_data('--bindir');
+
+# Run pg_upgrade. pg_upgrade_output.d is removed at the end
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,        '--include-replication-slot'
+	],
+	'run of pg_upgrade for new publisher');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+# Check whether the replication slot is copied
+$new_publisher->start;
+$result =
+  $new_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(1), 'check the replication slot is copied to new publisher');
+
+# Change connection string and enable logical replication
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+
+$subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub CONNECTION '$new_connstr'");
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+
+$new_publisher->wait_for_catchup('sub');
+
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are shipped to subscriber');
+
+done_testing();
-- 
2.27.0

