From 46d2b060421458b9c153cd665f03237f85e18070 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Apr 2023 05:49:34 +0000
Subject: [PATCH v47] pg_upgrade: Allow to replicate logical replication slots
 to new node

This commit allows nodes with logical replication slots to be upgraded. While
reading information from the old cluster, a list of logical replication slots is
fetched. At the later part of upgrading, pg_upgrade revisits the list and
restores slots by executing a new binary upgrading function,
binary_upgrade_create_logical_replication_slot() on the new cluster. Migration
of logical replication slots is only supported when the old cluster is version
17.0 or later.

If the old node has slots with the status 'lost' or with unconsumed WAL records,
pg_upgrade fails. These checks are needed to prevent data loss.

Note that the pg_resetwal command would remove WAL files, which are required for
restart_lsn. If WALs required by logical replication slots are removed, the
slots are unusable. Therefore, binary_upgrade_create_logical_replication_slot()
sets the startpoint to the next WAL segment, which will be created by pg_resetwal.

The significant advantage of this commit is that it makes it easy to continue
logical replication even after upgrading the publisher node. Previously,
pg_upgrade allowed copying publications to a new node. With this new commit,
adjusting the connection string to the new publisher will cause the apply
worker on the subscriber to connect to the new publisher automatically. This
enables seamless continuation of logical replication, even after an upgrade.

Author: Hayato Kuroda
Co-authored-by: Hou Zhijie
Reviewed-by: Peter Smith, Julien Rouhaud, Vignesh C, Wang Wei, Masahiko Sawada,
             Dilip Kumar, Bharath Rupireddy
---
 doc/src/sgml/ref/pgupgrade.sgml               |  76 +++-
 src/backend/replication/logical/decode.c      |  43 ++-
 src/backend/replication/logical/logical.c     | 161 +++++++--
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/slot.c                |  12 +
 src/backend/replication/slotfuncs.c           |   7 +-
 src/backend/replication/walsender.c           |   2 +-
 src/backend/utils/adt/pg_upgrade_support.c    | 150 ++++++++
 src/bin/pg_upgrade/Makefile                   |   3 +
 src/bin/pg_upgrade/check.c                    | 205 ++++++++++-
 src/bin/pg_upgrade/function.c                 |  31 +-
 src/bin/pg_upgrade/info.c                     | 170 ++++++++-
 src/bin/pg_upgrade/meson.build                |   1 +
 src/bin/pg_upgrade/pg_upgrade.c               | 112 +++++-
 src/bin/pg_upgrade/pg_upgrade.h               |  22 +-
 src/bin/pg_upgrade/server.c                   |  25 +-
 .../003_upgrade_logical_replication_slots.pl  | 325 ++++++++++++++++++
 src/include/catalog/pg_proc.dat               |  12 +
 src/include/replication/logical.h             |  38 +-
 src/include/replication/slot.h                |   4 +
 src/tools/pgindent/typedefs.list              |   3 +
 21 files changed, 1335 insertions(+), 69 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index f17fdb1ba5..4d579e793d 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -383,6 +383,77 @@ make prefix=/usr/local/pgsql.new install
     </para>
    </step>
 
+   <step>
+    <title>Prepare for publisher upgrades</title>
+
+    <para>
+     <application>pg_upgrade</application> attempts to migrate logical
+     replication slots. This helps avoid the need for manually defining the
+     same replication slots on the new publisher. Migration of logical
+     replication slots is only supported when the old cluster is version 17.0
+     or later.
+    </para>
+
+    <para>
+     Before you start upgrading the publisher cluster, ensure that the
+     subscription is temporarily disabled, by executing
+     <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... DISABLE</command></link>.
+     Re-enable the subscription after the upgrade.
+    </para>
+
+    <para>
+     There are some prerequisites for <application>pg_upgrade</application> to
+     be able to upgrade the replication slots. If these are not met an error
+     will be reported.
+    </para>
+
+    <itemizedlist>
+     <listitem>
+      <para>
+       All slots on the old cluster must be usable, i.e., there are no slots
+       whose
+       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>conflicting</structfield>
+       is <literal>true</literal>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The old cluster has replicated all the changes to subscribers.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The output plugins referenced by the slots on the old cluster must be
+       installed in the new PostgreSQL executable directory.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must not have permanent logical replication slots, i.e.,
+       there must be no slots where
+       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>temporary</structfield>
+       is <literal>false</literal>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must have
+       <link linkend="guc-wal-level"><varname>wal_level</varname></link> as
+       <literal>logical</literal>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must have
+       <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+       configured to a value greater than or equal to the number of slots
+       present in the old cluster.
+      </para>
+     </listitem>
+    </itemizedlist>
+
+   </step>
+
    <step>
     <title>Stop both servers</title>
 
@@ -650,8 +721,9 @@ rsync --archive --delete --hard-links --size-only --no-inc-recursive /vol1/pg_tb
        Configure the servers for log shipping.  (You do not need to run
        <function>pg_backup_start()</function> and <function>pg_backup_stop()</function>
        or take a file system backup as the standbys are still synchronized
-       with the primary.)  Replication slots are not copied and must
-       be recreated.
+       with the primary.)  Only logical slots on the primary are copied to the
+       new standby, but other slots on the old standby are not copied, so they
+       must be recreated manually.
       </para>
      </step>
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 730061c9da..6de54153f7 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -295,7 +295,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 */
 				if (TransactionIdIsValid(xid))
 				{
-					if (!ctx->fast_forward)
+					if (ctx->decoding_mode != DECODING_MODE_FAST_FORWARD)
 						ReorderBufferAddInvalidations(reorder, xid,
 													  buf->origptr,
 													  invals->nmsgs,
@@ -303,7 +303,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 					ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
 													  buf->origptr);
 				}
-				else if ((!ctx->fast_forward))
+				else if (ctx->decoding_mode != DECODING_MODE_FAST_FORWARD)
 					ReorderBufferImmediateInvalidation(ctx->reorder,
 													   invals->nmsgs,
 													   invals->msgs);
@@ -416,7 +416,7 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	 * point in decoding changes.
 	 */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
-		ctx->fast_forward)
+		ctx->decoding_mode == DECODING_MODE_FAST_FORWARD)
 		return;
 
 	switch (info)
@@ -475,7 +475,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	 * point in decoding data changes.
 	 */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
-		ctx->fast_forward)
+		ctx->decoding_mode == DECODING_MODE_FAST_FORWARD)
 		return;
 
 	switch (info)
@@ -604,7 +604,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	 * point in decoding messages.
 	 */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
-		ctx->fast_forward)
+		ctx->decoding_mode == DECODING_MODE_FAST_FORWARD)
 		return;
 
 	message = (xl_logical_message *) XLogRecGetData(r);
@@ -621,6 +621,20 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			  SnapBuildXactNeedsSkip(builder, buf->origptr)))
 		return;
 
+	/*
+	 * Set the output_skipped flag to notify the caller of the message's
+	 * existence. Usually, the flag is set when either the COMMIT or ABORT
+	 * records are decoded, but this must be turned on here because the
+	 * non-transactional logical message is decoded without waiting for these
+	 * records.
+	 */
+	if (ctx->decoding_mode == DECODING_MODE_SILENT &&
+		!message->transactional)
+	{
+		ctx->output_skipped = true;
+		return;
+	}
+
 	/*
 	 * If this is a non-transactional change, get the snapshot we're expected
 	 * to use. We only get here when the snapshot is consistent, and the
@@ -1279,13 +1293,24 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
  *	  are restarting or if we haven't assembled a consistent snapshot yet.
  * 2) The transaction happened in another database.
  * 3) The output plugin is not interested in the origin.
- * 4) We are doing fast-forwarding
+ * 4) We are not in the normal decoding mode.
+ *
+ * Also, set the output_skipped flag if we are in the silent mode.
  */
 static bool
 DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 				  Oid txn_dbid, RepOriginId origin_id)
 {
-	return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
-			(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
-			ctx->fast_forward || FilterByOrigin(ctx, origin_id));
+	bool		need_skip;
+
+	need_skip = (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+				 (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
+				 ctx->decoding_mode != DECODING_MODE_NORMAL ||
+				 FilterByOrigin(ctx, origin_id));
+
+	/* Set a flag if we are in the silent mode */
+	if (ctx->decoding_mode == DECODING_MODE_SILENT)
+		ctx->output_skipped = true;
+
+	return need_skip;
 }
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 41243d0187..0f4b1c6323 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -41,6 +41,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/memutils.h"
 
 /* data for errcontext callback */
@@ -150,7 +151,7 @@ StartupDecodingContext(List *output_plugin_options,
 					   XLogRecPtr start_lsn,
 					   TransactionId xmin_horizon,
 					   bool need_full_snapshot,
-					   bool fast_forward,
+					   DecodingMode decoding_mode,
 					   XLogReaderRoutine *xl_routine,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
@@ -176,7 +177,7 @@ StartupDecodingContext(List *output_plugin_options,
 	 * (re-)load output plugins, so we detect a bad (removed) output plugin
 	 * now.
 	 */
-	if (!fast_forward)
+	if (decoding_mode == DECODING_MODE_NORMAL)
 		LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
 
 	/*
@@ -294,7 +295,7 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->output_plugin_options = output_plugin_options;
 
-	ctx->fast_forward = fast_forward;
+	ctx->decoding_mode = decoding_mode;
 
 	MemoryContextSwitchTo(old_context);
 
@@ -437,7 +438,7 @@ CreateInitDecodingContext(const char *plugin,
 	ReplicationSlotSave();
 
 	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
-								 need_full_snapshot, false,
+								 need_full_snapshot, DECODING_MODE_NORMAL,
 								 xl_routine, prepare_write, do_write,
 								 update_progress);
 
@@ -473,8 +474,8 @@ CreateInitDecodingContext(const char *plugin,
  * output_plugin_options
  *		options passed to the output plugin.
  *
- * fast_forward
- *		bypass the generation of logical changes.
+ * decoding_mode
+ *		See the definition of DecodingMode for details.
  *
  * xl_routine
  *		XLogReaderRoutine used by underlying xlogreader
@@ -493,7 +494,7 @@ CreateInitDecodingContext(const char *plugin,
 LogicalDecodingContext *
 CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
-					  bool fast_forward,
+					  DecodingMode decoding_mode,
 					  XLogReaderRoutine *xl_routine,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
@@ -573,8 +574,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
-								 fast_forward, xl_routine, prepare_write,
-								 do_write, update_progress);
+								 decoding_mode, xl_routine,
+								 prepare_write, do_write, update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -773,7 +774,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -801,7 +802,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -835,7 +836,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -867,7 +868,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -905,7 +906,11 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, all the two-phase callbacks are
+	 * not set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* We're only supposed to call this when two-phase commits are supported */
 	Assert(ctx->twophase);
@@ -950,7 +955,11 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, all the two-phase callbacks are
+	 * not set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* We're only supposed to call this when two-phase commits are supported */
 	Assert(ctx->twophase);
@@ -995,7 +1004,11 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, all the two-phase callbacks are
+	 * not set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* We're only supposed to call this when two-phase commits are supported */
 	Assert(ctx->twophase);
@@ -1041,7 +1054,11 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, all the two-phase callbacks are
+	 * not set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* We're only supposed to call this when two-phase commits are supported */
 	Assert(ctx->twophase);
@@ -1087,7 +1104,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -1126,7 +1143,7 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	if (!ctx->callbacks.truncate_cb)
 		return;
@@ -1168,7 +1185,11 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
 	ErrorContextCallback errcallback;
 	bool		ret;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, the filter-prepare callback is
+	 * not set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -1199,7 +1220,11 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	ErrorContextCallback errcallback;
 	bool		ret;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, the filter-by-origin callback is
+	 * not set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -1232,7 +1257,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	if (ctx->callbacks.message_cb == NULL)
 		return;
@@ -1268,7 +1293,11 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, all the stream callbacks are not
+	 * set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* We're only supposed to call this when streaming is supported. */
 	Assert(ctx->streaming);
@@ -1317,7 +1346,11 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, all the stream callbacks are not
+	 * set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* We're only supposed to call this when streaming is supported. */
 	Assert(ctx->streaming);
@@ -1366,7 +1399,11 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, all the stream callbacks are not
+	 * set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* We're only supposed to call this when streaming is supported. */
 	Assert(ctx->streaming);
@@ -1407,7 +1444,11 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, all the stream callbacks are not
+	 * set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/*
 	 * We're only supposed to call this when streaming and two-phase commits
@@ -1452,7 +1493,11 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, all the stream callbacks are not
+	 * set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* We're only supposed to call this when streaming is supported. */
 	Assert(ctx->streaming);
@@ -1493,7 +1538,11 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, all the stream callbacks are not
+	 * set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* We're only supposed to call this when streaming is supported. */
 	Assert(ctx->streaming);
@@ -1543,7 +1592,11 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, all the stream callbacks are not
+	 * set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* We're only supposed to call this when streaming is supported. */
 	Assert(ctx->streaming);
@@ -1584,7 +1637,11 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	/*
+	 * In both silent and fast-forward mode, all the stream callbacks are not
+	 * set, so the wrapper will not be called.
+	 */
+	Assert(ctx->decoding_mode == DECODING_MODE_NORMAL);
 
 	/* We're only supposed to call this when streaming is supported. */
 	Assert(ctx->streaming);
@@ -1630,7 +1687,7 @@ update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
-	Assert(!ctx->fast_forward);
+	Assert(ctx->decoding_mode != DECODING_MODE_FAST_FORWARD);
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -1949,3 +2006,45 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
 }
+
+/*
+ * Read from the decoding slot and return true when meaningful changes are
+ * processed. Otherwise false.
+ *
+ * Currently, the function is used only for upgrading purposes, but there is
+ * no reason to restrict it. So, IsBinaryUpgrade is not checked here.
+ */
+bool
+DecodingContextHasdecodedItems(LogicalDecodingContext *ctx,
+							   XLogRecPtr end_of_wal)
+{
+	Assert(MyReplicationSlot);
+
+	/*
+	 * Start reading at the slot's restart_lsn, which we know to point to a
+	 * valid record.
+	 */
+	XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+	/* Invalidate non-timetravel entries */
+	InvalidateSystemCaches();
+
+	/* Loop until the end of WAL or some changes are processed */
+	while (!ctx->output_skipped && ctx->reader->EndRecPtr < end_of_wal)
+	{
+		XLogRecord *record;
+		char	   *errm = NULL;
+
+		record = XLogReadRecord(ctx->reader, &errm);
+
+		if (errm)
+			elog(ERROR, "could not find record for logical decoding: %s", errm);
+
+		if (record != NULL)
+			LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	return ctx->output_skipped;
+}
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 197169d6b0..d3f8e22bf6 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -207,7 +207,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		/* restart at slot's confirmed_flush */
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									options,
-									false,
+									DECODING_MODE_NORMAL,
 									XL_ROUTINE(.page_read = read_local_xlog_page,
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 7e5ec500d8..9980e2fd79 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1423,6 +1423,18 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 
 		SpinLockRelease(&s->mutex);
 
+		/*
+		 * The logical replication slots shouldn't be invalidated as
+		 * max_slot_wal_keep_size GUC is set to -1 during the upgrade.
+		 *
+		 * The following is just a sanity check.
+		 */
+		if (*invalidated && SlotIsLogical(s) && IsBinaryUpgrade)
+		{
+			Assert(max_slot_wal_keep_size_mb == -1);
+			elog(ERROR, "replication slots must not be invalidated during the upgrade");
+		}
+
 		if (active_pid != 0)
 		{
 			/*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6035cf4816..89b9d03d1a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -114,7 +114,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  * When find_startpoint is false, the slot's confirmed_flush is not set; it's
  * caller's responsibility to ensure it's set to something sensible.
  */
-static void
+void
 create_logical_replication_slot(char *name, char *plugin,
 								bool temporary, bool two_phase,
 								XLogRecPtr restart_lsn,
@@ -163,6 +163,9 @@ create_logical_replication_slot(char *name, char *plugin,
 
 /*
  * SQL function for creating a new logical replication slot.
+ *
+ * If you change this function, please see
+ * binary_upgrade_create_logical_replication_slot as well.
  */
 Datum
 pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
@@ -485,7 +488,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 		 */
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									NIL,
-									true,	/* fast_forward */
+									DECODING_MODE_FAST_FORWARD,
 									XL_ROUTINE(.page_read = read_local_xlog_page,
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e250b0567e..b3b819d996 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1283,7 +1283,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * are reported early.
 	 */
 	logical_decoding_ctx =
-		CreateDecodingContext(cmd->startpoint, cmd->options, false,
+		CreateDecodingContext(cmd->startpoint, cmd->options, DECODING_MODE_NORMAL,
 							  XL_ROUTINE(.page_read = logical_read_xlog_page,
 										 .segment_open = WalSndSegmentOpen,
 										 .segment_close = wal_segment_close),
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 0186636d9f..9ccf530ed3 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -11,14 +11,20 @@
 
 #include "postgres.h"
 
+#include "access/xlogutils.h"
+#include "access/xlog_internal.h"
 #include "catalog/binary_upgrade.h"
 #include "catalog/heap.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
+#include "funcapi.h"
 #include "miscadmin.h"
+#include "replication/logical.h"
+#include "replication/slot.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
+#include "utils/pg_lsn.h"
 
 
 #define CHECK_IS_BINARY_UPGRADE									\
@@ -261,3 +267,147 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
 
 	PG_RETURN_VOID();
 }
+
+/*
+ * Verify the given slot has already been consumed with all the changes.
+ *
+ * Returns true if there are no changes after the confirmed_flush_lsn.
+ * Otherwise false.
+ *
+ * This is a special purpose function to ensure the given slot can be upgraded
+ * without data loss.
+ */
+Datum
+binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS)
+{
+	Name		slot_name;
+	XLogRecPtr	end_of_wal;
+	LogicalDecodingContext *ctx = NULL;
+	bool		has_record;
+
+	CHECK_IS_BINARY_UPGRADE;
+
+	/* Quick exit if the input is NULL */
+	if (PG_ARGISNULL(0))
+		PG_RETURN_BOOL(false);
+	else
+		slot_name = PG_GETARG_NAME(0);
+
+	/*
+	 * Acquire the given slot. An error is not expected here because the
+	 * caller has already checked that the slot exists.
+	 */
+	ReplicationSlotAcquire(NameStr(*slot_name), true);
+
+	/*
+	 * It's the caller's responsibility to check the health of the slot.  The
+	 * functions called below assume that restart_lsn points to a valid record.
+	 */
+	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
+
+	/*
+	 * We use silent mode here to decode all changes without outputting them,
+	 * allowing us to detect all the records that could be sent downstream.
+	 */
+	ctx = CreateDecodingContext(InvalidXLogRecPtr,
+								NIL,
+								DECODING_MODE_SILENT,
+								XL_ROUTINE(.page_read = read_local_xlog_page,
+										   .segment_open = wal_segment_open,
+										   .segment_close = wal_segment_close),
+								NULL, NULL, NULL);
+
+	end_of_wal = GetFlushRecPtr(NULL);
+
+	has_record = DecodingContextHasdecodedItems(ctx, end_of_wal);
+
+	/* Clean up */
+	FreeDecodingContext(ctx);
+	ReplicationSlotRelease();
+
+	PG_RETURN_BOOL(!has_record);
+}
+
+/*
+ * SQL function for creating a new logical replication slot.
+ *
+ * This function is almost the same as pg_create_logical_replication_slot(),
+ * but the restart_lsn is set to the start of the next WAL segment.
+ *
+ * This function returns the slot name, the confirmed_flush_lsn, and the name
+ * of the WAL file that restart_lsn points into.
+ */
+Datum
+binary_upgrade_create_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		name;
+	Name		plugin;
+
+	/* Temporary slots are never handled in this function */
+	bool		two_phase;
+	XLogSegNo	xlogsegno;
+	char		xlogfilename[MAXFNAMELEN];
+	XLogRecPtr	restart_lsn;
+
+	Datum		result;
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
+	Datum		values[3];
+	bool		nulls[3];
+
+	CHECK_IS_BINARY_UPGRADE;
+
+	if (PG_ARGISNULL(0) ||
+		PG_ARGISNULL(1) ||
+		PG_ARGISNULL(2))
+		elog(ERROR,
+			 "null argument to binary_upgrade_create_logical_replication_slot is not allowed");
+
+	CheckSlotPermissions();
+
+	CheckLogicalDecodingRequirements();
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	/* Conditions seem OK, accept arguments */
+	name = PG_GETARG_NAME(0);
+	plugin = PG_GETARG_NAME(1);
+	two_phase = PG_GETARG_BOOL(2);
+
+	/* Calculate the next WAL segment and its LSN */
+	XLByteToPrevSeg(GetFlushRecPtr(NULL), xlogsegno, wal_segment_size);
+	XLogFileName(xlogfilename, (TimeLineID) 1, xlogsegno + 1,
+				 wal_segment_size);
+
+	/* And use the startpoint as restart_lsn */
+	XLogSegNoOffsetToRecPtr(xlogsegno + 1, 0, wal_segment_size, restart_lsn);
+
+	/*
+	 * Create a required logical replication slot. confirmed_flush is the same
+	 * as restart_lsn for now.
+	 */
+	create_logical_replication_slot(NameStr(*name),
+									NameStr(*plugin),
+									false,
+									two_phase,
+									restart_lsn,
+									false);
+
+	MyReplicationSlot->data.confirmed_flush = restart_lsn;
+
+	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
+	values[2] = CStringGetTextDatum(xlogfilename);
+
+	memset(nulls, 0, sizeof(nulls));
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	result = HeapTupleGetDatum(tuple);
+
+	/* ok, slot is now fully created, mark it as persistent */
+	ReplicationSlotPersist();
+	ReplicationSlotRelease();
+
+	PG_RETURN_DATUM(result);
+}
diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index 5834513add..815d1a7ca1 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -3,6 +3,9 @@
 PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
 PGAPPICON = win32
 
+# required for 003_upgrade_logical_replication_slots.pl
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/bin/pg_upgrade
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 21a0ff9e42..5d35646481 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -33,6 +33,9 @@ static void check_for_jsonb_9_4_usage(ClusterInfo *cluster);
 static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(void);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
+static void check_new_cluster_logical_replication_slots(void);
+static void check_old_cluster_for_valid_slots(bool live_check);
+static void create_consistent_snapshot(void);
 
 
 /*
@@ -89,8 +92,11 @@ check_and_dump_old_cluster(bool live_check)
 	if (!live_check)
 		start_postmaster(&old_cluster, true);
 
-	/* Extract a list of databases and tables from the old cluster */
-	get_db_and_rel_infos(&old_cluster);
+	/*
+	 * Extract a list of databases, tables, and logical replication slots from
+	 * the old cluster.
+	 */
+	get_db_rel_and_slot_infos(&old_cluster, live_check);
 
 	init_tablespaces();
 
@@ -107,6 +113,13 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
 
+	/*
+	 * Logical replication slots can be migrated since PG17. See comments atop
+	 * get_old_cluster_logical_slot_infos().
+	 */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) >= 1700)
+		check_old_cluster_for_valid_slots(live_check);
+
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the
 	 * on-disk format for existing data.
@@ -200,7 +213,7 @@ check_and_dump_old_cluster(bool live_check)
 void
 check_new_cluster(void)
 {
-	get_db_and_rel_infos(&new_cluster);
+	get_db_rel_and_slot_infos(&new_cluster, false);
 
 	check_new_cluster_is_empty();
 
@@ -223,6 +236,8 @@ check_new_cluster(void)
 	check_for_prepared_transactions(&new_cluster);
 
 	check_for_new_tablespace_dir();
+
+	check_new_cluster_logical_replication_slots();
 }
 
 
@@ -245,6 +260,27 @@ report_clusters_compatible(void)
 }
 
 
+/*
+ * Log the details of the current snapshot to the WAL, allowing the snapshot
+ * state to be reconstructed for logical decoding on the upgraded slots.
+ */
+static void
+create_consistent_snapshot(void)
+{
+	DbInfo	   *db = &new_cluster.dbarr.dbs[0];
+	PGconn	   *conn;
+
+	prep_status("Creating a consistent snapshot on new cluster");
+
+	conn = connectToServer(&new_cluster, db->db_name);
+
+	PQclear(executeQueryOrDie(conn, "SELECT pg_log_standby_snapshot();"));
+	PQfinish(conn);
+
+	check_ok();
+}
+
+
 void
 issue_warnings_and_set_wal_level(void)
 {
@@ -256,6 +292,14 @@ issue_warnings_and_set_wal_level(void)
 	 */
 	start_postmaster(&new_cluster, true);
 
+	/*
+	 * Also, we must execute pg_log_standby_snapshot() when logical
+	 * replication slots are migrated, because a RUNNING_XACTS record is
+	 * required to create a consistent snapshot.
+	 */
+	if (count_old_cluster_logical_slots())
+		create_consistent_snapshot();
+
 	/* Reindex hash indexes for old < 10.0 */
 	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 906)
 		old_9_6_invalidate_hash_indexes(&new_cluster, false);
@@ -1451,3 +1495,158 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
 	else
 		check_ok();
 }
+
+/*
+ * check_new_cluster_logical_replication_slots()
+ *
+ * Verify that there are no logical replication slots on the new cluster and
+ * that the parameter settings necessary for creating slots are sufficient.
+ */
+static void
+check_new_cluster_logical_replication_slots(void)
+{
+	PGresult   *res;
+	PGconn	   *conn;
+	int			nslots_on_old;
+	int			nslots_on_new;
+	int			max_replication_slots;
+	char	   *wal_level;
+
+	/* Logical slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
+		return;
+
+	nslots_on_old = count_old_cluster_logical_slots();
+
+	/* Quick return if there are no logical slots to be migrated. */
+	if (nslots_on_old == 0)
+		return;
+
+	conn = connectToServer(&new_cluster, "template1");
+
+	prep_status("Checking for new cluster logical replication slots");
+
+	res = executeQueryOrDie(conn, "SELECT count(*) "
+							"FROM pg_catalog.pg_replication_slots "
+							"WHERE slot_type = 'logical' AND "
+							"temporary IS FALSE;");
+
+	if (PQntuples(res) != 1)
+		pg_fatal("could not count the number of logical replication slots");
+
+	nslots_on_new = atoi(PQgetvalue(res, 0, 0));
+
+	if (nslots_on_new)
+		pg_fatal("Expected 0 logical replication slots but found %d.",
+				 nslots_on_new);
+
+	PQclear(res);
+
+	res = executeQueryOrDie(conn, "SELECT setting FROM pg_settings "
+							"WHERE name IN ('max_replication_slots', 'wal_level') "
+							"ORDER BY name DESC;");
+
+	if (PQntuples(res) != 2)
+		pg_fatal("could not determine parameter settings on new cluster");
+
+	wal_level = PQgetvalue(res, 0, 0);
+
+	if (strcmp(wal_level, "logical") != 0)
+		pg_fatal("wal_level must be \"logical\", but is set to \"%s\"",
+				 wal_level);
+
+	max_replication_slots = atoi(PQgetvalue(res, 1, 0));
+
+	if (nslots_on_old > max_replication_slots)
+		pg_fatal("max_replication_slots (%d) must be greater than or equal to the number of "
+				 "logical replication slots (%d) on the old cluster",
+				 max_replication_slots, nslots_on_old);
+
+	PQclear(res);
+	PQfinish(conn);
+
+	check_ok();
+}
+
+/*
+ * check_old_cluster_for_valid_slots()
+ *
+ * Verify that all the logical slots are usable and have consumed all the WAL
+ * before shutdown. The check has already been done in
+ * get_old_cluster_logical_slot_infos(), so this function reads the result and
+ * reports to the user.
+ */
+static void
+check_old_cluster_for_valid_slots(bool live_check)
+{
+	int			dbnum;
+	char		output_path[MAXPGPATH];
+	FILE	   *script = NULL;
+
+	prep_status("Checking for valid logical replication slots");
+
+	/* Construct the path of the file that will list any problematic slots */
+	snprintf(output_path, sizeof(output_path), "%s/%s",
+			 log_opts.basedir,
+			 "invalid_logical_relication_slots.txt");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		int			slotnum;
+		LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
+
+		for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		{
+			LogicalSlotInfo *slot = &slot_arr->slots[slotnum];
+
+			/* Is the slot usable? */
+			if (slot->invalid)
+			{
+				if (script == NULL &&
+					(script = fopen_priv(output_path, "w")) == NULL)
+					pg_fatal("could not open file \"%s\": %s",
+							 output_path, strerror(errno));
+
+				fprintf(script, "The slot \"%s\" is invalid\n",
+						slot->slotname);
+
+				/* No need to check this slot further; skip to the next one */
+				continue;
+			}
+
+			/*
+			 * Do additional check to ensure that all logical replication
+			 * slots have consumed all the WAL before shutdown.
+			 *
+			 * Note: This can be satisfied only when the old cluster has been
+			 * shut down, so we skip this for live checks.
+			 */
+			if (!live_check && !slot->caught_up)
+			{
+				if (script == NULL &&
+					(script = fopen_priv(output_path, "w")) == NULL)
+					pg_fatal("could not open file \"%s\": %s",
+							 output_path, strerror(errno));
+
+				fprintf(script,
+						"The slot \"%s\" has not consumed the WAL yet\n",
+						slot->slotname);
+			}
+		}
+	}
+
+	if (script)
+	{
+		fclose(script);
+
+		pg_log(PG_REPORT, "fatal");
+		pg_fatal("Your installation contains logical replication slots that cannot be upgraded.\n"
+				 "These slots can't be copied, so this cluster cannot be upgraded.\n"
+				 "Consider removing invalid slots and/or consuming the pending WAL if any,\n"
+				 "and then restart the upgrade.\n"
+				 "A list of all such logical replication slots is in the file:\n"
+				 "    %s", output_path);
+	}
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index dc8800c7cd..c0f5e58fa2 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -46,7 +46,9 @@ library_name_compare(const void *p1, const void *p2)
 /*
  * get_loadable_libraries()
  *
- *	Fetch the names of all old libraries containing C-language functions.
+ *	Fetch the names of all old libraries containing C-language functions, as
+ *	well as the libraries of logical replication output plugins.
+ *
  *	We will later check that they all exist in the new installation.
  */
 void
@@ -55,6 +57,7 @@ get_loadable_libraries(void)
 	PGresult  **ress;
 	int			totaltups;
 	int			dbnum;
+	int			n_libinfos;
 
 	ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
 	totaltups = 0;
@@ -81,7 +84,12 @@ get_loadable_libraries(void)
 		PQfinish(conn);
 	}
 
-	os_info.libraries = (LibraryInfo *) pg_malloc(totaltups * sizeof(LibraryInfo));
+	/*
+	 * Allocate memory for required libraries and logical replication output
+	 * plugins.
+	 */
+	n_libinfos = totaltups + count_old_cluster_logical_slots();
+	os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos);
 	totaltups = 0;
 
 	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
@@ -89,6 +97,8 @@ get_loadable_libraries(void)
 		PGresult   *res = ress[dbnum];
 		int			ntups;
 		int			rowno;
+		int			slotno;
+		LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
 
 		ntups = PQntuples(res);
 		for (rowno = 0; rowno < ntups; rowno++)
@@ -101,6 +111,23 @@ get_loadable_libraries(void)
 			totaltups++;
 		}
 		PQclear(res);
+
+		/*
+		 * Store the names of output plugins as well. There is a possibility
+		 * that duplicated plugins are set, but the consumer function
+		 * check_loadable_libraries() will avoid checking the same library, so
+		 * we do not have to consider their uniqueness here.
+		 */
+		for (slotno = 0; slotno < slot_arr->nslots; slotno++)
+		{
+			if (slot_arr->slots[slotno].invalid)
+				continue;
+
+			os_info.libraries[totaltups].name = pg_strdup(slot_arr->slots[slotno].plugin);
+			os_info.libraries[totaltups].dbnum = dbnum;
+
+			totaltups++;
+		}
 	}
 
 	pg_free(ress);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index aa5faca4d6..755ea3bde7 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -26,6 +26,8 @@ static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
+static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
+static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check);
 
 
 /*
@@ -266,13 +268,15 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
 }
 
 /*
- * get_db_and_rel_infos()
+ * get_db_rel_and_slot_infos()
  *
  * higher level routine to generate dbinfos for the database running
  * on the given "port". Assumes that server is already running.
+ *
+ * live_check would be used only when the target is the old cluster.
  */
 void
-get_db_and_rel_infos(ClusterInfo *cluster)
+get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
 {
 	int			dbnum;
 
@@ -283,7 +287,17 @@ get_db_and_rel_infos(ClusterInfo *cluster)
 	get_db_infos(cluster);
 
 	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-		get_rel_infos(cluster, &cluster->dbarr.dbs[dbnum]);
+	{
+		DbInfo	   *pDbInfo = &cluster->dbarr.dbs[dbnum];
+
+		get_rel_infos(cluster, pDbInfo);
+
+		/*
+		 * Retrieve the logical replication slot information for the old
+		 */
+		if (cluster == &old_cluster)
+			get_old_cluster_logical_slot_infos(pDbInfo, live_check);
+	}
 
 	if (cluster == &old_cluster)
 		pg_log(PG_VERBOSE, "\nsource databases:");
@@ -600,6 +614,127 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	dbinfo->rel_arr.nrels = num_rels;
 }
 
+/*
+ * get_old_cluster_logical_slot_infos()
+ *
+ * gets the LogicalSlotInfos for all the logical replication slots of the
+ * database referred to by "dbinfo". The status of each logical slot is gotten
+ * here, but they are used at the checking phase. See
+ * check_old_cluster_for_valid_slots().
+ *
+ * Note: This function will not do anything if the old cluster is pre-PG17.
+ * This is because before that the logical slots are not saved at shutdown, so
+ * there is no guarantee that the latest confirmed_flush_lsn is saved to disk
+ * which can lead to data loss. It is still not guaranteed for manually created
+ * slots in PG17, so subsequent checks done in
+ * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
+ * are included.
+ */
+static void
+get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	LogicalSlotInfo *slotinfos = NULL;
+	int			num_slots = 0;
+
+	/* Logical slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
+	{
+		dbinfo->slot_arr.slots = slotinfos;
+		dbinfo->slot_arr.nslots = num_slots;
+		return;
+	}
+
+	conn = connectToServer(&old_cluster, dbinfo->db_name);
+
+	/*
+	 * Fetch the logical replication slot information. Whether the slot is
+	 * considered caught up is determined by an upgrade function, which
+	 * regards the slot as caught up if no changes are found while decoding.
+	 * See binary_upgrade_validate_wal_logical_end().
+	 *
+	 * Note that we can't ensure whether the slot is caught up during
+	 * live_check as the new WAL records could be generated.
+	 *
+	 * We intentionally skip checking the WALs for invalidated slots as the
+	 * corresponding WALs could have been removed for such slots.
+	 *
+	 * The temporary slots are explicitly ignored while checking because such
+	 * slots cannot exist after the upgrade. During the upgrade, clusters are
+	 * started and stopped several times causing any temporary slots to be
+	 * removed.
+	 */
+	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, "
+							"%s as caught_up, conflicting as invalid "
+							"FROM pg_catalog.pg_replication_slots "
+							"WHERE slot_type = 'logical' AND "
+							"database = current_database() AND "
+							"temporary IS FALSE;",
+							live_check ? "FALSE" :
+							"(CASE WHEN conflicting THEN FALSE "
+							"ELSE (SELECT pg_catalog.binary_upgrade_validate_wal_logical_end(slot_name)) "
+							"END)");
+
+	num_slots = PQntuples(res);
+
+	if (num_slots)
+	{
+		int			slotnum;
+		int			i_slotname;
+		int			i_plugin;
+		int			i_twophase;
+		int			i_caught_up;
+		int			i_invalid;
+
+		slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots);
+
+		i_slotname = PQfnumber(res, "slot_name");
+		i_plugin = PQfnumber(res, "plugin");
+		i_twophase = PQfnumber(res, "two_phase");
+		i_caught_up = PQfnumber(res, "caught_up");
+		i_invalid = PQfnumber(res, "invalid");
+
+		for (slotnum = 0; slotnum < num_slots; slotnum++)
+		{
+			LogicalSlotInfo *curr = &slotinfos[slotnum];
+
+			curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
+			curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
+			curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
+			curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0);
+			curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0);
+		}
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	dbinfo->slot_arr.slots = slotinfos;
+	dbinfo->slot_arr.nslots = num_slots;
+}
+
+
+/*
+ * count_old_cluster_logical_slots()
+ *
+ * Returns the number of logical replication slots for all databases.
+ *
+ * Note: this function always returns 0 if the old_cluster is PG16 and prior
+ * because we gather slot information only for cluster versions greater than or
+ * equal to PG17. See get_old_cluster_logical_slot_infos().
+ */
+int
+count_old_cluster_logical_slots(void)
+{
+	int			dbnum;
+	int			slot_count = 0;
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+		slot_count += old_cluster.dbarr.dbs[dbnum].slot_arr.nslots;
+
+	return slot_count;
+}
 
 static void
 free_db_and_rel_infos(DbInfoArr *db_arr)
@@ -642,8 +777,11 @@ print_db_infos(DbInfoArr *db_arr)
 
 	for (dbnum = 0; dbnum < db_arr->ndbs; dbnum++)
 	{
-		pg_log(PG_VERBOSE, "Database: \"%s\"", db_arr->dbs[dbnum].db_name);
-		print_rel_infos(&db_arr->dbs[dbnum].rel_arr);
+		DbInfo	   *pDbInfo = &db_arr->dbs[dbnum];
+
+		pg_log(PG_VERBOSE, "Database: \"%s\"", pDbInfo->db_name);
+		print_rel_infos(&pDbInfo->rel_arr);
+		print_slot_infos(&pDbInfo->slot_arr);
 	}
 }
 
@@ -660,3 +798,25 @@ print_rel_infos(RelInfoArr *rel_arr)
 			   rel_arr->rels[relnum].reloid,
 			   rel_arr->rels[relnum].tablespace);
 }
+
+static void
+print_slot_infos(LogicalSlotInfoArr *slot_arr)
+{
+	int			slotnum;
+
+	/* Quick return if there are no logical slots. */
+	if (slot_arr->nslots == 0)
+		return;
+
+	pg_log(PG_VERBOSE, "Logical replication slots within the database:");
+
+	for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+	{
+		LogicalSlotInfo *slot_info = &slot_arr->slots[slotnum];
+
+		pg_log(PG_VERBOSE, "slotname: \"%s\", plugin: \"%s\", two_phase: %d",
+			   slot_info->slotname,
+			   slot_info->plugin,
+			   slot_info->two_phase);
+	}
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..2c4f38d865 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_upgrade_logical_replication_slots.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 96bfb67167..03c5f5909f 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -59,6 +59,7 @@ static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0, bool *live_check);
+static char *create_logical_replication_slots(void);
 
 ClusterInfo old_cluster,
 			new_cluster;
@@ -81,6 +82,8 @@ main(int argc, char **argv)
 {
 	char	   *deletion_script_file_name = NULL;
 	bool		live_check = false;
+	char	   *xlogfilename = NULL;
+	PQExpBufferData resetwal_options;
 
 	/*
 	 * pg_upgrade doesn't currently use common/logging.c, but initialize it
@@ -175,6 +178,20 @@ main(int argc, char **argv)
 	transfer_all_new_tablespaces(&old_cluster.dbarr, &new_cluster.dbarr,
 								 old_cluster.pgdata, new_cluster.pgdata);
 
+	/*
+	 * If the old cluster has logical slots, migrate them to a new cluster.
+	 *
+	 * The function returns the next WAL segment file name, which must be
+	 * passed to the upcoming pg_resetwal command.
+	 */
+
+	if (count_old_cluster_logical_slots())
+	{
+		start_postmaster(&new_cluster, true);
+		xlogfilename = create_logical_replication_slots();
+		stop_postmaster(false);
+	}
+
 	/*
 	 * Assuming OIDs are only used in system tables, there is no need to
 	 * restore the OID counter because we have not transferred any OIDs from
@@ -182,10 +199,23 @@ main(int argc, char **argv)
 	 * because there is no need to have the schema load use new oids.
 	 */
 	prep_status("Setting next OID for new cluster");
+
+	/*
+	 * Construct an option string. If the next wal segment is given, also use
+	 * it.
+	 */
+	initPQExpBuffer(&resetwal_options);
+	appendPQExpBuffer(&resetwal_options, "-o %u \"%s\"",
+					  old_cluster.controldata.chkpnt_nxtoid,
+					  new_cluster.pgdata);
+	if (xlogfilename)
+		appendPQExpBuffer(&resetwal_options, " -l %s", xlogfilename);
+
 	exec_prog(UTILITY_LOG_FILE, NULL, true, true,
-			  "\"%s/pg_resetwal\" -o %u \"%s\"",
-			  new_cluster.bindir, old_cluster.controldata.chkpnt_nxtoid,
-			  new_cluster.pgdata);
+			  "\"%s/pg_resetwal\" %s",
+			  new_cluster.bindir, resetwal_options.data);
+
+	termPQExpBuffer(&resetwal_options);
 	check_ok();
 
 	if (user_opts.do_sync)
@@ -593,7 +623,7 @@ create_new_objects(void)
 		set_frozenxids(true);
 
 	/* update new_cluster info now that we have objects in the databases */
-	get_db_and_rel_infos(&new_cluster);
+	get_db_rel_and_slot_infos(&new_cluster, false);
 }
 
 /*
@@ -862,3 +892,77 @@ set_frozenxids(bool minmxid_only)
 
 	check_ok();
 }
+
+/*
+ * create_logical_replication_slots()
+ *
+ * Similar to create_new_objects() but only restores logical replication slots.
+ */
+static char *
+create_logical_replication_slots(void)
+{
+	int			dbnum;
+	char	   *xlogfilename = NULL;
+
+	prep_status_progress("Restoring logical replication slots in the new cluster");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+		LogicalSlotInfoArr *slot_arr = &old_db->slot_arr;
+		PGconn	   *conn;
+		PQExpBuffer query;
+		int			slotnum;
+		char		log_file_name[MAXPGPATH];
+
+		/* Skip this database if there are no slots */
+		if (slot_arr->nslots == 0)
+			continue;
+
+		conn = connectToServer(&new_cluster, old_db->db_name);
+		query = createPQExpBuffer();
+
+		snprintf(log_file_name, sizeof(log_file_name),
+				 DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+
+		for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		{
+			PGresult   *res;
+			LogicalSlotInfo *slot_info = &slot_arr->slots[slotnum];
+
+			/*
+			 * Constructs a query for creating logical replication slots. An
+			 * upgrade function is used to set restart_lsn appropriately.
+			 */
+			appendPQExpBuffer(query,
+							  "SELECT * FROM "
+							  "pg_catalog.binary_upgrade_create_logical_replication_slot(");
+			appendStringLiteralConn(query, slot_info->slotname, conn);
+			appendPQExpBuffer(query, ", ");
+			appendStringLiteralConn(query, slot_info->plugin, conn);
+			appendPQExpBuffer(query, ", %s);",
+							  slot_info->two_phase ? "true" : "false");
+
+			res = executeQueryOrDie(conn, "%s", query->data);
+
+			Assert(PQntuples(res) == 1 && PQnfields(res) == 3);
+
+			if (xlogfilename == NULL)
+				xlogfilename = pg_strdup(PQgetvalue(res, 0, 2));
+
+			PQclear(res);
+			resetPQExpBuffer(query);
+		}
+
+		PQfinish(conn);
+
+		destroyPQExpBuffer(query);
+	}
+
+	end_progress_output();
+	check_ok();
+
+	return xlogfilename;
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 842f3b6cd3..69b0eb5346 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -150,6 +150,24 @@ typedef struct
 	int			nrels;
 } RelInfoArr;
 
+/*
+ * Structure to store logical replication slot information.
+ */
+typedef struct
+{
+	char	   *slotname;		/* slot name */
+	char	   *plugin;			/* plugin */
+	bool		two_phase;		/* can the slot decode 2PC? */
+	bool		caught_up;		/* Is the slot caught up to latest changes? */
+	bool		invalid;		/* if true, the slot is unusable */
+} LogicalSlotInfo;
+
+typedef struct
+{
+	int			nslots;			/* number of logical slot infos */
+	LogicalSlotInfo *slots;		/* array of logical slot infos */
+} LogicalSlotInfoArr;
+
 /*
  * The following structure represents a relation mapping.
  */
@@ -176,6 +194,7 @@ typedef struct
 	char		db_tablespace[MAXPGPATH];	/* database default tablespace
 											 * path */
 	RelInfoArr	rel_arr;		/* array of all user relinfos */
+	LogicalSlotInfoArr slot_arr;	/* array of all LogicalSlotInfo */
 } DbInfo;
 
 /*
@@ -400,7 +419,8 @@ void		check_loadable_libraries(void);
 FileNameMap *gen_db_file_maps(DbInfo *old_db,
 							  DbInfo *new_db, int *nmaps, const char *old_pgdata,
 							  const char *new_pgdata);
-void		get_db_and_rel_infos(ClusterInfo *cluster);
+void		get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check);
+int			count_old_cluster_logical_slots(void);
 
 /* option.c */
 
diff --git a/src/bin/pg_upgrade/server.c b/src/bin/pg_upgrade/server.c
index 0bc3d2806b..d7f6c268ef 100644
--- a/src/bin/pg_upgrade/server.c
+++ b/src/bin/pg_upgrade/server.c
@@ -201,6 +201,7 @@ start_postmaster(ClusterInfo *cluster, bool report_and_exit_on_error)
 	PGconn	   *conn;
 	bool		pg_ctl_return = false;
 	char		socket_string[MAXPGPATH + 200];
+	PQExpBufferData pgoptions;
 
 	static bool exit_hook_registered = false;
 
@@ -227,23 +228,41 @@ start_postmaster(ClusterInfo *cluster, bool report_and_exit_on_error)
 				 cluster->sockdir);
 #endif
 
+	initPQExpBuffer(&pgoptions);
+
 	/*
-	 * Use -b to disable autovacuum.
+	 * Construct a parameter string which is passed to the server process.
 	 *
 	 * Turn off durability requirements to improve object creation speed, and
 	 * we only modify the new cluster, so only use it there.  If there is a
 	 * crash, the new cluster has to be recreated anyway.  fsync=off is a big
 	 * win on ext4.
 	 */
+	if (cluster == &new_cluster)
+		appendPQExpBufferStr(&pgoptions, " -c synchronous_commit=off -c fsync=off -c full_page_writes=off");
+
+	/*
+	 * Use max_slot_wal_keep_size as -1 to prevent the WAL removal by the
+	 * checkpointer process.  If WALs required by logical replication slots
+	 * are removed, the slots are unusable.  This setting prevents the
+	 * invalidation of slots during the upgrade. We set this option when
+	 * cluster is PG17 or later because logical replication slots can only be
+	 * migrated since then. Besides, max_slot_wal_keep_size is added in PG13.
+	 */
+	if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
+		appendPQExpBufferStr(&pgoptions, " -c max_slot_wal_keep_size=-1");
+
+	/* Use -b to disable autovacuum. */
 	snprintf(cmd, sizeof(cmd),
 			 "\"%s/pg_ctl\" -w -l \"%s/%s\" -D \"%s\" -o \"-p %d -b%s %s%s\" start",
 			 cluster->bindir,
 			 log_opts.logdir,
 			 SERVER_LOG_FILE, cluster->pgconfig, cluster->port,
-			 (cluster == &new_cluster) ?
-			 " -c synchronous_commit=off -c fsync=off -c full_page_writes=off" : "",
+			 pgoptions.data,
 			 cluster->pgopts ? cluster->pgopts : "", socket_string);
 
+	termPQExpBuffer(&pgoptions);
+
 	/*
 	 * Don't throw an error right away, let connecting throw the error because
 	 * it might supply a reason for the failure.
diff --git a/src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl
new file mode 100644
index 0000000000..c90edc12de
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl
@@ -0,0 +1,325 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Tests for upgrading replication slots
+
+use strict;
+use warnings;
+
+use File::Find qw(find);
+use File::Path qw(rmtree);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize old cluster
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
+
+# Initialize new cluster
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 'replica');
+
+# Initialize subscriber cluster
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
+
+my $bindir = $new_publisher->config_data('--bindir');
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when new cluster wal_level is not 'logical'
+
+# Preparations for the subsequent test:
+# 1. Create a slot on the old cluster
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding', false, true);"
+);
+$old_publisher->stop;
+
+# pg_upgrade will fail because the new cluster wal_level is 'replica'
+command_checks_all(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d', $old_publisher->data_dir,
+		'-D', $new_publisher->data_dir,
+		'-b', $bindir,
+		'-B', $bindir,
+		'-s', $new_publisher->host,
+		'-p', $old_publisher->port,
+		'-P', $new_publisher->port,
+		$mode,
+	],
+	1,
+	[qr/wal_level must be \"logical\", but is set to \"replica\"/],
+	[qr//],
+	'run of pg_upgrade where the new cluster has the wrong wal_level');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when max_replication_slots on a new cluster is
+#		too low
+
+# Preparations for the subsequent test:
+# 1. Create a second slot on the old cluster
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);"
+);
+
+# 2. Consume WAL records to avoid another type of upgrade failure. It will be
+#	 tested in subsequent cases.
+$old_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL);"
+);
+$old_publisher->stop;
+
+# 3. max_replication_slots is set to smaller than the number of slots (2)
+#	 present on the old cluster
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
+
+# 4. wal_level is set correctly on the new cluster
+$new_publisher->append_conf('postgresql.conf', "wal_level = 'logical'");
+
+# pg_upgrade will fail because the new cluster has insufficient max_replication_slots
+command_checks_all(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d', $old_publisher->data_dir,
+		'-D', $new_publisher->data_dir,
+		'-b', $bindir,
+		'-B', $bindir,
+		'-s', $new_publisher->host,
+		'-p', $old_publisher->port,
+		'-P', $new_publisher->port,
+		$mode,
+	],
+	1,
+	[
+		qr/max_replication_slots \(1\) must be greater than or equal to the number of logical replication slots \(2\) on the old cluster/
+	],
+	[qr//],
+	'run of pg_upgrade where the new cluster has insufficient max_replication_slots'
+);
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records
+
+# Preparations for the subsequent test:
+# 1. Remove the slot 'test_slot2', leaving only 1 slot on the old cluster, so
+#    the new cluster config max_replication_slots=1 will now be enough.
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT * FROM pg_drop_replication_slot('test_slot2');");
+
+# 2. Generate extra WAL records. Because these WAL records do not get consumed
+#	 it will cause the upcoming pg_upgrade test to fail.
+$old_publisher->safe_psql('postgres',
+	"CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;");
+$old_publisher->stop;
+
+# pg_upgrade will fail because the slot still has unconsumed WAL records
+command_checks_all(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d', $old_publisher->data_dir,
+		'-D', $new_publisher->data_dir,
+		'-b', $bindir,
+		'-B', $bindir,
+		'-s', $new_publisher->host,
+		'-p', $old_publisher->port,
+		'-P', $new_publisher->port,
+		$mode,
+	],
+	1,
+	[
+		qr/Your installation contains logical replication slots that cannot be upgraded./
+	],
+	[qr//],
+	'run of pg_upgrade of old cluster with idle replication slots');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Verify the reason why the logical replication slot cannot be upgraded
+my $log_path = $new_publisher->data_dir . "/pg_upgrade_output.d";
+my $slots_filename;
+
+# Find a txt file that contains a list of logical replication slots that cannot
+# be upgraded. We cannot predict the file's path because the output directory
+# contains a milliseconds timestamp. File::Find::find must be used.
+find(
+	sub {
+		if ($File::Find::name =~ m/invalid_logical_relication_slots\.txt/)
+		{
+			$slots_filename = $File::Find::name;
+		}
+	},
+	$new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# And check the content. The failure should be because there are unconsumed
+# WALs after confirmed_flush_lsn of test_slot1.
+like(
+	slurp_file($slots_filename),
+	qr/The slot \"test_slot1\" has not consumed the WAL yet/m,
+	'the previous test failed due to unconsumed WALs');
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+# Consume remained WAL records
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL);");
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when there are non-transactional changes
+
+# Preparations for the subsequent test:
+# 1. Emit a non-transactional message
+$old_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message');");
+$old_publisher->stop;
+
+# pg_upgrade will fail because there is a non-transactional change
+command_checks_all(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d', $old_publisher->data_dir,
+		'-D', $new_publisher->data_dir,
+		'-b', $bindir,
+		'-B', $bindir,
+		'-s', $new_publisher->host,
+		'-p', $old_publisher->port,
+		'-P', $new_publisher->port,
+		$mode,
+	],
+	1,
+	[
+		qr/Your installation contains logical replication slots that cannot be upgraded./
+	],
+	[qr//],
+	'run of pg_upgrade of old cluster with non-transactional changes');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Verify the reason why the logical replication slot cannot be upgraded.
+$log_path = $new_publisher->data_dir . "/pg_upgrade_output.d";
+$slots_filename = undef;
+
+# Find a txt file that contains a list of logical replication slots that cannot
+# be upgraded.
+find(
+	sub {
+		if ($File::Find::name =~ m/invalid_logical_relication_slots\.txt/)
+		{
+			$slots_filename = $File::Find::name;
+		}
+	},
+	$new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# And check the content. The content is the same as in the previous test
+like(
+	slurp_file($slots_filename),
+	qr/The slot \"test_slot1\" has not consumed the WAL yet/m,
+	'the previous test failed due to unconsumed WALs');
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+# Remove the remaining slot
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT * FROM pg_drop_replication_slot('test_slot1');");
+
+# ------------------------------
+# TEST: Successful upgrade
+
+# Preparations for the subsequent test:
+# 1. Setup logical replication (a two_phase-enabled slot is created on the old
+#    publisher as a side effect of the subscription)
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+$old_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR ALL TABLES;");
+$subscriber->start;
+$subscriber->safe_psql(
+	'postgres', qq[
+	CREATE TABLE tbl (a int);
+	CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION pub WITH (two_phase = 'true')
+]);
+$subscriber->wait_for_subscription_sync($old_publisher, 'regress_sub');
+
+# 2. Temporarily disable the subscription
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub DISABLE");
+$old_publisher->stop;
+
+# Dry run, successful check is expected. This is not a live check, so a
+# shutdown checkpoint record would be inserted. We want to test that a
+# subsequent upgrade is successful by skipping such an expected WAL record.
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d', $old_publisher->data_dir,
+		'-D', $new_publisher->data_dir,
+		'-b', $bindir,
+		'-B', $bindir,
+		'-s', $new_publisher->host,
+		'-p', $old_publisher->port,
+		'-P', $new_publisher->port,
+		$mode, '--check'
+	],
+	'run of pg_upgrade of old cluster');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+# Actual run, successful upgrade is expected
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d', $old_publisher->data_dir,
+		'-D', $new_publisher->data_dir,
+		'-b', $bindir,
+		'-B', $bindir,
+		'-s', $new_publisher->host,
+		'-p', $old_publisher->port,
+		'-P', $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old cluster');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+# Check that the slot 'regress_sub' has migrated to the new cluster, keeping
+# its two_phase setting
+$new_publisher->start;
+my $result = $new_publisher->safe_psql('postgres',
+	"SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(regress_sub|t), 'check the slot exists on new cluster');
+
+# Update the connection string and re-enable the subscription
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql(
+	'postgres', qq[
+	ALTER SUBSCRIPTION regress_sub CONNECTION '$new_connstr';
+	ALTER SUBSCRIPTION regress_sub ENABLE;
+]);
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('regress_sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are replicated to the subscriber');
+
+# Clean up
+$subscriber->stop();
+$new_publisher->stop();
+
+done_testing();
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f0b7b9cbd8..eacd91eb67 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11370,6 +11370,18 @@
   proname => 'binary_upgrade_set_next_pg_tablespace_oid', provolatile => 'v',
   proparallel => 'u', prorettype => 'void', proargtypes => 'oid',
   prosrc => 'binary_upgrade_set_next_pg_tablespace_oid' },
+{ oid => '8046', descr => 'for use by pg_upgrade',
+  proname => 'binary_upgrade_validate_wal_logical_end', proisstrict => 'f',
+  provolatile => 'v', proparallel => 'u', prorettype => 'bool',
+  proargtypes => 'name',
+  prosrc => 'binary_upgrade_validate_wal_logical_end' },
+{ oid => '8047', descr => 'for use by pg_upgrade',
+  proname => 'binary_upgrade_create_logical_replication_slot', proisstrict => 'f',
+  provolatile => 'v', proparallel => 'u', prorettype => 'record',
+  proargtypes => 'name name bool',
+  proallargtypes => '{name,name,bool,name,pg_lsn,text}',
+  proargmodes => '{i,i,i,o,o,o}',
+  prosrc => 'binary_upgrade_create_logical_replication_slot' },
 
 # conversion functions
 { oid => '4302',
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5f49554ea0..d0f9dda6c5 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -30,6 +30,24 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC
 														 bool skipped_xact
 );
 
+typedef enum DecodingMode
+{
+	/* Normal mode: decode changes and emit them via the output plugin */
+	DECODING_MODE_NORMAL,
+
+	/*
+	 * Fast-forward decoding mode: skips loading the output plugin and
+	 * bypasses decoding most changes in a transaction.
+	 */
+	DECODING_MODE_FAST_FORWARD,
+
+	/*
+	 * Silent decoding mode: skips loading the output plugin but decodes all
+	 * changes, without emitting any output.
+	 */
+	DECODING_MODE_SILENT
+} DecodingMode;
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -44,11 +62,11 @@ typedef struct LogicalDecodingContext
 	struct SnapBuild *snapshot_builder;
 
 	/*
-	 * Marks the logical decoding context as fast forward decoding one. Such a
-	 * context does not have plugin loaded so most of the following properties
-	 * are unused.
+	 * For DECODING_MODE_FAST_FORWARD and DECODING_MODE_SILENT, the context
+	 * does not have plugin loaded so most of the following properties are
+	 * unused.
 	 */
-	bool		fast_forward;
+	DecodingMode decoding_mode;
 
 	OutputPluginCallbacks callbacks;
 	OutputPluginOptions options;
@@ -109,6 +127,13 @@ typedef struct LogicalDecodingContext
 	TransactionId write_xid;
 	/* Are we processing the end LSN of a transaction? */
 	bool		end_xact;
+
+	/*
+	 * Did the logical decoding context skip outputting any changes?
+	 *
+	 * This flag is used only when the context is in the silent mode.
+	 */
+	bool		output_skipped;
 } LogicalDecodingContext;
 
 
@@ -124,7 +149,7 @@ extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
 														 LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
 													 List *output_plugin_options,
-													 bool fast_forward,
+													 DecodingMode decoding_mode,
 													 XLogReaderRoutine *xl_routine,
 													 LogicalOutputPluginWriterPrepareWrite prepare_write,
 													 LogicalOutputPluginWriterWrite do_write,
@@ -145,4 +170,7 @@ extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
+extern bool DecodingContextHasdecodedItems(LogicalDecodingContext *ctx,
+										   XLogRecPtr end_of_wal);
+
 #endif
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 758ca79a81..6559d3f014 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -227,6 +227,10 @@ extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
 extern void ReplicationSlotMarkDirty(void);
+extern void create_logical_replication_slot(char *name, char *plugin,
+											bool temporary, bool two_phase,
+											XLogRecPtr restart_lsn,
+											bool find_startpoint);
 
 /* misc stuff */
 extern void ReplicationSlotInitialize(void);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 8de90c4958..b75a69f543 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -562,6 +562,7 @@ DeallocateStmt
 DeclareCursorStmt
 DecodedBkpBlock
 DecodedXLogRecord
+DecodingMode
 DecodingOutputState
 DefElem
 DefElemAction
@@ -1503,6 +1504,8 @@ LogicalRepTyp
 LogicalRepWorker
 LogicalRepWorkerType
 LogicalRewriteMappingData
+LogicalSlotInfo
+LogicalSlotInfoArr
 LogicalTape
 LogicalTapeSet
 LsnReadQueue
-- 
2.27.0

