From d91215c25023ec839c4bc8d3116cc3e32f48a3c3 Mon Sep 17 00:00:00 2001
From: bucoo <bucoo@sohu.com>
Date: Mon, 19 Oct 2020 17:54:11 +0800
Subject: [PATCH 1/2] Parallel distinct and union support

---
 src/backend/commands/explain.c                |  15 +
 src/backend/executor/Makefile                 |   1 +
 src/backend/executor/execAmi.c                |   5 +
 src/backend/executor/execParallel.c           |  17 ++
 src/backend/executor/execProcnode.c           |  10 +
 src/backend/executor/nodeBatchSort.c          | 377 ++++++++++++++++++++++++++
 src/backend/nodes/copyfuncs.c                 |  19 ++
 src/backend/nodes/outfuncs.c                  |  15 +
 src/backend/nodes/readfuncs.c                 |  16 ++
 src/backend/optimizer/path/costsize.c         |  82 ++++++
 src/backend/optimizer/plan/createplan.c       |  47 +++-
 src/backend/optimizer/plan/planner.c          |  43 +++
 src/backend/optimizer/plan/setrefs.c          |   1 +
 src/backend/optimizer/plan/subselect.c        |   1 +
 src/backend/optimizer/prep/prepunion.c        |  52 +++-
 src/backend/optimizer/util/pathnode.c         |  38 +++
 src/backend/optimizer/util/tlist.c            |  16 ++
 src/backend/postmaster/pgstat.c               |   3 +
 src/backend/utils/misc/guc.c                  |   9 +
 src/include/executor/nodeBatchSort.h          |  19 ++
 src/include/nodes/execnodes.h                 |  15 +
 src/include/nodes/nodes.h                     |   3 +
 src/include/nodes/pathnodes.h                 |  10 +
 src/include/nodes/plannodes.h                 |  12 +
 src/include/optimizer/cost.h                  |   6 +
 src/include/optimizer/pathnode.h              |   9 +
 src/include/optimizer/tlist.h                 |   1 +
 src/include/pgstat.h                          |   3 +-
 src/test/regress/expected/select_distinct.out |  42 +++
 src/test/regress/expected/sysviews.out        |   3 +-
 src/test/regress/expected/union.out           |  55 ++++
 src/test/regress/sql/select_distinct.sql      |  14 +
 src/test/regress/sql/union.sql                |  15 +
 33 files changed, 960 insertions(+), 14 deletions(-)
 create mode 100644 src/backend/executor/nodeBatchSort.c
 create mode 100644 src/include/executor/nodeBatchSort.h

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index c98c9b5547..16a1fb035d 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1270,6 +1270,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		case T_Sort:
 			pname = sname = "Sort";
 			break;
+		case T_BatchSort:
+			pname = sname = "BatchSort";
+			break;
 		case T_IncrementalSort:
 			pname = sname = "Incremental Sort";
 			break;
@@ -1933,6 +1936,18 @@ ExplainNode(PlanState *planstate, List *ancestors,
 			show_sort_keys(castNode(SortState, planstate), ancestors, es);
 			show_sort_info(castNode(SortState, planstate), es);
 			break;
+		case T_BatchSort:
+			{
+				BatchSort *bsort = (BatchSort*)plan;
+				show_sort_group_keys(planstate, "Sort Key",
+									 bsort->sort.numCols, 0, bsort->sort.sortColIdx,
+									 bsort->sort.sortOperators, bsort->sort.collations,
+									 bsort->sort.nullsFirst,
+									 ancestors, es);
+				if (es->verbose)
+					ExplainPropertyInteger("batches", NULL, bsort->numBatches, es);
+			}
+			break;
 		case T_IncrementalSort:
 			show_incremental_sort_keys(castNode(IncrementalSortState, planstate),
 									   ancestors, es);
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index f990c6473a..a4855a8881 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -33,6 +33,7 @@ OBJS = \
 	instrument.o \
 	nodeAgg.o \
 	nodeAppend.o \
+	nodeBatchSort.o \
 	nodeBitmapAnd.o \
 	nodeBitmapHeapscan.o \
 	nodeBitmapIndexscan.o \
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index e2154ba86a..6eb1fe2424 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -17,6 +17,7 @@
 #include "executor/execdebug.h"
 #include "executor/nodeAgg.h"
 #include "executor/nodeAppend.h"
+#include "executor/nodeBatchSort.h"
 #include "executor/nodeBitmapAnd.h"
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeBitmapIndexscan.h"
@@ -253,6 +254,10 @@ ExecReScan(PlanState *node)
 			ExecReScanSort((SortState *) node);
 			break;
 
+		case T_BatchSortState:
+			ExecReScanBatchSort((BatchSortState *)node);
+			break;
+
 		case T_IncrementalSortState:
 			ExecReScanIncrementalSort((IncrementalSortState *) node);
 			break;
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 382e78fb7f..a5abd48507 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -27,6 +27,7 @@
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "executor/nodeAppend.h"
+#include "executor/nodeBatchSort.h"
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
@@ -285,6 +286,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortEstimate((SortState *) planstate, e->pcxt);
 			break;
+		case T_BatchSortState:
+			if (planstate->plan->parallel_aware)
+				ExecBatchSortEstimate((BatchSortState*)planstate, e->pcxt);
+			break;
 		case T_IncrementalSortState:
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
@@ -505,6 +510,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
 			break;
+		case T_BatchSortState:
+			if (planstate->plan->parallel_aware)
+				ExecBatchSortInitializeDSM((BatchSortState*)planstate, d->pcxt);
+			break;
 		case T_IncrementalSortState:
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
@@ -991,6 +1000,10 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 		case T_IncrementalSortState:
 			/* these nodes have DSM state, but no reinitialization is required */
 			break;
+		case T_BatchSortState:
+			if (planstate->plan->parallel_aware)
+				ExecBatchSortReInitializeDSM((BatchSortState*)planstate, pcxt);
+			break;
 
 		default:
 			break;
@@ -1341,6 +1354,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
 			break;
+		case T_BatchSortState:
+			if (planstate->plan->parallel_aware)
+				ExecBatchSortInitializeWorker((BatchSortState*)planstate, pwcxt);
+			break;
 		case T_IncrementalSortState:
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 01b7b926bf..c13835ddda 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -75,6 +75,7 @@
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "executor/nodeAppend.h"
+#include "executor/nodeBatchSort.h"
 #include "executor/nodeBitmapAnd.h"
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeBitmapIndexscan.h"
@@ -314,6 +315,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
 												estate, eflags);
 			break;
 
+		case T_BatchSort:
+			result = (PlanState *) ExecInitBatchSort((BatchSort *) node,
+													 estate, eflags);
+			break;
+
 		case T_IncrementalSort:
 			result = (PlanState *) ExecInitIncrementalSort((IncrementalSort *) node,
 														   estate, eflags);
@@ -699,6 +705,10 @@ ExecEndNode(PlanState *node)
 			ExecEndSort((SortState *) node);
 			break;
 
+		case T_BatchSortState:
+			ExecEndBatchSort((BatchSortState *) node);
+			break;
+
 		case T_IncrementalSortState:
 			ExecEndIncrementalSort((IncrementalSortState *) node);
 			break;
diff --git a/src/backend/executor/nodeBatchSort.c b/src/backend/executor/nodeBatchSort.c
new file mode 100644
index 0000000000..b090fdaf43
--- /dev/null
+++ b/src/backend/executor/nodeBatchSort.c
@@ -0,0 +1,377 @@
+#include "postgres.h"
+
+#include "common/hashfn.h"
+#include "executor/executor.h"
+#include "executor/nodeBatchSort.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "port/atomics.h"
+#include "storage/barrier.h"
+#include "utils/builtins.h"
+#include "utils/tuplesort.h"
+#include "utils/typcache.h"
+
+typedef struct ParallelBatchSort
+{
+	Barrier				barrier;
+	pg_atomic_uint32	attached;
+	pg_atomic_uint32	cur_batch;
+	Size				tuplesort_size;	/* MAXALIGN'd size of each per-batch Sharedsort */
+}ParallelBatchSort;
+
+#define PARALLEL_BATCH_SORT_SIZE		MAXALIGN(sizeof(ParallelBatchSort))
+#define PARALLEL_BATCH_SORT_SHARED(p,n)	\
+	(Sharedsort*)(((char*)p) + PARALLEL_BATCH_SORT_SIZE + (p)->tuplesort_size * n)
+
+#define BUILD_BATCH_DONE	1
+
+static bool ExecNextParallelBatchSort(BatchSortState *state)
+{
+	ParallelBatchSort  *parallel = state->parallel;
+	BatchSort		   *plan = castNode(BatchSort, state->ps.plan);
+	SortCoordinateData	coord;
+	uint32				cur_batch;
+	Assert(parallel != NULL);
+
+	if (state->curBatch >= 0 &&
+		state->curBatch < plan->numBatches &&
+		state->batches[state->curBatch] != NULL)
+	{
+		tuplesort_end(state->batches[state->curBatch]);
+		state->batches[state->curBatch] = NULL;
+	}
+
+	cur_batch = pg_atomic_fetch_add_u32(&parallel->cur_batch, 1);
+	if (cur_batch >= plan->numBatches)
+	{
+		state->curBatch = plan->numBatches;
+		return false;
+	}
+
+	Assert(state->batches[cur_batch] == NULL);
+	state->curBatch = cur_batch;
+	coord.isWorker = false;
+	coord.nParticipants = pg_atomic_read_u32(&parallel->attached);
+	coord.sharedsort = PARALLEL_BATCH_SORT_SHARED(parallel, cur_batch);
+	state->batches[cur_batch] = tuplesort_begin_heap(ExecGetResultType(outerPlanState(state)),
+													 plan->sort.numCols,
+													 plan->sort.sortColIdx,
+													 plan->sort.sortOperators,
+													 plan->sort.collations,
+													 plan->sort.nullsFirst,
+													 work_mem,
+													 &coord,
+													 false);
+	tuplesort_performsort(state->batches[cur_batch]);
+	return true;
+}
+
+static TupleTableSlot *ExecEmptyBatchSort(PlanState *pstate)
+{
+	return ExecClearTuple(pstate->ps_ResultTupleSlot);
+}
+
+static TupleTableSlot *ExecBatchSort(PlanState *pstate)
+{
+	TupleTableSlot *slot = pstate->ps_ResultTupleSlot;
+	BatchSortState *state = castNode(BatchSortState, pstate);
+	Assert(state->sort_Done);
+
+re_get_:
+	if (tuplesort_gettupleslot(state->batches[state->curBatch],
+							   true,
+							   false,
+							   slot,
+							   NULL) == false &&
+		state->curBatch < castNode(BatchSort, pstate->plan)->numBatches-1)
+	{
+		if (state->parallel)
+		{
+			if (ExecNextParallelBatchSort(state) == false)
+			{
+				ExecSetExecProcNode(pstate, ExecEmptyBatchSort);
+				return ExecClearTuple(slot);
+			}
+		}else
+		{
+			state->curBatch++;
+		}
+		goto re_get_;
+	}
+
+	return slot;
+}
+
+static TupleTableSlot *ExecBatchSortPrepare(PlanState *pstate)
+{
+	BatchSort		   *node = castNode(BatchSort, pstate->plan);
+	BatchSortState	   *state = castNode(BatchSortState, pstate);
+	PlanState		   *outerNode = outerPlanState(pstate);
+	TupleTableSlot	   *slot;
+	ListCell		   *lc;
+	ParallelBatchSort  *parallel = state->parallel;
+	SortCoordinateData	coord;
+	FunctionCallInfo	fcinfo;
+	uint32				hash;
+	int					i;
+	AttrNumber			maxAttr;
+	Assert(state->sort_Done == false);
+	Assert(list_length(state->groupFuns) == node->numGroupCols);
+
+	if (parallel)
+	{
+		if (BarrierAttach(&parallel->barrier) >= BUILD_BATCH_DONE)
+			goto build_already_done_;
+		pg_atomic_add_fetch_u32(&parallel->attached, 1);
+	}
+
+	for (i=node->numBatches;i>0;)
+	{
+		--i;
+		if (parallel)
+		{
+			coord.isWorker = true;
+			coord.nParticipants = -1;
+			coord.sharedsort = PARALLEL_BATCH_SORT_SHARED(parallel, i);
+		}
+		state->batches[i] = tuplesort_begin_heap(ExecGetResultType(outerNode),
+												 node->sort.numCols,
+												 node->sort.sortColIdx,
+												 node->sort.sortOperators,
+												 node->sort.collations,
+												 node->sort.nullsFirst,
+												 work_mem / node->numBatches,
+												 parallel ? &coord : NULL,
+												 false);
+	}
+
+	maxAttr = 0;
+	for (i=node->numGroupCols;i>0;)
+	{
+		if (maxAttr < node->grpColIdx[--i])
+			maxAttr = node->grpColIdx[i];
+	}
+	for (i=node->sort.numCols;i>0;)
+	{
+		if (maxAttr < node->sort.sortColIdx[--i])
+			maxAttr = node->sort.sortColIdx[i];
+	}
+	Assert(maxAttr > 0);
+
+	for (;;)
+	{
+		CHECK_FOR_INTERRUPTS();
+		slot = ExecProcNode(outerNode);
+		if (TupIsNull(slot))
+			break;
+		slot_getsomeattrs(slot, maxAttr);
+
+		hash = 0;
+		i = 0;
+		foreach(lc, state->groupFuns)
+		{
+			AttrNumber att = node->grpColIdx[i++]-1;
+			if (slot->tts_isnull[att] == false)
+			{
+				fcinfo = lfirst(lc);
+				fcinfo->args[0].value = slot->tts_values[att];
+				hash = hash_combine(hash, DatumGetUInt32(FunctionCallInvoke(fcinfo)));
+				Assert(fcinfo->isnull == false);
+			}
+		}
+
+		tuplesort_puttupleslot(state->batches[hash%node->numBatches], slot);
+	}
+
+	for (i=node->numBatches;i>0;)
+		tuplesort_performsort(state->batches[--i]);
+build_already_done_:
+	if (parallel)
+	{
+		for (i=node->numBatches;i>0;)
+		{
+			--i;
+			if (state->batches[i])
+			{
+				tuplesort_end(state->batches[i]);
+				state->batches[i] = NULL;
+			}
+		}
+		if (BarrierPhase(&parallel->barrier) < BUILD_BATCH_DONE)
+			BarrierArriveAndWait(&parallel->barrier, WAIT_EVENT_BATCH_SORT_BUILD);
+		BarrierDetach(&parallel->barrier);
+
+		if (ExecNextParallelBatchSort(state))
+			ExecSetExecProcNode(pstate, ExecBatchSort);
+		else
+			ExecSetExecProcNode(pstate, ExecEmptyBatchSort);
+	}else
+	{
+		state->curBatch = 0;
+		ExecSetExecProcNode(pstate, ExecBatchSort);
+	}
+	state->sort_Done = true;
+
+	return (*pstate->ExecProcNodeReal)(pstate);
+}
+
+BatchSortState* ExecInitBatchSort(BatchSort *node, EState *estate, int eflags)
+{
+	BatchSortState *state;
+	TypeCacheEntry *typentry;
+	TupleDesc		desc;
+	int				i;
+
+	if (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))
+	{
+	/* for now, this node is only used for grouped aggregation */
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("not support execute flag(s) %d for group sort", eflags)));
+	}
+
+	state = makeNode(BatchSortState);
+	state->ps.plan = (Plan*) node;
+	state->ps.state = estate;
+	state->ps.ExecProcNode = ExecBatchSortPrepare;
+
+	state->sort_Done = false;
+	state->batches = palloc0(node->numBatches * sizeof(Tuplesortstate*));
+
+	outerPlanState(state) = ExecInitNode(outerPlan(node), estate, eflags);
+
+	/*
+	 * Initialize return slot and type. No need to initialize projection info
+	 * because this node doesn't do projections.
+	 */
+	ExecInitResultTupleSlotTL(&state->ps, &TTSOpsMinimalTuple);
+	state->ps.ps_ProjInfo = NULL;
+
+	Assert(node->numGroupCols > 0);
+	desc = ExecGetResultType(outerPlanState(state));
+	for (i=0;i<node->numGroupCols;++i)
+	{
+		FmgrInfo			   *flinfo;
+		FunctionCallInfo		fcinfo;
+		Form_pg_attribute		attr = TupleDescAttr(desc, node->grpColIdx[i]-1);
+		typentry = lookup_type_cache(attr->atttypid, TYPECACHE_HASH_PROC);
+		if (!OidIsValid(typentry->hash_proc))
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_FUNCTION),
+					 errmsg("could not identify an extended hash function for type %s",
+							format_type_be(attr->atttypid))));
+		flinfo = palloc0(sizeof(*flinfo));
+		fcinfo = palloc0(SizeForFunctionCallInfo(1));
+		fmgr_info(typentry->hash_proc, flinfo);
+		InitFunctionCallInfoData(*fcinfo, flinfo, 1, attr->attcollation, NULL, NULL);
+		fcinfo->args[0].isnull = false;
+		state->groupFuns = lappend(state->groupFuns, fcinfo);
+	}
+
+	return state;
+}
+
+static void CleanBatchSort(BatchSortState *node)
+{
+	int i;
+
+	ExecClearTuple(node->ps.ps_ResultTupleSlot);
+	if (node->sort_Done)
+	{
+		for (i=castNode(BatchSort, node->ps.plan)->numBatches;i>0;)
+		{
+			if (node->batches[--i] != NULL)
+			{
+				tuplesort_end(node->batches[i]);
+				node->batches[i] = NULL;
+			}
+		}
+		node->sort_Done = false;
+	}
+}
+
+void ExecEndBatchSort(BatchSortState *node)
+{
+	ExecClearTuple(node->ps.ps_ResultTupleSlot);
+	CleanBatchSort(node);
+	ExecEndNode(outerPlanState(node));
+}
+
+void ExecReScanBatchSort(BatchSortState *node)
+{
+	CleanBatchSort(node);
+	if (outerPlanState(node)->chgParam != NULL)
+		ExecReScan(outerPlanState(node));
+	ExecSetExecProcNode(&node->ps, ExecBatchSortPrepare);
+}
+
+void ExecShutdownBatchSort(BatchSortState *node)
+{
+	CleanBatchSort(node);
+}
+
+void ExecBatchSortEstimate(BatchSortState *node, ParallelContext *pcxt)
+{
+	Size size = mul_size(MAXALIGN(tuplesort_estimate_shared(pcxt->nworkers+1)),
+						 castNode(BatchSort, node->ps.plan)->numBatches);
+	size = add_size(size, PARALLEL_BATCH_SORT_SIZE);
+
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+static void InitializeBatchSortParallel(ParallelBatchSort *parallel,
+										int num_batches,
+										int num_workers,
+										dsm_segment *seg)
+{
+	int i;
+	BarrierInit(&parallel->barrier, 0);
+	pg_atomic_init_u32(&parallel->attached, 0);
+	pg_atomic_init_u32(&parallel->cur_batch, 0);
+	for (i=0;i<num_batches;++i)
+	{
+		tuplesort_initialize_shared(PARALLEL_BATCH_SORT_SHARED(parallel, i),
+									num_workers,
+									seg);
+	}
+}
+
+void ExecBatchSortInitializeDSM(BatchSortState *node, ParallelContext *pcxt)
+{
+	ParallelBatchSort *parallel;
+	BatchSort *plan = castNode(BatchSort, node->ps.plan);
+	Size tuplesort_size = MAXALIGN(tuplesort_estimate_shared(pcxt->nworkers+1));
+	Size size = mul_size(tuplesort_size, plan->numBatches);
+	size = add_size(PARALLEL_BATCH_SORT_SIZE, size);
+
+	node->parallel = parallel = shm_toc_allocate(pcxt->toc, size);
+	parallel->tuplesort_size = tuplesort_size;
+	InitializeBatchSortParallel(parallel, plan->numBatches, pcxt->nworkers+1, pcxt->seg);
+	shm_toc_insert(pcxt->toc, plan->sort.plan.plan_node_id, parallel);
+}
+
+void ExecBatchSortReInitializeDSM(BatchSortState *node, ParallelContext *pcxt)
+{
+	InitializeBatchSortParallel(node->parallel,
+								castNode(BatchSort, node->ps.plan)->numBatches,
+								pcxt->nworkers+1,
+								pcxt->seg);
+	ExecSetExecProcNode(&node->ps, ExecBatchSortPrepare);
+}
+
+void ExecBatchSortInitializeWorker(BatchSortState *node, ParallelWorkerContext *pwcxt)
+{
+	uint32 i;
+	BatchSort *plan = castNode(BatchSort, node->ps.plan);
+	ParallelBatchSort *parallel = shm_toc_lookup(pwcxt->toc,
+												 plan->sort.plan.plan_node_id,
+												 false);
+	node->parallel = parallel;
+	for (i=0;i<plan->numBatches;++i)
+	{
+		tuplesort_attach_shared(PARALLEL_BATCH_SORT_SHARED(parallel, i),
+								pwcxt->seg);
+	}
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 0409a40b82..958964f1fa 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -961,6 +961,22 @@ _copySort(const Sort *from)
 	return newnode;
 }
 
+/*
+ * _copyBatchSort
+ */
+static BatchSort *
+_copyBatchSort(const BatchSort *from)
+{
+	BatchSort	   *newnode = makeNode(BatchSort);
+
+	CopySortFields(&from->sort, &newnode->sort);
+
+	COPY_SCALAR_FIELD(numGroupCols);
+	COPY_SCALAR_FIELD(numBatches);
+	COPY_POINTER_FIELD(grpColIdx, from->numGroupCols * sizeof(AttrNumber));
+
+	return newnode;
+}
 
 /*
  * _copyIncrementalSort
@@ -4939,6 +4955,9 @@ copyObjectImpl(const void *from)
 		case T_Sort:
 			retval = _copySort(from);
 			break;
+		case T_BatchSort:
+			retval = _copyBatchSort(from);
+			break;
 		case T_IncrementalSort:
 			retval = _copyIncrementalSort(from);
 			break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index f0386480ab..a8dd7ef23f 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -856,6 +856,18 @@ _outSort(StringInfo str, const Sort *node)
 	_outSortInfo(str, node);
 }
 
+static void
+_outBatchSort(StringInfo str, const BatchSort *node)
+{
+	WRITE_NODE_TYPE("BATCHSORT");
+
+	_outSortInfo(str, &node->sort);
+
+	WRITE_INT_FIELD(numGroupCols);
+	WRITE_INT_FIELD(numBatches);
+	WRITE_ATTRNUMBER_ARRAY(grpColIdx, node->numGroupCols);
+}
+
 static void
 _outIncrementalSort(StringInfo str, const IncrementalSort *node)
 {
@@ -3813,6 +3825,9 @@ outNode(StringInfo str, const void *obj)
 			case T_Sort:
 				_outSort(str, obj);
 				break;
+			case T_BatchSort:
+				_outBatchSort(str, obj);
+				break;
 			case T_IncrementalSort:
 				_outIncrementalSort(str, obj);
 				break;
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 42050ab719..2c6eb4362c 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -2181,6 +2181,20 @@ _readSort(void)
 	READ_DONE();
 }
 
+static BatchSort *
+_readBatchSort(void)
+{
+	READ_LOCALS(BatchSort);
+
+	ReadCommonSort(&local_node->sort);
+
+	READ_INT_FIELD(numGroupCols);
+	READ_INT_FIELD(numBatches);
+	READ_ATTRNUMBER_ARRAY(grpColIdx, local_node->numGroupCols);
+
+	READ_DONE();
+}
+
 /*
  * _readIncrementalSort
  */
@@ -2834,6 +2848,8 @@ parseNodeString(void)
 		return_value = _readMaterial();
 	else if (MATCH("SORT", 4))
 		return_value = _readSort();
+	else if (MATCH("BATCHSORT", 9))
+		return_value = _readBatchSort();
 	else if (MATCH("INCREMENTALSORT", 15))
 		return_value = _readIncrementalSort();
 	else if (MATCH("GROUP", 5))
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index cd3716d494..32d0dc8ce5 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -140,6 +140,7 @@ bool		enable_partitionwise_aggregate = false;
 bool		enable_parallel_append = true;
 bool		enable_parallel_hash = true;
 bool		enable_partition_pruning = true;
+bool		enable_batch_sort = true;
 
 typedef struct
 {
@@ -1948,6 +1949,87 @@ cost_sort(Path *path, PlannerInfo *root,
 	path->total_cost = startup_cost + run_cost;
 }
 
+void cost_batchsort(Path *path, PlannerInfo *root,
+					List *batchkeys, Cost input_cost,
+					double tuples, int width,
+					Cost comparison_cost, int sort_mem,
+					uint32 numGroupCols, uint32 numBatches)
+{
+	Cost		startup_cost = input_cost;
+	Cost		run_cost = 0;
+	double		input_bytes = relation_byte_size(tuples, width);
+	double		batch_bytes = input_bytes / numBatches;
+	double		batch_tuples = tuples / numBatches;
+	long		sort_mem_bytes = sort_mem * 1024L;
+
+	if (sort_mem_bytes < (64*1024))
+		sort_mem_bytes = (64*1024);
+
+	if (!enable_batch_sort)
+		startup_cost += disable_cost;
+
+	/* hash cost */
+	startup_cost += cpu_operator_cost * numGroupCols * tuples;
+
+	path->rows = tuples;
+
+	/*
+	 * We want to be sure the cost of a sort is never estimated as zero, even
+	 * if passed-in tuple count is zero.  Besides, mustn't do log(0)...
+	 */
+	if (tuples < 2.0)
+		tuples = 2.0;
+
+	if (batch_bytes > sort_mem_bytes)
+	{
+		/*
+		 * We'll have to use a disk-based sort of all the tuples
+		 */
+		double		npages = ceil(batch_bytes / BLCKSZ);
+		double		nruns = batch_bytes / sort_mem_bytes;
+		double		mergeorder = tuplesort_merge_order(sort_mem_bytes);
+		double		log_runs;
+		double		npageaccesses;
+
+		/*
+		 * CPU costs
+		 *
+		 * Assume about N log2 N comparisons
+		 */
+		startup_cost += comparison_cost * batch_tuples * LOG2(batch_tuples) * numBatches;
+
+		/* Disk costs */
+
+		/* Compute logM(r) as log(r) / log(M) */
+		if (nruns > mergeorder)
+			log_runs = ceil(log(nruns) / log(mergeorder));
+		else
+			log_runs = 1.0;
+		npageaccesses = 2.0 * npages * log_runs;
+		/* Assume 3/4ths of accesses are sequential, 1/4th are not */
+		startup_cost += npageaccesses * numBatches *
+			(seq_page_cost * 0.75 + random_page_cost * 0.25);
+
+	}else
+	{
+		/* We'll use plain quicksort on all the input tuples */
+		startup_cost += comparison_cost * tuples * LOG2(tuples);
+	}
+
+	/*
+	 * Also charge a small amount (arbitrarily set equal to operator cost) per
+	 * extracted tuple.  We don't charge cpu_tuple_cost because a Sort node
+	 * doesn't do qual-checking or projection, so it has less overhead than
+	 * most plan nodes.  Note it's correct to use tuples not output_tuples
+	 * here --- the upper LIMIT will pro-rate the run cost so we'd be double
+	 * counting the LIMIT otherwise.
+	 */
+	run_cost += cpu_operator_cost * tuples;
+
+	path->startup_cost = startup_cost;
+	path->total_cost = startup_cost + run_cost;
+}
+
 /*
  * append_nonpartial_cost
  *	  Estimate the cost of the non-partial paths in a Parallel Append.
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 3d7a4e373f..85969388c2 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -98,6 +98,7 @@ static Plan *create_projection_plan(PlannerInfo *root,
 									int flags);
 static Plan *inject_projection_plan(Plan *subplan, List *tlist, bool parallel_safe);
 static Sort *create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags);
+static BatchSort *create_batchsort_plan(PlannerInfo *root, BatchSortPath *best_path, int flags);
 static IncrementalSort *create_incrementalsort_plan(PlannerInfo *root,
 													IncrementalSortPath *best_path, int flags);
 static Group *create_group_plan(PlannerInfo *root, GroupPath *best_path);
@@ -468,6 +469,11 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
 											 (SortPath *) best_path,
 											 flags);
 			break;
+		case T_BatchSort:
+			plan = (Plan *) create_batchsort_plan(root,
+												  (BatchSortPath*) best_path,
+												  flags);
+			break;
 		case T_IncrementalSort:
 			plan = (Plan *) create_incrementalsort_plan(root,
 														(IncrementalSortPath *) best_path,
@@ -2009,6 +2015,39 @@ create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags)
 	return plan;
 }
 
+static BatchSort *create_batchsort_plan(PlannerInfo *root, BatchSortPath *best_path, int flags)
+{
+	BatchSort	   *plan;
+	Plan		   *subplan;
+
+	subplan = create_plan_recurse(root, best_path->subpath,
+								  flags | CP_SMALL_TLIST);
+
+	plan = makeNode(BatchSort);
+	subplan = prepare_sort_from_pathkeys(subplan,
+										 best_path->batchkeys,
+										 IS_OTHER_REL(best_path->subpath->parent) ?
+										     best_path->path.parent->relids : NULL,
+										 NULL,
+										 false,
+										 &plan->sort.numCols,
+										 &plan->sort.sortColIdx,
+										 &plan->sort.sortOperators,
+										 &plan->sort.collations,
+										 &plan->sort.nullsFirst);
+	plan->sort.plan.targetlist = subplan->targetlist;
+	plan->sort.plan.qual = NIL;
+	outerPlan(plan) = subplan;
+	innerPlan(plan) = NULL;
+	plan->numBatches = best_path->numBatches;
+	plan->numGroupCols = list_length(best_path->batchgroup);
+	plan->grpColIdx = extract_grouping_cols(best_path->batchgroup,
+											subplan->targetlist);
+
+	copy_generic_path_info(&plan->sort.plan, &best_path->path);
+	return plan;
+}
+
 /*
  * create_incrementalsort_plan
  *
@@ -2085,6 +2124,12 @@ create_upper_unique_plan(PlannerInfo *root, UpperUniquePath *best_path, int flag
 {
 	Unique	   *plan;
 	Plan	   *subplan;
+	List	   *pathkeys;
+
+	if (IsA(best_path->subpath, BatchSortPath))
+		pathkeys = ((BatchSortPath*)best_path->subpath)->batchkeys;
+	else
+		pathkeys = best_path->path.pathkeys;
 
 	/*
 	 * Unique doesn't project, so tlist requirements pass through; moreover we
@@ -2094,7 +2139,7 @@ create_upper_unique_plan(PlannerInfo *root, UpperUniquePath *best_path, int flag
 								  flags | CP_LABEL_TLIST);
 
 	plan = make_unique_from_pathkeys(subplan,
-									 best_path->path.pathkeys,
+									 pathkeys,
 									 best_path->numkeys);
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index f331f82a6c..ac7c2a52be 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -4806,6 +4806,8 @@ create_distinct_paths(PlannerInfo *root,
 											  cheapest_input_path->rows,
 											  NULL);
 	}
+	distinct_rel->rows = numDistinctRows;
+	distinct_rel->reltarget = root->upper_targets[UPPERREL_DISTINCT];
 
 	/*
 	 * Consider sort-based implementations of DISTINCT, if possible.
@@ -4825,6 +4827,7 @@ create_distinct_paths(PlannerInfo *root,
 		 * the other.)
 		 */
 		List	   *needed_pathkeys;
+		List	   *hashable_clause;
 
 		if (parse->hasDistinctOn &&
 			list_length(root->distinct_pathkeys) <
@@ -4871,6 +4874,44 @@ create_distinct_paths(PlannerInfo *root,
 										  path,
 										  list_length(root->distinct_pathkeys),
 										  numDistinctRows));
+
+		/* add parallel unique */
+		if (distinct_rel->consider_parallel &&
+			input_rel->partial_pathlist != NIL &&
+			numDistinctRows >= BATCH_SORT_MIN_BATCHES &&
+			(hashable_clause = grouping_get_hashable(parse->distinctClause)) != NIL)
+		{
+			double	numPartialDistinctRows;
+			uint32	num_batchs = (uint32)numDistinctRows;
+			if (num_batchs > BATCH_SORT_MAX_BATCHES)
+			{
+				/*
+				 * too many batchs(files) it is not a good idea,
+				 * limit to BATCH_SORT_MAX_BATCHES
+				 */
+				num_batchs = BATCH_SORT_MAX_BATCHES;
+			}
+
+			foreach (lc, input_rel->partial_pathlist)
+			{
+				Path *path = (Path*)create_batchsort_path(root,
+														  distinct_rel,
+														  lfirst(lc),
+														  needed_pathkeys,
+														  hashable_clause,
+														  num_batchs,
+														  true);
+				numPartialDistinctRows = numDistinctRows / path->parallel_workers;
+				if (numPartialDistinctRows < 1.0)
+					numPartialDistinctRows = 1.0;
+				path = (Path*)create_upper_unique_path(root,
+													   distinct_rel,
+													   path,
+													   list_length(root->distinct_pathkeys),
+													   numPartialDistinctRows);
+				add_partial_path(distinct_rel, path);
+			}
+		}
 	}
 
 	/*
@@ -4908,6 +4949,8 @@ create_distinct_paths(PlannerInfo *root,
 								 numDistinctRows));
 	}
 
+	generate_useful_gather_paths(root, distinct_rel, false);
+
 	/* Give a helpful error if we failed to find any implementation */
 	if (distinct_rel->pathlist == NIL)
 		ereport(ERROR,
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index dd8e2e966d..c39eed6b44 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -737,6 +737,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 
 		case T_Material:
 		case T_Sort:
+		case T_BatchSort:
 		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index fcce81926b..cfe2557988 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -2752,6 +2752,7 @@ finalize_plan(PlannerInfo *root, Plan *plan,
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_BatchSort:
 		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index 745f443e5c..fa1053f077 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -67,7 +67,7 @@ static List *plan_union_children(PlannerInfo *root,
 								 List *refnames_tlist,
 								 List **tlist_list);
 static Path *make_union_unique(SetOperationStmt *op, Path *path, List *tlist,
-							   PlannerInfo *root);
+							   PlannerInfo *root, List *groupList, List *sortKeys);
 static void postprocess_setop_rel(PlannerInfo *root, RelOptInfo *rel);
 static bool choose_hashed_setop(PlannerInfo *root, List *groupClauses,
 								Path *input_path,
@@ -354,6 +354,7 @@ recurse_set_operations(Node *setOp, PlannerInfo *root,
 			rel = generate_nonunion_paths(op, root,
 										  refnames_tlist,
 										  pTargetList);
+		generate_useful_gather_paths(root, rel, false);
 		if (pNumGroups)
 			*pNumGroups = rel->rows;
 
@@ -552,6 +553,8 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root,
 	List	   *tlist_list;
 	List	   *tlist;
 	Path	   *path;
+	List	   *groupList = NIL;
+	List	   *sortKeys = NIL;
 
 	/*
 	 * If plain UNION, tell children to fetch all tuples.
@@ -587,6 +590,14 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root,
 
 	*pTargetList = tlist;
 
+	if (!op->all)
+	{
+		/* Identify the grouping semantics */
+		groupList = generate_setop_grouplist(op, tlist);
+		if (grouping_is_sortable(groupList))
+			sortKeys = make_pathkeys_for_sortclauses(root, groupList, tlist);
+	}
+
 	/* Build path lists and relid set. */
 	foreach(lc, rellist)
 	{
@@ -627,7 +638,7 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root,
 	 * node(s) to remove duplicates.
 	 */
 	if (!op->all)
-		path = make_union_unique(op, path, tlist, root);
+		path = make_union_unique(op, path, tlist, root, groupList, sortKeys);
 
 	add_path(result_rel, path);
 
@@ -646,6 +657,7 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root,
 	{
 		Path	   *ppath;
 		ListCell   *lc;
+		List	   *hashable_list;
 		int			parallel_workers = 0;
 
 		/* Find the highest number of workers requested for any subpath. */
@@ -678,11 +690,35 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root,
 							   NIL, NULL,
 							   parallel_workers, enable_parallel_append,
 							   NIL, -1);
+		if (!op->all &&
+			sortKeys != NIL &&
+			ppath->rows >= BATCH_SORT_MIN_BATCHES &&
+			(hashable_list = grouping_get_hashable(groupList)) != NIL)
+		{
+			Path   *partial_path;
+			uint32	numBatches;
+
+			numBatches = (uint32) Min(ppath->rows, BATCH_SORT_MAX_BATCHES);
+			Assert(list_length(sortKeys) >= list_length(hashable_list));
+			partial_path = (Path*)create_batchsort_path(root,
+														result_rel,
+														ppath,
+														sortKeys,
+														hashable_list,
+														numBatches,
+														true);
+			partial_path = (Path*) create_upper_unique_path(root,
+															result_rel,
+															partial_path,
+															list_length(sortKeys),
+															partial_path->rows);
+			add_partial_path(result_rel, partial_path);
+		}
 		ppath = (Path *)
 			create_gather_path(root, result_rel, ppath,
 							   result_rel->reltarget, NULL, NULL);
 		if (!op->all)
-			ppath = make_union_unique(op, ppath, tlist, root);
+			ppath = make_union_unique(op, ppath, tlist, root, groupList, sortKeys);
 		add_path(result_rel, ppath);
 	}
 
@@ -933,15 +969,11 @@ plan_union_children(PlannerInfo *root,
  */
 static Path *
 make_union_unique(SetOperationStmt *op, Path *path, List *tlist,
-				  PlannerInfo *root)
+				  PlannerInfo *root, List *groupList, List *sortKeys)
 {
 	RelOptInfo *result_rel = fetch_upper_rel(root, UPPERREL_SETOP, NULL);
-	List	   *groupList;
 	double		dNumGroups;
 
-	/* Identify the grouping semantics */
-	groupList = generate_setop_grouplist(op, tlist);
-
 	/*
 	 * XXX for the moment, take the number of distinct groups as equal to the
 	 * total input size, ie, the worst case.  This is too conservative, but
@@ -976,9 +1008,7 @@ make_union_unique(SetOperationStmt *op, Path *path, List *tlist,
 				create_sort_path(root,
 								 result_rel,
 								 path,
-								 make_pathkeys_for_sortclauses(root,
-															   groupList,
-															   tlist),
+								 sortKeys,
 								 -1.0);
 		path = (Path *) create_upper_unique_path(root,
 												 result_rel,
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index c1fc866cbf..460d2e5faa 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -2880,6 +2880,58 @@ create_sort_path(PlannerInfo *root,
 	return pathnode;
 }
 
+/*
+ * create_batchsort_path
+ *	  Creates a pathnode that represents performing a batch sort: input
+ *	  rows are distributed into numBatches batches by hashing the grouping
+ *	  columns, then each batch is sorted by 'pathkeys'.  The output is
+ *	  therefore sorted only within each batch, not globally.
+ *
+ * 'groupClause' is a list of hashable SortGroupClauses used to batch rows
+ * 'numBatches' is the number of batches (must be > 0)
+ * 'parallel_sort' is true if parallel workers may build/consume batches
+ */
+BatchSortPath *
+create_batchsort_path(PlannerInfo *root,
+					  RelOptInfo *rel,
+					  Path *subpath,
+					  List *pathkeys,
+					  List *groupClause,
+					  uint32 numBatches,
+					  bool parallel_sort)
+{
+	BatchSortPath   *pathnode = makeNode(BatchSortPath);
+
+	Assert(numBatches > 0);
+
+	pathnode->path.pathtype = T_BatchSort;
+	pathnode->path.parent = rel;
+	/* Sort doesn't project, so use source path's pathtarget */
+	pathnode->path.pathtarget = subpath->pathtarget;
+	/* For now, assume we are above any joins, so no parameterization */
+	pathnode->path.param_info = NULL;
+	pathnode->path.parallel_aware = parallel_sort;
+	pathnode->path.parallel_safe = rel->consider_parallel &&
+		subpath->parallel_safe;
+	pathnode->path.parallel_workers = subpath->parallel_workers;
+	pathnode->batchkeys = pathkeys;
+	pathnode->batchgroup = groupClause;
+	pathnode->numBatches = numBatches;
+
+	pathnode->subpath = subpath;
+
+	/* give each batch an equal share of the sort workspace memory */
+	cost_batchsort(&pathnode->path, root, pathkeys,
+				   subpath->total_cost, subpath->rows,
+				   subpath->pathtarget->width,
+				   0.0,				/* XXX comparison_cost shouldn't be 0? */
+				   work_mem / numBatches,
+				   list_length(groupClause),
+				   numBatches);
+
+	return pathnode;
+}
+
 /*
  * create_group_path
  *	  Creates a pathnode that represents performing grouping of presorted input
diff --git a/src/backend/optimizer/util/tlist.c b/src/backend/optimizer/util/tlist.c
index 02a3c6b165..949568c672 100644
--- a/src/backend/optimizer/util/tlist.c
+++ b/src/backend/optimizer/util/tlist.c
@@ -593,6 +593,29 @@ grouping_is_hashable(List *groupClause)
 	return true;
 }
 
+/*
+ * grouping_get_hashable
+ *		Extract from groupClause the subset of SortGroupClauses whose
+ *		grouping operators are hashable, preserving their input order.
+ *
+ * Returns NIL if no clause is hashable.
+ */
+List *
+grouping_get_hashable(List *groupClause)
+{
+	ListCell   *lc;
+	List	   *result = NIL;
+
+	foreach(lc, groupClause)
+	{
+		SortGroupClause *groupcl = lfirst_node(SortGroupClause, lc);
+
+		if (groupcl->hashable)
+			result = lappend(result, groupcl);
+	}
+
+	return result;
+}
 
 /*****************************************************************************
  *		PathTarget manipulation functions
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 822f0ebc62..cacb7d13e6 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4021,6 +4021,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_XACT_GROUP_UPDATE:
 			event_name = "XactGroupUpdate";
 			break;
+		case WAIT_EVENT_BATCH_SORT_BUILD:
+			event_name = "BatchSortBuild";
+			break;
 			/* no default case, so that compiler will warn */
 	}
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 596bcb7b84..43a4e36d78 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -987,6 +987,15 @@ static struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_batch_sort", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of batch sort steps."),
+			NULL
+		},
+		&enable_batch_sort,
+		false,
+		NULL, NULL, NULL
+	},
 	{
 		{"enable_incremental_sort", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of incremental sort steps."),
diff --git a/src/include/executor/nodeBatchSort.h b/src/include/executor/nodeBatchSort.h
new file mode 100644
index 0000000000..66c68e0125
--- /dev/null
+++ b/src/include/executor/nodeBatchSort.h
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeBatchSort.h
+ *	  prototypes for nodeBatchSort.c
+ *
+ * src/include/executor/nodeBatchSort.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef NODE_BATCH_SORT_H
+#define NODE_BATCH_SORT_H
+
+#include "access/parallel.h"
+#include "nodes/execnodes.h"
+
+extern BatchSortState *ExecInitBatchSort(BatchSort *node, EState *estate, int eflags);
+extern void ExecEndBatchSort(BatchSortState *node);
+extern void ExecReScanBatchSort(BatchSortState *node);
+
+/* parallel scan support */
+extern void ExecBatchSortEstimate(BatchSortState *node, ParallelContext *pcxt);
+extern void ExecBatchSortInitializeDSM(BatchSortState *node, ParallelContext *pcxt);
+extern void ExecBatchSortReInitializeDSM(BatchSortState *node, ParallelContext *pcxt);
+extern void ExecBatchSortInitializeWorker(BatchSortState *node, ParallelWorkerContext *pwcxt);
+extern void ExecShutdownBatchSort(BatchSortState *node);
+
+#endif							/* NODE_BATCH_SORT_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index ef448d67c7..14dde9fca3 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2012,6 +2012,21 @@ typedef struct SortState
 	SharedSortInfo *shared_info;	/* one entry per worker */
 } SortState;
 
+/* ----------------
+ *	 BatchSortState information
+ * ----------------
+ */
+typedef struct BatchSortState
+{
+	PlanState	ps;				/* its first field is NodeTag */
+	void	  **batches;		/* per-batch private state of tuplesort.c */
+	List	   *groupFuns;		/* hash function call info for each group-key */
+	struct ParallelBatchSort
+			   *parallel;		/* parallel info, private in nodeBatchSort.c */
+	int			curBatch;		/* current batch index */
+	bool		sort_Done;		/* sort completed yet? */
+} BatchSortState;
+
 /* ----------------
  *	 Instrumentation information for IncrementalSort
  * ----------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 7ddd8c011b..ace4c98939 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -74,6 +74,7 @@ typedef enum NodeTag
 	T_HashJoin,
 	T_Material,
 	T_Sort,
+	T_BatchSort,
 	T_IncrementalSort,
 	T_Group,
 	T_Agg,
@@ -131,6 +132,7 @@ typedef enum NodeTag
 	T_HashJoinState,
 	T_MaterialState,
 	T_SortState,
+	T_BatchSortState,
 	T_IncrementalSortState,
 	T_GroupState,
 	T_AggState,
@@ -246,6 +248,7 @@ typedef enum NodeTag
 	T_ProjectionPath,
 	T_ProjectSetPath,
 	T_SortPath,
+	T_BatchSortPath,
 	T_IncrementalSortPath,
 	T_GroupPath,
 	T_UpperUniquePath,
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index dbe86e7af6..273bdda452 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -1649,6 +1649,16 @@ typedef struct SortPath
 	Path	   *subpath;		/* path representing input source */
 } SortPath;
 
+typedef struct BatchSortPath
+{
+	Path		path;
+	Path	   *subpath;		/* path representing input source */
+	List	   *batchkeys;		/* sort keys; output is ordered only within
+								 * each batch, so we cannot use Path.pathkeys */
+	List	   *batchgroup;		/* a list of SortGroupClause for hash */
+	uint32		numBatches;		/* number of batches */
+} BatchSortPath;
+
 /*
  * IncrementalSortPath
  */
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 83e01074ed..f7ad7881dc 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -774,6 +774,18 @@ typedef struct Sort
 	bool	   *nullsFirst;		/* NULLS FIRST/LAST directions */
 } Sort;
 
+/* ----------------
+ *		batch sort node
+ * ----------------
+ */
+typedef struct BatchSort
+{
+	Sort		sort;
+	int			numGroupCols;	/* number of group-key columns */
+	int			numBatches;		/* number of batches */
+	AttrNumber *grpColIdx;		/* their indexes in the target list */
+} BatchSort;
+
 /* ----------------
  *		incremental sort node
  * ----------------
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 6141654e47..37e6a12a6f 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -53,6 +53,7 @@ extern PGDLLIMPORT bool enable_indexonlyscan;
 extern PGDLLIMPORT bool enable_bitmapscan;
 extern PGDLLIMPORT bool enable_tidscan;
 extern PGDLLIMPORT bool enable_sort;
+extern PGDLLIMPORT bool enable_batch_sort;
 extern PGDLLIMPORT bool enable_incremental_sort;
 extern PGDLLIMPORT bool enable_hashagg;
 extern PGDLLIMPORT bool enable_nestloop;
@@ -102,6 +103,11 @@ extern void cost_sort(Path *path, PlannerInfo *root,
 					  List *pathkeys, Cost input_cost, double tuples, int width,
 					  Cost comparison_cost, int sort_mem,
 					  double limit_tuples);
+extern void cost_batchsort(Path *path, PlannerInfo *root,
+						   List *batchkeys, Cost input_cost,
+						   double tuples, int width,
+						   Cost comparison_cost, int sort_mem,
+						   uint32 numGroupCols, uint32 numBatches);
 extern void cost_incremental_sort(Path *path,
 								  PlannerInfo *root, List *pathkeys, int presorted_keys,
 								  Cost input_startup_cost, Cost input_total_cost,
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 715a24ad29..816fc37739 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -17,6 +17,8 @@
 #include "nodes/bitmapset.h"
 #include "nodes/pathnodes.h"
 
+#define BATCH_SORT_MIN_BATCHES		2
+#define BATCH_SORT_MAX_BATCHES		512
 
 /*
  * prototypes for pathnode.c
@@ -195,6 +197,13 @@ extern SortPath *create_sort_path(PlannerInfo *root,
 								  Path *subpath,
 								  List *pathkeys,
 								  double limit_tuples);
+extern BatchSortPath *create_batchsort_path(PlannerInfo *root,
+											RelOptInfo *rel,
+											Path *subpath,
+											List *pathkeys,
+											List *groupClause,
+											uint32 numBatches,
+											bool parallel_sort);
 extern GroupPath *create_group_path(PlannerInfo *root,
 									RelOptInfo *rel,
 									Path *subpath,
diff --git a/src/include/optimizer/tlist.h b/src/include/optimizer/tlist.h
index 1d4c7da545..9372cebeba 100644
--- a/src/include/optimizer/tlist.h
+++ b/src/include/optimizer/tlist.h
@@ -36,6 +36,7 @@ extern Oid *extract_grouping_collations(List *groupClause, List *tlist);
 extern AttrNumber *extract_grouping_cols(List *groupClause, List *tlist);
 extern bool grouping_is_sortable(List *groupClause);
 extern bool grouping_is_hashable(List *groupClause);
+extern List *grouping_get_hashable(List *groupClause);
 
 extern PathTarget *make_pathtarget_from_tlist(List *tlist);
 extern List *make_tlist_from_pathtarget(PathTarget *target);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index a821ff4f15..f0b6dae97b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -952,7 +952,8 @@ typedef enum
 	WAIT_EVENT_REPLICATION_SLOT_DROP,
 	WAIT_EVENT_SAFE_SNAPSHOT,
 	WAIT_EVENT_SYNC_REP,
-	WAIT_EVENT_XACT_GROUP_UPDATE
+	WAIT_EVENT_XACT_GROUP_UPDATE,
+	WAIT_EVENT_BATCH_SORT_BUILD
 } WaitEventIPC;
 
 /* ----------
diff --git a/src/test/regress/expected/select_distinct.out b/src/test/regress/expected/select_distinct.out
index 11c6f50fbf..c200e38d12 100644
--- a/src/test/regress/expected/select_distinct.out
+++ b/src/test/regress/expected/select_distinct.out
@@ -306,3 +306,45 @@ SELECT null IS NOT DISTINCT FROM null as "yes";
  t
 (1 row)
 
+-- parallel distinct
+BEGIN;
+SET enable_batch_sort = ON;
+SET min_parallel_table_scan_size =0;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET enable_indexonlyscan = OFF;
+EXPLAIN (costs off)
+SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
+                     QUERY PLAN                     
+----------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Unique
+               ->  Parallel BatchSort
+                     Sort Key: tenk1.unique2
+                     ->  Parallel Seq Scan on tenk1
+(7 rows)
+
+SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
+ count 
+-------
+ 10000
+(1 row)
+
+explain (costs off)
+SELECT DISTINCT * FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Unique
+         ->  Parallel BatchSort
+               Sort Key: tenk1.unique2
+               ->  Unique
+                     ->  Parallel BatchSort
+                           Sort Key: tenk1.unique2
+                           ->  Parallel Seq Scan on tenk1
+(9 rows)
+
+ABORT;
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 81bdacf59d..8ed047e520 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -88,6 +88,7 @@ select count(*) = 1 as ok from pg_stat_wal;
 select name, setting from pg_settings where name like 'enable%';
               name              | setting 
 --------------------------------+---------
+ enable_batch_sort              | off
  enable_bitmapscan              | on
  enable_gathermerge             | on
  enable_hashagg                 | on
@@ -106,7 +107,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(18 rows)
+(19 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
diff --git a/src/test/regress/expected/union.out b/src/test/regress/expected/union.out
index 6e72e92d80..5a2be9aec9 100644
--- a/src/test/regress/expected/union.out
+++ b/src/test/regress/expected/union.out
@@ -1052,3 +1052,58 @@ where (x = 0) or (q1 >= q2 and q1 <= q2);
  4567890123456789 |  4567890123456789 | 1
 (6 rows)
 
+-- parallel union
+BEGIN;
+SET enable_batch_sort = ON;
+SET min_parallel_table_scan_size =0;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET enable_indexonlyscan = OFF;
+EXPLAIN (costs off)
+SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Unique
+               ->  Parallel BatchSort
+                     Sort Key: tenk1.unique2
+                     ->  Parallel Append
+                           ->  Parallel Seq Scan on tenk1
+                           ->  Parallel Seq Scan on tenk1 tenk1_1
+(9 rows)
+
+SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo;
+ count 
+-------
+ 10000
+(1 row)
+
+EXPLAIN (costs off)
+SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) t1 INNER JOIN tenk1 USING(unique2);
+                                  QUERY PLAN                                  
+------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (tenk1_1.unique2 = tenk1.unique2)
+                     ->  Unique
+                           ->  Parallel BatchSort
+                                 Sort Key: tenk1_1.unique2
+                                 ->  Parallel Append
+                                       ->  Parallel Seq Scan on tenk1 tenk1_1
+                                       ->  Parallel Seq Scan on tenk1 tenk1_2
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on tenk1
+(14 rows)
+
+SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) t1 INNER JOIN tenk1 USING(unique2);
+ count 
+-------
+ 10000
+(1 row)
+
+ABORT;
diff --git a/src/test/regress/sql/select_distinct.sql b/src/test/regress/sql/select_distinct.sql
index 33102744eb..3ff7acf64d 100644
--- a/src/test/regress/sql/select_distinct.sql
+++ b/src/test/regress/sql/select_distinct.sql
@@ -135,3 +135,17 @@ SELECT 1 IS NOT DISTINCT FROM 2 as "no";
 SELECT 2 IS NOT DISTINCT FROM 2 as "yes";
 SELECT 2 IS NOT DISTINCT FROM null as "no";
 SELECT null IS NOT DISTINCT FROM null as "yes";
+
+-- parallel distinct
+BEGIN;
+SET enable_batch_sort = ON;
+SET min_parallel_table_scan_size =0;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET enable_indexonlyscan = OFF;
+EXPLAIN (costs off)
+SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
+SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
+explain (costs off)
+SELECT DISTINCT * FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
+ABORT;
diff --git a/src/test/regress/sql/union.sql b/src/test/regress/sql/union.sql
index 5f4881d594..a1cb1bb7ac 100644
--- a/src/test/regress/sql/union.sql
+++ b/src/test/regress/sql/union.sql
@@ -440,3 +440,18 @@ select * from
    union all
    select *, 1 as x from int8_tbl b) ss
 where (x = 0) or (q1 >= q2 and q1 <= q2);
+
+-- parallel union
+BEGIN;
+SET enable_batch_sort = ON;
+SET min_parallel_table_scan_size =0;
+SET parallel_tuple_cost = 0;
+SET parallel_setup_cost = 0;
+SET enable_indexonlyscan = OFF;
+EXPLAIN (costs off)
+SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo;
+SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo;
+EXPLAIN (costs off)
+SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) t1 INNER JOIN tenk1 USING(unique2);
+SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) t1 INNER JOIN tenk1 USING(unique2);
+ABORT;
-- 
2.16.3

