From 202024c8d24ac9fa393b8e3b36bfdcc14d7bae0e Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Wed, 21 Apr 2021 19:21:11 +0800
Subject: [PATCH] get-parallel-safety-function

provide a utility function pg_get_parallel_safety('table_name') that returns records of
(objid, classid, parallel_safety) that represent the parallel safety of objects that
determine the parallel safety of the specified table.  The user can use this function
to identify problematic objects when a parallel DML fails or is not parallelized in an expected manner.

When detecting an parallel unsafe/restricted function in index(or others can have an expression),
return both the function oid and the index oid.


---
 src/backend/optimizer/plan/planner.c |   3 +-
 src/backend/optimizer/util/clauses.c | 600 ++++++++++++++++++++++++++-
 src/backend/utils/adt/misc.c         |  75 ++++
 src/backend/utils/cache/typcache.c   |  14 +
 src/include/access/xact.h            |  14 +
 src/include/catalog/pg_proc.dat      |  18 +-
 src/include/optimizer/clauses.h      |   9 +
 src/include/utils/typcache.h         |   2 +
 8 files changed, 729 insertions(+), 6 deletions(-)

diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 1868c4eff4..dbc2827d20 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -332,7 +332,8 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	 */
 	if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
 		IsUnderPostmaster &&
-		parse->commandType == CMD_SELECT &&
+		(parse->commandType == CMD_SELECT ||
+		is_parallel_allowed_for_modify(parse)) &&
 		!parse->hasModifyingCTE &&
 		max_parallel_workers_per_gather > 0 &&
 		!IsParallelWorker())
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index d9ad4efc5e..3ec0bf26cb 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -19,13 +19,20 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
+#include "access/genam.h"
 #include "access/htup_details.h"
+#include "access/table.h"
+#include "access/xact.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_class.h"
+#include "catalog/pg_constraint.h"
 #include "catalog/pg_language.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_trigger.h"
 #include "catalog/pg_type.h"
+#include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/functions.h"
 #include "funcapi.h"
@@ -43,6 +50,9 @@
 #include "parser/parse_agg.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_func.h"
+#include "parser/parsetree.h"
+#include "partitioning/partdesc.h"
+#include "rewrite/rewriteHandler.h"
 #include "rewrite/rewriteManip.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
@@ -51,6 +61,8 @@
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
@@ -88,6 +100,9 @@ typedef struct
 	char		max_hazard;		/* worst proparallel hazard found so far */
 	char		max_interesting;	/* worst proparallel hazard of interest */
 	List	   *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */
+	bool		check_all;
+	List	   *func_oids;
+	PartitionDirectory partition_directory;
 } max_parallel_hazard_context;
 
 static bool contain_agg_clause_walker(Node *node, void *context);
@@ -98,6 +113,20 @@ static bool contain_volatile_functions_walker(Node *node, void *context);
 static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context);
 static bool max_parallel_hazard_walker(Node *node,
 									   max_parallel_hazard_context *context);
+static List *target_rel_all_parallel_hazard_recurse(Relation relation,
+												   max_parallel_hazard_context *context);
+static List *target_rel_trigger_max_parallel_hazard(Relation rel,
+												   max_parallel_hazard_context *context);
+static List *index_expr_max_parallel_hazard(Relation index_rel, List *ii_Expressions, List *ii_Predicate,
+											bool check_all, char max_interesting, max_parallel_hazard_context *context);
+static List *target_rel_index_max_parallel_hazard(Relation rel,
+												 max_parallel_hazard_context *context);
+static List *target_rel_domain_max_parallel_hazard(Oid typid,
+												  max_parallel_hazard_context *context);
+static List *target_rel_partitions_max_parallel_hazard(Relation rel,
+													  max_parallel_hazard_context *context);
+static List *target_rel_chk_constr_max_parallel_hazard(Relation rel,
+													  max_parallel_hazard_context *context);
 static bool contain_nonstrict_functions_walker(Node *node, void *context);
 static bool contain_exec_param_walker(Node *node, List *param_ids);
 static bool contain_context_dependent_node(Node *clause);
@@ -149,6 +178,7 @@ static Query *substitute_actual_srf_parameters(Query *expr,
 static Node *substitute_actual_srf_parameters_mutator(Node *node,
 													  substitute_actual_srf_parameters_context *context);
 
+static safety_object *make_safety_object(Oid objid, Oid classid, char proparallel);
 
 /*****************************************************************************
  *		Aggregate-function clause manipulation
@@ -620,6 +650,10 @@ max_parallel_hazard(Query *parse)
 	context.max_hazard = PROPARALLEL_SAFE;
 	context.max_interesting = PROPARALLEL_UNSAFE;
 	context.safe_param_ids = NIL;
+	context.check_all = false;
+	context.func_oids = NIL;
+	context.partition_directory = NULL;
+
 	(void) max_parallel_hazard_walker((Node *) parse, &context);
 	return context.max_hazard;
 }
@@ -651,6 +685,9 @@ is_parallel_safe(PlannerInfo *root, Node *node)
 	context.max_hazard = PROPARALLEL_SAFE;
 	context.max_interesting = PROPARALLEL_RESTRICTED;
 	context.safe_param_ids = NIL;
+	context.check_all = false;
+	context.func_oids = NIL;
+	context.partition_directory = NULL;
 
 	/*
 	 * The params that refer to the same or parent query level are considered
@@ -682,7 +719,7 @@ max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context)
 			break;
 		case PROPARALLEL_RESTRICTED:
 			/* increase max_hazard to RESTRICTED */
-			Assert(context->max_hazard != PROPARALLEL_UNSAFE);
+			Assert(context->check_all || context->max_hazard != PROPARALLEL_UNSAFE);
 			context->max_hazard = proparallel;
 			/* done if we are not expecting any unsafe functions */
 			if (context->max_interesting == proparallel)
@@ -699,6 +736,63 @@ max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context)
 	return false;
 }
 
+
+static safety_object *make_safety_object(Oid objid, Oid classid, char proparallel)
+{
+	safety_object *object = (safety_object *) palloc(sizeof(safety_object));
+
+	object->objid = objid;
+	object->classid = classid;
+	object->proparallel = proparallel;
+
+	return object;
+}
+
+static bool
+parallel_safety_checker(Oid func_id, void *context)
+{
+	char proparallel;
+	bool max_hazard_found;
+	max_parallel_hazard_context *cont = (max_parallel_hazard_context *) context;
+
+	proparallel = func_parallel(func_id);
+	max_hazard_found = max_parallel_hazard_test(proparallel, cont);
+
+	if ((proparallel != PROPARALLEL_SAFE && cont->check_all) ||
+		max_hazard_found)
+	{
+		cont->func_oids = lappend(cont->func_oids,
+			make_safety_object(func_id, ProcedureRelationId, proparallel));
+	}
+
+	return max_hazard_found && !cont->check_all;
+}
+
+/* Check parallel unsafe/restricted function in expression */
+static bool
+parallel_safety_walker(Node *node, max_parallel_hazard_context *context)
+{
+	if (node == NULL)
+		return false;
+
+	/* Check for hazardous functions in node itself */
+	if (check_functions_in_node(node, parallel_safety_checker,
+								context))
+		return true;
+
+	if (IsA(node, CoerceToDomain))
+	{
+		if (target_rel_domain_max_parallel_hazard(((CoerceToDomain *)node)->resulttype, context) != NIL &&
+		   !context->check_all)
+			return true;
+	}
+
+	/* Recurse to check arguments */
+	return expression_tree_walker(node,
+								  parallel_safety_walker,
+								  context);
+}
+
 /* check_functions_in_node callback */
 static bool
 max_parallel_hazard_checker(Oid func_id, void *context)
@@ -854,6 +948,510 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 								  context);
 }
 
+List*
+target_rel_max_parallel_hazard(RangeVar *relrv, bool findall, char max_interesting)
+{
+	max_parallel_hazard_context context;
+	Relation	targetRel;
+	List	   *objects;
+
+	context.check_all = findall;
+	context.func_oids = NIL;
+	context.max_hazard = PROPARALLEL_SAFE;
+	context.max_interesting = max_interesting;
+	context.safe_param_ids = NIL;
+	context.partition_directory = NULL;
+
+	targetRel = table_openrv(relrv, AccessShareLock);
+
+	objects = target_rel_all_parallel_hazard_recurse(targetRel, &context);
+	if (context.partition_directory)
+		DestroyPartitionDirectory(context.partition_directory);
+
+	table_close(targetRel, AccessShareLock);
+
+	return objects;
+}
+
+
+List *
+target_rel_all_parallel_hazard_recurse(Relation rel, max_parallel_hazard_context *context)
+{
+	TupleDesc	tupdesc;
+	int			attnum;
+
+	Assert(context != NULL && context->check_all);
+
+	/*
+	 * We can't support table modification in a parallel worker if it's a
+	 * foreign table/partition (no FDW API for supporting parallel access) or
+	 * a temporary table.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
+		RelationUsesLocalBuffers(rel))
+	{
+		max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context);
+		context->func_oids = lappend(context->func_oids,
+			make_safety_object(rel->rd_rel->oid, RelationRelationId, PROPARALLEL_RESTRICTED));
+	}
+
+	/*
+	 * If a partitioned table, check that each partition is safe for
+	 * modification in parallel-mode.
+	 */
+	(void) target_rel_partitions_max_parallel_hazard(rel, context);
+
+	/*
+	 * If there are any index expressions or index predicate, check that they
+	 * are parallel-mode safe.
+	 */
+	(void) target_rel_index_max_parallel_hazard(rel, context);
+
+	/*
+	 * If any triggers exist, check that they are parallel-safe.
+	 */
+	(void) target_rel_trigger_max_parallel_hazard(rel, context);
+
+	/*
+	 * Column default expressions are only applicable to INSERT and UPDATE.
+	 * Note that even though column defaults may be specified separately for
+	 * each partition in a partitioned table, a partition's default value is
+	 * not applied when inserting a tuple through a partitioned table.
+	 */
+
+	tupdesc = RelationGetDescr(rel);
+	for (attnum = 0; attnum < tupdesc->natts; attnum++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupdesc, attnum);
+
+		/* We don't need info for dropped or generated attributes */
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		if (att->atthasdef)
+		{
+			Node	   *defaultexpr;
+			defaultexpr = build_column_default(rel, attnum);
+			parallel_safety_walker((Node *) defaultexpr, context);
+		}
+
+		/*
+		 * If the column is of a DOMAIN type, determine whether that
+		 * domain has any CHECK expressions that are not parallel-mode
+		 * safe.
+		 */
+		if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN)
+		{
+			(void) target_rel_domain_max_parallel_hazard(att->atttypid, context);
+		}
+	}
+
+	/*
+	 * CHECK constraints are only applicable to INSERT and UPDATE. If any
+	 * CHECK constraints exist, determine if they are parallel-safe.
+	 */
+	(void) target_rel_chk_constr_max_parallel_hazard(rel, context);
+
+	return context->func_oids;
+}
+
+/*
+ * target_rel_trigger_max_parallel_hazard
+ *
+ * Finds all the PARALLEL UNSAFE/RESTRICTED objects for the specified relation's
+ * trigger data.
+ */
+static List*
+target_rel_trigger_max_parallel_hazard(Relation rel,
+									   max_parallel_hazard_context *context)
+{
+	int		i;
+	char	proparallel;
+
+	Assert(context != NULL && context->check_all);
+
+	if (rel->trigdesc == NULL)
+		return context->func_oids;
+
+	/*
+	 * Care is needed here to avoid using the same relcache TriggerDesc field
+	 * across other cache accesses, because relcache doesn't guarantee that it
+	 * won't move.
+	 */
+	for (i = 0; i < rel->trigdesc->numtriggers; i++)
+	{
+		Oid			tgfoid = rel->trigdesc->triggers[i].tgfoid;
+		Oid			tgoid = rel->trigdesc->triggers[i].tgoid;
+
+		proparallel = func_parallel(tgfoid);
+
+		if (proparallel != PROPARALLEL_SAFE)
+		{
+			context->func_oids = lappend(context->func_oids,
+				make_safety_object(tgfoid, ProcedureRelationId, proparallel));
+			context->func_oids = lappend(context->func_oids,
+				make_safety_object(tgoid, TriggerRelationId, proparallel));
+		}
+	}
+
+	return context->func_oids;
+}
+
+static List*
+index_expr_max_parallel_hazard(Relation index_rel,
+						List *ii_Expressions, List *ii_Predicate,
+						bool check_all, char max_interesting,
+						max_parallel_hazard_context *context)
+{
+	int indnatts;
+	int nsupport;
+	Form_pg_index indexStruct;
+	int			i;
+	ListCell   *index_expr_item;
+
+	Assert(context != NULL && context->check_all);
+
+	indexStruct = index_rel->rd_index;
+	index_expr_item = list_head(ii_Expressions);
+
+	if (ii_Expressions != NIL)
+	{
+		for (i = 0; i < indexStruct->indnatts; i++)
+		{
+			int			keycol = indexStruct->indkey.values[i];
+
+			if (keycol == 0)
+			{
+				/* Found an index expression */
+				Node	   *index_expr;
+
+				Assert(index_expr_item != NULL);
+				if (index_expr_item == NULL)	/* shouldn't happen */
+					elog(ERROR, "too few entries in indexprs list");
+
+				index_expr = (Node *) lfirst(index_expr_item);
+
+				/* find some not safe objects */
+				parallel_safety_walker(index_expr, context);
+				index_expr_item = lnext(ii_Expressions, index_expr_item);
+			}
+		}
+	}
+
+	if (ii_Predicate != NIL)
+		parallel_safety_walker((Node *) ii_Predicate, context);
+
+	/*
+	 * Check parallel-safety of any index AM support functions.
+	 */
+	indnatts = IndexRelationGetNumberOfAttributes(index_rel);
+	nsupport = indnatts * index_rel->rd_indam->amsupport;
+	if (nsupport > 0)
+	{
+		for (i = 0; i < nsupport; i++)
+		{
+			char proparallel;
+
+			Oid funcOid = index_rel->rd_support[i];
+			if (!OidIsValid(funcOid))
+				continue;
+
+			proparallel = func_parallel(funcOid);
+			if (proparallel != PROPARALLEL_SAFE)
+			{
+				context->func_oids = lappend(context->func_oids,
+					make_safety_object(funcOid, ProcedureRelationId, proparallel));
+			}
+		}
+	}
+
+	return context->func_oids;
+}
+
+/*
+ * target_rel_index_max_parallel_hazard
+ *
+ * Finds all the PARALLEL UNSAFE/RESTRICTED objects for any existing index
+ * expressions or index predicate of a specified relation.
+ */
+static List*
+target_rel_index_max_parallel_hazard(Relation rel, max_parallel_hazard_context *context)
+{
+	List	   *index_oid_list;
+	ListCell   *lc;
+	LOCKMODE	lockmode = AccessShareLock;
+
+	Assert(context != NULL && context->check_all);
+
+	index_oid_list = RelationGetIndexList(rel);
+	foreach(lc, index_oid_list)
+	{
+		Relation	index_rel;
+		List	   *ii_Expressions;
+		List	   *ii_Predicate;
+		List	   *temp_objects;
+		Oid			index_oid = lfirst_oid(lc);
+
+		temp_objects = context->func_oids;
+		context->func_oids = NIL;
+		context->max_hazard = PROPARALLEL_SAFE;
+
+		index_rel = index_open(index_oid, lockmode);
+
+		/* Check index expression */
+		ii_Expressions = RelationGetIndexExpressions(index_rel);
+		ii_Predicate = RelationGetIndexPredicate(index_rel);
+
+		index_expr_max_parallel_hazard(index_rel, ii_Expressions,
+													 ii_Predicate, context->check_all,
+													 context->max_interesting,
+													 context);
+
+		/* Add the index itself to the objects list */
+		if (context->func_oids != NIL)
+		{
+			context->func_oids = lappend(context->func_oids,
+			make_safety_object(index_oid, IndexRelationId, context->max_hazard));
+		}
+
+		context->func_oids = list_concat(context->func_oids, temp_objects);
+		list_free(temp_objects);
+		index_close(index_rel, lockmode);
+	}
+
+	list_free(index_oid_list);
+
+	return context->func_oids;
+}
+
+/*
+ * target_rel_domain_max_parallel_hazard
+ *
+ * Finds all the PARALLEL UNSAFE/RESTRICTED objects for the specified DOMAIN type.
+ * Only any CHECK expressions are examined for parallel-safety.
+ */
+static List*
+target_rel_domain_max_parallel_hazard(Oid typid, max_parallel_hazard_context *context)
+{
+	ListCell *lc;
+	List *domain_list;
+	List       *temp_objects;
+
+	Assert(context != NULL && context->check_all);
+
+	domain_list = GetDomainConstraints(typid);
+
+	foreach(lc, domain_list)
+	{
+		DomainConstraintState *r = (DomainConstraintState *) lfirst(lc);
+
+		temp_objects = context->func_oids;
+		context->func_oids = NIL;
+		context->max_hazard = PROPARALLEL_SAFE;
+
+		parallel_safety_walker((Node *) r->check_expr, context);
+
+		/* Add the Constraint itself to the objects list */
+		if (context->func_oids != NIL)
+		{
+			context->func_oids = lappend(context->func_oids,
+				make_safety_object(get_domain_constraint_oid(typid, r->name, false),
+								   ConstraintRelationId,
+								   context->max_hazard));
+		}
+
+		context->func_oids = list_concat(context->func_oids, temp_objects);
+		list_free(temp_objects);
+	}
+
+	return context->func_oids;
+
+}
+
+/*
+ * target_rel_partitions_max_parallel_hazard
+ *
+ * Finds all the PARALLEL UNSAFE/RESTRICTED objects for any partitions of a
+ * of a specified relation.
+ */
+static List*
+target_rel_partitions_max_parallel_hazard(Relation rel,
+										  max_parallel_hazard_context *context)
+{
+	int			i;
+	PartitionDesc pdesc;
+	PartitionKey pkey;
+	ListCell   *partexprs_item;
+	int			partnatts;
+	List	   *partexprs, *qual;
+
+	Assert(context != NULL && context->check_all);
+
+	/* Check partition check expression */
+	qual = RelationGetPartitionQual(rel);
+	parallel_safety_walker((Node *) qual, context);
+
+	if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		return context->func_oids;
+
+	pkey = RelationGetPartitionKey(rel);
+
+	partnatts = get_partition_natts(pkey);
+	partexprs = get_partition_exprs(pkey);
+
+	partexprs_item = list_head(partexprs);
+	for (i = 0; i < partnatts; i++)
+	{
+		Oid funcOid = pkey->partsupfunc[i].fn_oid;
+		if (OidIsValid(funcOid))
+		{
+			char proparallel = func_parallel(funcOid);
+			max_parallel_hazard_test(proparallel, context);
+
+			if (proparallel != PROPARALLEL_SAFE)
+				context->func_oids = lappend(context->func_oids,
+					make_safety_object(funcOid, ProcedureRelationId, proparallel));
+		}
+		/* Check parallel-safety of any expressions in the partition key */
+		if (get_partition_col_attnum(pkey, i) == 0)
+		{
+			Node	   *check_expr = (Node *) lfirst(partexprs_item);
+
+			parallel_safety_walker(check_expr, context);
+			partexprs_item = lnext(partexprs, partexprs_item);
+		}
+	}
+
+	/* Recursively check each partition ... */
+
+	/* Create the PartitionDirectory infrastructure if we didn't already */
+	if (context->partition_directory == NULL)
+		context->partition_directory =
+			CreatePartitionDirectory(CurrentMemoryContext, false);
+
+	pdesc = PartitionDirectoryLookup(context->partition_directory, rel);
+
+	for (i = 0; i < pdesc->nparts; i++)
+	{
+		Relation	part_rel;
+
+		part_rel = table_open(pdesc->oids[i], AccessShareLock);
+		(void) target_rel_all_parallel_hazard_recurse(part_rel, context);
+		table_close(part_rel, AccessShareLock);
+	}
+
+	return context->func_oids;
+}
+
+/*
+ * target_rel_chk_constr_max_parallel_hazard
+ *
+ * Finds all the PARALLEL UNSAFE/RESTRICTED objects for any CHECK expressions or
+ * CHECK constraints related to the specified relation.
+ */
+static List*
+target_rel_chk_constr_max_parallel_hazard(Relation rel,
+										  max_parallel_hazard_context *context)
+{
+	TupleDesc	tupdesc;
+	List	   *temp_objects;
+
+	Assert(context != NULL && context->check_all);
+
+	tupdesc = RelationGetDescr(rel);
+
+	/*
+	 * Determine if there are any CHECK constraints which are not
+	 * parallel-safe.
+	 */
+	if (tupdesc->constr != NULL && tupdesc->constr->num_check > 0)
+	{
+		int			i;
+
+		ConstrCheck *check = tupdesc->constr->check;
+
+		for (i = 0; i < tupdesc->constr->num_check; i++)
+		{
+			Expr	   *check_expr = stringToNode(check[i].ccbin);
+
+			temp_objects = context->func_oids;
+			context->func_oids = NIL;
+			context->max_hazard = PROPARALLEL_SAFE;
+
+			parallel_safety_walker((Node *) check_expr, context);
+
+			if (context->func_oids != NIL)
+			{
+				context->func_oids = lappend(context->func_oids,
+				make_safety_object(get_relation_constraint_oid(rel->rd_rel->oid, check->ccname, true), ConstraintRelationId, context->max_hazard));
+			}
+
+			context->func_oids = list_concat(context->func_oids, temp_objects);
+			list_free(temp_objects);
+		}
+	}
+
+	return context->func_oids;
+}
+
+/*
+ * is_parallel_allowed_for_modify
+ *
+ * Check at a high-level if parallel mode is able to be used for the specified
+ * table-modification statement. Currently, we support only Inserts.
+ *
+ * It's not possible in the following cases:
+ *
+ *  1) INSERT...ON CONFLICT...DO UPDATE
+ *  2) INSERT without SELECT
+ *
+ * (Note: we don't do in-depth parallel-safety checks here, we do only the
+ * cheaper tests that can quickly exclude obvious cases for which
+ * parallelism isn't supported, to avoid having to do further parallel-safety
+ * checks for these)
+ */
+bool
+is_parallel_allowed_for_modify(Query *parse)
+{
+	bool		hasSubQuery;
+	RangeTblEntry *rte;
+	ListCell   *lc;
+
+	if (!IsModifySupportedInParallelMode(parse->commandType))
+		return false;
+
+	/*
+	 * UPDATE is not currently supported in parallel-mode, so prohibit
+	 * INSERT...ON CONFLICT...DO UPDATE...
+	 *
+	 * In order to support update, even if only in the leader, some further
+	 * work would need to be done. A mechanism would be needed for sharing
+	 * combo-cids between leader and workers during parallel-mode, since for
+	 * example, the leader might generate a combo-cid and it needs to be
+	 * propagated to the workers.
+	 */
+	if (parse->commandType == CMD_INSERT &&
+		parse->onConflict != NULL &&
+		parse->onConflict->action == ONCONFLICT_UPDATE)
+		return false;
+
+	/*
+	 * If there is no underlying SELECT, a parallel insert operation is not
+	 * desirable.
+	 */
+	hasSubQuery = false;
+	foreach(lc, parse->rtable)
+	{
+		rte = lfirst_node(RangeTblEntry, lc);
+		if (rte->rtekind == RTE_SUBQUERY)
+		{
+			hasSubQuery = true;
+			break;
+		}
+	}
+
+	return hasSubQuery;
+}
 
 /*****************************************************************************
  *		Check clauses for nonstrict functions
diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c
index 88faf4dfd7..33bc6b602d 100644
--- a/src/backend/utils/adt/misc.c
+++ b/src/backend/utils/adt/misc.c
@@ -23,6 +23,8 @@
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "catalog/catalog.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_type.h"
 #include "catalog/system_fk_info.h"
@@ -31,6 +33,7 @@
 #include "common/keywords.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "optimizer/clauses.h"
 #include "parser/scansup.h"
 #include "pgstat.h"
 #include "postmaster/syslogger.h"
@@ -43,6 +46,7 @@
 #include "utils/lsyscache.h"
 #include "utils/ruleutils.h"
 #include "utils/timestamp.h"
+#include "utils/varlena.h"
 
 /*
  * Common subroutine for num_nulls() and num_nonnulls().
@@ -605,6 +609,77 @@ pg_collation_for(PG_FUNCTION_ARGS)
 	PG_RETURN_TEXT_P(cstring_to_text(generate_collation_name(collid)));
 }
 
+/*
+ * Determine whether the target relation is safe to execute parallel modification.
+ *
+ * Return all the PARALLEL RESTRICTED/UNSAFE objects.
+ */
+Datum
+pg_get_parallel_safety(PG_FUNCTION_ARGS)
+{
+#define PG_GET_PARALLEL_SAFETY_COLS	3
+	List	   *objects;
+	ListCell   *object;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	RangeVar *relvar;
+	text	   *relname_text;
+	ReturnSetInfo *rsinfo;
+
+	relname_text = PG_GETARG_TEXT_PP(0);
+	rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+
+	MemoryContextSwitchTo(oldcontext);
+
+	relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
+	objects = target_rel_max_parallel_hazard(relvar, true, PROPARALLEL_UNSAFE);
+	foreach(object, objects)
+	{
+		Datum		values[PG_GET_PARALLEL_SAFETY_COLS];
+		bool		nulls[PG_GET_PARALLEL_SAFETY_COLS];
+		safety_object *sobject = (safety_object *) lfirst(object);
+
+		memset(nulls, 0, sizeof(nulls));
+
+		values[0] = sobject->objid;
+		values[1] = sobject->classid;
+		values[2] = sobject->proparallel;
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+
+	/* clean up and return the tuplestore */
+	tuplestore_donestoring(tupstore);
+
+	return (Datum) 0;
+}
+
 
 /*
  * pg_relation_is_updatable - determine which update events the specified
diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c
index 4915ef5934..260f5d45c8 100644
--- a/src/backend/utils/cache/typcache.c
+++ b/src/backend/utils/cache/typcache.c
@@ -2518,6 +2518,20 @@ compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2)
 		return 0;
 }
 
+
+List *GetDomainConstraints(Oid type_id)
+{
+	TypeCacheEntry *typentry;
+	List		   *constraints = NIL;
+
+	typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO);
+
+	if(typentry->domainData != NULL)
+		constraints = typentry->domainData->constraints;
+
+	return constraints;
+}
+
 /*
  * Load (or re-load) the enumData member of the typcache entry.
  */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index f49a57b35e..c04e6a98d7 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -467,4 +467,18 @@ extern void EnterParallelMode(void);
 extern void ExitParallelMode(void);
 extern bool IsInParallelMode(void);
 
+/*
+ * IsModifySupportedInParallelMode
+ *
+ * Indicates whether execution of the specified table-modification command
+ * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain
+ * parallel-safety conditions.
+ */
+static inline bool
+IsModifySupportedInParallelMode(CmdType commandType)
+{
+	/* Currently only INSERT is supported */
+	return (commandType == CMD_INSERT);
+}
+
 #endif							/* XACT_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b62abcd22c..a511a03021 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -3765,6 +3765,16 @@
   provolatile => 's', prorettype => 'regclass', proargtypes => 'regclass',
   prosrc => 'pg_get_replica_identity_index' },
 
+{ oid => '6122',
+  descr => 'information about the parallel unsafe or restricted objects in the target table',
+  proname => 'pg_get_parallel_safety', prorows => '100',
+  proretset => 't', provolatile => 'v', proparallel => 'u',
+  prorettype => 'record', proargtypes => 'text',
+  proallargtypes => '{text,oid,oid,char}',
+  proargmodes => '{i,o,o,o}',
+  proargnames => '{table_name, objid, classid, proparallel}',
+  prosrc => 'pg_get_parallel_safety' },
+
 # Deferrable unique constraint trigger
 { oid => '1250', descr => 'deferred UNIQUE constraint check',
   proname => 'unique_key_recheck', provolatile => 'v', prorettype => 'trigger',
@@ -3772,11 +3782,11 @@
 
 # Generic referential integrity constraint triggers
 { oid => '1644', descr => 'referential integrity FOREIGN KEY ... REFERENCES',
-  proname => 'RI_FKey_check_ins', provolatile => 'v', prorettype => 'trigger',
-  proargtypes => '', prosrc => 'RI_FKey_check_ins' },
+  proname => 'RI_FKey_check_ins', provolatile => 'v', proparallel => 'r',
+  prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_ins' },
 { oid => '1645', descr => 'referential integrity FOREIGN KEY ... REFERENCES',
-  proname => 'RI_FKey_check_upd', provolatile => 'v', prorettype => 'trigger',
-  proargtypes => '', prosrc => 'RI_FKey_check_upd' },
+  proname => 'RI_FKey_check_upd', provolatile => 'v', proparallel => 'r',
+  prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_upd' },
 { oid => '1646', descr => 'referential integrity ON DELETE CASCADE',
   proname => 'RI_FKey_cascade_del', provolatile => 'v', prorettype => 'trigger',
   proargtypes => '', prosrc => 'RI_FKey_cascade_del' },
diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h
index 0673887a85..857d89e0d4 100644
--- a/src/include/optimizer/clauses.h
+++ b/src/include/optimizer/clauses.h
@@ -23,6 +23,13 @@ typedef struct
 	List	  **windowFuncs;	/* lists of WindowFuncs for each winref */
 } WindowFuncLists;
 
+typedef struct safety_object
+{
+	Oid objid;
+	Oid classid;
+	char proparallel;
+} safety_object;
+
 extern bool contain_agg_clause(Node *clause);
 
 extern bool contain_window_function(Node *clause);
@@ -52,5 +59,7 @@ extern void CommuteOpExpr(OpExpr *clause);
 
 extern Query *inline_set_returning_function(PlannerInfo *root,
 											RangeTblEntry *rte);
+extern bool is_parallel_allowed_for_modify(Query *parse);
+extern List *target_rel_max_parallel_hazard(RangeVar *relrv, bool findall, char max_interesting);
 
 #endif							/* CLAUSES_H */
diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h
index 1d68a9a4b7..28ca7d8a6e 100644
--- a/src/include/utils/typcache.h
+++ b/src/include/utils/typcache.h
@@ -199,6 +199,8 @@ extern uint64 assign_record_type_identifier(Oid type_id, int32 typmod);
 
 extern int	compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2);
 
+extern List *GetDomainConstraints(Oid type_id);
+
 extern size_t SharedRecordTypmodRegistryEstimate(void);
 
 extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *,
-- 
2.18.4

