From 11087d85a7a86c5066b99163f4b595575e3708f0 Mon Sep 17 00:00:00 2001
From: Takamichi Osumi <osumi.takamichi@fujitsu.com>
Date: Tue, 10 Jan 2023 13:35:45 +0000
Subject: [PATCH v14] Time-delayed logical replication subscriber

Similar to physical replication, a time-delayed copy of the data for
logical replication is useful for some scenarios (particularly to fix
errors that might cause data loss).

If the subscription sets min_apply_delay parameter, the logical
replication worker will delay the transaction commit for min_apply_delay
milliseconds.

The delay is calculated between the WAL time stamp and the current time
on the subscriber.

The delay occurs only on WAL records for transaction begins. The main
reason is to avoid keeping a transaction open for a long time. Regular
and prepared transactions are covered. Streamed transactions are also
covered.

Author: Euler Taveira
Discussion: https://postgr.es/m/CAB-JLwYOYwL=XTyAXKiH5CtM_Vm8KjKh7aaitCKvmCh4rzr5pQ@mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                    |   9 +
 doc/src/sgml/config.sgml                      |  12 ++
 doc/src/sgml/logical-replication.sgml         |   7 +
 doc/src/sgml/ref/alter_subscription.sgml      |   5 +-
 doc/src/sgml/ref/create_subscription.sgml     |  59 +++++-
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   7 +-
 src/backend/commands/subscriptioncmds.c       |  87 +++++++-
 .../replication/logical/applyparallelworker.c |   3 +-
 src/backend/replication/logical/worker.c      | 161 +++++++++++++--
 src/backend/utils/adt/timestamp.c             |  29 +++
 src/bin/pg_dump/pg_dump.c                     |  16 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   9 +-
 src/bin/psql/tab-complete.c                   |   4 +-
 src/include/catalog/pg_subscription.h         |   3 +
 src/include/datatype/timestamp.h              |   2 +
 src/include/replication/worker_internal.h     |   2 +-
 src/include/utils/timestamp.h                 |   2 +
 src/test/regress/expected/subscription.out    | 185 +++++++++++-------
 src/test/regress/sql/subscription.sql         |  25 +++
 src/test/subscription/meson.build             |   1 +
 src/test/subscription/t/032_apply_delay.pl    | 151 ++++++++++++++
 23 files changed, 679 insertions(+), 102 deletions(-)
 create mode 100644 src/test/subscription/t/032_apply_delay.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1e4048054..bf3c05241c 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7873,6 +7873,15 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subminapplydelay</structfield> <type>int8</type>
+      </para>
+      <para>
+       The length of time (ms) to delay the application of changes.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subname</structfield> <type>name</type>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 77574e2d4e..223c07b06f 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4753,6 +4753,18 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
        the <filename>postgresql.conf</filename> file or on the server
        command line.
       </para>
+      <para>
+       For time delayed logical replication, the apply worker sends a Standby
+       Status Update message to the corresponding publisher per the indicated
+       time of this parameter. Therefore, if this parameter is longer than
+       <literal>wal_sender_timeout</literal> on the publisher, then the
+       walsender doesn't get any update message during the delay and repeatedly
+       terminates due to the timeout errors. Hence, make sure this parameter
+       shorter than the <literal>wal_sender_timeout</literal> of the publisher.
+       If this parameter is set to zero with time delayed replication, the
+       apply worker doesn't send any feedback messages during the
+       <literal>min_apply_delay</literal>.
+      </para>
       </listitem>
      </varlistentry>
 
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 54f48be87f..6407804547 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -247,6 +247,13 @@
    target table.
   </para>
 
+  <para>
+   The subscriber replication can be instructed to lag behind the publisher
+   side changes by specifying the <literal>min_apply_delay</literal>
+   subscription parameter. See <xref linkend="sql-createsubscription"/> for
+   details.
+  </para>
+
   <sect2 id="logical-replication-subscription-slot">
    <title>Replication Slot Management</title>
 
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 1e8d72062b..d63aff1b90 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -213,8 +213,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
       <literal>binary</literal>, <literal>streaming</literal>,
-      <literal>disable_on_error</literal>, and
-      <literal>origin</literal>.
+      <literal>disable_on_error</literal>,
+      <literal>origin</literal>, and
+      <literal>min_apply_delay</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index eba72c6af6..d95ff8944e 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -349,7 +349,47 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
-      </variablelist></para>
+
+       <varlistentry>
+        <term><literal>min_apply_delay</literal> (<type>integer</type>)</term>
+        <listitem>
+         <para>
+          By default, the subscriber applies changes as soon as possible. As
+          with the physical replication feature
+          (<xref linkend="guc-recovery-min-apply-delay"/>), it can be useful to
+          have a time-delayed logical replica. This parameter lets the user to
+          delay the application of changes by a specified amount of time. If this
+          value is specified without units, it is taken as milliseconds. The
+          default is zero(no delay).
+         </para>
+         <para>
+          The delay occurs only on WAL records for transaction begins and after
+          the initial table synchronization. It is possible that the
+          replication delay between publisher and subscriber exceeds the value
+          of this parameter, in which case no delay is added. Note that the
+          delay is calculated between the WAL time stamp as written on
+          publisher and the current time on the subscriber. Time spent in logical
+          decoding and in transferring the transaction may reduce the actual wait
+          time. If the system clocks on publisher and subscriber are not
+          synchronized, this may lead to apply changes earlier than expected,
+          but this is not a major issue because this parameter is typically much
+          larger than the time deviations between servers. Note that if this
+          parameter is set to a long delay, the replication will stop if the
+          replication slot falls behind the current LSN by more than
+          <link linkend="guc-max-slot-wal-keep-size"><literal>max_slot_wal_keep_size</literal></link>.
+         </para>
+         <warning>
+           <para>
+            Delaying the replication can mean there is a much longer time between making
+            a change on the publisher, and that change being committed on the subscriber.
+            This can have a big impact on synchronous replication.
+            See <xref linkend="guc-synchronous-commit"/>.
+           </para>
+         </warning>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
 
     </listitem>
    </varlistentry>
@@ -413,6 +453,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    published with different column lists are not supported.
   </para>
 
+  <para>
+   Setting streaming to <literal>parallel</literal> mode and <literal>min_apply_delay</literal>
+   at the same time is not supported.
+  </para>
+
   <para>
    We allow non-existent publications to be specified so that users can add
    those later. This means
@@ -472,6 +517,18 @@ CREATE SUBSCRIPTION mysub
         PUBLICATION insert_only
                WITH (enabled = false);
 </programlisting></para>
+
+  <para>
+   Create a subscription to a remote server that replicates tables in
+   the <literal>mypub</literal> publication and starts replicating immediately
+   on commit. Pre-existing data is not copied. The application of changes is
+   delayed by 4 hours.
+<programlisting>
+CREATE SUBSCRIPTION mysub
+         CONNECTION 'host=192.168.1.50 port=5432 user=foo dbname=foodb'
+        PUBLICATION mypub
+               WITH (copy_data = false, min_apply_delay = '4h');
+</programlisting></para>
  </refsect1>
 
  <refsect1>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae311c3..c767cc1c3a 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -64,6 +64,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->oid = subid;
 	sub->dbid = subform->subdbid;
 	sub->skiplsn = subform->subskiplsn;
+	sub->minapplydelay = subform->subminapplydelay;
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 447c9b970f..4004fcd0c4 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1299,9 +1299,10 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
-              subbinary, substream, subtwophasestate, subdisableonerr,
-              subslotname, subsynccommit, subpublications, suborigin)
+GRANT SELECT (oid, subdbid, subskiplsn, subminapplydelay, subname, subowner,
+              subenabled, subbinary, substream, subtwophasestate,
+              subdisableonerr, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index baff00dd74..d55864a2ce 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,7 @@
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
 #define SUBOPT_ORIGIN				0x00001000
+#define SUBOPT_MIN_APPLY_DELAY		0x00002000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -90,6 +91,7 @@ typedef struct SubOpts
 	bool		disableonerr;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	int64		min_apply_delay;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -146,6 +148,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->disableonerr = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY))
+		opts->min_apply_delay = 0;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -324,6 +328,43 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
+				 strcmp(defel->defname, "min_apply_delay") == 0)
+		{
+			char	   *val,
+					   *tmp;
+			Interval   *interval;
+			int64		ms;
+
+			if (IsSet(opts->specified_opts, SUBOPT_MIN_APPLY_DELAY))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_MIN_APPLY_DELAY;
+			tmp = defGetString(defel);
+
+			/*
+			 * If no unit was specified, then explicitly add 'ms' otherwise
+			 * the interval_in function would assume 'seconds'.
+			 */
+			if (strspn(tmp, "-0123456789 ") == strlen(tmp))
+				val = psprintf("%sms", tmp);
+			else
+				val = tmp;
+
+			interval = DatumGetIntervalP(DirectFunctionCall3(interval_in,
+															 CStringGetDatum(val),
+															 ObjectIdGetDatum(InvalidOid),
+															 Int32GetDatum(-1)));
+
+			ms = interval2ms(interval);
+			if (ms < 0 || ms > INT_MAX)
+				ereport(ERROR,
+						errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+						errmsg(INT64_FORMAT " ms is outside the valid range for parameter \"%s\"",
+							   ms, "min_apply_delay"));
+
+			opts->min_apply_delay = ms;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -404,6 +445,17 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 								"slot_name = NONE", "create_slot = false")));
 		}
 	}
+
+	/* Test the combination of streaming mode and min_apply_delay */
+	if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
+		opts->min_apply_delay > 0)
+	{
+		if (opts->streaming == LOGICALREP_STREAM_PARALLEL)
+			ereport(ERROR,
+					errcode(ERRCODE_SYNTAX_ERROR),
+					errmsg("%s and %s are mutually exclusive options",
+						   "min_apply_delay > 0", "streaming = parallel"));
+	}
 }
 
 /*
@@ -560,7 +612,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN |
+					  SUBOPT_MIN_APPLY_DELAY);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -625,6 +678,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
 	values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
 	values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
+	values[Anum_pg_subscription_subminapplydelay - 1] = Int64GetDatum(opts.min_apply_delay);
 	values[Anum_pg_subscription_subname - 1] =
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
@@ -1054,7 +1108,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN | SUBOPT_MIN_APPLY_DELAY);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1098,6 +1152,17 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
 				{
+					/*
+					 * Test the combination of streaming mode and
+					 * min_apply_delay
+					 */
+					if (opts.streaming == LOGICALREP_STREAM_PARALLEL &&
+						sub->minapplydelay > 0)
+						ereport(ERROR,
+								errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								errmsg("cannot set %s for subscription with %s",
+									   "streaming = parallel", "min_apply_delay > 0"));
+
 					values[Anum_pg_subscription_substream - 1] =
 						CharGetDatum(opts.streaming);
 					replaces[Anum_pg_subscription_substream - 1] = true;
@@ -1111,6 +1176,24 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 						= true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY))
+				{
+					/*
+					 * Test the combination of streaming mode and
+					 * min_apply_delay
+					 */
+					if (opts.min_apply_delay > 0 &&
+						sub->stream == LOGICALREP_STREAM_PARALLEL)
+						ereport(ERROR,
+								errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								errmsg("cannot set %s for subscription with %s",
+									   "min_apply_delay > 0", "streaming = parallel"));
+
+					values[Anum_pg_subscription_subminapplydelay - 1] =
+						Int64GetDatum(opts.min_apply_delay);
+					replaces[Anum_pg_subscription_subminapplydelay - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
 				{
 					values[Anum_pg_subscription_suborigin - 1] =
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 2e5914d5d9..72e6f5ce84 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -704,7 +704,8 @@ pa_process_spooled_messages_if_required(void)
 	{
 		apply_spooled_messages(&MyParallelShared->fileset,
 							   MyParallelShared->xid,
-							   InvalidXLogRecPtr);
+							   InvalidXLogRecPtr,
+							   0);
 		pa_set_fileset_state(MyParallelShared, FS_EMPTY);
 	}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 79cda39445..5d11cc5662 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -316,6 +316,17 @@ static List *on_commit_wakeup_workers_subids = NIL;
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
+/*
+ * In order to avoid walsender's timeout during time delayed replication,
+ * it's necessary to keep sending feedback messages during the delay from the
+ * worker process. Meanwhile, the feature delays the apply before starting the
+ * transaction and thus we don't write WALs for the suspended changes during
+ * the wait. Hence, in the case the worker process sends a feedback during the
+ * delay, avoid having positions of the flushed and apply LSN overwritten by
+ * the latest LSN.
+ */
+static XLogRecPtr last_received = InvalidXLogRecPtr;
+
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
 
@@ -386,10 +397,13 @@ static void stream_write_change(char action, StringInfo s);
 static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
 static void stream_close_file(void);
 
-static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
+static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply,
+						  bool in_delaying_apply);
 
 static void DisableSubscriptionAndExit(void);
 
+static void maybe_delay_apply(TimestampTz ts);
+
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
@@ -996,6 +1010,94 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 	ExecStoreVirtualTuple(slot);
 }
 
+/*
+ * When min_apply_delay parameter is set on the subscriber, we wait long enough
+ * to make sure a transaction is applied at least that interval behind the
+ * publisher.
+ *
+ * While the physical replication applies the delay at commit time, this
+ * feature applies the delay for the next transaction but before starting the
+ * transaction. This is mainly because keeping a transaction that conducted
+ * write operations open for a long time results in some issues such as bloat
+ * and locks.
+ *
+ * The min_apply_delay parameter will take effect only after all tables are in
+ * READY state.
+ */
+static void
+maybe_delay_apply(TimestampTz ts)
+{
+	/* Nothing to do if no delay set */
+	if (MySubscription->minapplydelay <= 0)
+		return;
+
+	/*
+	 * The min_apply_delay parameter is ignored until all tablesync workers
+	 * have reached READY state. If we allow the delay during the catchup
+	 * phase, once we reach the limit of tablesync workers, it will impose a
+	 * delay for each subsequent worker. It means it will take a long time to
+	 * finish the initial table synchronization.
+	 */
+	if (!AllTablesyncsReady())
+		return;
+
+	while (true)
+	{
+		long		diffms;
+
+		ResetLatch(MyLatch);
+
+		CHECK_FOR_INTERRUPTS();
+
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		/*
+		 * Before calculating the time duration, reload the catalog if needed.
+		 */
+		if (!in_remote_transaction && !in_streamed_transaction)
+		{
+			AcceptInvalidationMessages();
+			maybe_reread_subscription();
+		}
+
+		diffms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
+												 TimestampTzPlusMilliseconds(ts, MySubscription->minapplydelay));
+
+		/*
+		 * Exit without arming the latch if it's already past time to apply
+		 * this transaction.
+		 */
+		if (diffms <= 0)
+			break;
+
+		elog(DEBUG2, "logical replication apply delay: %ld ms", diffms);
+
+		/*
+		 * Call send_feedback() to prevent the publisher from exiting by
+		 * timeout during the delay, when wal_receiver_status_interval is
+		 * available.
+		 */
+		if (wal_receiver_status_interval > 0
+			&& diffms > wal_receiver_status_interval * 1000)
+		{
+			WaitLatch(MyLatch,
+					  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					  (long) wal_receiver_status_interval * 1000,
+					  WAIT_EVENT_RECOVERY_APPLY_DELAY);
+			send_feedback(last_received, true, false, true);
+		}
+		else
+			WaitLatch(MyLatch,
+					  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					  diffms,
+					  WAIT_EVENT_RECOVERY_APPLY_DELAY);
+	}
+}
+
 /*
  * Handle BEGIN message.
  */
@@ -1007,6 +1109,9 @@ apply_handle_begin(StringInfo s)
 	logicalrep_read_begin(s, &begin_data);
 	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
+	/* Should we delay the current transaction? */
+	maybe_delay_apply(begin_data.committime);
+
 	remote_final_lsn = begin_data.final_lsn;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
@@ -1061,6 +1166,9 @@ apply_handle_begin_prepare(StringInfo s)
 	logicalrep_read_begin_prepare(s, &begin_data);
 	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
 
+	/* Should we delay the current prepared transaction? */
+	maybe_delay_apply(begin_data.prepare_time);
+
 	remote_final_lsn = begin_data.prepare_lsn;
 
 	maybe_start_skipping_changes(begin_data.prepare_lsn);
@@ -1308,7 +1416,8 @@ apply_handle_stream_prepare(StringInfo s)
 			 * spooled operations.
 			 */
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
-								   prepare_data.xid, prepare_data.prepare_lsn);
+								   prepare_data.xid, prepare_data.prepare_lsn,
+								   prepare_data.prepare_time);
 
 			/* Mark the transaction as prepared. */
 			apply_handle_prepare_internal(&prepare_data);
@@ -2001,7 +2110,7 @@ ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
  */
 void
 apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
-					   XLogRecPtr lsn)
+					   XLogRecPtr lsn, TimestampTz ts)
 {
 	StringInfoData s2;
 	int			nchanges;
@@ -2012,6 +2121,24 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 	int			fileno;
 	off_t		offset;
 
+	/*
+	 * Should we delay the current transaction?
+	 *
+	 * Unlike the regular (non-streamed) cases, the delay is applied in a
+	 * STREAM COMMIT/STREAM PREPARE message for streamed transactions. The
+	 * STREAM START message does not contain a commit/prepare time (it will be
+	 * available when the in-progress transaction finishes). Hence, it's not
+	 * appropriate to apply a delay at that time.
+	 *
+	 * It's not allowed to execute time delayed replication with parallel
+	 * apply feature.
+	 */
+	if (!am_parallel_apply_worker())
+	{
+		Assert(ts > 0);
+		maybe_delay_apply(ts);
+	}
+
 	if (!am_parallel_apply_worker())
 		maybe_start_skipping_changes(lsn);
 
@@ -2171,7 +2298,7 @@ apply_handle_stream_commit(StringInfo s)
 			 * spooled operations.
 			 */
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
-								   commit_data.commit_lsn);
+								   commit_data.commit_lsn, commit_data.committime);
 
 			apply_handle_commit_internal(&commit_data);
 
@@ -3444,7 +3571,7 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
  * Apply main loop.
  */
 static void
-LogicalRepApplyLoop(XLogRecPtr last_received)
+LogicalRepApplyLoop(void)
 {
 	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
 	bool		ping_sent = false;
@@ -3565,7 +3692,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 						if (last_received < end_lsn)
 							last_received = end_lsn;
 
-						send_feedback(last_received, reply_requested, false);
+						send_feedback(last_received, reply_requested, false, false);
 						UpdateWorkerStats(last_received, timestamp, true);
 					}
 					/* other message types are purposefully ignored */
@@ -3578,7 +3705,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		}
 
 		/* confirm all writes so far */
-		send_feedback(last_received, false, false);
+		send_feedback(last_received, false, false, false);
 
 		if (!in_remote_transaction && !in_streamed_transaction)
 		{
@@ -3675,7 +3802,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 				}
 			}
 
-			send_feedback(last_received, requestReply, requestReply);
+			send_feedback(last_received, requestReply, requestReply, false);
 
 			/*
 			 * Force reporting to ensure long idle periods don't lead to
@@ -3705,7 +3832,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
  * to send a response to avoid timeouts.
  */
 static void
-send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
+send_feedback(XLogRecPtr recvpos, bool force, bool requestReply, bool in_delaying_apply)
 {
 	static StringInfo reply_message = NULL;
 	static TimestampTz send_time = 0;
@@ -3735,8 +3862,15 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	/*
 	 * No outstanding transactions to flush, we can report the latest received
 	 * position. This is important for synchronous replication.
+	 *
+	 * During the delay of time delayed replication, do not tell the publisher
+	 * that the received latest LSN is already applied and flushed at this
+	 * stage, since we don't apply the transaction yet. If we do so, it leads
+	 * to a wrong assumption of logical replication progress on the publisher
+	 * side. Here, we just send a feedback message to avoid publisher's
+	 * timeout during the delay.
 	 */
-	if (!have_pending_txes)
+	if (!have_pending_txes && !in_delaying_apply)
 		flushpos = writepos = recvpos;
 
 	if (writepos < last_writepos)
@@ -4362,11 +4496,11 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
  * of system resource error and are not repeatable.
  */
 static void
-start_apply(XLogRecPtr origin_startpos)
+start_apply(void)
 {
 	PG_TRY();
 	{
-		LogicalRepApplyLoop(origin_startpos);
+		LogicalRepApplyLoop();
 	}
 	PG_CATCH();
 	{
@@ -4653,7 +4787,8 @@ ApplyWorkerMain(Datum main_arg)
 	}
 
 	/* Run the main loop. */
-	start_apply(origin_startpos);
+	last_received = origin_startpos;
+	start_apply();
 
 	proc_exit(0);
 }
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index 928c330897..422e6ad0fa 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -2431,6 +2431,35 @@ interval_cmp_internal(const Interval *interval1, const Interval *interval2)
 	return int128_compare(span1, span2);
 }
 
+/*
+ * Returns the number of milliseconds in the specified Interval.
+ */
+int64
+interval2ms(const Interval *interval)
+{
+	int64		days;
+	int64		ms;
+	int64		result;
+
+	days = interval->month * INT64CONST(30);
+	days += interval->day;
+
+	/* Detect whether the value of interval can cause an overflow */
+	if (pg_mul_s64_overflow(days, MSECS_PER_DAY, &result))
+		ereport(ERROR,
+				errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				errmsg("bigint out of range"));
+
+	/* Adds portion time (in ms) to the previous result */
+	ms = interval->time / INT64CONST(1000);
+	if (pg_add_s64_overflow(result, ms, &result))
+		ereport(ERROR,
+				errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				errmsg("bigint out of range"));
+
+	return result;
+}
+
 Datum
 interval_eq(PG_FUNCTION_ARGS)
 {
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 5e800dc79a..d9b4c8b7f0 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4494,6 +4494,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_subminapplydelay;
 	int			i,
 				ntups;
 
@@ -4546,9 +4547,14 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
 	if (fout->remoteVersion >= 160000)
-		appendPQExpBufferStr(query, " s.suborigin\n");
+		appendPQExpBufferStr(query,
+							 " s.suborigin,\n"
+							 " s.subminapplydelay\n");
 	else
-		appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+	{
+		appendPQExpBuffer(query, " '%s' AS suborigin,\n", LOGICALREP_ORIGIN_ANY);
+		appendPQExpBufferStr(query, " 0 AS subminapplydelay\n");
+	}
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
@@ -4576,6 +4582,7 @@ getSubscriptions(Archive *fout)
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
+	i_subminapplydelay = PQfnumber(res, "subminapplydelay");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4606,6 +4613,8 @@ getSubscriptions(Archive *fout)
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+		subinfo[i].subminapplydelay =
+			strtoi64(PQgetvalue(res, i, i_subminapplydelay), NULL, 10);
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4687,6 +4696,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
+	if (subinfo->subminapplydelay > 0)
+		appendPQExpBuffer(query, ", min_apply_delay = '" INT64_FORMAT " ms'", subinfo->subminapplydelay);
+
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index e7cbd8d7ed..e2525f70ab 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -661,6 +661,7 @@ typedef struct _SubscriptionInfo
 	char	   *subdisableonerr;
 	char	   *suborigin;
 	char	   *subsynccommit;
+	int64		subminapplydelay;
 	char	   *subpublications;
 } SubscriptionInfo;
 
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c8a0bb7b3a..8a27063bed 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6472,7 +6472,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6527,10 +6527,13 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Two-phase commit"),
 							  gettext_noop("Disable on error"));
 
+		/* Origin and min_apply_delay are only supported in v16 and higher */
 		if (pset.sversion >= 160000)
 			appendPQExpBuffer(&buf,
-							  ", suborigin AS \"%s\"\n",
-							  gettext_noop("Origin"));
+							  ", suborigin AS \"%s\"\n"
+							  ", subminapplydelay AS \"%s\"\n",
+							  gettext_noop("Origin"),
+							  gettext_noop("Min apply delay (ms)"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 23750ea5fb..19d2f90dc0 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1925,7 +1925,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name",
+		COMPLETE_WITH("binary", "disable_on_error", "min_apply_delay", "origin", "slot_name",
 					  "streaming", "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
@@ -3247,7 +3247,7 @@ psql_completion(const char *text, int start, int end)
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-					  "disable_on_error", "enabled", "origin", "slot_name",
+					  "disable_on_error", "enabled", "min_apply_delay", "origin", "slot_name",
 					  "streaming", "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a1705d..078495cbf0 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -70,6 +70,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	XLogRecPtr	subskiplsn;		/* All changes finished at this LSN are
 								 * skipped */
 
+	int64		subminapplydelay;	/* Replication apply delay */
+
 	NameData	subname;		/* Name of the subscription */
 
 	Oid			subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */
@@ -120,6 +122,7 @@ typedef struct Subscription
 								 * in */
 	XLogRecPtr	skiplsn;		/* All changes finished at this LSN are
 								 * skipped */
+	int64		minapplydelay;	/* Replication apply delay */
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h
index 21a37e21e9..8b368af299 100644
--- a/src/include/datatype/timestamp.h
+++ b/src/include/datatype/timestamp.h
@@ -127,6 +127,8 @@ struct pg_itm_in
 #define SECS_PER_MINUTE 60
 #define MINS_PER_HOUR	60
 
+#define MSECS_PER_DAY	INT64CONST(86400000)
+
 #define USECS_PER_DAY	INT64CONST(86400000000)
 #define USECS_PER_HOUR	INT64CONST(3600000000)
 #define USECS_PER_MINUTE INT64CONST(60000000)
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index db891eea8a..4bdb6b7de3 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -255,7 +255,7 @@ extern void stream_stop_internal(TransactionId xid);
 
 /* Common streaming function to apply all the spooled messages */
 extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
-								   XLogRecPtr lsn);
+								   XLogRecPtr lsn, TimestampTz ts);
 
 extern void apply_dispatch(StringInfo s);
 
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 42f802bb9d..534051fe13 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -102,6 +102,8 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time,
 									   TimestampTz stop_time,
 									   int msec);
 
+extern int64 interval2ms(const Interval *interval);
+
 extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
 extern pg_time_t timestamptz_to_time_t(TimestampTz t);
 
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 1ed6f4c39c..ff52daed3e 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                     List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                     List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -135,10 +135,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -155,10 +155,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |                    0 | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -167,10 +167,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |                    0 | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -202,10 +202,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                               List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                           List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |                    0 | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -239,19 +239,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -263,27 +263,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -298,10 +298,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more then once
@@ -316,10 +316,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -355,10 +355,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -367,10 +367,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -380,10 +380,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -396,20 +396,61 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    |                    0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail -- min_apply_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo);
+ERROR:  invalid input syntax for type interval: "foo"
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1);
+ERROR:  -1 ms is outside the valid range for parameter "min_apply_delay"
+-- fail - utilizing streaming = parallel with time delayed replication is not supported.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_apply_delay = '1 day');
+ERROR:  min_apply_delay > 0 and streaming = parallel are mutually exclusive options
+-- success -- value without unit is taken as milliseconds
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+\dRs+
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |                  123 | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+-- success -- interval is converted into ms and stored as integer
+ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = '4h 27min 35s');
+\dRs+
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |             16055000 | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+-- fail - alter subscription with streaming = parallel failed when time delayed replication is set.
+ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
+ERROR:  cannot set streaming = parallel for subscription with min_apply_delay > 0
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - alter subscription with min_apply_delay failed when streaming = parallel is set.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 123);
+ERROR:  cannot set min_apply_delay > 0 for subscription with streaming = parallel
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 7991abfe8f..80f4966797 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -279,6 +279,31 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail -- min_apply_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo);
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1);
+-- fail - utilizing streaming = parallel with time delayed replication is not supported.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_apply_delay = '1 day');
+
+-- success -- value without unit is taken as milliseconds
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123);
+\dRs+
+
+-- success -- interval is converted into ms and stored as integer
+ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = '4h 27min 35s');
+\dRs+
+
+-- fail - alter subscription with streaming = parallel failed when time delayed replication is set.
+ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
+-- fail - alter subscription with min_apply_delay failed when streaming = parallel is set.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel);
+ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 123);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 3db0fdfd96..a186876eb4 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -38,6 +38,7 @@ tests += {
       't/029_on_error.pl',
       't/030_origin.pl',
       't/031_column_list.pl',
+      't/032_apply_delay.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/032_apply_delay.pl b/src/test/subscription/t/032_apply_delay.pl
new file mode 100644
index 0000000000..8f8ce23f1b
--- /dev/null
+++ b/src/test/subscription/t/032_apply_delay.pl
@@ -0,0 +1,151 @@
+
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+# Test replication apply delay
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Create publisher node.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node.
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	"log_min_messages = debug2");
+$node_subscriber->start;
+
+# Create some preexisting content on publisher.
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b varchar, c timestamptz DEFAULT now())");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber.
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
+);
+
+# Setup logical replication.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+# column c must not be published because we want to compare the time difference.
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE test_tab (a, b)");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on, min_apply_delay = '3s')"
+);
+
+# Wait for initial table sync to finish.
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# Check log starting now for logical replication apply delay.
+my $log_location = -s $node_subscriber->logfile;
+
+my $result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM test_tab");
+is($result, qq(2|1|2), 'check initial data was copied to subscriber');
+
+# New row to trigger apply delay.
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (3, 'baz')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (4, 'abc')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (5, 'def')");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM test_tab");
+is($result, qq(5|1|5), 'check if the new rows were applied to subscriber');
+
+check_apply_delay_log("logical replication apply delay");
+
+check_apply_delay_time('5', '3');
+
+# Test streamed transaction.
+# Insert, update and delete enough rows to exceed 64kB limit.
+$node_publisher->safe_psql(
+	'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 5000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a, 2) = 0;
+DELETE FROM test_tab WHERE mod(a, 3) = 0;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM test_tab");
+is($result, qq(3334|1|5000), 'check if the new rows were applied to subscriber');
+
+check_apply_delay_log("logical replication apply delay");
+
+check_apply_delay_time('5000', '3');
+
+# Test ALTER SUBSCRIPTION. Delay 86460 seconds (1 day 1 minute).
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub SET (min_apply_delay = 86460000)"
+);
+
+# New row to trigger apply delay.
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (0, 'foobar')");
+
+# Disable subscription. worker should die immediately.
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub DISABLE;"
+);
+
+# Wait until worker dies.
+my $sub_query =
+  "SELECT count(1) = 0 FROM pg_stat_subscription WHERE subname = 'tap_sub' AND pid IS NOT NULL;";
+$node_subscriber->poll_query_until('postgres', $sub_query)
+  or die "Timed out while waiting for subscriber to die";
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();
+
+sub check_apply_delay_log
+{
+	my $message          = shift;
+	my $old_log_location = $log_location;
+
+	$log_location = $node_subscriber->wait_for_log(qr/$message/, $log_location);
+
+	cmp_ok($log_location, '>', $old_log_location,
+		"logfile contains triggered logical replication apply delay"
+	);
+}
+
+sub check_apply_delay_time
+{
+	my ($primary_key, $expected_diffs) = @_;
+
+	my $inserted_time_on_pub = $node_publisher->safe_psql('postgres', qq[
+		SELECT extract(epoch from c) FROM test_tab WHERE a = $primary_key;
+	]);
+
+	my $inserted_time_on_sub = $node_subscriber->safe_psql('postgres', qq[
+		SELECT extract(epoch from c) FROM test_tab WHERE a = $primary_key;
+	]);
+
+	cmp_ok($inserted_time_on_sub - $inserted_time_on_pub, '>', $expected_diffs,
+		"The tuple on the subscriber was modified later than the publisher");
+}
-- 
2.27.0

