From d1c9dcb8ecc9e3138a995b7ada468e0a6d07ceba Mon Sep 17 00:00:00 2001
From: Shubham Khanna <shubham.khanna@fujitsu.com>
Date: Wed, 8 May 2024 10:53:52 +0530
Subject: [PATCH v1] Support capturing generated column data using pgoutput and
 test_decoding plugin.

Now if include_generated_columns option is specified, the generated
column information
and generated column data also will be sent.
Usage from pgoutput plugin:
SELECT * FROM pg_logical_slot_peek_binary_changes('slot1', NULL, NULL,
'proto_version', '1', 'publication_names', 'pub1',
'include_generated_columns', 'true');

Usage from test_decoding plugin:
SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL,
'include-xids', '0', 'skip-empty-xacts', '1',
'include_generated_columns', '1');
---
 contrib/test_decoding/expected/ddl.out      | 14 +++++
 contrib/test_decoding/sql/ddl.sql           |  6 +++
 contrib/test_decoding/test_decoding.c       | 25 +++++++--
 src/backend/replication/logical/proto.c     | 60 ++++++++++++++-------
 src/backend/replication/pgoutput/pgoutput.c | 39 ++++++++++----
 src/include/replication/logicalproto.h      | 13 +++--
 src/include/replication/pgoutput.h          |  1 +
 7 files changed, 121 insertions(+), 37 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index bcd1f74b2b..07bac1f677 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -843,6 +843,20 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 data
 (0 rows)
 \pset format aligned
+-- check include_generated_columns option with generated column
+CREATE TABLE gencoltable (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED);
+INSERT INTO gencoltable (a) VALUES (1), (2), (3);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include_generated_columns', '1');
+                            data                             
+-------------------------------------------------------------
+ BEGIN
+ table public.gencoltable: INSERT: a[integer]:1 b[integer]:2
+ table public.gencoltable: INSERT: a[integer]:2 b[integer]:4
+ table public.gencoltable: INSERT: a[integer]:3 b[integer]:6
+ COMMIT
+(5 rows)
+
+DROP TABLE gencoltable;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 --------------------------
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index 2f8e4e7f2c..d688775580 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -437,6 +437,12 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 \pset format aligned
 
+-- check include_generated_columns option with generated column
+CREATE TABLE gencoltable (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED);
+INSERT INTO gencoltable (a) VALUES (1), (2), (3);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include_generated_columns', '1');
+DROP TABLE gencoltable;
+
 SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 7c50d13969..f15ff93ac3 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -31,6 +31,7 @@ typedef struct
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
 	bool		only_local;
+	bool		include_generated_columns;
 } TestDecodingData;
 
 /*
@@ -259,6 +260,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "include_generated_columns") == 0)
+		{
+			if (elem->arg == NULL)
+				continue;
+			else if (!parse_bool(strVal(elem->arg), &data->include_generated_columns))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -521,7 +532,8 @@ print_literal(StringInfo s, Oid typid, char *outputstr)
 
 /* print the tuple 'tuple' into the StringInfo s */
 static void
-tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
+tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple,
+					bool skip_nulls, bool include_generated_columns)
 {
 	int			natt;
 
@@ -544,6 +556,9 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_
 		if (attr->attisdropped)
 			continue;
 
+		if (attr->attgenerated && !include_generated_columns)
+			continue;
+
 		/*
 		 * Don't print system columns, oid will already have been printed if
 		 * present.
@@ -641,7 +656,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				tuple_to_stringinfo(ctx->out, tupdesc,
 									change->data.tp.newtuple,
-									false);
+									false, data->include_generated_columns);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			appendStringInfoString(ctx->out, " UPDATE:");
@@ -650,7 +665,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				appendStringInfoString(ctx->out, " old-key:");
 				tuple_to_stringinfo(ctx->out, tupdesc,
 									change->data.tp.oldtuple,
-									true);
+									true, data->include_generated_columns );
 				appendStringInfoString(ctx->out, " new-tuple:");
 			}
 
@@ -659,7 +674,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				tuple_to_stringinfo(ctx->out, tupdesc,
 									change->data.tp.newtuple,
-									false);
+									false, data->include_generated_columns);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			appendStringInfoString(ctx->out, " DELETE:");
@@ -671,7 +686,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				tuple_to_stringinfo(ctx->out, tupdesc,
 									change->data.tp.oldtuple,
-									true);
+									true, data->include_generated_columns);
 			break;
 		default:
 			Assert(false);
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 95c09c9516..69a44a7122 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -30,10 +30,12 @@
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
 static void logicalrep_write_attrs(StringInfo out, Relation rel,
-								   Bitmapset *columns);
+								   Bitmapset *columns,
+								   bool publish_generated_column);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
 								   TupleTableSlot *slot,
-								   bool binary, Bitmapset *columns);
+								   bool binary, Bitmapset *columns,
+								   bool publish_generated_column);
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
 
@@ -412,7 +414,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						TupleTableSlot *newslot, bool binary, Bitmapset *columns)
+						TupleTableSlot *newslot, bool binary, Bitmapset *columns,
+						bool publish_generated_column)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -424,7 +427,8 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns);
+	logicalrep_write_tuple(out, rel, newslot, binary, columns,
+						   publish_generated_column);
 }
 
 /*
@@ -457,7 +461,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 						TupleTableSlot *oldslot, TupleTableSlot *newslot,
-						bool binary, Bitmapset *columns)
+						bool binary, Bitmapset *columns,
+						bool publish_generated_column)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -478,11 +483,13 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldslot, binary, columns);
+		logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+							   publish_generated_column);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns);
+	logicalrep_write_tuple(out, rel, newslot, binary, columns,
+						   publish_generated_column);
 }
 
 /*
@@ -532,7 +539,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
 void
 logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 						TupleTableSlot *oldslot, bool binary,
-						Bitmapset *columns)
+						Bitmapset *columns, bool publish_generated_column)
 {
 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -552,7 +559,8 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldslot, binary, columns);
+	logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+						   publish_generated_column);
 }
 
 /*
@@ -668,7 +676,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
  */
 void
 logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
-					 Bitmapset *columns)
+					 Bitmapset *columns, bool publish_generated_column)
 {
 	char	   *relname;
 
@@ -690,7 +698,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendbyte(out, rel->rd_rel->relreplident);
 
 	/* send the attribute info */
-	logicalrep_write_attrs(out, rel, columns);
+	logicalrep_write_attrs(out, rel, columns, publish_generated_column);
 }
 
 /*
@@ -767,7 +775,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  */
 static void
 logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
-					   bool binary, Bitmapset *columns)
+					   bool binary, Bitmapset *columns,
+					   bool publish_generated_column)
 {
 	TupleDesc	desc;
 	Datum	   *values;
@@ -781,10 +790,13 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
 			continue;
 
-		if (!column_in_column_list(att->attnum, columns))
+		if (!column_in_column_list(att->attnum, columns) && !att->attgenerated)
+			continue;
+
+		if (att->attgenerated && !publish_generated_column)
 			continue;
 
 		nliveatts++;
@@ -802,10 +814,13 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 		Form_pg_type typclass;
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
 			continue;
 
-		if (!column_in_column_list(att->attnum, columns))
+		if (!column_in_column_list(att->attnum, columns) && !att->attgenerated)
+			continue;
+
+		if (att->attgenerated && !publish_generated_column)
 			continue;
 
 		if (isnull[i])
@@ -923,7 +938,8 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
  * Write relation attribute metadata to the stream.
  */
 static void
-logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
+					   bool publish_generated_column)
 {
 	TupleDesc	desc;
 	int			i;
@@ -938,7 +954,10 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
+			continue;
+
+		if (att->attgenerated && !publish_generated_column)
 			continue;
 
 		if (!column_in_column_list(att->attnum, columns))
@@ -959,7 +978,10 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 		uint8		flags = 0;
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
+			continue;
+
+		if (att->attgenerated && !publish_generated_column)
 			continue;
 
 		if (!column_in_column_list(att->attnum, columns))
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index d2b35cfb96..44d629a624 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -86,7 +86,8 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
 									LogicalDecodingContext *ctx,
-									Bitmapset *columns);
+									Bitmapset *columns,
+									bool publish_generated_column);
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
@@ -283,11 +284,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		generate_column_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
+	data->publish_generated_column = false;
 
 	foreach(lc, options)
 	{
@@ -396,6 +399,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", origin));
 		}
+		else if (strcmp(defel->defname, "include_generated_columns") == 0)
+		{
+			if (generate_column_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			generate_column_option_given = true;
+
+			data->publish_generated_column = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -731,11 +744,13 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 	{
 		Relation	ancestor = RelationIdGetRelation(relentry->publish_as_relid);
 
-		send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
+		send_relation_and_attrs(ancestor, xid, ctx, relentry->columns,
+								data->publish_generated_column);
 		RelationClose(ancestor);
 	}
 
-	send_relation_and_attrs(relation, xid, ctx, relentry->columns);
+	send_relation_and_attrs(relation, xid, ctx, relentry->columns,
+							data->publish_generated_column);
 
 	if (data->in_streaming)
 		set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -749,7 +764,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 static void
 send_relation_and_attrs(Relation relation, TransactionId xid,
 						LogicalDecodingContext *ctx,
-						Bitmapset *columns)
+						Bitmapset *columns, bool publish_generated_column)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	int			i;
@@ -766,7 +781,10 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
+			continue;
+
+		if (att->attgenerated && !publish_generated_column)
 			continue;
 
 		if (att->atttypid < FirstGenbkiObjectId)
@@ -782,7 +800,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 	}
 
 	OutputPluginPrepareWrite(ctx, false);
-	logicalrep_write_rel(ctx->out, xid, relation, columns);
+	logicalrep_write_rel(ctx->out, xid, relation, columns, publish_generated_column);
 	OutputPluginWrite(ctx, false);
 }
 
@@ -1531,15 +1549,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-									data->binary, relentry->columns);
+									data->binary, relentry->columns,
+									data->publish_generated_column);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
-									new_slot, data->binary, relentry->columns);
+									new_slot, data->binary, relentry->columns,
+									data->publish_generated_column);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
-									data->binary, relentry->columns);
+									data->binary, relentry->columns,
+									data->publish_generated_column);
 			break;
 		default:
 			Assert(false);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c409638a2e..2676acefce 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -225,18 +225,22 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *newslot,
-									bool binary, Bitmapset *columns);
+									bool binary, Bitmapset *columns,
+									bool publish_generated_column);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *oldslot,
-									TupleTableSlot *newslot, bool binary, Bitmapset *columns);
+									TupleTableSlot *newslot, bool binary,
+									Bitmapset *columns,
+									bool publish_generated_column);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
 extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
 									Relation rel, TupleTableSlot *oldslot,
-									bool binary, Bitmapset *columns);
+									bool binary, Bitmapset *columns,
+									bool publish_generated_column);
 extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
 											  LogicalRepTupleData *oldtup);
 extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
@@ -247,7 +251,8 @@ extern List *logicalrep_read_truncate(StringInfo in,
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
-								 Relation rel, Bitmapset *columns);
+								 Relation rel, Bitmapset *columns,
+								 bool publish_generated_column);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 89f94e1147..c4773f60a3 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -33,6 +33,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	bool		publish_no_origin;
+	bool		publish_generated_column;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
-- 
2.34.1

