From 19048129914418b7f26856b930597e81ed5e0ddb Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Fri, 6 Aug 2021 08:50:03 -0400
Subject: [PATCH v12] Skip empty transactions for logical replication.

The current logical replication behavior is to send every transaction to
the subscriber even if the transaction is empty (because it does not
contain changes from the selected publications). Building and transmitting
these empty transactions wastes CPU cycles and network bandwidth.

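This patch addresses the above problem by postponing the BEGIN message
until the first change of the transaction is processed. When processing
the COMMIT, if no BEGIN has been sent (that is, the transaction had no
changes to replicate), the COMMIT message is not sent either. As a result,
pgoutput skips the BEGIN / COMMIT messages for transactions that are empty.
BEGIN PREPARE is still sent immediately, so prepared transactions are not
skipped by this patch.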

Discussion:
https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 100 +++++++++++++++++++++++++++-
 src/test/subscription/t/020_messages.pl     |   5 +-
 2 files changed, 101 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 286119c..13f0278 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -132,6 +132,17 @@ typedef struct RelationSyncEntry
 	TupleConversionMap *map;
 } RelationSyncEntry;
 
+/*
+ * Per-transaction state tracking whether BEGIN (or BEGIN PREPARE) has
+ * been sent. BEGIN is deferred until the first change of a transaction
+ * is processed, which makes it possible to skip empty transactions;
+ * BEGIN PREPARE is sent immediately and just sets the flag.
+ */
+typedef struct PGOutputTxnData
+{
+	bool sent_begin_txn;    /* flag indicating whether begin has been sent */
+} PGOutputTxnData;
+
 /* Map used to remember which relation schemas we sent. */
 static HTAB *RelationSyncCache = NULL;
 
@@ -396,15 +407,40 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 }
 
 /*
- * BEGIN callback
+ * BEGIN callback.
+ *
+ * Don't send the BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables), so transactions that only changed
+ * unpublished table(s) produce empty transactions. Sending BEGIN and
+ * COMMIT messages for those would spend bandwidth on something that is
+ * of little or no use to the subscriber.
  */
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+	PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+														sizeof(PGOutputTxnData));
+
+	txndata->sent_begin_txn = false;
+	txn->output_plugin_private = txndata;
+}
+
+/*
+ * Send BEGIN.
+ * This is where the BEGIN is actually sent. This is called
+ * while processing the first change of the transaction.
+ */
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputTxnData	*txndata = (PGOutputTxnData *) txn->output_plugin_private;
 
+	Assert(txndata);
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin(ctx->out, txn);
+	txndata->sent_begin_txn = true;
 
 	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
 					 send_replication_origin);
@@ -419,8 +455,26 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
+	PGOutputTxnData	*txndata = (PGOutputTxnData *) txn->output_plugin_private;
+	bool            skip;
+
+	Assert(txndata);
+
+	/*
+	 * If a BEGIN message was not yet sent, then it means there were no relevant
+	 * changes encountered, so we can skip the COMMIT message too.
+	 */
+	skip = !txndata->sent_begin_txn;
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
 	OutputPluginUpdateProgress(ctx);
 
+	if (skip)
+	{
+		elog(DEBUG1, "skipping replication of an empty transaction");
+		return;
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
@@ -433,6 +487,8 @@ static void
 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+														 sizeof(PGOutputTxnData));
 
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin_prepare(ctx->out, txn);
@@ -441,6 +497,8 @@ pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 					 send_replication_origin);
 
 	OutputPluginWrite(ctx, true);
+	txndata->sent_begin_txn = true;
+	txn->output_plugin_private = txndata;
 }
 
 /*
@@ -630,11 +688,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				Relation relation, ReorderBufferChange *change)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 	TransactionId xid = InvalidTransactionId;
 	Relation	ancestor = NULL;
 
+	/* If not streaming, txndata must have been set up by BEGIN/BEGIN PREPARE */
+	Assert(in_streaming || txndata);
+
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -668,6 +730,14 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/*
+	 * Output BEGIN if we haven't yet, unless streaming.
+	 */
+	if (!in_streaming && !txndata->sent_begin_txn)
+	{
+		pgoutput_begin(ctx, txn);
+	}
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -770,6 +840,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				  int nrelations, Relation relations[], ReorderBufferChange *change)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 	int			i;
@@ -777,6 +848,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	Oid		   *relids;
 	TransactionId xid = InvalidTransactionId;
 
+	/* If not streaming, txndata must have been set up by BEGIN/BEGIN PREPARE */
+	Assert(in_streaming || txndata);
+
 	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
 	if (in_streaming)
 		xid = change->txn->xid;
@@ -813,6 +887,15 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	if (nrelids > 0)
 	{
+		/*
+		 * Output BEGIN if we haven't yet. While streaming there is no
+		 * need to send BEGIN / BEGIN PREPARE.
+		 */
+		if (!in_streaming && !txndata->sent_begin_txn)
+		{
+			pgoutput_begin(ctx, txn);
+		}
+
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
 								  xid,
@@ -845,6 +928,21 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (in_streaming)
 		xid = txn->xid;
 
+	/*
+	 * Output BEGIN if we haven't yet. This is skipped when streaming
+	 * and for non-transactional messages.
+	 */
+	if (!in_streaming && transactional)
+	{
+		PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+		Assert(txndata);
+		if (!txndata->sent_begin_txn)
+		{
+			pgoutput_begin(ctx, txn);
+		}
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
 							 xid,
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
index ecf9b19..7a586b0 100644
--- a/src/test/subscription/t/020_messages.pl
+++ b/src/test/subscription/t/020_messages.pl
@@ -87,9 +87,8 @@ $result = $node_publisher->safe_psql(
 			'publication_names', 'tap_pub')
 ));
 
-# 66 67 == B C == BEGIN COMMIT
-is( $result, qq(66
-67),
+# no message and no BEGIN and COMMIT because of empty transaction optimization
+is($result, qq(),
 	'option messages defaults to false so message (M) is not available on slot'
 );
 
-- 
1.8.3.1

