From b433eb1312b86bd15ce5a0727db7d571c20b3b4f Mon Sep 17 00:00:00 2001
From: dilipkumar <dilipbalaut@gmail.com>
Date: Thu, 10 Sep 2020 11:39:27 +0530
Subject: [PATCH v1] Skip printing empty stream in test decoding

---
 contrib/test_decoding/test_decoding.c | 34 +++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 34745150e9..39e29e13d6 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  Size sz, const char *message);
 static void pg_decode_stream_start(LogicalDecodingContext *ctx,
 								   ReorderBufferTXN *txn);
+static void pg_output_stream_start(LogicalDecodingContext *ctx,
+								   TestDecodingData *data,
+								   ReorderBufferTXN *txn,
+								   bool last_write);
 static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn);
 static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
@@ -593,6 +597,15 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	data->xact_wrote_changes = false;
+	if (data->skip_empty_xacts)
+		return;
+	pg_output_stream_start(ctx, data, txn, true);
+}
+
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
@@ -611,6 +624,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
@@ -630,6 +646,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
@@ -649,6 +668,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 
 	if (data->include_xids)
@@ -676,6 +698,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	/* output stream start if we haven't yet */
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	{
+		pg_output_stream_start(ctx, data, txn, false);
+	}
+	data->xact_wrote_changes = true;
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
@@ -722,6 +751,11 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	{
+		pg_output_stream_start(ctx, data, txn, false);
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
-- 
2.23.0

