From e5cb1c34706ef21c8942087511726d9c06e1fd65 Mon Sep 17 00:00:00 2001
From: root <root@localhost.localdomain>
Date: Tue, 15 Dec 2020 07:14:21 -0500
Subject: [PATCH 1/2] support pctas in append parallel inserts

---
 src/backend/commands/createas.c | 35 +++++++++++++++++++++++++++++++----
 src/backend/commands/explain.c  |  3 +--
 2 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 3ffea41..6e6c467 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -351,9 +351,7 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
 		 * into the target table. We need plan state to be initialized by the
 		 * executor to decide whether to allow parallel inserts or not.
 		 */
-		if (IsParallelInsertInCTASAllowed(into, queryDesc,
-										  &query->CTASParallelInsInfo))
-			SetCTASParallelInsertState(queryDesc);
+		IsParallelInsertInCTASAllowed(into, queryDesc, &query->CTASParallelInsInfo);
 
 		/* run the plan to completion */
 		ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
@@ -663,6 +661,35 @@ intorel_destroy(DestReceiver *self)
 	pfree(self);
 }
 
+static bool
+PushDownCTASParallelInsertState(DestReceiver *dest, PlanState *ps)
+{
+	bool parallel = false;
+	
+	if(ps == NULL)
+		return parallel;
+
+	if(IsA(ps, AppendState))
+	{
+		AppendState *aps = (AppendState *) ps;
+		for(int i = 0; i < aps->as_nplans; i++)
+		{
+			parallel |= PushDownCTASParallelInsertState(dest, aps->appendplans[i]);
+		}
+	}
+	else if(IsA(ps, GatherState) && !ps->ps_ProjInfo)
+	{
+		GatherState *gstate = (GatherState *) ps;
+		parallel = true;
+
+		((DR_intorel *) dest)->is_parallel = true;
+		gstate->dest = dest;
+		ps->plan->plan_rows = 0;
+	}
+
+	return parallel;
+}
+
 /*
  * IsParallelInsertInCTASAllowed --- determine whether or not parallel
  * insertion is possible.
@@ -698,7 +725,7 @@ bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc,
 		 * final phase i.e. merge the results by workers, so we do not allow
 		 * parallel inserts.
 		 */
-		allow = ps && IsA(ps, GatherState) && !ps->ps_ProjInfo;
+		allow = PushDownCTASParallelInsertState(queryDesc->dest, ps); 
 
 		/*
 		 * It should not happen that in cost_gather we have ignored the
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index d0152de..136f6f4 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -570,8 +570,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 	 * target table. We need plan state to be initialized by the executor to
 	 * decide whether to allow parallel inserts or not.
 	 */
-	if (IsParallelInsertInCTASAllowed(into, queryDesc, ctas_tuple_cost_flags))
-		SetCTASParallelInsertState(queryDesc);
+	IsParallelInsertInCTASAllowed(into, queryDesc, ctas_tuple_cost_flags);
 
 	/* Execute the plan for statistics if asked for */
 	if (es->analyze)
-- 
1.8.3.1

