From f5db2b348df37490df288b7c4c2dc84d4492a529 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Mon, 7 Apr 2025 09:36:29 +0530
Subject: [PATCH v7] PG17 Approach 3 Fix slot synchronization for two_phase
 enabled slots.

The issue is that transactions prepared before two-phase decoding is
enabled can fail to replicate to the subscriber after being committed on a
promoted standby following a failover. This is because the two_phase_at
field of a slot, which tracks the LSN from which two-phase decoding
starts, is not synchronized to standby servers. Without two_phase_at,
logical decoding might incorrectly identify prepared transactions as
already replicated to the subscriber after promotion of the standby
server, causing them to be skipped.

To prevent the risk of losing prepared transactions, we disallow enabling
both failover and two_phase during slot creation, but permit altering
failover to true once it is ensured that the slot's restart_lsn >=
two_phase_at.

The fix enforces the following conditions:
1) Always disallow creating slots with two_phase=true and failover=true
   (slot synchronization is exempt because a synced slot must retain the
   same values as the remote slot).
2) Always disallow creating subscriptions with (two_phase=true, failover=true).
3) Prevent altering the slot's failover to true if two_phase=true and restart_lsn
   is less than two_phase_at. Otherwise, allow changing failover to true.
4) Disallow altering a subscription's failover to true while its two_phase
   state is 'pending'. The user can retry once the two_phase state has moved
   beyond 'pending'.
---
 contrib/test_decoding/expected/slot.out       |  2 +
 contrib/test_decoding/sql/slot.sql            |  1 +
 src/backend/commands/subscriptioncmds.c       | 39 +++++++++++++++
 src/backend/replication/logical/logical.c     | 12 +++++
 src/backend/replication/slot.c                | 37 +++++++++++++++
 src/bin/pg_upgrade/t/003_logical_slots.pl     |  6 +--
 .../t/040_standby_failover_slots_sync.pl      | 47 +++++++++++++++++++
 src/test/regress/expected/subscription.out    |  3 ++
 src/test/regress/sql/subscription.sql         |  4 ++
 9 files changed, 148 insertions(+), 3 deletions(-)

diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 7de03c79f6f..8fd762dea85 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -427,6 +427,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', '
 
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
 ERROR:  cannot enable failover for a temporary replication slot
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_twophase_true_slot', 'test_decoding', false, true, true);
+ERROR:  "failover" and "two_phase" are mutually exclusive options
 SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
  ?column? 
 ----------
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index 580e3ae3bef..a89fe712ff6 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -182,6 +182,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'tes
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false);
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false);
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_twophase_true_slot', 'test_decoding', false, true, true);
 SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
 
 SELECT slot_name, slot_type, failover FROM pg_replication_slots;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9467f58a23d..e93d02ed90c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -648,6 +648,19 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				 errmsg("password_required=false is superuser-only"),
 				 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
 
+	/*
+	 * Do not allow users to enable the failover and two_phase options
+	 * together.
+	 *
+	 * See comments atop the similar check in ReplicationSlotCreate() for a
+	 * detailed reason.
+	 */
+	if (opts.twophase && opts.failover)
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("\"%s\" and \"%s\" are mutually exclusive options",
+					   "failover", "two_phase"));
+
 	/*
 	 * If built with appropriate switch, whine when regression-testing
 	 * conventions for subscription names are violated.
@@ -1245,6 +1258,32 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								 errmsg("cannot set %s for enabled subscription",
 										"failover")));
 
+					/*
+					 * Do not allow users to enable the failover when the
+					 * two_phase state is still pending i.e., the replication
+					 * slot's two_phase option has not yet been finalized.
+					 *
+					 * In most cases, the restriction in
+					 * ReplicationSlotAlter() is sufficient to prevent
+					 * enabling failover for a slot with two_phase enabled.
+					 * However, this additional check is necessary to handle a
+					 * race condition, when a user runs CREATE SUBSCRIPTION
+					 * with two_phase=true, but the slot's two_phase flag
+					 * hasn't been set yet, a concurrent attempt to do ALTER
+					 * SUBSCRIPTION(failover=true) may bypass the check in
+					 * ReplicationSlotAlter().
+					 *
+					 * For a detailed explanation of why enforcing this
+					 * restriction is important when combining two_phase and
+					 * failover, refer to the comments atop similar check in
+					 * ReplicationSlotCreate().
+					 */
+					if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+						opts.failover)
+						ereport(ERROR,
+								errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+								errmsg("cannot enable failover for a subscription with a pending two_phase state"));
+
 					/*
 					 * The changed failover option of the slot can't be rolled
 					 * back.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index e941bb491d8..2421e03d088 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -613,6 +613,18 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 	/* Mark slot to allow two_phase decoding if not already marked */
 	if (ctx->twophase && !slot->data.two_phase)
 	{
+		/*
+		 * Do not allow two-phase decoding for failover enabled slots.
+		 *
+		 * See comments atop the similar check in ReplicationSlotCreate() for
+		 * a detailed reason.
+		 */
+		if (slot->data.failover)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("cannot enable two-phase decoding for failover enabled slot \"%s\"",
+							NameStr(slot->data.name))));
+
 		SpinLockAcquire(&slot->mutex);
 		slot->data.two_phase = true;
 		slot->data.two_phase_at = start_lsn;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a1d4768623f..5e0058bc328 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -343,6 +343,26 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 			ereport(ERROR,
 					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 					errmsg("cannot enable failover for a temporary replication slot"));
+
+		/*
+		 * Do not allow users to enable both failover and two_phase for slots.
+		 *
+		 * This is because the two_phase_at field of a slot, which tracks the
+		 * LSN from which two-phase decoding starts, is not synchronized to
+		 * standby servers. Without two_phase_at, the logical decoding might
+		 * incorrectly identify prepared transactions as already replicated to
+		 * the subscriber after promotion of standby server, causing them to
+		 * be skipped.
+		 *
+		 * However, both failover and two_phase enabled slots can be created
+		 * during slot synchronization because we need to retain the same
+		 * values as the remote slot.
+		 */
+		if (two_phase && !IsSyncingReplicationSlots())
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("\"%s\" and \"%s\" are mutually exclusive options",
+						   "failover", "two_phase"));
 	}
 
 	/*
@@ -848,6 +868,23 @@ ReplicationSlotAlter(const char *name, bool failover)
 				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				errmsg("cannot enable failover for a temporary replication slot"));
 
+	/*
+	 * Do not allow users to enable failover for a two_phase enabled slot
+	 * where slot's restart_lsn is less than two_phase_at. In such cases,
+	 * there is a risk that transactions prepared before two_phase_at exist
+	 * and would be skipped during decoding.
+	 *
+	 * See comments atop the similar check in ReplicationSlotCreate() for a
+	 * detailed reason.
+	 */
+	if (failover && MyReplicationSlot->data.two_phase &&
+		MyReplicationSlot->data.restart_lsn < MyReplicationSlot->data.two_phase_at)
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("cannot enable failover for a two-phase enabled replication slot"),
+				errdetail("The slot need to consume change upto %X/%X to enable failover.",
+						  LSN_FORMAT_ARGS(MyReplicationSlot->data.two_phase_at)));
+
 	if (MyReplicationSlot->data.failover != failover)
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
diff --git a/src/bin/pg_upgrade/t/003_logical_slots.pl b/src/bin/pg_upgrade/t/003_logical_slots.pl
index 0a2483d3dfc..e329cc609ce 100644
--- a/src/bin/pg_upgrade/t/003_logical_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_slots.pl
@@ -173,7 +173,7 @@ $sub->start;
 $sub->safe_psql(
 	'postgres', qq[
 	CREATE TABLE tbl (a int);
-	CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true', failover = 'true')
+	CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true')
 ]);
 $sub->wait_for_subscription_sync($oldpub, 'regress_sub');
 
@@ -193,8 +193,8 @@ command_ok([@pg_upgrade_cmd], 'run of pg_upgrade of old cluster');
 # Check that the slot 'regress_sub' has migrated to the new cluster
 $newpub->start;
 my $result = $newpub->safe_psql('postgres',
-	"SELECT slot_name, two_phase, failover FROM pg_replication_slots");
-is($result, qq(regress_sub|t|t), 'check the slot exists on new cluster');
+	"SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(regress_sub|t), 'check the slot exists on new cluster');
 
 # Update the connection
 my $new_connstr = $newpub->connstr . ' dbname=postgres';
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 823857bb329..dc122348f84 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -98,6 +98,53 @@ my ($result, $stdout, $stderr) = $subscriber1->psql('postgres',
 ok( $stderr =~ /ERROR:  cannot set failover for enabled subscription/,
 	"altering failover is not allowed for enabled subscription");
 
+##################################################
+# Test that the failover option can be enabled for a two_phase enabled
+# subscription.
+##################################################
+
+# Create a subscription with two_phase enabled
+$subscriber1->safe_psql('postgres',
+	"CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub2_slot, enabled = false, two_phase = true);"
+);
+
+# Enable failover for the subscription
+($result, $stdout, $stderr) = $subscriber1->psql('postgres',
+	"ALTER SUBSCRIPTION regress_mysub2 SET (failover = true)");
+
+# Confirm that the failover flag on the slot has been turned on
+is( $publisher->safe_psql(
+		'postgres',
+		q{SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';}
+	),
+	"t",
+	'logical slot has failover true on the publisher');
+
+# Drop the subscription
+$subscriber1->safe_psql(
+	'postgres', q{
+DROP SUBSCRIPTION regress_mysub2;
+});
+
+# Create a slot with two_phase enabled
+$publisher->psql('postgres',
+	"SELECT pg_create_logical_replication_slot('regress_mysub2', 'pgoutput', false, true, false);");
+
+# Create a subscription with two_phase enabled
+$subscriber1->safe_psql('postgres',
+	"CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (create_slot = false, copy_data = false, enabled = false, two_phase = true);"
+);
+
+# Enable failover for the subscription with two_phase in pending state
+($result, $stdout, $stderr) = $subscriber1->psql('postgres',
+	"ALTER SUBSCRIPTION regress_mysub2 SET (failover = true)");
+ok( $stderr =~ /ERROR:  cannot enable failover for a subscription with a pending two_phase state/,
+	"Enabling failover is not allowed for a two_phase pending subscription");
+
+# Drop the subscription
+$subscriber1->safe_psql(
+	'postgres', "DROP SUBSCRIPTION regress_mysub2;");
+
 ##################################################
 # Test that pg_sync_replication_slots() cannot be executed on a non-standby server.
 ##################################################
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 0f2a25cdc19..bbd83244ba9 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -479,6 +479,9 @@ COMMIT;
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
+-- fail - cannot enable two_phase and failover together
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true, failover = true);
+ERROR:  "failover" and "two_phase" are mutually exclusive options
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
 DROP ROLE regress_subscription_user3;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 3e5ba4cb8c6..47fc1e5329b 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -342,6 +342,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
 RESET SESSION AUTHORIZATION;
+
+-- fail - cannot enable two_phase and failover together
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true, failover = true);
+
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
 DROP ROLE regress_subscription_user3;
-- 
2.34.1

