From d4d4503ea12bfd23325e78c8f2b75016f4c163c7 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 8 Apr 2024 12:39:12 +0000
Subject: [PATCH v8 3/4] Abort prepared transactions while altering two_phase
 to off

If we alter the two_phase parameter from "on" to "off" and there are prepared
transactions on the subscriber, they won't be resolved. To avoid this issue, we
allow the backend to abort all prepared transactions while altering the
subscription.
---
 doc/src/sgml/ref/alter_subscription.sgml      |  9 +++-
 src/backend/access/transam/twophase.c         | 17 +++----
 src/backend/commands/subscriptioncmds.c       | 43 +++++++++++-------
 src/include/access/twophase.h                 |  3 +-
 src/test/subscription/t/099_twophase_added.pl | 44 +++++++++++++++++++
 5 files changed, 87 insertions(+), 29 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 0c2894a94e..e81661aa04 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -233,8 +233,6 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
       <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>.
       Only a superuser can set <literal>password_required = false</literal>.
-      The <literal>two_phase</literal> parameter can only be altered when the
-      subscription is disabled.
      </para>
 
      <para>
@@ -256,6 +254,13 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
       option is enabled.
      </para>
+
+     <para>
+      The <literal>two_phase</literal> parameter can only be altered when the
+      subscription is disabled. When altering the parameter from <literal>on</literal>
+      to <literal>off</literal>, the backend process checks prepared
+      transactions done by the logical replication worker and aborts them.
+     </para>
     </listitem>
    </varlistentry>
 
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 66fa591eb5..f384bd8c0a 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2720,13 +2720,13 @@ IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
 }
 
 /*
- * LookupGXactBySubid
- *		Check if the prepared transaction done by apply worker exists.
+ * GetGidListBySubid
+ *      Get a list of GIDs which is PREPARE'd by the given subscription.
  */
-bool
-LookupGXactBySubid(Oid subid)
+List *
+GetGidListBySubid(Oid subid)
 {
-	bool		found = false;
+	List *list = NIL;
 
 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 	for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
@@ -2736,11 +2736,8 @@ LookupGXactBySubid(Oid subid)
 		/* Ignore not-yet-valid GIDs. */
 		if (gxact->valid &&
 			IsTwoPhaseTransactionGidForSubid(subid, gxact->gid))
-		{
-			found = true;
-			break;
-		}
+			list = lappend(list, pstrdup(gxact->gid));
 	}
 	LWLockRelease(TwoPhaseStateLock);
-	return found;
+	return list;
 }
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6b2cb71dac..cabbf9eab9 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1176,27 +1176,38 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					logicalrep_workers_stop(subid);
 
 					/*
-					 * two_phase cannot be disabled if there are any
-					 * uncommitted prepared transactions present.
-					 */
-					if (!opts.twophase &&
-						form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
-						LookupGXactBySubid(subid))
-						ereport(ERROR,
-								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-								 errmsg("cannot disable two_phase when uncommitted prepared transactions present"),
-								 errhint("Resolve these transactions and try again")));
-
-					/*
-					 * Since the altering two_phase option of subscriptions
-					 * also leads to the change of slot option, this command
-					 * cannot be rolled back. So prevent we are in the
-					 * transaction block.
+					 * If two_phase was enabled, there is a possibility the
+					 * transactions has already been PREPARE'd. They must be
+					 * checked and rolled back.
 					 */
 					if (!opts.twophase)
+					{
+						List *prepared_xacts;
+
+						/*
+						 * Since the altering two_phase option of subscriptions
+						 * (especially on->off case) also leads to the
+						 * change of slot option, this command cannot be rolled
+						 * back. So prevent we are in the transaction block.
+						 */
 						PreventInTransactionBlock(isTopLevel,
 												  "ALTER SUBSCRIPTION ... SET (two_phase = off)");
 
+						/*
+						 * To prevent prepared transactions from being
+						 * isolated, they must manually be aborted.
+						 */
+						if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+							(prepared_xacts = GetGidListBySubid(subid)) != NIL)
+						{
+							/* Abort all listed transactions */
+							foreach_ptr(char, gid, prepared_xacts)
+								FinishPreparedTransaction(gid, false);
+
+							list_free_deep(prepared_xacts);
+						}
+					}
+
 					/* Change system catalog acoordingly */
 					values[Anum_pg_subscription_subtwophasestate - 1] =
 						CharGetDatum(opts.twophase ?
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index d37e06fdee..f7a5cf0c12 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -18,6 +18,7 @@
 #include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
+#include "nodes/pg_list.h"
 
 /*
  * GlobalTransactionData is defined in twophase.c; other places have no
@@ -65,6 +66,6 @@ extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 
 extern void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid,
 								   int szgid);
-extern bool LookupGXactBySubid(Oid subid);
+extern List *GetGidListBySubid(Oid subid);
 
 #endif							/* TWOPHASE_H */
diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl
index 62f4acad27..59baa3eadc 100644
--- a/src/test/subscription/t/099_twophase_added.pl
+++ b/src/test/subscription/t/099_twophase_added.pl
@@ -92,4 +92,48 @@ $result = $node_subscriber->safe_psql('postgres',
 is($result, q(5),
    "prepared transactions done before altering can be replicated");
 
+# Check the case that prepared transactions exist on the subscriber node
+#
+# If the two_phase is altering from "on" to "off" and there are prepared
+# transactions on the subscriber, they must be aborted. This test checks it.
+
+# Prepare a transaction to insert some tuples into the table
+$node_publisher->safe_psql(
+	'postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (generate_series(6, 10));
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup('regress_sub');
+
+# Verify the prepared transaction has been replicated to the subscriber because
+# two_phase is set to "on".
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(1), "transaction has been prepared on subscriber");
+
+# Toggle the two_phase to "off" before the COMMIT PREPARED
+$node_subscriber->safe_psql(
+    'postgres', "
+    ALTER SUBSCRIPTION regress_sub DISABLE;
+    ALTER SUBSCRIPTION regress_sub SET (two_phase = off);
+    ALTER SUBSCRIPTION regress_sub ENABLE;");
+
+# Verify the prepared transaction are aborted because two_phase is changed to
+# "off".
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(0), "prepared transaction done by worker is aborted");
+
+# Do COMMIT PREPARED the prepared transaction
+$node_publisher->safe_psql( 'postgres',
+    "COMMIT PREPARED 'test_prepared_tab_full';");
+$node_publisher->wait_for_catchup('regress_sub');
+
+# Verify inserted tuples are replicated
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(10) FROM tab_full;");
+is($result, q(10),
+   "prepared transactions on publisher can be replicated");
+
 done_testing();
-- 
2.43.0

