From 5c7710edbc55b8590ac977ca96cea41465a88e13 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 7 Jul 2023 12:20:25 +0900
Subject: [PATCH v2 2/2] PoC: initial table schema synchronization in logical
 replication.

---
 src/backend/catalog/pg_publication.c        |  20 +-
 src/backend/catalog/system_views.sql        |   2 +-
 src/backend/commands/subscriptioncmds.c     | 299 +++++++++++++++++---
 src/backend/replication/logical/tablesync.c |   4 +-
 src/include/catalog/pg_proc.dat             |   8 +-
 src/include/catalog/pg_publication.h        |   2 +-
 src/test/regress/expected/rules.out         |   2 +-
 7 files changed, 285 insertions(+), 52 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index c488b6370b..b392458642 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -802,7 +802,7 @@ GetAllTablesPublications(void)
  * root partitioned tables.
  */
 List *
-GetAllTablesPublicationRelations(bool pubviaroot)
+GetAllTablesPublicationRelations(bool pubviaroot, bool include_all_partitions)
 {
 	Relation	classRel;
 	ScanKeyData key[1];
@@ -825,13 +825,13 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 		Oid			relid = relForm->oid;
 
 		if (is_publishable_class(relid, relForm) &&
-			!(relForm->relispartition && pubviaroot))
+			(include_all_partitions || !(relForm->relispartition && pubviaroot)))
 			result = lappend_oid(result, relid);
 	}
 
 	table_endscan(scan);
 
-	if (pubviaroot)
+	if (pubviaroot || include_all_partitions)
 	{
 		ScanKeyInit(&key[0],
 					Anum_pg_class_relkind,
@@ -846,7 +846,7 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 			Oid			relid = relForm->oid;
 
 			if (is_publishable_class(relid, relForm) &&
-				!relForm->relispartition)
+				(include_all_partitions || !relForm->relispartition))
 				result = lappend_oid(result, relid);
 		}
 
@@ -1057,6 +1057,7 @@ Datum
 pg_get_publication_tables(PG_FUNCTION_ARGS)
 {
 #define NUM_PUBLICATION_TABLES_ELEM	4
+	bool		include_all_partitions = PG_GETARG_BOOL(0);
 	FuncCallContext *funcctx;
 	List	   *table_infos = NIL;
 
@@ -1081,7 +1082,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		 * Deconstruct the parameter into elements where each element is a
 		 * publication name.
 		 */
-		arr = PG_GETARG_ARRAYTYPE_P(0);
+		arr = PG_GETARG_ARRAYTYPE_P(1);
 		deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
 						  &elems, NULL, &nelems);
 
@@ -1101,17 +1102,22 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 			 * those. Otherwise, get the partitioned table itself.
 			 */
 			if (pub_elem->alltables)
-				pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
+				pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot,
+																   include_all_partitions);
 			else
 			{
 				List	   *relids,
 						   *schemarelids;
 
 				relids = GetPublicationRelations(pub_elem->oid,
+												 include_all_partitions ?
+												 PUBLICATION_PART_ALL :
 												 pub_elem->pubviaroot ?
 												 PUBLICATION_PART_ROOT :
 												 PUBLICATION_PART_LEAF);
 				schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
+																include_all_partitions ?
+																PUBLICATION_PART_ALL :
 																pub_elem->pubviaroot ?
 																PUBLICATION_PART_ROOT :
 																PUBLICATION_PART_LEAF);
@@ -1148,7 +1154,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		 * data of the child table to be double-published on the subscriber
 		 * side.
 		 */
-		if (viaroot)
+		if (viaroot && !include_all_partitions)
 			filter_partitions(table_infos);
 
 		/* Construct a tuple descriptor for the result rows. */
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c18fea8362..1698dd5374 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -376,7 +376,7 @@ CREATE VIEW pg_publication_tables AS
         ) AS attnames,
         pg_get_expr(GPT.qual, GPT.relid) AS rowfilter
     FROM pg_publication P,
-         LATERAL pg_get_publication_tables(P.pubname) GPT,
+         LATERAL pg_get_publication_tables(false, P.pubname) GPT,
          pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
     WHERE C.oid = GPT.relid;
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d4e798baeb..bdcfb8829e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -33,6 +33,7 @@
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -71,6 +72,7 @@
 #define SUBOPT_RUN_AS_OWNER			0x00001000
 #define SUBOPT_LSN					0x00002000
 #define SUBOPT_ORIGIN				0x00004000
+#define SUBOPT_COPY_SCHEMA			0x00008000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -87,6 +89,7 @@ typedef struct SubOpts
 	bool		connect;
 	bool		enabled;
 	bool		create_slot;
+	bool		copy_schema;
 	bool		copy_data;
 	bool		refresh;
 	bool		binary;
@@ -99,13 +102,16 @@ typedef struct SubOpts
 	XLogRecPtr	lsn;
 } SubOpts;
 
-static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_table_list(WalReceiverConn *wrconn, List *publications,
+							  bool all_partitions);
 static void check_publications_origin(WalReceiverConn *wrconn,
 									  List *publications, bool copydata,
 									  char *origin, Oid *subrel_local_oids,
 									  int subrel_count, char *subname);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
+static void check_pg_dump_available(void);
+static void synchronize_table_schema(char *conninfo, List *tables, char *snapshot_name);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 
 
@@ -139,6 +145,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->enabled = true;
 	if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
 		opts->create_slot = true;
+	if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA))
+		opts->copy_schema = false;
 	if (IsSet(supported_opts, SUBOPT_COPY_DATA))
 		opts->copy_data = true;
 	if (IsSet(supported_opts, SUBOPT_REFRESH))
@@ -205,6 +213,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			else
 				ReplicationSlotValidateName(opts->slot_name, ERROR);
 		}
+		else if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA) &&
+				 strcmp(defel->defname, "copy_schema") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_COPY_SCHEMA;
+			opts->copy_schema = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
 				 strcmp(defel->defname, "copy_data") == 0)
 		{
@@ -388,12 +405,30 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "copy_data = true")));
 
+		if (opts->copy_schema &&
+			IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA))
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("%s and %s are mutually exclusive options",
+							"connect = false", "copy_schema = true")));
+
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
+		opts->copy_schema = false;
 		opts->copy_data = false;
 	}
 
+	/*
+	 * The initial schema sync needs a snapshot that is created and exported
+	 * while creating a replication slot.
+	 */
+	if (!opts->create_slot && IsSet(supported_opts, SUBOPT_CONNECT) &&
+		opts->copy_schema)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("subscription with %s must also set %s",
+						"copy_schema = on", "create_slot = true")));
 	/*
 	 * Do additional checking for disallowed combination when slot_name = NONE
 	 * was used.
@@ -591,7 +626,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | SUBOPT_COPY_SCHEMA);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -636,6 +671,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				 errmsg("password_required=false is superuser-only"),
 				 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
 
+	if (opts.copy_schema)
+		check_pg_dump_available();
+
 	/*
 	 * If built with appropriate switch, whine when regression-testing
 	 * conventions for subscription names are violated.
@@ -750,31 +788,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			check_publications_origin(wrconn, publications, opts.copy_data,
 									  opts.origin, NULL, 0, stmt->subname);
 
-			/*
-			 * Set sync state based on if we were asked to do data copy or
-			 * not.
-			 */
-			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
-
 			/*
 			 * Get the table list from publisher and build local table status
 			 * info.
 			 */
-			tables = fetch_table_list(wrconn, publications);
-			foreach(lc, tables)
-			{
-				RangeVar   *rv = (RangeVar *) lfirst(lc);
-				Oid			relid;
-
-				relid = RangeVarGetRelid(rv, AccessShareLock, false);
-
-				/* Check for supported relkind. */
-				CheckSubscriptionRelkind(get_rel_relkind(relid),
-										 rv->schemaname, rv->relname);
-
-				AddSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr);
-			}
+			tables = fetch_table_list(wrconn, publications, false);
 
 			/*
 			 * If requested, create permanent slot for the subscription. We
@@ -783,10 +801,22 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			 */
 			if (opts.create_slot)
 			{
+				List		*tables_schema_sync;
 				bool		twophase_enabled = false;
+				char		*snapshot_name = NULL;
 
 				Assert(opts.slot_name);
 
+				/*
+				 * Get the list of tables in publications including both partitioned
+				 * tables and their partitions. We have to fetch the list from the
+				 * publisher before creating the replication slot below. Otherwise,
+				 * the exported snapshot will be invalidated when fetching the table
+				 * list.
+				 */
+				if (opts.copy_schema)
+					tables_schema_sync = fetch_table_list(wrconn, publications, true);
+
 				/*
 				 * Even if two_phase is set, don't create the slot with
 				 * two-phase enabled. Will enable it once all the tables are
@@ -806,8 +836,16 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				if (opts.twophase && !opts.copy_data && tables != NIL)
 					twophase_enabled = true;
 
-				walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
-								   CRS_NOEXPORT_SNAPSHOT, NULL);
+				snapshot_name = walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
+												   opts.copy_schema ? CRS_EXPORT_SNAPSHOT:
+												   CRS_NOEXPORT_SNAPSHOT, NULL);
+
+				/* Synchronize schemas of tables that will be subscribed */
+				if (opts.copy_schema)
+				{
+					Assert(snapshot_name != NULL);
+					synchronize_table_schema(conninfo, tables_schema_sync, snapshot_name);
+				}
 
 				if (twophase_enabled)
 					UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
@@ -816,6 +854,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 						(errmsg("created replication slot \"%s\" on publisher",
 								opts.slot_name)));
 			}
+
+			/*
+			 * Set sync state based on if we were asked to do data copy or
+			 * not.
+			 */
+			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+			foreach(lc, tables)
+			{
+				RangeVar   *rv = (RangeVar *) lfirst(lc);
+				Oid			relid;
+
+				relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+				/* Check for supported relkind. */
+				CheckSubscriptionRelkind(get_rel_relkind(relid),
+										 rv->schemaname, rv->relname);
+
+				AddSubscriptionRelState(subid, relid, table_state,
+										InvalidXLogRecPtr);
+			}
+
 		}
 		PG_FINALLY();
 		{
@@ -843,7 +902,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data,
+AlterSubscription_refresh(Subscription *sub, bool copy_data, bool copy_schema,
 						  List *validate_publications)
 {
 	char	   *err;
@@ -868,6 +927,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
+	if (copy_schema)
+		check_pg_dump_available();
+
 	/* Try to connect to the publisher. */
 	must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
 	wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
@@ -883,7 +945,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			check_publications(wrconn, validate_publications);
 
 		/* Get the table list from publisher. */
-		pubrel_names = fetch_table_list(wrconn, sub->publications);
+		pubrel_names = fetch_table_list(wrconn, sub->publications, false);
 
 		/* Get local table list. */
 		subrel_states = GetSubscriptionRelations(sub->oid, false);
@@ -909,6 +971,35 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 								  sub->origin, subrel_local_oids,
 								  subrel_count, sub->name);
 
+		if (copy_schema)
+		{
+			List	   *rels_schema_sync = NIL;
+
+			foreach(lc, pubrel_names)
+			{
+				RangeVar   *rv = (RangeVar *) lfirst(lc);
+				Oid			relid;
+
+				relid = RangeVarGetRelid(rv, AccessShareLock, true);
+
+				if (!bsearch(&relid, subrel_local_oids,
+							 subrel_count, sizeof(Oid), oid_cmp))
+					rels_schema_sync = lappend(rels_schema_sync, rv);
+			}
+
+			/*
+			 * Synchronize table schemas for tables that are not present
+			 * on the subscriber node.
+			 *
+			 * XXX: There is a window between creating the table and the
+			 * tablesync worker starts processing it. If there is a DDL
+			 * for the table, the data sync could fail.
+			 */
+			if (list_length(rels_schema_sync) > 0)
+				synchronize_table_schema(sub->conninfo, rels_schema_sync,
+										 NULL);
+		}
+
 		/*
 		 * Rels that we want to remove from subscription and drop any slots
 		 * and origins corresponding to them.
@@ -1252,7 +1343,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 		case ALTER_SUBSCRIPTION_SET_PUBLICATION:
 			{
-				supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
+				supported_opts = SUBOPT_COPY_DATA | SUBOPT_COPY_SCHEMA | SUBOPT_REFRESH;
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
 
@@ -1286,7 +1377,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, opts.copy_data,
+					AlterSubscription_refresh(sub, opts.copy_data, opts.copy_schema,
 											  stmt->publication);
 				}
 
@@ -1299,7 +1390,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				List	   *publist;
 				bool		isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
 
-				supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
+				supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA | SUBOPT_COPY_SCHEMA;
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
 
@@ -1345,7 +1436,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					/* Refresh the new list of publications. */
 					sub->publications = publist;
 
-					AlterSubscription_refresh(sub, opts.copy_data,
+					AlterSubscription_refresh(sub, opts.copy_data, opts.copy_schema,
 											  validate_publications);
 				}
 
@@ -1360,7 +1451,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
 				parse_subscription_options(pstate, stmt->options,
-										   SUBOPT_COPY_DATA, &opts);
+										   SUBOPT_COPY_DATA | SUBOPT_COPY_SCHEMA, &opts);
 
 				/*
 				 * The subscription option "two_phase" requires that
@@ -1387,7 +1478,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-				AlterSubscription_refresh(sub, opts.copy_data, NULL);
+				AlterSubscription_refresh(sub, opts.copy_data, opts.copy_schema, NULL);
 
 				break;
 			}
@@ -1957,7 +2048,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
 	appendStringInfoString(&cmd,
 						   "SELECT DISTINCT P.pubname AS pubname\n"
 						   "FROM pg_publication P,\n"
-						   "     LATERAL pg_get_publication_tables(P.pubname) GPT\n"
+						   "     LATERAL pg_get_publication_tables(false, P.pubname) GPT\n"
 						   "     JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
 						   "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
 						   "WHERE C.oid = GPT.relid AND P.pubname IN (");
@@ -2044,7 +2135,8 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
  * list and row filter are specified for different publications.
  */
 static List *
-fetch_table_list(WalReceiverConn *wrconn, List *publications)
+fetch_table_list(WalReceiverConn *wrconn, List *publications,
+				 bool all_partitions)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
@@ -2081,10 +2173,11 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 		appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
 						 "       FROM pg_class c\n"
 						 "         JOIN pg_namespace n ON n.oid = c.relnamespace\n"
-						 "         JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
+						 "         JOIN ( SELECT (pg_get_publication_tables(%s, VARIADIC array_agg(pubname::text))).*\n"
 						 "                FROM pg_publication\n"
 						 "                WHERE pubname IN ( %s )) AS gpt\n"
 						 "             ON gpt.relid = c.oid\n",
+						 all_partitions ? "true" : "false",
 						 pub_names.data);
 
 		pfree(pub_names.data);
@@ -2290,6 +2383,140 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *
 	return oldpublist;
 }
 
+/*
+ * Check and raise an ERROR if table schema copy is requested but the pg_dump
+ * command is not found or not executable.
+ */
+static void
+check_pg_dump_available(void)
+{
+	char path[MAXPGPATH];
+
+	if (find_my_exec("pg_dump", path) < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not find \"%s\" in executable path", "pg_dump")));
+}
+
+/*
+ * Fetch the table schema of the given table and restore into the subscriber.
+ *
+ * XXX currently it doesn't create the schema (IOW namespace) so the schema has to already
+ * be present on the subscriber, is that okay? or do we want to create it? But
+ * if we want to do that, we need to consider the case where the schema has
+ * non-default options.
+ */
+static void
+synchronize_table_schema(char *conninfo, List *tables, char *snapshot_name)
+{
+	StringInfoData command;
+	FILE *handle;
+	char full_path[MAXPGPATH];
+	ListCell *lc;
+	int ret;
+
+	/*
+	 * We've checked the availability of pg_dump at a CREATE SUBSCRIPTION or an
+	 * ALTER SUBSCRIPTION ... REFRESH PUBLICATION time, but check it again in
+	 * case the pg_dump command becomes unavailable.
+	 */
+	if (find_my_exec("pg_dump", full_path) < 0)
+		ereport(ERROR,
+				(errmsg("could not find \"%s\" in executable path", "pg_dump")));
+
+	/*
+	 * Construct pg_dump command. We dump only the table definition without
+	 * any table-dependent objects such as indexes and triggers. Also, we specify
+	 * the snapshot that has been exported while creating the replication slot for
+	 * tablesync. The table name in --table option must be quoted to prevent the
+	 * table name from being interpreted as a regular expression.
+	 *
+	 * Since the publisher could be a different major version PostgreSQL, we
+	 * use --quote-all-identifiers option.
+	 *
+	 * The outputs are redirected to this backend's input and executed via SPI.
+	 *
+	 * XXX: who should be the owner of the new table?
+	 */
+	initStringInfo(&command);
+	appendStringInfo(&command,
+					 "%s --format=p --schema-only --username %s --dbname \"%s\" --no-table-dependents --quote-all-identifiers --file -",
+					 full_path, GetUserNameFromId(GetUserId(), true),
+					 conninfo);
+
+	if (snapshot_name)
+		appendStringInfo(&command, " --snapshot=%s", snapshot_name);
+
+	foreach(lc, tables)
+	{
+		RangeVar *rv = (RangeVar *) lfirst(lc);
+
+		/*
+		 * Error if the table is already present on the subscriber. Please note
+		 * that concurrent DDLs can create the table as we don't acquire any lock
+		 * on the table.
+		 *
+		 * XXX: do we want to overwrite it (or optionally)?
+		 */
+		if (OidIsValid(RangeVarGetRelid(rv, AccessShareLock, true)))
+			ereport(ERROR,
+					(errmsg("existing table %s cannot synchronize table schema",
+							rv->relname)));
+
+		appendStringInfo(&command, " --table '%s'",
+						 quote_qualified_identifier(rv->schemaname, rv->relname));
+	}
+
+	/* Open SPI context. */
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	PG_TRY();
+	{
+		char	buf[1024];
+		StringInfoData querybuf;
+
+		elog(DEBUG3, "executing pg_dump command \"%s\"", command.data);
+
+		handle = OpenPipeStream(command.data, "r");
+		if (handle == NULL)
+			elog(ERROR, "could not execute command \"%s\": %m", command.data);
+
+		initStringInfo(&querybuf);
+
+		/*
+		 * Gathering all commands into one string. Since we dump only schema of the
+		 * particular table, the command would not be long.
+		 */
+		while (fgets(buf, sizeof(buf), handle))
+			appendStringInfoString(&querybuf, buf);
+
+		/*
+		 * If the pg_dump command failed, there is no output in the result handle
+		 * and the pg_dump's error messages are written into the server log.
+		 */
+		if (querybuf.len == 0)
+			elog(ERROR, "failed to execute command \"%s\"", command.data);
+
+		elog(DEBUG5, "executing dumped DDLs %s", querybuf.data);
+		ret = SPI_exec(querybuf.data, 0);
+		if (ret != SPI_OK_UTILITY && ret != SPI_OK_SELECT)
+			elog(ERROR, "SPI_exec returned %d: %s", ret, querybuf.data);
+	}
+	PG_FINALLY();
+	{
+		ClosePipeStream(handle);
+	}
+	PG_END_TRY();
+
+	/* Close SPI context */
+	if (SPI_finish() != SPI_OK_FINISH)
+		elog(ERROR, "SPI_finish failed");
+
+	/* make the newly created table visible to us */
+	CommandCounterIncrement();
+}
+
 /*
  * Extract the streaming mode value from a DefElem.  This is like
  * defGetBoolean() but also accepts the special value of "parallel".
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d461654ab..d870a9f69c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -846,7 +846,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 						 "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
 						 "   THEN NULL ELSE gpt.attrs END)"
 						 "  FROM pg_publication p,"
-						 "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
+						 "  LATERAL pg_get_publication_tables(false, p.pubname) gpt,"
 						 "  pg_class c"
 						 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
 						 "   AND p.pubname IN ( %s )",
@@ -1028,7 +1028,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 		appendStringInfo(&cmd,
 						 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
 						 "  FROM pg_publication p,"
-						 "  LATERAL pg_get_publication_tables(p.pubname) gpt"
+						 "  LATERAL pg_get_publication_tables(false, p.pubname) gpt"
 						 " WHERE gpt.relid = %u"
 						 "   AND p.pubname IN ( %s )",
 						 lrel->remoteid,
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6996073989..ec55a22fe1 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11823,10 +11823,10 @@
   descr => 'get information of the tables that are part of the specified publications',
   proname => 'pg_get_publication_tables', prorows => '1000',
   provariadic => 'text', proretset => 't', provolatile => 's',
-  prorettype => 'record', proargtypes => '_text',
-  proallargtypes => '{_text,oid,oid,int2vector,pg_node_tree}',
-  proargmodes => '{v,o,o,o,o}',
-  proargnames => '{pubname,pubid,relid,attrs,qual}',
+  prorettype => 'record', proargtypes => 'bool _text',
+  proallargtypes => '{bool,_text,oid,oid,int2vector,pg_node_tree}',
+  proargmodes => '{i,v,o,o,o,o}',
+  proargnames => '{include_all_partitions,pubname,pubid,relid,attrs,qual}',
   prosrc => 'pg_get_publication_tables' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 6ecaa2a01e..dd3c27d319 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -132,7 +132,7 @@ typedef enum PublicationPartOpt
 
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
 extern List *GetAllTablesPublications(void);
-extern List *GetAllTablesPublicationRelations(bool pubviaroot);
+extern List *GetAllTablesPublicationRelations(bool pubviaroot, bool include_all_partitions);
 extern List *GetPublicationSchemas(Oid pubid);
 extern List *GetSchemaPublications(Oid schemaid);
 extern List *GetSchemaPublicationRelations(Oid schemaid,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 7fd81e6a7d..fd805595bd 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1449,7 +1449,7 @@ pg_publication_tables| SELECT p.pubname,
           WHERE ((a.attrelid = gpt.relid) AND (a.attnum = ANY ((gpt.attrs)::smallint[])))) AS attnames,
     pg_get_expr(gpt.qual, gpt.relid) AS rowfilter
    FROM pg_publication p,
-    LATERAL pg_get_publication_tables(VARIADIC ARRAY[(p.pubname)::text]) gpt(pubid, relid, attrs, qual),
+    LATERAL pg_get_publication_tables(false, VARIADIC ARRAY[(p.pubname)::text]) gpt(pubid, relid, attrs, qual),
     (pg_class c
      JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
   WHERE (c.oid = gpt.relid);
-- 
2.31.1

