From ef005279f8b008e0593fb290cc4a30fbc275ab76 Mon Sep 17 00:00:00 2001
From: bucoo <bucoo@sohu.com>
Date: Mon, 19 Oct 2020 21:31:16 +0800
Subject: [PATCH 2/2] Parallel aggregate support using batch sort

---
 src/backend/optimizer/plan/planner.c              | 53 ++++++++++++++++
 src/include/nodes/pathnodes.h                     |  1 +
 src/test/regress/expected/partition_aggregate.out | 73 +++++++++++++++++++++++
 src/test/regress/sql/partition_aggregate.sql      | 25 ++++++++
 4 files changed, 152 insertions(+)

diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index ac7c2a52be..27680dbeb3 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -3880,6 +3880,11 @@ create_grouping_paths(PlannerInfo *root,
 		extra.havingQual = parse->havingQual;
 		extra.targetList = parse->targetList;
 		extra.partial_costs_set = false;
+		if (parse->groupClause != NIL &&
+			(gd == NULL || gd->rollups == NIL))
+			extra.hashable_groups = grouping_get_hashable(parse->groupClause);
+		else
+			extra.hashable_groups = NIL;
 
 		/*
 		 * Determine whether partitionwise aggregation is in theory possible.
@@ -6705,6 +6710,54 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel,
 			}
 		}
 
+		/*
+		 * create simple agg using parallel
+		 */
+		if (grouped_rel->consider_parallel &&
+			extra->hashable_groups != NIL &&
+			input_rel->partial_pathlist != NIL &&
+			dNumGroups >= BATCH_SORT_MIN_BATCHES)
+		{
+			Path	   *path;
+			uint32		numBatches = (uint32)dNumGroups;
+			if (numBatches > BATCH_SORT_MAX_BATCHES)
+				numBatches = BATCH_SORT_MAX_BATCHES;
+			Assert(parse->groupingSets == NIL);
+			Assert(parse->groupClause != NIL);
+			foreach (lc, input_rel->partial_pathlist)
+			{
+				double numGroups = dNumGroups / numBatches;
+				if (numGroups < 1.0)
+					numGroups = 1.0;
+				path = (Path*)create_batchsort_path(root,
+													grouped_rel,
+													lfirst(lc),
+													root->group_pathkeys,
+													extra->hashable_groups,
+													numBatches,
+													true);
+				if (parse->hasAggs)
+					path = (Path*)create_agg_path(root,
+												  grouped_rel,
+												  path,
+												  grouped_rel->reltarget,
+												  AGG_SORTED,
+												  AGGSPLIT_SIMPLE,
+												  parse->groupClause,
+												  havingQual,
+												  agg_costs,
+												  numGroups);
+				else
+					path = (Path*)create_group_path(root,
+													grouped_rel,
+													path,
+													parse->groupClause,
+													havingQual,
+													numGroups);
+				add_partial_path(grouped_rel, path);
+			}
+		}
+
 		/*
 		 * Instead of operating directly on the input relation, we can
 		 * consider finalizing a partially aggregated path.
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 273bdda452..80ba8fccdc 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -2502,6 +2502,7 @@ typedef struct
 	bool		target_parallel_safe;
 	Node	   *havingQual;
 	List	   *targetList;
+	List	   *hashable_groups;
 	PartitionwiseAggregateType patype;
 } GroupPathExtraData;
 
diff --git a/src/test/regress/expected/partition_aggregate.out b/src/test/regress/expected/partition_aggregate.out
index 45c698daf4..b187c1080b 100644
--- a/src/test/regress/expected/partition_aggregate.out
+++ b/src/test/regress/expected/partition_aggregate.out
@@ -1516,3 +1516,76 @@ SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) <
  21 | 6000 | 6.0000000000000000 |  1000
 (6 rows)
 
+-- simple agg in parallel
+BEGIN;
+SET enable_batch_sort = ON;
+SET min_parallel_table_scan_size = 0;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET enable_indexonlyscan = OFF;
+EXPLAIN (COSTS OFF)
+SELECT unique2,count(*) FROM tenk1 GROUP BY 1;
+                  QUERY PLAN                  
+----------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  GroupAggregate
+         Group Key: unique2
+         ->  Parallel BatchSort
+               Sort Key: unique2
+               ->  Parallel Seq Scan on tenk1
+(7 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM (SELECT unique2,count(*) FROM tenk1 GROUP BY 1) foo;
+                     QUERY PLAN                     
+----------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  GroupAggregate
+               Group Key: tenk1.unique2
+               ->  Parallel BatchSort
+                     Sort Key: tenk1.unique2
+                     ->  Parallel Seq Scan on tenk1
+(8 rows)
+
+SELECT count(*) FROM (SELECT unique2,count(*) FROM tenk1 GROUP BY 1) foo;
+ count 
+-------
+ 10000
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM
+  (SELECT unique2,count(*) id FROM tenk1 GROUP BY 1) t1
+    INNER JOIN
+  (SELECT unique2  id FROM tenk1) t2
+  USING(id);
+                               QUERY PLAN                               
+------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Nested Loop
+                     ->  GroupAggregate
+                           Group Key: tenk1_1.unique2
+                           ->  Parallel BatchSort
+                                 Sort Key: tenk1_1.unique2
+                                 ->  Parallel Seq Scan on tenk1 tenk1_1
+                     ->  Index Scan using tenk1_unique2 on tenk1
+                           Index Cond: (unique2 = (count(*)))
+(12 rows)
+
+SELECT count(*) FROM
+  (SELECT unique2,count(*) id FROM tenk1 GROUP BY 1) t1
+    INNER JOIN
+  (SELECT unique2  id FROM tenk1) t2
+  USING(id);
+ count 
+-------
+ 10000
+(1 row)
+
+ABORT;
diff --git a/src/test/regress/sql/partition_aggregate.sql b/src/test/regress/sql/partition_aggregate.sql
index 117f65ecb4..3e50a48d37 100644
--- a/src/test/regress/sql/partition_aggregate.sql
+++ b/src/test/regress/sql/partition_aggregate.sql
@@ -330,3 +330,28 @@ RESET parallel_setup_cost;
 EXPLAIN (COSTS OFF)
 SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3;
 SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3;
+
+-- simple agg in parallel
+BEGIN;
+SET enable_batch_sort = ON;
+SET min_parallel_table_scan_size = 0;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET enable_indexonlyscan = OFF;
+EXPLAIN (COSTS OFF)
+SELECT unique2,count(*) FROM tenk1 GROUP BY 1;
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM (SELECT unique2,count(*) FROM tenk1 GROUP BY 1) foo;
+SELECT count(*) FROM (SELECT unique2,count(*) FROM tenk1 GROUP BY 1) foo;
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM
+  (SELECT unique2,count(*) id FROM tenk1 GROUP BY 1) t1
+    INNER JOIN
+  (SELECT unique2  id FROM tenk1) t2
+  USING(id);
+SELECT count(*) FROM
+  (SELECT unique2,count(*) id FROM tenk1 GROUP BY 1) t1
+    INNER JOIN
+  (SELECT unique2  id FROM tenk1) t2
+  USING(id);
+ABORT;
-- 
2.16.3

