From 069079bc0814d85b55d58f96532546cafc79f136 Mon Sep 17 00:00:00 2001
From: Amit Langote <amitlan@postgresql.org>
Date: Wed, 9 Aug 2023 18:25:29 +0900
Subject: [PATCH v1 2/2] Share initial pruning result between parallel query
 processes

In the leader, ExecInitPartitionPruning() will save the bitmapset of
initially valid subplans that results from performing the initial
pruning steps into EState.es_part_prune_results, a List of Bitmapsets
with one element for each entry in EState.es_part_prune_infos.

In workers, it will instead read the bitmapset stored at the given
part_prune_index in that list and use it as the set of initially
valid subplans, rather than performing the initial pruning steps
again.

Note that this is not just a performance optimization: it also
ensures that all processes involved in executing a Parallel Append
with initial pruning agree on the set of initially valid subplans.
Without it, different processes could compute different sets, which
is possible -- despite the requirement that initial pruning steps
contain only expressions that are at least stable -- if the
volatility markings of those expressions are inaccurate.

Discussion: https://postgr.es/m/flat/CA%2BHiwqFA%3DswkzgGK8AmXUNFtLeEXFJwFyY3E7cTxvL46aa1OTw%40mail.gmail.com
---
 src/backend/executor/execMain.c      |  1 +
 src/backend/executor/execParallel.c  | 37 +++++++++++++++++++++---
 src/backend/executor/execPartition.c | 42 +++++++++++++++++++++++++++-
 src/backend/executor/execUtils.c     |  1 +
 src/backend/tcop/pquery.c            |  1 +
 src/include/executor/execdesc.h      |  6 ++++
 src/include/nodes/execnodes.h        |  1 +
 7 files changed, 84 insertions(+), 5 deletions(-)

diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 49293fc8ce..ba954d7934 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -856,6 +856,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
 
 	estate->es_plannedstmt = plannedstmt;
 	estate->es_part_prune_infos = plannedstmt->partPruneInfos;
+	estate->es_part_prune_results = queryDesc->part_prune_results;
 
 	/*
 	 * Next, build the ExecRowMark array from the PlanRowMark(s), if any.
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index aa3f283453..1a6b2611de 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -66,6 +66,7 @@
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_PARTITION_PRUNE_RESULTS	UINT64CONST(0xE00000000000000B)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
@@ -598,12 +599,15 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	FixedParallelExecutorState *fpes;
 	char	   *pstmt_data;
 	char	   *pstmt_space;
+	char	   *part_prune_results_data;
+	char	   *part_prune_results_space;
 	char	   *paramlistinfo_space;
 	BufferUsage *bufusage_space;
 	WalUsage   *walusage_space;
 	SharedExecutorInstrumentation *instrumentation = NULL;
 	SharedJitInstrumentation *jit_instrumentation = NULL;
 	int			pstmt_len;
+	int			part_prune_results_len;
 	int			paramlistinfo_len;
 	int			instrumentation_len = 0;
 	int			jit_instrumentation_len = 0;
@@ -632,6 +636,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 
 	/* Fix up and serialize plan to be sent to workers. */
 	pstmt_data = ExecSerializePlan(planstate->plan, estate);
+	part_prune_results_data = nodeToString(estate->es_part_prune_results);
 
 	/* Create a parallel context. */
 	pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
@@ -658,6 +663,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+	/* Estimate space for serialized part_prune_results. */
+	part_prune_results_len = strlen(part_prune_results_data) + 1;
+	shm_toc_estimate_chunk(&pcxt->estimator, part_prune_results_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Estimate space for serialized ParamListInfo. */
 	paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
 	shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
@@ -752,6 +762,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	memcpy(pstmt_space, pstmt_data, pstmt_len);
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
 
+	/* Store serialized part_prune_results. */
+	part_prune_results_space = shm_toc_allocate(pcxt->toc, part_prune_results_len);
+	memcpy(part_prune_results_space, part_prune_results_data, part_prune_results_len);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARTITION_PRUNE_RESULTS,
+				   part_prune_results_space);
+
 	/* Store serialized ParamListInfo. */
 	paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
@@ -1233,8 +1249,11 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
 						 int instrument_options)
 {
 	char	   *pstmtspace;
+	char	   *part_prune_results_space;
 	char	   *paramspace;
 	PlannedStmt *pstmt;
+	QueryDesc  *queryDesc;
+	List	   *part_prune_results;
 	ParamListInfo paramLI;
 	char	   *queryString;
 
@@ -1245,15 +1264,25 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
 	pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
 	pstmt = (PlannedStmt *) stringToNode(pstmtspace);
 
+	/* Reconstruct leader-supplied part_prune_results. */
+	part_prune_results_space =
+		shm_toc_lookup(toc, PARALLEL_KEY_PARTITION_PRUNE_RESULTS, false);
+	part_prune_results = (List *) stringToNode(part_prune_results_space);
+
 	/* Reconstruct ParamListInfo. */
 	paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
 	paramLI = RestoreParamList(&paramspace);
 
 	/* Create a QueryDesc for the query. */
-	return CreateQueryDesc(pstmt,
-						   queryString,
-						   GetActiveSnapshot(), InvalidSnapshot,
-						   receiver, paramLI, NULL, instrument_options);
+	queryDesc = CreateQueryDesc(pstmt,
+								queryString,
+								GetActiveSnapshot(), InvalidSnapshot,
+								receiver, paramLI, NULL, instrument_options);
+
+	/* For ExecutorStart() to propagate into the EState. */
+	queryDesc->part_prune_results = part_prune_results;
+
+	return queryDesc;
 }
 
 /*
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 9799968a42..a1ec26ad3c 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -1822,13 +1822,53 @@ ExecInitPartitionPruning(PlanState *planstate,
 	 * Perform an initial partition prune pass, if required.
 	 */
 	if (prunestate->do_initial_prune)
-		*initially_valid_subplans = ExecFindMatchingSubPlans(prunestate, true);
+	{
+		/*
+		 * If doing this in a parallel query plan, compute the initial pruning
+		 * steps only once in the leader and have it pass that down to the
+		 * workers so they don't need to compute that again.  That is done not
+		 * just for performance, but also to avoid situations where a worker
+		 * might end up with a different result of performing the same initial
+		 * pruning steps than the leader and/or other workers.
+		 */
+		if (IsParallelWorker())
+		{
+			Assert(estate->es_part_prune_results != NULL);
+			*initially_valid_subplans = list_nth(estate->es_part_prune_results,
+												 part_prune_index);
+		}
+		else
+		{
+			*initially_valid_subplans = ExecFindMatchingSubPlans(prunestate,
+																 true);
+
+			/*
+			 * Add the bitmap at part_prune_index to pass to parallel workers.
+			 * XXX - no way to tell whether or not we're under Gather to avoid
+			 * populating the list if not.
+			 */
+			Assert(list_length(estate->es_part_prune_results) ==
+				   part_prune_index);
+			estate->es_part_prune_results =
+				lappend(estate->es_part_prune_results,
+						*initially_valid_subplans);
+		}
+	}
 	else
 	{
 		/* No pruning, so we'll need to initialize all subplans */
 		Assert(n_total_subplans > 0);
 		*initially_valid_subplans = bms_add_range(NULL, 0,
 												  n_total_subplans - 1);
+
+		/*
+		 * Add a dummy NULL to es_part_prune_results at this index to keep it
+		 * of the same length as es_part_prune_infos.  Note that the worker
+		 * won't actually read this element, so there's no confusing NULL for
+		 * an empty set of initially valid subplans.
+		 */
+		estate->es_part_prune_results =
+				lappend(estate->es_part_prune_results, NULL);
 	}
 
 	/*
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 48366a33b5..5c6eb32e2e 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -141,6 +141,7 @@ CreateExecutorState(void)
 	estate->es_param_exec_vals = NULL;
 
 	estate->es_queryEnv = NULL;
+	estate->es_part_prune_results = NIL;
 
 	estate->es_query_cxt = qcontext;
 
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 5565f200c3..ce94acdc8a 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -91,6 +91,7 @@ CreateQueryDesc(PlannedStmt *plannedstmt,
 	qd->estate = NULL;
 	qd->planstate = NULL;
 	qd->totaltime = NULL;
+	qd->part_prune_results = NIL;
 
 	/* not yet executed */
 	qd->already_executed = false;
diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h
index af2bf36dfb..d96c383dbf 100644
--- a/src/include/executor/execdesc.h
+++ b/src/include/executor/execdesc.h
@@ -43,6 +43,12 @@ typedef struct QueryDesc
 	QueryEnvironment *queryEnv; /* query environment passed in */
 	int			instrument_options; /* OR of InstrumentOption flags */
 
+	/*
+	 * Used by ExecParallelGetQueryDesc() to save the result of initial
+	 * partition pruning sent down by the leader to workers.
+	 */
+	List		*part_prune_results; /* List of Bitmapset */
+
 	/* These fields are set by ExecutorStart */
 	TupleDesc	tupDesc;		/* descriptor for result tuples */
 	EState	   *estate;			/* executor's query-wide state */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 21c388a1f4..d92d7572eb 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -624,6 +624,7 @@ typedef struct EState
 	List	   *es_rteperminfos;	/* List of RTEPermissionInfo */
 	PlannedStmt *es_plannedstmt;	/* link to top of plan tree */
 	List		*es_part_prune_infos;	/* PlannedStmt.partPruneInfos */
+	List		*es_part_prune_results;	/* List of Bitmapset */
 	const char *es_sourceText;	/* Source text from QueryDesc */
 
 	JunkFilter *es_junkFilter;	/* top-level junk filter, if any */
-- 
2.35.3

