diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 683d641fa7..1814f98b8e 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -1979,27 +1979,18 @@ SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2
  119
 (10 rows)
 
--- CROSS JOIN, not pushed down
+-- CROSS JOIN, pushed down, thanks to incremental sort on remote side
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
-                             QUERY PLAN                              
----------------------------------------------------------------------
+                                                                            QUERY PLAN                                                                             
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------
  Limit
    Output: t1.c1, t2.c1
-   ->  Sort
+   ->  Foreign Scan
          Output: t1.c1, t2.c1
-         Sort Key: t1.c1, t2.c1
-         ->  Nested Loop
-               Output: t1.c1, t2.c1
-               ->  Foreign Scan on public.ft1 t1
-                     Output: t1.c1
-                     Remote SQL: SELECT "C 1" FROM "S 1"."T 1"
-               ->  Materialize
-                     Output: t2.c1
-                     ->  Foreign Scan on public.ft2 t2
-                           Output: t2.c1
-                           Remote SQL: SELECT "C 1" FROM "S 1"."T 1"
-(15 rows)
+         Relations: (public.ft1 t1) INNER JOIN (public.ft2 t2)
+         Remote SQL: SELECT r1."C 1", r2."C 1" FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) ORDER BY r1."C 1" ASC NULLS LAST, r2."C 1" ASC NULLS LAST
+(6 rows)
 
 SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
  c1 | c1  
@@ -2016,6 +2007,44 @@ SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 1
   1 | 110
 (10 rows)
 
+-- CROSS JOIN, not pushed down, because we don't push down LIMIT and remote side
+-- can't perform top-N sort like local side can.
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT t1.c3, t2.c3 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c3, t2.c3 OFFSET 100 LIMIT 10;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Limit
+   Output: t1.c3, t2.c3
+   ->  Sort
+         Output: t1.c3, t2.c3
+         Sort Key: t1.c3, t2.c3
+         ->  Nested Loop
+               Output: t1.c3, t2.c3
+               ->  Foreign Scan on public.ft1 t1
+                     Output: t1.c3
+                     Remote SQL: SELECT c3 FROM "S 1"."T 1"
+               ->  Materialize
+                     Output: t2.c3
+                     ->  Foreign Scan on public.ft2 t2
+                           Output: t2.c3
+                           Remote SQL: SELECT c3 FROM "S 1"."T 1"
+(15 rows)
+
+SELECT t1.c3, t2.c3 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c3, t2.c3 OFFSET 100 LIMIT 10;
+  c3   |  c3   
+-------+-------
+ 00001 | 00101
+ 00001 | 00102
+ 00001 | 00103
+ 00001 | 00104
+ 00001 | 00105
+ 00001 | 00106
+ 00001 | 00107
+ 00001 | 00108
+ 00001 | 00109
+ 00001 | 00110
+(10 rows)
+
 -- different server, not pushed down. No result expected.
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 3c3c5c705f..bbf697d64b 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -508,10 +508,15 @@ SELECT t1.c1 FROM ft1 t1 WHERE EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c1)
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
 SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
--- CROSS JOIN, not pushed down
+-- CROSS JOIN, pushed down, thanks to incremental sort on remote side
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
 SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
+-- CROSS JOIN, not pushed down, because we don't push down LIMIT and remote side
+-- can't perform top-N sort like local side can.
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT t1.c3, t2.c3 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c3, t2.c3 OFFSET 100 LIMIT 10;
+SELECT t1.c3, t2.c3 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c3, t2.c3 OFFSET 100 LIMIT 10;
 -- different server, not pushed down. No result expected.
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e4a01699e4..f80d396cfc 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3553,6 +3553,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-enable-incrementalsort" xreflabel="enable_incrementalsort">
+      <term><varname>enable_incrementalsort</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_incrementalsort</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of incremental sort
+        steps. The default is <literal>on</>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-indexscan" xreflabel="enable_indexscan">
       <term><varname>enable_indexscan</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 79e6985d0d..6cf5f8bad1 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -80,6 +80,8 @@ static void show_upper_qual(List *qual, const char *qlabel,
 				ExplainState *es);
 static void show_sort_keys(SortState *sortstate, List *ancestors,
 			   ExplainState *es);
+static void show_incremental_sort_keys(IncrementalSortState *incrsortstate,
+					   List *ancestors, ExplainState *es);
 static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 					   ExplainState *es);
 static void show_agg_keys(AggState *astate, List *ancestors,
@@ -93,7 +95,7 @@ static void show_grouping_set_keys(PlanState *planstate,
 static void show_group_keys(GroupState *gstate, List *ancestors,
 				ExplainState *es);
 static void show_sort_group_keys(PlanState *planstate, const char *qlabel,
-					 int nkeys, AttrNumber *keycols,
+					 int nkeys, int nPresortedKeys, AttrNumber *keycols,
 					 Oid *sortOperators, Oid *collations, bool *nullsFirst,
 					 List *ancestors, ExplainState *es);
 static void show_sortorder_options(StringInfo buf, Node *sortexpr,
@@ -101,6 +103,8 @@ static void show_sortorder_options(StringInfo buf, Node *sortexpr,
 static void show_tablesample(TableSampleClause *tsc, PlanState *planstate,
 				 List *ancestors, ExplainState *es);
 static void show_sort_info(SortState *sortstate, ExplainState *es);
+static void show_incremental_sort_info(IncrementalSortState *incrsortstate,
+									   ExplainState *es);
 static void show_hash_info(HashState *hashstate, ExplainState *es);
 static void show_tidbitmap_info(BitmapHeapScanState *planstate,
 					ExplainState *es);
@@ -1011,6 +1015,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		case T_Sort:
 			pname = sname = "Sort";
 			break;
+		case T_IncrementalSort:
+			pname = sname = "Incremental Sort";
+			break;
 		case T_Group:
 			pname = sname = "Group";
 			break;
@@ -1611,6 +1618,12 @@ ExplainNode(PlanState *planstate, List *ancestors,
 			show_sort_keys(castNode(SortState, planstate), ancestors, es);
 			show_sort_info(castNode(SortState, planstate), es);
 			break;
+		case T_IncrementalSort:
+			show_incremental_sort_keys(castNode(IncrementalSortState, planstate),
+									   ancestors, es);
+			show_incremental_sort_info(castNode(IncrementalSortState, planstate),
+									   es);
+			break;
 		case T_MergeAppend:
 			show_merge_append_keys(castNode(MergeAppendState, planstate),
 								   ancestors, es);
@@ -1936,14 +1949,37 @@ static void
 show_sort_keys(SortState *sortstate, List *ancestors, ExplainState *es)
 {
 	Sort	   *plan = (Sort *) sortstate->ss.ps.plan;
+	int			skipCols;
+
+	if (IsA(plan, IncrementalSort))
+		skipCols = ((IncrementalSort *) plan)->skipCols;
+	else
+		skipCols = 0;
 
 	show_sort_group_keys((PlanState *) sortstate, "Sort Key",
-						 plan->numCols, plan->sortColIdx,
+						 plan->numCols, skipCols, plan->sortColIdx,
 						 plan->sortOperators, plan->collations,
 						 plan->nullsFirst,
 						 ancestors, es);
 }
 
+/*
+ * Show the sort keys for a IncrementalSort node.
+ */
+static void
+show_incremental_sort_keys(IncrementalSortState *incrsortstate,
+						   List *ancestors, ExplainState *es)
+{
+	IncrementalSort *plan = (IncrementalSort *) incrsortstate->ss.ps.plan;
+
+	show_sort_group_keys((PlanState *) incrsortstate, "Sort Key",
+						 plan->sort.numCols, plan->skipCols,
+						 plan->sort.sortColIdx,
+						 plan->sort.sortOperators, plan->sort.collations,
+						 plan->sort.nullsFirst,
+						 ancestors, es);
+}
+
 /*
  * Likewise, for a MergeAppend node.
  */
@@ -1954,7 +1990,7 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 	MergeAppend *plan = (MergeAppend *) mstate->ps.plan;
 
 	show_sort_group_keys((PlanState *) mstate, "Sort Key",
-						 plan->numCols, plan->sortColIdx,
+						 plan->numCols, 0, plan->sortColIdx,
 						 plan->sortOperators, plan->collations,
 						 plan->nullsFirst,
 						 ancestors, es);
@@ -1978,7 +2014,7 @@ show_agg_keys(AggState *astate, List *ancestors,
 			show_grouping_sets(outerPlanState(astate), plan, ancestors, es);
 		else
 			show_sort_group_keys(outerPlanState(astate), "Group Key",
-								 plan->numCols, plan->grpColIdx,
+								 plan->numCols, 0, plan->grpColIdx,
 								 NULL, NULL, NULL,
 								 ancestors, es);
 
@@ -2047,7 +2083,7 @@ show_grouping_set_keys(PlanState *planstate,
 	if (sortnode)
 	{
 		show_sort_group_keys(planstate, "Sort Key",
-							 sortnode->numCols, sortnode->sortColIdx,
+							 sortnode->numCols, 0, sortnode->sortColIdx,
 							 sortnode->sortOperators, sortnode->collations,
 							 sortnode->nullsFirst,
 							 ancestors, es);
@@ -2104,7 +2140,7 @@ show_group_keys(GroupState *gstate, List *ancestors,
 	/* The key columns refer to the tlist of the child plan */
 	ancestors = lcons(gstate, ancestors);
 	show_sort_group_keys(outerPlanState(gstate), "Group Key",
-						 plan->numCols, plan->grpColIdx,
+						 plan->numCols, 0, plan->grpColIdx,
 						 NULL, NULL, NULL,
 						 ancestors, es);
 	ancestors = list_delete_first(ancestors);
@@ -2117,13 +2153,14 @@ show_group_keys(GroupState *gstate, List *ancestors,
  */
 static void
 show_sort_group_keys(PlanState *planstate, const char *qlabel,
-					 int nkeys, AttrNumber *keycols,
+					 int nkeys, int nPresortedKeys, AttrNumber *keycols,
 					 Oid *sortOperators, Oid *collations, bool *nullsFirst,
 					 List *ancestors, ExplainState *es)
 {
 	Plan	   *plan = planstate->plan;
 	List	   *context;
 	List	   *result = NIL;
+	List	   *resultPresorted = NIL;
 	StringInfoData sortkeybuf;
 	bool		useprefix;
 	int			keyno;
@@ -2163,9 +2200,13 @@ show_sort_group_keys(PlanState *planstate, const char *qlabel,
 								   nullsFirst[keyno]);
 		/* Emit one property-list item per sort key */
 		result = lappend(result, pstrdup(sortkeybuf.data));
+		if (keyno < nPresortedKeys)
+			resultPresorted = lappend(resultPresorted, exprstr);
 	}
 
 	ExplainPropertyList(qlabel, result, es);
+	if (nPresortedKeys > 0)
+		ExplainPropertyList("Presorted Key", resultPresorted, es);
 }
 
 /*
@@ -2373,6 +2414,95 @@ show_sort_info(SortState *sortstate, ExplainState *es)
 	}
 }
 
+/*
+ * If it's EXPLAIN ANALYZE, show tuplesort stats for a incremental sort node
+ */
+static void
+show_incremental_sort_info(IncrementalSortState *incrsortstate,
+						   ExplainState *es)
+{
+	if (es->analyze && incrsortstate->sort_Done &&
+		incrsortstate->tuplesortstate != NULL)
+	{
+		Tuplesortstate *state = (Tuplesortstate *) incrsortstate->tuplesortstate;
+		TuplesortInstrumentation stats;
+		const char *sortMethod;
+		const char *spaceType;
+		long		spaceUsed;
+
+		tuplesort_get_stats(state, &stats);
+		sortMethod = tuplesort_method_name(stats.sortMethod);
+		spaceType = tuplesort_space_type_name(stats.spaceType);
+		spaceUsed = stats.spaceUsed;
+
+		if (es->format == EXPLAIN_FORMAT_TEXT)
+		{
+			appendStringInfoSpaces(es->str, es->indent * 2);
+			appendStringInfo(es->str, "Sort Method: %s  %s: %ldkB\n",
+							 sortMethod, spaceType, spaceUsed);
+			appendStringInfoSpaces(es->str, es->indent * 2);
+			appendStringInfo(es->str, "Sort Groups: %ld\n",
+							 incrsortstate->groupsCount);
+		}
+		else
+		{
+			ExplainPropertyText("Sort Method", sortMethod, es);
+			ExplainPropertyLong("Sort Space Used", spaceUsed, es);
+			ExplainPropertyText("Sort Space Type", spaceType, es);
+			ExplainPropertyLong("Sort Groups: %ld",
+								incrsortstate->groupsCount, es);
+		}
+	}
+
+	if (incrsortstate->shared_info != NULL)
+	{
+		int			n;
+		bool		opened_group = false;
+
+		for (n = 0; n < incrsortstate->shared_info->num_workers; n++)
+		{
+			TuplesortInstrumentation *sinstrument;
+			const char *sortMethod;
+			const char *spaceType;
+			long		spaceUsed;
+			int64		groupsCount;
+
+			sinstrument = &incrsortstate->shared_info->sinfo[n].sinstrument;
+			groupsCount = incrsortstate->shared_info->sinfo[n].groupsCount;
+			if (sinstrument->sortMethod == SORT_TYPE_STILL_IN_PROGRESS)
+				continue;		/* ignore any unfilled slots */
+			sortMethod = tuplesort_method_name(sinstrument->sortMethod);
+			spaceType = tuplesort_space_type_name(sinstrument->spaceType);
+			spaceUsed = sinstrument->spaceUsed;
+
+			if (es->format == EXPLAIN_FORMAT_TEXT)
+			{
+				appendStringInfoSpaces(es->str, es->indent * 2);
+				appendStringInfo(es->str,
+								 "Worker %d:  Sort Method: %s  %s: %ldkB  Groups: %ld\n",
+								 n, sortMethod, spaceType, spaceUsed, groupsCount);
+			}
+			else
+			{
+				if (!opened_group)
+				{
+					ExplainOpenGroup("Workers", "Workers", false, es);
+					opened_group = true;
+				}
+				ExplainOpenGroup("Worker", NULL, true, es);
+				ExplainPropertyInteger("Worker Number", n, es);
+				ExplainPropertyText("Sort Method", sortMethod, es);
+				ExplainPropertyLong("Sort Space Used", spaceUsed, es);
+				ExplainPropertyText("Sort Space Type", spaceType, es);
+				ExplainPropertyLong("Sort Groups", groupsCount, es);
+				ExplainCloseGroup("Worker", NULL, true, es);
+			}
+		}
+		if (opened_group)
+			ExplainCloseGroup("Workers", "Workers", false, es);
+	}
+}
+
 /*
  * Show information on hash buckets/batches.
  */
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index cc09895fa5..572aca05fb 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -24,8 +24,8 @@ OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
        nodeLimit.o nodeLockRows.o nodeGatherMerge.o \
        nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
        nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \
-       nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
-       nodeValuesscan.o \
+       nodeSamplescan.o nodeSeqscan.o nodeSetOp.o \
+       nodeSort.o nodeIncrementalSort.o nodeUnique.o nodeValuesscan.o \
        nodeCtescan.o nodeNamedtuplestorescan.o nodeWorktablescan.o \
        nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \
        nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o tqueue.o spi.o \
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 9e78421978..34e05330ea 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -31,6 +31,7 @@
 #include "executor/nodeGroup.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "executor/nodeIncrementalSort.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeLimit.h"
@@ -253,6 +254,10 @@ ExecReScan(PlanState *node)
 			ExecReScanSort((SortState *) node);
 			break;
 
+		case T_IncrementalSortState:
+			ExecReScanIncrementalSort((IncrementalSortState *) node);
+			break;
+
 		case T_GroupState:
 			ExecReScanGroup((GroupState *) node);
 			break;
@@ -525,8 +530,12 @@ ExecSupportsBackwardScan(Plan *node)
 		case T_CteScan:
 		case T_Material:
 		case T_Sort:
+			/* these don't evaluate tlist */
 			return true;
 
+		case T_IncrementalSort:
+			return false;
+
 		case T_LockRows:
 		case T_Limit:
 			return ExecSupportsBackwardScan(outerPlan(node));
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f8b72ebab9..490d6dd76c 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -32,6 +32,7 @@
 #include "executor/nodeForeignscan.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "executor/nodeIncrementalSort.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeSeqscan.h"
@@ -280,6 +281,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortEstimate((SortState *) planstate, e->pcxt);
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware */
+			ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
+			break;
 
 		default:
 			break;
@@ -493,6 +498,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware */
+			ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
+			break;
 
 		default:
 			break;
@@ -918,6 +927,10 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 		case T_SortState:
 			/* these nodes have DSM state, but no reinitialization is required */
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware */
+			ExecIncrementalSortReInitializeDSM((IncrementalSortState *) planstate, pcxt);
+			break;
 
 		default:
 			break;
@@ -976,6 +989,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 		case T_SortState:
 			ExecSortRetrieveInstrumentation((SortState *) planstate);
 			break;
+		case T_IncrementalSortState:
+			ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
+			break;
 		case T_HashState:
 			ExecHashRetrieveInstrumentation((HashState *) planstate);
 			break;
@@ -1225,6 +1241,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware */
+			ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
+												pwcxt);
+			break;
 
 		default:
 			break;
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 43a27a9af2..bc92c3d0e7 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -88,6 +88,7 @@
 #include "executor/nodeGroup.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "executor/nodeIncrementalSort.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeLimit.h"
@@ -314,6 +315,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
 												estate, eflags);
 			break;
 
+		case T_IncrementalSort:
+			result = (PlanState *) ExecInitIncrementalSort(
+									(IncrementalSort *) node, estate, eflags);
+			break;
+
 		case T_Group:
 			result = (PlanState *) ExecInitGroup((Group *) node,
 												 estate, eflags);
@@ -695,6 +701,10 @@ ExecEndNode(PlanState *node)
 			ExecEndSort((SortState *) node);
 			break;
 
+		case T_IncrementalSortState:
+			ExecEndIncrementalSort((IncrementalSortState *) node);
+			break;
+
 		case T_GroupState:
 			ExecEndGroup((GroupState *) node);
 			break;
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 46ee880415..30855c3fe7 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -667,6 +667,7 @@ initialize_phase(AggState *aggstate, int newphase)
 												  sortnode->collations,
 												  sortnode->nullsFirst,
 												  work_mem,
+												  false,
 												  false);
 	}
 
@@ -754,7 +755,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
 									 pertrans->sortOperators,
 									 pertrans->sortCollations,
 									 pertrans->sortNullsFirst,
-									 work_mem, false);
+									 work_mem, false, false);
 	}
 
 	/*
diff --git a/src/backend/executor/nodeIncrementalSort.c b/src/backend/executor/nodeIncrementalSort.c
new file mode 100644
index 0000000000..1a1e48fb77
--- /dev/null
+++ b/src/backend/executor/nodeIncrementalSort.c
@@ -0,0 +1,649 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeIncremenalSort.c
+ *	  Routines to handle incremental sorting of relations.
+ *
+ * DESCRIPTION
+ *
+ *		Incremental sort is specially optimized kind of multikey sort when
+ *		input is already presorted by prefix of required keys list.  Thus,
+ *		when it's required to sort by (key1, key2 ... keyN) and result is
+ *		already sorted by (key1, key2 ... keyM), M < N, we sort groups where
+ *		values of (key1, key2 ... keyM) are equal.
+ *
+ *		Consider following example.  We have input tuples consisting from
+ *		two integers (x, y) already presorted by x, while it's required to
+ *		sort them by x and y.  Let input tuples be following.
+ *
+ *		(1, 5)
+ *		(1, 2)
+ *		(2, 10)
+ *		(2, 1)
+ *		(2, 5)
+ *		(3, 3)
+ *		(3, 7)
+ *
+ *		Incremental sort algorithm would sort by y following groups, which have
+ *		equal x, individually:
+ *			(1, 5) (1, 2)
+ *			(2, 10) (2, 1) (2, 5)
+ *			(3, 3) (3, 7)
+ *
+ *		After sorting these groups and putting them altogether, we would get
+ *		following tuple set which is actually sorted by x and y.
+ *
+ *		(1, 2)
+ *		(1, 5)
+ *		(2, 1)
+ *		(2, 5)
+ *		(2, 10)
+ *		(3, 3)
+ *		(3, 7)
+ *
+ *		Incremental sort is faster than full sort on large datasets.  But
+ *		the case of most huge benefit of incremental sort is queries with
+ *		LIMIT because incremental sort can return first tuples without reading
+ *		whole input dataset.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/nodeIncremenalSort.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "executor/execdebug.h"
+#include "executor/nodeIncrementalSort.h"
+#include "miscadmin.h"
+#include "utils/lsyscache.h"
+#include "utils/tuplesort.h"
+
+/*
+ * Prepare information for skipKeys comparison.
+ */
+static void
+prepareSkipCols(IncrementalSortState *node)
+{
+	IncrementalSort	   *plannode = (IncrementalSort *) node->ss.ps.plan;
+	int					skipCols,
+						i;
+
+	Assert(IsA(plannode, IncrementalSort));
+	skipCols = plannode->skipCols;
+
+	node->skipKeys = (SkipKeyData *) palloc(skipCols * sizeof(SkipKeyData));
+
+	for (i = 0; i < skipCols; i++)
+	{
+		Oid equalityOp, equalityFunc;
+		SkipKeyData *key;
+
+		key = &node->skipKeys[i];
+		key->attno = plannode->sort.sortColIdx[i];
+
+		equalityOp = get_equality_op_for_ordering_op(
+										plannode->sort.sortOperators[i], NULL);
+		if (!OidIsValid(equalityOp))
+			elog(ERROR, "missing equality operator for ordering operator %u",
+					plannode->sort.sortOperators[i]);
+
+		equalityFunc = get_opcode(equalityOp);
+		if (!OidIsValid(equalityFunc))
+			elog(ERROR, "missing function for operator %u", equalityOp);
+
+		/* Lookup the comparison function */
+		fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);
+
+		/* We can initialize the callinfo just once and re-use it */
+		InitFunctionCallInfoData(key->fcinfo, &key->flinfo, 2,
+								plannode->sort.collations[i], NULL, NULL);
+		key->fcinfo.argnull[0] = false;
+		key->fcinfo.argnull[1] = false;
+	}
+}
+
+/*
+ * Check if first "skipCols" sort values are equal.
+ */
+static bool
+cmpSortSkipCols(IncrementalSortState *node, TupleTableSlot *a,
+															TupleTableSlot *b)
+{
+	int n, i;
+
+	Assert(IsA(node->ss.ps.plan, IncrementalSort));
+
+	n = ((IncrementalSort *) node->ss.ps.plan)->skipCols;
+
+	for (i = 0; i < n; i++)
+	{
+		Datum datumA, datumB, result;
+		bool isnullA, isnullB;
+		AttrNumber attno = node->skipKeys[i].attno;
+		SkipKeyData *key;
+
+		datumA = slot_getattr(a, attno, &isnullA);
+		datumB = slot_getattr(b, attno, &isnullB);
+
+		/* Special case for NULL-vs-NULL, else use standard comparison */
+		if (isnullA || isnullB)
+		{
+			if (isnullA == isnullB)
+				continue;
+			else
+				return false;
+		}
+
+		key = &node->skipKeys[i];
+
+		key->fcinfo.arg[0] = datumA;
+		key->fcinfo.arg[1] = datumB;
+
+		/* just for paranoia's sake, we reset isnull each time */
+		key->fcinfo.isnull = false;
+
+		result = FunctionCallInvoke(&key->fcinfo);
+
+		/* Check for null result, since caller is clearly not expecting one */
+		if (key->fcinfo.isnull)
+			elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);
+
+		if (!DatumGetBool(result))
+			return false;
+	}
+	return true;
+}
+
+/*
+ * Copying of tuples to the node->sampleSlot introduces some overhead.  It's
+ * especially notable when groups are containing one or few tuples.  In order
+ * to cope this problem we don't copy sample tuple before the group contains
+ * at least MIN_GROUP_SIZE of tuples.  Surely, it might reduce efficiency of
+ * incremental sort, but it reduces the probability of regression.
+ */
+#define MIN_GROUP_SIZE 32
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSort
+ *
+ *		Assuming that outer subtree returns tuple presorted by some prefix
+ *		of target sort columns, performs incremental sort.  It fetches
+ *		groups of tuples where prefix sort columns are equal and sorts them
+ *		using tuplesort.  This approach allows to evade sorting of whole
+ *		dataset.  Besides taking less memory and being faster, it allows to
+ *		start returning tuples before fetching full dataset from outer
+ *		subtree.
+ *
+ *		Conditions:
+ *		  -- none.
+ *
+ *		Initial States:
+ *		  -- the outer child is prepared to return the first tuple.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *
+ExecIncrementalSort(PlanState *pstate)
+{
+	IncrementalSortState *node = castNode(IncrementalSortState, pstate);
+	EState			   *estate;
+	ScanDirection		dir;
+	Tuplesortstate	   *tuplesortstate;
+	TupleTableSlot	   *slot;
+	IncrementalSort	   *plannode = (IncrementalSort *) node->ss.ps.plan;
+	PlanState		   *outerNode;
+	TupleDesc			tupDesc;
+	int64				nTuples = 0;
+
+	/*
+	 * get state info from node
+	 */
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "entering routine");
+
+	estate = node->ss.ps.state;
+	dir = estate->es_direction;
+	tuplesortstate = (Tuplesortstate *) node->tuplesortstate;
+
+	/*
+	 * Return next tuple from sorted set if any.
+	 */
+	if (node->sort_Done)
+	{
+		slot = node->ss.ps.ps_ResultTupleSlot;
+		if (tuplesort_gettupleslot(tuplesortstate,
+									  ScanDirectionIsForward(dir),
+									  false, slot, NULL) || node->finished)
+			return slot;
+	}
+
+	/*
+	 * If first time through, read all tuples from outer plan and pass them to
+	 * tuplesort.c. Subsequent calls just fetch tuples from tuplesort.
+	 */
+
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "sorting subplan");
+
+	/*
+	 * Want to scan subplan in the forward direction while creating the
+	 * sorted data.
+	 */
+	estate->es_direction = ForwardScanDirection;
+
+	/*
+	 * Initialize tuplesort module.
+	 */
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "calling tuplesort_begin");
+
+	outerNode = outerPlanState(node);
+	tupDesc = ExecGetResultType(outerNode);
+
+	if (node->tuplesortstate == NULL)
+	{
+		/*
+		 * We are going to process the first group of presorted data.
+		 * Initialize support structures for cmpSortSkipCols - already
+		 * sorted columns.
+		 */
+		prepareSkipCols(node);
+
+		/*
+		 * Pass all the columns to tuplesort.  We pass to tuple sort groups
+		 * of at least MIN_GROUP_SIZE size.  Thus, these groups doesn't
+		 * necessary have equal value of the first column.  We unlikely will
+		 * have huge groups with incremental sort.  Therefore usage of
+		 * abbreviated keys would be likely a waste of time.
+		 */
+		tuplesortstate = tuplesort_begin_heap(
+									tupDesc,
+									plannode->sort.numCols,
+									plannode->sort.sortColIdx,
+									plannode->sort.sortOperators,
+									plannode->sort.collations,
+									plannode->sort.nullsFirst,
+									work_mem,
+									false,
+									true);
+		node->tuplesortstate = (void *) tuplesortstate;
+	}
+	else
+	{
+		/* Next group of presorted data */
+		tuplesort_reset((Tuplesortstate *) node->tuplesortstate);
+	}
+	node->groupsCount++;
+
+	/* Calculate remaining bound for bounded sort */
+	if (node->bounded)
+		tuplesort_set_bound(tuplesortstate, node->bound - node->bound_Done);
+
+	/* Put saved tuple to tuplesort if any */
+	if (!TupIsNull(node->sampleSlot))
+	{
+		tuplesort_puttupleslot(tuplesortstate, node->sampleSlot);
+		ExecClearTuple(node->sampleSlot);
+		nTuples++;
+	}
+
+	/*
+	 * Put next group of tuples where skipCols sort values are equal to
+	 * tuplesort.
+	 */
+	for (;;)
+	{
+		slot = ExecProcNode(outerNode);
+
+		if (TupIsNull(slot))
+		{
+			node->finished = true;
+			break;
+		}
+
+		/* Put next group of presorted data to the tuplesort */
+		if (nTuples < MIN_GROUP_SIZE)
+		{
+			tuplesort_puttupleslot(tuplesortstate, slot);
+
+			/* Save last tuple in minimal group */
+			if (nTuples == MIN_GROUP_SIZE - 1)
+				ExecCopySlot(node->sampleSlot, slot);
+			nTuples++;
+		}
+		else
+		{
+			/* Iterate while skip cols are the same as in saved tuple */
+			bool	cmp;
+			cmp = cmpSortSkipCols(node, node->sampleSlot, slot);
+
+			if (cmp)
+			{
+				tuplesort_puttupleslot(tuplesortstate, slot);
+				nTuples++;
+			}
+			else
+			{
+				ExecCopySlot(node->sampleSlot, slot);
+				break;
+			}
+		}
+	}
+
+	/*
+	 * Complete the sort.
+	 */
+	tuplesort_performsort(tuplesortstate);
+
+	/*
+	 * restore to user specified direction
+	 */
+	estate->es_direction = dir;
+
+	/*
+	 * finally set the sorted flag to true
+	 */
+	node->sort_Done = true;
+	node->bounded_Done = node->bounded;
+	if (node->shared_info && node->am_worker)
+	{
+		TuplesortInstrumentation *si;
+
+		Assert(IsParallelWorker());
+		Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
+		si = &node->shared_info->sinfo[ParallelWorkerNumber].sinstrument;
+		tuplesort_get_stats(tuplesortstate, si);
+		node->shared_info->sinfo[ParallelWorkerNumber].groupsCount =
+															node->groupsCount;
+	}
+
+	/*
+	 * Adjust bound_Done with number of tuples we've actually sorted.
+	 */
+	if (node->bounded)
+	{
+		if (node->finished)
+			node->bound_Done = node->bound;
+		else
+			node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
+	}
+
+	SO1_printf("ExecIncrementalSort: %s\n", "sorting done");
+
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "retrieving tuple from tuplesort");
+
+	/*
+	 * Get the first or next tuple from tuplesort. Returns NULL if no more
+	 * tuples.
+	 */
+	slot = node->ss.ps.ps_ResultTupleSlot;
+	(void) tuplesort_gettupleslot(tuplesortstate,
+								  ScanDirectionIsForward(dir),
+								  false, slot, NULL);
+	return slot;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecInitIncrementalSort
+ *
+ *		Creates the run-time state information for the sort node
+ *		produced by the planner and initializes its outer subtree.
+ * ----------------------------------------------------------------
+ */
+IncrementalSortState *
+ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags)
+{
+	IncrementalSortState   *incrsortstate;
+
+	SO1_printf("ExecInitIncrementalSort: %s\n",
+			   "initializing sort node");
+
+	/*
+	 * Incremental sort can't be used with either EXEC_FLAG_REWIND,
+	 * EXEC_FLAG_BACKWARD or EXEC_FLAG_MARK, because we hold only current
+	 * bucket in tuplesortstate.
+	 */
+	Assert((eflags & (EXEC_FLAG_REWIND |
+					  EXEC_FLAG_BACKWARD |
+					  EXEC_FLAG_MARK)) == 0);
+
+	/*
+	 * create state structure
+	 */
+	incrsortstate = makeNode(IncrementalSortState);
+	incrsortstate->ss.ps.plan = (Plan *) node;
+	incrsortstate->ss.ps.state = estate;
+	incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort;
+
+	incrsortstate->bounded = false;
+	incrsortstate->sort_Done = false;
+	incrsortstate->finished = false;
+	incrsortstate->tuplesortstate = NULL;
+	incrsortstate->sampleSlot = NULL;
+	incrsortstate->bound_Done = 0;
+	incrsortstate->groupsCount = 0;
+	incrsortstate->skipKeys = NULL;
+
+	/*
+	 * Miscellaneous initialization
+	 *
+	 * Sort nodes don't initialize their ExprContexts because they never call
+	 * ExecQual or ExecProject.
+	 */
+
+	/*
+	 * tuple table initialization
+	 *
+	 * sort nodes only return scan tuples from their sorted relation.
+	 */
+	ExecInitResultTupleSlot(estate, &incrsortstate->ss.ps);
+	ExecInitScanTupleSlot(estate, &incrsortstate->ss);
+
+	/*
+	 * initialize child nodes
+	 *
+	 * We shield the child node from the need to support REWIND, BACKWARD, or
+	 * MARK/RESTORE.
+	 */
+	eflags &= ~(EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK);
+
+	outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags);
+
+	/*
+	 * initialize tuple type.  no need to initialize projection info because
+	 * this node doesn't do projections.
+	 */
+	ExecAssignResultTypeFromTL(&incrsortstate->ss.ps);
+	ExecAssignScanTypeFromOuterPlan(&incrsortstate->ss);
+	incrsortstate->ss.ps.ps_ProjInfo = NULL;
+
+	/* make standalone slot to store previous tuple from outer node */
+	incrsortstate->sampleSlot = MakeSingleTupleTableSlot(
+							ExecGetResultType(outerPlanState(incrsortstate)));
+
+	SO1_printf("ExecInitIncrementalSort: %s\n",
+			   "sort node initialized");
+
+	return incrsortstate;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecEndIncrementalSort(node)
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndIncrementalSort(IncrementalSortState *node)
+{
+	SO1_printf("ExecEndIncrementalSort: %s\n",
+			   "shutting down sort node");
+
+	/*
+	 * clean out the tuple table
+	 */
+	ExecClearTuple(node->ss.ss_ScanTupleSlot);
+	/* must drop pointer to sort result tuple */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+	/* must drop stanalone tuple slot from outer node */
+	ExecDropSingleTupleTableSlot(node->sampleSlot);
+
+	/*
+	 * Release tuplesort resources
+	 */
+	if (node->tuplesortstate != NULL)
+		tuplesort_end((Tuplesortstate *) node->tuplesortstate);
+	node->tuplesortstate = NULL;
+
+	/*
+	 * shut down the subplan
+	 */
+	ExecEndNode(outerPlanState(node));
+
+	SO1_printf("ExecEndIncrementalSort: %s\n",
+			   "sort node shutdown");
+}
+
+void
+ExecReScanIncrementalSort(IncrementalSortState *node)
+{
+	PlanState  *outerPlan = outerPlanState(node);
+
+	/*
+	 * If we haven't sorted yet, just return. If outerplan's chgParam is not
+	 * NULL then it will be re-scanned by ExecProcNode, else no reason to
+	 * re-scan it at all.
+	 */
+	if (!node->sort_Done)
+		return;
+
+	/* must drop pointer to sort result tuple */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+
+	/*
+	 * If subnode is to be rescanned then we forget previous sort results; we
+	 * have to re-read the subplan and re-sort.  Also must re-sort if the
+	 * bounded-sort parameters changed or we didn't select randomAccess.
+	 *
+	 * Otherwise we can just rewind and rescan the sorted output.
+	 */
+	node->sort_Done = false;
+	tuplesort_end((Tuplesortstate *) node->tuplesortstate);
+	node->tuplesortstate = NULL;
+	node->bound_Done = 0;
+
+	/*
+	 * if chgParam of subnode is not null then plan will be re-scanned by
+	 * first ExecProcNode.
+	 */
+	if (outerPlan->chgParam == NULL)
+		ExecReScan(outerPlan);
+}
+
+/* ----------------------------------------------------------------
+ *						Parallel Query Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecSortEstimate
+ *
+ *		Estimate space required to propagate sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
+	size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecSortInitializeDSM
+ *
+ *		Initialize DSM space for sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	size = offsetof(SharedIncrementalSortInfo, sinfo)
+		+ pcxt->nworkers * sizeof(IncrementalSortInfo);
+	node->shared_info = shm_toc_allocate(pcxt->toc, size);
+	/* ensure any unfilled slots will contain zeroes */
+	memset(node->shared_info, 0, size);
+	node->shared_info->num_workers = pcxt->nworkers;
+	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+				   node->shared_info);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecSortReInitializeDSM
+ *
+ *		Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortReInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
+{
+	/* If there's any instrumentation space, clear it for next time */
+	if (node->shared_info != NULL)
+	{
+		memset(node->shared_info->sinfo, 0,
+			   node->shared_info->num_workers * sizeof(IncrementalSortInfo));
+	}
+}
+
+/* ----------------------------------------------------------------
+ *		ExecSortInitializeWorker
+ *
+ *		Attach worker to DSM space for sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
+{
+	node->shared_info =
+		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
+	node->am_worker = true;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecSortRetrieveInstrumentation
+ *
+ *		Transfer sort statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
+{
+	Size		size;
+	SharedIncrementalSortInfo *si;
+
+	if (node->shared_info == NULL)
+		return;
+
+	size = offsetof(SharedIncrementalSortInfo, sinfo)
+		+ node->shared_info->num_workers * sizeof(IncrementalSortInfo);
+	si = palloc(size);
+	memcpy(si, node->shared_info, size);
+	node->shared_info = si;
+}
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index 9c68de8565..90c82af17f 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -93,7 +93,8 @@ ExecSort(PlanState *pstate)
 											  plannode->collations,
 											  plannode->nullsFirst,
 											  work_mem,
-											  node->randomAccess);
+											  node->randomAccess,
+											  false);
 		if (node->bounded)
 			tuplesort_set_bound(tuplesortstate, node->bound);
 		node->tuplesortstate = (void *) tuplesortstate;
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index ddbbc79823..94d5ba0e41 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -919,6 +919,24 @@ _copyMaterial(const Material *from)
 }
 
 
+/*
+ * CopySortFields
+ *
+ *		This function copies the fields of the Sort node.  It is used by
+ *		all the copy functions for classes which inherit from Sort.
+ */
+static void
+CopySortFields(const Sort *from, Sort *newnode)
+{
+	CopyPlanFields((const Plan *) from, (Plan *) newnode);
+
+	COPY_SCALAR_FIELD(numCols);
+	COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
+	COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
+	COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
+	COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+}
+
 /*
  * _copySort
  */
@@ -930,13 +948,29 @@ _copySort(const Sort *from)
 	/*
 	 * copy node superclass fields
 	 */
-	CopyPlanFields((const Plan *) from, (Plan *) newnode);
+	CopySortFields(from, newnode);
 
-	COPY_SCALAR_FIELD(numCols);
-	COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
-	COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
-	COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
-	COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+	return newnode;
+}
+
+
+/*
+ * _copyIncrementalSort
+ */
+static IncrementalSort *
+_copyIncrementalSort(const IncrementalSort *from)
+{
+	IncrementalSort	   *newnode = makeNode(IncrementalSort);
+
+	/*
+	 * copy node superclass fields
+	 */
+	CopySortFields((const Sort *) from, (Sort *) newnode);
+
+	/*
+	 * copy remainder of node
+	 */
+	COPY_SCALAR_FIELD(skipCols);
 
 	return newnode;
 }
@@ -4817,6 +4851,9 @@ copyObjectImpl(const void *from)
 		case T_Sort:
 			retval = _copySort(from);
 			break;
+		case T_IncrementalSort:
+			retval = _copyIncrementalSort(from);
+			break;
 		case T_Group:
 			retval = _copyGroup(from);
 			break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 5e72df137e..415a9e9b19 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -870,12 +870,10 @@ _outMaterial(StringInfo str, const Material *node)
 }
 
 static void
-_outSort(StringInfo str, const Sort *node)
+_outSortInfo(StringInfo str, const Sort *node)
 {
 	int			i;
 
-	WRITE_NODE_TYPE("SORT");
-
 	_outPlanInfo(str, (const Plan *) node);
 
 	WRITE_INT_FIELD(numCols);
@@ -897,6 +895,24 @@ _outSort(StringInfo str, const Sort *node)
 		appendStringInfo(str, " %s", booltostr(node->nullsFirst[i]));
 }
 
+static void
+_outSort(StringInfo str, const Sort *node)
+{
+	WRITE_NODE_TYPE("SORT");
+
+	_outSortInfo(str, node);
+}
+
+static void
+_outIncrementalSort(StringInfo str, const IncrementalSort *node)
+{
+	WRITE_NODE_TYPE("INCREMENTALSORT");
+
+	_outSortInfo(str, (const Sort *) node);
+
+	WRITE_INT_FIELD(skipCols);
+}
+
 static void
 _outUnique(StringInfo str, const Unique *node)
 {
@@ -3739,6 +3755,9 @@ outNode(StringInfo str, const void *obj)
 			case T_Sort:
 				_outSort(str, obj);
 				break;
+			case T_IncrementalSort:
+				_outIncrementalSort(str, obj);
+				break;
 			case T_Unique:
 				_outUnique(str, obj);
 				break;
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 9925866b53..99d6938ddc 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -2060,12 +2060,13 @@ _readMaterial(void)
 }
 
 /*
- * _readSort
+ * ReadCommonSort
+ *	Assign the basic stuff of all nodes that inherit from Sort
  */
-static Sort *
-_readSort(void)
+static void
+ReadCommonSort(Sort *local_node)
 {
-	READ_LOCALS(Sort);
+	READ_TEMP_LOCALS();
 
 	ReadCommonPlan(&local_node->plan);
 
@@ -2074,6 +2075,32 @@ _readSort(void)
 	READ_OID_ARRAY(sortOperators, local_node->numCols);
 	READ_OID_ARRAY(collations, local_node->numCols);
 	READ_BOOL_ARRAY(nullsFirst, local_node->numCols);
+}
+
+/*
+ * _readSort
+ */
+static Sort *
+_readSort(void)
+{
+	READ_LOCALS_NO_FIELDS(Sort);
+
+	ReadCommonSort(local_node);
+
+	READ_DONE();
+}
+
+/*
+ * _readIncrementalSort
+ */
+static IncrementalSort *
+_readIncrementalSort(void)
+{
+	READ_LOCALS(IncrementalSort);
+
+	ReadCommonSort(&local_node->sort);
+
+	READ_INT_FIELD(skipCols);
 
 	READ_DONE();
 }
@@ -2636,6 +2663,8 @@ parseNodeString(void)
 		return_value = _readMaterial();
 	else if (MATCH("SORT", 4))
 		return_value = _readSort();
+	else if (MATCH("INCREMENTALSORT", 7))
+		return_value = _readIncrementalSort();
 	else if (MATCH("GROUP", 5))
 		return_value = _readGroup();
 	else if (MATCH("AGG", 3))
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 12a6ee4a22..e96c5fe137 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3613,6 +3613,10 @@ print_path(PlannerInfo *root, Path *path, int indent)
 			ptype = "Sort";
 			subpath = ((SortPath *) path)->subpath;
 			break;
+		case T_IncrementalSortPath:
+			ptype = "IncrementalSort";
+			subpath = ((SortPath *) path)->subpath;
+			break;
 		case T_GroupPath:
 			ptype = "Group";
 			subpath = ((GroupPath *) path)->subpath;
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 8679b14b29..05f58fff79 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -121,6 +121,7 @@ bool		enable_indexonlyscan = true;
 bool		enable_bitmapscan = true;
 bool		enable_tidscan = true;
 bool		enable_sort = true;
+bool		enable_incrementalsort = true;
 bool		enable_hashagg = true;
 bool		enable_nestloop = true;
 bool		enable_material = true;
@@ -1605,6 +1606,13 @@ cost_recursive_union(Path *runion, Path *nrterm, Path *rterm)
  *	  Determines and returns the cost of sorting a relation, including
  *	  the cost of reading the input data.
  *
+ * Sort could be either full sort of relation or incremental sort when we already
+ * have data presorted by some of required pathkeys.  In the second case
+ * we estimate number of groups which source data is divided to by presorted
+ * pathkeys.  And then estimate cost of sorting each individual group assuming
+ * data is divided into group uniformly.  Also, if LIMIT is specified then
+ * we have to pull from source and sort only some of total groups.
+ *
  * If the total volume of data to sort is less than sort_mem, we will do
  * an in-memory sort, which requires no I/O and about t*log2(t) tuple
  * comparisons for t tuples.
@@ -1631,7 +1639,9 @@ cost_recursive_union(Path *runion, Path *nrterm, Path *rterm)
  * work that has to be done to prepare the inputs to the comparison operators.
  *
  * 'pathkeys' is a list of sort keys
- * 'input_cost' is the total cost for reading the input data
+ * 'presorted_keys' is a number of pathkeys already presorted in given path
+ * 'input_startup_cost' is the startup cost for reading the input data
+ * 'input_total_cost' is the total cost for reading the input data
  * 'tuples' is the number of tuples in the relation
  * 'width' is the average tuple width in bytes
  * 'comparison_cost' is the extra cost per comparison, if any
@@ -1647,19 +1657,28 @@ cost_recursive_union(Path *runion, Path *nrterm, Path *rterm)
  */
 void
 cost_sort(Path *path, PlannerInfo *root,
-		  List *pathkeys, Cost input_cost, double tuples, int width,
-		  Cost comparison_cost, int sort_mem,
+		  List *pathkeys, int presorted_keys,
+		  Cost input_startup_cost, Cost input_total_cost,
+		  double tuples, int width, Cost comparison_cost, int sort_mem,
 		  double limit_tuples)
 {
-	Cost		startup_cost = input_cost;
-	Cost		run_cost = 0;
+	Cost		startup_cost = input_startup_cost;
+	Cost		run_cost = 0,
+				rest_cost,
+				group_cost,
+				input_run_cost = input_total_cost - input_startup_cost;
 	double		input_bytes = relation_byte_size(tuples, width);
 	double		output_bytes;
 	double		output_tuples;
+	double		num_groups,
+				group_input_bytes,
+				group_tuples;
 	long		sort_mem_bytes = sort_mem * 1024L;
 
 	if (!enable_sort)
 		startup_cost += disable_cost;
+	if (!enable_incrementalsort)
+		presorted_keys = 0;
 
 	path->rows = tuples;
 
@@ -1685,13 +1704,50 @@ cost_sort(Path *path, PlannerInfo *root,
 		output_bytes = input_bytes;
 	}
 
-	if (output_bytes > sort_mem_bytes)
+	/*
+	 * Estimate number of groups which dataset is divided by presorted keys.
+	 */
+	if (presorted_keys > 0)
+	{
+		List	   *presortedExprs = NIL;
+		ListCell   *l;
+		int			i = 0;
+
+		/* Extract presorted keys as list of expressions */
+		foreach(l, pathkeys)
+		{
+			PathKey *key = (PathKey *)lfirst(l);
+			EquivalenceMember *member = (EquivalenceMember *)
+										linitial(key->pk_eclass->ec_members);
+
+			presortedExprs = lappend(presortedExprs, member->em_expr);
+
+			i++;
+			if (i >= presorted_keys)
+				break;
+		}
+
+		/* Estimate number of groups with equal presorted keys */
+		num_groups = estimate_num_groups(root, presortedExprs, tuples, NULL);
+	}
+	else
+	{
+		num_groups = 1.0;
+	}
+
+	/*
+	 * Estimate average cost of sorting of one group where presorted keys are
+	 * equal.
+	 */
+	group_input_bytes = input_bytes / num_groups;
+	group_tuples = tuples / num_groups;
+	if (output_bytes > sort_mem_bytes && group_input_bytes > sort_mem_bytes)
 	{
 		/*
 		 * We'll have to use a disk-based sort of all the tuples
 		 */
-		double		npages = ceil(input_bytes / BLCKSZ);
-		double		nruns = input_bytes / sort_mem_bytes;
+		double		npages = ceil(group_input_bytes / BLCKSZ);
+		double		nruns = group_input_bytes / sort_mem_bytes;
 		double		mergeorder = tuplesort_merge_order(sort_mem_bytes);
 		double		log_runs;
 		double		npageaccesses;
@@ -1701,7 +1757,7 @@ cost_sort(Path *path, PlannerInfo *root,
 		 *
 		 * Assume about N log2 N comparisons
 		 */
-		startup_cost += comparison_cost * tuples * LOG2(tuples);
+		group_cost = comparison_cost * group_tuples * LOG2(group_tuples);
 
 		/* Disk costs */
 
@@ -1712,10 +1768,10 @@ cost_sort(Path *path, PlannerInfo *root,
 			log_runs = 1.0;
 		npageaccesses = 2.0 * npages * log_runs;
 		/* Assume 3/4ths of accesses are sequential, 1/4th are not */
-		startup_cost += npageaccesses *
+		group_cost += npageaccesses *
 			(seq_page_cost * 0.75 + random_page_cost * 0.25);
 	}
-	else if (tuples > 2 * output_tuples || input_bytes > sort_mem_bytes)
+	else if (group_tuples > 2 * output_tuples || group_input_bytes > sort_mem_bytes)
 	{
 		/*
 		 * We'll use a bounded heap-sort keeping just K tuples in memory, for
@@ -1723,14 +1779,33 @@ cost_sort(Path *path, PlannerInfo *root,
 		 * factor is a bit higher than for quicksort.  Tweak it so that the
 		 * cost curve is continuous at the crossover point.
 		 */
-		startup_cost += comparison_cost * tuples * LOG2(2.0 * output_tuples);
+		group_cost = comparison_cost * group_tuples * LOG2(2.0 * output_tuples);
 	}
 	else
 	{
-		/* We'll use plain quicksort on all the input tuples */
-		startup_cost += comparison_cost * tuples * LOG2(tuples);
+		/*
+		 * We'll use plain quicksort on all the input tuples.  If it appears
+		 * that we expect less than two tuples per sort group then assume
+		 * logarithmic part of estimate to be 1.
+		 */
+		if (group_tuples >= 2.0)
+			group_cost = comparison_cost * group_tuples * LOG2(group_tuples);
+		else
+			group_cost = comparison_cost * group_tuples;
 	}
 
+	/* Add per group cost of fetching tuples from input */
+	group_cost += input_run_cost / num_groups;
+
+	/*
+	 * We've to sort first group to start output from node. Sorting rest of
+	 * groups are required to return all the other tuples.
+	 */
+	startup_cost += group_cost;
+	rest_cost = (num_groups * (output_tuples / tuples) - 1.0) * group_cost;
+	if (rest_cost > 0.0)
+		run_cost += rest_cost;
+
 	/*
 	 * Also charge a small amount (arbitrarily set equal to operator cost) per
 	 * extracted tuple.  We don't charge cpu_tuple_cost because a Sort node
@@ -1741,6 +1816,19 @@ cost_sort(Path *path, PlannerInfo *root,
 	 */
 	run_cost += cpu_operator_cost * tuples;
 
+	/* Extra costs of incremental sort */
+	if (presorted_keys > 0)
+	{
+		/*
+		 * In incremental sort case we also have to cost to detect sort groups.
+		 * It turns out into extra copy and comparison for each tuple.
+		 */
+		run_cost += (cpu_tuple_cost + comparison_cost) * tuples;
+
+		/* Cost of per group tuplesort reset */
+		run_cost += 2.0 * cpu_tuple_cost * num_groups;
+	}
+
 	path->startup_cost = startup_cost;
 	path->total_cost = startup_cost + run_cost;
 }
@@ -2717,6 +2805,8 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 		cost_sort(&sort_path,
 				  root,
 				  outersortkeys,
+				  pathkeys_common(outer_path->pathkeys, outersortkeys),
+				  outer_path->startup_cost,
 				  outer_path->total_cost,
 				  outer_path_rows,
 				  outer_path->pathtarget->width,
@@ -2743,6 +2833,8 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 		cost_sort(&sort_path,
 				  root,
 				  innersortkeys,
+				  pathkeys_common(inner_path->pathkeys, innersortkeys),
+				  inner_path->startup_cost,
 				  inner_path->total_cost,
 				  inner_path_rows,
 				  inner_path->pathtarget->width,
diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c
index ef58cff28d..329ba7b532 100644
--- a/src/backend/optimizer/path/pathkeys.c
+++ b/src/backend/optimizer/path/pathkeys.c
@@ -22,10 +22,12 @@
 #include "nodes/nodeFuncs.h"
 #include "nodes/plannodes.h"
 #include "optimizer/clauses.h"
+#include "optimizer/cost.h"
 #include "optimizer/pathnode.h"
 #include "optimizer/paths.h"
 #include "optimizer/tlist.h"
 #include "utils/lsyscache.h"
+#include "utils/selfuncs.h"
 
 
 static bool pathkey_is_redundant(PathKey *new_pathkey, List *pathkeys);
@@ -308,6 +310,33 @@ compare_pathkeys(List *keys1, List *keys2)
 	return PATHKEYS_EQUAL;
 }
 
+
+/*
+ * pathkeys_common
+ *    Returns length of longest common prefix of keys1 and keys2.
+ */
+int
+pathkeys_common(List *keys1, List *keys2)
+{
+	int n;
+	ListCell   *key1,
+			   *key2;
+	n = 0;
+
+	forboth(key1, keys1, key2, keys2)
+	{
+		PathKey    *pathkey1 = (PathKey *) lfirst(key1);
+		PathKey    *pathkey2 = (PathKey *) lfirst(key2);
+
+		if (pathkey1 != pathkey2)
+			return n;
+		n++;
+	}
+
+	return n;
+}
+
+
 /*
  * pathkeys_contained_in
  *	  Common special case of compare_pathkeys: we just want to know
@@ -1488,26 +1517,42 @@ right_merge_direction(PlannerInfo *root, PathKey *pathkey)
  *		Count the number of pathkeys that are useful for meeting the
  *		query's requested output ordering.
  *
- * Unlike merge pathkeys, this is an all-or-nothing affair: it does us
- * no good to order by just the first key(s) of the requested ordering.
- * So the result is always either 0 or list_length(root->query_pathkeys).
+ * Returns number of pathkeys that maches given argument. Others can be
+ * satisfied by incremental sort.
  */
-static int
-pathkeys_useful_for_ordering(PlannerInfo *root, List *pathkeys)
+int
+pathkeys_useful_for_ordering(List *query_pathkeys, List *pathkeys)
 {
-	if (root->query_pathkeys == NIL)
+	int	n_common_pathkeys;
+
+	if (query_pathkeys == NIL)
 		return 0;				/* no special ordering requested */
 
 	if (pathkeys == NIL)
 		return 0;				/* unordered path */
 
-	if (pathkeys_contained_in(root->query_pathkeys, pathkeys))
+	n_common_pathkeys = pathkeys_common(query_pathkeys, pathkeys);
+
+	if (enable_incrementalsort)
 	{
-		/* It's useful ... or at least the first N keys are */
-		return list_length(root->query_pathkeys);
+		/*
+		 * Return the number of path keys in common, or 0 if there are none. Any
+		 * first common pathkeys could be useful for ordering because we can use
+		 * incremental sort.
+		 */
+		return n_common_pathkeys;
+	}
+	else
+	{
+		/*
+		 * When incremental sort is disabled, pathkeys are useful only when they
+		 * do contain all the query pathkeys.
+		 */
+		if (n_common_pathkeys == list_length(query_pathkeys))
+			return n_common_pathkeys;
+		else
+			return 0;
 	}
-
-	return 0;					/* path ordering not useful */
 }
 
 /*
@@ -1523,7 +1568,7 @@ truncate_useless_pathkeys(PlannerInfo *root,
 	int			nuseful2;
 
 	nuseful = pathkeys_useful_for_merging(root, rel, pathkeys);
-	nuseful2 = pathkeys_useful_for_ordering(root, pathkeys);
+	nuseful2 = pathkeys_useful_for_ordering(root->query_pathkeys, pathkeys);
 	if (nuseful2 > nuseful)
 		nuseful = nuseful2;
 
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index e599283d6b..133435f516 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -236,7 +236,7 @@ static MergeJoin *make_mergejoin(List *tlist,
 			   Plan *lefttree, Plan *righttree,
 			   JoinType jointype, bool inner_unique,
 			   bool skip_mark_restore);
-static Sort *make_sort(Plan *lefttree, int numCols,
+static Sort *make_sort(Plan *lefttree, int numCols, int skipCols,
 		  AttrNumber *sortColIdx, Oid *sortOperators,
 		  Oid *collations, bool *nullsFirst);
 static Plan *prepare_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
@@ -252,10 +252,11 @@ static EquivalenceMember *find_ec_member_for_tle(EquivalenceClass *ec,
 					   TargetEntry *tle,
 					   Relids relids);
 static Sort *make_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
-						Relids relids);
+						Relids relids, int skipCols);
 static Sort *make_sort_from_groupcols(List *groupcls,
 						 AttrNumber *grpColIdx,
-						 Plan *lefttree);
+						 Plan *lefttree,
+						 int skipCols);
 static Material *make_material(Plan *lefttree);
 static WindowAgg *make_windowagg(List *tlist, Index winref,
 			   int partNumCols, AttrNumber *partColIdx, Oid *partOperators,
@@ -437,6 +438,7 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
 											   (GatherPath *) best_path);
 			break;
 		case T_Sort:
+		case T_IncrementalSort:
 			plan = (Plan *) create_sort_plan(root,
 											 (SortPath *) best_path,
 											 flags);
@@ -1122,6 +1124,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path)
 		Oid		   *sortOperators;
 		Oid		   *collations;
 		bool	   *nullsFirst;
+		int			n_common_pathkeys;
 
 		/* Build the child plan */
 		/* Must insist that all children return the same tlist */
@@ -1156,9 +1159,11 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path)
 					  numsortkeys * sizeof(bool)) == 0);
 
 		/* Now, insert a Sort node if subplan isn't sufficiently ordered */
-		if (!pathkeys_contained_in(pathkeys, subpath->pathkeys))
+		n_common_pathkeys = pathkeys_common(pathkeys, subpath->pathkeys);
+		if (n_common_pathkeys < list_length(pathkeys))
 		{
 			Sort	   *sort = make_sort(subplan, numsortkeys,
+										 n_common_pathkeys,
 										 sortColIdx, sortOperators,
 										 collations, nullsFirst);
 
@@ -1508,6 +1513,7 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
 	Plan	   *subplan;
 	List	   *pathkeys = best_path->path.pathkeys;
 	List	   *tlist = build_path_tlist(root, &best_path->path);
+	int			n_common_pathkeys;
 
 	/* As with Gather, it's best to project away columns in the workers. */
 	subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
@@ -1537,12 +1543,16 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
 
 
 	/* Now, insert a Sort node if subplan isn't sufficiently ordered */
-	if (!pathkeys_contained_in(pathkeys, best_path->subpath->pathkeys))
+	n_common_pathkeys = pathkeys_common(pathkeys, best_path->subpath->pathkeys);
+	if (n_common_pathkeys < list_length(pathkeys))
+	{
 		subplan = (Plan *) make_sort(subplan, gm_plan->numCols,
+									 n_common_pathkeys,
 									 gm_plan->sortColIdx,
 									 gm_plan->sortOperators,
 									 gm_plan->collations,
 									 gm_plan->nullsFirst);
+	}
 
 	/* Now insert the subplan under GatherMerge. */
 	gm_plan->plan.lefttree = subplan;
@@ -1655,6 +1665,7 @@ create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags)
 {
 	Sort	   *plan;
 	Plan	   *subplan;
+	int			n_common_pathkeys;
 
 	/*
 	 * We don't want any excess columns in the sorted tuples, so request a
@@ -1664,7 +1675,13 @@ create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags)
 	subplan = create_plan_recurse(root, best_path->subpath,
 								  flags | CP_SMALL_TLIST);
 
-	plan = make_sort_from_pathkeys(subplan, best_path->path.pathkeys, NULL);
+	if (IsA(best_path, IncrementalSortPath))
+		n_common_pathkeys = ((IncrementalSortPath *) best_path)->skipCols;
+	else
+		n_common_pathkeys = 0;
+
+	plan = make_sort_from_pathkeys(subplan, best_path->path.pathkeys,
+								   NULL, n_common_pathkeys);
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
@@ -1908,7 +1925,8 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path)
 				sort_plan = (Plan *)
 					make_sort_from_groupcols(rollup->groupClause,
 											 new_grpColIdx,
-											 subplan);
+											 subplan,
+											 0);
 			}
 
 			if (!rollup->is_hashed)
@@ -3848,10 +3866,15 @@ create_mergejoin_plan(PlannerInfo *root,
 	 */
 	if (best_path->outersortkeys)
 	{
+		Sort	   *sort;
+		int			n_common_pathkeys;
 		Relids		outer_relids = outer_path->parent->relids;
-		Sort	   *sort = make_sort_from_pathkeys(outer_plan,
-												   best_path->outersortkeys,
-												   outer_relids);
+
+		n_common_pathkeys = pathkeys_common(best_path->outersortkeys,
+									best_path->jpath.outerjoinpath->pathkeys);
+
+		sort = make_sort_from_pathkeys(outer_plan, best_path->outersortkeys,
+									   outer_relids, n_common_pathkeys);
 
 		label_sort_with_costsize(root, sort, -1.0);
 		outer_plan = (Plan *) sort;
@@ -3862,10 +3885,15 @@ create_mergejoin_plan(PlannerInfo *root,
 
 	if (best_path->innersortkeys)
 	{
+		Sort	   *sort;
+		int			n_common_pathkeys;
 		Relids		inner_relids = inner_path->parent->relids;
-		Sort	   *sort = make_sort_from_pathkeys(inner_plan,
-												   best_path->innersortkeys,
-												   inner_relids);
+
+		n_common_pathkeys = pathkeys_common(best_path->innersortkeys,
+									best_path->jpath.innerjoinpath->pathkeys);
+
+		sort = make_sort_from_pathkeys(inner_plan, best_path->innersortkeys,
+									   inner_relids, n_common_pathkeys);
 
 		label_sort_with_costsize(root, sort, -1.0);
 		inner_plan = (Plan *) sort;
@@ -4927,8 +4955,13 @@ label_sort_with_costsize(PlannerInfo *root, Sort *plan, double limit_tuples)
 {
 	Plan	   *lefttree = plan->plan.lefttree;
 	Path		sort_path;		/* dummy for result of cost_sort */
+	int			skip_cols = 0;
+
+	if (IsA(plan, IncrementalSort))
+		skip_cols = ((IncrementalSort *) plan)->skipCols;
 
-	cost_sort(&sort_path, root, NIL,
+	cost_sort(&sort_path, root, NIL, skip_cols,
+			  lefttree->startup_cost,
 			  lefttree->total_cost,
 			  lefttree->plan_rows,
 			  lefttree->plan_width,
@@ -5519,13 +5552,31 @@ make_mergejoin(List *tlist,
  * nullsFirst arrays already.
  */
 static Sort *
-make_sort(Plan *lefttree, int numCols,
+make_sort(Plan *lefttree, int numCols, int skipCols,
 		  AttrNumber *sortColIdx, Oid *sortOperators,
 		  Oid *collations, bool *nullsFirst)
 {
-	Sort	   *node = makeNode(Sort);
-	Plan	   *plan = &node->plan;
+	Sort	   *node;
+	Plan	   *plan;
+
+	/* Always use regular sort node when enable_incrementalsort = false */
+	if (!enable_incrementalsort)
+		skipCols = 0;
+
+	if (skipCols == 0)
+	{
+		node = makeNode(Sort);
+	}
+	else
+	{
+		IncrementalSort    *incrementalSort;
+
+		incrementalSort = makeNode(IncrementalSort);
+		node = &incrementalSort->sort;
+		incrementalSort->skipCols = skipCols;
+	}
 
+	plan = &node->plan;
 	plan->targetlist = lefttree->targetlist;
 	plan->qual = NIL;
 	plan->lefttree = lefttree;
@@ -5858,9 +5909,11 @@ find_ec_member_for_tle(EquivalenceClass *ec,
  *	  'lefttree' is the node which yields input tuples
  *	  'pathkeys' is the list of pathkeys by which the result is to be sorted
  *	  'relids' is the set of relations required by prepare_sort_from_pathkeys()
+ *	  'skipCols' is the number of presorted columns in input tuples
  */
 static Sort *
-make_sort_from_pathkeys(Plan *lefttree, List *pathkeys, Relids relids)
+make_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
+						Relids relids, int skipCols)
 {
 	int			numsortkeys;
 	AttrNumber *sortColIdx;
@@ -5880,7 +5933,7 @@ make_sort_from_pathkeys(Plan *lefttree, List *pathkeys, Relids relids)
 										  &nullsFirst);
 
 	/* Now build the Sort node */
-	return make_sort(lefttree, numsortkeys,
+	return make_sort(lefttree, numsortkeys, skipCols,
 					 sortColIdx, sortOperators,
 					 collations, nullsFirst);
 }
@@ -5923,7 +5976,7 @@ make_sort_from_sortclauses(List *sortcls, Plan *lefttree)
 		numsortkeys++;
 	}
 
-	return make_sort(lefttree, numsortkeys,
+	return make_sort(lefttree, numsortkeys, 0,
 					 sortColIdx, sortOperators,
 					 collations, nullsFirst);
 }
@@ -5944,7 +5997,8 @@ make_sort_from_sortclauses(List *sortcls, Plan *lefttree)
 static Sort *
 make_sort_from_groupcols(List *groupcls,
 						 AttrNumber *grpColIdx,
-						 Plan *lefttree)
+						 Plan *lefttree,
+						 int skipCols)
 {
 	List	   *sub_tlist = lefttree->targetlist;
 	ListCell   *l;
@@ -5977,7 +6031,7 @@ make_sort_from_groupcols(List *groupcls,
 		numsortkeys++;
 	}
 
-	return make_sort(lefttree, numsortkeys,
+	return make_sort(lefttree, numsortkeys, skipCols,
 					 sortColIdx, sortOperators,
 					 collations, nullsFirst);
 }
@@ -6633,6 +6687,7 @@ is_projection_capable_plan(Plan *plan)
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
 		case T_LockRows:
diff --git a/src/backend/optimizer/plan/planagg.c b/src/backend/optimizer/plan/planagg.c
index 95cbffbd69..308f60beac 100644
--- a/src/backend/optimizer/plan/planagg.c
+++ b/src/backend/optimizer/plan/planagg.c
@@ -44,6 +44,7 @@
 #include "parser/parse_clause.h"
 #include "rewrite/rewriteManip.h"
 #include "utils/lsyscache.h"
+#include "utils/selfuncs.h"
 #include "utils/syscache.h"
 
 
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 7b52dadd81..3842271245 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -3849,14 +3849,14 @@ create_grouping_paths(PlannerInfo *root,
 			foreach(lc, input_rel->partial_pathlist)
 			{
 				Path	   *path = (Path *) lfirst(lc);
-				bool		is_sorted;
+				int			n_useful_pathkeys;
 
-				is_sorted = pathkeys_contained_in(root->group_pathkeys,
-												  path->pathkeys);
-				if (path == cheapest_partial_path || is_sorted)
+				n_useful_pathkeys = pathkeys_useful_for_ordering(
+										root->group_pathkeys, path->pathkeys);
+				if (path == cheapest_partial_path || n_useful_pathkeys > 0)
 				{
 					/* Sort the cheapest partial path, if it isn't already */
-					if (!is_sorted)
+					if (n_useful_pathkeys < list_length(root->group_pathkeys))
 						path = (Path *) create_sort_path(root,
 														 grouped_rel,
 														 path,
@@ -3929,14 +3929,14 @@ create_grouping_paths(PlannerInfo *root,
 		foreach(lc, input_rel->pathlist)
 		{
 			Path	   *path = (Path *) lfirst(lc);
-			bool		is_sorted;
+			int			n_useful_pathkeys;
 
-			is_sorted = pathkeys_contained_in(root->group_pathkeys,
-											  path->pathkeys);
-			if (path == cheapest_path || is_sorted)
+			n_useful_pathkeys = pathkeys_useful_for_ordering(
+										root->group_pathkeys, path->pathkeys);
+			if (path == cheapest_path || n_useful_pathkeys > 0)
 			{
 				/* Sort the cheapest-total path if it isn't already sorted */
-				if (!is_sorted)
+				if (n_useful_pathkeys < list_length(root->group_pathkeys))
 					path = (Path *) create_sort_path(root,
 													 grouped_rel,
 													 path,
@@ -5003,13 +5003,13 @@ create_ordered_paths(PlannerInfo *root,
 	foreach(lc, input_rel->pathlist)
 	{
 		Path	   *path = (Path *) lfirst(lc);
-		bool		is_sorted;
+		int			n_useful_pathkeys;
 
-		is_sorted = pathkeys_contained_in(root->sort_pathkeys,
-										  path->pathkeys);
-		if (path == cheapest_input_path || is_sorted)
+		n_useful_pathkeys = pathkeys_useful_for_ordering(root->sort_pathkeys,
+														 path->pathkeys);
+		if (path == cheapest_input_path || n_useful_pathkeys > 0)
 		{
-			if (!is_sorted)
+			if (n_useful_pathkeys < list_length(root->sort_pathkeys))
 			{
 				/* An explicit sort here can take advantage of LIMIT */
 				path = (Path *) create_sort_path(root,
@@ -6139,8 +6139,9 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid)
 
 	/* Estimate the cost of seq scan + sort */
 	seqScanPath = create_seqscan_path(root, rel, NULL, 0);
-	cost_sort(&seqScanAndSortPath, root, NIL,
-			  seqScanPath->total_cost, rel->tuples, rel->reltarget->width,
+	cost_sort(&seqScanAndSortPath, root, NIL, 0,
+			  seqScanPath->startup_cost, seqScanPath->total_cost,
+			  rel->tuples, rel->reltarget->width,
 			  comparisonCost, maintenance_work_mem, -1.0);
 
 	/* Estimate the cost of index scan */
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index 4617d12cb9..be520e6086 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -642,6 +642,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
 
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index 46367cba63..616ad1a474 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -2782,6 +2782,7 @@ finalize_plan(PlannerInfo *root, Plan *plan,
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
 		case T_Group:
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index 5a08e75ad5..eb95ca4c5e 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -983,7 +983,8 @@ choose_hashed_setop(PlannerInfo *root, List *groupClauses,
 	sorted_p.startup_cost = input_path->startup_cost;
 	sorted_p.total_cost = input_path->total_cost;
 	/* XXX cost_sort doesn't actually look at pathkeys, so just pass NIL */
-	cost_sort(&sorted_p, root, NIL, sorted_p.total_cost,
+	cost_sort(&sorted_p, root, NIL, 0, 
+			  sorted_p.startup_cost, sorted_p.total_cost,
 			  input_path->rows, input_path->pathtarget->width,
 			  0.0, work_mem, -1.0);
 	cost_group(&sorted_p, root, numGroupCols, dNumGroups,
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 7df8761710..9c6f910f14 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -105,7 +105,7 @@ compare_path_costs(Path *path1, Path *path2, CostSelector criterion)
 }
 
 /*
- * compare_path_fractional_costs
+ * compare_fractional_path_costs
  *	  Return -1, 0, or +1 according as path1 is cheaper, the same cost,
  *	  or more expensive than path2 for fetching the specified fraction
  *	  of the total tuples.
@@ -1356,12 +1356,13 @@ create_merge_append_path(PlannerInfo *root,
 	foreach(l, subpaths)
 	{
 		Path	   *subpath = (Path *) lfirst(l);
+		int			n_common_pathkeys = pathkeys_common(pathkeys, subpath->pathkeys);
 
 		pathnode->path.rows += subpath->rows;
 		pathnode->path.parallel_safe = pathnode->path.parallel_safe &&
 			subpath->parallel_safe;
 
-		if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
+		if (n_common_pathkeys == list_length(pathkeys))
 		{
 			/* Subpath is adequately ordered, we won't need to sort it */
 			input_startup_cost += subpath->startup_cost;
@@ -1375,6 +1376,8 @@ create_merge_append_path(PlannerInfo *root,
 			cost_sort(&sort_path,
 					  root,
 					  pathkeys,
+					  n_common_pathkeys,
+					  subpath->startup_cost,
 					  subpath->total_cost,
 					  subpath->parent->tuples,
 					  subpath->pathtarget->width,
@@ -1622,7 +1625,8 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 		/*
 		 * Estimate cost for sort+unique implementation
 		 */
-		cost_sort(&sort_path, root, NIL,
+		cost_sort(&sort_path, root, NIL, 0,
+				  subpath->startup_cost,
 				  subpath->total_cost,
 				  rel->rows,
 				  subpath->pathtarget->width,
@@ -1715,6 +1719,7 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	GatherMergePath *pathnode = makeNode(GatherMergePath);
 	Cost		input_startup_cost = 0;
 	Cost		input_total_cost = 0;
+	int			n_common_pathkeys;
 
 	Assert(subpath->parallel_safe);
 	Assert(pathkeys);
@@ -1731,7 +1736,9 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	pathnode->path.pathtarget = target ? target : rel->reltarget;
 	pathnode->path.rows += subpath->rows;
 
-	if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
+	n_common_pathkeys = pathkeys_common(pathkeys, subpath->pathkeys);
+
+	if (n_common_pathkeys == list_length(pathkeys))
 	{
 		/* Subpath is adequately ordered, we won't need to sort it */
 		input_startup_cost += subpath->startup_cost;
@@ -1745,6 +1752,8 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 		cost_sort(&sort_path,
 				  root,
 				  pathkeys,
+				  n_common_pathkeys,
+				  subpath->startup_cost,
 				  subpath->total_cost,
 				  subpath->rows,
 				  subpath->pathtarget->width,
@@ -2604,9 +2613,31 @@ create_sort_path(PlannerInfo *root,
 				 List *pathkeys,
 				 double limit_tuples)
 {
-	SortPath   *pathnode = makeNode(SortPath);
+	SortPath   *pathnode;
+	int			n_common_pathkeys;
+
+	if (enable_incrementalsort)
+		n_common_pathkeys = pathkeys_common(subpath->pathkeys, pathkeys);
+	else
+		n_common_pathkeys = 0;
+
+	if (n_common_pathkeys == 0)
+	{
+		pathnode = makeNode(SortPath);
+		pathnode->path.pathtype = T_Sort;
+	}
+	else
+	{
+		IncrementalSortPath   *incpathnode;
+
+		incpathnode = makeNode(IncrementalSortPath);
+		pathnode = &incpathnode->spath;
+		pathnode->path.pathtype = T_IncrementalSort;
+		incpathnode->skipCols = n_common_pathkeys;
+	}
+
+	Assert(n_common_pathkeys < list_length(pathkeys));
 
-	pathnode->path.pathtype = T_Sort;
 	pathnode->path.parent = rel;
 	/* Sort doesn't project, so use source path's pathtarget */
 	pathnode->path.pathtarget = subpath->pathtarget;
@@ -2620,7 +2651,9 @@ create_sort_path(PlannerInfo *root,
 
 	pathnode->subpath = subpath;
 
-	cost_sort(&pathnode->path, root, pathkeys,
+	cost_sort(&pathnode->path, root,
+			  pathkeys, n_common_pathkeys,
+			  subpath->startup_cost,
 			  subpath->total_cost,
 			  subpath->rows,
 			  subpath->pathtarget->width,
@@ -2932,7 +2965,8 @@ create_groupingsets_path(PlannerInfo *root,
 			else
 			{
 				/* Account for cost of sort, but don't charge input cost again */
-				cost_sort(&sort_path, root, NIL,
+				cost_sort(&sort_path, root, NIL, 0,
+						  0.0,
 						  0.0,
 						  subpath->rows,
 						  subpath->pathtarget->width,
diff --git a/src/backend/utils/adt/orderedsetaggs.c b/src/backend/utils/adt/orderedsetaggs.c
index 79dbfd1a05..e3e984b3da 100644
--- a/src/backend/utils/adt/orderedsetaggs.c
+++ b/src/backend/utils/adt/orderedsetaggs.c
@@ -291,7 +291,8 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
 												   qstate->sortCollations,
 												   qstate->sortNullsFirsts,
 												   work_mem,
-												   qstate->rescan_needed);
+												   qstate->rescan_needed,
+												   false);
 	else
 		osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
 													qstate->sortOperator,
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index fcc8323f62..4726bee850 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -3714,6 +3714,42 @@ estimate_num_groups(PlannerInfo *root, List *groupExprs, double input_rows,
 	return numdistinct;
 }
 
+/*
+ * estimate_pathkeys_groups	- Estimate number of groups which dataset is
+ * 							  divided to by pathkeys.
+ *
+ * Returns an array of group numbers. i'th element of array is number of groups
+ * which first i pathkeys divides dataset into.  Actually is a convenience
+ * wrapper over estimate_num_groups().
+ */
+double *
+estimate_pathkeys_groups(List *pathkeys, PlannerInfo *root, double tuples)
+{
+	ListCell   *l;
+	List	   *groupExprs = NIL;
+	double	   *result;
+	int			i;
+
+	/*
+	 * Get number of groups for each prefix of pathkeys.
+	 */
+	i = 0;
+	result = (double *) palloc(sizeof(double) * list_length(pathkeys));
+	foreach(l, pathkeys)
+	{
+		PathKey *key = (PathKey *)lfirst(l);
+		EquivalenceMember *member = (EquivalenceMember *)
+							linitial(key->pk_eclass->ec_members);
+
+		groupExprs = lappend(groupExprs, member->em_expr);
+
+		result[i] = estimate_num_groups(root, groupExprs, tuples, NULL);
+		i++;
+	}
+
+	return result;
+}
+
 /*
  * Estimate hash bucket statistics when the specified expression is used
  * as a hash key for the given number of buckets.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 72f6be329e..bea4f00421 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -857,6 +857,15 @@ static struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_incrementalsort", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of incremental sort steps."),
+			NULL
+		},
+		&enable_incrementalsort,
+		true,
+		NULL, NULL, NULL
+	},
 	{
 		{"enable_hashagg", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of hashed aggregation plans."),
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index eecc66cafa..80bc67c093 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -231,6 +231,13 @@ struct Tuplesortstate
 	int64		allowedMem;		/* total memory allowed, in bytes */
 	int			maxTapes;		/* number of tapes (Knuth's T) */
 	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
+	int64		maxSpace;		/* maximum amount of space occupied among sort
+								   of groups, either in-memory or on-disk */
+	bool		maxSpaceOnDisk;	/* true when maxSpace is value for on-disk
+								   space, fase when it's value for in-memory
+								   space */
+	TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
+	MemoryContext maincontext;
 	MemoryContext sortcontext;	/* memory context holding most sort data */
 	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
@@ -573,6 +580,9 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
 			  int tapenum, unsigned int len);
 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
+static void tuplesort_free(Tuplesortstate *state, bool delete);
+static void tuplesort_updatemax(Tuplesortstate *state);
+
 
 /*
  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
@@ -607,18 +617,27 @@ static Tuplesortstate *
 tuplesort_begin_common(int workMem, bool randomAccess)
 {
 	Tuplesortstate *state;
+	MemoryContext maincontext;
 	MemoryContext sortcontext;
 	MemoryContext tuplecontext;
 	MemoryContext oldcontext;
 
 	/*
-	 * Create a working memory context for this sort operation. All data
-	 * needed by the sort will live inside this context.
+	 * Memory context surviving tuplesort_reset.  This memory context holds
+	 * data which is useful to keep while sorting multiple similar batches.
 	 */
-	sortcontext = AllocSetContextCreate(CurrentMemoryContext,
+	maincontext = AllocSetContextCreate(CurrentMemoryContext,
 										"TupleSort main",
 										ALLOCSET_DEFAULT_SIZES);
 
+	/*
+	 * Create a working memory context for one sort operation.  The content of
+	 * this context is deleted by tuplesort_reset.
+	 */
+	sortcontext = AllocSetContextCreate(maincontext,
+										"TupleSort sort",
+										ALLOCSET_DEFAULT_SIZES);
+
 	/*
 	 * Caller tuple (e.g. IndexTuple) memory context.
 	 *
@@ -636,7 +655,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 	 * Make the Tuplesortstate within the per-sort context.  This way, we
 	 * don't need a separate pfree() operation for it at shutdown.
 	 */
-	oldcontext = MemoryContextSwitchTo(sortcontext);
+	oldcontext = MemoryContextSwitchTo(maincontext);
 
 	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
 
@@ -654,6 +673,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 	state->availMem = state->allowedMem;
 	state->sortcontext = sortcontext;
 	state->tuplecontext = tuplecontext;
+	state->maincontext = maincontext;
 	state->tapeset = NULL;
 
 	state->memtupcount = 0;
@@ -694,13 +714,14 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 					 int nkeys, AttrNumber *attNums,
 					 Oid *sortOperators, Oid *sortCollations,
 					 bool *nullsFirstFlags,
-					 int workMem, bool randomAccess)
+					 int workMem, bool randomAccess,
+					 bool skipAbbrev)
 {
 	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
 	MemoryContext oldcontext;
 	int			i;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 	AssertArg(nkeys > 0);
 
@@ -742,7 +763,7 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 		sortKey->ssup_nulls_first = nullsFirstFlags[i];
 		sortKey->ssup_attno = attNums[i];
 		/* Convey if abbreviation optimization is applicable in principle */
-		sortKey->abbreviate = (i == 0);
+		sortKey->abbreviate = (i == 0) && !skipAbbrev;
 
 		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
 	}
@@ -773,7 +794,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
 
 	Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -864,7 +885,7 @@ tuplesort_begin_index_btree(Relation heapRel,
 	MemoryContext oldcontext;
 	int			i;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -939,7 +960,7 @@ tuplesort_begin_index_hash(Relation heapRel,
 	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
 	MemoryContext oldcontext;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -981,7 +1002,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
 	int16		typlen;
 	bool		typbyval;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -1092,16 +1113,12 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
 }
 
 /*
- * tuplesort_end
- *
- *	Release resources and clean up.
+ * tuplesort_free
  *
- * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
- * pointing to garbage.  Be careful not to attempt to use or free such
- * pointers afterwards!
+ *	Internal routine for freeing resources of tuplesort.
  */
-void
-tuplesort_end(Tuplesortstate *state)
+static void
+tuplesort_free(Tuplesortstate *state, bool delete)
 {
 	/* context swap probably not needed, but let's be safe */
 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
@@ -1160,7 +1177,98 @@ tuplesort_end(Tuplesortstate *state)
 	 * Free the per-sort memory context, thereby releasing all working memory,
 	 * including the Tuplesortstate struct itself.
 	 */
-	MemoryContextDelete(state->sortcontext);
+	if (delete)
+	{
+		MemoryContextDelete(state->maincontext);
+	}
+	else
+	{
+		MemoryContextResetOnly(state->sortcontext);
+		MemoryContextResetOnly(state->tuplecontext);
+	}
+}
+
+/*
+ * tuplesort_end
+ *
+ *	Release resources and clean up.
+ *
+ * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
+ * pointing to garbage.  Be careful not to attempt to use or free such
+ * pointers afterwards!
+ */
+void
+tuplesort_end(Tuplesortstate *state)
+{
+	tuplesort_free(state, true);
+}
+
+/*
+ * tuplesort_updatemax 
+ *
+ *	Update maximum resource usage statistics.
+ */
+static void
+tuplesort_updatemax(Tuplesortstate *state)
+{
+	int64	spaceUsed;
+	bool	spaceUsedOnDisk;
+
+	/*
+	 * Note: it might seem we should provide both memory and disk usage for a
+	 * disk-based sort.  However, the current code doesn't track memory space
+	 * accurately once we have begun to return tuples to the caller (since we
+	 * don't account for pfree's the caller is expected to do), so we cannot
+	 * rely on availMem in a disk sort.  This does not seem worth the overhead
+	 * to fix.  Is it worth creating an API for the memory context code to
+	 * tell us how much is actually used in sortcontext?
+	 */
+	if (state->tapeset)
+	{
+		spaceUsedOnDisk = true;
+		spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
+	}
+	else
+	{
+		spaceUsedOnDisk = false;
+		spaceUsed = state->allowedMem - state->availMem;
+	}
+
+	if (spaceUsed > state->maxSpace)
+	{
+		state->maxSpace = spaceUsed;
+		state->maxSpaceOnDisk = spaceUsedOnDisk;
+		state->maxSpaceStatus = state->status;
+	}
+}
+
+/*
+ * tuplesort_reset
+ *
+ *	Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
+ *	meta-information in.  After tuplesort_reset, tuplesort is ready to start
+ *	a new sort.  It allows evade recreation of tuple sort (and save resources)
+ *	when sorting multiple small batches.
+ */
+void
+tuplesort_reset(Tuplesortstate *state)
+{
+	tuplesort_updatemax(state);
+	tuplesort_free(state, false);
+	state->status = TSS_INITIAL;
+	state->memtupcount = 0;
+	state->boundUsed = false;
+	state->tapeset = NULL;
+	state->currentRun = 0;
+	state->result_tape = -1;
+	state->bounded = false;
+	state->availMem = state->allowedMem;
+	state->lastReturnedTuple = NULL;
+	state->slabAllocatorUsed = false;
+	state->slabMemoryBegin = NULL;
+	state->slabMemoryEnd = NULL;
+	state->slabFreeHead = NULL;
+	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
 }
 
 /*
@@ -2944,18 +3052,15 @@ tuplesort_get_stats(Tuplesortstate *state,
 	 * to fix.  Is it worth creating an API for the memory context code to
 	 * tell us how much is actually used in sortcontext?
 	 */
-	if (state->tapeset)
-	{
+	tuplesort_updatemax(state);
+
+	if (state->maxSpaceOnDisk)
 		stats->spaceType = SORT_SPACE_TYPE_DISK;
-		stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
-	}
 	else
-	{
 		stats->spaceType = SORT_SPACE_TYPE_MEMORY;
-		stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
-	}
+	stats->spaceUsed = (state->maxSpace + 1023) / 1024;
 
-	switch (state->status)
+	switch (state->maxSpaceStatus)
 	{
 		case TSS_SORTEDINMEM:
 			if (state->boundUsed)
diff --git a/src/include/executor/nodeIncrementalSort.h b/src/include/executor/nodeIncrementalSort.h
new file mode 100644
index 0000000000..b2e4e5061f
--- /dev/null
+++ b/src/include/executor/nodeIncrementalSort.h
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeIncrementalSort.h
+ *
+ *
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/nodeIncrementalSort.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef NODEINCREMENTALSORT_H
+#define NODEINCREMENTALSORT_H
+
+#include "access/parallel.h"
+#include "nodes/execnodes.h"
+
+extern IncrementalSortState *ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags);
+extern void ExecEndIncrementalSort(IncrementalSortState *node);
+extern void ExecReScanIncrementalSort(IncrementalSortState *node);
+
+/* parallel instrumentation support */
+extern void ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt);
+extern void ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt);
+extern void ExecIncrementalSortReInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt);
+extern void ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pcxt);
+extern void ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node);
+
+#endif   /* NODEINCREMENTALSORT_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 2a4f7407a1..4180f57e88 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1754,6 +1754,20 @@ typedef struct MaterialState
 	Tuplestorestate *tuplestorestate;
 } MaterialState;
 
+
+/* ----------------
+ *	 When performing sorting by multiple keys input dataset could be already
+ *	 presorted by some prefix of these keys.  We call them "skip keys".
+ *	 SkipKeyData represents information about one such key.
+ * ----------------
+ */
+typedef struct SkipKeyData
+{
+	FmgrInfo				flinfo;	/* comparison function info */
+	FunctionCallInfoData	fcinfo;	/* comparison function call info */
+	OffsetNumber			attno;	/* attribute number in tuple */
+} SkipKeyData;
+
 /* ----------------
  *	 Shared memory container for per-worker sort information
  * ----------------
@@ -1782,6 +1796,44 @@ typedef struct SortState
 	SharedSortInfo *shared_info;	/* one entry per worker */
 } SortState;
 
+/* ----------------
+ *	 Shared memory container for per-worker incremental sort information
+ * ----------------
+ */
+typedef struct IncrementalSortInfo
+{
+	TuplesortInstrumentation	sinstrument;
+	int64						groupsCount;
+} IncrementalSortInfo;
+
+typedef struct SharedIncrementalSortInfo
+{
+	int							num_workers;
+	IncrementalSortInfo			sinfo[FLEXIBLE_ARRAY_MEMBER];
+} SharedIncrementalSortInfo;
+
+/* ----------------
+ *	 IncrementalSortState information
+ * ----------------
+ */
+typedef struct IncrementalSortState
+{
+	ScanState	ss;				/* its first field is NodeTag */
+	bool		bounded;		/* is the result set bounded? */
+	int64		bound;			/* if bounded, how many tuples are needed */
+	bool		sort_Done;		/* sort completed yet? */
+	bool		finished;		/* fetching tuples from outer node
+								   is finished ? */
+	bool		bounded_Done;	/* value of bounded we did the sort with */
+	int64		bound_Done;		/* value of bound we did the sort with */
+	void	   *tuplesortstate; /* private state of tuplesort.c */
+	SkipKeyData *skipKeys;		/* keys, dataset is presorted by */
+	int64		groupsCount;	/* number of groups with equal skip keys */
+	TupleTableSlot *sampleSlot;	/* slot for sample tuple of sort group */
+	bool		am_worker;		/* are we a worker? */
+	SharedIncrementalSortInfo *shared_info;	/* one entry per worker */
+} IncrementalSortState;
+
 /* ---------------------
  *	GroupState information
  * ---------------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 2eb3d6d371..b6a9d6c597 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -73,6 +73,7 @@ typedef enum NodeTag
 	T_HashJoin,
 	T_Material,
 	T_Sort,
+	T_IncrementalSort,
 	T_Group,
 	T_Agg,
 	T_WindowAgg,
@@ -125,6 +126,7 @@ typedef enum NodeTag
 	T_HashJoinState,
 	T_MaterialState,
 	T_SortState,
+	T_IncrementalSortState,
 	T_GroupState,
 	T_AggState,
 	T_WindowAggState,
@@ -240,6 +242,7 @@ typedef enum NodeTag
 	T_ProjectionPath,
 	T_ProjectSetPath,
 	T_SortPath,
+	T_IncrementalSortPath,
 	T_GroupPath,
 	T_UpperUniquePath,
 	T_AggPath,
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 74e9fb5f7b..033ec416fe 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -750,6 +750,17 @@ typedef struct Sort
 	bool	   *nullsFirst;		/* NULLS FIRST/LAST directions */
 } Sort;
 
+
+/* ----------------
+ *		incremental sort node
+ * ----------------
+ */
+typedef struct IncrementalSort
+{
+	Sort		sort;
+	int			skipCols;		/* number of presorted columns */
+} IncrementalSort;
+
 /* ---------------
  *	 group node -
  *		Used for queries with GROUP BY (but no aggregates) specified.
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 71689b8ed6..0d072fd7c3 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -1513,6 +1513,16 @@ typedef struct SortPath
 	Path	   *subpath;		/* path representing input source */
 } SortPath;
 
+/*
+ * IncrementalSortPath
+ */
+typedef struct IncrementalSortPath
+{
+	SortPath	spath;
+	int			skipCols;
+} IncrementalSortPath;
+
+
 /*
  * GroupPath represents grouping (of presorted input)
  *
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index d2fff76653..45cfbee724 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -61,6 +61,7 @@ extern bool enable_indexonlyscan;
 extern bool enable_bitmapscan;
 extern bool enable_tidscan;
 extern bool enable_sort;
+extern bool enable_incrementalsort;
 extern bool enable_hashagg;
 extern bool enable_nestloop;
 extern bool enable_material;
@@ -105,8 +106,9 @@ extern void cost_namedtuplestorescan(Path *path, PlannerInfo *root,
 						 RelOptInfo *baserel, ParamPathInfo *param_info);
 extern void cost_recursive_union(Path *runion, Path *nrterm, Path *rterm);
 extern void cost_sort(Path *path, PlannerInfo *root,
-		  List *pathkeys, Cost input_cost, double tuples, int width,
-		  Cost comparison_cost, int sort_mem,
+		  List *pathkeys, int presorted_keys,
+		  Cost input_startup_cost, Cost input_total_cost,
+		  double tuples, int width, Cost comparison_cost, int sort_mem,
 		  double limit_tuples);
 extern void cost_append(AppendPath *path);
 extern void cost_merge_append(Path *path, PlannerInfo *root,
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 0072b7aa0d..d6b8841d33 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -188,6 +188,7 @@ typedef enum
 
 extern PathKeysComparison compare_pathkeys(List *keys1, List *keys2);
 extern bool pathkeys_contained_in(List *keys1, List *keys2);
+extern int pathkeys_common(List *keys1, List *keys2);
 extern Path *get_cheapest_path_for_pathkeys(List *paths, List *pathkeys,
 							   Relids required_outer,
 							   CostSelector cost_criterion,
@@ -226,6 +227,7 @@ extern List *select_outer_pathkeys_for_merge(PlannerInfo *root,
 extern List *make_inner_pathkeys_for_merge(PlannerInfo *root,
 							  List *mergeclauses,
 							  List *outer_pathkeys);
+extern int pathkeys_useful_for_ordering(List *query_pathkeys, List *pathkeys);
 extern List *truncate_useless_pathkeys(PlannerInfo *root,
 						  RelOptInfo *rel,
 						  List *pathkeys);
diff --git a/src/include/utils/selfuncs.h b/src/include/utils/selfuncs.h
index 299c9f846a..43e8ef20dc 100644
--- a/src/include/utils/selfuncs.h
+++ b/src/include/utils/selfuncs.h
@@ -206,6 +206,9 @@ extern void mergejoinscansel(PlannerInfo *root, Node *clause,
 extern double estimate_num_groups(PlannerInfo *root, List *groupExprs,
 					double input_rows, List **pgset);
 
+extern double *estimate_pathkeys_groups(List *pathkeys, PlannerInfo *root,
+										double tuples);
+
 extern void estimate_hash_bucket_stats(PlannerInfo *root,
 						   Node *hashkey, double nbuckets,
 						   Selectivity *mcv_freq,
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 5d57c503ab..9a5b7f8d3c 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -90,7 +90,8 @@ extern Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc,
 					 int nkeys, AttrNumber *attNums,
 					 Oid *sortOperators, Oid *sortCollations,
 					 bool *nullsFirstFlags,
-					 int workMem, bool randomAccess);
+					 int workMem, bool randomAccess,
+					 bool skipAbbrev);
 extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc,
 						Relation indexRel,
 						int workMem, bool randomAccess);
@@ -134,6 +135,8 @@ extern bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples,
 
 extern void tuplesort_end(Tuplesortstate *state);
 
+extern void tuplesort_reset(Tuplesortstate *state);
+
 extern void tuplesort_get_stats(Tuplesortstate *state,
 					TuplesortInstrumentation *stats);
 extern const char *tuplesort_method_name(TuplesortMethod m);
diff --git a/src/test/isolation/expected/drop-index-concurrently-1.out b/src/test/isolation/expected/drop-index-concurrently-1.out
index 75dff56bc4..e11fb617b5 100644
--- a/src/test/isolation/expected/drop-index-concurrently-1.out
+++ b/src/test/isolation/expected/drop-index-concurrently-1.out
@@ -19,9 +19,10 @@ Sort
 step explains: EXPLAIN (COSTS OFF) EXECUTE getrow_seq;
 QUERY PLAN     
 
-Sort           
+Incremental Sort
   Sort Key: id, data
-  ->  Seq Scan on test_dc
+  Presorted Key: id
+  ->  Index Scan using test_dc_pkey on test_dc
         Filter: ((data)::text = '34'::text)
 step select2: SELECT * FROM test_dc WHERE data=34 ORDER BY id,data;
 id             data           
diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out
index a79f891da7..0926650a0f 100644
--- a/src/test/regress/expected/inherit.out
+++ b/src/test/regress/expected/inherit.out
@@ -1517,6 +1517,7 @@ NOTICE:  drop cascades to table matest1
 set enable_seqscan = off;
 set enable_indexscan = on;
 set enable_bitmapscan = off;
+set enable_incrementalsort = off;
 -- Check handling of duplicated, constant, or volatile targetlist items
 explain (costs off)
 SELECT thousand, tenthous FROM tenk1
@@ -1657,9 +1658,45 @@ FROM generate_series(1, 3) g(i);
  {3,7,8,10,13,13,16,18,19,22}
 (3 rows)
 
+set enable_incrementalsort = on;
+-- check incremental sort is used when enabled
+explain (costs off)
+SELECT thousand, tenthous FROM tenk1
+UNION ALL
+SELECT thousand, thousand FROM tenk1
+ORDER BY thousand, tenthous;
+                               QUERY PLAN                                
+-------------------------------------------------------------------------
+ Merge Append
+   Sort Key: tenk1.thousand, tenk1.tenthous
+   ->  Index Only Scan using tenk1_thous_tenthous on tenk1
+   ->  Incremental Sort
+         Sort Key: tenk1_1.thousand, tenk1_1.thousand
+         Presorted Key: tenk1_1.thousand
+         ->  Index Only Scan using tenk1_thous_tenthous on tenk1 tenk1_1
+(7 rows)
+
+explain (costs off)
+SELECT x, y FROM
+  (SELECT thousand AS x, tenthous AS y FROM tenk1 a
+   UNION ALL
+   SELECT unique2 AS x, unique2 AS y FROM tenk1 b) s
+ORDER BY x, y;
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Merge Append
+   Sort Key: a.thousand, a.tenthous
+   ->  Index Only Scan using tenk1_thous_tenthous on tenk1 a
+   ->  Incremental Sort
+         Sort Key: b.unique2, b.unique2
+         Presorted Key: b.unique2
+         ->  Index Only Scan using tenk1_unique2 on tenk1 b
+(7 rows)
+
 reset enable_seqscan;
 reset enable_indexscan;
 reset enable_bitmapscan;
+reset enable_incrementalsort;
 --
 -- Check that constraint exclusion works correctly with partitions using
 -- implicit constraints generated from the partition bound information.
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index c9c8f51e1c..898361d6b3 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -76,6 +76,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_gathermerge         | on
  enable_hashagg             | on
  enable_hashjoin            | on
+ enable_incrementalsort     | on
  enable_indexonlyscan       | on
  enable_indexscan           | on
  enable_material            | on
@@ -87,7 +88,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan             | on
  enable_sort                | on
  enable_tidscan             | on
-(15 rows)
+(16 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql
index 2e42ae115d..7229997144 100644
--- a/src/test/regress/sql/inherit.sql
+++ b/src/test/regress/sql/inherit.sql
@@ -546,6 +546,7 @@ drop table matest0 cascade;
 set enable_seqscan = off;
 set enable_indexscan = on;
 set enable_bitmapscan = off;
+set enable_incrementalsort = off;
 
 -- Check handling of duplicated, constant, or volatile targetlist items
 explain (costs off)
@@ -607,9 +608,26 @@ SELECT
     ORDER BY f.i LIMIT 10)
 FROM generate_series(1, 3) g(i);
 
+set enable_incrementalsort = on;
+
+-- check incremental sort is used when enabled
+explain (costs off)
+SELECT thousand, tenthous FROM tenk1
+UNION ALL
+SELECT thousand, thousand FROM tenk1
+ORDER BY thousand, tenthous;
+
+explain (costs off)
+SELECT x, y FROM
+  (SELECT thousand AS x, tenthous AS y FROM tenk1 a
+   UNION ALL
+   SELECT unique2 AS x, unique2 AS y FROM tenk1 b) s
+ORDER BY x, y;
+
 reset enable_seqscan;
 reset enable_indexscan;
 reset enable_bitmapscan;
+reset enable_incrementalsort;
 
 --
 -- Check that constraint exclusion works correctly with partitions using
