From 683c4fc3e767a068eca0c85561186c1322287d69 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 21 Jun 2024 10:55:46 +0000
Subject: [PATCH] pg_createsubscriber: Drop pre-existing subscriptions from the
 converted instance

Previously, pg_createsubscriber did nothing about subscriptions that already
existed on the primary and had therefore been copied to the standby by
streaming replication. After the conversion, the new subscriber would try to
connect to the publishers specified by those subscriptions, which causes an
ERROR. To avoid this failure, drop such subscriptions from the converted node
at the end of the command. They are temporarily disabled on the primary while
the standby is converted and are enabled again afterwards.
---
 src/bin/pg_basebackup/pg_createsubscriber.c   | 167 +++++++++++++++++-
 .../t/041_pg_createsubscriber_added.pl        | 118 +++++++++++++
 2 files changed, 283 insertions(+), 2 deletions(-)
 create mode 100644 src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl

diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 1138c20e56..b5f62c7c54 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -57,6 +57,9 @@ struct LogicalRepInfo
 
 	bool		made_replslot;	/* replication slot was created */
 	bool		made_publication;	/* publication was created */
+	int			num_subscriptions;	/* number of pre-existing subscriptions */
+	char	  **pre_subnames;		/* names of the pre-existing subscriptions */
+	char	  **pre_slotnames;		/* slot name used by each of them */
 };
 
 static void cleanup_objects_atexit(void);
@@ -78,7 +81,7 @@ static bool server_is_in_recovery(PGconn *conn);
 static char *generate_object_name(PGconn *conn);
 static void check_publisher(const struct LogicalRepInfo *dbinfo);
 static char *setup_publisher(struct LogicalRepInfo *dbinfo);
-static void check_subscriber(const struct LogicalRepInfo *dbinfo);
+static void check_subscriber(struct LogicalRepInfo *dbinfo);
 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
 							 const char *consistent_lsn);
 static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
@@ -103,6 +106,11 @@ static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *
 									 const char *lsn);
 static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
 
+static void obtain_and_disable_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo);
+static void disable_subscription(PGconn *conn, const char *subname);
+static void enable_subscirptions_on_publisher(struct LogicalRepInfo *dbinfo);
+static void drop_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo);
+
 #define	USEC_PER_SEC	1000000
 #define	WAIT_INTERVAL	1		/* 1 second */
 
@@ -912,6 +920,306 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
 		exit(1);
 }
 
+/*
+ * Run one subscription DDL command and report whether it succeeded.
+ *
+ * Nothing is logged here; on failure the caller can fetch the reason with
+ * PQerrorMessage(conn) and decide how serious the problem is.
+ */
+static bool
+exec_subscription_command(PGconn *conn, const char *command)
+{
+	PGresult   *res = PQexec(conn, command);
+	bool		ok = (PQresultStatus(res) == PGRES_COMMAND_OK);
+
+	PQclear(res);
+
+	return ok;
+}
+
+/*
+ * Connect to the primary to obtain a list of subscriptions and disable them.
+ * They will be enabled again on the primary and dropped on the converted node.
+ *
+ * The subscription names and the slot names they use are saved in dbinfo for
+ * those later steps; a subscription without a slot is saved with an empty
+ * slot name. Failing to read or disable the subscriptions terminates the
+ * program. Nothing is modified in dry-run mode.
+ *
+ * XXX If we exit after some subscriptions were disabled, nothing re-enables
+ * them on the primary.
+ */
+static void
+obtain_and_disable_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer query = createPQExpBuffer();
+
+	for (int i = 0; i < num_dbs; i++)
+	{
+		PGconn	   *conn;
+		PGresult   *res;
+		char	   *dbname_esc;
+		int			ntups;
+
+		/* Connect to publisher */
+		conn = connect_database(dbinfo[i].pubconninfo, true);
+
+		/* Quote the database name; it may contain quotes or backslashes */
+		dbname_esc = PQescapeLiteral(conn, dbinfo[i].dbname,
+									 strlen(dbinfo[i].dbname));
+		if (dbname_esc == NULL)
+		{
+			pg_log_error("could not escape database name \"%s\": %s",
+						 dbinfo[i].dbname, PQerrorMessage(conn));
+			disconnect_database(conn, true);
+		}
+
+		printfPQExpBuffer(query,
+						  "SELECT s.subname, s.subslotname FROM pg_catalog.pg_subscription s "
+						  "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
+						  "WHERE d.datname = %s",
+						  dbname_esc);
+		PQfreemem(dbname_esc);
+
+		res = PQexec(conn, query->data);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain pre-existing subscriptions: %s",
+						 PQresultErrorMessage(res));
+			PQclear(res);
+			disconnect_database(conn, true);
+		}
+
+		ntups = PQntuples(res);
+
+		dbinfo[i].num_subscriptions = ntups;
+		dbinfo[i].pre_subnames = NULL;
+		dbinfo[i].pre_slotnames = NULL;
+
+		if (ntups > 0)
+		{
+			dbinfo[i].pre_subnames = pg_malloc_array(char *, ntups);
+			dbinfo[i].pre_slotnames = pg_malloc_array(char *, ntups);
+		}
+
+		for (int j = 0; j < ntups; j++)
+		{
+			/*
+			 * Store name of subscriptions and specified slots. They are used
+			 * to enable them again on the primary node. PQgetvalue() returns
+			 * an empty string for a NULL subslotname.
+			 */
+			dbinfo[i].pre_subnames[j] = pg_strdup(PQgetvalue(res, j, 0));
+			dbinfo[i].pre_slotnames[j] = pg_strdup(PQgetvalue(res, j, 1));
+
+			if (!dry_run)
+				disable_subscription(conn, dbinfo[i].pre_subnames[j]);
+		}
+
+		PQclear(res);
+		disconnect_database(conn, false);
+	}
+
+	destroyPQExpBuffer(query);
+}
+
+/*
+ * Disable the given subscription. After that, the subscription will be altered
+ * to set slot_name = NONE. This is needed for the standby server to allow
+ * subscription drops.
+ *
+ * The name is quoted as an identifier, so names that need quoting work too.
+ * Any failure is reported and terminates the program, since the caller relies
+ * on the subscription no longer being attached to its slot.
+ */
+static void
+disable_subscription(PGconn *conn, const char *subname)
+{
+	PQExpBuffer query = createPQExpBuffer();
+	char	   *subname_esc;
+
+	Assert(conn != NULL);
+
+	subname_esc = PQescapeIdentifier(conn, subname, strlen(subname));
+	if (subname_esc == NULL)
+	{
+		pg_log_error("could not escape subscription name \"%s\": %s",
+					 subname, PQerrorMessage(conn));
+		disconnect_database(conn, true);
+	}
+
+	/* Disable the given subscription */
+	printfPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE", subname_esc);
+	if (!exec_subscription_command(conn, query->data))
+	{
+		pg_log_error("could not disable subscription \"%s\": %s",
+					 subname, PQerrorMessage(conn));
+		disconnect_database(conn, true);
+	}
+
+	/* ...and alter the slot_name to NONE */
+	printfPQExpBuffer(query,
+					  "ALTER SUBSCRIPTION %s SET (slot_name = NONE)",
+					  subname_esc);
+	if (!exec_subscription_command(conn, query->data))
+	{
+		pg_log_error("could not unset slot name of subscription \"%s\": %s",
+					 subname, PQerrorMessage(conn));
+		disconnect_database(conn, true);
+	}
+
+	PQfreemem(subname_esc);
+	destroyPQExpBuffer(query);
+}
+
+/*
+ * Enable pre-existing subscriptions on the primary
+ *
+ * For every subscription saved in dbinfo, restore its slot_name and then
+ * enable it. Failures are reported as warnings so that the remaining
+ * subscriptions are still processed. Nothing is done in dry-run mode.
+ *
+ * XXX The original enabled state is not recorded, so a subscription that was
+ * disabled but had a slot ends up enabled.
+ */
+static void
+enable_subscirptions_on_publisher(struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer query;
+
+	if (dry_run)
+		return;
+
+	query = createPQExpBuffer();
+
+	for (int i = 0; i < num_dbs; i++)
+	{
+		const struct LogicalRepInfo *info = &dbinfo[i];
+		PGconn	   *conn;
+
+		if (info->num_subscriptions == 0)
+			continue;
+
+		/* Connect to publisher */
+		conn = connect_database(info->pubconninfo, false);
+		if (conn == NULL)
+		{
+			pg_log_warning("pre-existing subscriptions in database \"%s\" remain disabled on the publisher",
+						   info->dbname);
+			continue;
+		}
+
+		for (int j = 0; j < info->num_subscriptions; j++)
+		{
+			const char *subname = info->pre_subnames[j];
+			const char *slotname = info->pre_slotnames[j];
+			char	   *subname_esc;
+			char	   *slotname_esc;
+			bool		ok;
+
+			/*
+			 * An empty slot name means the subscription had no slot. Such a
+			 * subscription cannot be enabled, so leave it as it is.
+			 */
+			if (slotname[0] == '\0')
+				continue;
+
+			subname_esc = PQescapeIdentifier(conn, subname, strlen(subname));
+			slotname_esc = PQescapeLiteral(conn, slotname, strlen(slotname));
+			ok = (subname_esc != NULL && slotname_esc != NULL);
+
+			/* Restore the slot_name parameter */
+			if (ok)
+			{
+				printfPQExpBuffer(query,
+								  "ALTER SUBSCRIPTION %s SET (slot_name = %s)",
+								  subname_esc, slotname_esc);
+				ok = exec_subscription_command(conn, query->data);
+			}
+
+			/* ...and then enable the subscription */
+			if (ok)
+			{
+				printfPQExpBuffer(query, "ALTER SUBSCRIPTION %s ENABLE",
+								  subname_esc);
+				ok = exec_subscription_command(conn, query->data);
+			}
+
+			if (!ok)
+				pg_log_warning("could not re-enable subscription \"%s\": %s",
+							   subname, PQerrorMessage(conn));
+
+			PQfreemem(subname_esc);
+			PQfreemem(slotname_esc);
+		}
+
+		disconnect_database(conn, false);
+	}
+
+	destroyPQExpBuffer(query);
+}
+
+/*
+ * Drop pre-existing subscriptions on the standby
+ *
+ * Every subscription saved in dbinfo is dropped from the corresponding
+ * database of the converted node. Failures are reported as warnings so that
+ * the remaining subscriptions are still processed. Nothing is done in
+ * dry-run mode.
+ */
+static void
+drop_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer query;
+
+	if (dry_run)
+		return;
+
+	query = createPQExpBuffer();
+
+	for (int i = 0; i < num_dbs; i++)
+	{
+		const struct LogicalRepInfo *info = &dbinfo[i];
+		PGconn	   *conn;
+
+		if (info->num_subscriptions == 0)
+			continue;
+
+		/* Connect to subscriber */
+		conn = connect_database(info->subconninfo, false);
+		if (conn == NULL)
+		{
+			pg_log_warning("could not drop pre-existing subscriptions in database \"%s\"",
+						   info->dbname);
+			continue;
+		}
+
+		for (int j = 0; j < info->num_subscriptions; j++)
+		{
+			const char *subname = info->pre_subnames[j];
+			char	   *subname_esc;
+			bool		ok = false;
+
+			subname_esc = PQescapeIdentifier(conn, subname, strlen(subname));
+			if (subname_esc != NULL)
+			{
+				printfPQExpBuffer(query, "DROP SUBSCRIPTION %s", subname_esc);
+				ok = exec_subscription_command(conn, query->data);
+			}
+
+			if (!ok)
+				pg_log_warning("could not drop subscription \"%s\": %s",
+							   subname, PQerrorMessage(conn));
+
+			PQfreemem(subname_esc);
+		}
+
+		disconnect_database(conn, false);
+	}
+
+	destroyPQExpBuffer(query);
+}
+
 /*
  * Is the standby server ready for logical replication?
  *
@@ -924,7 +1078,7 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
  * will be broken at the end of this process (due to pg_resetwal).
  */
 static void
-check_subscriber(const struct LogicalRepInfo *dbinfo)
+check_subscriber(struct LogicalRepInfo *dbinfo)
 {
 	PGconn	   *conn;
 	PGresult   *res;
@@ -2067,6 +2221,9 @@ main(int argc, char **argv)
 	/* Check if the primary server is ready for logical replication */
 	check_publisher(dbinfo);
 
+	/* Collect the pre-existing subscriptions and disable them for now */
+	obtain_and_disable_pre_existing_subscriptions(dbinfo);
+
 	/*
 	 * Stop the target server. The recovery process requires that the server
 	 * reaches a consistent state before targeting the recovery stop point.
@@ -2109,9 +2266,15 @@ main(int argc, char **argv)
 	/* Remove primary_slot_name if it exists on primary */
 	drop_primary_replication_slot(dbinfo, primary_slot_name);
 
+	/* Re-enable the pre-existing subscriptions on the primary */
+	enable_subscirptions_on_publisher(dbinfo);
+
 	/* Remove failover replication slots if they exist on subscriber */
 	drop_failover_replication_slots(dbinfo);
 
+	/* Drop the pre-existing subscriptions from the converted node */
+	drop_pre_existing_subscriptions(dbinfo);
+
 	/* Stop the subscriber */
 	pg_log_info("stopping the subscriber");
 	stop_standby_server(subscriber_dir);
diff --git a/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl b/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
new file mode 100644
index 0000000000..59cddb2fc1
--- /dev/null
+++ b/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
@@ -0,0 +1,118 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Construct a cascading replication system like:
+#
+# node_a --(logical replication)--> node_b --(streaming replication)--> node_c
+#
+
+# Set up node A as publisher
+my $node_a = PostgreSQL::Test::Cluster->new('node_a');
+my $aconnstr = $node_a->connstr;
+$node_a->init(allows_streaming => 'logical');
+$node_a->start;
+
+# On node A
+# - create databases
+# - create test tables
+# - insert a row
+# - create publications
+$node_a->safe_psql(
+	'postgres', q(
+	CREATE DATABASE pg1;
+	CREATE DATABASE pg2;
+));
+$node_a->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_a->safe_psql('pg1', "INSERT INTO tbl1 VALUES('first row')");
+$node_a->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+$node_a->safe_psql('pg1', 'CREATE PUBLICATION pub1_pg1 FOR ALL TABLES');
+$node_a->safe_psql('pg1', 'CREATE PUBLICATION pub2_pg1');
+$node_a->safe_psql('pg2', 'CREATE PUBLICATION pub1_pg2 FOR ALL TABLES');
+$node_a->safe_psql('pg2', 'CREATE PUBLICATION pub2_pg2');
+
+# Set up node B as a subscriber of node A and as the primary for node C
+my $node_b = PostgreSQL::Test::Cluster->new('node_b');
+my $bconnstr = $node_b->connstr;
+$node_b->init(allows_streaming => 'logical');
+$node_b->start;
+
+# On node B
+# - create databases
+# - create subscriptions
+$node_b->safe_psql(
+	'postgres', q(
+	CREATE DATABASE pg1;
+	CREATE DATABASE pg2;
+));
+$node_b->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_b->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+$node_b->safe_psql('pg1',
+    "CREATE SUBSCRIPTION sub1_pg1 CONNECTION '$aconnstr dbname=pg1' PUBLICATION pub1_pg1");
+$node_b->safe_psql('pg1',
+    "CREATE SUBSCRIPTION sub2_pg1 CONNECTION '$aconnstr dbname=pg1' PUBLICATION pub2_pg1");
+$node_b->safe_psql('pg2',
+    "CREATE SUBSCRIPTION sub1_pg2 CONNECTION '$aconnstr dbname=pg2' PUBLICATION pub1_pg2");
+$node_b->safe_psql('pg2',
+    "CREATE SUBSCRIPTION sub2_pg2 CONNECTION '$aconnstr dbname=pg2' PUBLICATION pub2_pg2");
+
+$node_b->wait_for_subscription_sync($node_a, 'sub1_pg1');
+$node_b->wait_for_subscription_sync($node_a, 'sub1_pg2');
+
+# Confirms logical replication works well
+my $result = $node_b->safe_psql('pg1', 'SELECT * FROM tbl1;');
+is($result, 'first row', 'check logical replication works well');
+
+# Set up node C as a physical standby of node B
+$node_b->backup('backup_1');
+my $node_c = PostgreSQL::Test::Cluster->new('node_c');
+$node_c->init_from_backup($node_b, 'backup_1', has_streaming => 1);
+$node_c->append_conf(
+	'postgresql.conf', qq[
+primary_conninfo = '$bconnstr'
+]);
+$node_c->set_standby_mode();
+$node_c->start;
+
+$node_b->wait_for_replay_catchup($node_c);
+
+# Confirms streaming replication works well
+$result = $node_c->safe_psql('pg1', 'SELECT * FROM tbl1;');
+is($result, 'first row', 'check streaming replication works well');
+
+$node_c->stop;
+
+# Run pg_createsubscriber
+command_ok(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--recovery-timeout', "$PostgreSQL::Test::Utils::timeout_default",
+		'--verbose',
+		'--pgdata', $node_c->data_dir,
+		'--publisher-server', $bconnstr,
+		'--socket-directory', $node_c->host,
+		'--subscriber-port', $node_c->port,
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'run pg_createsubscriber on node C');
+
+# Confirms pre-existing subscriptions are removed from the converted node
+$node_c->start;
+$result = $node_c->safe_psql('pg1',
+    "SELECT subname FROM pg_subscription WHERE subname NOT LIKE 'pg_createsubscriber%';");
+is($result, '', 'check subscriptions are removed');
+
+# Confirms pre-existing subscriptions still exist on the primary node
+$result = $node_b->safe_psql('pg1',
+    "SELECT subname, subenabled, subslotname FROM pg_subscription WHERE subname NOT LIKE 'pg_createsubscriber%' ORDER BY subname;");
+is($result, 'sub1_pg1|t|sub1_pg1
+sub1_pg2|t|sub1_pg2
+sub2_pg1|t|sub2_pg1
+sub2_pg2|t|sub2_pg2', 'check subscriptions still exist');
+
+done_testing();
-- 
2.43.0

