From f86842e7ab6c65df5d8ca3e2bd9374e6c45a3d11 Mon Sep 17 00:00:00 2001
From: "kuroda.hayato%40jp.fujitsu.com" <kuroda.hayato@jp.fujitsu.com>
Date: Fri, 4 Nov 2022 09:40:31 +0000
Subject: [PATCH] fix wrong message-length estimation

---
 src/backend/replication/logical/applyparallelworker.c | 7 +++++--
 src/include/replication/worker_internal.h             | 3 ++-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index f99fb62eb2..8da9199ba6 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -365,7 +365,7 @@ parallel_apply_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid,
 		 * as it has already handled this message while applying spooled
 		 * messages.
 		 */
-		result = shm_mq_send(winfo->mq_handle, strlen(winfo->pending_msg),
+		result = shm_mq_send(winfo->mq_handle, winfo->pending_length,
 							 winfo->pending_msg, false, true);
 
 		if (result != SHM_MQ_SUCCESS)
@@ -375,6 +375,7 @@ parallel_apply_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid,
 
 		pfree(winfo->pending_msg);
 		winfo->pending_msg = NULL;
+		winfo->pending_length = 0;
 	}
 }
 
@@ -1035,8 +1036,10 @@ parallel_apply_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
 
 			/* Save the message that was only partially sent. */
 			oldcontext = MemoryContextSwitchTo(ApplyContext);
-			winfo->pending_msg = (char *) palloc(nbytes);
+			winfo->pending_msg = palloc(nbytes);
 			memcpy(winfo->pending_msg, data, nbytes);
+			winfo->pending_length = nbytes;
+
 			MemoryContextSwitchTo(oldcontext);
 
 			/*
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 29ee78b801..e527f89484 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -167,7 +167,8 @@ typedef struct ParallelApplyWorkerInfo
 	 * Used to save the message that was only partially sent to parallel apply
 	 * worker.
 	 */
-	char	*pending_msg;
+	void	   *pending_msg;
+	Size		pending_length;
 
 	/*
 	 * True if the worker is being used to process a parallel apply
-- 
2.27.0

