From 6272466ff10069a2079e0d700e6a6e12b6bfb7d5 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Mon, 17 Mar 2025 09:07:37 +0530
Subject: [PATCH v18] Support for dropping all publications in
 'pg_createsubscriber'

This patch introduces a new '--remove' option in the 'pg_createsubscriber'
utility to specify the object types to be removed from the subscriber.
This patch adds support for specifying 'publications' as an object type.

This feature ensures a clean and streamlined setup of logical replication by
removing publications on the subscriber that were originally replicated from
the primary server during physical replication.
These publications become redundant once the setup transitions to logical
replication and serve no further purpose.

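When '--remove publications' is given, this cleanup removes all publications
from the specified databases on the subscriber, regardless of their origin.
Users should back up any manually created publications before running this
command. Without the option, pre-existing publications are preserved to avoid
unintended data loss; only the publication created by pg_createsubscriber
itself is dropped.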
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  19 +++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 110 +++++++++++++++---
 .../t/040_pg_createsubscriber.pl              |  25 +++-
 3 files changed, 135 insertions(+), 19 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index b4b996236e4..9d7426fede0 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -146,6 +146,25 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>-r</option></term>
+     <term><option>--remove</option></term>
+     <listitem>
+      <para>
+       Remove all objects of the specified type from specified databases on the
+       target server. Multiple object types can be specified by writing multiple
+       <option>--remove</option> switches. As of now, the only supported object
+       type is <literal>publications</literal>. This option is useful when
+       transitioning from physical replication to logical replication as
+       existing objects may no longer be needed. Before using this option,
+       back up any manually created publications using
+       <application>pg_dump</application> to avoid data loss.
+       If multiple publications exist, they will be logged individually for
+       better traceability.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-s <replaceable class="parameter">dir</replaceable></option></term>
      <term><option>--socketdir=<replaceable class="parameter">dir</replaceable></option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 6baf92e8024..48111dbd0d9 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -29,6 +29,7 @@
 #include "getopt_long.h"
 
 #define	DEFAULT_SUB_PORT	"50432"
+#define	OBJECT_PUBLICATIONS  0x1
 
 /* Command-line options */
 struct CreateSubscriberOptions
@@ -44,6 +45,7 @@ struct CreateSubscriberOptions
 	SimpleStringList sub_names; /* list of subscription names */
 	SimpleStringList replslot_names;	/* list of replication slot names */
 	int			recovery_timeout;	/* stop recovery after this time */
+	SimpleStringList remove_objects;	/* list of objects to remove */
 };
 
 /* per-database publication/subscription info */
@@ -68,6 +70,7 @@ struct LogicalRepInfos
 {
 	struct LogicalRepInfo *dbinfo;
 	bool		two_phase;		/* enable-two-phase option */
+	bits32		remove_objects; /* flag to remove objects on subscriber */
 };
 
 static void cleanup_objects_atexit(void);
@@ -109,7 +112,9 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
-static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo,
+							 const char *pubname);
+static void drop_all_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
@@ -194,7 +199,7 @@ cleanup_objects_atexit(void)
 			if (conn != NULL)
 			{
 				if (dbinfo->made_publication)
-					drop_publication(conn, dbinfo);
+					drop_publication(conn, dbinfo, dbinfo->pubname);
 				if (dbinfo->made_replslot)
 					drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
 				disconnect_database(conn, false);
@@ -241,6 +246,8 @@ usage(void)
 	printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
 	printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
 	printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
+	printf(_("  -r, --remove=OBJECT             all objects to be removed from specified databases\n"
+			 "                                  on the subscriber\n"));
 	printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
 	printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
 	printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
@@ -1183,7 +1190,7 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 		PGconn	   *conn;
 
 		/* Connect to subscriber. */
-		conn = connect_database(dbinfo[i].subconninfo, true);
+		conn = connect_database(dbinfos.dbinfo[i].subconninfo, true);
 
 		/*
 		 * We don't need the pre-existing subscriptions on the newly formed
@@ -1191,22 +1198,27 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 		 * get some unwarranted data or can lead to ERRORs in connecting to
 		 * such nodes.
 		 */
-		check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
+		check_and_drop_existing_subscriptions(conn, &dbinfos.dbinfo[i]);
 
 		/*
-		 * Since the publication was created before the consistent LSN, it is
-		 * available on the subscriber when the physical replica is promoted.
-		 * Remove publications from the subscriber because it has no use.
+		 * Since the publications were created before the consistent LSN, they
+		 * remain on the subscriber even after the physical replica is
+		 * promoted. Remove these publications from the subscriber because
+		 * they have no use. Additionally, if requested, drop all pre-existing
+		 * publications.
 		 */
-		drop_publication(conn, &dbinfo[i]);
+		if (dbinfos.remove_objects & OBJECT_PUBLICATIONS)
+			drop_all_publications(conn, &dbinfos.dbinfo[i]);
+		else
+			drop_publication(conn, &dbinfos.dbinfo[i], dbinfos.dbinfo[i].pubname);
 
-		create_subscription(conn, &dbinfo[i]);
+		create_subscription(conn, &dbinfos.dbinfo[i]);
 
 		/* Set the replication progress to the correct LSN */
-		set_replication_progress(conn, &dbinfo[i], consistent_lsn);
+		set_replication_progress(conn, &dbinfos.dbinfo[i], consistent_lsn);
 
 		/* Enable subscription */
-		enable_subscription(conn, &dbinfo[i]);
+		enable_subscription(conn, &dbinfos.dbinfo[i]);
 
 		disconnect_database(conn, false);
 	}
@@ -1663,10 +1675,10 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 }
 
 /*
- * Remove publication if it couldn't finish all steps.
+ * Drop the specified publication of the given database.
  */
 static void
-drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *pubname)
 {
 	PQExpBuffer str = createPQExpBuffer();
 	PGresult   *res;
@@ -1674,10 +1686,10 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 
 	Assert(conn != NULL);
 
-	pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
+	pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
 
 	pg_log_info("dropping publication \"%s\" in database \"%s\"",
-				dbinfo->pubname, dbinfo->dbname);
+				pubname, dbinfo->dbname);
 
 	appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
 
@@ -1691,7 +1703,7 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
 			pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
-						 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
+						 pubname, dbinfo->dbname, PQresultErrorMessage(res));
 			dbinfo->made_publication = false;	/* don't try again. */
 
 			/*
@@ -1708,6 +1720,38 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	destroyPQExpBuffer(str);
 }
 
+/*
+ * Drop all publications in the database; exit if they cannot be listed,
+ * since the caller keeps using conn after this function returns.
+ */
+static void
+drop_all_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("dropping all existing publications in database \"%s\"",
+				dbinfo->dbname);
+
+	/* Fetch all publication names */
+	res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain publication information: %s",
+					 PQresultErrorMessage(res));
+		PQclear(res);
+		disconnect_database(conn, true);	/* exits; conn must not be reused */
+	}
+
+	/* Drop each publication */
+	for (int i = 0; i < PQntuples(res); i++)
+		drop_publication(conn, dbinfo, PQgetvalue(res, i, 0));
+
+	PQclear(res);
+	pg_log_info("dropped all publications in database \"%s\"", dbinfo->dbname);
+}
+
 /*
  * Create a subscription with some predefined options.
  *
@@ -1914,6 +1958,7 @@ main(int argc, char **argv)
 		{"dry-run", no_argument, NULL, 'n'},
 		{"subscriber-port", required_argument, NULL, 'p'},
 		{"publisher-server", required_argument, NULL, 'P'},
+		{"remove", required_argument, NULL, 'r'},
 		{"socketdir", required_argument, NULL, 's'},
 		{"recovery-timeout", required_argument, NULL, 't'},
 		{"enable-two-phase", no_argument, NULL, 'T'},
@@ -1995,7 +2040,7 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
+	while ((c = getopt_long(argc, argv, "d:D:np:P:r:s:t:TU:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -2025,6 +2070,17 @@ main(int argc, char **argv)
 			case 'P':
 				opt.pub_conninfo_str = pg_strdup(optarg);
 				break;
+			case 'r':
+				if (!simple_string_list_member(&opt.remove_objects, optarg))
+				{
+					simple_string_list_append(&opt.remove_objects, optarg);
+				}
+				else
+				{
+					pg_log_error("object \"%s\" specified more than once", optarg);
+					exit(1);
+				}
+				break;
 			case 's':
 				opt.socket_dir = pg_strdup(optarg);
 				canonicalize_path(opt.socket_dir);
@@ -2189,6 +2245,26 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	/* Verify the objects specified for removal from the subscriber */
+	dbinfos.remove_objects = 0x0;
+	if (opt.remove_objects.head != NULL)
+	{
+		for (SimpleStringListCell *cell = opt.remove_objects.head; cell; cell = cell->next)
+		{
+			if (pg_strcasecmp(cell->val, "publications") == 0 ||
+				pg_strcasecmp(cell->val, "all") == 0)
+			{
+				dbinfos.remove_objects |= OBJECT_PUBLICATIONS;
+			}
+			else
+			{
+				pg_log_error("invalid object type \"%s\" specified for --remove", cell->val);
+				pg_log_error_hint("The valid options are: \"all\", \"publications\"");
+				exit(1);
+			}
+		}
+	}
+
 	/* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
 	pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
 	pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index c35fa108ce3..2c9bd5bdb9e 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -329,6 +329,21 @@ $node_p->safe_psql($db1,
 	"CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
 );
 $node_p->wait_for_replay_catchup($node_s);
+
+# Create user-defined publications and wait for streaming replication to sync
+# them to the standby, so that the pg_createsubscriber run below can verify
+# that '--remove' drops them.
+$node_p->safe_psql(
+	$db1, qq(
+	CREATE PUBLICATION test_pub1 FOR ALL TABLES;
+	CREATE PUBLICATION test_pub2 FOR ALL TABLES;
+));
+
+$node_p->wait_for_replay_catchup($node_s);
+
+is($node_s->safe_psql($db1, "SELECT COUNT(*) = 2 FROM pg_publication"),
+	't', 'two pre-existing publications on subscriber');
+
 $node_s->stop;
 
 # dry run mode on node S
@@ -373,7 +388,8 @@ command_ok(
 
 # Run pg_createsubscriber on node S.  --verbose is used twice
 # to show more information.
-# In passing, also test the --enable-two-phase option
+# In passing, also test the --enable-two-phase option and
+# --remove option
 command_ok(
 	[
 		'pg_createsubscriber',
@@ -389,7 +405,8 @@ command_ok(
 		'--replication-slot' => 'replslot2',
 		'--database' => $db1,
 		'--database' => $db2,
-		'--enable-two-phase'
+		'--enable-two-phase',
+		'--remove' => 'publications',
 	],
 	'run pg_createsubscriber on node S');
 
@@ -408,6 +425,10 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 # Start subscriber
 $node_s->start;
 
+# Confirm publications are removed from the subscriber node
+is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
+	'0', 'all publications on subscriber have been removed');
+
 # Verify that all subtwophase states are pending or enabled,
 # e.g. there are no subscriptions where subtwophase is disabled ('d')
 is( $node_s->safe_psql(
-- 
2.34.1

