From e96b27cedec641255dcc395739b490da7f288910 Mon Sep 17 00:00:00 2001
From: dilipkumar <dilipbalaut@gmail.com>
Date: Thu, 10 Sep 2020 11:39:27 +0530
Subject: [PATCH v4 1/2] Skip printing empty stream in test decoding

---
 contrib/test_decoding/expected/stream.out |  5 +--
 contrib/test_decoding/test_decoding.c     | 55 +++++++++++++++++++++----------
 2 files changed, 38 insertions(+), 22 deletions(-)

diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index d7e32f8..e1c3bc8 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -29,10 +29,7 @@ COMMIT;
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
- opening a streamed block for transaction
  streaming message: transactional: 1 prefix: test, sz: 50
- closing a streamed block for transaction
- aborting streamed (sub)transaction
  opening a streamed block for transaction
  streaming change for transaction
  streaming change for transaction
@@ -56,7 +53,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  streaming change for transaction
  closing a streamed block for transaction
  committing streamed transaction
-(27 rows)
+(24 rows)
 
 -- streaming test for toast changes
 ALTER TABLE stream_test ALTER COLUMN data set storage external;
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 3474515..e60ab34 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  Size sz, const char *message);
 static void pg_decode_stream_start(LogicalDecodingContext *ctx,
 								   ReorderBufferTXN *txn);
+static void pg_output_stream_start(LogicalDecodingContext *ctx,
+								   TestDecodingData *data,
+								   ReorderBufferTXN *txn,
+								   bool last_write);
 static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn);
 static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
@@ -583,34 +587,38 @@ pg_decode_message(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
 static void
 pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
-	OutputPluginPrepareWrite(ctx, true);
+	data->xact_wrote_changes = false;
+	if (data->skip_empty_xacts)
+		return;
+	pg_output_stream_start(ctx, data, txn, true);
+}
+
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
+	OutputPluginPrepareWrite(ctx, last_write);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
 	else
 		appendStringInfo(ctx->out, "opening a streamed block for transaction");
-	OutputPluginWrite(ctx, true);
+	OutputPluginWrite(ctx, last_write);
 }
 
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
 static void
 pg_decode_stream_stop(LogicalDecodingContext *ctx,
 					  ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
@@ -619,10 +627,6 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
 static void
 pg_decode_stream_abort(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn,
@@ -630,6 +634,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
@@ -638,10 +645,6 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
 static void
 pg_decode_stream_commit(LogicalDecodingContext *ctx,
 						ReorderBufferTXN *txn,
@@ -649,6 +652,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 
 	if (data->include_xids)
@@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	/* output stream start if we haven't yet */
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	{
+		pg_output_stream_start(ctx, data, txn, false);
+	}
+	data->xact_wrote_changes = true;
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
@@ -722,6 +735,12 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	{
+		pg_output_stream_start(ctx, data, txn, false);
+	}
+	data->xact_wrote_changes = true;
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
-- 
1.8.3.1

