From e05b4966f4bbf05e086274ae19d3fcc65e469064 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Tue, 17 Jan 2023 09:18:14 +0530
Subject: [PATCH v2 1/2] Improve the code to decide the apply action.

The code missed to handle non-transactional messages and we didn't catch
it in our testing as currently such messages are simply ignored by the
apply worker. This was introduced by changes in commit 216a784829.

While testing this, I noticed that we forgot to reset stream_xid after
processing the stream stop message which could also result in the wrong
apply action after the fix for non-transactional messages.

Reported-by: Tomas Vondra
Author: Amit Kapila
Reviewed-by: Tomas Vondra, Sawada Masahiko
Discussion: https://postgr.es/m/984ff689-adde-9977-affe-cd6029e850be@enterprisedb.com
---
 src/backend/replication/logical/worker.c | 63 ++++++++++++++++--------
 1 file changed, 42 insertions(+), 21 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d8b8a374c6..ce14b7a685 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -247,8 +247,10 @@ typedef struct ApplyErrorCallbackArg
  * The action to be taken for the changes in the transaction.
  *
  * TRANS_LEADER_APPLY:
- * This action means that we are in the leader apply worker and changes of the
- * transaction are applied directly by the worker.
+ * This action means that we are in the leader apply worker or table sync
+ * worker. The changes of the transaction are either directly applied or
+ * are read from temporary files (for streaming transactions) and then
+ * applied by the worker.
  *
  * TRANS_LEADER_SERIALIZE:
  * This action means that we are in the leader apply worker or table sync
@@ -1004,6 +1006,9 @@ apply_handle_begin(StringInfo s)
 {
 	LogicalRepBeginData begin_data;
 
+	/* There must not be an active streaming transaction. */
+	Assert(!TransactionIdIsValid(stream_xid));
+
 	logicalrep_read_begin(s, &begin_data);
 	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
@@ -1058,6 +1063,9 @@ apply_handle_begin_prepare(StringInfo s)
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
 
+	/* There must not be an active streaming transaction. */
+	Assert(!TransactionIdIsValid(stream_xid));
+
 	logicalrep_read_begin_prepare(s, &begin_data);
 	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
 
@@ -1301,7 +1309,7 @@ apply_handle_stream_prepare(StringInfo s)
 
 	switch (apply_action)
 	{
-		case TRANS_LEADER_SERIALIZE:
+		case TRANS_LEADER_APPLY:
 
 			/*
 			 * The transaction has been serialized to file, so replay all the
@@ -1484,6 +1492,9 @@ apply_handle_stream_start(StringInfo s)
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("duplicate STREAM START message")));
 
+	/* There must not be an active streaming transaction. */
+	Assert(!TransactionIdIsValid(stream_xid));
+
 	/* notify handle methods we're processing a remote transaction */
 	in_streamed_transaction = true;
 
@@ -1710,6 +1721,7 @@ apply_handle_stream_stop(StringInfo s)
 	}
 
 	in_streamed_transaction = false;
+	stream_xid = InvalidTransactionId;
 
 	/*
 	 * The parallel apply worker could be in a transaction in which case we
@@ -1842,7 +1854,7 @@ apply_handle_stream_abort(StringInfo s)
 
 	switch (apply_action)
 	{
-		case TRANS_LEADER_SERIALIZE:
+		case TRANS_LEADER_APPLY:
 
 			/*
 			 * We are in the leader apply worker and the transaction has been
@@ -2154,7 +2166,7 @@ apply_handle_stream_commit(StringInfo s)
 
 	switch (apply_action)
 	{
-		case TRANS_LEADER_SERIALIZE:
+		case TRANS_LEADER_APPLY:
 
 			/*
 			 * The transaction has been serialized to file, so replay all the
@@ -4204,7 +4216,6 @@ stream_close_file(void)
 
 	BufFileClose(stream_fd);
 
-	stream_xid = InvalidTransactionId;
 	stream_fd = NULL;
 }
 
@@ -4977,10 +4988,12 @@ set_apply_error_context_origin(char *originname)
 }
 
 /*
- * Return the action to be taken for the given transaction. *winfo is
- * assigned to the destination parallel worker info when the leader apply
- * worker has to pass all the transaction's changes to the parallel apply
- * worker.
+ * Return the action to be taken for the given transaction. See
+ * TransApplyAction for information on each of the actions.
+ *
+ * *winfo is assigned to the destination parallel worker info when the leader
+ * apply worker has to pass all the transaction's changes to the parallel
+ * apply worker.
  */
 static TransApplyAction
 get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
@@ -4991,27 +5004,35 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 	{
 		return TRANS_PARALLEL_APPLY;
 	}
-	else if (in_remote_transaction)
-	{
-		return TRANS_LEADER_APPLY;
-	}
 
 	/*
-	 * Check if we are processing this transaction using a parallel apply
-	 * worker.
+	 * If we are processing this transaction using a parallel apply worker then
+	 * either we send the changes to the parallel worker or if the worker is busy
+	 * then serialize the changes to the file which will later be processed by
+	 * the parallel worker.
 	 */
 	*winfo = pa_find_worker(xid);
 
-	if (!*winfo)
+	if (*winfo && (*winfo)->serialize_changes)
 	{
-		return TRANS_LEADER_SERIALIZE;
+		return TRANS_LEADER_PARTIAL_SERIALIZE;
 	}
-	else if ((*winfo)->serialize_changes)
+	else if (*winfo)
 	{
-		return TRANS_LEADER_PARTIAL_SERIALIZE;
+		return TRANS_LEADER_SEND_TO_PARALLEL;
+	}
+
+	/*
+	 * If there is no parallel worker involved to process this transaction then
+	 * we either directly apply the change or serialize it to a file which will
+	 * later be applied when the transaction finish message is processed.
+	 */
+	else if (in_streamed_transaction)
+	{
+		return TRANS_LEADER_SERIALIZE;
 	}
 	else
 	{
-		return TRANS_LEADER_SEND_TO_PARALLEL;
+		return TRANS_LEADER_APPLY;
 	}
 }
-- 
2.28.0.windows.1

