From 7ff9027886d933b1aacf3fdb4aa2227ea3e6d406 Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal.oss@gmail.com>
Date: Sat, 15 Jun 2024 15:46:18 +0530
Subject: [PATCH v7 2/2] Support replication of generated column during initial
 sync

During initial sync, the data is replicated from the publisher to the
subscriber using the COPY command. However, COPY of generated columns is
not normally supported, so instead we use the syntax
'COPY (SELECT column_name FROM table_name) TO STDOUT' for COPY.

With this patch, if the 'include_generated_columns' and 'copy_data' options
are both 'true' during 'CREATE SUBSCRIPTION', then the generated column data
is replicated from the publisher to the subscriber during initial sync.

While building the column list for the COPY command, we do not include a
column if it is a generated column on the subscriber side. The data
corresponding to that column is not replicated; instead, the column is
filled as normal with the subscriber-side computed or default data.
---
 doc/src/sgml/ref/create_subscription.sgml   |  11 +-
 src/backend/commands/subscriptioncmds.c     |  14 ---
 src/backend/replication/logical/relation.c  |   5 +-
 src/backend/replication/logical/tablesync.c | 107 +++++++++++++++-----
 src/include/replication/logicalrelation.h   |   3 +-
 src/test/regress/expected/subscription.out  |   3 -
 src/test/regress/sql/subscription.sql       |   3 -
 src/test/subscription/t/011_generated.pl    |  58 +++++++++++
 8 files changed, 152 insertions(+), 52 deletions(-)

diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index f072a13d2c..9513e7752b 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -443,11 +443,12 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
 
          <para>
-          This parameter can only be set true if <literal>copy_data</literal> is
-          set to <literal>false</literal>. If the subscriber-side column is also a
-          generated column then this option has no effect; the replicated data will
-          be ignored and the subscriber column will be filled as normal with the
-          subscriber-side computed or default data.
+          If the subscriber-side column is also a generated column then this option
+          has no effect; the replicated data will be ignored and the subscriber
+          column will be filled as normal with the subscriber-side computed or
+          default data. During table synchronization, the data for a column
+          that is a generated column on the subscriber side is not sent from
+          the publisher to the subscriber.
          </para>
         </listitem>
        </varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3709e1047f..1cefed0fa4 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -459,20 +459,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 								"slot_name = NONE", "create_slot = false")));
 		}
 	}
-
-	/*
-	 * Do additional checking for disallowed combination when copy_data and
-	 * include_generated_columns are true. COPY of generated columns is not supported
-	 * yet.
-	 */
-	if (opts->copy_data && opts->include_generated_columns)
-	{
-		ereport(ERROR,
-				errcode(ERRCODE_SYNTAX_ERROR),
-		/*- translator: both %s are strings of the form "option = value" */
-					errmsg("%s and %s are mutually exclusive options",
-						"copy_data = true", "include_generated_columns = true"));
-	}
 }
 
 /*
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 5de1531567..92b225fba8 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -205,7 +205,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
  *
  * Returns -1 if not found.
  */
-static int
+int
 logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
 {
 	int			i;
@@ -421,7 +421,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 			int			attnum;
 			Form_pg_attribute attr = TupleDescAttr(desc, i);
 
-			if (attr->attisdropped)
+			if (attr->attisdropped ||
+				(!MySubscription->includegencol && attr->attgenerated))
 			{
 				entry->attrmap->attnums[i] = -1;
 				continue;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index b00267f042..bacf0fd2fa 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -118,6 +118,7 @@
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/rls.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
@@ -692,20 +693,56 @@ process_syncing_tables(XLogRecPtr current_lsn)
 }
 
 /*
- * Create list of columns for COPY based on logical relation mapping.
+ * Create list of columns for COPY based on logical relation mapping. Columns
+ * that are generated on the subscriber-side table are left out of the list.
  */
 static List *
-make_copy_attnamelist(LogicalRepRelMapEntry *rel)
+make_copy_attnamelist(LogicalRepRelMapEntry *rel, bool *attgenlist)
 {
 	List	   *attnamelist = NIL;
+	List	   *gencollist = NIL;
 	int			i;
+	int			j = 0;
+	TupleDesc	desc;
 
-	for (i = 0; i < rel->remoterel.natts; i++)
+	desc = RelationGetDescr(rel->localrel);
+
+	for (i = 0; i < desc->natts; i++)
 	{
-		attnamelist = lappend(attnamelist,
-							  makeString(rel->remoterel.attnames[i]));
+		int			attnum;
+		Form_pg_attribute attr = TupleDescAttr(desc, i);
+
+		if (!attr->attgenerated)
+			continue;
+
+		attnum = logicalrep_rel_att_by_name(&rel->remoterel,
+											NameStr(attr->attname));
+
+		/*
+		 * Check if the subscription table has a generated column with the
+		 * same name as a non-generated column in the corresponding
+		 * publication table.
+		 */
+		if (attnum >=0 && !attgenlist[attnum])
+			ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("logical replication target relation \"%s.%s\" is missing replicated column: \"%s\"",
+				 rel->remoterel.nspname, rel->remoterel.relname, NameStr(attr->attname))));
+
+		if (attnum >= 0)
+			gencollist = lappend_int(gencollist, attnum);
 	}
 
+	for (i = 0; i < rel->remoterel.natts; i++)
+	{
+
+		if (gencollist != NIL && j < gencollist->length &&
+			list_nth_int(gencollist, j) == i)
+			j++;
+		else
+			attnamelist = lappend(attnamelist,
+								  makeString(rel->remoterel.attnames[i]));
+	}
 
 	return attnamelist;
 }
@@ -791,16 +828,17 @@ copy_read_data(void *outbuf, int minread, int maxread)
  * qualifications to be used in the COPY command.
  */
 static void
-fetch_remote_table_info(char *nspname, char *relname,
+fetch_remote_table_info(char *nspname, char *relname, bool **attgenlist,
 						LogicalRepRelation *lrel, List **qual)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
-	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
+	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
 	Oid			qualRow[] = {TEXTOID};
 	bool		isnull;
+	bool	   *attgenlist_res;
 	int			natt;
 	ListCell   *lc;
 	Bitmapset  *included_cols = NULL;
@@ -948,18 +986,24 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 "SELECT a.attnum,"
 					 "       a.attname,"
 					 "       a.atttypid,"
-					 "       a.attnum = ANY(i.indkey)"
+					 "       a.attnum = ANY(i.indkey),"
+					 "		 a.attgenerated != ''"
 					 "  FROM pg_catalog.pg_attribute a"
 					 "  LEFT JOIN pg_catalog.pg_index i"
 					 "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
 					 " WHERE a.attnum > 0::pg_catalog.int2"
-					 "   AND NOT a.attisdropped %s"
+					 "   AND NOT a.attisdropped", lrel->remoteid);
+
+	if ((walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 &&
+		walrcv_server_version(LogRepWorkerWalRcvConn) <= 160000) ||
+		!MySubscription->includegencol)
+			appendStringInfo(&cmd, " AND a.attgenerated = ''");
+
+	appendStringInfo(&cmd,
 					 "   AND a.attrelid = %u"
 					 " ORDER BY a.attnum",
-					 lrel->remoteid,
-					 (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
-					  "AND a.attgenerated = ''" : ""),
 					 lrel->remoteid);
+
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
 					  lengthof(attrRow), attrRow);
 
@@ -973,6 +1017,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
 	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
 	lrel->attkeys = NULL;
+	attgenlist_res = palloc0(MaxTupleAttributeNumber * sizeof(bool));
 
 	/*
 	 * Store the columns as a list of names.  Ignore those that are not
@@ -1005,6 +1050,8 @@ fetch_remote_table_info(char *nspname, char *relname,
 		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
 			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
 
+		attgenlist_res[natt] = DatumGetBool(slot_getattr(slot, 5, &isnull));
+
 		/* Should never happen. */
 		if (++natt >= MaxTupleAttributeNumber)
 			elog(ERROR, "too many columns in remote table \"%s.%s\"",
@@ -1015,7 +1062,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	ExecDropSingleTupleTableSlot(slot);
 
 	lrel->natts = natt;
-
+	*attgenlist = attgenlist_res;
 	walrcv_clear_result(res);
 
 	/*
@@ -1123,10 +1170,12 @@ copy_table(Relation rel)
 	List	   *attnamelist;
 	ParseState *pstate;
 	List	   *options = NIL;
+	bool 	   *attgenlist;
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-							RelationGetRelationName(rel), &lrel, &qual);
+							RelationGetRelationName(rel), &attgenlist,
+							&lrel, &qual);
 
 	/* Put the relation into relmap. */
 	logicalrep_relmap_update(&lrel);
@@ -1135,11 +1184,17 @@ copy_table(Relation rel)
 	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
 	Assert(rel == relmapentry->localrel);
 
+	attnamelist = make_copy_attnamelist(relmapentry, attgenlist);
+
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
 
-	/* Regular table with no row filter */
-	if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+	/*
+	 * Regular table with no row filter and 'include_generated_columns' is not
+	 * specified as 'true' during creation of subscription.
+	 */
+	if (lrel.relkind == RELKIND_RELATION && qual == NIL &&
+		!MySubscription->includegencol)
 	{
 		appendStringInfo(&cmd, "COPY %s",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
@@ -1169,17 +1224,22 @@ copy_table(Relation rel)
 	else
 	{
 		/*
-		 * For non-tables and tables with row filters, we need to do COPY
-		 * (SELECT ...), but we can't just do SELECT * because we need to not
-		 * copy generated columns. For tables with any row filters, build a
-		 * SELECT query with OR'ed row filters for COPY.
+		 * For non-tables, tables with row filters, and when
+		 * 'include_generated_columns' is specified as 'true', we need to do
+		 * COPY (SELECT ...), as normal COPY of generated columns is not
+		 * supported. For tables with any row filters, build a SELECT query
+		 * with OR'ed row filters for COPY.
 		 */
+		int i = 0;
+		ListCell *l;
+
 		appendStringInfoString(&cmd, "COPY (SELECT ");
-		for (int i = 0; i < lrel.natts; i++)
+		foreach(l, attnamelist)
 		{
-			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
-			if (i < lrel.natts - 1)
+			appendStringInfoString(&cmd, quote_identifier(strVal(lfirst(l))));
+			if (i < attnamelist->length - 1)
 				appendStringInfoString(&cmd, ", ");
+			i++;
 		}
 
 		appendStringInfoString(&cmd, " FROM ");
@@ -1237,7 +1297,6 @@ copy_table(Relation rel)
 	(void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
 										 NULL, false, false);
 
-	attnamelist = make_copy_attnamelist(relmapentry);
 	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
 
 	/* Do the copy */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index e687b40a56..797e66dfdb 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -41,7 +41,8 @@ typedef struct LogicalRepRelMapEntry
 
 extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
 extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel);
-
+extern int logicalrep_rel_att_by_name(LogicalRepRelation *remoterel,
+									  const char *attname);
 extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
 												  LOCKMODE lockmode);
 extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 2e67509ccd..0f2a25cdc1 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -99,9 +99,6 @@ CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PU
 ERROR:  subscription with slot_name = NONE must also set create_slot = false
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, create_slot = false);
 ERROR:  subscription with slot_name = NONE must also set enabled = false
--- fail - copy_data and include_generated_columns are mutually exclusive options
-CREATE SUBSCRIPTION sub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (include_generated_columns = true);
-ERROR:  copy_data = true and include_generated_columns = true are mutually exclusive options
 -- ok - with slot_name = NONE
 CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
 WARNING:  subscription was created, but is not connected
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index eefd1dea7b..3e5ba4cb8c 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -60,9 +60,6 @@ CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PU
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, enabled = false);
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, create_slot = false);
 
--- fail - copy_data and include_generated_columns are mutually exclusive options
-CREATE SUBSCRIPTION sub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (include_generated_columns = true);
-
 -- ok - with slot_name = NONE
 CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
 -- fail
diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl
index 11d356bf29..297816a4a7 100644
--- a/src/test/subscription/t/011_generated.pl
+++ b/src/test/subscription/t/011_generated.pl
@@ -16,6 +16,8 @@ $node_publisher->start;
 
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	"max_logical_replication_workers = 10");
 $node_subscriber->start;
 
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
@@ -32,6 +34,14 @@ $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab3 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a + 10) STORED)"
 );
 
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab4 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED, c int GENERATED ALWAYS AS (a * 2) STORED)"
+);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab5 (a int PRIMARY KEY, b int)"
+);
+
 $node_subscriber->safe_psql('postgres',
 	"CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 22) STORED, c int)"
 );
@@ -44,12 +54,25 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE TABLE tab3 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a + 20) STORED)"
 );
 
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab4 (a int PRIMARY KEY, b int, c int GENERATED ALWAYS AS (a * 22) STORED)"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab5 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 22) STORED)"
+);
+
+
 # data for initial sync
 
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab1 (a) VALUES (1), (2), (3)");
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab2 (a) VALUES (1), (2), (3)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab4 (a) VALUES (1), (2), (3)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab5 (a, b) VALUES (1, 1), (2, 2), (3, 3)");
 
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION pub1 FOR TABLE tab1");
@@ -57,6 +80,10 @@ $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION pub2 FOR TABLE tab2");
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION pub3 FOR TABLE tab3");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub4 FOR TABLE tab4");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub5 FOR TABLE tab5");
 
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1");
@@ -69,6 +96,10 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3 WITH (include_generated_columns = true, copy_data = false)"
 	);
 
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub4 CONNECTION '$publisher_connstr' PUBLICATION pub4 WITH (include_generated_columns = true)"
+	);
+
 # Wait for initial sync of all subscriptions
 $node_subscriber->wait_for_subscription_sync;
 
@@ -108,6 +139,33 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab3");
 is( $result, qq(4|24
 5|25), 'generated columns replicated to non-generated column on subscriber');
 
+$node_publisher->safe_psql('postgres', "INSERT INTO tab4 VALUES (4), (5)");
+
+$node_publisher->wait_for_catchup('sub4');
+
+# gen-col 'b' in publisher replicating to NOT gen-col 'b' on subscriber
+# gen-col 'c' in publisher not replicating to gen-col 'c' on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab4 ORDER BY a");
+is( $result, qq(1|2|22
+2|4|44
+3|6|66
+4|8|88
+5|10|110), 'replicate generated column with initial sync');
+
+# NOT gen-col 'b' in publisher not replicating to gen-col 'b' on subscriber
+my $offset = -s $node_subscriber->logfile;
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub5 CONNECTION '$publisher_connstr' PUBLICATION pub5 WITH (include_generated_columns = true)"
+	);
+
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? logical replication target relation "public.tab5" is missing replicated column: "b"/, $offset);
+
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION sub5"
+	);
+
 # try it with a subscriber-side trigger
 
 $node_subscriber->safe_psql(
-- 
2.34.1

