From 00c9628aa8475e7e9215b5887bb71614cffba56b Mon Sep 17 00:00:00 2001
From: wangxiaoran <fanfuxiaoran@gmail.com>
Date: Thu, 8 Aug 2024 11:02:20 +0800
Subject: [PATCH] Improve pqmq

Allow the param 'dsm_segment *seg' to be NULL in function
'pq_redirect_to_shm_mq'. As sometimes the shm_mq is created
in shared memory instead of DSM.

Add function 'pq_leave_shm_mq' to allow the process to go
back to the previous pq environment.
---
 src/backend/libpq/pqmq.c | 22 +++++++++++++++++++++-
 src/include/libpq/pqmq.h |  1 +
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index fd735e2fea..7609099629 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -28,6 +28,10 @@ static bool pq_mq_busy = false;
 static pid_t pq_mq_parallel_leader_pid = 0;
 static ProcNumber pq_mq_parallel_leader_proc_number = INVALID_PROC_NUMBER;
 
+static PQcommMethods *pre_PqCommMethods = NULL;
+static CommandDest pre_whereToSendOutput = DestNone;
+static ProtocolVersion pre_FrontendProtocol = PG_PROTOCOL_LATEST;
+
 static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg);
 static void mq_comm_reset(void);
 static int	mq_flush(void);
@@ -52,11 +56,27 @@ static const PQcommMethods PqCommMqMethods = {
 void
 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 {
+	pre_PqCommMethods = PqCommMethods;
+	pre_whereToSendOutput = whereToSendOutput;
+	pre_FrontendProtocol = FrontendProtocol;
+
 	PqCommMethods = &PqCommMqMethods;
 	pq_mq_handle = mqh;
 	whereToSendOutput = DestRemote;
 	FrontendProtocol = PG_PROTOCOL_LATEST;
-	on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+	if (seg != NULL)
+		on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+}
+
+void
+pq_leave_shm_mq()
+{
+	if (pq_mq_handle == NULL)
+		return;
+	pq_mq_handle = NULL;
+	PqCommMethods = pre_PqCommMethods;
+	whereToSendOutput = pre_whereToSendOutput;
+	FrontendProtocol = pre_FrontendProtocol;
 }
 
 /*
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
index 227df8976f..30543582c2 100644
--- a/src/include/libpq/pqmq.h
+++ b/src/include/libpq/pqmq.h
@@ -17,6 +17,7 @@
 #include "storage/shm_mq.h"
 
 extern void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh);
+extern void pq_leave_shm_mq();
 extern void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber);
 
 extern void pq_parse_errornotice(StringInfo msg, ErrorData *edata);
-- 
2.39.3 (Apple Git-146)

