From 6aa798c884ed8fdc56cdf3b6d1f0fe5b70a9ce33 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 7 Feb 2023 05:38:20 +0000
Subject: [PATCH v4 2/2] Extend START_REPLICATION command to accept walsender
 options

This commit extends START_REPLICATION to accept options of walsender. Currently,
only one option exit_before_confirming is accepted.

For physical replication, the grammer of START_REPLICATION is extended to accept
options. Note that in the normal phyical replication the added option is never
used.

For logical replication, the option list for logical decoding plugin is reused for
storing walsender options. When the min_apply_delay parameter is set for a
subscription, the apply worker related with it will send START_REPLICATION query
with exit_before_confirming = true to publisher node.

This option allows primay servers to shut down even if there are pending WALs to
be sent or sent WALs are not flushed on the secondary. This may be useful to
shut down the primary even when the walreceiver/worker is stuck.

Author: Hayato Kuroda
Discussion: https://postgr.es/m/TYAPR01MB586668E50FC2447AD7F92491F5E89%40TYAPR01MB5866.jpnprd01.prod.outlook.com
---
 doc/src/sgml/protocol.sgml                    | 18 ++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |  7 ++
 src/backend/replication/logical/worker.c      | 13 ++-
 src/backend/replication/repl_gram.y           | 12 ++-
 src/backend/replication/repl_scanner.l        |  1 +
 src/backend/replication/walreceiver.c         |  1 +
 src/backend/replication/walsender.c           | 80 ++++++++++++++++++-
 src/include/nodes/replnodes.h                 |  1 +
 src/include/replication/walreceiver.h         |  1 +
 src/test/subscription/t/001_rep_changes.pl    | 10 ++-
 src/tools/pgindent/typedefs.list              |  2 +
 11 files changed, 138 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 93fc7167d4..2622b084ed 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2500,7 +2500,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
     </varlistentry>
 
     <varlistentry id="protocol-replication-start-replication-slot-logical">
-     <term><literal>START_REPLICATION</literal> <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> <literal>LOGICAL</literal> <replaceable class="parameter">XXX/XXX</replaceable> [ ( <replaceable>option_name</replaceable> [ <replaceable>option_value</replaceable> ] [, ...] ) ]</term>
+     <term><literal>START_REPLICATION</literal> <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> <literal>LOGICAL</literal> <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>SHUTDOWN_MODE</literal> <replaceable class="parameter">shutdown_mode</replaceable> ] [ ( <replaceable>option_name</replaceable> [ <replaceable>option_value</replaceable> ] [, ...] ) ]</term>
      <listitem>
       <para>
        Instructs server to start streaming WAL for logical replication,
@@ -2555,6 +2555,22 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
         </listitem>
        </varlistentry>
 
+       <varlistentry>
+        <term><literal>SHUTDOWN_MODE { 'wait_flush' | 'immediate' }</literal></term>
+        <listitem>
+         <para>
+          Decides the condition for exiting the walsender process.
+          <literal>'wait_flush'</literal>, which is the default, the walsender
+          will wait for all the sent WALs to be flushed on the subscriber side,
+          before exiting the process. <literal>'immediate'</literal> will exit
+          without confirming the remote flush. This may break the consistency
+          between publisher and subscriber, but it may be useful for a system
+          that has a high-latency network to reduce the amount of time for
+          shutdown.
+         </para>
+        </listitem>
+       </varlistentry>
+
        <varlistentry>
         <term><replaceable class="parameter">option_name</replaceable></term>
         <listitem>
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 560ec974fa..18f6e09cfd 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -403,6 +403,12 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		List	   *pubnames;
 		char	   *pubnames_literal;
 
+		/* Add SHUTDOWN_MODE option if needed */
+		if (options->shutdown_mode &&
+			PQserverVersion(conn->streamConn) >= 160000)
+			appendStringInfo(&cmd, " SHUTDOWN_MODE '%s'",
+							 options->shutdown_mode);
+
 		appendStringInfoString(&cmd, " (");
 
 		appendStringInfo(&cmd, "proto_version '%u'",
@@ -449,6 +455,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		appendStringInfo(&cmd, " TIMELINE %u",
 						 options->proto.physical.startpointTLI);
 
+
 	/* Start streaming. */
 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
 	pfree(cmd.data);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c574531040..feffccfd47 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4034,7 +4034,9 @@ maybe_reread_subscription(void)
 		newsub->stream != MySubscription->stream ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
-		!equal(newsub->publications, MySubscription->publications))
+		!equal(newsub->publications, MySubscription->publications) ||
+		/* minapplydelay affects SHUTDOWN_MODE option */
+		(newsub->minapplydelay == 0) != (MySubscription->minapplydelay == 0))
 	{
 		if (am_parallel_apply_worker())
 			ereport(LOG,
@@ -4718,6 +4720,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.logical = true;
 	options.startpoint = origin_startpos;
 	options.slotname = myslotname;
+	options.shutdown_mode = NULL;
 
 	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
 	options.proto.logical.proto_version =
@@ -4756,6 +4759,14 @@ ApplyWorkerMain(Datum main_arg)
 
 	if (!am_tablesync_worker())
 	{
+		/*
+		 * time-delayed logical replication does not support tablesync
+		 * workers, so only the leader apply worker can request walsenders to
+		 * exit before confirming remote flush.
+		 */
+		if (server_version >= 160000 && MySubscription->minapplydelay > 0)
+			options.shutdown_mode = pstrdup("immediate");
+
 		/*
 		 * Even when the two_phase mode is requested by the user, it remains
 		 * as the tri-state PENDING until all tablesyncs have reached READY
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0c874e33cf..54450a041a 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,6 +76,7 @@ Node *replication_parse_result;
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_SHUTDOWN_MODE
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -91,6 +92,7 @@ Node *replication_parse_result;
 %type <boolval>	opt_temporary
 %type <list>	create_slot_options create_slot_legacy_opt_list
 %type <defelt>	create_slot_legacy_opt
+%type <str>	opt_shutdown_mode
 
 %%
 
@@ -270,20 +272,22 @@ start_replication:
 					cmd->slotname = $2;
 					cmd->startpoint = $4;
 					cmd->timeline = $5;
+					cmd->shutdownmode = NULL;
 					$$ = (Node *) cmd;
 				}
 			;
 
 /* START_REPLICATION SLOT slot LOGICAL %X/%X options */
 start_logical_replication:
-			K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options
+			K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR opt_shutdown_mode plugin_options
 				{
 					StartReplicationCmd *cmd;
 					cmd = makeNode(StartReplicationCmd);
 					cmd->kind = REPLICATION_KIND_LOGICAL;
 					cmd->slotname = $3;
 					cmd->startpoint = $5;
-					cmd->options = $6;
+					cmd->shutdownmode = $6;
+					cmd->options = $7;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -336,6 +340,10 @@ opt_timeline:
 				| /* EMPTY */			{ $$ = 0; }
 			;
 
+opt_shutdown_mode:
+			K_SHUTDOWN_MODE SCONST			{ $$ = $2; }
+			| /* EMPTY */					{ $$ = NULL; }
+		;
 
 plugin_options:
 			'(' plugin_opt_list ')'			{ $$ = $2; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index cb467ca46f..fcc6f6feda 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -136,6 +136,7 @@ EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
 WAIT				{ return K_WAIT; }
+SHUTDOWN_MODE		{ return K_SHUTDOWN_MODE; }
 
 {space}+		{ /* do nothing */ }
 
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index f6446da2d6..cfce9d93ef 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -409,6 +409,7 @@ WalReceiverMain(void)
 		options.logical = false;
 		options.startpoint = startpoint;
 		options.slotname = slotname[0] != '\0' ? slotname : NULL;
+		options.shutdown_mode = NULL;
 		options.proto.physical.startpointTLI = startpointTLI;
 		if (walrcv_startstreaming(wrconn, &options))
 		{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4ed3747e3f..65d08bdc95 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -219,6 +219,25 @@ typedef struct
 
 static LagTracker *lag_tracker;
 
+/* Indicator for specifying the shutdown mode */
+typedef enum
+{
+	WALSND_SHUTDOWN_MODE_WAIT_FLUSH = 0,
+	WALSND_SHUTDOWN_MODE_IMMIDEATE
+} WalSndShutdownMode;
+
+/*
+ * Options for controlling the behavior of the walsender. Options can be
+ * specified in the START_STREAMING replication command. Currently only one
+ * option is allowed.
+ */
+typedef struct
+{
+	WalSndShutdownMode shutdown_mode;
+} WalSndOptions;
+
+static WalSndOptions *my_options = NULL;
+
 /* Signal handlers */
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
@@ -260,6 +279,8 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
 							  TimeLineID *tli_p);
 
+static void CheckWalSndOptions(const StartReplicationCmd *cmd);
+static void ParseShutdownMode(char *shutdownmode);
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -1272,6 +1293,12 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 		got_STOPPING = true;
 	}
 
+	/* Initialize an option holder */
+	my_options = (WalSndOptions *) palloc0(sizeof(WalSndOptions));
+
+	/* Check given options and set value to the holder */
+	CheckWalSndOptions(cmd);
+
 	/*
 	 * Create our decoding context, making it start at the previously ack'ed
 	 * position.
@@ -1450,6 +1477,16 @@ ProcessPendingWrites(void)
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
+
+		/*
+		 * In this function, there is a possibility that the walsender is
+		 * stuck. It is caused when the opposite worker is stuck and then the
+		 * send-buffer of the walsender becomes full. Therefore, we must add
+		 * an additional path for shutdown for immediate shutdown mode.
+		 */
+		if (my_options->shutdown_mode == WALSND_SHUTDOWN_MODE_IMMIDEATE &&
+			got_STOPPING)
+			WalSndDone(XLogSendLogical);
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
@@ -3114,19 +3151,26 @@ WalSndDone(WalSndSendDataCallback send_data)
 	 * To figure out whether all WAL has successfully been replicated, check
 	 * flush location if valid, write otherwise. Tools like pg_receivewal will
 	 * usually (unless in synchronous mode) return an invalid flush location.
+	 *
+	 * If we are in the immediate shutdown mode, flush location and output
+	 * buffer is not checked. This may break the consistency between nodes,
+	 * but it may be useful for the system that has high-latency network to
+	 * reduce the amount of time for shutdown.
 	 */
 	replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
 		MyWalSnd->write : MyWalSnd->flush;
 
-	if (WalSndCaughtUp && sentPtr == replicatedPtr &&
-		!pq_is_send_pending())
+	if (WalSndCaughtUp &&
+		((my_options &&
+		  my_options->shutdown_mode == WALSND_SHUTDOWN_MODE_IMMIDEATE) ||
+		 (sentPtr == replicatedPtr && !pq_is_send_pending())))
 	{
 		QueryCompletion qc;
 
 		/* Inform the standby that XLOG streaming is done */
 		SetQueryCompletion(&qc, CMDTAG_COPY, 0);
 		EndCommand(&qc, DestRemote, false);
-		pq_flush();
+		pq_flush_if_writable();
 
 		proc_exit(0);
 	}
@@ -3849,3 +3893,33 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
 	Assert(time != 0);
 	return now - time;
 }
+
+/*
+ * Check options for walsender itself and set a value to an option holder.
+ *
+ * Currently only one option is accepted.
+ */
+static void
+CheckWalSndOptions(const StartReplicationCmd *cmd)
+{
+	if (cmd->shutdownmode)
+		ParseShutdownMode(cmd->shutdownmode);
+}
+
+/*
+ * Parse given shutdown mode.
+ *
+ * Currently two values are accepted - "wait_flush" and "immediate"
+ */
+static void
+ParseShutdownMode(char *shutdownmode)
+{
+	if (pg_strcasecmp(shutdownmode, "wait_flush") == 0)
+		my_options->shutdown_mode = WALSND_SHUTDOWN_MODE_WAIT_FLUSH;
+	else if (pg_strcasecmp(shutdownmode, "immediate") == 0)
+		my_options->shutdown_mode = WALSND_SHUTDOWN_MODE_IMMIDEATE;
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_SYNTAX_ERROR),
+				errmsg("SHUTDOWN_MODE requires \"wait_flush\" or \"immediate\""));
+}
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 4321ba8f86..c96e85e859 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -83,6 +83,7 @@ typedef struct StartReplicationCmd
 	char	   *slotname;
 	TimeLineID	timeline;
 	XLogRecPtr	startpoint;
+	char	   *shutdownmode;
 	List	   *options;
 } StartReplicationCmd;
 
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index decffe352d..ef6297da52 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -170,6 +170,7 @@ typedef struct
 								 * false if physical stream.  */
 	char	   *slotname;		/* Name of the replication slot or NULL. */
 	XLogRecPtr	startpoint;		/* LSN of starting point. */
+	char	   *shutdown_mode;	/* Name of specified shutdown name */
 
 	union
 	{
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index f94819672b..d7a6fd0e38 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -523,9 +523,17 @@ $node_publisher->poll_query_until('postgres',
 # changes are replicated on subscriber.
 my $delay = 3;
 
-# Set min_apply_delay parameter to 3 seconds
+# check restart on changing min_apply_delay to 3 seconds
+$oldpid = $node_publisher->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub_renamed' AND state = 'streaming';"
+);
 $node_subscriber->safe_psql('postgres',
 	"ALTER SUBSCRIPTION tap_sub_renamed SET (min_apply_delay = '${delay}s')");
+$node_publisher->poll_query_until('postgres',
+	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub_renamed' AND state = 'streaming';"
+  )
+  or die
+  "Timed out while waiting for apply to restart after changing min_apply_delay to non-zero value";
 
 # Make new content on publisher and check its presence in subscriber depending
 # on the delay applied above. Before doing the insertion, get the
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d3224dfc36..837cbf8d9d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2969,7 +2969,9 @@ WalReceiverConn
 WalReceiverFunctionsType
 WalSnd
 WalSndCtlData
+WalSndOptions
 WalSndSendDataCallback
+WalSndShutdownMode
 WalSndState
 WalTimeSample
 WalUsage
-- 
2.27.0

