From 6252155dcdd0a21e15fec3d8e8fe77a9b2f2b29a Mon Sep 17 00:00:00 2001
From: Paul Guo <guopa@vmware.com>
Date: Tue, 10 Nov 2020 10:26:38 +0800
Subject: [PATCH v1] ctas using raw insert

---
 src/backend/access/heap/heapam_handler.c |  5 ++
 src/backend/access/heap/rewriteheap.c    | 84 ++++++++++++++++++++++++++++++++
 src/backend/commands/createas.c          | 10 +++-
 src/include/access/heapam.h              |  4 ++
 src/include/access/rewriteheap.h         |  2 +
 src/include/access/tableam.h             | 22 +++++++++
 6 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index dcaea7135f..8caa5dfef9 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2529,6 +2529,11 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_raw_insert_begin = heap_raw_insert_begin,
+	.tuple_raw_insert = heap_raw_insert,
+	.tuple_raw_insert_end = heap_raw_insert_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 39e33763df..47f99ffa74 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -292,6 +292,90 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 	return state;
 }
 
+#if 0
+typedef struct RawWriteStateData
+{
+	Relation	rs_new_rel;		/* destination heap */
+	Page		rs_buffer;		/* page currently being built */
+	BlockNumber rs_blockno;		/* block where page will go */
+	bool		rs_buffer_valid;	/* T if any tuples in buffer */
+	MemoryContext rs_cxt;
+} *RawWriteState;
+#endif
+
+void heap_raw_insert(RewriteState state, TupleTableSlot *slot)
+{
+	HeapTuple   tuple;
+	bool        shouldFree;
+
+	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+	raw_heap_insert(state, tuple);
+
+	if (shouldFree)
+		pfree(tuple);
+}
+
+RewriteState
+heap_raw_insert_begin(Relation new_heap)
+{
+	RewriteState state;
+	MemoryContext rw_cxt;
+	MemoryContext old_cxt;
+
+	rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
+								   "Heap raw write",
+								   ALLOCSET_DEFAULT_SIZES);
+	old_cxt = MemoryContextSwitchTo(rw_cxt);
+
+	/* Create and fill in the state struct */
+	state = palloc0(sizeof(RewriteStateData));
+
+	state->rs_new_rel = new_heap;
+	state->rs_buffer = (Page) palloc(BLCKSZ);
+	/* new_heap needn't be empty, just locked */
+	state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
+	state->rs_buffer_valid = false;
+	state->rs_cxt = rw_cxt;
+
+	MemoryContextSwitchTo(old_cxt);
+
+	return state;
+}
+
+void
+heap_raw_insert_end(RewriteState state)
+{
+	/* Write the last page, if any */
+	if (state->rs_buffer_valid)
+	{
+		if (RelationNeedsWAL(state->rs_new_rel))
+			log_newpage(&state->rs_new_rel->rd_node,
+						MAIN_FORKNUM,
+						state->rs_blockno,
+						state->rs_buffer,
+						true);
+		RelationOpenSmgr(state->rs_new_rel);
+
+		PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
+
+		smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
+				   (char *) state->rs_buffer, true);
+	}
+
+	/*
+	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
+	 * reason is the same as in storage.c's RelationCopyStorage(): we're
+	 * writing data that's not in shared buffers, and so a CHECKPOINT
+	 * occurring during the rewriteheap operation won't have fsync'd data we
+	 * wrote before the checkpoint.
+	 */
+	if (RelationNeedsWAL(state->rs_new_rel))
+		smgrimmedsync(state->rs_new_rel->rd_smgr, MAIN_FORKNUM); /* test the time */
+
+	/* Deleting the context frees everything */
+	MemoryContextDelete(state->rs_cxt);
+}
+
 /*
  * End a rewrite.
  *
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index d53ec952d0..723226f029 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -61,6 +61,7 @@ typedef struct
 	CommandId	output_cid;		/* cmin to insert in output tuples */
 	int			ti_options;		/* table_tuple_insert performance options */
 	BulkInsertState bistate;	/* bulk insert state */
+	RewriteState state;
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -549,11 +550,13 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 * Fill private fields of myState for use by later routines
 	 */
 	myState->rel = intoRelationDesc;
+	myState->state = table_tuple_raw_insert_begin(intoRelationDesc);
 	myState->reladdr = intoRelationAddr;
 	myState->output_cid = GetCurrentCommandId(true);
 	myState->ti_options = TABLE_INSERT_SKIP_FSM;
 	myState->bistate = GetBulkInsertState();
 
+
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
 	 * This may be harmless, but this function hasn't planned for it.
@@ -578,12 +581,14 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * tuple's xmin), but since we don't do that here...
 	 */
 
+	table_tuple_raw_insert(myState->rel, myState->state, slot);
+#if 0
 	table_tuple_insert(myState->rel,
 					   slot,
 					   myState->output_cid,
 					   myState->ti_options,
 					   myState->bistate);
-
+#endif
 	/* We know this is a newly created relation, so there are no indexes */
 
 	return true;
@@ -599,7 +604,10 @@ intorel_shutdown(DestReceiver *self)
 
 	FreeBulkInsertState(myState->bistate);
 
+#if 0
 	table_finish_bulk_insert(myState->rel, myState->ti_options);
+#endif
+	table_tuple_raw_insert_end(myState->rel, myState->state);
 
 	/* close rel, but keep lock until commit */
 	table_close(myState->rel, NoLock);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 92b19dba32..4b05d963c2 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -135,6 +135,10 @@ extern BulkInsertState GetBulkInsertState(void);
 extern void FreeBulkInsertState(BulkInsertState);
 extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
+extern  void heap_raw_insert(RewriteState state, TupleTableSlot *slot);
+extern RewriteState heap_raw_insert_begin(Relation new_heap);
+extern void heap_raw_insert_end(RewriteState state);
+  
 extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 						int options, BulkInsertState bistate);
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index e6d7fa1e65..17fa31f066 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -25,6 +25,8 @@ extern RewriteState begin_heap_rewrite(Relation OldHeap, Relation NewHeap,
 									   TransactionId OldestXmin, TransactionId FreezeXid,
 									   MultiXactId MultiXactCutoff);
 extern void end_heap_rewrite(RewriteState state);
+extern RewriteState heap_raw_insert_begin(Relation new_heap);
+extern void heap_raw_insert_end(RewriteState state);
 extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple,
 							   HeapTuple newTuple);
 extern bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple oldTuple);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..75e245bd56 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -23,6 +23,7 @@
 #include "utils/guc.h"
 #include "utils/rel.h"
 #include "utils/snapshot.h"
+#include "access/rewriteheap.h"
 
 
 #define DEFAULT_TABLE_ACCESS_METHOD	"heap"
@@ -352,6 +353,9 @@ typedef struct TableAmRoutine
 	 * Manipulations of physical tuples.
 	 * ------------------------------------------------------------------------
 	 */
+	RewriteState (*tuple_raw_insert_begin)(Relation new_heap);
+	void (*tuple_raw_insert_end) (RewriteState state);
+	void (*tuple_raw_insert)(RewriteState state, TupleTableSlot *slot);
 
 	/* see table_tuple_insert() for reference about parameters */
 	void		(*tuple_insert) (Relation rel, TupleTableSlot *slot,
@@ -1140,6 +1144,24 @@ table_compute_xid_horizon_for_tuples(Relation rel,
  * ----------------------------------------------------------------------------
  */
 
+static inline RewriteState
+table_tuple_raw_insert_begin(Relation rel)
+{
+	return rel->rd_tableam->tuple_raw_insert_begin(rel);
+}
+
+static inline void
+table_tuple_raw_insert(Relation rel, RewriteState state, TupleTableSlot *slot)
+{
+	rel->rd_tableam->tuple_raw_insert(state, slot);
+}
+
+static inline void
+table_tuple_raw_insert_end(Relation rel, RewriteState state)
+{
+	rel->rd_tableam->tuple_raw_insert_end(state);
+}
+
 /*
  * Insert a tuple from a slot into table AM routine.
  *
-- 
2.14.3

