From 8dd54a131d27651f36587a784e0eedadfdf4ee5a Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Tue, 8 Dec 2020 12:02:31 +0530
Subject: [PATCH v1] New Table AMs for Multi and Single Inserts

This patch introduces new table access methods for multi and
single inserts. Also implements/rearranges the outside code for
heap am into these new APIs.

Main design goal of these new APIs is to give flexibility to
tableam developers in implementing multi insert logic dependent on
the underlying storage engine. Currently, for all the underlying
storage engines, we follow the same multi insert logic such as when
and how to flush the buffered tuples, tuple size calculation, and
this logic doesn't take into account the underlying storage engine
capabilities.

We can also avoid duplicating multi insert code (for existing COPY,
and upcoming CTAS, CREATE/REFRESH MAT VIEW and INSERT SELECTs). We
can also move bulk insert state allocation and deallocation inside
these APIs.
---
 src/backend/access/heap/heapam.c         | 245 +++++++++++++++++++++++
 src/backend/access/heap/heapam_handler.c |   5 +
 src/backend/access/table/tableamapi.c    |   7 +
 src/backend/executor/execTuples.c        |  83 +++++++-
 src/include/access/heapam.h              |  11 +
 src/include/access/tableam.h             | 120 +++++++++++
 src/include/executor/tuptable.h          |   1 +
 7 files changed, 471 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a9583f3103..48c93ccce7 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -66,6 +66,7 @@
 #include "utils/datum.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -2371,6 +2372,250 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * Allocate and initialize TableInsertState.
+ *
+ * If alloc_bistate is true, then bulk insert state is allocated, otherwise not
+ *
+ * For single inserts:
+ *  1) Specify alloc_bistate as false, then multi insert state is NULL.
+ *  2) mi_max_slots and mi_max_size are ignored, but good to specify negative
+ * 	   values.
+ *
+ * For multi inserts:
+ *  1) Specify is_multi as true, then multi insert state is allcoated.
+ *  2) Specify mi_max_slots > 0 i.e. the number of slots to buffer.
+ * 	   mi_max_slots <= 0 is invalid.
+ *  3) Specify mi_max_size > 0 i.e. the total tuple size (in bytes) the
+ * 	   buffered slots can hold until flush.
+ *  4) Flush the buffers either if all the mi_max_slots are filled or if the
+ * 	   total tuple size of the so far buffered slots is >= mi_max_size. If
+ *	   mi_max_size <= 0, then flush the buffers only when all the mi_max_slots
+ *	   are filled.
+ *
+ *  Other input parameters i.e. relation, command id, options are common for
+ *  both single and multi inserts.
+ */
+TableInsertState* heap_insert_begin(Relation rel, CommandId cid, int options,
+									bool alloc_bistate, bool is_multi,
+									int32 mi_max_slots, int64 mi_max_size)
+{
+	TableInsertState *state = NULL;
+
+	state = palloc0(sizeof(TableInsertState));
+
+	state->rel = rel;
+	state->cid = cid;
+	state->options = options;
+
+	if (alloc_bistate)
+		state->bistate = GetBulkInsertState();
+	else
+		state->bistate = NULL;
+
+	if (is_multi)
+	{
+		if (mi_max_slots > 0)
+		{
+			state->mistate = palloc0(sizeof(TableMultiInsertState));
+			state->mistate->slots =
+							palloc0(sizeof(TupleTableSlot *) * mi_max_slots);
+			state->mistate->max_slots = mi_max_slots;
+			state->mistate->max_size = mi_max_size;
+			state->mistate->cur_slots = 0;
+			state->mistate->cur_size = 0;
+			state->mistate->cur_tup_size = -1;
+			state->mistate->clear_slots = true;
+			state->mistate->flushed	= false;
+
+			/*
+			 * Create a temporary memory context so that we can reset once per
+			 * multi insert batch.
+			 */
+			state->mistate->context =
+							AllocSetContextCreate(CurrentMemoryContext,
+												  "heap_multi_insert",
+												  ALLOCSET_DEFAULT_SIZES);
+		}
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("invalid number of slots specified for heap multi inserts")));
+	}
+	else
+		state->mistate = NULL;
+
+	return state;
+}
+
+/* Insert a tuple from a slot into table AM routine. */
+void heap_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	bool		shouldFree = true;
+	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+
+	/* Update the tuple with table oid */
+	slot->tts_tableOid = RelationGetRelid(state->rel);
+	tuple->t_tableOid = slot->tts_tableOid;
+
+	/* Perform the insertion, and copy the resulting ItemPointer */
+	heap_insert(state->rel, tuple, state->cid, state->options, state->bistate);
+	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+	if (shouldFree)
+		pfree(tuple);
+}
+
+/*
+ * Buffer the input slots and insert the tuples from the buffered slots at a
+ * time into a table.
+ *
+ * Compute the size of the tuple only if mi_max_size i.e. the total tuple size
+ * (in bytes) the buffered slots can hold until flush is specified and the
+ * current tuple size i.e. cur_tup_size is not known.
+ *
+ * Flush the tuples from the buffered slots into the table:
+ *  1) either if all the slots are filled up
+ *  2) or if the mi_max_size is specified and the total tuple size of the
+ * 	   currently buffered slots are >= mi_max_size.
+ */
+void heap_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	TupleTableSlot  *batchslot;
+
+	Assert(state->mistate != NULL);
+	Assert(state->mistate->slots != NULL);
+
+	if (state->mistate->slots[state->mistate->cur_slots] == NULL)
+		state->mistate->slots[state->mistate->cur_slots] =
+										table_slot_create(state->rel, NULL);
+
+	batchslot = state->mistate->slots[state->mistate->cur_slots];
+
+	ExecCopySlot(batchslot, slot);
+
+	/* Reset the flush state if previously set. */
+	if (state->mistate->flushed)
+		state->mistate->flushed = false;
+
+	/* Compute the tuple size only if asked to do so. */
+	if (state->mistate->max_size > 0 && state->mistate->cur_tup_size <= 0)
+	{
+		/* We are here when the tuple size is not known in the caller. */
+		Size sz;
+
+		/*
+		 * Calculate the tuple size after the original slot is copied. Because
+		 * the copied slot type and the tuple size may change.
+		 */
+		sz = GetTupleSize(batchslot, state->mistate->max_size);
+
+		state->mistate->cur_slots++;
+		state->mistate->cur_size += sz;
+
+	}
+	else if (state->mistate->max_size > 0 && state->mistate->cur_tup_size > 0)
+	{
+		/* Tuple size is known in the caller, just use and reset it. */
+		state->mistate->cur_slots++;
+		state->mistate->cur_size += state->mistate->cur_tup_size;
+		state->mistate->cur_tup_size = -1;
+	}
+
+	if (state->mistate->cur_slots >= state->mistate->max_slots ||
+		state->mistate->cur_size >= state->mistate->max_size)
+	{
+		heap_multi_insert_flush(state);
+	}
+}
+
+/*
+ * Flush the tuples from buffered slots if any.
+ *
+ * This function can be useful in cases, where one of the partition can not use
+ * multi inserts but others can and they have buffered few slots so far, which
+ * need to be flushed for visibility, before the partition that doesn't support
+ * can proceed with single inserts.
+ */
+void heap_multi_insert_flush(TableInsertState *state)
+{
+	Assert(state->mistate != NULL);
+
+	if (state->mistate->cur_slots > 0)
+	{
+		MemoryContext oldcontext;
+
+		oldcontext = MemoryContextSwitchTo(state->mistate->context);
+
+		heap_multi_insert(state->rel, state->mistate->slots,
+						state->mistate->cur_slots, state->cid,
+						state->options, state->bistate);
+
+		MemoryContextReset(state->mistate->context);
+		MemoryContextSwitchTo(oldcontext);
+
+		/*
+		 * Do not clear the slots always. Sometimes callers may want the slots
+		 * for index insertions or after row trigger executions in which case
+		 * they have to clear the tuples before using for the next insert
+		 * batch.
+		 */
+		if (state->mistate->clear_slots)
+		{
+			int i;
+
+			for (i = 0; i < state->mistate->cur_slots; i++)
+				ExecClearTuple(state->mistate->slots[i]);
+		}
+
+		state->mistate->cur_slots = 0;
+		state->mistate->cur_size = 0;
+		state->mistate->cur_tup_size = -1;
+		state->mistate->flushed = true;
+	}
+	else
+		state->mistate->flushed = false;
+}
+
+/*
+ * Clean up the TableInsertState.
+ *
+ * For multi inserts, ensure to flush all the remaining buffers with
+ * heap_multi_insert_flush before calling this function. Buffered slots are
+ * dropped, short-lived memory context is delted and mistate is freed up.
+ *
+ * Free up the bulk insert state using FreeBulkInsertState() if exists.
+ *
+ * Free up TableInsertState.
+ */
+void heap_insert_end(TableInsertState *state)
+{
+	if (state->mistate)
+	{
+		int i;
+
+		/*
+		 * Ensure to flush all the remaining buffers with
+		 * heap_multi_insert_flush before calling heap_insert_end.
+		 */
+		Assert(state->mistate->cur_slots == 0);
+
+		for (i = 0; i < state->mistate->max_slots && state->mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(state->mistate->slots[i]);
+
+		if (state->mistate->context)
+			MemoryContextDelete(state->mistate->context);
+
+		pfree(state->mistate->slots);
+		pfree(state->mistate);
+	}
+
+	if (state->bistate)
+		FreeBulkInsertState(state->bistate);
+
+	pfree(state);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 3eea215b85..eb3da12d9c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2554,6 +2554,11 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+	.tuple_insert_begin = heap_insert_begin,
+	.tuple_insert_v2 = heap_insert_v2,
+	.multi_insert_v2 = heap_multi_insert_v2,
+	.multi_insert_flush = heap_multi_insert_flush,
+	.tuple_insert_end = heap_insert_end,
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 58de0743ba..6bec0659e4 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -78,6 +78,13 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->tuple_complete_speculative != NULL);
 
 	Assert(routine->multi_insert != NULL);
+
+	Assert(routine->tuple_insert_begin != NULL);
+	Assert(routine->tuple_insert_v2 != NULL);
+	Assert(routine->multi_insert_v2 != NULL);
+	Assert(routine->multi_insert_flush != NULL);
+	Assert(routine->tuple_insert_end != NULL);
+
 	Assert(routine->tuple_delete != NULL);
 	Assert(routine->tuple_update != NULL);
 	Assert(routine->tuple_lock != NULL);
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 4c90ac5236..f93b1a49a8 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -159,7 +159,11 @@ tts_virtual_materialize(TupleTableSlot *slot)
 	if (TTS_SHOULDFREE(slot))
 		return;
 
-	/* compute size of memory required */
+	/*
+	 * Compute size of memory required. This size calculation code is also
+	 * being used in GetTupleSize(), hence ensure to have the same changes or
+	 * fixes here and also there.
+	 */
 	for (int natt = 0; natt < desc->natts; natt++)
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, natt);
@@ -1239,6 +1243,83 @@ ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
 	pfree(slot);
 }
 
+/*
+ * GetTupleSize - Compute the tuple size given a table slot.
+ *
+ * For heap tuple, buffer tuple and minimal tuple slot types return the actual
+ * tuple size that exists. For virtual tuple, the size is calculated as the
+ * slot does not have the tuple size. If the computed size exceeds the given
+ * maxsize for the virtual tuple, this function exits, not investing time in
+ * further unnecessary calculation.
+ *
+ * Important Notes:
+ * 1) Size calculation code for virtual slots is being used from
+ * 	  tts_virtual_materialize(), hence ensure to have the same changes or fixes
+ * 	  here and also there.
+ * 2) Currently, GetTupleSize() handles the existing heap, buffer, minmal and
+ * 	  virtual slots. Ensure to add related code in case any new slot type is
+ *    introduced.
+ */
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+	Size sz = 0;
+	HeapTuple tuple = NULL;
+
+	if (TTS_IS_HEAPTUPLE(slot))
+		tuple = ((HeapTupleTableSlot *) slot)->tuple;
+	else if(TTS_IS_BUFFERTUPLE(slot))
+		tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+	else if(TTS_IS_MINIMALTUPLE(slot))
+		tuple = ((MinimalTupleTableSlot *) slot)->tuple;
+	else if(TTS_IS_VIRTUAL(slot))
+	{
+		/*
+		 * Size calculation code is being used from tts_virtual_materialize().
+		 * Ensure to have the same changes or fixes here and also in
+		 * tts_virtual_materialize().
+		 */
+		TupleDesc	desc = slot->tts_tupleDescriptor;
+
+		for (int natt = 0; natt < desc->natts; natt++)
+		{
+			Form_pg_attribute att = TupleDescAttr(desc, natt);
+			Datum		val;
+
+			if (att->attbyval)
+				sz += att->attlen;
+
+			if (slot->tts_isnull[natt])
+				continue;
+
+			val = slot->tts_values[natt];
+
+			if (att->attlen == -1 &&
+				VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val)))
+			{
+				sz = att_align_nominal(sz, att->attalign);
+				sz += EOH_get_flat_size(DatumGetEOHP(val));
+			}
+			else
+			{
+				sz = att_align_nominal(sz, att->attalign);
+				sz = att_addlength_datum(sz, att->attlen, val);
+			}
+
+			/*
+			 * We are not interested in proceeding further if the computed size
+			 * crosses maxsize limit that we are looking for.
+			 */
+			if (maxsize != 0 && sz >= maxsize)
+				break;
+		}
+	}
+
+	if (tuple != NULL && !TTS_IS_VIRTUAL(slot))
+		sz = tuple->t_len;
+
+	return sz;
+}
 
 /* ----------------------------------------------------------------
  *				  tuple table slot accessor functions
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 54b2eb7378..d938efbbc5 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -139,6 +139,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableInsertState* heap_insert_begin(Relation rel, CommandId cid,
+										   int options, bool alloc_bistate,
+										   bool is_multi, int32 mi_max_slots,
+										   int64 mi_max_size);
+extern void heap_insert_v2(TableInsertState *state, TupleTableSlot *slot);
+extern void heap_multi_insert_v2(TableInsertState *state,
+								 TupleTableSlot *slot);
+extern void heap_multi_insert_flush(TableInsertState *state);
+extern void heap_insert_end(TableInsertState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..60d4cd8c8b 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -128,6 +128,80 @@ typedef struct TM_FailureData
 	bool		traversed;
 } TM_FailureData;
 
+/* Holds the multi insert related information. */
+typedef struct TableMultiInsertState
+{
+	/* Switch to short-lived memory context before flushing. */
+	MemoryContext       context;
+
+	/* Array of buffered slots. */
+	TupleTableSlot      **slots;
+
+	/* Maximum number of slots that can be buffered. */
+	int32               max_slots;
+
+	/* Number of slots that are currently buffered. */
+	int32               cur_slots;
+
+	/*
+	 * Maximum size (in bytes) of all the tuples that a single batch of
+	 * buffered slots can hold. This parameter is optional.
+	 */
+	int64               max_size;
+
+	/*
+	 * Total tuple size (in bytes) of the slots that are currently buffered. If
+	 * max_size is specified, then flush the buffered slots when cur_size >=
+	 * max_size.
+	 */
+	int64               cur_size;
+
+	/*
+	 * Current tuple size (in bytes). Set this each time before calling
+	 * table_multi_insert_v2, if the tuple size is known (as in the case of
+	 * COPY where each tuple size known after parsing the input lines).
+	 * table_multi_insert_v2 will not calculate the tuple size again to add to
+	 * cur_size, it just uses this value. table_multi_insert_v2 will set it to
+	 * -1 after it uses. Default is -1.
+	 */
+	int64               cur_tup_size;
+
+	/*
+	 * Whether to clear the buffered slots after each flush? If the relation
+	 * has indexes or after row triggers, the buffered slots are required
+	 * outside table_multi_insert_v2(), in which case, clean them in the caller
+	 * using ExecClearTuple() outside the table_multi_insert_v2. If true,
+	 * which is default, table_multi_insert_v2() will clear the slots.
+	 *
+	 * It is good to set clear_slots (by looking at whether the table is having
+	 * any indexes or after row triggers) at the beginning of multi insert
+	 * operation. It is better to set it to false for the final flush before
+	 * ending the multi insert operation. This is to save ExecClearTuple cost
+	 * while flushing as the buffered slots will anyways be dropped in the end
+	 * operation.
+	 */
+	bool                clear_slots;
+
+	/*
+	 * Initially false, set to true by table_multi_insert whenever it flushes
+	 * the buffered slots. Caller can use this flag to insert into indexes or
+	 * execute after row triggers and so on if any.
+	 */
+	bool				flushed;
+} TableMultiInsertState;
+
+/* Holds the table insert state. */
+typedef struct TableInsertState
+{
+	Relation                rel;
+	/* Bulk insert state if requested, otherwise NULL. */
+	struct BulkInsertStateData     *bistate;
+	/* Mulit insert state if requested, otherwise NULL. */
+	struct TableMultiInsertState   *mistate;
+	CommandId               cid;
+	int                     options;
+}TableInsertState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -376,6 +450,19 @@ typedef struct TableAmRoutine
 	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
 								 CommandId cid, int options, struct BulkInsertStateData *bistate);
 
+	TableInsertState* (*tuple_insert_begin) (Relation rel, CommandId cid,
+											 int options, bool alloc_bistate,
+											 bool is_multi, int32 mi_max_slots,
+											 int64 mi_max_size);
+
+	void (*tuple_insert_v2) (TableInsertState *state, TupleTableSlot *slot);
+
+	void (*multi_insert_v2) (TableInsertState *state, TupleTableSlot *slot);
+
+	void (*multi_insert_flush) (TableInsertState *state);
+
+	void (*tuple_insert_end) (TableInsertState *state);
+
 	/* see table_tuple_delete() for reference about parameters */
 	TM_Result	(*tuple_delete) (Relation rel,
 								 ItemPointer tid,
@@ -1237,6 +1324,39 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 								  cid, options, bistate);
 }
 
+static inline TableInsertState*
+table_insert_begin(Relation rel, CommandId cid, int options,
+				   bool alloc_bistate, bool is_multi, int32 mi_max_slots,
+				   int64 mi_max_size)
+{
+	return rel->rd_tableam->tuple_insert_begin(rel, cid, options, alloc_bistate,
+										is_multi, mi_max_slots, mi_max_size);
+}
+
+static inline void
+table_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	state->rel->rd_tableam->tuple_insert_v2(state, slot);
+}
+
+static inline void
+table_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	state->rel->rd_tableam->multi_insert_v2(state, slot);
+}
+
+static inline void
+table_multi_insert_flush(TableInsertState *state)
+{
+	state->rel->rd_tableam->multi_insert_flush(state);
+}
+
+static inline void
+table_insert_end(TableInsertState *state)
+{
+	state->rel->rd_tableam->tuple_insert_end(state);
+}
+
 /*
  * Delete a tuple.
  *
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index f7df70b5ab..d7c284d8e3 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -330,6 +330,7 @@ extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
 								 int lastAttNum);
 extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum);
 
+extern Size GetTupleSize(TupleTableSlot *slot, Size maxsize);
 
 #ifndef FRONTEND
 
-- 
2.25.1

