From 5009cfb4b36a4d856ea944143ca39983bbd013b5 Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Wed, 21 Apr 2021 14:31:46 +0800
Subject: [PATCH 2/3] ALTER-TABLE-PARALLEL

Enabling users to declare that the table allows parallel data modification,
Add a table property that represents parallel safety of the table for DML statement execution. 
Users specify it as follows:

ALTER TABLE table_name PARALLEL { UNSAFE | RESTRICTED | SAFE };

This property is recorded in pg_class's relparallel column as 'u', 'r', or 's', just like pg_proc's proparallel.  The default is UNSAFE.

The planner assumes that all of the table, its descendant partitions, and their ancillary objects
have the specified parallel safety or safer one.  The user is responsible for its correctness.
If the parallel processes find an object that is less safer than the assumed parallel safety during
statement execution, it throws an ERROR and abort the statement execution.

---
 src/backend/catalog/heap.c                    |  1 +
 src/backend/commands/tablecmds.c              | 50 +++++++++++++++++++
 src/backend/optimizer/util/clauses.c          | 25 +++++++++-
 src/backend/parser/gram.y                     |  8 +++
 src/backend/utils/cache/relcache.c            |  3 ++
 src/bin/pg_dump/pg_dump.c                     | 47 +++++++++++++----
 src/bin/pg_dump/pg_dump.h                     |  1 +
 src/include/catalog/pg_class.h                |  3 ++
 src/include/nodes/parsenodes.h                |  3 +-
 .../test_ddl_deparse/test_ddl_deparse.c       |  3 ++
 10 files changed, 132 insertions(+), 12 deletions(-)

diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index ba03e8aa8f..55ac4719aa 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -960,6 +960,7 @@ InsertPgClassTuple(Relation pg_class_desc,
 	values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
 	values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
 	values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
+	values[Anum_pg_class_relproparallel - 1] = CharGetDatum(rd_rel->relproparallel);
 	values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition);
 	values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite);
 	values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 096a6f2891..66e7852ba7 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -40,6 +40,7 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_statistic_ext.h"
 #include "catalog/pg_trigger.h"
@@ -602,6 +603,7 @@ static List *GetParentedForeignKeyRefs(Relation partition);
 static void ATDetachCheckNoForeignKeyRefs(Relation partition);
 static void ATExecAlterCollationRefreshVersion(Relation rel, List *coll);
 static char GetAttributeCompression(Form_pg_attribute att, char *compression);
+static void ATExecParallelSafety(Relation rel, Node *def);
 
 
 /* ----------------------------------------------------------------
@@ -4206,6 +4208,7 @@ AlterTableGetLockLevel(List *cmds)
 			case AT_SetIdentity:
 			case AT_DropExpression:
 			case AT_SetCompression:
+			case AT_ParallelSafety:
 				cmd_lockmode = AccessExclusiveLock;
 				break;
 
@@ -4748,6 +4751,11 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 			/* No command-specific prep needed */
 			pass = AT_PASS_MISC;
 			break;
+		case AT_ParallelSafety:
+			ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
+			/* No command-specific prep needed */
+			pass = AT_PASS_MISC;
+			break;
 		default:				/* oops */
 			elog(ERROR, "unrecognized alter table type: %d",
 				 (int) cmd->subtype);
@@ -5155,6 +5163,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab,
 			Assert(rel->rd_rel->relkind == RELKIND_INDEX);
 			ATExecAlterCollationRefreshVersion(rel, cmd->object);
 			break;
+		case AT_ParallelSafety:
+			ATExecParallelSafety(rel, cmd->def);
+			break;
 		default:				/* oops */
 			elog(ERROR, "unrecognized alter table type: %d",
 				 (int) cmd->subtype);
@@ -18504,3 +18515,42 @@ GetAttributeCompression(Form_pg_attribute att, char *compression)
 
 	return cmethod;
 }
+
+static void
+ATExecParallelSafety(Relation rel, Node *def)
+{
+	Relation	pg_class;
+	Oid			relid;
+	HeapTuple	tuple;
+	char		proparallel = PROPARALLEL_SAFE;
+	char	   *parallel = strVal((Value *) def);
+
+	if (parallel)
+	{
+		if (strcmp(parallel, "safe") == 0)
+			proparallel = PROPARALLEL_SAFE;
+		else if (strcmp(parallel, "restricted") == 0)
+			proparallel = PROPARALLEL_RESTRICTED;
+		else if (strcmp(parallel, "unsafe") == 0)
+			proparallel = PROPARALLEL_UNSAFE;
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("parameter \"parallel\" must be SAFE, RESTRICTED, or UNSAFE")));
+	}
+
+	relid = RelationGetRelid(rel);
+
+	pg_class = table_open(RelationRelationId, RowExclusiveLock);
+
+	tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+
+	if (!HeapTupleIsValid(tuple))
+		elog(ERROR, "cache lookup failed for relation %u", relid);
+
+	((Form_pg_class) GETSTRUCT(tuple))->relproparallel = proparallel;
+	CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+
+	table_close(pg_class, RowExclusiveLock);
+	heap_freetuple(tuple);
+}
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 0acc0e55f4..3f47bbb664 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -177,7 +177,7 @@ static Query *substitute_actual_srf_parameters(Query *expr,
 											   int nargs, List *args);
 static Node *substitute_actual_srf_parameters_mutator(Node *node,
 													  substitute_actual_srf_parameters_context *context);
-
+static bool max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context);
 static safety_object *make_safety_object(Oid objid, Oid classid, char proparallel);
 
 /*****************************************************************************
@@ -645,6 +645,7 @@ contain_volatile_functions_not_nextval_walker(Node *node, void *context)
 char
 max_parallel_hazard(Query *parse)
 {
+	bool max_hazard_found;
 	max_parallel_hazard_context context;
 
 	context.max_hazard = PROPARALLEL_SAFE;
@@ -654,7 +655,27 @@ max_parallel_hazard(Query *parse)
 	context.func_oids = NIL;
 	context.partition_directory = NULL;
 
-	(void) max_parallel_hazard_walker((Node *) parse, &context);
+	max_hazard_found = max_parallel_hazard_walker((Node *) parse, &context);
+
+	if (!max_hazard_found &&
+		IsModifySupportedInParallelMode(parse->commandType))
+	{
+		RangeTblEntry *rte;
+		Relation target_rel;
+
+		rte = rt_fetch(parse->resultRelation, parse->rtable);
+
+		/*
+		 * The target table is already locked by the caller (this is done in the
+		 * parse/analyze phase), and remains locked until end-of-transaction.
+		 */
+		target_rel = table_open(rte->relid, NoLock);
+
+		(void) max_parallel_hazard_test(target_rel->rd_rel->relproparallel,
+										&context);
+		table_close(target_rel, NoLock);
+	}
+
 	return context.max_hazard;
 }
 
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 73494002ad..27a76d68c5 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -2690,6 +2690,14 @@ alter_table_cmd:
 					n->object = $3;
 					$$ = (Node *)n;
 				}
+			/* ALTER TABLE <name> PARALLEL SAFE */
+			| PARALLEL ColId
+				{
+					AlterTableCmd *n = makeNode(AlterTableCmd);
+					n->subtype = AT_ParallelSafety;
+					n->def = (Node *)makeString($2);
+					$$ = (Node *)n;
+				}
 			| alter_generic_options
 				{
 					AlterTableCmd *n = makeNode(AlterTableCmd);
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 29702d6eab..45faf38b1d 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1868,6 +1868,7 @@ formrdesc(const char *relationName, Oid relationReltype,
 	relation->rd_rel->relkind = RELKIND_RELATION;
 	relation->rd_rel->relnatts = (int16) natts;
 	relation->rd_rel->relam = HEAP_TABLE_AM_OID;
+	relation->rd_rel->relproparallel = PROPARALLEL_UNSAFE;
 
 	/*
 	 * initialize attribute tuple form
@@ -3487,6 +3488,8 @@ RelationBuildLocalRelation(const char *relname,
 	else
 		rel->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
 
+	rel->rd_rel->relproparallel = PROPARALLEL_UNSAFE;
+
 	/*
 	 * Insert relation physical and logical identifiers (OIDs) into the right
 	 * places.  For a mapped relation, we set relfilenode to zero and rely on
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index e397b76356..ea743261c4 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -6291,6 +6291,7 @@ getTables(Archive *fout, int *numTables)
 	int			i_relpersistence;
 	int			i_relispopulated;
 	int			i_relreplident;
+	int			i_relproparallel;
 	int			i_owning_tab;
 	int			i_owning_col;
 	int			i_reltablespace;
@@ -6395,7 +6396,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "tc.relminmxid AS tminmxid, "
 						  "c.relpersistence, c.relispopulated, "
-						  "c.relreplident, c.relpages, am.amname, "
+						  "c.relreplident, c.relproparallel, c.relpages, am.amname, "
 						  "CASE WHEN c.relkind = 'f' THEN "
 						  "(SELECT ftserver FROM pg_catalog.pg_foreign_table WHERE ftrelid = c.oid) "
 						  "ELSE 0 END AS foreignserver, "
@@ -6487,7 +6488,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "tc.relminmxid AS tminmxid, "
 						  "c.relpersistence, c.relispopulated, "
-						  "c.relreplident, c.relpages, "
+						  "c.relreplident, c.relproparallel, c.relpages, "
 						  "NULL AS amname, "
 						  "CASE WHEN c.relkind = 'f' THEN "
 						  "(SELECT ftserver FROM pg_catalog.pg_foreign_table WHERE ftrelid = c.oid) "
@@ -6540,7 +6541,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "tc.relminmxid AS tminmxid, "
 						  "c.relpersistence, c.relispopulated, "
-						  "c.relreplident, c.relpages, "
+						  "c.relreplident, c.relproparallel, c.relpages, "
 						  "NULL AS amname, "
 						  "CASE WHEN c.relkind = 'f' THEN "
 						  "(SELECT ftserver FROM pg_catalog.pg_foreign_table WHERE ftrelid = c.oid) "
@@ -6593,7 +6594,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "tc.relminmxid AS tminmxid, "
 						  "c.relpersistence, c.relispopulated, "
-						  "'d' AS relreplident, c.relpages, "
+						  "'d' AS relreplident, 'u' AS relproparallel, c.relpages, "
 						  "NULL AS amname, "
 						  "CASE WHEN c.relkind = 'f' THEN "
 						  "(SELECT ftserver FROM pg_catalog.pg_foreign_table WHERE ftrelid = c.oid) "
@@ -6646,7 +6647,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "0 AS tminmxid, "
 						  "c.relpersistence, 't' as relispopulated, "
-						  "'d' AS relreplident, c.relpages, "
+						  "'d' AS relreplident, 'u' AS relproparallel, c.relpages, "
 						  "NULL AS amname, "
 						  "CASE WHEN c.relkind = 'f' THEN "
 						  "(SELECT ftserver FROM pg_catalog.pg_foreign_table WHERE ftrelid = c.oid) "
@@ -6697,7 +6698,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "0 AS tminmxid, "
 						  "'p' AS relpersistence, 't' as relispopulated, "
-						  "'d' AS relreplident, c.relpages, "
+						  "'d' AS relreplident, 'u' AS relproparallel, c.relpages, "
 						  "NULL AS amname, "
 						  "NULL AS foreignserver, "
 						  "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, "
@@ -6745,7 +6746,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "0 AS tminmxid, "
 						  "'p' AS relpersistence, 't' as relispopulated, "
-						  "'d' AS relreplident, c.relpages, "
+						  "'d' AS relreplident, 'u' AS relproparallel, c.relpages, "
 						  "NULL AS amname, "
 						  "NULL AS foreignserver, "
 						  "NULL AS reloftype, "
@@ -6793,7 +6794,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "0 AS tminmxid, "
 						  "'p' AS relpersistence, 't' as relispopulated, "
-						  "'d' AS relreplident, c.relpages, "
+						  "'d' AS relreplident, 'u' AS relproparallel, c.relpages, "
 						  "NULL AS amname, "
 						  "NULL AS foreignserver, "
 						  "NULL AS reloftype, "
@@ -6840,7 +6841,7 @@ getTables(Archive *fout, int *numTables)
 						  "0 AS toid, "
 						  "0 AS tfrozenxid, 0 AS tminmxid,"
 						  "'p' AS relpersistence, 't' as relispopulated, "
-						  "'d' AS relreplident, relpages, "
+						  "'d' AS relreplident, 'u' AS relproparallel, relpages, "
 						  "NULL AS amname, "
 						  "NULL AS foreignserver, "
 						  "NULL AS reloftype, "
@@ -6909,6 +6910,7 @@ getTables(Archive *fout, int *numTables)
 	i_relpersistence = PQfnumber(res, "relpersistence");
 	i_relispopulated = PQfnumber(res, "relispopulated");
 	i_relreplident = PQfnumber(res, "relreplident");
+	i_relproparallel = PQfnumber(res, "relproparallel");
 	i_relpages = PQfnumber(res, "relpages");
 	i_foreignserver = PQfnumber(res, "foreignserver");
 	i_owning_tab = PQfnumber(res, "owning_tab");
@@ -6964,6 +6966,7 @@ getTables(Archive *fout, int *numTables)
 		tblinfo[i].hasoids = (strcmp(PQgetvalue(res, i, i_relhasoids), "t") == 0);
 		tblinfo[i].relispopulated = (strcmp(PQgetvalue(res, i, i_relispopulated), "t") == 0);
 		tblinfo[i].relreplident = *(PQgetvalue(res, i, i_relreplident));
+		tblinfo[i].relproparallel = *(PQgetvalue(res, i, i_relproparallel));
 		tblinfo[i].relpages = atoi(PQgetvalue(res, i, i_relpages));
 		tblinfo[i].frozenxid = atooid(PQgetvalue(res, i, i_relfrozenxid));
 		tblinfo[i].minmxid = atooid(PQgetvalue(res, i, i_relminmxid));
@@ -16542,6 +16545,32 @@ dumpTableSchema(Archive *fout, const TableInfo *tbinfo)
 		}
 	}
 
+	if (tbinfo->relkind == RELKIND_RELATION ||
+		tbinfo->relkind == RELKIND_PARTITIONED_TABLE ||
+		tbinfo->relkind == RELKIND_FOREIGN_TABLE)
+	{
+		appendPQExpBuffer(q, "\nALTER %sTABLE %s PARALLEL ",
+						tbinfo->relkind == RELKIND_FOREIGN_TABLE ? "FOREIGN " : "",
+						qualrelname);
+
+		switch (tbinfo->relproparallel)
+		{
+			case 's':
+				appendPQExpBuffer(q, "SAFE;\n");
+				break;
+			case 'r':
+				appendPQExpBuffer(q, "RESTRICTED;\n");
+				break;
+			case 'u':
+				appendPQExpBuffer(q, "UNSAFE;\n");
+				break;
+			default:
+				/* should not reach here */
+				appendPQExpBuffer(q, "UNSAFE;\n");
+				break;
+		}
+	}
+
 	if (tbinfo->forcerowsec)
 		appendPQExpBuffer(q, "\nALTER TABLE ONLY %s FORCE ROW LEVEL SECURITY;\n",
 						  qualrelname);
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 5340843081..517795ad14 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -268,6 +268,7 @@ typedef struct _tableInfo
 	char		relpersistence; /* relation persistence */
 	bool		relispopulated; /* relation is populated */
 	char		relreplident;	/* replica identifier */
+	char		relproparallel; /* parallel safety of dml on the relation */
 	char	   *reltablespace;	/* relation tablespace */
 	char	   *reloptions;		/* options specified by WITH (...) */
 	char	   *checkoption;	/* WITH CHECK OPTION, if any */
diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h
index 3e37729436..1ad816f4ae 100644
--- a/src/include/catalog/pg_class.h
+++ b/src/include/catalog/pg_class.h
@@ -116,6 +116,9 @@ CATALOG(pg_class,1259,RelationRelationId) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83,Relat
 	/* see REPLICA_IDENTITY_xxx constants */
 	char		relreplident BKI_DEFAULT(n);
 
+	/* parallel safety of the dml on the relation */
+	char		relproparallel BKI_DEFAULT(u);
+
 	/* is relation a partition? */
 	bool		relispartition BKI_DEFAULT(f);
 
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 7a44bccdd3..e5652911d2 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1934,7 +1934,8 @@ typedef enum AlterTableType
 	AT_SetIdentity,				/* SET identity column options */
 	AT_DropIdentity,			/* DROP IDENTITY */
 	AT_AlterCollationRefreshVersion, /* ALTER COLLATION ... REFRESH VERSION */
-	AT_ReAddStatistics			/* internal to commands/tablecmds.c */
+	AT_ReAddStatistics,			/* internal to commands/tablecmds.c */
+	AT_ParallelSafety			/* PARALLEL SAFE/RESTRICTED/UNSAFE */
 } AlterTableType;
 
 typedef struct ReplicaIdentityStmt
diff --git a/src/test/modules/test_ddl_deparse/test_ddl_deparse.c b/src/test/modules/test_ddl_deparse/test_ddl_deparse.c
index 1bae1e5438..25ba3afc6f 100644
--- a/src/test/modules/test_ddl_deparse/test_ddl_deparse.c
+++ b/src/test/modules/test_ddl_deparse/test_ddl_deparse.c
@@ -276,6 +276,9 @@ get_altertable_subcmdtypes(PG_FUNCTION_ARGS)
 			case AT_NoForceRowSecurity:
 				strtype = "NO FORCE ROW SECURITY";
 				break;
+			case AT_ParallelSafety:
+				strtype = "PARALLEL SAFETY";
+				break;
 			case AT_GenericOptions:
 				strtype = "SET OPTIONS";
 				break;
-- 
2.18.4

