From 082c3b00bc8770e24ba6069ea6b935c9e9806c2b Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Thu, 22 Dec 2022 09:14:02 +0530
Subject: [PATCH v5 1/2] Add 'logical_decoding_mode' GUC.

This enables streaming or serializing changes immediately in logical
decoding. This parameter is intended to be used to test logical decoding
and replication of large transactions for which otherwise we need to
generate the changes till logical_decoding_work_mem is reached.

This can help in reducing the timing of existing tests related to logical
replication of in-progress transaction and will help in writing tests for
for the upcoming feature for parallelely applying in-progress
transactions.

Author: Shi yu
Reviewed-by: Sawada Masahiko, Shveta Mallik, Amit Kapila, Dilip Kumar, Peter Smith
Discussion: https://postgr.es/m/OSZPR01MB63104E7449DBE41932DB19F1FD1B9@OSZPR01MB6310.jpnprd01.prod.outlook.com
---
 contrib/test_decoding/expected/stream.out     | 13 ++++++++
 contrib/test_decoding/sql/stream.sql          |  8 +++++
 doc/src/sgml/config.sgml                      | 28 ++++++++++++++++
 .../replication/logical/reorderbuffer.c       | 33 +++++++++++++------
 src/backend/utils/misc/guc_tables.c           | 17 ++++++++++
 src/include/replication/reorderbuffer.h       |  8 +++++
 6 files changed, 97 insertions(+), 10 deletions(-)

diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index 0f21dcb8e0..b7af8147a3 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -106,6 +106,19 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  committing streamed transaction
 (17 rows)
 
+-- streaming test with logical_decoding_mode=immediate
+SET logical_decoding_mode=immediate;
+TRUNCATE table stream_test;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+                   data                   
+------------------------------------------
+ opening a streamed block for transaction
+ streaming truncate for transaction
+ closing a streamed block for transaction
+ committing streamed transaction
+(4 rows)
+
+RESET logical_decoding_mode;
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql
index 4feec62972..b184c13e6b 100644
--- a/contrib/test_decoding/sql/stream.sql
+++ b/contrib/test_decoding/sql/stream.sql
@@ -44,5 +44,13 @@ toasted-123456789012345678901234567890123456789012345678901234567890123456789012
 
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
+-- streaming test with logical_decoding_mode=immediate
+SET logical_decoding_mode=immediate;
+
+TRUNCATE table stream_test;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+RESET logical_decoding_mode;
+
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 9eedab652d..3071c8eace 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -11597,6 +11597,34 @@ LOG:  CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-logical-decoding-mode" xreflabel="logical_decoding_mode">
+      <term><varname>logical_decoding_mode</varname> (<type>enum</type>)
+      <indexterm>
+       <primary><varname>logical_decoding_mode</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Allows streaming or serializing changes immediately in logical decoding.
+        The allowed values of <varname>logical_decoding_mode</varname> are
+        <literal>buffered</literal> and <literal>immediate</literal>. When set
+        to <literal>immediate</literal>, stream each change if
+        <literal>streaming</literal> option (see optional parameters set by
+        <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>)
+        is enabled, otherwise, serialize each change.  When set to
+        <literal>buffered</literal>, which is the default, decoding will stream
+        or serialize changes when <varname>logical_decoding_work_mem</varname>
+        is reached.
+       </para>
+       <para>
+        This parameter is intended to be used to test logical decoding and
+        replication of large transactions for which otherwise we need to
+        generate the changes till <varname>logical_decoding_work_mem</varname>
+        is reached.
+       </para>
+      </listitem>
+     </varlistentry>
+
     </variablelist>
   </sect1>
   <sect1 id="runtime-config-short">
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b567b8b59e..547b17f838 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -209,6 +209,9 @@ typedef struct ReorderBufferDiskChange
 int			logical_decoding_work_mem;
 static const Size max_changes_in_memory = 4096; /* XXX for restore only */
 
+/* GUC variable */
+int			logical_decoding_mode = LOGICAL_DECODING_MODE_BUFFERED;
+
 /* ---------------------------------------
  * primary reorderbuffer support routines
  * ---------------------------------------
@@ -3540,7 +3543,10 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
 /*
  * Check whether the logical_decoding_work_mem limit was reached, and if yes
  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
- * disk until we reach under the memory limit.
+ * disk or send to the output plugin until we reach under the memory limit.
+ *
+ * If logical_decoding_mode is set to "immediate", stream or serialize the changes
+ * immediately.
  *
  * XXX At this point we select the transactions until we reach under the memory
  * limit, but we might also adapt a more elaborate eviction strategy - for example
@@ -3552,20 +3558,27 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 {
 	ReorderBufferTXN *txn;
 
-	/* bail out if we haven't exceeded the memory limit */
-	if (rb->size < logical_decoding_work_mem * 1024L)
+	/*
+	 * Bail out if logical_decoding_mode is disabled and we haven't exceeded
+	 * the memory limit.
+	 */
+	if (logical_decoding_mode == LOGICAL_DECODING_MODE_BUFFERED &&
+		rb->size < logical_decoding_work_mem * 1024L)
 		return;
 
 	/*
-	 * Loop until we reach under the memory limit.  One might think that just
-	 * by evicting the largest (sub)transaction we will come under the memory
-	 * limit based on assumption that the selected transaction is at least as
-	 * large as the most recent change (which caused us to go over the memory
-	 * limit). However, that is not true because a user can reduce the
-	 * logical_decoding_work_mem to a smaller value before the most recent
+	 * If logical_decoding_mode is immediate, loop until there's no change.
+	 * Otherwise, loop until we reach under the memory limit. One might think
+	 * that just by evicting the largest (sub)transaction we will come under
+	 * the memory limit based on assumption that the selected transaction is
+	 * at least as large as the most recent change (which caused us to go over
+	 * the memory limit). However, that is not true because a user can reduce
+	 * the logical_decoding_work_mem to a smaller value before the most recent
 	 * change.
 	 */
-	while (rb->size >= logical_decoding_work_mem * 1024L)
+	while (rb->size >= logical_decoding_work_mem * 1024L ||
+		   (logical_decoding_mode == LOGICAL_DECODING_MODE_IMMEDIATE &&
+			rb->size > 0))
 	{
 		/*
 		 * Pick the largest transaction (or subtransaction) and evict it from
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 436afe1d21..a37c9f9844 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -395,6 +395,12 @@ static const struct config_enum_entry ssl_protocol_versions_info[] = {
 	{NULL, 0, false}
 };
 
+static const struct config_enum_entry logical_decoding_mode_options[] = {
+	{"buffered", LOGICAL_DECODING_MODE_BUFFERED, false},
+	{"immediate", LOGICAL_DECODING_MODE_IMMEDIATE, false},
+	{NULL, 0, false}
+};
+
 StaticAssertDecl(lengthof(ssl_protocol_versions_info) == (PG_TLS1_3_VERSION + 2),
 				 "array length mismatch");
 
@@ -4877,6 +4883,17 @@ struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"logical_decoding_mode", PGC_USERSET, DEVELOPER_OPTIONS,
+			gettext_noop("Allows streaming or serializing each change in logical decoding."),
+			NULL,
+			GUC_NOT_IN_SAMPLE
+		},
+		&logical_decoding_mode,
+		LOGICAL_DECODING_MODE_BUFFERED, logical_decoding_mode_options,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index c700b55b1c..3dfab1060e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -18,6 +18,14 @@
 #include "utils/timestamp.h"
 
 extern PGDLLIMPORT int logical_decoding_work_mem;
+extern PGDLLIMPORT int logical_decoding_mode;
+
+/* possible values for logical_decoding_mode */
+typedef enum
+{
+	LOGICAL_DECODING_MODE_BUFFERED,
+	LOGICAL_DECODING_MODE_IMMEDIATE
+}			LogicalDecodingMode;
 
 /* an individual tuple, stored in one chunk of memory */
 typedef struct ReorderBufferTupleBuf
-- 
2.31.1

