From af6e6ddff64829c1b79f01f57fcd241cd25e10f1 Mon Sep 17 00:00:00 2001
From: Laurenz Albe <laurenz.albe@cybertec.at>
Date: Mon, 1 Mar 2021 16:06:49 +0100
Subject: [PATCH v5 2/2] Allow setting parallel_workers on partitioned tables.

Sometimes it's beneficial to do

ALTER TABLE my_part_tbl SET (parallel_workers = 32);

when the query planner is planning too few workers to read from your
partitions. For example, if you have a partitioned table where each
partition is a foreign table that cannot itself have this setting on it.
Shown to give a 100%+ performance increase (in my case, 700%) when used
with cstore_fdw column store partitions.

This is also useful to lower the number of parallel workers, for
example when the executor is expected to prune most partitions.

Authors: Seamus Abshere, Amit Langote, Laurenz Albe
Discussion: https://postgr.es/m/95b1dd96-8634-4545-b1de-e2ac779beb44%40www.fastmail.com
---
 doc/src/sgml/ref/create_table.sgml            |  13 +-
 src/backend/access/common/reloptions.c        |   6 +-
 src/backend/optimizer/path/allpaths.c         | 155 ++++++++++--------
 src/backend/optimizer/plan/planner.c          |   3 +
 src/include/utils/rel.h                       |  22 ++-
 .../regress/expected/partition_aggregate.out  |  51 +++++-
 src/test/regress/sql/partition_aggregate.sql  |  17 +-
 7 files changed, 183 insertions(+), 84 deletions(-)

diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index 1fe4fb6e36..af0bb62625 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -1339,7 +1339,8 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
     equivalent <literal>toast.</literal> parameter is not, the TOAST table
     will use the table's parameter value.
     These parameters, with the exception of
-    <literal>parallel_insert_enabled</literal>, are not supported on partitioned
+    <literal>parallel_insert_enabled</literal> and
+    <literal>parallel_workers</literal>, are not supported on partitioned
     tables, but may be specified for individual leaf partitions.
    </para>
 
@@ -1403,9 +1404,13 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
      <para>
       This sets the number of workers that should be used to assist a parallel
       scan of this table.  If not set, the system will determine a value based
-      on the relation size.  The actual number of workers chosen by the planner
-      or by utility statements that use parallel scans may be less, for example
-      due to the setting of <xref linkend="guc-max-worker-processes"/>.
+      on the relation size and the number of scanned partitions.
+      When set on a partitioned table, the specified number of workers will
+      work on distinct partitions, so the number of partitions affected by the
+      parallel operation should be taken into account.
+      The actual number of workers chosen by the planner or by utility
+      statements that use parallel scans may be less, for example due
+      to the setting of <xref linkend="guc-max-worker-processes"/>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 5a0ae99750..e778800d56 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -386,7 +386,7 @@ static relopt_int intRelOpts[] =
 		{
 			"parallel_workers",
 			"Number of parallel processes that can be used per executor node for this relation.",
-			RELOPT_KIND_HEAP,
+			RELOPT_KIND_HEAP | RELOPT_KIND_PARTITIONED,
 			ShareUpdateExclusiveLock
 		},
 		-1, 0, 1024
@@ -1974,7 +1974,9 @@ partitioned_table_reloptions(Datum reloptions, bool validate)
 {
 	static const relopt_parse_elt tab[] = {
 		{"parallel_insert_enabled", RELOPT_TYPE_BOOL,
-		offsetof(PartitionedTableRdOptions, parallel_insert_enabled)}
+		offsetof(PartitionedTableRdOptions, parallel_insert_enabled)},
+		{"parallel_workers", RELOPT_TYPE_INT,
+		offsetof(PartitionedTableRdOptions, parallel_workers)},
 	};
 
 	return (bytea *) build_reloptions(reloptions, validate,
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index d73ac562eb..e22cb6f33e 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -97,6 +97,9 @@ static void set_append_rel_size(PlannerInfo *root, RelOptInfo *rel,
 								Index rti, RangeTblEntry *rte);
 static void set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 									Index rti, RangeTblEntry *rte);
+static int compute_append_parallel_workers(RelOptInfo *rel, List *subpaths,
+										   int num_live_children,
+										   bool parallel_append);
 static void generate_orderedappend_paths(PlannerInfo *root, RelOptInfo *rel,
 										 List *live_childrels,
 										 List *all_child_pathkeys);
@@ -1268,6 +1271,65 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 	add_paths_to_append_rel(root, rel, live_childrels);
 }
 
+/*
+ * compute_append_parallel_workers
+ * 		Computes the number of workers to assign to scan the subpaths appended
+ * 		by a given Append path
+ */
+static int
+compute_append_parallel_workers(RelOptInfo *rel, List *subpaths,
+								int num_live_children,
+								bool parallel_append)
+{
+	ListCell   *lc;
+	int			parallel_workers = 0;
+
+	/*
+	 * For partitioned rels, first see if there is a root-level setting for
+	 * parallel_workers.  But only consider if a Parallel Append plan is
+	 * to be considered.
+	 */
+	if (IS_PARTITIONED_REL(rel) &&
+		parallel_append &&
+		rel->rel_parallel_workers != -1)
+	{
+		parallel_workers = Min(rel->rel_parallel_workers,
+							   max_parallel_workers_per_gather);
+
+		/* an explicit setting overrides heuristics */
+		return parallel_workers;
+	}
+
+	/* Find the highest number of workers requested for any subpath. */
+	foreach(lc, subpaths)
+	{
+		Path	   *path = lfirst(lc);
+
+		parallel_workers = Max(parallel_workers, path->parallel_workers);
+	}
+	Assert(parallel_workers > 0 || subpaths == NIL);
+
+	/*
+	 * If the use of parallel append is permitted, always request at least
+	 * log2(# of children) workers.  We assume it can be useful to have
+	 * extra workers in this case because they will be spread out across
+	 * the children.  The precise formula is just a guess, but we don't
+	 * want to end up with a radically different answer for a table with N
+	 * partitions vs. an unpartitioned table with the same data, so the
+	 * use of some kind of log-scaling here seems to make some sense.
+	 */
+	if (parallel_append)
+	{
+		parallel_workers = Max(parallel_workers,
+							   fls(num_live_children));
+		parallel_workers = Min(parallel_workers,
+							   max_parallel_workers_per_gather);
+	}
+	Assert(parallel_workers > 0);
+
+	return parallel_workers;
+}
+
 
 /*
  * add_paths_to_append_rel
@@ -1464,50 +1526,28 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 	if (partial_subpaths_valid && partial_subpaths != NIL)
 	{
 		AppendPath *appendpath;
-		ListCell   *lc;
-		int			parallel_workers = 0;
+		int			parallel_workers =
+			compute_append_parallel_workers(rel, partial_subpaths,
+											list_length(live_childrels),
+											enable_parallel_append);
 
-		/* Find the highest number of workers requested for any subpath. */
-		foreach(lc, partial_subpaths)
+		if (parallel_workers > 0)
 		{
-			Path	   *path = lfirst(lc);
+			/* Generate a partial append path. */
+			appendpath = create_append_path(root, rel, NIL, partial_subpaths,
+											NIL, NULL, parallel_workers,
+											enable_parallel_append,
+											-1);
 
-			parallel_workers = Max(parallel_workers, path->parallel_workers);
-		}
-		Assert(parallel_workers > 0);
+			/*
+			 * Make sure any subsequent partial paths use the same row count
+			 * estimate.
+			 */
+			partial_rows = appendpath->path.rows;
 
-		/*
-		 * If the use of parallel append is permitted, always request at least
-		 * log2(# of children) workers.  We assume it can be useful to have
-		 * extra workers in this case because they will be spread out across
-		 * the children.  The precise formula is just a guess, but we don't
-		 * want to end up with a radically different answer for a table with N
-		 * partitions vs. an unpartitioned table with the same data, so the
-		 * use of some kind of log-scaling here seems to make some sense.
-		 */
-		if (enable_parallel_append)
-		{
-			parallel_workers = Max(parallel_workers,
-								   fls(list_length(live_childrels)));
-			parallel_workers = Min(parallel_workers,
-								   max_parallel_workers_per_gather);
+			/* Add the path */
+			add_partial_path(rel, (Path *) appendpath);
 		}
-		Assert(parallel_workers > 0);
-
-		/* Generate a partial append path. */
-		appendpath = create_append_path(root, rel, NIL, partial_subpaths,
-										NIL, NULL, parallel_workers,
-										enable_parallel_append,
-										-1);
-
-		/*
-		 * Make sure any subsequent partial paths use the same row count
-		 * estimate.
-		 */
-		partial_rows = appendpath->path.rows;
-
-		/* Add the path. */
-		add_partial_path(rel, (Path *) appendpath);
 	}
 
 	/*
@@ -1519,36 +1559,19 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 	if (pa_subpaths_valid && pa_nonpartial_subpaths != NIL)
 	{
 		AppendPath *appendpath;
-		ListCell   *lc;
-		int			parallel_workers = 0;
+		int			parallel_workers =
+			compute_append_parallel_workers(rel, pa_partial_subpaths,
+											list_length(live_childrels),
+											true);
 
-		/*
-		 * Find the highest number of workers requested for any partial
-		 * subpath.
-		 */
-		foreach(lc, pa_partial_subpaths)
+		if (parallel_workers > 0)
 		{
-			Path	   *path = lfirst(lc);
-
-			parallel_workers = Max(parallel_workers, path->parallel_workers);
+			appendpath = create_append_path(root, rel, pa_nonpartial_subpaths,
+											pa_partial_subpaths,
+											NIL, NULL, parallel_workers, true,
+											partial_rows);
+			add_partial_path(rel, (Path *) appendpath);
 		}
-
-		/*
-		 * Same formula here as above.  It's even more important in this
-		 * instance because the non-partial paths won't contribute anything to
-		 * the planned number of parallel workers.
-		 */
-		parallel_workers = Max(parallel_workers,
-							   fls(list_length(live_childrels)));
-		parallel_workers = Min(parallel_workers,
-							   max_parallel_workers_per_gather);
-		Assert(parallel_workers > 0);
-
-		appendpath = create_append_path(root, rel, pa_nonpartial_subpaths,
-										pa_partial_subpaths,
-										NIL, NULL, parallel_workers, true,
-										partial_rows);
-		add_partial_path(rel, (Path *) appendpath);
 	}
 
 	/*
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 2317231be5..7e268fdaf6 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -3953,6 +3953,9 @@ make_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel,
 	grouped_rel->useridiscurrent = input_rel->useridiscurrent;
 	grouped_rel->fdwroutine = input_rel->fdwroutine;
 
+	/* Copy parallel_workers. */
+	grouped_rel->rel_parallel_workers = input_rel->rel_parallel_workers;
+
 	return grouped_rel;
 }
 
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 5375a37dd1..0026420b7c 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -354,15 +354,6 @@ typedef struct StdRdOptions
 	  (relation)->rd_rel->relkind == RELKIND_MATVIEW) ? \
 	 ((StdRdOptions *) (relation)->rd_options)->user_catalog_table : false)
 
-/*
- * RelationGetParallelWorkers
- *		Returns the relation's parallel_workers reloption setting.
- *		Note multiple eval of argument!
- */
-#define RelationGetParallelWorkers(relation, defaultpw) \
-	((relation)->rd_options ? \
-	 ((StdRdOptions *) (relation)->rd_options)->parallel_workers : (defaultpw))
-
 /* ViewOptions->check_option values */
 typedef enum ViewOptCheckOption
 {
@@ -434,6 +425,7 @@ typedef struct PartitionedTableRdOptions
 	int32		vl_len_;		/* varlena header (do not touch directly!) */
 	bool		parallel_insert_enabled;	/* enables planner's use of
 											 * parallel insert */
+	int			parallel_workers;	/* max number of parallel workers */
 } PartitionedTableRdOptions;
 
 /*
@@ -448,6 +440,18 @@ typedef struct PartitionedTableRdOptions
 	 ((StdRdOptions *) (relation)->rd_options)->parallel_insert_enabled) :		\
 	 (defaultpd))
 
+/*
+ * RelationGetParallelWorkers
+ *		Returns the relation's parallel_workers reloption setting.
+ *		Note multiple eval of argument!
+ */
+#define RelationGetParallelWorkers(relation, defaultpw) \
+	((relation)->rd_options ? \
+	 ((relation)->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? \
+	  ((PartitionedTableRdOptions *) (relation)->rd_options)->parallel_workers : \
+	  ((StdRdOptions *) (relation)->rd_options)->parallel_workers \
+	 ) : (defaultpw))
+
 /*
  * RelationIsValid
  *		True iff relation descriptor is valid.
diff --git a/src/test/regress/expected/partition_aggregate.out b/src/test/regress/expected/partition_aggregate.out
index dfa4b036b5..611dfc87ff 100644
--- a/src/test/regress/expected/partition_aggregate.out
+++ b/src/test/regress/expected/partition_aggregate.out
@@ -1166,9 +1166,58 @@ SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a, b, c HAVING avg(b) > 7 O
  29 | 4500 |   500
 (12 rows)
 
--- Parallelism within partitionwise aggregates
+-- Override "parallel_workers" for a partitioned table
 SET min_parallel_table_scan_size TO '8kB';
 SET parallel_setup_cost TO 0;
+SET max_parallel_workers_per_gather TO 8;
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather
+   Workers Planned: 4
+   ->  Parallel Append
+         ->  Parallel Seq Scan on pagg_tab_ml_p1 pagg_tab_ml_1
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+               Filter: (b = 42)
+(9 rows)
+
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 6);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather
+   Workers Planned: 6
+   ->  Parallel Append
+         ->  Parallel Seq Scan on pagg_tab_ml_p1 pagg_tab_ml_1
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+               Filter: (b = 42)
+(9 rows)
+
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 0);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+                    QUERY PLAN                     
+---------------------------------------------------
+ Append
+   ->  Seq Scan on pagg_tab_ml_p1 pagg_tab_ml_1
+         Filter: (b = 42)
+   ->  Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+         Filter: (b = 42)
+   ->  Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+         Filter: (b = 42)
+(7 rows)
+
+ALTER TABLE pagg_tab_ml RESET (parallel_workers);
+-- Parallelism within partitionwise aggregates
+SET max_parallel_workers_per_gather TO 2;
 -- Full aggregation at level 1 as GROUP BY clause matches with PARTITION KEY
 -- for level 1 only. For subpartitions, GROUP BY clause does not match with
 -- PARTITION KEY, thus we will have a partial aggregation for them.
diff --git a/src/test/regress/sql/partition_aggregate.sql b/src/test/regress/sql/partition_aggregate.sql
index c17294b15b..c058e2e181 100644
--- a/src/test/regress/sql/partition_aggregate.sql
+++ b/src/test/regress/sql/partition_aggregate.sql
@@ -253,10 +253,23 @@ EXPLAIN (COSTS OFF)
 SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a, b, c HAVING avg(b) > 7 ORDER BY 1, 2, 3;
 SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a, b, c HAVING avg(b) > 7 ORDER BY 1, 2, 3;
 
--- Parallelism within partitionwise aggregates
-
+-- Override "parallel_workers" for a partitioned table
 SET min_parallel_table_scan_size TO '8kB';
 SET parallel_setup_cost TO 0;
+SET max_parallel_workers_per_gather TO 8;
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 6);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 0);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+ALTER TABLE pagg_tab_ml RESET (parallel_workers);
+
+-- Parallelism within partitionwise aggregates
+
+SET max_parallel_workers_per_gather TO 2;
 
 -- Full aggregation at level 1 as GROUP BY clause matches with PARTITION KEY
 -- for level 1 only. For subpartitions, GROUP BY clause does not match with
-- 
2.24.1

