From 8f48798bd6c6dbf839a54b6f0ccf3b8521e0f429 Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Wed, 18 Jan 2023 13:45:32 +0800
Subject: [PATCH v4] Fix the logical replication timeout during processing of
 DDL.

The problem is that when a transaction contains a DDL that generates lots of
temporary data due to rewrite rules, this temporary data is not processed by
the pgoutput plugin. The previous commit (f95d53e) only fixed timeouts caused
by filtering out changes in pgoutput. Therefore, the previous fix for DML had
no impact on this case.

To fix this, we introduce a new ReorderBuffer callback -
'ReorderBufferUpdateProgressTxnCB'. This callback is called to try to update
the progress after every CHANGES_THRESHOLD changes encountered while sending
the data of a transaction (and its subtransactions) to the output plugin.
---
 src/backend/replication/logical/logical.c     | 61 +++++++++++++++++++
 .../replication/logical/reorderbuffer.c       | 23 +++++++
 src/backend/replication/pgoutput/pgoutput.c   | 54 ++--------------
 src/include/replication/reorderbuffer.h       | 12 ++++
 src/tools/pgindent/typedefs.list              |  1 +
 5 files changed, 103 insertions(+), 48 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1a58dd7649..da96d7d042 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
 static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   int nrelations, Relation relations[], ReorderBufferChange *change);
 
+/* update progress txn callback */
+static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
+										   ReorderBufferTXN *txn,
+										   ReorderBufferChange *change);
+
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
 
 /*
@@ -278,6 +283,12 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
 	ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
 
+	/*
+	 * Callback to support updating progress while the changes of a
+	 * transaction (and its subtransactions) are sent to the output plugin.
+	 */
+	ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
+
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
@@ -1584,6 +1595,56 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+/*
+ * Update progress callback while processing a transaction.
+ *
+ * Try to update progress and send a keepalive message while the changes of a
+ * transaction (and its subtransactions) are being sent to the output plugin.
+ *
+ * For a large transaction, the downstream can time out if no change is sent
+ * to it for longer than its wal_receiver_timeout.  This can happen when all
+ * or most of the changes are not published, are filtered out, or (as with
+ * temporary data generated by a DDL) are not processed by the output plugin.
+ */
+static void
+update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							   ReorderBufferChange *change)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "update_progress_txn";
+	state.report_location = change->lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+	ctx->write_xid = txn->xid;
+
+	/*
+	 * Report this change's lsn so replies from clients can give an up-to-date
+	 * answer. This won't ever be enough (and shouldn't be!) to confirm
+	 * receipt of this transaction, but it might allow another transaction's
+	 * commit to be confirmed with one message.
+	 */
+	ctx->write_location = change->lsn;
+
+	ctx->end_xact = false;		/* mid-transaction, not at its end */
+
+	OutputPluginUpdateProgress(ctx, false);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 54ee824e6c..9083b5a380 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2105,6 +2105,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 	PG_TRY();
 	{
+		/*
+		 * Static variable used to accumulate the number of changes while
+		 * processing txn.
+		 */
+		static int	changes_count = 0;
+
+		/*
+		 * Sending keepalive messages after every change has some overhead, but
+		 * testing showed there is no noticeable overhead if keepalive is only
+		 * sent after every ~100 changes.
+		 */
+#define CHANGES_THRESHOLD 100
+
 		ReorderBufferChange *change;
 
 		if (using_subtxn)
@@ -2446,6 +2459,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
 			}
+
+			/*
+			 * Try to send a keepalive message after every CHANGES_THRESHOLD
+			 * changes.
+			 */
+			if (++changes_count >= CHANGES_THRESHOLD)
+			{
+				rb->update_progress_txn(rb, txn, change);
+				changes_count = 0;
+			}
 		}
 
 		/* speculative insertion record must be freed by now */
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 1a80d67bb9..0ba6b6b39c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -92,8 +92,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
-static void update_replication_progress(LogicalDecodingContext *ctx,
-										bool skipped_xact);
 
 /*
  * Only 3 publication actions are used for row filtering ("insert", "update",
@@ -586,7 +584,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * from this transaction has been sent to the downstream.
 	 */
 	sent_begin_txn = txndata->sent_begin_txn;
-	update_replication_progress(ctx, !sent_begin_txn);
+	OutputPluginUpdateProgress(ctx, !sent_begin_txn);
 	pfree(txndata);
 	txn->output_plugin_private = NULL;
 
@@ -625,7 +623,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -639,7 +637,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -655,7 +653,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1401,8 +1399,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
 
-	update_replication_progress(ctx, false);
-
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -1637,8 +1633,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	Oid		   *relids;
 	TransactionId xid = InvalidTransactionId;
 
-	update_replication_progress(ctx, false);
-
 	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
 	if (in_streaming)
 		xid = change->txn->xid;
@@ -1702,8 +1696,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	TransactionId xid = InvalidTransactionId;
 
-	update_replication_progress(ctx, false);
-
 	if (!data->messages)
 		return;
 
@@ -1903,7 +1895,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1924,7 +1916,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
@@ -2424,37 +2416,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
 		}
 	}
 }
-
-/*
- * Try to update progress and send a keepalive message if too many changes were
- * processed.
- *
- * For a large transaction, if we don't send any change to the downstream for a
- * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
- * This can happen when all or most of the changes are either not published or
- * got filtered out.
- */
-static void
-update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
-{
-	static int	changes_count = 0;
-
-	/*
-	 * We don't want to try sending a keepalive message after processing each
-	 * change as that can have overhead. Tests revealed that there is no
-	 * noticeable overhead in doing it after continuously processing 100 or so
-	 * changes.
-	 */
-#define CHANGES_THRESHOLD 100
-
-	/*
-	 * If we are at the end of transaction LSN, update progress tracking.
-	 * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
-	 * try to send a keepalive message if required.
-	 */
-	if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
-	{
-		OutputPluginUpdateProgress(ctx, skipped_xact);
-		changes_count = 0;
-	}
-}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index f6c4dd75db..3641f03326 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -525,6 +525,12 @@ typedef void (*ReorderBufferStreamTruncateCB) (
 											   Relation relations[],
 											   ReorderBufferChange *change);
 
+/* update progress txn callback signature */
+typedef void (*ReorderBufferUpdateProgressTxnCB) (
+												  ReorderBuffer *rb,
+												  ReorderBufferTXN *txn,
+												  ReorderBufferChange *change);
+
 struct ReorderBuffer
 {
 	/*
@@ -588,6 +594,12 @@ struct ReorderBuffer
 	ReorderBufferStreamMessageCB stream_message;
 	ReorderBufferStreamTruncateCB stream_truncate;
 
+	/*
+	 * Callback invoked to update progress while the changes of a
+	 * transaction (and its subtransactions) are sent to the output plugin.
+	 */
+	ReorderBufferUpdateProgressTxnCB update_progress_txn;
+
 	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 09316039e4..92f6e7c5ad 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2309,6 +2309,7 @@ ReorderBufferToastEnt
 ReorderBufferTupleBuf
 ReorderBufferTupleCidEnt
 ReorderBufferTupleCidKey
+ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 RepOriginId
 ReparameterizeForeignPathByChild_function
-- 
2.23.0.windows.1

