From b2d74947ffc762061e2335c802510f57d1af9a82 Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Wed, 21 Apr 2021 17:12:24 +0800
Subject: [PATCH 3/3] check-parallel-safety-in-fmgr

---
 src/backend/access/transam/xact.c             |  26 +
 src/backend/executor/execExprInterp.c         |  26 +-
 src/backend/executor/execMain.c               |   3 +
 src/backend/optimizer/plan/planner.c          |  18 +-
 src/backend/utils/fmgr/fmgr.c                 |  28 +
 src/include/access/xact.h                     |   1 +
 src/include/fmgr.h                            |   5 +-
 src/test/regress/expected/insert_parallel.out | 530 ++++++++++++++++++
 src/test/regress/parallel_schedule            |   1 +
 src/test/regress/serial_schedule              |   1 +
 src/test/regress/sql/insert_parallel.sql      | 337 +++++++++++
 11 files changed, 952 insertions(+), 24 deletions(-)
 create mode 100644 src/test/regress/expected/insert_parallel.out
 create mode 100644 src/test/regress/sql/insert_parallel.sql

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 441445927e..2d68e4633a 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1014,6 +1014,32 @@ IsInParallelMode(void)
 	return CurrentTransactionState->parallelModeLevel != 0;
 }
 
+/*
+ *	PrepareParallelModePlanExec
+ *
+ * Prepare for entering parallel mode plan execution, based on command-type.
+ */
+void
+PrepareParallelModePlanExec(CmdType commandType)
+{
+	if (IsModifySupportedInParallelMode(commandType))
+	{
+		Assert(!IsInParallelMode());
+
+		/*
+		 * Prepare for entering parallel mode by assigning a TransactionId.
+		 * Failure to do this now would result in heap_insert() subsequently
+		 * attempting to assign a TransactionId whilst in parallel-mode, which
+		 * is not allowed.
+		 *
+		 * This approach has a disadvantage in that if the underlying SELECT
+		 * does not return any rows, then the TransactionId is not used,
+		 * however that shouldn't happen in practice in many cases.
+		 */
+		(void) GetCurrentTransactionId();
+	}
+}
+
 /*
  *	CommandCounterIncrement
  */
diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c
index 094e22d392..c9e43ada7a 100644
--- a/src/backend/executor/execExprInterp.c
+++ b/src/backend/executor/execExprInterp.c
@@ -717,7 +717,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
 			Datum		d;
 
 			fcinfo->isnull = false;
-			d = op->d.func.fn_addr(fcinfo);
+			d = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo);
 			*op->resvalue = d;
 			*op->resnull = fcinfo->isnull;
 
@@ -741,7 +741,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
 				}
 			}
 			fcinfo->isnull = false;
-			d = op->d.func.fn_addr(fcinfo);
+			d = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo);
 			*op->resvalue = d;
 			*op->resnull = fcinfo->isnull;
 
@@ -1223,7 +1223,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
 				Datum		eqresult;
 
 				fcinfo->isnull = false;
-				eqresult = op->d.func.fn_addr(fcinfo);
+				eqresult = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo);
 				/* Must invert result of "="; safe to do even if null */
 				*op->resvalue = BoolGetDatum(!DatumGetBool(eqresult));
 				*op->resnull = fcinfo->isnull;
@@ -1252,7 +1252,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
 				Datum		eqresult;
 
 				fcinfo->isnull = false;
-				eqresult = op->d.func.fn_addr(fcinfo);
+				eqresult = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo);
 				*op->resvalue = eqresult;
 				*op->resnull = fcinfo->isnull;
 			}
@@ -1273,7 +1273,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
 				Datum		result;
 
 				fcinfo->isnull = false;
-				result = op->d.func.fn_addr(fcinfo);
+				result = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo);
 
 				/* if the arguments are equal return null */
 				if (!fcinfo->isnull && DatumGetBool(result))
@@ -1361,7 +1361,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
 
 			/* Apply comparison function */
 			fcinfo->isnull = false;
-			d = op->d.rowcompare_step.fn_addr(fcinfo);
+			d = FuncExprCallInvoke(op->d.rowcompare_step.fn_addr, fcinfo);
 			*op->resvalue = d;
 
 			/* force NULL result if NULL function result */
@@ -2152,7 +2152,7 @@ ExecJustApplyFuncToCase(ExprState *state, ExprContext *econtext, bool *isnull)
 		}
 	}
 	fcinfo->isnull = false;
-	d = op->d.func.fn_addr(fcinfo);
+	d = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo);
 	*isnull = fcinfo->isnull;
 	return d;
 }
@@ -2347,7 +2347,7 @@ ExecEvalFuncExprFusage(ExprState *state, ExprEvalStep *op,
 	pgstat_init_function_usage(fcinfo, &fcusage);
 
 	fcinfo->isnull = false;
-	d = op->d.func.fn_addr(fcinfo);
+	d = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo);
 	*op->resvalue = d;
 	*op->resnull = fcinfo->isnull;
 
@@ -2381,7 +2381,7 @@ ExecEvalFuncExprStrictFusage(ExprState *state, ExprEvalStep *op,
 	pgstat_init_function_usage(fcinfo, &fcusage);
 
 	fcinfo->isnull = false;
-	d = op->d.func.fn_addr(fcinfo);
+	d = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo);
 	*op->resvalue = d;
 	*op->resnull = fcinfo->isnull;
 
@@ -3379,7 +3379,7 @@ ExecEvalScalarArrayOp(ExprState *state, ExprEvalStep *op)
 		else
 		{
 			fcinfo->isnull = false;
-			thisresult = op->d.scalararrayop.fn_addr(fcinfo);
+			thisresult = FuncExprCallInvoke(op->d.scalararrayop.fn_addr, fcinfo);
 		}
 
 		/* Combine results per OR or AND semantics */
@@ -3436,7 +3436,7 @@ saop_element_hash(struct saophash_hash *tb, Datum key)
 	fcinfo->args[0].value = key;
 	fcinfo->args[0].isnull = false;
 
-	hash = elements_tab->op->d.hashedscalararrayop.hash_fn_addr(fcinfo);
+	hash = FuncExprCallInvoke(elements_tab->op->d.hashedscalararrayop.hash_fn_addr, fcinfo);
 
 	return DatumGetUInt32(hash);
 }
@@ -3458,7 +3458,7 @@ saop_hash_element_match(struct saophash_hash *tb, Datum key1, Datum key2)
 	fcinfo->args[1].value = key2;
 	fcinfo->args[1].isnull = false;
 
-	result = elements_tab->op->d.hashedscalararrayop.fn_addr(fcinfo);
+	result = FuncExprCallInvoke(elements_tab->op->d.hashedscalararrayop.fn_addr, fcinfo);
 
 	return DatumGetBool(result);
 }
@@ -3619,7 +3619,7 @@ ExecEvalHashedScalarArrayOp(ExprState *state, ExprEvalStep *op, ExprContext *eco
 			fcinfo->args[1].value = (Datum) 0;
 			fcinfo->args[1].isnull = true;
 
-			result = op->d.hashedscalararrayop.fn_addr(fcinfo);
+			result = FuncExprCallInvoke(op->d.hashedscalararrayop.fn_addr, fcinfo);
 			resultnull = fcinfo->isnull;
 		}
 	}
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 2cf6dad768..3b339efbe0 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1535,7 +1535,10 @@ ExecutePlan(EState *estate,
 
 	estate->es_use_parallel_mode = use_parallel_mode;
 	if (use_parallel_mode)
+	{
+		PrepareParallelModePlanExec(estate->es_plannedstmt->commandType);
 		EnterParallelMode();
+	}
 
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index dbc2827d20..7736813230 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -314,16 +314,16 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	/*
 	 * Assess whether it's feasible to use parallel mode for this query. We
 	 * can't do this in a standalone backend, or if the command will try to
-	 * modify any data, or if this is a cursor operation, or if GUCs are set
-	 * to values that don't permit parallelism, or if parallel-unsafe
-	 * functions are present in the query tree.
+	 * modify any data (except for Insert), or if this is a cursor operation,
+	 * or if GUCs are set to values that don't permit parallelism, or if
+	 * parallel-unsafe functions are present in the query tree.
 	 *
-	 * (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE
-	 * MATERIALIZED VIEW to use parallel plans, but as of now, only the leader
-	 * backend writes into a completely new table.  In the future, we can
-	 * extend it to allow workers to write into the table.  However, to allow
-	 * parallel updates and deletes, we have to solve other problems,
-	 * especially around combo CIDs.)
+	 * (Note that we do allow CREATE TABLE AS, INSERT INTO...SELECT, SELECT
+	 * INTO, and CREATE MATERIALIZED VIEW to use parallel plans. However, as
+	 * of now, only the leader backend writes into a completely new table. In
+	 * the future, we can extend it to allow workers to write into the table.
+	 * However, to allow parallel updates and deletes, we have to solve other
+	 * problems, especially around combo CIDs.)
 	 *
 	 * For now, we don't try to use parallel mode if we're running inside a
 	 * parallel worker.  We might eventually be able to relax this
diff --git a/src/backend/utils/fmgr/fmgr.c b/src/backend/utils/fmgr/fmgr.c
index b6835c2c4c..1f95775011 100644
--- a/src/backend/utils/fmgr/fmgr.c
+++ b/src/backend/utils/fmgr/fmgr.c
@@ -16,6 +16,8 @@
 #include "postgres.h"
 
 #include "access/detoast.h"
+#include "access/parallel.h"
+#include "access/xact.h"
 #include "catalog/pg_language.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
@@ -109,6 +111,30 @@ fmgr_lookupByName(const char *name)
 	return NULL;
 }
 
+/*
+ * Invoke a function given pointer to function or handler to be called
+ * and a filled-in FunctionCallInfoBaseData.
+ *
+ * Check function's parallel safety before invoking the funciton.
+ * If function are not allowed to be executed in parallel mode an error is raised.
+ */
+Datum
+FuncExprCallInvoke(PGFunction fn_addr, FunctionCallInfo fcinfo)
+{
+	char parallel_safety = fcinfo->flinfo->fn_parallel;
+
+	if (IsInParallelMode() &&
+	   ((IsParallelWorker() &&
+		parallel_safety == PROPARALLEL_RESTRICTED) ||
+		parallel_safety == PROPARALLEL_UNSAFE))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("parallel-safety execution violation of function \"%s\" (%c)",
+						get_func_name(fcinfo->flinfo->fn_oid), parallel_safety)));
+
+	return fn_addr(fcinfo);
+}
+
 /*
  * This routine fills a FmgrInfo struct, given the OID
  * of the function to be called.
@@ -174,6 +200,7 @@ fmgr_info_cxt_security(Oid functionId, FmgrInfo *finfo, MemoryContext mcxt,
 		finfo->fn_stats = TRACK_FUNC_ALL;	/* ie, never track */
 		finfo->fn_addr = fbp->func;
 		finfo->fn_oid = functionId;
+		finfo->fn_parallel = PROPARALLEL_SAFE;
 		return;
 	}
 
@@ -186,6 +213,7 @@ fmgr_info_cxt_security(Oid functionId, FmgrInfo *finfo, MemoryContext mcxt,
 	finfo->fn_nargs = procedureStruct->pronargs;
 	finfo->fn_strict = procedureStruct->proisstrict;
 	finfo->fn_retset = procedureStruct->proretset;
+	finfo->fn_parallel = procedureStruct->proparallel;
 
 	/*
 	 * If it has prosecdef set, non-null proconfig, or if a plugin wants to
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index c04e6a98d7..34cfaf542c 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -466,6 +466,7 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse
 extern void EnterParallelMode(void);
 extern void ExitParallelMode(void);
 extern bool IsInParallelMode(void);
+extern void PrepareParallelModePlanExec(CmdType commandType);
 
 /*
  * IsModifySupportedInParallelMode
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index ab7b85c86e..a747700818 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -64,6 +64,7 @@ typedef struct FmgrInfo
 	void	   *fn_extra;		/* extra space for use by handler */
 	MemoryContext fn_mcxt;		/* memory context to store fn_extra in */
 	fmNodePtr	fn_expr;		/* expression parse tree for call, or NULL */
+	char		fn_parallel;	/* parallel-safety: s/r/u */
 } FmgrInfo;
 
 /*
@@ -169,8 +170,8 @@ extern void fmgr_symbol(Oid functionId, char **mod, char **fn);
  * the fcinfo->isnull flag before each call, since callees are permitted to
  * assume that starts out false.
  */
-#define FunctionCallInvoke(fcinfo)	((* (fcinfo)->flinfo->fn_addr) (fcinfo))
-
+#define FunctionCallInvoke(fcinfo) FuncExprCallInvoke((fcinfo)->flinfo->fn_addr, fcinfo)
+Datum FuncExprCallInvoke(PGFunction fn_addr, FunctionCallInfo fcinfo);
 
 /*-------------------------------------------------------------------------
  *		Support macros to ease writing fmgr-compatible functions
diff --git a/src/test/regress/expected/insert_parallel.out b/src/test/regress/expected/insert_parallel.out
new file mode 100644
index 0000000000..9b9b397bfe
--- /dev/null
+++ b/src/test/regress/expected/insert_parallel.out
@@ -0,0 +1,530 @@
+--
+-- PARALLEL
+--
+--
+-- START: setup some tables and data needed by the tests.
+--
+-- Setup - index expressions test
+create function pg_class_relname(Oid)
+returns name language sql parallel unsafe
+as 'select relname from pg_class where $1 = oid';
+-- For testing purposes, we'll mark this function as parallel-unsafe
+create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$
+    begin
+        return f || l;
+    end;
+$$ language plpgsql immutable parallel unsafe;
+create or replace function fullname_parallel_restricted(f text, l text) returns text as $$
+    begin
+        return f || l;
+    end;
+$$ language plpgsql immutable parallel restricted;
+create table names(index int, first_name text, last_name text);
+create table names2(index int, first_name text, last_name text);
+create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name));
+create table names4(index int, first_name text, last_name text);
+create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name));
+alter table names2 parallel safe;
+alter table names4 parallel safe;
+insert into names values
+    (1, 'albert', 'einstein'),
+    (2, 'niels', 'bohr'),
+    (3, 'erwin', 'schrodinger'),
+    (4, 'leonhard', 'euler'),
+    (5, 'stephen', 'hawking'),
+    (6, 'isaac', 'newton'),
+    (7, 'alan', 'turing'),
+    (8, 'richard', 'feynman');
+-- Setup - column default tests
+create or replace function bdefault_unsafe ()
+returns int language plpgsql parallel unsafe as $$
+begin
+	RETURN 5;
+end $$;
+create or replace function cdefault_restricted ()
+returns int language plpgsql parallel restricted as $$
+begin
+	RETURN 10;
+end $$;
+create or replace function ddefault_safe ()
+returns int language plpgsql parallel safe as $$
+begin
+	RETURN 20;
+end $$;
+create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe());
+create table test_data(a int);
+insert into test_data select * from generate_series(1,10);
+alter table testdef parallel safe;
+--
+-- END: setup some tables and data needed by the tests.
+--
+-- encourage use of parallel plans
+set parallel_setup_cost=0;
+set parallel_tuple_cost=0;
+set min_parallel_table_scan_size=0;
+set max_parallel_workers_per_gather=4;
+create table para_insert_p1 (
+    unique1    int4 PRIMARY KEY,
+    stringu1    name
+);
+create table para_insert_f1 (
+    unique1    int4 REFERENCES para_insert_p1(unique1),
+    stringu1    name
+);
+alter table para_insert_p1 parallel safe;
+alter table para_insert_f1 parallel safe;
+-- Check FK trigger
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('para_insert_f1');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | r
+ pg_trigger       | r
+ pg_proc          | r
+ pg_trigger       | r
+(4 rows)
+
+--
+-- Test INSERT with underlying query.
+-- (should create plan with parallel SELECT, Gather parent node)
+--
+explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
+               QUERY PLAN               
+----------------------------------------
+ Insert on para_insert_p1
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Seq Scan on tenk1
+(4 rows)
+
+insert into para_insert_p1 select unique1, stringu1 from tenk1;
+-- select some values to verify that the parallel insert worked
+select count(*), sum(unique1) from para_insert_p1;
+ count |   sum    
+-------+----------
+ 10000 | 49995000
+(1 row)
+
+-- verify that the same transaction has been used by all parallel workers
+select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
+ count 
+-------
+     1
+(1 row)
+
+--
+-- Test INSERT with ordered underlying query.
+-- (should create plan with parallel SELECT, GatherMerge parent node)
+--
+truncate para_insert_p1 cascade;
+NOTICE:  truncate cascades to table "para_insert_f1"
+explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
+                  QUERY PLAN                  
+----------------------------------------------
+ Insert on para_insert_p1
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Sort
+               Sort Key: tenk1.unique1
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
+-- select some values to verify that the parallel insert worked
+select count(*), sum(unique1) from para_insert_p1;
+ count |   sum    
+-------+----------
+ 10000 | 49995000
+(1 row)
+
+-- verify that the same transaction has been used by all parallel workers
+select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
+ count 
+-------
+     1
+(1 row)
+
+--
+-- Test INSERT with RETURNING clause.
+-- (should create plan with parallel SELECT, Gather parent node)
+--
+create table test_data1(like test_data);
+alter table test_data1 parallel safe;
+explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data;
+                 QUERY PLAN                 
+--------------------------------------------
+ Insert on test_data1
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on test_data
+               Filter: (a = 10)
+(5 rows)
+
+insert into test_data1 select * from test_data where a = 10 returning a as data;
+ data 
+------
+   10
+(1 row)
+
+--
+-- Test INSERT into a table with a foreign key.
+-- (Insert into a table with a foreign key is parallel-restricted,
+--  as doing this in a parallel worker would create a new commandId
+--  and within a worker this is not currently supported)
+--
+explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1;
+               QUERY PLAN               
+----------------------------------------
+ Insert on para_insert_f1
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Seq Scan on tenk1
+(4 rows)
+
+insert into para_insert_f1 select unique1, stringu1 from tenk1;
+-- select some values to verify that the insert worked
+select count(*), sum(unique1) from para_insert_f1;
+ count |   sum    
+-------+----------
+ 10000 | 49995000
+(1 row)
+
+--
+-- Test INSERT with ON CONFLICT ... DO UPDATE ...
+-- (should not create a parallel plan)
+--
+create table test_conflict_table(id serial primary key, somedata int);
+alter table test_conflict_table parallel safe;
+explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data;
+                 QUERY PLAN                 
+--------------------------------------------
+ Insert on test_conflict_table
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on test_data
+(4 rows)
+
+insert into test_conflict_table(id, somedata) select a, a from test_data;
+explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1;
+                      QUERY PLAN                      
+------------------------------------------------------
+ Insert on test_conflict_table
+   Conflict Resolution: UPDATE
+   Conflict Arbiter Indexes: test_conflict_table_pkey
+   ->  Seq Scan on test_data
+(4 rows)
+
+--
+-- Test INSERT with parallel-unsafe index expression
+--
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names2');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | u
+ pg_index         | u
+(2 rows)
+
+alter table names2 parallel safe;
+insert into names2 select * from names returning *;
+ERROR:  parallel-safety execution violation of function "fullname_parallel_unsafe" (u)
+--
+-- Test INSERT with parallel-restricted index expression
+--
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names4');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | r
+ pg_index         | r
+(2 rows)
+
+--
+-- Test INSERT with underlying query - and RETURNING (no projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names5 (like names);
+explain (costs off) insert into names5 select * from names returning *;
+       QUERY PLAN        
+-------------------------
+ Insert on names5
+   ->  Seq Scan on names
+(2 rows)
+
+--
+-- Test INSERT with underlying ordered query - and RETURNING (no projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names6 (like names);
+alter table names6 parallel safe;
+explain (costs off) insert into names6 select * from names order by last_name returning *;
+                  QUERY PLAN                  
+----------------------------------------------
+ Insert on names6
+   ->  Gather Merge
+         Workers Planned: 3
+         ->  Sort
+               Sort Key: names.last_name
+               ->  Parallel Seq Scan on names
+(6 rows)
+
+insert into names6 select * from names order by last_name returning *;
+ index | first_name |  last_name  
+-------+------------+-------------
+     2 | niels      | bohr
+     1 | albert     | einstein
+     4 | leonhard   | euler
+     8 | richard    | feynman
+     5 | stephen    | hawking
+     6 | isaac      | newton
+     3 | erwin      | schrodinger
+     7 | alan       | turing
+(8 rows)
+
+--
+-- Test INSERT with underlying ordered query - and RETURNING (with projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names7 (like names);
+alter table names7 parallel safe;
+explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
+                  QUERY PLAN                  
+----------------------------------------------
+ Insert on names7
+   ->  Gather Merge
+         Workers Planned: 3
+         ->  Sort
+               Sort Key: names.last_name
+               ->  Parallel Seq Scan on names
+(6 rows)
+
+insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
+ last_name_then_first_name 
+---------------------------
+ bohr, niels
+ einstein, albert
+ euler, leonhard
+ feynman, richard
+ hawking, stephen
+ newton, isaac
+ schrodinger, erwin
+ turing, alan
+(8 rows)
+
+--
+-- Test INSERT into temporary table with underlying query.
+-- (Insert into a temp table is parallel-restricted;
+-- should create a parallel plan; parallel SELECT)
+--
+create temporary table temp_names (like names);
+alter table temp_names parallel safe;
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('temp_names');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_class         | r
+(1 row)
+
+explain (costs off) insert into temp_names select * from names;
+               QUERY PLAN               
+----------------------------------------
+ Insert on temp_names
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on names
+(4 rows)
+
+insert into temp_names select * from names;
+--
+-- Test INSERT with column defaults
+--
+--
+--
+-- Parallel INSERT with unsafe column default, should not use a parallel plan
+--
+explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data;
+         QUERY PLAN          
+-----------------------------
+ Insert on testdef
+   ->  Seq Scan on test_data
+(2 rows)
+
+--
+-- Parallel INSERT with restricted column default, should use parallel SELECT
+--
+explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
+                 QUERY PLAN                 
+--------------------------------------------
+ Insert on testdef
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on test_data
+(4 rows)
+
+insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
+select * from testdef order by a;
+ a  | b  | c  | d  
+----+----+----+----
+  1 |  2 | 10 |  8
+  2 |  4 | 10 | 16
+  3 |  6 | 10 | 24
+  4 |  8 | 10 | 32
+  5 | 10 | 10 | 40
+  6 | 12 | 10 | 48
+  7 | 14 | 10 | 56
+  8 | 16 | 10 | 64
+  9 | 18 | 10 | 72
+ 10 | 20 | 10 | 80
+(10 rows)
+
+truncate testdef;
+--
+-- Parallel INSERT with restricted and unsafe column defaults, should not use a parallel plan
+--
+explain (costs off) insert into testdef(a,d) select a,a*8 from test_data;
+         QUERY PLAN          
+-----------------------------
+ Insert on testdef
+   ->  Seq Scan on test_data
+(2 rows)
+
+--
+-- Test INSERT into partition with underlying query.
+--
+create table parttable1 (a int, b name) partition by range (a);
+create table parttable1_1 partition of parttable1 for values from (0) to (5000);
+create table parttable1_2 partition of parttable1 for values from (5000) to (10000);
+alter table parttable1 parallel safe;
+explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1;
+               QUERY PLAN               
+----------------------------------------
+ Insert on parttable1
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Seq Scan on tenk1
+(4 rows)
+
+insert into parttable1 select unique1,stringu1 from tenk1;
+select count(*) from parttable1_1;
+ count 
+-------
+  5000
+(1 row)
+
+select count(*) from parttable1_2;
+ count 
+-------
+  5000
+(1 row)
+
+--
+-- Test table with parallel-unsafe check constraint
+-- (should not create a parallel plan)
+--
+create or replace function check_b_unsafe(b name) returns boolean as $$
+    begin
+        return (b <> 'XXXXXX');
+    end;
+$$ language plpgsql parallel unsafe;
+create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name);
+alter table table_check_b parallel safe;
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('table_check_b');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | u
+ pg_constraint    | u
+(2 rows)
+
+insert into table_check_b select * from names;
+ERROR:  parallel-safety execution violation of function "check_b_unsafe" (u)
+--
+-- Test table with parallel-safe after stmt-level triggers
+-- (should create a parallel SELECT plan; triggers should fire)
+--
+create table names_with_safe_trigger (like names);
+alter table names_with_safe_trigger parallel safe;
+create or replace function insert_after_trigger_safe() returns trigger as $$
+    begin
+        raise notice 'hello from insert_after_trigger_safe';
+		return new;
+    end;
+$$ language plpgsql parallel safe;
+create trigger insert_after_trigger_safe before insert on names_with_safe_trigger
+    for each statement execute procedure insert_after_trigger_safe();
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names_with_safe_trigger');
+ pg_class_relname | proparallel 
+------------------+-------------
+(0 rows)
+
+insert into names_with_safe_trigger select * from names;
+NOTICE:  hello from insert_after_trigger_safe
+--
+-- Test table with parallel-unsafe after stmt-level triggers
+-- (should not create a parallel plan; triggers should fire)
+--
+create table names_with_unsafe_trigger (like names);
+alter table names_with_unsafe_trigger parallel safe;
+create or replace function insert_after_trigger_unsafe() returns trigger as $$
+    begin
+        raise notice 'hello from insert_after_trigger_unsafe';
+		return new;
+    end;
+$$ language plpgsql parallel unsafe;
+create trigger insert_after_trigger_unsafe before insert on names_with_unsafe_trigger
+    for each statement execute procedure insert_after_trigger_unsafe();
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names_with_unsafe_trigger');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | u
+ pg_trigger       | u
+(2 rows)
+
+insert into names_with_unsafe_trigger select * from names;
+ERROR:  parallel-safety execution violation of function "insert_after_trigger_unsafe" (u)
+--
+-- Test partition with parallel-unsafe trigger
+-- (should not create a parallel plan)
+--
+create table part_unsafe_trigger (a int4, b name) partition by range (a);
+alter table names_with_unsafe_trigger parallel safe;
+create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000);
+create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000);
+create trigger part_insert_after_trigger_unsafe before insert on part_unsafe_trigger_1
+    for each statement execute procedure insert_after_trigger_unsafe();
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('part_unsafe_trigger');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | u
+ pg_trigger       | u
+(2 rows)
+
+insert into names_with_unsafe_trigger select * from names;
+ERROR:  parallel-safety execution violation of function "insert_after_trigger_unsafe" (u)
+--
+-- Test DOMAIN column with a CHECK constraint
+--
+create function sql_is_distinct_from_u(anyelement, anyelement)
+returns boolean language sql parallel unsafe
+as 'select $1 is distinct from $2 limit 1';
+create domain inotnull_u int
+  check (sql_is_distinct_from_u(value, null));
+create table dom_table_u (x inotnull_u, y int);
+-- Test DOMAIN column with parallel-unsafe CHECK constraint
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('dom_table_u');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | u
+ pg_constraint    | u
+(2 rows)
+
+--
+-- Clean up anything not created in the transaction
+--
+drop table names;
+drop index names2_fullname_idx;
+drop table names2;
+drop index names4_fullname_idx;
+drop table names4;
+drop table testdef;
+drop table test_data;
+drop function bdefault_unsafe;
+drop function cdefault_restricted;
+drop function ddefault_safe;
+drop function fullname_parallel_unsafe;
+drop function fullname_parallel_restricted;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index a091300857..c6741a98aa 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -95,6 +95,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
 # run by itself so it can run parallel workers
 test: select_parallel
 test: write_parallel
+test: insert_parallel
 
 # no relation related tests can be put in this group
 test: publication subscription
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 5644847601..638b7a23d0 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -151,6 +151,7 @@ test: stats_ext
 test: collate.linux.utf8
 test: select_parallel
 test: write_parallel
+test: insert_parallel
 test: publication
 test: subscription
 test: select_views
diff --git a/src/test/regress/sql/insert_parallel.sql b/src/test/regress/sql/insert_parallel.sql
new file mode 100644
index 0000000000..b505a55caa
--- /dev/null
+++ b/src/test/regress/sql/insert_parallel.sql
@@ -0,0 +1,337 @@
+--
+-- PARALLEL
+--
+
+--
+-- START: setup some tables and data needed by the tests.
+--
+
+-- Setup - index expressions test
+
+create function pg_class_relname(Oid)
+returns name language sql parallel unsafe
+as 'select relname from pg_class where $1 = oid';
+
+-- For testing purposes, we'll mark this function as parallel-unsafe
+create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$
+    begin
+        return f || l;
+    end;
+$$ language plpgsql immutable parallel unsafe;
+
+create or replace function fullname_parallel_restricted(f text, l text) returns text as $$
+    begin
+        return f || l;
+    end;
+$$ language plpgsql immutable parallel restricted;
+
+create table names(index int, first_name text, last_name text);
+create table names2(index int, first_name text, last_name text);
+create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name));
+create table names4(index int, first_name text, last_name text);
+create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name));
+
+alter table names2 parallel safe;
+alter table names4 parallel safe;
+
+
+insert into names values
+    (1, 'albert', 'einstein'),
+    (2, 'niels', 'bohr'),
+    (3, 'erwin', 'schrodinger'),
+    (4, 'leonhard', 'euler'),
+    (5, 'stephen', 'hawking'),
+    (6, 'isaac', 'newton'),
+    (7, 'alan', 'turing'),
+    (8, 'richard', 'feynman');
+
+-- Setup - column default tests
+
+create or replace function bdefault_unsafe ()
+returns int language plpgsql parallel unsafe as $$
+begin
+	RETURN 5;
+end $$;
+
+create or replace function cdefault_restricted ()
+returns int language plpgsql parallel restricted as $$
+begin
+	RETURN 10;
+end $$;
+
+create or replace function ddefault_safe ()
+returns int language plpgsql parallel safe as $$
+begin
+	RETURN 20;
+end $$;
+
+create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe());
+create table test_data(a int);
+insert into test_data select * from generate_series(1,10);
+alter table testdef parallel safe;
+
+
+--
+-- END: setup some tables and data needed by the tests.
+--
+
+-- encourage use of parallel plans
+set parallel_setup_cost=0;
+set parallel_tuple_cost=0;
+set min_parallel_table_scan_size=0;
+set max_parallel_workers_per_gather=4;
+
+create table para_insert_p1 (
+    unique1    int4 PRIMARY KEY,
+    stringu1    name
+);
+
+create table para_insert_f1 (
+    unique1    int4 REFERENCES para_insert_p1(unique1),
+    stringu1    name
+);
+
+alter table para_insert_p1 parallel safe;
+alter table para_insert_f1 parallel safe;
+
+-- Check FK trigger
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('para_insert_f1');
+
+--
+-- Test INSERT with underlying query.
+-- (should create plan with parallel SELECT, Gather parent node)
+--
+explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
+insert into para_insert_p1 select unique1, stringu1 from tenk1;
+-- select some values to verify that the parallel insert worked
+select count(*), sum(unique1) from para_insert_p1;
+-- verify that the same transaction has been used by all parallel workers
+select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
+
+--
+-- Test INSERT with ordered underlying query.
+-- (should create plan with parallel SELECT, GatherMerge parent node)
+--
+truncate para_insert_p1 cascade;
+explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
+insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
+-- select some values to verify that the parallel insert worked
+select count(*), sum(unique1) from para_insert_p1;
+-- verify that the same transaction has been used by all parallel workers
+select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
+
+--
+-- Test INSERT with RETURNING clause.
+-- (should create plan with parallel SELECT, Gather parent node)
+--
+create table test_data1(like test_data);
+alter table test_data1 parallel safe;
+explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data;
+insert into test_data1 select * from test_data where a = 10 returning a as data;
+
+--
+-- Test INSERT into a table with a foreign key.
+-- (Insert into a table with a foreign key is parallel-restricted,
+--  as doing this in a parallel worker would create a new commandId
+--  and within a worker this is not currently supported)
+--
+explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1;
+insert into para_insert_f1 select unique1, stringu1 from tenk1;
+-- select some values to verify that the insert worked
+select count(*), sum(unique1) from para_insert_f1;
+
+--
+-- Test INSERT with ON CONFLICT ... DO UPDATE ...
+-- (should not create a parallel plan)
+--
+create table test_conflict_table(id serial primary key, somedata int);
+alter table test_conflict_table parallel safe;
+explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data;
+insert into test_conflict_table(id, somedata) select a, a from test_data;
+explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1;
+
+
+--
+-- Test INSERT with parallel-unsafe index expression
+--
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names2');
+alter table names2 parallel safe;
+insert into names2 select * from names returning *;
+
+--
+-- Test INSERT with parallel-restricted index expression
+--
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names4');
+
+--
+-- Test INSERT with underlying query - and RETURNING (no projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names5 (like names);
+explain (costs off) insert into names5 select * from names returning *;
+
+--
+-- Test INSERT with underlying ordered query - and RETURNING (no projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names6 (like names);
+alter table names6 parallel safe;
+explain (costs off) insert into names6 select * from names order by last_name returning *;
+insert into names6 select * from names order by last_name returning *;
+
+--
+-- Test INSERT with underlying ordered query - and RETURNING (with projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names7 (like names);
+alter table names7 parallel safe;
+explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
+insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
+
+
+--
+-- Test INSERT into temporary table with underlying query.
+-- (Insert into a temp table is parallel-restricted;
+-- should create a parallel plan; parallel SELECT)
+--
+create temporary table temp_names (like names);
+alter table temp_names parallel safe;
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('temp_names');
+explain (costs off) insert into temp_names select * from names;
+insert into temp_names select * from names;
+
+--
+-- Test INSERT with column defaults
+--
+--
+
+--
+-- Parallel INSERT with unsafe column default, should not use a parallel plan
+--
+explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data;
+
+--
+-- Parallel INSERT with restricted column default, should use parallel SELECT
+--
+explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
+insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
+select * from testdef order by a;
+truncate testdef;
+
+--
+-- Parallel INSERT with restricted and unsafe column defaults, should not use a parallel plan
+--
+explain (costs off) insert into testdef(a,d) select a,a*8 from test_data;
+
+--
+-- Test INSERT into partition with underlying query.
+--
+create table parttable1 (a int, b name) partition by range (a);
+create table parttable1_1 partition of parttable1 for values from (0) to (5000);
+create table parttable1_2 partition of parttable1 for values from (5000) to (10000);
+
+alter table parttable1 parallel safe;
+
+explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1;
+insert into parttable1 select unique1,stringu1 from tenk1;
+select count(*) from parttable1_1;
+select count(*) from parttable1_2;
+
+--
+-- Test table with parallel-unsafe check constraint
+-- (should not create a parallel plan)
+--
+create or replace function check_b_unsafe(b name) returns boolean as $$
+    begin
+        return (b <> 'XXXXXX');
+    end;
+$$ language plpgsql parallel unsafe;
+
+create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name);
+alter table table_check_b parallel safe;
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('table_check_b');
+insert into table_check_b select * from names;
+
+--
+-- Test table with parallel-safe after stmt-level triggers
+-- (should create a parallel SELECT plan; triggers should fire)
+--
+create table names_with_safe_trigger (like names);
+alter table names_with_safe_trigger parallel safe;
+
+create or replace function insert_after_trigger_safe() returns trigger as $$
+    begin
+        raise notice 'hello from insert_after_trigger_safe';
+		return new;
+    end;
+$$ language plpgsql parallel safe;
+create trigger insert_after_trigger_safe before insert on names_with_safe_trigger
+    for each statement execute procedure insert_after_trigger_safe();
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names_with_safe_trigger');
+insert into names_with_safe_trigger select * from names;
+
+--
+-- Test table with parallel-unsafe after stmt-level triggers
+-- (should not create a parallel plan; triggers should fire)
+--
+create table names_with_unsafe_trigger (like names);
+alter table names_with_unsafe_trigger parallel safe;
+create or replace function insert_after_trigger_unsafe() returns trigger as $$
+    begin
+        raise notice 'hello from insert_after_trigger_unsafe';
+		return new;
+    end;
+$$ language plpgsql parallel unsafe;
+create trigger insert_after_trigger_unsafe before insert on names_with_unsafe_trigger
+    for each statement execute procedure insert_after_trigger_unsafe();
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names_with_unsafe_trigger');
+insert into names_with_unsafe_trigger select * from names;
+
+--
+-- Test partition with parallel-unsafe trigger
+-- (should not create a parallel plan)
+--
+
+create table part_unsafe_trigger (a int4, b name) partition by range (a);
+alter table names_with_unsafe_trigger parallel safe;
+create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000);
+create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000);
+create trigger part_insert_after_trigger_unsafe before insert on part_unsafe_trigger_1
+    for each statement execute procedure insert_after_trigger_unsafe();
+
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('part_unsafe_trigger');
+insert into names_with_unsafe_trigger select * from names;
+
+--
+-- Test DOMAIN column with a CHECK constraint
+--
+create function sql_is_distinct_from_u(anyelement, anyelement)
+returns boolean language sql parallel unsafe
+as 'select $1 is distinct from $2 limit 1';
+
+create domain inotnull_u int
+  check (sql_is_distinct_from_u(value, null));
+
+create table dom_table_u (x inotnull_u, y int);
+
+
+-- Test DOMAIN column with parallel-unsafe CHECK constraint
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('dom_table_u');
+
+--
+-- Clean up anything not created in the transaction
+--
+
+drop table names;
+drop index names2_fullname_idx;
+drop table names2;
+drop index names4_fullname_idx;
+drop table names4;
+drop table testdef;
+drop table test_data;
+
+drop function bdefault_unsafe;
+drop function cdefault_restricted;
+drop function ddefault_safe;
+drop function fullname_parallel_unsafe;
+drop function fullname_parallel_restricted;
-- 
2.18.4

