From 4711ee538a35c7a5c4eb4f23258e3bd8a3ab0bb4 Mon Sep 17 00:00:00 2001
From: rahila <rahilasyed.90@gmail.com>
Date: Mon, 7 Jun 2021 16:27:21 +0530
Subject: [PATCH] Add column filtering to logical replication

Add capability to specifiy column names while linking
the table to a publication, at the time of CREATE or ALTER
publication. This will allow replicating only the specified
columns. Rest of the columns on the subscriber will be populated
locally. This facilitates replication to a table on subscriber
containing only the subscribed/filtered columns.
If column names are not specified, all the columns are
replicated. REPLICA IDENTITY columns are always replicated
irrespective of the column filters.
Add a tap test for the same in src/test/subscription.
---
 src/backend/catalog/pg_publication.c         |  20 +++-
 src/backend/commands/copyfromparse.c         |   1 -
 src/backend/commands/publicationcmds.c       |  50 +++++---
 src/backend/nodes/copyfuncs.c                |  13 +++
 src/backend/nodes/equalfuncs.c               |  12 ++
 src/backend/nodes/outfuncs.c                 |  12 ++
 src/backend/nodes/readfuncs.c                |  16 +++
 src/backend/parser/gram.y                    |  27 ++++-
 src/backend/replication/logical/proto.c      |  86 +++++++++++---
 src/backend/replication/logical/relation.c   |   1 -
 src/backend/replication/logical/tablesync.c  |  96 ++++++++++++++-
 src/backend/replication/pgoutput/pgoutput.c  |  95 ++++++++++++---
 src/include/catalog/pg_publication.h         |   9 +-
 src/include/catalog/pg_publication_rel.h     |   4 +
 src/include/nodes/nodes.h                    |   1 +
 src/include/nodes/parsenodes.h               |   6 +
 src/include/replication/logicalproto.h       |   6 +-
 src/test/subscription/t/021_column_filter.pl | 116 +++++++++++++++++++
 18 files changed, 499 insertions(+), 72 deletions(-)
 create mode 100644 src/test/subscription/t/021_column_filter.pl

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 2a2fe03c13..ad04ffe04b 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -141,18 +141,20 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
  * Insert new publication / relation mapping.
  */
 ObjectAddress
-publication_add_relation(Oid pubid, Relation targetrel,
+publication_add_relation(Oid pubid, PublicationRelationInfo *targetrel,
 						 bool if_not_exists)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	Datum		values[Natts_pg_publication_rel];
 	bool		nulls[Natts_pg_publication_rel];
-	Oid			relid = RelationGetRelid(targetrel);
+	Oid			relid = RelationGetRelid(targetrel->relation);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
+	ListCell *lc;
+	List *target_cols = NIL;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -172,10 +174,10 @@ publication_add_relation(Oid pubid, Relation targetrel,
 		ereport(ERROR,
 				(errcode(ERRCODE_DUPLICATE_OBJECT),
 				 errmsg("relation \"%s\" is already member of publication \"%s\"",
-						RelationGetRelationName(targetrel), pub->name)));
+						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel);
+	check_publication_add_relation(targetrel->relation);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -188,6 +190,14 @@ publication_add_relation(Oid pubid, Relation targetrel,
 		ObjectIdGetDatum(pubid);
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
+	foreach(lc, targetrel->columns)
+	{
+		char *colname;
+		colname = strVal(lfirst(lc));
+		target_cols = lappend(target_cols, colname);
+	}
+	values[Anum_pg_publication_rel_prattrs - 1] =
+		PointerGetDatum(strlist_to_textarray(target_cols));
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -209,7 +219,7 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	table_close(rel, RowExclusiveLock);
 
 	/* Invalidate relcache so that publication info is rebuilt. */
-	CacheInvalidateRelcache(targetrel);
+	CacheInvalidateRelcache(targetrel->relation);
 
 	return myself;
 }
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index fdf57f1556..515728df67 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -839,7 +839,6 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 			ereport(ERROR,
 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 					 errmsg("extra data after last expected column")));
-
 		fieldno = 0;
 
 		/* Loop to read the user attributes on the line. */
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 8487eeb7e6..aee5645e31 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -393,7 +393,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 			foreach(newlc, rels)
 			{
-				Relation	newrel = (Relation) lfirst(newlc);
+				PublicationRelationInfo	*newpubrel = (PublicationRelationInfo *) lfirst(newlc);
+				Relation newrel = newpubrel->relation;
 
 				if (RelationGetRelid(newrel) == oldrelid)
 				{
@@ -401,13 +402,20 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 					break;
 				}
 			}
-
+			/* Not yet in the list, open it and add to the list */
 			if (!found)
 			{
 				Relation	oldrel = table_open(oldrelid,
 												ShareUpdateExclusiveLock);
-
-				delrels = lappend(delrels, oldrel);
+				/*
+				 * Wrap relation into PublicationRelationInfo
+				 */
+				PublicationRelationInfo *pubrel = palloc(sizeof(PublicationRelationInfo));
+				pubrel->relation = oldrel;
+				pubrel->relid = oldrelid;
+				 /* This is not needed to delete a table */
+				pubrel->columns = NIL;
+				delrels = lappend(delrels, pubrel);
 			}
 		}
 
@@ -498,9 +506,9 @@ RemovePublicationRelById(Oid proid)
 }
 
 /*
- * Open relations specified by a RangeVar list.
- * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
- * add them to a publication.
+ * Open relations specified by a PublicationTable list.
+ * In the returned list of PublicationRelationInfo, tables are locked
+ * in ShareUpdateExclusiveLock mode in order to add them to a publication.
  */
 static List *
 OpenTableList(List *tables)
@@ -514,10 +522,12 @@ OpenTableList(List *tables)
 	 */
 	foreach(lc, tables)
 	{
-		RangeVar   *rv = lfirst_node(RangeVar, lc);
+		PublicationTable *t = lfirst(lc);
+		RangeVar   *rv = castNode(RangeVar, t->relation);
 		bool		recurse = rv->inh;
 		Relation	rel;
 		Oid			myrelid;
+		PublicationRelationInfo	*pub_rel;
 
 		/* Allow query cancel in case this takes a long time */
 		CHECK_FOR_INTERRUPTS();
@@ -538,7 +548,11 @@ OpenTableList(List *tables)
 			continue;
 		}
 
-		rels = lappend(rels, rel);
+		pub_rel = palloc(sizeof(PublicationRelationInfo));
+		pub_rel->relation = rel;
+		pub_rel->relid = myrelid;
+		pub_rel->columns = t->columns;
+		rels = lappend(rels, pub_rel);
 		relids = lappend_oid(relids, myrelid);
 
 		/*
@@ -571,7 +585,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = table_open(childrelid, NoLock);
-				rels = lappend(rels, rel);
+				pub_rel = palloc(sizeof(PublicationRelationInfo));
+				pub_rel->relation = rel;
+				pub_rel->relid = childrelid;
+				pub_rel->columns = t->columns;
+				rels = lappend(rels, pub_rel);
 				relids = lappend_oid(relids, childrelid);
 			}
 		}
@@ -592,9 +610,9 @@ CloseTableList(List *rels)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationInfo *pub_rel = (PublicationRelationInfo *)lfirst(lc);
 
-		table_close(rel, NoLock);
+		table_close(pub_rel->relation, NoLock);
 	}
 }
 
@@ -611,7 +629,8 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationInfo *pub_rel = (PublicationRelationInfo *)lfirst(lc);
+		Relation	rel = pub_rel->relation;
 		ObjectAddress obj;
 
 		/* Must be owner of the table or superuser. */
@@ -619,7 +638,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
 						   RelationGetRelationName(rel));
 
-		obj = publication_add_relation(pubid, rel, if_not_exists);
+		obj = publication_add_relation(pubid, pub_rel, if_not_exists);
 		if (stmt)
 		{
 			EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
@@ -643,7 +662,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationInfo	*pubrel = (PublicationRelationInfo *) lfirst(lc);
+		Relation rel = pubrel->relation;
 		Oid			relid = RelationGetRelid(rel);
 
 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 29020c908e..0763802502 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4951,6 +4951,16 @@ _copyForeignKeyCacheInfo(const ForeignKeyCacheInfo *from)
 	return newnode;
 }
 
+static PublicationTable *
+_copyPublicationTable(const PublicationTable *from)
+{
+	PublicationTable  *newnode = makeNode(PublicationTable);
+
+	COPY_NODE_FIELD(relation);
+	COPY_NODE_FIELD(columns);
+
+	return newnode;
+}
 
 /*
  * copyObjectImpl -- implementation of copyObject(); see nodes/nodes.h
@@ -5866,6 +5876,9 @@ copyObjectImpl(const void *from)
 		case T_PartitionCmd:
 			retval = _copyPartitionCmd(from);
 			break;
+		case T_PublicationTable:
+			retval = _copyPublicationTable(from);
+			break;
 
 			/*
 			 * MISCELLANEOUS NODES
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 8a1762000c..b0f37b2ceb 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -3114,6 +3114,15 @@ _equalValue(const Value *a, const Value *b)
 	return true;
 }
 
+static bool
+_equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
+{
+	COMPARE_NODE_FIELD(relation);
+	COMPARE_NODE_FIELD(columns);
+
+	return true;
+}
+
 /*
  * equal
  *	  returns whether two nodes are equal
@@ -3862,6 +3871,9 @@ equal(const void *a, const void *b)
 		case T_PartitionCmd:
 			retval = _equalPartitionCmd(a, b);
 			break;
+		case T_PublicationTable:
+			retval = _equalPublicationTable(a, b);
+			break;
 
 		default:
 			elog(ERROR, "unrecognized node type: %d",
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 87561cbb6f..f04eb536c9 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -3821,6 +3821,15 @@ _outPartitionRangeDatum(StringInfo str, const PartitionRangeDatum *node)
 	WRITE_LOCATION_FIELD(location);
 }
 
+static void
+_outPublicationTable(StringInfo str, const PublicationTable *node)
+{
+	WRITE_NODE_TYPE("PUBLICATIONTABLE");
+
+	WRITE_NODE_FIELD(relation);
+	WRITE_NODE_FIELD(columns);
+}
+
 /*
  * outNode -
  *	  converts a Node into ascii string and append it to 'str'
@@ -4520,6 +4529,9 @@ outNode(StringInfo str, const void *obj)
 			case T_PartitionRangeDatum:
 				_outPartitionRangeDatum(str, obj);
 				break;
+			case T_PublicationTable:
+				_outPublicationTable(str, obj);
+				break;
 
 			default:
 
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 77d082d8b4..6b2d8efb01 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -2702,6 +2702,20 @@ _readPartitionRangeDatum(void)
 	READ_DONE();
 }
 
+/*
+ * _readPublicationTable
+ */
+static PublicationTable *
+_readPublicationTable(void)
+{
+	READ_LOCALS(PublicationTable);
+
+	READ_NODE_FIELD(relation);
+	READ_NODE_FIELD(columns);
+
+	READ_DONE();
+}
+
 /*
  * parseNodeString
  *
@@ -2973,6 +2987,8 @@ parseNodeString(void)
 		return_value = _readPartitionBoundSpec();
 	else if (MATCH("PARTITIONRANGEDATUM", 19))
 		return_value = _readPartitionRangeDatum();
+	else if (MATCH("PUBLICATIONTABLE", 16))
+		return_value = _readPublicationTable();
 	else
 	{
 		elog(ERROR, "badly formatted node string \"%.32s\"...", token);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 39a2849eba..2c9af95db8 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -426,14 +426,14 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				transform_element_list transform_type_list
 				TriggerTransitions TriggerReferencing
 				vacuum_relation_list opt_vacuum_relation_list
-				drop_option_list
+				drop_option_list publication_table_list
 
 %type <node>	opt_routine_body
 %type <groupclause> group_clause
 %type <list>	group_by_list
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
-%type <node>	opt_publication_for_tables publication_for_tables
+%type <node>	opt_publication_for_tables publication_for_tables publication_table
 
 %type <list>	opt_fdw_options fdw_options
 %type <defelt>	fdw_option
@@ -9620,7 +9620,7 @@ opt_publication_for_tables:
 		;
 
 publication_for_tables:
-			FOR TABLE relation_expr_list
+			FOR TABLE publication_table_list
 				{
 					$$ = (Node *) $3;
 				}
@@ -9630,6 +9630,21 @@ publication_for_tables:
 				}
 		;
 
+publication_table_list:
+			publication_table
+					{ $$ = list_make1($1); }
+		| publication_table_list ',' publication_table
+				{ $$ = lappend($1, $3); }
+		;
+
+publication_table: relation_expr opt_column_list
+		{
+			PublicationTable *n = makeNode(PublicationTable);
+			n->relation = $1;
+			n->columns = $2;
+			$$ = (Node *) n;
+		}
+	;
 
 /*****************************************************************************
  *
@@ -9651,7 +9666,7 @@ AlterPublicationStmt:
 					n->options = $5;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name ADD_P TABLE relation_expr_list
+			| ALTER PUBLICATION name ADD_P TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9659,7 +9674,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_ADD;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name SET TABLE relation_expr_list
+			| ALTER PUBLICATION name SET TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9667,7 +9682,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_SET;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name DROP TABLE relation_expr_list
+			| ALTER PUBLICATION name DROP TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 52b65e9572..8bfecf44ca 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -29,9 +29,9 @@
 #define TRUNCATE_CASCADE		(1<<0)
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
-static void logicalrep_write_attrs(StringInfo out, Relation rel);
+static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
-								   HeapTuple tuple, bool binary);
+					HeapTuple tuple, bool binary, Bitmapset *att_map);
 
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple newtuple, bool binary)
+						HeapTuple newtuple, bool binary, Bitmapset *att_map)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, att_map);
 }
 
 /*
@@ -442,7 +442,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  */
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+						HeapTuple oldtuple, HeapTuple newtuple, bool binary, Bitmapset *att_map)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -463,11 +463,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldtuple, binary);
+		logicalrep_write_tuple(out, rel, oldtuple, binary, att_map);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, att_map);
 }
 
 /*
@@ -536,7 +536,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldtuple, binary);
+	logicalrep_write_tuple(out, rel, oldtuple, binary, NULL);
 }
 
 /*
@@ -651,7 +651,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
  * Write relation description to the output stream.
  */
 void
-logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *att_map)
 {
 	char	   *relname;
 
@@ -673,7 +673,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
 	pq_sendbyte(out, rel->rd_rel->relreplident);
 
 	/* send the attribute info */
-	logicalrep_write_attrs(out, rel);
+	logicalrep_write_attrs(out, rel, att_map);
 }
 
 /*
@@ -749,20 +749,37 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  * Write a tuple to the outputstream, in the most efficient format possible.
  */
 static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary,
+							Bitmapset *att_map)
 {
 	TupleDesc	desc;
 	Datum		values[MaxTupleAttributeNumber];
 	bool		isnull[MaxTupleAttributeNumber];
 	int			i;
 	uint16		nliveatts = 0;
+	Bitmapset  *idattrs = NULL;
+	bool		replidentfull;
+	Form_pg_attribute att;
 
 	desc = RelationGetDescr(rel);
 
+	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+	if (!replidentfull)
+		idattrs = RelationGetIdentityKeyBitmap(rel);
+
 	for (i = 0; i < desc->natts; i++)
 	{
+		att = TupleDescAttr(desc, i);
 		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
 			continue;
+		/*
+		 * Do not increment count of attributes if not a part of column filters
+		 * except for replica identity columns or if replica identity is full.
+		 */
+		if (att_map != NULL && !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, att_map)
+			&& !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, idattrs)
+						&& !replidentfull)
+			continue;
 		nliveatts++;
 	}
 	pq_sendint16(out, nliveatts);
@@ -800,6 +817,16 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
 			continue;
 		}
 
+		/*
+		 * Do not send attribute data if it is not a part of column filters,
+		 * except if it is a part of REPLICA IDENTITY or REPLICA IDENTITY is
+		 * full, send the data.
+		 */
+		if (att_map != NULL && !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, att_map)
+			&& !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, idattrs)
+						&& !replidentfull)
+			continue;
+
 		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
 		if (!HeapTupleIsValid(typtup))
 			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
@@ -904,7 +931,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
  * Write relation attribute metadata to the stream.
  */
 static void
-logicalrep_write_attrs(StringInfo out, Relation rel)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map)
 {
 	TupleDesc	desc;
 	int			i;
@@ -914,20 +941,34 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 
 	desc = RelationGetDescr(rel);
 
+	/* fetch bitmap of REPLICATION IDENTITY attributes */
+	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+	if (!replidentfull)
+		idattrs = RelationGetIdentityKeyBitmap(rel);
+
 	/* send number of live attributes */
 	for (i = 0; i < desc->natts; i++)
 	{
-		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
+		Form_pg_attribute att = TupleDescAttr(desc, i);
+
+		if (att->attisdropped || att->attgenerated)
+			continue;
+		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
+		if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						  idattrs))
+		{
+			nliveatts++;
+			continue;
+		}
+		/* Skip sending if not a part of column filter */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						att_map))
 			continue;
 		nliveatts++;
 	}
 	pq_sendint16(out, nliveatts);
 
-	/* fetch bitmap of REPLICATION IDENTITY attributes */
-	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
-	if (!replidentfull)
-		idattrs = RelationGetIdentityKeyBitmap(rel);
-
 	/* send the attributes */
 	for (i = 0; i < desc->natts; i++)
 	{
@@ -937,6 +978,13 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
+		/* Exlude filtered columns, REPLICA IDENTITY COLUMNS CAN'T BE EXCLUDED */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						att_map) && !bms_is_member(att->attnum
+					- FirstLowInvalidHeapAttributeNumber, idattrs)
+						&& !replidentfull)
+			continue;
 		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
 		if (replidentfull ||
 			bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
@@ -944,7 +992,6 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 			flags |= LOGICALREP_IS_REPLICA_IDENTITY;
 
 		pq_sendbyte(out, flags);
-
 		/* attribute name */
 		pq_sendstring(out, NameStr(att->attname));
 
@@ -953,6 +1000,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 
 		/* attribute mode */
 		pq_sendint32(out, att->atttypmod);
+
 	}
 
 	bms_free(idattrs);
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index c37e2a7e29..d7a7b00841 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -354,7 +354,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 
 			attnum = logicalrep_rel_att_by_name(remoterel,
 												NameStr(attr->attname));
-
 			entry->attrmap->attnums[i] = attnum;
 			if (attnum >= 0)
 				missingatts = bms_del_member(missingatts, attnum);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..f336a384a1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -111,6 +111,7 @@
 #include "replication/origin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -695,19 +696,27 @@ fetch_remote_table_info(char *nspname, char *relname,
 						LogicalRepRelation *lrel)
 {
 	WalRcvExecResult *res;
+	WalRcvExecResult *res_pub;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
+	TupleTableSlot *slot_pub;
+	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID};
 	Oid			attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			pubRow[] = {TEXTARRAYOID};
 	bool		isnull;
-	int			natt;
+	int			natt,i;
+	Datum *elems;
+	int nelems;
+	List *pub_columns = NIL;
+	ListCell *lc;
+	bool	am_partition = false;
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
 
 	/* First fetch Oid and replica identity. */
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
+	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition"
 					 "  FROM pg_catalog.pg_class c"
 					 "  INNER JOIN pg_catalog.pg_namespace n"
 					 "        ON (c.relnamespace = n.oid)"
@@ -737,6 +746,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	Assert(!isnull);
 	lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
 	Assert(!isnull);
+	am_partition = DatumGetChar(slot_getattr(slot, 4, &isnull));
 
 	ExecDropSingleTupleTableSlot(slot);
 	walrcv_clear_result(res);
@@ -774,11 +784,78 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 	natt = 0;
 	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+
+	/*
+	 * Now, fetch the values of publications' column filters
+	 * For a partition, use pg_inherit to find the parent,
+	 * as the pg_publication_rel contains only the topmost parent
+	 * table entry in case of table partitioning.
+	 *
+	 * XXX Modify the join query to be able to fetch topmost parent,
+	 * Currently it fetches immediate parent of the partition.
+	 */
+	resetStringInfo(&cmd);
+	if (!am_partition)
+		appendStringInfo(&cmd, "SELECT prattrs from pg_publication_rel"
+					" WHERE prrelid = %u", lrel->remoteid);
+	else
+		appendStringInfo(&cmd, "SELECT prattrs from pg_publication_rel pb, pg_inherits pinh"
+					" WHERE pb.prrelid = pinh.inhparent AND pinh.inhrelid = %u", lrel->remoteid);
+
+	res_pub = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+					  lengthof(pubRow), pubRow);
+
+	if (res_pub->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not fetch published columns info for table \"%s.%s\" from publisher: %s",
+						nspname, relname, res_pub->err)));
+	slot_pub = MakeSingleTupleTableSlot(res_pub->tupledesc, &TTSOpsMinimalTuple);
+
+	while (tuplestore_gettupleslot(res_pub->tuplestore, true, false, slot_pub))
+	{
+		deconstruct_array(DatumGetArrayTypePCopy(slot_getattr(slot_pub, 1, &isnull)),
+				TEXTOID, -1, false, 'i',
+					&elems, NULL, &nelems);
+		for (i = 0; i < nelems; i++)
+			pub_columns = lappend(pub_columns, TextDatumGetCString(elems[i]));
+		ExecClearTuple(slot_pub);
+	}
+	ExecDropSingleTupleTableSlot(slot_pub);
+	walrcv_clear_result(res_pub);
+
+	/*
+	 * Store the column names only if they are contained in column filter
+	 * LogicalRepRelation will only contain attributes corresponding
+	 * to those specficied in column filters.
+	 */
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
-		lrel->attnames[natt] =
+		char * rel_colname =
 			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		bool found = false;
 		Assert(!isnull);
+		if (pub_columns != NIL)
+		{
+			foreach(lc, pub_columns)
+			{
+				char *pub_colname = lfirst(lc);
+				if(!strcmp(pub_colname, rel_colname))
+				{
+					found = true;
+					lrel->attnames[natt] = rel_colname;
+					break;
+				}
+			}
+		}
+		else
+		{
+			found = true;
+			lrel->attnames[natt] = rel_colname;
+		}
+		if (!found)
+			continue;
+
 		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
 		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
@@ -829,8 +906,17 @@ copy_table(Relation rel)
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
 	if (lrel.relkind == RELKIND_RELATION)
-		appendStringInfo(&cmd, "COPY %s TO STDOUT",
+	{
+		appendStringInfo(&cmd, "COPY %s (",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		for (int i = 0; i < lrel.natts; i++)
+		{
+			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			if (i < lrel.natts - 1)
+				appendStringInfoString(&cmd, ", ");
+		}
+		appendStringInfo(&cmd, ") TO STDOUT");
+	}
 	else
 	{
 		/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 14d737fd93..033f36e00c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,12 +15,14 @@
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel_d.h"
 #include "commands/defrem.h"
 #include "fmgr.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
@@ -81,10 +83,12 @@ static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
-									LogicalDecodingContext *ctx);
+									LogicalDecodingContext *ctx,
+									Bitmapset *att_map);
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
+static Bitmapset* get_table_columnset(Oid relid, List *columns, Bitmapset *att_map);
 
 /*
  * Entry in the map used to remember which relation schemas we sent.
@@ -130,6 +134,7 @@ typedef struct RelationSyncEntry
 	 * having identical TupleDesc.
 	 */
 	TupleConversionMap *map;
+	Bitmapset *att_map;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -570,11 +575,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 		}
 
 		MemoryContextSwitchTo(oldctx);
-		send_relation_and_attrs(ancestor, xid, ctx);
+		send_relation_and_attrs(ancestor, xid, ctx, relentry->att_map);
 		RelationClose(ancestor);
 	}
 
-	send_relation_and_attrs(relation, xid, ctx);
+	send_relation_and_attrs(relation, xid, ctx, relentry->att_map);
 
 	if (in_streaming)
 		set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -587,7 +592,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
  */
 static void
 send_relation_and_attrs(Relation relation, TransactionId xid,
-						LogicalDecodingContext *ctx)
+						LogicalDecodingContext *ctx,
+							Bitmapset *att_map)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	int			i;
@@ -609,14 +615,24 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 
 		if (att->atttypid < FirstGenbkiObjectId)
 			continue;
-
+		/*
+		 * Do not send type information if attribute is
+		 * not present in column filter.
+		 * XXX Allow sending type information for REPLICA
+		 * IDENTITY COLUMNS with user created type.
+		 * even when they are not mentioned in column filters.
+		 */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						att_map))
+			continue;
 		OutputPluginPrepareWrite(ctx, false);
 		logicalrep_write_typ(ctx->out, xid, att->atttypid);
 		OutputPluginWrite(ctx, false);
 	}
 
 	OutputPluginPrepareWrite(ctx, false);
-	logicalrep_write_rel(ctx->out, xid, relation);
+	logicalrep_write_rel(ctx->out, xid, relation, att_map);
 	OutputPluginWrite(ctx, false);
 }
 
@@ -690,10 +706,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					if (relentry->map)
 						tuple = execute_attr_map_tuple(tuple, relentry->map);
 				}
-
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_insert(ctx->out, xid, relation, tuple,
-										data->binary);
+										data->binary, relentry->att_map);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -719,10 +734,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 														  relentry->map);
 					}
 				}
-
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_update(ctx->out, xid, relation, oldtuple,
-										newtuple, data->binary);
+										newtuple, data->binary, relentry->att_map);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -1119,6 +1133,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	bool		am_partition = get_rel_relispartition(relid);
 	char		relkind = get_rel_relkind(relid);
 	bool		found;
+	Oid		ancestor_id;
 	MemoryContext oldctx;
 
 	Assert(RelationSyncCache != NULL);
@@ -1139,8 +1154,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->publish_as_relid = InvalidOid;
-		entry->map = NULL;		/* will be set by maybe_send_schema() if
-								 * needed */
+		entry->att_map = NULL;
+		entry->map = NULL;	/* will be set by maybe_send_schema() if needed */
 	}
 
 	/* Validate the entry */
@@ -1171,6 +1186,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		{
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
+			bool		ancestor_published = false;
 
 			if (pub->alltables)
 			{
@@ -1181,7 +1197,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 
 			if (!publish)
 			{
-				bool		ancestor_published = false;
 
 				/*
 				 * For a partition, check if any of the ancestors are
@@ -1206,6 +1221,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 											pub->oid))
 						{
 							ancestor_published = true;
+							ancestor_id = ancestor;
 							if (pub->pubviaroot)
 								publish_as_relid = ancestor;
 						}
@@ -1224,15 +1240,41 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 			if (publish &&
 				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
 			{
+				int                             nelems, i;
+				bool isnull;
+				Datum *elems;
+				HeapTuple pub_rel_tuple;
+				Datum pub_rel_cols;
+				List *columns = NIL;
+
+				if (ancestor_published)
+					pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(ancestor_id),
+								ObjectIdGetDatum(pub->oid));
+				else
+					pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid),
+								ObjectIdGetDatum(pub->oid));
+				if (HeapTupleIsValid(pub_rel_tuple))
+				{
+					pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP, pub_rel_tuple, Anum_pg_publication_rel_prattrs, &isnull);
+					if (!isnull)
+					{
+						oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+						deconstruct_array(DatumGetArrayTypePCopy(pub_rel_cols),
+									TEXTOID, -1, false, 'i',
+								&elems, NULL, &nelems);
+						for (i = 0; i < nelems; i++)
+							columns = lappend(columns, TextDatumGetCString(elems[i]));
+						entry->att_map = get_table_columnset(publish_as_relid, columns, entry->att_map);
+						MemoryContextSwitchTo(oldctx);
+					}
+					ReleaseSysCache(pub_rel_tuple);
+				}
 				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
 
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
 		}
 
 		list_free(pubids);
@@ -1244,6 +1286,25 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	return entry;
 }
 
+/*
+ * Return a bitmapset of attributes given the list of column names
+ */
+static Bitmapset*
+get_table_columnset(Oid relid, List *columns, Bitmapset *att_map)
+{
+	ListCell *cell;
+	foreach(cell, columns)
+	{
+		const char *attname = lfirst(cell);
+		int attnum = get_attnum(relid, attname);
+
+		if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, att_map))
+			att_map = bms_add_member(att_map,
+					attnum - FirstLowInvalidHeapAttributeNumber);
+	}
+	return att_map;
+}
+
 /*
  * Cleanup list of streamed transactions and update the schema_sent flag.
  *
@@ -1328,6 +1389,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		entry->schema_sent = false;
 		list_free(entry->streamed_txns);
 		entry->streamed_txns = NIL;
+		bms_free(entry->att_map);
+		entry->att_map = NULL;
 		if (entry->map)
 		{
 			/*
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index f332bad4d4..7bdc9bb9b8 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -83,6 +83,13 @@ typedef struct Publication
 	PublicationActions pubactions;
 } Publication;
 
+typedef struct PublicationRelationInfo
+{
+	Oid			relid;
+	Relation	relation;
+	List	*columns;
+} PublicationRelationInfo;
+
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
 extern List *GetRelationPublications(Oid relid);
@@ -108,7 +115,7 @@ extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(bool pubviaroot);
 
 extern bool is_publishable_relation(Relation rel);
-extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
+extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationInfo *targetrel,
 											  bool if_not_exists);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..d1d4eec2c0 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
 	Oid			prrelid BKI_LOOKUP(pg_class);	/* Oid of the relation */
+#ifdef CATALOG_VARLEN
+	text			prattrs[1]; /* Variable length field starts here */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
@@ -40,6 +43,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
  */
 typedef FormData_pg_publication_rel *Form_pg_publication_rel;
 
+DECLARE_TOAST(pg_publication_rel, 8895, 8896);
 DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, PublicationRelObjectIndexId, on pg_publication_rel using btree(oid oid_ops));
 DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, PublicationRelPrrelidPrpubidIndexId, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops));
 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 6a4d82f0a8..56d13ff022 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -490,6 +490,7 @@ typedef enum NodeTag
 	T_PartitionRangeDatum,
 	T_PartitionCmd,
 	T_VacuumRelation,
+	T_PublicationTable,
 
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index e28248af32..bbdfaa2f45 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3624,6 +3624,12 @@ typedef struct AlterTSConfigurationStmt
 	bool		missing_ok;		/* for DROP - skip error if missing? */
 } AlterTSConfigurationStmt;
 
+typedef struct PublicationTable
+{
+	NodeTag		type;
+	RangeVar	*relation;   /* relation to be published */
+	List	*columns;	/* List of columns in a publication table */
+} PublicationTable;
 
 typedef struct CreatePublicationStmt
 {
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 2e29513151..cb47341b6c 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple newtuple,
-									bool binary);
+									bool binary, Bitmapset *att_map);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple oldtuple,
-									HeapTuple newtuple, bool binary);
+									HeapTuple newtuple, bool binary, Bitmapset *att_map);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
@@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in,
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
-								 Relation rel);
+								 Relation rel, Bitmapset *att_map);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/test/subscription/t/021_column_filter.pl b/src/test/subscription/t/021_column_filter.pl
new file mode 100644
index 0000000000..f78fdbf52f
--- /dev/null
+++ b/src/test/subscription/t/021_column_filter.pl
@@ -0,0 +1,116 @@
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test TRUNCATE
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+
+# setup
+
+my $node_publisher = PostgresNode->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgresNode->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_logical_replication_workers = 6));
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int)");
+# Test with weird column names
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, B varchar, \"c'\" int)");
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, \"c'\" int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)");
+
+#Test create publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub1 FOR TABLE tab1(a, \"B\"), tab3(\"a'\",\"c'\"), test_part(b)");
+
+my $result = $node_publisher->safe_psql('postgres',
+	"select relname, prattrs from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid;");
+is($result, qq(tab1|{a,B}
+tab3|{a',c'}
+test_part|{b}), 'publication relation updated');
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+#Initial sync
+$node_publisher->wait_for_catchup('sub1');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab1 VALUES (1,2,3)");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab3 VALUES (1,2,3)");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (1,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (2,'bcd', '2021-07-03 11:12:13')");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'insert on column c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab3");
+is($result, qq(1|3), 'insert on column b is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_part");
+is($result, qq(1|abc\n2|bcd), 'insert on all columns is replicated');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab1 SET c = 5 where a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'update on column c is not replicated');
+
+#Test alter publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"
+);
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab2 VALUES (1,'abc',3)");
+#sleep(5);
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab2");
+is($result, qq(1|abc), 'insert on column c is not replicated');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab2 SET c = 5 where a = 1");
+is($result, qq(1|abc), 'update on column c is not replicated');
-- 
2.17.2 (Apple Git-113)

