From b213809f03053efdba9370d89f70f90be827733d Mon Sep 17 00:00:00 2001
From: Paul Guo <pguo@pivotal.io>
Date: Thu, 28 Feb 2019 15:43:34 +0800
Subject: [PATCH] Heap batch insert for CTAS.

---
 src/backend/access/heap/heapam.c |  9 ++++-----
 src/backend/commands/copy.c      | 12 ++----------
 src/backend/commands/createas.c  | 30 +++++++++++++++++++++++++-----
 src/include/access/heapam.h      |  2 ++
 4 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index dc3499349b..2e56e85463 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2399,16 +2399,12 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
  * That's faster than calling heap_insert() in a loop, because when multiple
  * tuples can be inserted on a single page, we can write just a single WAL
  * record covering all of them, and only need to lock/unlock the page once.
- *
- * Note: this leaks memory into the current memory context. You can create a
- * temporary context before calling this, if that's a problem.
  */
 void
 heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate)
 {
 	TransactionId xid = GetCurrentTransactionId();
-	HeapTuple  *heaptuples;
 	int			i;
 	int			ndone;
 	PGAlignedBlock scratch;
@@ -2417,6 +2413,10 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	Size		saveFreeSpace;
 	bool		need_tuple_data = RelationIsLogicallyLogged(relation);
 	bool		need_cids = RelationIsAccessibleInLogicalDecoding(relation);
+	/* Declare it as static to let this memory not be on stack. */
+	static HeapTuple	heaptuples[MAX_MULTI_INSERT_TUPLES];
+
+	Assert(ntuples <= MAX_MULTI_INSERT_TUPLES);
 
 	/* currently not needed (thus unsupported) for heap_multi_insert() */
 	AssertArg(!(options & HEAP_INSERT_NO_LOGICAL));
@@ -2426,7 +2426,6 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 												   HEAP_DEFAULT_FILLFACTOR);
 
 	/* Toast and set header data in all the tuples */
-	heaptuples = palloc(ntuples * sizeof(HeapTuple));
 	for (i = 0; i < ntuples; i++)
 		heaptuples[i] = heap_prepare_insert(relation, tuples[i],
 											xid, cid, options);
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index dbb06397e6..a80a246a48 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2335,7 +2335,6 @@ CopyFrom(CopyState cstate)
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
 
-#define MAX_BUFFERED_TUPLES 1000
 #define RECHECK_MULTI_INSERT_THRESHOLD 1000
 	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
 	Size		bufferedTuplesSize = 0;
@@ -2644,7 +2643,7 @@ CopyFrom(CopyState cstate)
 		else
 			insertMethod = CIM_MULTI;
 
-		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+		bufferedTuples = palloc(MAX_MULTI_INSERT_TUPLES * sizeof(HeapTuple));
 	}
 
 	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
@@ -2961,7 +2960,7 @@ CopyFrom(CopyState cstate)
 					 * large, to avoid using large amounts of memory for the
 					 * buffer when the tuples are exceptionally wide.
 					 */
-					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+					if (nBufferedTuples == MAX_MULTI_INSERT_TUPLES ||
 						bufferedTuplesSize > 65535)
 					{
 						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
@@ -3113,7 +3112,6 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 					int nBufferedTuples, HeapTuple *bufferedTuples,
 					uint64 firstBufferedLineNo)
 {
-	MemoryContext oldcontext;
 	int			i;
 	uint64		save_cur_lineno;
 	bool		line_buf_valid = cstate->line_buf_valid;
@@ -3125,18 +3123,12 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	cstate->line_buf_valid = false;
 	save_cur_lineno = cstate->cur_lineno;
 
-	/*
-	 * heap_multi_insert leaks memory, so switch to short-lived memory context
-	 * before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 	heap_multi_insert(resultRelInfo->ri_RelationDesc,
 					  bufferedTuples,
 					  nBufferedTuples,
 					  mycid,
 					  hi_options,
 					  bistate);
-	MemoryContextSwitchTo(oldcontext);
 
 	/*
 	 * If there are any indexes, update them for all the inserted tuples, and
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 2bc8f928ea..837ec65920 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -61,6 +61,9 @@ typedef struct
 	CommandId	output_cid;		/* cmin to insert in output tuples */
 	int			hi_options;		/* heap_insert performance options */
 	BulkInsertState bistate;	/* bulk insert state */
+	HeapTuple   bufferedTuples[MAX_MULTI_INSERT_TUPLES];
+	int         nBufferedTuples;
+	int bufferedTuplesSize;
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -559,6 +562,7 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	myState->hi_options = HEAP_INSERT_SKIP_FSM |
 		(XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
 	myState->bistate = GetBulkInsertState();
+	myState->nBufferedTuples = 0;
 
 	/* Not using WAL requires smgr_targblock be initially invalid */
 	Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
@@ -579,11 +583,18 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 */
 	tuple = ExecCopySlotHeapTuple(slot);
 
-	heap_insert(myState->rel,
-				tuple,
-				myState->output_cid,
-				myState->hi_options,
-				myState->bistate);
+	myState->bufferedTuples[myState->nBufferedTuples++] = tuple;
+	myState->bufferedTuplesSize += tuple->t_len;
+
+	if (myState->nBufferedTuples == MAX_MULTI_INSERT_TUPLES ||
+		myState->bufferedTuplesSize >= 65535)
+	{
+		heap_multi_insert(myState->rel, myState->bufferedTuples,
+						  myState->nBufferedTuples, myState->output_cid,
+						  myState->hi_options, myState->bistate);
+		myState->nBufferedTuples = 0;
+		myState->bufferedTuplesSize = 0;
+	}
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -598,6 +609,15 @@ intorel_shutdown(DestReceiver *self)
 {
 	DR_intorel *myState = (DR_intorel *) self;
 
+	if (myState->nBufferedTuples != 0)
+	{
+		heap_multi_insert(myState->rel, myState->bufferedTuples,
+						  myState->nBufferedTuples, myState->output_cid,
+						  myState->hi_options, myState->bistate);
+		myState->nBufferedTuples = 0;
+		myState->bufferedTuplesSize = 0;
+	}
+
 	FreeBulkInsertState(myState->bistate);
 
 	/* If we skipped using WAL, must heap_sync before commit */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index ab0879138f..927babd4e6 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -37,6 +37,8 @@ typedef struct BulkInsertStateData *BulkInsertState;
 
 #define MaxLockTupleMode	LockTupleExclusive
 
+#define MAX_MULTI_INSERT_TUPLES	1000
+
 /*
  * When heap_update, heap_delete, or heap_lock_tuple fail because the target
  * tuple is already outdated, they fill in this struct to provide information
-- 
2.17.2

