From 35d6a2a4f3ffe42aeebb56f219cd7004777baf10 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 8 Apr 2024 12:39:12 +0000
Subject: [PATCH v4 3/3] Abort prepared transactions while altering two_phase
 to false

---
 doc/src/sgml/ref/alter_subscription.sgml      | 10 +++++-
 src/backend/access/transam/twophase.c         | 19 +++++-----
 src/backend/commands/subscriptioncmds.c       | 27 +++++++++++---
 src/include/access/twophase.h                 |  3 +-
 src/test/subscription/t/099_twophase_added.pl | 35 +++++++++++++++++++
 5 files changed, 77 insertions(+), 17 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 20b45e36e0..cfd2a18e2c 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -231,7 +231,6 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
       <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>.
       Only a superuser can set <literal>password_required = false</literal>.
-      <literal>two_phase</literal> can be altered only for disabled subscription.
      </para>
 
      <para>
@@ -253,6 +252,15 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
       option is enabled.
      </para>
+
+     <para>
+      <literal>two_phase</literal> can be altered only for disabled
+      subscriptions. When altering the parameter from <literal>true</literal>
+      to <literal>false</literal>,  the backend process checks prepared
+      transactions done by the logical replication worker and aborts them. If
+      prepared transactions are found, the parameter cannot be altered to
+      <literal>false</literal> inside a transaction block.
+     </para>
     </listitem>
    </varlistentry>
 
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 495f99a357..9121195725 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2702,13 +2702,13 @@ checkGid(char *gid, Oid subid)
 }
 
 /*
- * LookupGXactBySubid
- *		Check if the prepared transaction done by apply worker exists.
+ * GetGidListBySubid
+ *      Get a list of GIDs which is PREPARE'd by the given subscription.
  */
-bool
-LookupGXactBySubid(Oid subid)
+List *
+GetGidListBySubid(Oid subid)
 {
-	bool		found = false;
+	List *list = NIL;
 
 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 	for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
@@ -2717,11 +2717,10 @@ LookupGXactBySubid(Oid subid)
 
 		/* Ignore not-yet-valid GIDs. */
 		if (gxact->valid && checkGid(gxact->gid, subid))
-		{
-			found = true;
-			break;
-		}
+			list = lappend(list, pstrdup(gxact->gid));
+
 	}
 	LWLockRelease(TwoPhaseStateLock);
-	return found;
+
+	return list;
 }
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 0d80d6e110..cf3e5c64b0 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1178,6 +1178,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				/* XXX */
 				if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
 				{
+					List *prepared_xacts = NIL;
+
 					/*
 					 * two_phase can be only changed for disabled
 					 * subscriptions
@@ -1194,13 +1196,28 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					 */
 					logicalrep_workers_stop(subid);
 
-					/* Check whether the number of prepared transactions */
+					/*
+					 * If two phase was enabled, there is a possibility the
+					 * transactions has already been PREPARE'd.
+					 */
 					if (!opts.twophase &&
 						form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
-						LookupGXactBySubid(subid))
-						ereport(ERROR,
-								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-								 errmsg("cannot disable two_phase when uncommitted prepared transactions present")));
+						(prepared_xacts = GetGidListBySubid(subid)) != NIL)
+					{
+						ListCell	*cell;
+
+						/* Must not be in the transaction */
+						PreventInTransactionBlock(isTopLevel,
+												  "ALTER SUBSCRIPTION ... SET (two_phase = ...)");
+
+						/* Abort all listed transactions */
+						foreach(cell, prepared_xacts)
+						{
+							FinishPreparedTransaction((char *) lfirst(cell),
+													  false);
+							prepared_xacts = list_delete_cell(prepared_xacts, cell);
+						}
+					}
 
 					/* Change system catalog acoordingly */
 					values[Anum_pg_subscription_subtwophasestate - 1] =
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index d493ed24c5..95770bbd69 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -18,6 +18,7 @@
 #include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
+#include "nodes/pg_list.h"
 
 /*
  * GlobalTransactionData is defined in twophase.c; other places have no
@@ -63,6 +64,6 @@ extern void restoreTwoPhaseData(void);
 extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 						TimestampTz origin_prepare_timestamp);
 
-extern bool LookupGXactBySubid(Oid subid);
+extern List *GetGidListBySubid(Oid subid);
 
 #endif							/* TWOPHASE_H */
diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl
index c13a37675a..a8135b671c 100644
--- a/src/test/subscription/t/099_twophase_added.pl
+++ b/src/test/subscription/t/099_twophase_added.pl
@@ -69,4 +69,39 @@ $result = $node_subscriber->safe_psql('postgres',
 is($result, q(5),
    "prepared transactions done before altering can be replicated");
 
+######
+# Check the case that prepared transactions exist on subscriber node
+######
+
+$node_publisher->safe_psql(
+	'postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (generate_series(6, 10));
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(1), "transaction has been prepared on subscriber");
+
+$node_subscriber->safe_psql(
+    'postgres', "
+    ALTER SUBSCRIPTION sub DISABLE;
+    ALTER SUBSCRIPTION sub SET (two_phase = off);
+    ALTER SUBSCRIPTION sub ENABLE;");
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(0), "prepared transaction done by worker is aborted");
+
+$node_publisher->safe_psql( 'postgres',
+    "COMMIT PREPARED 'test_prepared_tab_full';");
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(10) FROM tab_full;");
+is($result, q(10),
+   "prepared transactions on publisher can be replicated");
+
 done_testing();
-- 
2.43.0

