diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 683d641fa7..80239faf21 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -1979,28 +1979,62 @@ SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2
  119
 (10 rows)
 
--- CROSS JOIN, not pushed down
+-- CROSS JOIN, not pushed down.  For this query, essential optimization is top-N
+-- sort.  But it can't be processed at remote side, because we never do LIMIT
+-- push down.  Assuming that sorting is not worth it to push down, CROSS JOIN
+-- is also not pushed down in order to transfer less tuples over network.
 EXPLAIN (VERBOSE, COSTS OFF)
-SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
-                             QUERY PLAN                              
----------------------------------------------------------------------
+SELECT t1.c3, t2.c3 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c3, t2.c3 OFFSET 100 LIMIT 10;
+                            QUERY PLAN                            
+------------------------------------------------------------------
  Limit
-   Output: t1.c1, t2.c1
+   Output: t1.c3, t2.c3
    ->  Sort
-         Output: t1.c1, t2.c1
-         Sort Key: t1.c1, t2.c1
+         Output: t1.c3, t2.c3
+         Sort Key: t1.c3, t2.c3
          ->  Nested Loop
-               Output: t1.c1, t2.c1
+               Output: t1.c3, t2.c3
                ->  Foreign Scan on public.ft1 t1
-                     Output: t1.c1
-                     Remote SQL: SELECT "C 1" FROM "S 1"."T 1"
+                     Output: t1.c3
+                     Remote SQL: SELECT c3 FROM "S 1"."T 1"
                ->  Materialize
-                     Output: t2.c1
+                     Output: t2.c3
                      ->  Foreign Scan on public.ft2 t2
-                           Output: t2.c1
-                           Remote SQL: SELECT "C 1" FROM "S 1"."T 1"
+                           Output: t2.c3
+                           Remote SQL: SELECT c3 FROM "S 1"."T 1"
 (15 rows)
 
+SELECT t1.c3, t2.c3 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c3, t2.c3 OFFSET 100 LIMIT 10;
+  c3   |  c3   
+-------+-------
+ 00001 | 00101
+ 00001 | 00102
+ 00001 | 00103
+ 00001 | 00104
+ 00001 | 00105
+ 00001 | 00106
+ 00001 | 00107
+ 00001 | 00108
+ 00001 | 00109
+ 00001 | 00110
+(10 rows)
+
+-- CROSS JOIN, pushed down.  Unlike previous query, remote side is able to
+-- return tuples in given order without full sort, but using index scan and
+-- incremental sort.  This is much cheaper than full sort on local side, even
+-- despite we don't know LIMIT on remote side.
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
+                                                                            QUERY PLAN                                                                             
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: t1.c1, t2.c1
+   ->  Foreign Scan
+         Output: t1.c1, t2.c1
+         Relations: (public.ft1 t1) INNER JOIN (public.ft2 t2)
+         Remote SQL: SELECT r1."C 1", r2."C 1" FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) ORDER BY r1."C 1" ASC NULLS LAST, r2."C 1" ASC NULLS LAST
+(6 rows)
+
 SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
  c1 | c1  
 ----+-----
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 3c3c5c705f..c324394942 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -508,7 +508,17 @@ SELECT t1.c1 FROM ft1 t1 WHERE EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c1)
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
 SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
--- CROSS JOIN, not pushed down
+-- CROSS JOIN, not pushed down.  For this query, essential optimization is top-N
+-- sort.  But it can't be processed at remote side, because we never do LIMIT
+-- push down.  Assuming that sorting is not worth it to push down, CROSS JOIN
+-- is also not pushed down in order to transfer less tuples over network.
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT t1.c3, t2.c3 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c3, t2.c3 OFFSET 100 LIMIT 10;
+SELECT t1.c3, t2.c3 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c3, t2.c3 OFFSET 100 LIMIT 10;
+-- CROSS JOIN, pushed down.  Unlike previous query, remote side is able to
+-- return tuples in given order without full sort, but using index scan and
+-- incremental sort.  This is much cheaper than full sort on local side, even
+-- despite we don't know LIMIT on remote side.
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
 SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e4a01699e4..fdcdc6683f 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3553,6 +3553,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-enable-incrementalsort" xreflabel="enable_incrementalsort">
+      <term><varname>enable_incrementalsort</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_incrementalsort</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of incremental sort
+        steps. The default is <literal>on</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-indexscan" xreflabel="enable_indexscan">
       <term><varname>enable_indexscan</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 79e6985d0d..6cf5f8bad1 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -80,6 +80,8 @@ static void show_upper_qual(List *qual, const char *qlabel,
 				ExplainState *es);
 static void show_sort_keys(SortState *sortstate, List *ancestors,
 			   ExplainState *es);
+static void show_incremental_sort_keys(IncrementalSortState *incrsortstate,
+					   List *ancestors, ExplainState *es);
 static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 					   ExplainState *es);
 static void show_agg_keys(AggState *astate, List *ancestors,
@@ -93,7 +95,7 @@ static void show_grouping_set_keys(PlanState *planstate,
 static void show_group_keys(GroupState *gstate, List *ancestors,
 				ExplainState *es);
 static void show_sort_group_keys(PlanState *planstate, const char *qlabel,
-					 int nkeys, AttrNumber *keycols,
+					 int nkeys, int nPresortedKeys, AttrNumber *keycols,
 					 Oid *sortOperators, Oid *collations, bool *nullsFirst,
 					 List *ancestors, ExplainState *es);
 static void show_sortorder_options(StringInfo buf, Node *sortexpr,
@@ -101,6 +103,8 @@ static void show_sortorder_options(StringInfo buf, Node *sortexpr,
 static void show_tablesample(TableSampleClause *tsc, PlanState *planstate,
 				 List *ancestors, ExplainState *es);
 static void show_sort_info(SortState *sortstate, ExplainState *es);
+static void show_incremental_sort_info(IncrementalSortState *incrsortstate,
+									   ExplainState *es);
 static void show_hash_info(HashState *hashstate, ExplainState *es);
 static void show_tidbitmap_info(BitmapHeapScanState *planstate,
 					ExplainState *es);
@@ -1011,6 +1015,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		case T_Sort:
 			pname = sname = "Sort";
 			break;
+		case T_IncrementalSort:
+			pname = sname = "Incremental Sort";
+			break;
 		case T_Group:
 			pname = sname = "Group";
 			break;
@@ -1611,6 +1618,12 @@ ExplainNode(PlanState *planstate, List *ancestors,
 			show_sort_keys(castNode(SortState, planstate), ancestors, es);
 			show_sort_info(castNode(SortState, planstate), es);
 			break;
+		case T_IncrementalSort:
+			show_incremental_sort_keys(castNode(IncrementalSortState, planstate),
+									   ancestors, es);
+			show_incremental_sort_info(castNode(IncrementalSortState, planstate),
+									   es);
+			break;
 		case T_MergeAppend:
 			show_merge_append_keys(castNode(MergeAppendState, planstate),
 								   ancestors, es);
@@ -1936,14 +1949,37 @@ static void
 show_sort_keys(SortState *sortstate, List *ancestors, ExplainState *es)
 {
 	Sort	   *plan = (Sort *) sortstate->ss.ps.plan;
+	int			skipCols;
+
+	if (IsA(plan, IncrementalSort))
+		skipCols = ((IncrementalSort *) plan)->skipCols;
+	else
+		skipCols = 0;
 
 	show_sort_group_keys((PlanState *) sortstate, "Sort Key",
-						 plan->numCols, plan->sortColIdx,
+						 plan->numCols, skipCols, plan->sortColIdx,
 						 plan->sortOperators, plan->collations,
 						 plan->nullsFirst,
 						 ancestors, es);
 }
 
+/*
+ * Show the sort keys for an IncrementalSort node.
+ */
+static void
+show_incremental_sort_keys(IncrementalSortState *incrsortstate,
+						   List *ancestors, ExplainState *es)
+{
+	IncrementalSort *plan = (IncrementalSort *) incrsortstate->ss.ps.plan;
+
+	show_sort_group_keys((PlanState *) incrsortstate, "Sort Key",
+						 plan->sort.numCols, plan->skipCols,
+						 plan->sort.sortColIdx,
+						 plan->sort.sortOperators, plan->sort.collations,
+						 plan->sort.nullsFirst,
+						 ancestors, es);
+}
+
 /*
  * Likewise, for a MergeAppend node.
  */
@@ -1954,7 +1990,7 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 	MergeAppend *plan = (MergeAppend *) mstate->ps.plan;
 
 	show_sort_group_keys((PlanState *) mstate, "Sort Key",
-						 plan->numCols, plan->sortColIdx,
+						 plan->numCols, 0, plan->sortColIdx,
 						 plan->sortOperators, plan->collations,
 						 plan->nullsFirst,
 						 ancestors, es);
@@ -1978,7 +2014,7 @@ show_agg_keys(AggState *astate, List *ancestors,
 			show_grouping_sets(outerPlanState(astate), plan, ancestors, es);
 		else
 			show_sort_group_keys(outerPlanState(astate), "Group Key",
-								 plan->numCols, plan->grpColIdx,
+								 plan->numCols, 0, plan->grpColIdx,
 								 NULL, NULL, NULL,
 								 ancestors, es);
 
@@ -2047,7 +2083,7 @@ show_grouping_set_keys(PlanState *planstate,
 	if (sortnode)
 	{
 		show_sort_group_keys(planstate, "Sort Key",
-							 sortnode->numCols, sortnode->sortColIdx,
+							 sortnode->numCols, 0, sortnode->sortColIdx,
 							 sortnode->sortOperators, sortnode->collations,
 							 sortnode->nullsFirst,
 							 ancestors, es);
@@ -2104,7 +2140,7 @@ show_group_keys(GroupState *gstate, List *ancestors,
 	/* The key columns refer to the tlist of the child plan */
 	ancestors = lcons(gstate, ancestors);
 	show_sort_group_keys(outerPlanState(gstate), "Group Key",
-						 plan->numCols, plan->grpColIdx,
+						 plan->numCols, 0, plan->grpColIdx,
 						 NULL, NULL, NULL,
 						 ancestors, es);
 	ancestors = list_delete_first(ancestors);
@@ -2117,13 +2153,14 @@ show_group_keys(GroupState *gstate, List *ancestors,
  */
 static void
 show_sort_group_keys(PlanState *planstate, const char *qlabel,
-					 int nkeys, AttrNumber *keycols,
+					 int nkeys, int nPresortedKeys, AttrNumber *keycols,
 					 Oid *sortOperators, Oid *collations, bool *nullsFirst,
 					 List *ancestors, ExplainState *es)
 {
 	Plan	   *plan = planstate->plan;
 	List	   *context;
 	List	   *result = NIL;
+	List	   *resultPresorted = NIL;
 	StringInfoData sortkeybuf;
 	bool		useprefix;
 	int			keyno;
@@ -2163,9 +2200,13 @@ show_sort_group_keys(PlanState *planstate, const char *qlabel,
 								   nullsFirst[keyno]);
 		/* Emit one property-list item per sort key */
 		result = lappend(result, pstrdup(sortkeybuf.data));
+		if (keyno < nPresortedKeys)
+			resultPresorted = lappend(resultPresorted, exprstr);
 	}
 
 	ExplainPropertyList(qlabel, result, es);
+	if (nPresortedKeys > 0)
+		ExplainPropertyList("Presorted Key", resultPresorted, es);
 }
 
 /*
@@ -2373,6 +2414,95 @@ show_sort_info(SortState *sortstate, ExplainState *es)
 	}
 }
 
+/*
+ * If it's EXPLAIN ANALYZE, show tuplesort stats for an incremental sort node
+ */
+static void
+show_incremental_sort_info(IncrementalSortState *incrsortstate,
+						   ExplainState *es)
+{
+	if (es->analyze && incrsortstate->sort_Done &&
+		incrsortstate->tuplesortstate != NULL)
+	{
+		Tuplesortstate *state = (Tuplesortstate *) incrsortstate->tuplesortstate;
+		TuplesortInstrumentation stats;
+		const char *sortMethod;
+		const char *spaceType;
+		long		spaceUsed;
+
+		tuplesort_get_stats(state, &stats);
+		sortMethod = tuplesort_method_name(stats.sortMethod);
+		spaceType = tuplesort_space_type_name(stats.spaceType);
+		spaceUsed = stats.spaceUsed;
+
+		if (es->format == EXPLAIN_FORMAT_TEXT)
+		{
+			appendStringInfoSpaces(es->str, es->indent * 2);
+			appendStringInfo(es->str, "Sort Method: %s  %s: %ldkB\n",
+							 sortMethod, spaceType, spaceUsed);
+			appendStringInfoSpaces(es->str, es->indent * 2);
+			appendStringInfo(es->str, "Sort Groups: %ld\n",
+							 incrsortstate->groupsCount);
+		}
+		else
+		{
+			ExplainPropertyText("Sort Method", sortMethod, es);
+			ExplainPropertyLong("Sort Space Used", spaceUsed, es);
+			ExplainPropertyText("Sort Space Type", spaceType, es);
+			ExplainPropertyLong("Sort Groups",
+								incrsortstate->groupsCount, es);
+		}
+	}
+
+	if (incrsortstate->shared_info != NULL)
+	{
+		int			n;
+		bool		opened_group = false;
+
+		for (n = 0; n < incrsortstate->shared_info->num_workers; n++)
+		{
+			TuplesortInstrumentation *sinstrument;
+			const char *sortMethod;
+			const char *spaceType;
+			long		spaceUsed;
+			int64		groupsCount;
+
+			sinstrument = &incrsortstate->shared_info->sinfo[n].sinstrument;
+			groupsCount = incrsortstate->shared_info->sinfo[n].groupsCount;
+			if (sinstrument->sortMethod == SORT_TYPE_STILL_IN_PROGRESS)
+				continue;		/* ignore any unfilled slots */
+			sortMethod = tuplesort_method_name(sinstrument->sortMethod);
+			spaceType = tuplesort_space_type_name(sinstrument->spaceType);
+			spaceUsed = sinstrument->spaceUsed;
+
+			if (es->format == EXPLAIN_FORMAT_TEXT)
+			{
+				appendStringInfoSpaces(es->str, es->indent * 2);
+				appendStringInfo(es->str,
+								 "Worker %d:  Sort Method: %s  %s: %ldkB  Groups: %ld\n",
+								 n, sortMethod, spaceType, spaceUsed, groupsCount);
+			}
+			else
+			{
+				if (!opened_group)
+				{
+					ExplainOpenGroup("Workers", "Workers", false, es);
+					opened_group = true;
+				}
+				ExplainOpenGroup("Worker", NULL, true, es);
+				ExplainPropertyInteger("Worker Number", n, es);
+				ExplainPropertyText("Sort Method", sortMethod, es);
+				ExplainPropertyLong("Sort Space Used", spaceUsed, es);
+				ExplainPropertyText("Sort Space Type", spaceType, es);
+				ExplainPropertyLong("Sort Groups", groupsCount, es);
+				ExplainCloseGroup("Worker", NULL, true, es);
+			}
+		}
+		if (opened_group)
+			ExplainCloseGroup("Workers", "Workers", false, es);
+	}
+}
+
 /*
  * Show information on hash buckets/batches.
  */
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index cc09895fa5..572aca05fb 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -24,8 +24,8 @@ OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
        nodeLimit.o nodeLockRows.o nodeGatherMerge.o \
        nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
        nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \
-       nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
-       nodeValuesscan.o \
+       nodeSamplescan.o nodeSeqscan.o nodeSetOp.o \
+       nodeSort.o nodeIncrementalSort.o nodeUnique.o nodeValuesscan.o \
        nodeCtescan.o nodeNamedtuplestorescan.o nodeWorktablescan.o \
        nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \
        nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o tqueue.o spi.o \
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 9e78421978..34e05330ea 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -31,6 +31,7 @@
 #include "executor/nodeGroup.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "executor/nodeIncrementalSort.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeLimit.h"
@@ -253,6 +254,10 @@ ExecReScan(PlanState *node)
 			ExecReScanSort((SortState *) node);
 			break;
 
+		case T_IncrementalSortState:
+			ExecReScanIncrementalSort((IncrementalSortState *) node);
+			break;
+
 		case T_GroupState:
 			ExecReScanGroup((GroupState *) node);
 			break;
@@ -525,8 +530,12 @@ ExecSupportsBackwardScan(Plan *node)
 		case T_CteScan:
 		case T_Material:
 		case T_Sort:
+			/* these don't evaluate tlist */
 			return true;
 
+		case T_IncrementalSort:
+			return false;
+
 		case T_LockRows:
 		case T_Limit:
 			return ExecSupportsBackwardScan(outerPlan(node));
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f8b72ebab9..490d6dd76c 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -32,6 +32,7 @@
 #include "executor/nodeForeignscan.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "executor/nodeIncrementalSort.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeSeqscan.h"
@@ -280,6 +281,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortEstimate((SortState *) planstate, e->pcxt);
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware */
+			ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
+			break;
 
 		default:
 			break;
@@ -493,6 +498,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware */
+			ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
+			break;
 
 		default:
 			break;
@@ -918,6 +927,10 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 		case T_SortState:
 			/* these nodes have DSM state, but no reinitialization is required */
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware */
+			ExecIncrementalSortReInitializeDSM((IncrementalSortState *) planstate, pcxt);
+			break;
 
 		default:
 			break;
@@ -976,6 +989,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 		case T_SortState:
 			ExecSortRetrieveInstrumentation((SortState *) planstate);
 			break;
+		case T_IncrementalSortState:
+			ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
+			break;
 		case T_HashState:
 			ExecHashRetrieveInstrumentation((HashState *) planstate);
 			break;
@@ -1225,6 +1241,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware */
+			ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
+												pwcxt);
+			break;
 
 		default:
 			break;
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 43a27a9af2..17163448a3 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -88,6 +88,7 @@
 #include "executor/nodeGroup.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "executor/nodeIncrementalSort.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeLimit.h"
@@ -314,6 +315,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
 												estate, eflags);
 			break;
 
+		case T_IncrementalSort:
+			result = (PlanState *) ExecInitIncrementalSort((IncrementalSort *) node,
+														   estate, eflags);
+			break;
+
 		case T_Group:
 			result = (PlanState *) ExecInitGroup((Group *) node,
 												 estate, eflags);
@@ -695,6 +701,10 @@ ExecEndNode(PlanState *node)
 			ExecEndSort((SortState *) node);
 			break;
 
+		case T_IncrementalSortState:
+			ExecEndIncrementalSort((IncrementalSortState *) node);
+			break;
+
 		case T_GroupState:
 			ExecEndGroup((GroupState *) node);
 			break;
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 46ee880415..30855c3fe7 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -667,6 +667,7 @@ initialize_phase(AggState *aggstate, int newphase)
 												  sortnode->collations,
 												  sortnode->nullsFirst,
 												  work_mem,
+												  false,
 												  false);
 	}
 
@@ -754,7 +755,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
 									 pertrans->sortOperators,
 									 pertrans->sortCollations,
 									 pertrans->sortNullsFirst,
-									 work_mem, false);
+									 work_mem, false, false);
 	}
 
 	/*
diff --git a/src/backend/executor/nodeIncrementalSort.c b/src/backend/executor/nodeIncrementalSort.c
new file mode 100644
index 0000000000..a8e55e5e2d
--- /dev/null
+++ b/src/backend/executor/nodeIncrementalSort.c
@@ -0,0 +1,646 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeIncrementalSort.c
+ *	  Routines to handle incremental sorting of relations.
+ *
+ * DESCRIPTION
+ *
+ *		Incremental sort is a specially optimized kind of multikey sort used
+ *		when the input is already presorted by a prefix of the required keys
+ *		list.  Thus, when it's required to sort by (key1, key2 ... keyN) and
+ *		result is already sorted by (key1, key2 ... keyM), M < N, we sort groups
+ *		where values of (key1, key2 ... keyM) are equal.
+ *
+ *		Consider the following example.  We have input tuples consisting of
+ *		two integers (x, y), already presorted by x, while it's required to
+ *		sort them by both x and y.  Suppose the input tuples are the following:
+ *
+ *		(1, 5)
+ *		(1, 2)
+ *		(2, 10)
+ *		(2, 1)
+ *		(2, 5)
+ *		(3, 3)
+ *		(3, 7)
+ *
+ *		The incremental sort algorithm would sort each of the following
+ *		groups, which have equal x, by y individually:
+ *			(1, 5) (1, 2)
+ *			(2, 10) (2, 1) (2, 5)
+ *			(3, 3) (3, 7)
+ *
+ *		After sorting these groups and concatenating them, we would get the
+ *		following tuple set, which is actually sorted by x and y.
+ *
+ *		(1, 2)
+ *		(1, 5)
+ *		(2, 1)
+ *		(2, 5)
+ *		(2, 10)
+ *		(3, 3)
+ *		(3, 7)
+ *
+ *		Incremental sort is faster than a full sort on large datasets.  But
+ *		the greatest benefit of incremental sort is for queries with a LIMIT,
+ *		because incremental sort can return the first tuples without reading
+ *		the whole input dataset.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/nodeIncrementalSort.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "executor/execdebug.h"
+#include "executor/nodeIncrementalSort.h"
+#include "miscadmin.h"
+#include "utils/lsyscache.h"
+#include "utils/tuplesort.h"
+
+/*
+ * Prepare information for skipKeys comparison.
+ */
+static void
+prepareSkipCols(IncrementalSortState *node)
+{
+	IncrementalSort	   *plannode = (IncrementalSort *) node->ss.ps.plan;
+	int					skipCols,
+						i;
+
+	Assert(IsA(plannode, IncrementalSort));
+	skipCols = plannode->skipCols;
+
+	node->skipKeys = (SkipKeyData *) palloc(skipCols * sizeof(SkipKeyData));
+
+	for (i = 0; i < skipCols; i++)
+	{
+		Oid equalityOp, equalityFunc;
+		SkipKeyData *key;
+
+		key = &node->skipKeys[i];
+		key->attno = plannode->sort.sortColIdx[i];
+
+		equalityOp = get_equality_op_for_ordering_op(
+										plannode->sort.sortOperators[i], NULL);
+		if (!OidIsValid(equalityOp))
+			elog(ERROR, "missing equality operator for ordering operator %u",
+					plannode->sort.sortOperators[i]);
+
+		equalityFunc = get_opcode(equalityOp);
+		if (!OidIsValid(equalityFunc))
+			elog(ERROR, "missing function for operator %u", equalityOp);
+
+		/* Lookup the comparison function */
+		fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);
+
+		/* We can initialize the callinfo just once and re-use it */
+		InitFunctionCallInfoData(key->fcinfo, &key->flinfo, 2,
+								plannode->sort.collations[i], NULL, NULL);
+		key->fcinfo.argnull[0] = false;
+		key->fcinfo.argnull[1] = false;
+	}
+}
+
+/*
+ * Check if first "skipCols" sort values are equal.
+ */
+static bool
+cmpSortSkipCols(IncrementalSortState *node, TupleTableSlot *a,
+															TupleTableSlot *b)
+{
+	int n, i;
+
+	Assert(IsA(node->ss.ps.plan, IncrementalSort));
+
+	n = ((IncrementalSort *) node->ss.ps.plan)->skipCols;
+
+	for (i = 0; i < n; i++)
+	{
+		Datum datumA, datumB, result;
+		bool isnullA, isnullB;
+		AttrNumber attno = node->skipKeys[i].attno;
+		SkipKeyData *key;
+
+		datumA = slot_getattr(a, attno, &isnullA);
+		datumB = slot_getattr(b, attno, &isnullB);
+
+		/* Special case for NULL-vs-NULL, else use standard comparison */
+		if (isnullA || isnullB)
+		{
+			if (isnullA == isnullB)
+				continue;
+			else
+				return false;
+		}
+
+		key = &node->skipKeys[i];
+
+		key->fcinfo.arg[0] = datumA;
+		key->fcinfo.arg[1] = datumB;
+
+		/* just for paranoia's sake, we reset isnull each time */
+		key->fcinfo.isnull = false;
+
+		result = FunctionCallInvoke(&key->fcinfo);
+
+		/* Check for null result, since caller is clearly not expecting one */
+		if (key->fcinfo.isnull)
+			elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);
+
+		if (!DatumGetBool(result))
+			return false;
+	}
+	return true;
+}
+
+/*
+ * Copying tuples to node->sampleSlot introduces some overhead.  It's
+ * especially noticeable when groups contain only one or a few tuples.  To
+ * cope with this problem, we don't copy the sample tuple until the group
+ * contains at least MIN_GROUP_SIZE tuples.  Surely, this might reduce the
+ * efficiency of incremental sort, but it reduces the probability of regression.
+ */
+#define MIN_GROUP_SIZE 32
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSort
+ *
+ *		Assuming that the outer subtree returns tuples presorted by some
+ *		prefix of the target sort columns, performs an incremental sort.
+ *		It fetches groups of tuples where the prefix sort columns are equal
+ *		and sorts them using tuplesort.  This approach avoids sorting the
+ *		whole dataset.  Besides taking less memory and being faster, it can
+ *		start returning tuples before fetching the full dataset from the
+ *		outer subtree.
+ *
+ *		Conditions:
+ *		  -- none.
+ *
+ *		Initial States:
+ *		  -- the outer child is prepared to return the first tuple.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *
+ExecIncrementalSort(PlanState *pstate)
+{
+	IncrementalSortState *node = castNode(IncrementalSortState, pstate);
+	EState			   *estate;
+	ScanDirection		dir;
+	Tuplesortstate	   *tuplesortstate;
+	TupleTableSlot	   *slot;
+	IncrementalSort	   *plannode = (IncrementalSort *) node->ss.ps.plan;
+	PlanState		   *outerNode;
+	TupleDesc			tupDesc;
+	int64				nTuples = 0;
+
+	/*
+	 * get state info from node
+	 */
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "entering routine");
+
+	estate = node->ss.ps.state;
+	dir = estate->es_direction;
+	tuplesortstate = (Tuplesortstate *) node->tuplesortstate;
+
+	/*
+	 * Return next tuple from sorted set if any.
+	 */
+	if (node->sort_Done)
+	{
+		slot = node->ss.ps.ps_ResultTupleSlot;
+		if (tuplesort_gettupleslot(tuplesortstate,
+									  ScanDirectionIsForward(dir),
+									  false, slot, NULL) || node->finished)
+			return slot;
+	}
+
+	/*
+	 * If first time through, read all tuples from outer plan and pass them to
+	 * tuplesort.c. Subsequent calls just fetch tuples from tuplesort.
+	 */
+
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "sorting subplan");
+
+	/*
+	 * Want to scan subplan in the forward direction while creating the
+	 * sorted data.
+	 */
+	estate->es_direction = ForwardScanDirection;
+
+	/*
+	 * Initialize tuplesort module.
+	 */
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "calling tuplesort_begin");
+
+	outerNode = outerPlanState(node);
+	tupDesc = ExecGetResultType(outerNode);
+
+	if (node->tuplesortstate == NULL)
+	{
+		/*
+		 * We are going to process the first group of presorted data.
+		 * Initialize support structures for cmpSortSkipCols - already
+		 * sorted columns.
+		 */
+		prepareSkipCols(node);
+
+		/*
+		 * Pass all the columns to tuplesort.  We pass groups of at least
+		 * MIN_GROUP_SIZE tuples to tuplesort, so these groups don't
+		 * necessarily have equal values of the first column.  We are unlikely
+		 * to have huge groups with incremental sort, so the use of
+		 * abbreviated keys would likely be a waste of time.
+		 */
+		tuplesortstate = tuplesort_begin_heap(
+									tupDesc,
+									plannode->sort.numCols,
+									plannode->sort.sortColIdx,
+									plannode->sort.sortOperators,
+									plannode->sort.collations,
+									plannode->sort.nullsFirst,
+									work_mem,
+									false,
+									true);
+		node->tuplesortstate = (void *) tuplesortstate;
+	}
+	else
+	{
+		/* Next group of presorted data */
+		tuplesort_reset((Tuplesortstate *) node->tuplesortstate);
+	}
+	node->groupsCount++;
+
+	/* Calculate remaining bound for bounded sort */
+	if (node->bounded)
+		tuplesort_set_bound(tuplesortstate, node->bound - node->bound_Done);
+
+	/* Put saved tuple to tuplesort if any */
+	if (!TupIsNull(node->sampleSlot))
+	{
+		tuplesort_puttupleslot(tuplesortstate, node->sampleSlot);
+		ExecClearTuple(node->sampleSlot);
+		nTuples++;
+	}
+
+	/*
+	 * Put next group of tuples where skipCols sort values are equal to
+	 * tuplesort.
+	 */
+	for (;;)
+	{
+		slot = ExecProcNode(outerNode);
+
+		if (TupIsNull(slot))
+		{
+			node->finished = true;
+			break;
+		}
+
+		/* Put next group of presorted data to the tuplesort */
+		if (nTuples < MIN_GROUP_SIZE)
+		{
+			tuplesort_puttupleslot(tuplesortstate, slot);
+
+			/* Save last tuple in minimal group */
+			if (nTuples == MIN_GROUP_SIZE - 1)
+				ExecCopySlot(node->sampleSlot, slot);
+			nTuples++;
+		}
+		else
+		{
+			/* Iterate while skip cols are the same as in saved tuple */
+			if (cmpSortSkipCols(node, node->sampleSlot, slot))
+			{
+				tuplesort_puttupleslot(tuplesortstate, slot);
+				nTuples++;
+			}
+			else
+			{
+				ExecCopySlot(node->sampleSlot, slot);
+				break;
+			}
+		}
+	}
+
+	/*
+	 * Complete the sort.
+	 */
+	tuplesort_performsort(tuplesortstate);
+
+	/*
+	 * restore to user specified direction
+	 */
+	estate->es_direction = dir;
+
+	/*
+	 * finally set the sorted flag to true
+	 */
+	node->sort_Done = true;
+	node->bounded_Done = node->bounded;
+	if (node->shared_info && node->am_worker)
+	{
+		TuplesortInstrumentation *si;
+
+		Assert(IsParallelWorker());
+		Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
+		si = &node->shared_info->sinfo[ParallelWorkerNumber].sinstrument;
+		tuplesort_get_stats(tuplesortstate, si);
+		node->shared_info->sinfo[ParallelWorkerNumber].groupsCount =
+															node->groupsCount;
+	}
+
+	/*
+	 * Adjust bound_Done with number of tuples we've actually sorted.
+	 */
+	if (node->bounded)
+	{
+		if (node->finished)
+			node->bound_Done = node->bound;
+		else
+			node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
+	}
+
+	SO1_printf("ExecIncrementalSort: %s\n", "sorting done");
+
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "retrieving tuple from tuplesort");
+
+	/*
+	 * Get the first or next tuple from tuplesort. Returns NULL if no more
+	 * tuples.
+	 */
+	slot = node->ss.ps.ps_ResultTupleSlot;
+	(void) tuplesort_gettupleslot(tuplesortstate,
+								  ScanDirectionIsForward(dir),
+								  false, slot, NULL);
+	return slot;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecInitIncrementalSort
+ *
+ *		Creates the run-time state information for the sort node
+ *		produced by the planner and initializes its outer subtree.
+ * ----------------------------------------------------------------
+ */
+IncrementalSortState *
+ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags)
+{
+	IncrementalSortState   *incrsortstate;
+
+	SO1_printf("ExecInitIncrementalSort: %s\n",
+			   "initializing sort node");
+
+	/*
+	 * Incremental sort can't be used with either EXEC_FLAG_REWIND,
+	 * EXEC_FLAG_BACKWARD or EXEC_FLAG_MARK, because we only keep the
+	 * current group of tuples in the tuplesortstate.
+	 */
+	Assert((eflags & (EXEC_FLAG_REWIND |
+					  EXEC_FLAG_BACKWARD |
+					  EXEC_FLAG_MARK)) == 0);
+
+	/*
+	 * create state structure
+	 */
+	incrsortstate = makeNode(IncrementalSortState);
+	incrsortstate->ss.ps.plan = (Plan *) node;
+	incrsortstate->ss.ps.state = estate;
+	incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort;
+
+	incrsortstate->bounded = false;
+	incrsortstate->sort_Done = false;
+	incrsortstate->finished = false;
+	incrsortstate->tuplesortstate = NULL;
+	incrsortstate->sampleSlot = NULL;
+	incrsortstate->bound_Done = 0;
+	incrsortstate->groupsCount = 0;
+	incrsortstate->skipKeys = NULL;
+
+	/*
+	 * Miscellaneous initialization
+	 *
+	 * Sort nodes don't initialize their ExprContexts because they never call
+	 * ExecQual or ExecProject.
+	 */
+
+	/*
+	 * tuple table initialization
+	 *
+	 * sort nodes only return scan tuples from their sorted relation.
+	 */
+	ExecInitResultTupleSlot(estate, &incrsortstate->ss.ps);
+	ExecInitScanTupleSlot(estate, &incrsortstate->ss);
+
+	/*
+	 * initialize child nodes
+	 *
+	 * We shield the child node from the need to support REWIND, BACKWARD, or
+	 * MARK/RESTORE.
+	 */
+	eflags &= ~(EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK);
+
+	outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags);
+
+	/*
+	 * initialize tuple type.  no need to initialize projection info because
+	 * this node doesn't do projections.
+	 */
+	ExecAssignResultTypeFromTL(&incrsortstate->ss.ps);
+	ExecAssignScanTypeFromOuterPlan(&incrsortstate->ss);
+	incrsortstate->ss.ps.ps_ProjInfo = NULL;
+
+	/* make standalone slot to store previous tuple from outer node */
+	incrsortstate->sampleSlot = MakeSingleTupleTableSlot(
+							ExecGetResultType(outerPlanState(incrsortstate)));
+
+	SO1_printf("ExecInitIncrementalSort: %s\n",
+			   "sort node initialized");
+
+	return incrsortstate;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecEndIncrementalSort(node)
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndIncrementalSort(IncrementalSortState *node)
+{
+	SO1_printf("ExecEndIncrementalSort: %s\n",
+			   "shutting down sort node");
+
+	/*
+	 * clean out the tuple table
+	 */
+	ExecClearTuple(node->ss.ss_ScanTupleSlot);
+	/* must drop pointer to sort result tuple */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+	/* must drop standalone tuple slot from outer node */
+	ExecDropSingleTupleTableSlot(node->sampleSlot);
+
+	/*
+	 * Release tuplesort resources
+	 */
+	if (node->tuplesortstate != NULL)
+		tuplesort_end((Tuplesortstate *) node->tuplesortstate);
+	node->tuplesortstate = NULL;
+
+	/*
+	 * shut down the subplan
+	 */
+	ExecEndNode(outerPlanState(node));
+
+	SO1_printf("ExecEndIncrementalSort: %s\n",
+			   "sort node shutdown");
+}
+
+void
+ExecReScanIncrementalSort(IncrementalSortState *node)
+{
+	PlanState  *outerPlan = outerPlanState(node);
+
+	/*
+	 * If we haven't sorted yet, just return. If outerplan's chgParam is not
+	 * NULL then it will be re-scanned by ExecProcNode, else no reason to
+	 * re-scan it at all.
+	 */
+	if (!node->sort_Done)
+		return;
+
+	/* must drop pointer to sort result tuple */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+
+	/*
+	 * Incremental sort doesn't support randomAccess, so we cannot rewind and
+	 * rescan previously sorted output.  Instead we always forget the previous
+	 * sort results: release the tuplesort and re-read the subplan to re-sort
+	 * from scratch.  This also covers the case where the bounded-sort
+	 * parameters changed.
+	 */
+	node->sort_Done = false;
+	tuplesort_end((Tuplesortstate *) node->tuplesortstate);
+	node->tuplesortstate = NULL;
+	node->bound_Done = 0;
+
+	/*
+	 * if chgParam of subnode is not null then plan will be re-scanned by
+	 * first ExecProcNode.
+	 */
+	if (outerPlan->chgParam == NULL)
+		ExecReScan(outerPlan);
+}
+
+/* ----------------------------------------------------------------
+ *						Parallel Query Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortEstimate
+ *
+ *		Estimate space required to propagate sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
+	size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortInitializeDSM
+ *
+ *		Initialize DSM space for sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	size = offsetof(SharedIncrementalSortInfo, sinfo)
+		+ pcxt->nworkers * sizeof(IncrementalSortInfo);
+	node->shared_info = shm_toc_allocate(pcxt->toc, size);
+	/* ensure any unfilled slots will contain zeroes */
+	memset(node->shared_info, 0, size);
+	node->shared_info->num_workers = pcxt->nworkers;
+	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+				   node->shared_info);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortReInitializeDSM
+ *
+ *		Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortReInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
+{
+	/* If there's any instrumentation space, clear it for next time */
+	if (node->shared_info != NULL)
+	{
+		memset(node->shared_info->sinfo, 0,
+			   node->shared_info->num_workers * sizeof(IncrementalSortInfo));
+	}
+}
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortInitializeWorker
+ *
+ *		Attach worker to DSM space for sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
+{
+	node->shared_info =
+		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
+	node->am_worker = true;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortRetrieveInstrumentation
+ *
+ *		Transfer sort statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
+{
+	Size		size;
+	SharedIncrementalSortInfo *si;
+
+	if (node->shared_info == NULL)
+		return;
+
+	size = offsetof(SharedIncrementalSortInfo, sinfo)
+		+ node->shared_info->num_workers * sizeof(IncrementalSortInfo);
+	si = palloc(size);
+	memcpy(si, node->shared_info, size);
+	node->shared_info = si;
+}
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index 9c68de8565..90c82af17f 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -93,7 +93,8 @@ ExecSort(PlanState *pstate)
 											  plannode->collations,
 											  plannode->nullsFirst,
 											  work_mem,
-											  node->randomAccess);
+											  node->randomAccess,
+											  false);
 		if (node->bounded)
 			tuplesort_set_bound(tuplesortstate, node->bound);
 		node->tuplesortstate = (void *) tuplesortstate;
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index ddbbc79823..94d5ba0e41 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -919,6 +919,24 @@ _copyMaterial(const Material *from)
 }
 
 
+/*
+ * CopySortFields
+ *
+ *		This function copies the fields of the Sort node.  It is used by
+ *		all the copy functions for classes which inherit from Sort.
+ */
+static void
+CopySortFields(const Sort *from, Sort *newnode)
+{
+	CopyPlanFields((const Plan *) from, (Plan *) newnode);
+
+	COPY_SCALAR_FIELD(numCols);
+	COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
+	COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
+	COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
+	COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+}
+
 /*
  * _copySort
  */
@@ -930,13 +948,29 @@ _copySort(const Sort *from)
 	/*
 	 * copy node superclass fields
 	 */
-	CopyPlanFields((const Plan *) from, (Plan *) newnode);
+	CopySortFields(from, newnode);
 
-	COPY_SCALAR_FIELD(numCols);
-	COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
-	COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
-	COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
-	COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+	return newnode;
+}
+
+
+/*
+ * _copyIncrementalSort
+ */
+static IncrementalSort *
+_copyIncrementalSort(const IncrementalSort *from)
+{
+	IncrementalSort	   *newnode = makeNode(IncrementalSort);
+
+	/*
+	 * copy node superclass fields
+	 */
+	CopySortFields((const Sort *) from, (Sort *) newnode);
+
+	/*
+	 * copy remainder of node
+	 */
+	COPY_SCALAR_FIELD(skipCols);
 
 	return newnode;
 }
@@ -4817,6 +4851,9 @@ copyObjectImpl(const void *from)
 		case T_Sort:
 			retval = _copySort(from);
 			break;
+		case T_IncrementalSort:
+			retval = _copyIncrementalSort(from);
+			break;
 		case T_Group:
 			retval = _copyGroup(from);
 			break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 5e72df137e..415a9e9b19 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -870,12 +870,10 @@ _outMaterial(StringInfo str, const Material *node)
 }
 
 static void
-_outSort(StringInfo str, const Sort *node)
+_outSortInfo(StringInfo str, const Sort *node)
 {
 	int			i;
 
-	WRITE_NODE_TYPE("SORT");
-
 	_outPlanInfo(str, (const Plan *) node);
 
 	WRITE_INT_FIELD(numCols);
@@ -897,6 +895,24 @@ _outSort(StringInfo str, const Sort *node)
 		appendStringInfo(str, " %s", booltostr(node->nullsFirst[i]));
 }
 
+static void
+_outSort(StringInfo str, const Sort *node)
+{
+	WRITE_NODE_TYPE("SORT");
+
+	_outSortInfo(str, node);
+}
+
+static void
+_outIncrementalSort(StringInfo str, const IncrementalSort *node)
+{
+	WRITE_NODE_TYPE("INCREMENTALSORT");
+
+	_outSortInfo(str, (const Sort *) node);
+
+	WRITE_INT_FIELD(skipCols);
+}
+
 static void
 _outUnique(StringInfo str, const Unique *node)
 {
@@ -3739,6 +3755,9 @@ outNode(StringInfo str, const void *obj)
 			case T_Sort:
 				_outSort(str, obj);
 				break;
+			case T_IncrementalSort:
+				_outIncrementalSort(str, obj);
+				break;
 			case T_Unique:
 				_outUnique(str, obj);
 				break;
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 9925866b53..9f64d50103 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -2060,12 +2060,13 @@ _readMaterial(void)
 }
 
 /*
- * _readSort
+ * ReadCommonSort
+ *	Assign the basic stuff of all nodes that inherit from Sort
  */
-static Sort *
-_readSort(void)
+static void
+ReadCommonSort(Sort *local_node)
 {
-	READ_LOCALS(Sort);
+	READ_TEMP_LOCALS();
 
 	ReadCommonPlan(&local_node->plan);
 
@@ -2074,6 +2075,32 @@ _readSort(void)
 	READ_OID_ARRAY(sortOperators, local_node->numCols);
 	READ_OID_ARRAY(collations, local_node->numCols);
 	READ_BOOL_ARRAY(nullsFirst, local_node->numCols);
+}
+
+/*
+ * _readSort
+ */
+static Sort *
+_readSort(void)
+{
+	READ_LOCALS_NO_FIELDS(Sort);
+
+	ReadCommonSort(local_node);
+
+	READ_DONE();
+}
+
+/*
+ * _readIncrementalSort
+ */
+static IncrementalSort *
+_readIncrementalSort(void)
+{
+	READ_LOCALS(IncrementalSort);
+
+	ReadCommonSort(&local_node->sort);
+
+	READ_INT_FIELD(skipCols);
 
 	READ_DONE();
 }
@@ -2636,6 +2663,8 @@ parseNodeString(void)
 		return_value = _readMaterial();
 	else if (MATCH("SORT", 4))
 		return_value = _readSort();
+	else if (MATCH("INCREMENTALSORT", 15))
+		return_value = _readIncrementalSort();
 	else if (MATCH("GROUP", 5))
 		return_value = _readGroup();
 	else if (MATCH("AGG", 3))
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 12a6ee4a22..e96c5fe137 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3613,6 +3613,10 @@ print_path(PlannerInfo *root, Path *path, int indent)
 			ptype = "Sort";
 			subpath = ((SortPath *) path)->subpath;
 			break;
+		case T_IncrementalSortPath:
+			ptype = "IncrementalSort";
+			subpath = ((SortPath *) path)->subpath;
+			break;
 		case T_GroupPath:
 			ptype = "Group";
 			subpath = ((GroupPath *) path)->subpath;
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 8679b14b29..fd0ba203d5 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -121,6 +121,7 @@ bool		enable_indexonlyscan = true;
 bool		enable_bitmapscan = true;
 bool		enable_tidscan = true;
 bool		enable_sort = true;
+bool		enable_incrementalsort = true;
 bool		enable_hashagg = true;
 bool		enable_nestloop = true;
 bool		enable_material = true;
@@ -1605,6 +1606,13 @@ cost_recursive_union(Path *runion, Path *nrterm, Path *rterm)
  *	  Determines and returns the cost of sorting a relation, including
  *	  the cost of reading the input data.
  *
+ * Sort can be either a full sort of the relation or an incremental sort
+ * when the data is already presorted by a prefix of the required pathkeys.
+ * In the latter case we estimate the number of groups the source data is
+ * divided into by the presorted pathkeys, and then estimate the cost of
+ * sorting each individual group assuming the data is divided into groups
+ * uniformly.  Also, with a LIMIT we only pull and sort some of the groups.
+ *
  * If the total volume of data to sort is less than sort_mem, we will do
  * an in-memory sort, which requires no I/O and about t*log2(t) tuple
  * comparisons for t tuples.
@@ -1631,7 +1639,9 @@ cost_recursive_union(Path *runion, Path *nrterm, Path *rterm)
  * work that has to be done to prepare the inputs to the comparison operators.
  *
  * 'pathkeys' is a list of sort keys
- * 'input_cost' is the total cost for reading the input data
+ * 'presorted_keys' is the number of pathkeys already presorted in given path
+ * 'input_startup_cost' is the startup cost for reading the input data
+ * 'input_total_cost' is the total cost for reading the input data
  * 'tuples' is the number of tuples in the relation
  * 'width' is the average tuple width in bytes
  * 'comparison_cost' is the extra cost per comparison, if any
@@ -1647,19 +1657,28 @@ cost_recursive_union(Path *runion, Path *nrterm, Path *rterm)
  */
 void
 cost_sort(Path *path, PlannerInfo *root,
-		  List *pathkeys, Cost input_cost, double tuples, int width,
-		  Cost comparison_cost, int sort_mem,
+		  List *pathkeys, int presorted_keys,
+		  Cost input_startup_cost, Cost input_total_cost,
+		  double tuples, int width, Cost comparison_cost, int sort_mem,
 		  double limit_tuples)
 {
-	Cost		startup_cost = input_cost;
-	Cost		run_cost = 0;
+	Cost		startup_cost = input_startup_cost;
+	Cost		run_cost = 0,
+				rest_cost,
+				group_cost,
+				input_run_cost = input_total_cost - input_startup_cost;
 	double		input_bytes = relation_byte_size(tuples, width);
 	double		output_bytes;
 	double		output_tuples;
+	double		num_groups,
+				group_input_bytes,
+				group_tuples;
 	long		sort_mem_bytes = sort_mem * 1024L;
 
 	if (!enable_sort)
 		startup_cost += disable_cost;
+	if (!enable_incrementalsort)
+		presorted_keys = 0;
 
 	path->rows = tuples;
 
@@ -1685,13 +1704,50 @@ cost_sort(Path *path, PlannerInfo *root,
 		output_bytes = input_bytes;
 	}
 
-	if (output_bytes > sort_mem_bytes)
+	/*
+	 * Estimate the number of groups the presorted keys divide the dataset into.
+	 */
+	if (presorted_keys > 0)
+	{
+		List	   *presortedExprs = NIL;
+		ListCell   *l;
+		int			i = 0;
+
+		/* Extract presorted keys as list of expressions */
+		foreach(l, pathkeys)
+		{
+			PathKey *key = (PathKey *)lfirst(l);
+			EquivalenceMember *member = (EquivalenceMember *)
+										linitial(key->pk_eclass->ec_members);
+
+			presortedExprs = lappend(presortedExprs, member->em_expr);
+
+			i++;
+			if (i >= presorted_keys)
+				break;
+		}
+
+		/* Estimate number of groups with equal presorted keys */
+		num_groups = estimate_num_groups(root, presortedExprs, tuples, NULL);
+	}
+	else
+	{
+		num_groups = 1.0;
+	}
+
+	/*
+	 * Estimate average cost of sorting of one group where presorted keys are
+	 * equal.
+	 */
+	group_input_bytes = input_bytes / num_groups;
+	group_tuples = tuples / num_groups;
+	if (output_bytes > sort_mem_bytes && group_input_bytes > sort_mem_bytes)
 	{
 		/*
 		 * We'll have to use a disk-based sort of all the tuples
 		 */
-		double		npages = ceil(input_bytes / BLCKSZ);
-		double		nruns = input_bytes / sort_mem_bytes;
+		double		npages = ceil(group_input_bytes / BLCKSZ);
+		double		nruns = group_input_bytes / sort_mem_bytes;
 		double		mergeorder = tuplesort_merge_order(sort_mem_bytes);
 		double		log_runs;
 		double		npageaccesses;
@@ -1701,7 +1757,7 @@ cost_sort(Path *path, PlannerInfo *root,
 		 *
 		 * Assume about N log2 N comparisons
 		 */
-		startup_cost += comparison_cost * tuples * LOG2(tuples);
+		group_cost = comparison_cost * group_tuples * LOG2(group_tuples);
 
 		/* Disk costs */
 
@@ -1712,10 +1768,10 @@ cost_sort(Path *path, PlannerInfo *root,
 			log_runs = 1.0;
 		npageaccesses = 2.0 * npages * log_runs;
 		/* Assume 3/4ths of accesses are sequential, 1/4th are not */
-		startup_cost += npageaccesses *
+		group_cost += npageaccesses *
 			(seq_page_cost * 0.75 + random_page_cost * 0.25);
 	}
-	else if (tuples > 2 * output_tuples || input_bytes > sort_mem_bytes)
+	else if (group_tuples > 2 * output_tuples || group_input_bytes > sort_mem_bytes)
 	{
 		/*
 		 * We'll use a bounded heap-sort keeping just K tuples in memory, for
@@ -1723,14 +1779,33 @@ cost_sort(Path *path, PlannerInfo *root,
 		 * factor is a bit higher than for quicksort.  Tweak it so that the
 		 * cost curve is continuous at the crossover point.
 		 */
-		startup_cost += comparison_cost * tuples * LOG2(2.0 * output_tuples);
+		group_cost = comparison_cost * group_tuples * LOG2(2.0 * output_tuples);
 	}
 	else
 	{
-		/* We'll use plain quicksort on all the input tuples */
-		startup_cost += comparison_cost * tuples * LOG2(tuples);
+		/*
+		 * We'll use plain quicksort on all the input tuples.  If it appears
+		 * that we expect less than two tuples per sort group then assume
+		 * logarithmic part of estimate to be 1.
+		 */
+		if (group_tuples >= 2.0)
+			group_cost = comparison_cost * group_tuples * LOG2(group_tuples);
+		else
+			group_cost = comparison_cost * group_tuples;
 	}
 
+	/* Add per group cost of fetching tuples from input */
+	group_cost += input_run_cost / num_groups;
+
+	/*
+	 * We have to sort the first group before the node can start returning
+	 * tuples.  Sorting the remaining groups is required to return the rest.
+	 */
+	startup_cost += group_cost;
+	rest_cost = (num_groups * (output_tuples / tuples) - 1.0) * group_cost;
+	if (rest_cost > 0.0)
+		run_cost += rest_cost;
+
 	/*
 	 * Also charge a small amount (arbitrarily set equal to operator cost) per
 	 * extracted tuple.  We don't charge cpu_tuple_cost because a Sort node
@@ -1741,6 +1816,20 @@ cost_sort(Path *path, PlannerInfo *root,
 	 */
 	run_cost += cpu_operator_cost * tuples;
 
+	/* Extra costs of incremental sort */
+	if (presorted_keys > 0)
+	{
+		/*
+		 * In incremental sort case we also have to cost the detection of
+		 * sort groups.  This turns out to be one extra copy and comparison
+		 * per tuple.
+		 */
+		run_cost += (cpu_tuple_cost + comparison_cost) * tuples;
+
+		/* Cost of per group tuplesort reset */
+		run_cost += 2.0 * cpu_tuple_cost * num_groups;
+	}
+
 	path->startup_cost = startup_cost;
 	path->total_cost = startup_cost + run_cost;
 }
@@ -2717,6 +2806,8 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 		cost_sort(&sort_path,
 				  root,
 				  outersortkeys,
+				  pathkeys_common(outer_path->pathkeys, outersortkeys),
+				  outer_path->startup_cost,
 				  outer_path->total_cost,
 				  outer_path_rows,
 				  outer_path->pathtarget->width,
@@ -2743,6 +2834,8 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 		cost_sort(&sort_path,
 				  root,
 				  innersortkeys,
+				  pathkeys_common(inner_path->pathkeys, innersortkeys),
+				  inner_path->startup_cost,
 				  inner_path->total_cost,
 				  inner_path_rows,
 				  inner_path->pathtarget->width,
diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c
index ef58cff28d..329ba7b532 100644
--- a/src/backend/optimizer/path/pathkeys.c
+++ b/src/backend/optimizer/path/pathkeys.c
@@ -22,10 +22,12 @@
 #include "nodes/nodeFuncs.h"
 #include "nodes/plannodes.h"
 #include "optimizer/clauses.h"
+#include "optimizer/cost.h"
 #include "optimizer/pathnode.h"
 #include "optimizer/paths.h"
 #include "optimizer/tlist.h"
 #include "utils/lsyscache.h"
+#include "utils/selfuncs.h"
 
 
 static bool pathkey_is_redundant(PathKey *new_pathkey, List *pathkeys);
@@ -308,6 +310,33 @@ compare_pathkeys(List *keys1, List *keys2)
 	return PATHKEYS_EQUAL;
 }
 
+
+/*
+ * pathkeys_common
+ *    Returns length of longest common prefix of keys1 and keys2.
+ */
+int
+pathkeys_common(List *keys1, List *keys2)
+{
+	int n;
+	ListCell   *key1,
+			   *key2;
+	n = 0;
+
+	forboth(key1, keys1, key2, keys2)
+	{
+		PathKey    *pathkey1 = (PathKey *) lfirst(key1);
+		PathKey    *pathkey2 = (PathKey *) lfirst(key2);
+
+		if (pathkey1 != pathkey2)
+			return n;
+		n++;
+	}
+
+	return n;
+}
+
+
 /*
  * pathkeys_contained_in
  *	  Common special case of compare_pathkeys: we just want to know
@@ -1488,26 +1517,42 @@ right_merge_direction(PlannerInfo *root, PathKey *pathkey)
  *		Count the number of pathkeys that are useful for meeting the
  *		query's requested output ordering.
  *
- * Unlike merge pathkeys, this is an all-or-nothing affair: it does us
- * no good to order by just the first key(s) of the requested ordering.
- * So the result is always either 0 or list_length(root->query_pathkeys).
+ * Returns the number of pathkeys that match the given argument.  The others
+ * can be satisfied by an incremental sort.
  */
-static int
-pathkeys_useful_for_ordering(PlannerInfo *root, List *pathkeys)
+int
+pathkeys_useful_for_ordering(List *query_pathkeys, List *pathkeys)
 {
-	if (root->query_pathkeys == NIL)
+	int	n_common_pathkeys;
+
+	if (query_pathkeys == NIL)
 		return 0;				/* no special ordering requested */
 
 	if (pathkeys == NIL)
 		return 0;				/* unordered path */
 
-	if (pathkeys_contained_in(root->query_pathkeys, pathkeys))
+	n_common_pathkeys = pathkeys_common(query_pathkeys, pathkeys);
+
+	if (enable_incrementalsort)
 	{
-		/* It's useful ... or at least the first N keys are */
-		return list_length(root->query_pathkeys);
+		/*
+		 * Return the number of leading pathkeys in common, or 0 if there are
+		 * none.  Any common prefix of pathkeys is useful for ordering because
+		 * we can use incremental sort for the remaining keys.
+		 */
+		return n_common_pathkeys;
+	}
+	else
+	{
+		/*
+		 * When incremental sort is disabled, pathkeys are useful only when they
+		 * contain all of the query pathkeys.
+		 */
+		if (n_common_pathkeys == list_length(query_pathkeys))
+			return n_common_pathkeys;
+		else
+			return 0;
 	}
-
-	return 0;					/* path ordering not useful */
 }
 
 /*
@@ -1523,7 +1568,7 @@ truncate_useless_pathkeys(PlannerInfo *root,
 	int			nuseful2;
 
 	nuseful = pathkeys_useful_for_merging(root, rel, pathkeys);
-	nuseful2 = pathkeys_useful_for_ordering(root, pathkeys);
+	nuseful2 = pathkeys_useful_for_ordering(root->query_pathkeys, pathkeys);
 	if (nuseful2 > nuseful)
 		nuseful = nuseful2;
 
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index e599283d6b..133435f516 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -236,7 +236,7 @@ static MergeJoin *make_mergejoin(List *tlist,
 			   Plan *lefttree, Plan *righttree,
 			   JoinType jointype, bool inner_unique,
 			   bool skip_mark_restore);
-static Sort *make_sort(Plan *lefttree, int numCols,
+static Sort *make_sort(Plan *lefttree, int numCols, int skipCols,
 		  AttrNumber *sortColIdx, Oid *sortOperators,
 		  Oid *collations, bool *nullsFirst);
 static Plan *prepare_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
@@ -252,10 +252,11 @@ static EquivalenceMember *find_ec_member_for_tle(EquivalenceClass *ec,
 					   TargetEntry *tle,
 					   Relids relids);
 static Sort *make_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
-						Relids relids);
+						Relids relids, int skipCols);
 static Sort *make_sort_from_groupcols(List *groupcls,
 						 AttrNumber *grpColIdx,
-						 Plan *lefttree);
+						 Plan *lefttree,
+						 int skipCols);
 static Material *make_material(Plan *lefttree);
 static WindowAgg *make_windowagg(List *tlist, Index winref,
 			   int partNumCols, AttrNumber *partColIdx, Oid *partOperators,
@@ -437,6 +438,7 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
 											   (GatherPath *) best_path);
 			break;
 		case T_Sort:
+		case T_IncrementalSort:
 			plan = (Plan *) create_sort_plan(root,
 											 (SortPath *) best_path,
 											 flags);
@@ -1122,6 +1124,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path)
 		Oid		   *sortOperators;
 		Oid		   *collations;
 		bool	   *nullsFirst;
+		int			n_common_pathkeys;
 
 		/* Build the child plan */
 		/* Must insist that all children return the same tlist */
@@ -1156,9 +1159,11 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path)
 					  numsortkeys * sizeof(bool)) == 0);
 
 		/* Now, insert a Sort node if subplan isn't sufficiently ordered */
-		if (!pathkeys_contained_in(pathkeys, subpath->pathkeys))
+		n_common_pathkeys = pathkeys_common(pathkeys, subpath->pathkeys);
+		if (n_common_pathkeys < list_length(pathkeys))
 		{
 			Sort	   *sort = make_sort(subplan, numsortkeys,
+										 n_common_pathkeys,
 										 sortColIdx, sortOperators,
 										 collations, nullsFirst);
 
@@ -1508,6 +1513,7 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
 	Plan	   *subplan;
 	List	   *pathkeys = best_path->path.pathkeys;
 	List	   *tlist = build_path_tlist(root, &best_path->path);
+	int			n_common_pathkeys;
 
 	/* As with Gather, it's best to project away columns in the workers. */
 	subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
@@ -1537,12 +1543,16 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
 
 
 	/* Now, insert a Sort node if subplan isn't sufficiently ordered */
-	if (!pathkeys_contained_in(pathkeys, best_path->subpath->pathkeys))
+	n_common_pathkeys = pathkeys_common(pathkeys, best_path->subpath->pathkeys);
+	if (n_common_pathkeys < list_length(pathkeys))
+	{
 		subplan = (Plan *) make_sort(subplan, gm_plan->numCols,
+									 n_common_pathkeys,
 									 gm_plan->sortColIdx,
 									 gm_plan->sortOperators,
 									 gm_plan->collations,
 									 gm_plan->nullsFirst);
+	}
 
 	/* Now insert the subplan under GatherMerge. */
 	gm_plan->plan.lefttree = subplan;
@@ -1655,6 +1665,7 @@ create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags)
 {
 	Sort	   *plan;
 	Plan	   *subplan;
+	int			n_common_pathkeys;
 
 	/*
 	 * We don't want any excess columns in the sorted tuples, so request a
@@ -1664,7 +1675,13 @@ create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags)
 	subplan = create_plan_recurse(root, best_path->subpath,
 								  flags | CP_SMALL_TLIST);
 
-	plan = make_sort_from_pathkeys(subplan, best_path->path.pathkeys, NULL);
+	if (IsA(best_path, IncrementalSortPath))
+		n_common_pathkeys = ((IncrementalSortPath *) best_path)->skipCols;
+	else
+		n_common_pathkeys = 0;
+
+	plan = make_sort_from_pathkeys(subplan, best_path->path.pathkeys,
+								   NULL, n_common_pathkeys);
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
@@ -1908,7 +1925,8 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path)
 				sort_plan = (Plan *)
 					make_sort_from_groupcols(rollup->groupClause,
 											 new_grpColIdx,
-											 subplan);
+											 subplan,
+											 0);
 			}
 
 			if (!rollup->is_hashed)
@@ -3848,10 +3866,15 @@ create_mergejoin_plan(PlannerInfo *root,
 	 */
 	if (best_path->outersortkeys)
 	{
+		Sort	   *sort;
+		int			n_common_pathkeys;
 		Relids		outer_relids = outer_path->parent->relids;
-		Sort	   *sort = make_sort_from_pathkeys(outer_plan,
-												   best_path->outersortkeys,
-												   outer_relids);
+
+		n_common_pathkeys = pathkeys_common(best_path->outersortkeys,
+									best_path->jpath.outerjoinpath->pathkeys);
+
+		sort = make_sort_from_pathkeys(outer_plan, best_path->outersortkeys,
+									   outer_relids, n_common_pathkeys);
 
 		label_sort_with_costsize(root, sort, -1.0);
 		outer_plan = (Plan *) sort;
@@ -3862,10 +3885,15 @@ create_mergejoin_plan(PlannerInfo *root,
 
 	if (best_path->innersortkeys)
 	{
+		Sort	   *sort;
+		int			n_common_pathkeys;
 		Relids		inner_relids = inner_path->parent->relids;
-		Sort	   *sort = make_sort_from_pathkeys(inner_plan,
-												   best_path->innersortkeys,
-												   inner_relids);
+
+		n_common_pathkeys = pathkeys_common(best_path->innersortkeys,
+									best_path->jpath.innerjoinpath->pathkeys);
+
+		sort = make_sort_from_pathkeys(inner_plan, best_path->innersortkeys,
+									   inner_relids, n_common_pathkeys);
 
 		label_sort_with_costsize(root, sort, -1.0);
 		inner_plan = (Plan *) sort;
@@ -4927,8 +4955,13 @@ label_sort_with_costsize(PlannerInfo *root, Sort *plan, double limit_tuples)
 {
 	Plan	   *lefttree = plan->plan.lefttree;
 	Path		sort_path;		/* dummy for result of cost_sort */
+	int			skip_cols = 0;
+
+	if (IsA(plan, IncrementalSort))
+		skip_cols = ((IncrementalSort *) plan)->skipCols;
 
-	cost_sort(&sort_path, root, NIL,
+	cost_sort(&sort_path, root, NIL, skip_cols,
+			  lefttree->startup_cost,
 			  lefttree->total_cost,
 			  lefttree->plan_rows,
 			  lefttree->plan_width,
@@ -5519,13 +5552,31 @@ make_mergejoin(List *tlist,
  * nullsFirst arrays already.
  */
 static Sort *
-make_sort(Plan *lefttree, int numCols,
+make_sort(Plan *lefttree, int numCols, int skipCols,
 		  AttrNumber *sortColIdx, Oid *sortOperators,
 		  Oid *collations, bool *nullsFirst)
 {
-	Sort	   *node = makeNode(Sort);
-	Plan	   *plan = &node->plan;
+	Sort	   *node;
+	Plan	   *plan;
+
+	/* Always use regular sort node when enable_incrementalsort = false */
+	if (!enable_incrementalsort)
+		skipCols = 0;
+
+	if (skipCols == 0)
+	{
+		node = makeNode(Sort);
+	}
+	else
+	{
+		IncrementalSort    *incrementalSort;
+
+		incrementalSort = makeNode(IncrementalSort);
+		node = &incrementalSort->sort;
+		incrementalSort->skipCols = skipCols;
+	}
 
+	plan = &node->plan;
 	plan->targetlist = lefttree->targetlist;
 	plan->qual = NIL;
 	plan->lefttree = lefttree;
@@ -5858,9 +5909,11 @@ find_ec_member_for_tle(EquivalenceClass *ec,
  *	  'lefttree' is the node which yields input tuples
  *	  'pathkeys' is the list of pathkeys by which the result is to be sorted
  *	  'relids' is the set of relations required by prepare_sort_from_pathkeys()
+ *	  'skipCols' is the number of presorted columns in input tuples
  */
 static Sort *
-make_sort_from_pathkeys(Plan *lefttree, List *pathkeys, Relids relids)
+make_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
+						Relids relids, int skipCols)
 {
 	int			numsortkeys;
 	AttrNumber *sortColIdx;
@@ -5880,7 +5933,7 @@ make_sort_from_pathkeys(Plan *lefttree, List *pathkeys, Relids relids)
 										  &nullsFirst);
 
 	/* Now build the Sort node */
-	return make_sort(lefttree, numsortkeys,
+	return make_sort(lefttree, numsortkeys, skipCols,
 					 sortColIdx, sortOperators,
 					 collations, nullsFirst);
 }
@@ -5923,7 +5976,7 @@ make_sort_from_sortclauses(List *sortcls, Plan *lefttree)
 		numsortkeys++;
 	}
 
-	return make_sort(lefttree, numsortkeys,
+	return make_sort(lefttree, numsortkeys, 0,
 					 sortColIdx, sortOperators,
 					 collations, nullsFirst);
 }
@@ -5944,7 +5997,8 @@ make_sort_from_sortclauses(List *sortcls, Plan *lefttree)
 static Sort *
 make_sort_from_groupcols(List *groupcls,
 						 AttrNumber *grpColIdx,
-						 Plan *lefttree)
+						 Plan *lefttree,
+						 int skipCols)
 {
 	List	   *sub_tlist = lefttree->targetlist;
 	ListCell   *l;
@@ -5977,7 +6031,7 @@ make_sort_from_groupcols(List *groupcls,
 		numsortkeys++;
 	}
 
-	return make_sort(lefttree, numsortkeys,
+	return make_sort(lefttree, numsortkeys, skipCols,
 					 sortColIdx, sortOperators,
 					 collations, nullsFirst);
 }
@@ -6633,6 +6687,7 @@ is_projection_capable_plan(Plan *plan)
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
 		case T_LockRows:
diff --git a/src/backend/optimizer/plan/planagg.c b/src/backend/optimizer/plan/planagg.c
index 95cbffbd69..308f60beac 100644
--- a/src/backend/optimizer/plan/planagg.c
+++ b/src/backend/optimizer/plan/planagg.c
@@ -44,6 +44,7 @@
 #include "parser/parse_clause.h"
 #include "rewrite/rewriteManip.h"
 #include "utils/lsyscache.h"
+#include "utils/selfuncs.h"
 #include "utils/syscache.h"
 
 
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 7b52dadd81..3842271245 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -3849,14 +3849,14 @@ create_grouping_paths(PlannerInfo *root,
 			foreach(lc, input_rel->partial_pathlist)
 			{
 				Path	   *path = (Path *) lfirst(lc);
-				bool		is_sorted;
+				int			n_useful_pathkeys;
 
-				is_sorted = pathkeys_contained_in(root->group_pathkeys,
-												  path->pathkeys);
-				if (path == cheapest_partial_path || is_sorted)
+				n_useful_pathkeys = pathkeys_useful_for_ordering(
+										root->group_pathkeys, path->pathkeys);
+				if (path == cheapest_partial_path || n_useful_pathkeys > 0)
 				{
 					/* Sort the cheapest partial path, if it isn't already */
-					if (!is_sorted)
+					if (n_useful_pathkeys < list_length(root->group_pathkeys))
 						path = (Path *) create_sort_path(root,
 														 grouped_rel,
 														 path,
@@ -3929,14 +3929,14 @@ create_grouping_paths(PlannerInfo *root,
 		foreach(lc, input_rel->pathlist)
 		{
 			Path	   *path = (Path *) lfirst(lc);
-			bool		is_sorted;
+			int			n_useful_pathkeys;
 
-			is_sorted = pathkeys_contained_in(root->group_pathkeys,
-											  path->pathkeys);
-			if (path == cheapest_path || is_sorted)
+			n_useful_pathkeys = pathkeys_useful_for_ordering(
+										root->group_pathkeys, path->pathkeys);
+			if (path == cheapest_path || n_useful_pathkeys > 0)
 			{
 				/* Sort the cheapest-total path if it isn't already sorted */
-				if (!is_sorted)
+				if (n_useful_pathkeys < list_length(root->group_pathkeys))
 					path = (Path *) create_sort_path(root,
 													 grouped_rel,
 													 path,
@@ -5003,13 +5003,13 @@ create_ordered_paths(PlannerInfo *root,
 	foreach(lc, input_rel->pathlist)
 	{
 		Path	   *path = (Path *) lfirst(lc);
-		bool		is_sorted;
+		int			n_useful_pathkeys;
 
-		is_sorted = pathkeys_contained_in(root->sort_pathkeys,
-										  path->pathkeys);
-		if (path == cheapest_input_path || is_sorted)
+		n_useful_pathkeys = pathkeys_useful_for_ordering(root->sort_pathkeys,
+														 path->pathkeys);
+		if (path == cheapest_input_path || n_useful_pathkeys > 0)
 		{
-			if (!is_sorted)
+			if (n_useful_pathkeys < list_length(root->sort_pathkeys))
 			{
 				/* An explicit sort here can take advantage of LIMIT */
 				path = (Path *) create_sort_path(root,
@@ -6139,8 +6139,9 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid)
 
 	/* Estimate the cost of seq scan + sort */
 	seqScanPath = create_seqscan_path(root, rel, NULL, 0);
-	cost_sort(&seqScanAndSortPath, root, NIL,
-			  seqScanPath->total_cost, rel->tuples, rel->reltarget->width,
+	cost_sort(&seqScanAndSortPath, root, NIL, 0,
+			  seqScanPath->startup_cost, seqScanPath->total_cost,
+			  rel->tuples, rel->reltarget->width,
 			  comparisonCost, maintenance_work_mem, -1.0);
 
 	/* Estimate the cost of index scan */
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index 4617d12cb9..be520e6086 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -642,6 +642,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
 
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index 46367cba63..616ad1a474 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -2782,6 +2782,7 @@ finalize_plan(PlannerInfo *root, Plan *plan,
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
 		case T_Group:
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index 5a08e75ad5..eb95ca4c5e 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -983,7 +983,8 @@ choose_hashed_setop(PlannerInfo *root, List *groupClauses,
 	sorted_p.startup_cost = input_path->startup_cost;
 	sorted_p.total_cost = input_path->total_cost;
 	/* XXX cost_sort doesn't actually look at pathkeys, so just pass NIL */
-	cost_sort(&sorted_p, root, NIL, sorted_p.total_cost,
+	cost_sort(&sorted_p, root, NIL, 0,
+			  sorted_p.startup_cost, sorted_p.total_cost,
 			  input_path->rows, input_path->pathtarget->width,
 			  0.0, work_mem, -1.0);
 	cost_group(&sorted_p, root, numGroupCols, dNumGroups,
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 7df8761710..9c6f910f14 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -105,7 +105,7 @@ compare_path_costs(Path *path1, Path *path2, CostSelector criterion)
 }
 
 /*
- * compare_path_fractional_costs
+ * compare_fractional_path_costs
  *	  Return -1, 0, or +1 according as path1 is cheaper, the same cost,
  *	  or more expensive than path2 for fetching the specified fraction
  *	  of the total tuples.
@@ -1356,12 +1356,13 @@ create_merge_append_path(PlannerInfo *root,
 	foreach(l, subpaths)
 	{
 		Path	   *subpath = (Path *) lfirst(l);
+		int			n_common_pathkeys = pathkeys_common(pathkeys, subpath->pathkeys);
 
 		pathnode->path.rows += subpath->rows;
 		pathnode->path.parallel_safe = pathnode->path.parallel_safe &&
 			subpath->parallel_safe;
 
-		if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
+		if (n_common_pathkeys == list_length(pathkeys))
 		{
 			/* Subpath is adequately ordered, we won't need to sort it */
 			input_startup_cost += subpath->startup_cost;
@@ -1375,6 +1376,8 @@ create_merge_append_path(PlannerInfo *root,
 			cost_sort(&sort_path,
 					  root,
 					  pathkeys,
+					  n_common_pathkeys,
+					  subpath->startup_cost,
 					  subpath->total_cost,
 					  subpath->parent->tuples,
 					  subpath->pathtarget->width,
@@ -1622,7 +1625,8 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 		/*
 		 * Estimate cost for sort+unique implementation
 		 */
-		cost_sort(&sort_path, root, NIL,
+		cost_sort(&sort_path, root, NIL, 0,
+				  subpath->startup_cost,
 				  subpath->total_cost,
 				  rel->rows,
 				  subpath->pathtarget->width,
@@ -1715,6 +1719,7 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	GatherMergePath *pathnode = makeNode(GatherMergePath);
 	Cost		input_startup_cost = 0;
 	Cost		input_total_cost = 0;
+	int			n_common_pathkeys;
 
 	Assert(subpath->parallel_safe);
 	Assert(pathkeys);
@@ -1731,7 +1736,9 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	pathnode->path.pathtarget = target ? target : rel->reltarget;
 	pathnode->path.rows += subpath->rows;
 
-	if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
+	n_common_pathkeys = pathkeys_common(pathkeys, subpath->pathkeys);
+
+	if (n_common_pathkeys == list_length(pathkeys))
 	{
 		/* Subpath is adequately ordered, we won't need to sort it */
 		input_startup_cost += subpath->startup_cost;
@@ -1745,6 +1752,8 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 		cost_sort(&sort_path,
 				  root,
 				  pathkeys,
+				  n_common_pathkeys,
+				  subpath->startup_cost,
 				  subpath->total_cost,
 				  subpath->rows,
 				  subpath->pathtarget->width,
@@ -2604,9 +2613,31 @@ create_sort_path(PlannerInfo *root,
 				 List *pathkeys,
 				 double limit_tuples)
 {
-	SortPath   *pathnode = makeNode(SortPath);
+	SortPath   *pathnode;
+	int			n_common_pathkeys;
+
+	if (enable_incrementalsort)
+		n_common_pathkeys = pathkeys_common(subpath->pathkeys, pathkeys);
+	else
+		n_common_pathkeys = 0;
+
+	if (n_common_pathkeys == 0)
+	{
+		pathnode = makeNode(SortPath);
+		pathnode->path.pathtype = T_Sort;
+	}
+	else
+	{
+		IncrementalSortPath   *incpathnode;
+
+		incpathnode = makeNode(IncrementalSortPath);
+		pathnode = &incpathnode->spath;
+		pathnode->path.pathtype = T_IncrementalSort;
+		incpathnode->skipCols = n_common_pathkeys;
+	}
+
+	Assert(n_common_pathkeys < list_length(pathkeys));
 
-	pathnode->path.pathtype = T_Sort;
 	pathnode->path.parent = rel;
 	/* Sort doesn't project, so use source path's pathtarget */
 	pathnode->path.pathtarget = subpath->pathtarget;
@@ -2620,7 +2651,9 @@ create_sort_path(PlannerInfo *root,
 
 	pathnode->subpath = subpath;
 
-	cost_sort(&pathnode->path, root, pathkeys,
+	cost_sort(&pathnode->path, root,
+			  pathkeys, n_common_pathkeys,
+			  subpath->startup_cost,
 			  subpath->total_cost,
 			  subpath->rows,
 			  subpath->pathtarget->width,
@@ -2932,7 +2965,8 @@ create_groupingsets_path(PlannerInfo *root,
 			else
 			{
 				/* Account for cost of sort, but don't charge input cost again */
-				cost_sort(&sort_path, root, NIL,
+				cost_sort(&sort_path, root, NIL, 0,
+						  0.0,
 						  0.0,
 						  subpath->rows,
 						  subpath->pathtarget->width,
diff --git a/src/backend/utils/adt/orderedsetaggs.c b/src/backend/utils/adt/orderedsetaggs.c
index 79dbfd1a05..e3e984b3da 100644
--- a/src/backend/utils/adt/orderedsetaggs.c
+++ b/src/backend/utils/adt/orderedsetaggs.c
@@ -291,7 +291,8 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
 												   qstate->sortCollations,
 												   qstate->sortNullsFirsts,
 												   work_mem,
-												   qstate->rescan_needed);
+												   qstate->rescan_needed,
+												   false);
 	else
 		osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
 													qstate->sortOperator,
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index fcc8323f62..4726bee850 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -3714,6 +3714,42 @@ estimate_num_groups(PlannerInfo *root, List *groupExprs, double input_rows,
 	return numdistinct;
 }
 
+/*
+ * estimate_pathkeys_groups	- Estimate the number of groups the dataset is
+ * 							  divided into by the pathkeys.
+ *
+ * Returns an array of group counts: the i'th element is the number of groups
+ * that the first i pathkeys divide the dataset into.  This is effectively a
+ * convenience wrapper over estimate_num_groups().
+ */
+double *
+estimate_pathkeys_groups(List *pathkeys, PlannerInfo *root, double tuples)
+{
+	ListCell   *l;
+	List	   *groupExprs = NIL;
+	double	   *result;
+	int			i;
+
+	/*
+	 * Get number of groups for each prefix of pathkeys.
+	 */
+	i = 0;
+	result = (double *) palloc(sizeof(double) * list_length(pathkeys));
+	foreach(l, pathkeys)
+	{
+		PathKey *key = (PathKey *)lfirst(l);
+		EquivalenceMember *member = (EquivalenceMember *)
+							linitial(key->pk_eclass->ec_members);
+
+		groupExprs = lappend(groupExprs, member->em_expr);
+
+		result[i] = estimate_num_groups(root, groupExprs, tuples, NULL);
+		i++;
+	}
+
+	return result;
+}
+
 /*
  * Estimate hash bucket statistics when the specified expression is used
  * as a hash key for the given number of buckets.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 72f6be329e..bea4f00421 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -857,6 +857,15 @@ static struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_incrementalsort", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of incremental sort steps."),
+			NULL
+		},
+		&enable_incrementalsort,
+		true,
+		NULL, NULL, NULL
+	},
 	{
 		{"enable_hashagg", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of hashed aggregation plans."),
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index eecc66cafa..0265da312b 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -231,6 +231,13 @@ struct Tuplesortstate
 	int64		allowedMem;		/* total memory allowed, in bytes */
 	int			maxTapes;		/* number of tapes (Knuth's T) */
 	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
+	int64		maxSpace;		/* maximum amount of space occupied among sorts
+								   of groups, either in-memory or on-disk */
+	bool		maxSpaceOnDisk;	/* true when maxSpace is value for on-disk
+								   space, false when it's value for in-memory
+								   space */
+	TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
+	MemoryContext maincontext;
 	MemoryContext sortcontext;	/* memory context holding most sort data */
 	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
@@ -573,6 +580,9 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
 			  int tapenum, unsigned int len);
 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
+static void tuplesort_free(Tuplesortstate *state, bool delete);
+static void tuplesort_updatemax(Tuplesortstate *state);
+
 
 /*
  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
@@ -607,18 +617,27 @@ static Tuplesortstate *
 tuplesort_begin_common(int workMem, bool randomAccess)
 {
 	Tuplesortstate *state;
+	MemoryContext maincontext;
 	MemoryContext sortcontext;
 	MemoryContext tuplecontext;
 	MemoryContext oldcontext;
 
 	/*
-	 * Create a working memory context for this sort operation. All data
-	 * needed by the sort will live inside this context.
+	 * Memory context surviving tuplesort_reset.  This memory context holds
+	 * data which is useful to keep while sorting multiple similar batches.
 	 */
-	sortcontext = AllocSetContextCreate(CurrentMemoryContext,
+	maincontext = AllocSetContextCreate(CurrentMemoryContext,
 										"TupleSort main",
 										ALLOCSET_DEFAULT_SIZES);
 
+	/*
+	 * Create a working memory context for one sort operation.  The content of
+	 * this context is deleted by tuplesort_reset.
+	 */
+	sortcontext = AllocSetContextCreate(maincontext,
+										"TupleSort sort",
+										ALLOCSET_DEFAULT_SIZES);
+
 	/*
 	 * Caller tuple (e.g. IndexTuple) memory context.
 	 *
@@ -636,7 +655,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 	 * Make the Tuplesortstate within the per-sort context.  This way, we
 	 * don't need a separate pfree() operation for it at shutdown.
 	 */
-	oldcontext = MemoryContextSwitchTo(sortcontext);
+	oldcontext = MemoryContextSwitchTo(maincontext);
 
 	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
 
@@ -654,6 +673,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 	state->availMem = state->allowedMem;
 	state->sortcontext = sortcontext;
 	state->tuplecontext = tuplecontext;
+	state->maincontext = maincontext;
 	state->tapeset = NULL;
 
 	state->memtupcount = 0;
@@ -694,13 +714,14 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 					 int nkeys, AttrNumber *attNums,
 					 Oid *sortOperators, Oid *sortCollations,
 					 bool *nullsFirstFlags,
-					 int workMem, bool randomAccess)
+					 int workMem, bool randomAccess,
+					 bool skipAbbrev)
 {
 	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
 	MemoryContext oldcontext;
 	int			i;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 	AssertArg(nkeys > 0);
 
@@ -742,7 +763,7 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 		sortKey->ssup_nulls_first = nullsFirstFlags[i];
 		sortKey->ssup_attno = attNums[i];
 		/* Convey if abbreviation optimization is applicable in principle */
-		sortKey->abbreviate = (i == 0);
+		sortKey->abbreviate = (i == 0) && !skipAbbrev;
 
 		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
 	}
@@ -773,7 +794,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
 
 	Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -864,7 +885,7 @@ tuplesort_begin_index_btree(Relation heapRel,
 	MemoryContext oldcontext;
 	int			i;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -939,7 +960,7 @@ tuplesort_begin_index_hash(Relation heapRel,
 	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
 	MemoryContext oldcontext;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -981,7 +1002,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
 	int16		typlen;
 	bool		typbyval;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -1092,16 +1113,12 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
 }
 
 /*
- * tuplesort_end
- *
- *	Release resources and clean up.
+ * tuplesort_free
  *
- * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
- * pointing to garbage.  Be careful not to attempt to use or free such
- * pointers afterwards!
+ *	Internal routine for freeing resources of tuplesort.
  */
-void
-tuplesort_end(Tuplesortstate *state)
+static void
+tuplesort_free(Tuplesortstate *state, bool delete)
 {
 	/* context swap probably not needed, but let's be safe */
 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
@@ -1160,7 +1177,98 @@ tuplesort_end(Tuplesortstate *state)
 	 * Free the per-sort memory context, thereby releasing all working memory,
 	 * including the Tuplesortstate struct itself.
 	 */
-	MemoryContextDelete(state->sortcontext);
+	if (delete)
+	{
+		MemoryContextDelete(state->maincontext);
+	}
+	else
+	{
+		MemoryContextResetOnly(state->sortcontext);
+		MemoryContextResetOnly(state->tuplecontext);
+	}
+}
+
+/*
+ * tuplesort_end
+ *
+ *	Release resources and clean up.
+ *
+ * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
+ * pointing to garbage.  Be careful not to attempt to use or free such
+ * pointers afterwards!
+ */
+void
+tuplesort_end(Tuplesortstate *state)
+{
+	tuplesort_free(state, true);
+}
+
+/*
+ * tuplesort_updatemax
+ *
+ *	Update maximum resource usage statistics.
+ */
+static void
+tuplesort_updatemax(Tuplesortstate *state)
+{
+	int64	spaceUsed;
+	bool	spaceUsedOnDisk;
+
+	/*
+	 * Note: it might seem we should provide both memory and disk usage for a
+	 * disk-based sort.  However, the current code doesn't track memory space
+	 * accurately once we have begun to return tuples to the caller (since we
+	 * don't account for pfree's the caller is expected to do), so we cannot
+	 * rely on availMem in a disk sort.  This does not seem worth the overhead
+	 * to fix.  Is it worth creating an API for the memory context code to
+	 * tell us how much is actually used in sortcontext?
+	 */
+	if (state->tapeset)
+	{
+		spaceUsedOnDisk = true;
+		spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
+	}
+	else
+	{
+		spaceUsedOnDisk = false;
+		spaceUsed = state->allowedMem - state->availMem;
+	}
+
+	if (spaceUsed > state->maxSpace)
+	{
+		state->maxSpace = spaceUsed;
+		state->maxSpaceOnDisk = spaceUsedOnDisk;
+		state->maxSpaceStatus = state->status;
+	}
+}
+
+/*
+ * tuplesort_reset
+ *
+ *	Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
+ *	meta-information in.  After tuplesort_reset, tuplesort is ready to start
+ *	a new sort.  This allows avoiding recreation of the tuplesort (and saves
+ *	resources) when sorting multiple small batches.
+ */
+void
+tuplesort_reset(Tuplesortstate *state)
+{
+	tuplesort_updatemax(state);
+	tuplesort_free(state, false);
+	state->status = TSS_INITIAL;
+	state->memtupcount = 0;
+	state->boundUsed = false;
+	state->tapeset = NULL;
+	state->currentRun = 0;
+	state->result_tape = -1;
+	state->bounded = false;
+	state->availMem = state->allowedMem;
+	state->lastReturnedTuple = NULL;
+	state->slabAllocatorUsed = false;
+	state->slabMemoryBegin = NULL;
+	state->slabMemoryEnd = NULL;
+	state->slabFreeHead = NULL;
+	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
 }
 
 /*
@@ -2944,18 +3052,15 @@ tuplesort_get_stats(Tuplesortstate *state,
 	 * to fix.  Is it worth creating an API for the memory context code to
 	 * tell us how much is actually used in sortcontext?
 	 */
-	if (state->tapeset)
-	{
+	tuplesort_updatemax(state);
+
+	if (state->maxSpaceOnDisk)
 		stats->spaceType = SORT_SPACE_TYPE_DISK;
-		stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
-	}
 	else
-	{
 		stats->spaceType = SORT_SPACE_TYPE_MEMORY;
-		stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
-	}
+	stats->spaceUsed = (state->maxSpace + 1023) / 1024;
 
-	switch (state->status)
+	switch (state->maxSpaceStatus)
 	{
 		case TSS_SORTEDINMEM:
 			if (state->boundUsed)
diff --git a/src/include/executor/nodeIncrementalSort.h b/src/include/executor/nodeIncrementalSort.h
new file mode 100644
index 0000000000..a9b562843d
--- /dev/null
+++ b/src/include/executor/nodeIncrementalSort.h
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeIncrementalSort.h
+ *
+ *
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/nodeIncrementalSort.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef NODEINCREMENTALSORT_H
+#define NODEINCREMENTALSORT_H
+
+#include "access/parallel.h"
+#include "nodes/execnodes.h"
+
+extern IncrementalSortState *ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags);
+extern void ExecEndIncrementalSort(IncrementalSortState *node);
+extern void ExecReScanIncrementalSort(IncrementalSortState *node);
+
+/* parallel instrumentation support */
+extern void ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt);
+extern void ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt);
+extern void ExecIncrementalSortReInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt);
+extern void ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pcxt);
+extern void ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node);
+
+#endif   /* NODEINCREMENTALSORT_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 2a4f7407a1..4180f57e88 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1754,6 +1754,20 @@ typedef struct MaterialState
 	Tuplestorestate *tuplestorestate;
 } MaterialState;
 
+
+/* ----------------
+ *	 When sorting by multiple keys, the input dataset could already be
+ *	 presorted by some prefix of these keys.  We call them "skip keys".
+ *	 SkipKeyData represents information about one such key.
+ * ----------------
+ */
+typedef struct SkipKeyData
+{
+	FmgrInfo				flinfo;	/* comparison function info */
+	FunctionCallInfoData	fcinfo;	/* comparison function call info */
+	OffsetNumber			attno;	/* attribute number in tuple */
+} SkipKeyData;
+
 /* ----------------
  *	 Shared memory container for per-worker sort information
  * ----------------
@@ -1782,6 +1796,44 @@ typedef struct SortState
 	SharedSortInfo *shared_info;	/* one entry per worker */
 } SortState;
 
+/* ----------------
+ *	 Shared memory container for per-worker incremental sort information
+ * ----------------
+ */
+typedef struct IncrementalSortInfo
+{
+	TuplesortInstrumentation	sinstrument;
+	int64						groupsCount;
+} IncrementalSortInfo;
+
+typedef struct SharedIncrementalSortInfo
+{
+	int							num_workers;
+	IncrementalSortInfo			sinfo[FLEXIBLE_ARRAY_MEMBER];
+} SharedIncrementalSortInfo;
+
+/* ----------------
+ *	 IncrementalSortState information
+ * ----------------
+ */
+typedef struct IncrementalSortState
+{
+	ScanState	ss;				/* its first field is NodeTag */
+	bool		bounded;		/* is the result set bounded? */
+	int64		bound;			/* if bounded, how many tuples are needed */
+	bool		sort_Done;		/* sort completed yet? */
+	bool		finished;		/* fetching tuples from outer node
+								   is finished? */
+	bool		bounded_Done;	/* value of bounded we did the sort with */
+	int64		bound_Done;		/* value of bound we did the sort with */
+	void	   *tuplesortstate; /* private state of tuplesort.c */
+	SkipKeyData *skipKeys;		/* keys, dataset is presorted by */
+	int64		groupsCount;	/* number of groups with equal skip keys */
+	TupleTableSlot *sampleSlot;	/* slot for sample tuple of sort group */
+	bool		am_worker;		/* are we a worker? */
+	SharedIncrementalSortInfo *shared_info;	/* one entry per worker */
+} IncrementalSortState;
+
 /* ---------------------
  *	GroupState information
  * ---------------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 2eb3d6d371..b6a9d6c597 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -73,6 +73,7 @@ typedef enum NodeTag
 	T_HashJoin,
 	T_Material,
 	T_Sort,
+	T_IncrementalSort,
 	T_Group,
 	T_Agg,
 	T_WindowAgg,
@@ -125,6 +126,7 @@ typedef enum NodeTag
 	T_HashJoinState,
 	T_MaterialState,
 	T_SortState,
+	T_IncrementalSortState,
 	T_GroupState,
 	T_AggState,
 	T_WindowAggState,
@@ -240,6 +242,7 @@ typedef enum NodeTag
 	T_ProjectionPath,
 	T_ProjectSetPath,
 	T_SortPath,
+	T_IncrementalSortPath,
 	T_GroupPath,
 	T_UpperUniquePath,
 	T_AggPath,
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 74e9fb5f7b..033ec416fe 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -750,6 +750,17 @@ typedef struct Sort
 	bool	   *nullsFirst;		/* NULLS FIRST/LAST directions */
 } Sort;
 
+
+/* ----------------
+ *		incremental sort node
+ * ----------------
+ */
+typedef struct IncrementalSort
+{
+	Sort		sort;
+	int			skipCols;		/* number of presorted columns */
+} IncrementalSort;
+
 /* ---------------
  *	 group node -
  *		Used for queries with GROUP BY (but no aggregates) specified.
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 71689b8ed6..0d072fd7c3 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -1513,6 +1513,16 @@ typedef struct SortPath
 	Path	   *subpath;		/* path representing input source */
 } SortPath;
 
+/*
+ * IncrementalSortPath
+ */
+typedef struct IncrementalSortPath
+{
+	SortPath	spath;
+	int			skipCols;
+} IncrementalSortPath;
+
+
 /*
  * GroupPath represents grouping (of presorted input)
  *
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index d2fff76653..45cfbee724 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -61,6 +61,7 @@ extern bool enable_indexonlyscan;
 extern bool enable_bitmapscan;
 extern bool enable_tidscan;
 extern bool enable_sort;
+extern bool enable_incrementalsort;
 extern bool enable_hashagg;
 extern bool enable_nestloop;
 extern bool enable_material;
@@ -105,8 +106,9 @@ extern void cost_namedtuplestorescan(Path *path, PlannerInfo *root,
 						 RelOptInfo *baserel, ParamPathInfo *param_info);
 extern void cost_recursive_union(Path *runion, Path *nrterm, Path *rterm);
 extern void cost_sort(Path *path, PlannerInfo *root,
-		  List *pathkeys, Cost input_cost, double tuples, int width,
-		  Cost comparison_cost, int sort_mem,
+		  List *pathkeys, int presorted_keys,
+		  Cost input_startup_cost, Cost input_total_cost,
+		  double tuples, int width, Cost comparison_cost, int sort_mem,
 		  double limit_tuples);
 extern void cost_append(AppendPath *path);
 extern void cost_merge_append(Path *path, PlannerInfo *root,
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 0072b7aa0d..d6b8841d33 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -188,6 +188,7 @@ typedef enum
 
 extern PathKeysComparison compare_pathkeys(List *keys1, List *keys2);
 extern bool pathkeys_contained_in(List *keys1, List *keys2);
+extern int pathkeys_common(List *keys1, List *keys2);
 extern Path *get_cheapest_path_for_pathkeys(List *paths, List *pathkeys,
 							   Relids required_outer,
 							   CostSelector cost_criterion,
@@ -226,6 +227,7 @@ extern List *select_outer_pathkeys_for_merge(PlannerInfo *root,
 extern List *make_inner_pathkeys_for_merge(PlannerInfo *root,
 							  List *mergeclauses,
 							  List *outer_pathkeys);
+extern int pathkeys_useful_for_ordering(List *query_pathkeys, List *pathkeys);
 extern List *truncate_useless_pathkeys(PlannerInfo *root,
 						  RelOptInfo *rel,
 						  List *pathkeys);
diff --git a/src/include/utils/selfuncs.h b/src/include/utils/selfuncs.h
index 299c9f846a..43e8ef20dc 100644
--- a/src/include/utils/selfuncs.h
+++ b/src/include/utils/selfuncs.h
@@ -206,6 +206,9 @@ extern void mergejoinscansel(PlannerInfo *root, Node *clause,
 extern double estimate_num_groups(PlannerInfo *root, List *groupExprs,
 					double input_rows, List **pgset);
 
+extern double *estimate_pathkeys_groups(List *pathkeys, PlannerInfo *root,
+										double tuples);
+
 extern void estimate_hash_bucket_stats(PlannerInfo *root,
 						   Node *hashkey, double nbuckets,
 						   Selectivity *mcv_freq,
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 5d57c503ab..9a5b7f8d3c 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -90,7 +90,8 @@ extern Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc,
 					 int nkeys, AttrNumber *attNums,
 					 Oid *sortOperators, Oid *sortCollations,
 					 bool *nullsFirstFlags,
-					 int workMem, bool randomAccess);
+					 int workMem, bool randomAccess,
+					 bool skipAbbrev);
 extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc,
 						Relation indexRel,
 						int workMem, bool randomAccess);
@@ -134,6 +135,8 @@ extern bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples,
 
 extern void tuplesort_end(Tuplesortstate *state);
 
+extern void tuplesort_reset(Tuplesortstate *state);
+
 extern void tuplesort_get_stats(Tuplesortstate *state,
 					TuplesortInstrumentation *stats);
 extern const char *tuplesort_method_name(TuplesortMethod m);
diff --git a/src/test/isolation/expected/drop-index-concurrently-1.out b/src/test/isolation/expected/drop-index-concurrently-1.out
index 75dff56bc4..e11fb617b5 100644
--- a/src/test/isolation/expected/drop-index-concurrently-1.out
+++ b/src/test/isolation/expected/drop-index-concurrently-1.out
@@ -19,9 +19,10 @@ Sort
 step explains: EXPLAIN (COSTS OFF) EXECUTE getrow_seq;
 QUERY PLAN     
 
-Sort           
+Incremental Sort
   Sort Key: id, data
-  ->  Seq Scan on test_dc
+  Presorted Key: id
+  ->  Index Scan using test_dc_pkey on test_dc
         Filter: ((data)::text = '34'::text)
 step select2: SELECT * FROM test_dc WHERE data=34 ORDER BY id,data;
 id             data           
diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out
index a79f891da7..0926650a0f 100644
--- a/src/test/regress/expected/inherit.out
+++ b/src/test/regress/expected/inherit.out
@@ -1517,6 +1517,7 @@ NOTICE:  drop cascades to table matest1
 set enable_seqscan = off;
 set enable_indexscan = on;
 set enable_bitmapscan = off;
+set enable_incrementalsort = off;
 -- Check handling of duplicated, constant, or volatile targetlist items
 explain (costs off)
 SELECT thousand, tenthous FROM tenk1
@@ -1657,9 +1658,45 @@ FROM generate_series(1, 3) g(i);
  {3,7,8,10,13,13,16,18,19,22}
 (3 rows)
 
+set enable_incrementalsort = on;
+-- check incremental sort is used when enabled
+explain (costs off)
+SELECT thousand, tenthous FROM tenk1
+UNION ALL
+SELECT thousand, thousand FROM tenk1
+ORDER BY thousand, tenthous;
+                               QUERY PLAN                                
+-------------------------------------------------------------------------
+ Merge Append
+   Sort Key: tenk1.thousand, tenk1.tenthous
+   ->  Index Only Scan using tenk1_thous_tenthous on tenk1
+   ->  Incremental Sort
+         Sort Key: tenk1_1.thousand, tenk1_1.thousand
+         Presorted Key: tenk1_1.thousand
+         ->  Index Only Scan using tenk1_thous_tenthous on tenk1 tenk1_1
+(7 rows)
+
+explain (costs off)
+SELECT x, y FROM
+  (SELECT thousand AS x, tenthous AS y FROM tenk1 a
+   UNION ALL
+   SELECT unique2 AS x, unique2 AS y FROM tenk1 b) s
+ORDER BY x, y;
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Merge Append
+   Sort Key: a.thousand, a.tenthous
+   ->  Index Only Scan using tenk1_thous_tenthous on tenk1 a
+   ->  Incremental Sort
+         Sort Key: b.unique2, b.unique2
+         Presorted Key: b.unique2
+         ->  Index Only Scan using tenk1_unique2 on tenk1 b
+(7 rows)
+
 reset enable_seqscan;
 reset enable_indexscan;
 reset enable_bitmapscan;
+reset enable_incrementalsort;
 --
 -- Check that constraint exclusion works correctly with partitions using
 -- implicit constraints generated from the partition bound information.
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index c9c8f51e1c..898361d6b3 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -76,6 +76,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_gathermerge         | on
  enable_hashagg             | on
  enable_hashjoin            | on
+ enable_incrementalsort     | on
  enable_indexonlyscan       | on
  enable_indexscan           | on
  enable_material            | on
@@ -87,7 +88,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan             | on
  enable_sort                | on
  enable_tidscan             | on
-(15 rows)
+(16 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql
index 2e42ae115d..7229997144 100644
--- a/src/test/regress/sql/inherit.sql
+++ b/src/test/regress/sql/inherit.sql
@@ -546,6 +546,7 @@ drop table matest0 cascade;
 set enable_seqscan = off;
 set enable_indexscan = on;
 set enable_bitmapscan = off;
+set enable_incrementalsort = off;
 
 -- Check handling of duplicated, constant, or volatile targetlist items
 explain (costs off)
@@ -607,9 +608,26 @@ SELECT
     ORDER BY f.i LIMIT 10)
 FROM generate_series(1, 3) g(i);
 
+set enable_incrementalsort = on;
+
+-- check incremental sort is used when enabled
+explain (costs off)
+SELECT thousand, tenthous FROM tenk1
+UNION ALL
+SELECT thousand, thousand FROM tenk1
+ORDER BY thousand, tenthous;
+
+explain (costs off)
+SELECT x, y FROM
+  (SELECT thousand AS x, tenthous AS y FROM tenk1 a
+   UNION ALL
+   SELECT unique2 AS x, unique2 AS y FROM tenk1 b) s
+ORDER BY x, y;
+
 reset enable_seqscan;
 reset enable_indexscan;
 reset enable_bitmapscan;
+reset enable_incrementalsort;
 
 --
 -- Check that constraint exclusion works correctly with partitions using
