From 7c90417a130c92f8930853a551046c2affc96c6b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 7 Jun 2019 15:25:45 +0900
Subject: [PATCH v25 1/2] Add parallel option to VACUUM command

This change adds PARALLEL option to VACUUM command that enable us to
perform index vacuuming and index cleanup with background
workers. Indivisual indexes is processed by one vacuum
process. Therefore parallel vacuum can be used when the table has at
least two indexes and it cannot specify larger parallel degree than
the number of indexes that the table has.

The parallel degree is either specified by user or determined based on
the number of indexes that the table has, and further limited by
max_parallel_maintenance_workers. The table size and index size don't
affect it.
---
 doc/src/sgml/config.sgml              |  14 +-
 doc/src/sgml/ref/vacuum.sgml          |  33 ++
 src/backend/access/heap/vacuumlazy.c  | 888 ++++++++++++++++++++++++++++++----
 src/backend/access/transam/parallel.c |   4 +
 src/backend/commands/vacuum.c         |  27 ++
 src/backend/postmaster/autovacuum.c   |   2 +
 src/include/access/heapam.h           |   3 +
 src/include/commands/vacuum.h         |   5 +
 src/test/regress/expected/vacuum.out  |  10 +
 src/test/regress/sql/vacuum.sql       |   7 +
 10 files changed, 887 insertions(+), 106 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 84341a3..0cb32a1 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2251,13 +2251,13 @@ include_dir 'conf.d'
        <listitem>
         <para>
          Sets the maximum number of parallel workers that can be
-         started by a single utility command.  Currently, the only
-         parallel utility command that supports the use of parallel
-         workers is <command>CREATE INDEX</command>, and only when
-         building a B-tree index.  Parallel workers are taken from the
-         pool of processes established by <xref
-         linkend="guc-max-worker-processes"/>, limited by <xref
-         linkend="guc-max-parallel-workers"/>.  Note that the requested
+         started by a single utility command.  Currently, the parallel
+         utility commands that support the use of parallel workers are
+         <command>CREATE INDEX</command> only when building a B-tree index,
+         and <command>VACUUM</command> without <literal>FULL</literal>
+         option. Parallel workers are taken from the pool of processes
+         established by <xref linkend="guc-max-worker-processes"/>, limited
+         by <xref linkend="guc-max-parallel-workers"/>.  Note that the requested
          number of workers may not actually be available at run time.
          If this occurs, the utility operation will run with fewer
          workers than expected.  The default value is 2.  Setting this
diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index f9b0fb8..c3347f2 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -224,6 +224,27 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
    </varlistentry>
 
    <varlistentry>
+    <term><literal>PARALLEL</literal></term>
+    <listitem>
+     <para>
+      Perform vacuum index and cleanup index phases of <command>VACUUM</command>
+      in parallel using <replaceable class="parameter">integer</replaceable>
+      background workers (for the detail of each vacuum phases, please refer
+      to <xref linkend="vacuum-phases"/>). If the parallel degree
+      <replaceable class="parameter">integer</replaceable> is omitted, then
+      <command>VACUUM</command> decides the number of workers based on number of
+      indexes on the relation which further limited by
+      <xref linkend="guc-max-parallel-workers-maintenance"/>. Only one worker
+      can be used per index. So parallel workers are launched only when
+      there are at least <literal>2</literal> indexes in the table. Workers
+      for vacuum launches before starting each phases and exit at the end of
+      the phase. These behaviors might change in a future release. This
+      option can not use with <literal>FULL</literal> option.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><replaceable class="parameter">boolean</replaceable></term>
     <listitem>
      <para>
@@ -238,6 +259,18 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
    </varlistentry>
 
    <varlistentry>
+    <term><replaceable class="parameter">integer</replaceable></term>
+    <listitem>
+     <para>
+      Specifies a positive integer value passed to the selected option.
+      The <replaceable class="parameter">integer</replaceable> value can
+      also be omitted, in which case the default value of the selected
+      option is used.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><replaceable class="parameter">table_name</replaceable></term>
     <listitem>
      <para>
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index a3c4a1d..e8f4199 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -22,6 +22,20 @@
  * of index scans performed.  So we don't use maintenance_work_mem memory for
  * the TID array, just enough to hold as many heap tuples as fit on one page.
  *
+ * Lazy vacuum supports parallel execution with parallel worker processes. In
+ * parallel lazy vacuum, we perform both index vacuuming and index cleanup with
+ * parallel worker processes. Individual indexes is processed by one vacuum
+ * process. At beginning of lazy vacuum (at lazy_scan_heap) we prepare the
+ * parallel context and initialize the DSM segment that contains shared information
+ * as well as the memory space for storing dead tuples. When starting either
+ * index vacuuming or index cleanup, we launch parallel worker processes. Once
+ * all indexes are processed the parallel worker processes exit and the leader
+ * process re-initializes the DSM segment while keeping recorded dead tuples.
+ * Note that all parallel workers live during one either index vacuuming or
+ * index cleanup but the leader process neither exits from the parallel mode
+ * nor destroys the parallel context. For updating the index statistics, since
+ * any updates are not allowed during parallel mode we update the index
+ * statistics after exited from parallel mode.
  *
  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -41,8 +55,10 @@
 #include "access/heapam_xlog.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/parallel.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
+#include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/storage.h"
 #include "commands/dbcommands.h"
@@ -55,6 +71,7 @@
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
+#include "tcop/tcopprot.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_rusage.h"
@@ -110,6 +127,94 @@
  */
 #define PREFETCH_SIZE			((BlockNumber) 32)
 
+/*
+ * DSM keys for parallel lazy vacuum. Unlike other parallel execution code,
+ * since we don't need to worry about DSM keys conflicting with plan_node_id
+ * we can use small integers.
+ */
+#define PARALLEL_VACUUM_KEY_SHARED			1
+#define PARALLEL_VACUUM_KEY_DEAD_TUPLES		2
+#define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
+
+/*
+ * Macro to check if we in a parallel lazy vacuum. If true, we are in parallel
+ * mode and prepared the DSM segments.
+ */
+#define ParallelVacuumIsActive(lps) (((LVParallelState *) (lps)) != NULL)
+
+/*
+ * LVDeadTuples stores the dead tuple TIDs collected during heap scan.
+ * This is allocated in the DSM segment when parallel lazy vacuum
+ * mode, otherwise allocated in a local memory.
+ */
+typedef struct LVDeadTuples
+{
+	int			max_tuples;	/* # slots allocated in array */
+	int			num_tuples;	/* current # of entries */
+	/* List of TIDs of tuples we intend to delete */
+	/* NB: this list is ordered by TID address */
+	ItemPointerData itemptrs[FLEXIBLE_ARRAY_MEMBER];	/* array of ItemPointerData */
+} LVDeadTuples;
+#define SizeOfLVDeadTuples offsetof(LVDeadTuples, itemptrs) + sizeof(ItemPointerData)
+
+/*
+ * Struct for an index bulk-deletion statistic used for parallel lazy vacuum.
+ * This is allocated in the DSM segment.
+ */
+typedef struct LVSharedIndStats
+{
+	IndexBulkDeleteResult	stats;
+	bool					updated;	/* are the stats updated? */
+} LVSharedIndStats;
+
+/*
+ * Shared information among parallel workers. So this is allocated in
+ * the DSM segment.
+ */
+typedef struct LVShared
+{
+	/*
+	 * Target table relid and log level. These fields are not modified during
+	 * the lazy vacuum.
+	 */
+	Oid		relid;
+	int		elevel;
+
+	/*
+	 * An indication for vacuum workers of doing either index vacuuming or
+	 * index cleanup.
+	 */
+	bool	for_cleanup;
+
+	/*
+	 * Fields for both index vacuuming and index cleanup.
+	 *
+	 * reltuples is the total number of input heap tuples. We set either an
+	 * old live tuples in index vacuuming case or the new live tuples in index
+	 * cleanup case.
+	 *
+	 * estimated_count is true if the reltuples is estimated value.
+	 */
+	double	reltuples;
+	bool	estimated_count;
+
+	/*
+	 * Variables to control parallel index vacuuming. An variable-sized field
+	 * 'indstats' must come last.
+	 */
+	pg_atomic_uint32	nprocessed;
+	LVSharedIndStats	indstats[FLEXIBLE_ARRAY_MEMBER];
+} LVShared;
+#define SizeOfLVShared offsetof(LVShared, indstats) + sizeof(LVSharedIndStats)
+
+/* Struct for parallel lazy vacuum */
+typedef struct LVParallelState
+{
+	ParallelContext	*pcxt;
+	LVShared		*lvshared;
+	int				nworkers_requested;	/* user-requested parallel degree */
+} LVParallelState;
+
 typedef struct LVRelStats
 {
 	/* useindex = true means two-pass strategy; false means one-pass */
@@ -128,17 +233,12 @@ typedef struct LVRelStats
 	BlockNumber pages_removed;
 	double		tuples_deleted;
 	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
-	/* List of TIDs of tuples we intend to delete */
-	/* NB: this list is ordered by TID address */
-	int			num_dead_tuples;	/* current # of entries */
-	int			max_dead_tuples;	/* # slots allocated in array */
-	ItemPointer dead_tuples;	/* array of ItemPointerData */
+	LVDeadTuples *dead_tuples;
 	int			num_index_scans;
 	TransactionId latestRemovedXid;
 	bool		lock_waiter_detected;
 } LVRelStats;
 
-
 /* A few variables that don't seem worth passing around as parameters */
 static int	elevel = -1;
 
@@ -155,12 +255,11 @@ static void lazy_scan_heap(Relation onerel, VacuumParams *params,
 						   bool aggressive);
 static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
-static void lazy_vacuum_index(Relation indrel,
-							  IndexBulkDeleteResult **stats,
-							  LVRelStats *vacrelstats);
+static void lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats,
+							  LVDeadTuples *dead_tuples, double reltuples);
 static void lazy_cleanup_index(Relation indrel,
-							   IndexBulkDeleteResult *stats,
-							   LVRelStats *vacrelstats);
+							   IndexBulkDeleteResult **stats,
+							   double reltuples, bool estimated_count);
 static int	lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 							 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
 static bool should_attempt_truncation(VacuumParams *params,
@@ -169,12 +268,33 @@ static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
 static BlockNumber count_nondeletable_pages(Relation onerel,
 											LVRelStats *vacrelstats);
 static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
-static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
+static void lazy_record_dead_tuple(LVDeadTuples *dead_tuples,
 								   ItemPointer itemptr);
 static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
 static int	vac_cmp_itemptr(const void *left, const void *right);
 static bool heap_page_is_all_visible(Relation rel, Buffer buf,
 									 TransactionId *visibility_cutoff_xid, bool *all_frozen);
+static void lazy_update_index_statistics(Relation indrel, IndexBulkDeleteResult *stats);
+static LVParallelState *begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid,
+											  BlockNumber nblocks, int nindexes,
+											  int nrequested);
+static void end_parallel_vacuum(LVParallelState *lps, Relation *Irel, int nindexes);
+static void lazy_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
+										   int nindexes,
+										   IndexBulkDeleteResult **stats,
+										   LVParallelState *lps, bool for_cleanup);
+static void lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats,
+													Relation *Irel,
+													int nindexes,
+													IndexBulkDeleteResult **stats,
+													LVParallelState *lps,
+													bool for_cleanup);
+static void do_parallel_vacuum_or_cleanup_indexes(Relation *Irel, int nindexes,
+												  IndexBulkDeleteResult **stats,
+												  LVShared *lvshared,
+												  LVDeadTuples *dead_tuples);
+static int compute_parallel_workers(Relation onerel, int nrequested, int nindexes);
+static long compute_max_dead_tuples(BlockNumber relblocks, bool hasindex);
 
 
 /*
@@ -488,6 +608,17 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
  *		dead-tuple TIDs, invoke vacuuming of indexes and call lazy_vacuum_heap
  *		to reclaim dead line pointers.
  *
+ *		If the table has at least two indexes and parallel lazy vacuum is
+ *		requested, we execute both index vacuuming and index cleanup with
+ *		parallel workers. In parallel lazy vacuum, we enter parallel mode and
+ *		create the parallel context and the DSM segment before starting heap
+ *		scan. All parallel workers are launched at beginning of index vacuuming
+ *		and index cleanup and they exit once done with all indexes. At end of
+ *		this function we exit from parallel mode. Index bulk-deletion results
+ *		are stored in the DSM segment and update index statistics as a whole
+ *		after exited from parallel mode since all writes are not allowed during
+ *		parallel mode.
+ *
  *		If there are no indexes then we can reclaim line pointers on the fly;
  *		dead line pointers need only be retained until all index pointers that
  *		reference them have been killed.
@@ -496,6 +627,8 @@ static void
 lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 			   Relation *Irel, int nindexes, bool aggressive)
 {
+	LVParallelState *lps = NULL;
+	LVDeadTuples *dead_tuples;
 	BlockNumber nblocks,
 				blkno;
 	HeapTupleData tuple;
@@ -518,6 +651,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 	bool		skipping_blocks;
 	xl_heap_freeze_tuple *frozen;
 	StringInfoData buf;
+	int			parallel_workers = 0;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -553,13 +687,45 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 	vacrelstats->nonempty_pages = 0;
 	vacrelstats->latestRemovedXid = InvalidTransactionId;
 
-	lazy_space_alloc(vacrelstats, nblocks);
+	/*
+	 * If parallel lazy vacuum is requested and we vacuum indexes, compute
+	 * the number of parallel vacuum worker to launch.
+	 */
+	if (params->nworkers >= 0 && vacrelstats->useindex)
+		parallel_workers = compute_parallel_workers(onerel,
+													params->nworkers,
+													nindexes);
+
+	if (parallel_workers > 0)
+	{
+		/*
+		 * Enter parallel mode, create the parallel context and allocate the
+		 * DSM segment.
+		 */
+		lps = begin_parallel_vacuum(vacrelstats,
+									RelationGetRelid(onerel),
+									nblocks, nindexes,
+									parallel_workers);
+
+		/* Remember the user-requested parallel degree to reporting */
+		lps->nworkers_requested = params->nworkers;
+	}
+	else
+	{
+		/*
+		 * Use single process vacuum, allocate the memory space for dead
+		 * tuples locally.
+		 */
+		lazy_space_alloc(vacrelstats, nblocks);
+	}
+
+	dead_tuples = vacrelstats->dead_tuples;
 	frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
 
 	/* Report that we're scanning the heap, advertising total # of blocks */
 	initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
 	initprog_val[1] = nblocks;
-	initprog_val[2] = vacrelstats->max_dead_tuples;
+	initprog_val[2] = dead_tuples->max_tuples;
 	pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
 
 	/*
@@ -737,8 +903,8 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 		 * If we are close to overrunning the available space for dead-tuple
 		 * TIDs, pause and do a cycle of vacuuming before we tackle this page.
 		 */
-		if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
-			vacrelstats->num_dead_tuples > 0)
+		if ((dead_tuples->max_tuples - dead_tuples->num_tuples) < MaxHeapTuplesPerPage &&
+			dead_tuples->num_tuples > 0)
 		{
 			const int	hvp_index[] = {
 				PROGRESS_VACUUM_PHASE,
@@ -766,10 +932,8 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 										 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
 
 			/* Remove index entries */
-			for (i = 0; i < nindexes; i++)
-				lazy_vacuum_index(Irel[i],
-								  &indstats[i],
-								  vacrelstats);
+			lazy_vacuum_or_cleanup_indexes(vacrelstats, Irel, nindexes,
+										   indstats, lps, false);
 
 			/*
 			 * Report that we are now vacuuming the heap.  We also increase
@@ -789,7 +953,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 			 * not to reset latestRemovedXid since we want that value to be
 			 * valid.
 			 */
-			vacrelstats->num_dead_tuples = 0;
+			dead_tuples->num_tuples = 0;
 			vacrelstats->num_index_scans++;
 
 			/*
@@ -985,7 +1149,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 		has_dead_tuples = false;
 		nfrozen = 0;
 		hastup = false;
-		prev_dead_count = vacrelstats->num_dead_tuples;
+		prev_dead_count = dead_tuples->num_tuples;
 		maxoff = PageGetMaxOffsetNumber(page);
 
 		/*
@@ -1024,7 +1188,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 			 */
 			if (ItemIdIsDead(itemid))
 			{
-				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+				lazy_record_dead_tuple(dead_tuples, &(tuple.t_self));
 				all_visible = false;
 				continue;
 			}
@@ -1170,7 +1334,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 
 			if (tupgone)
 			{
-				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+				lazy_record_dead_tuple(dead_tuples, &(tuple.t_self));
 				HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
 													   &vacrelstats->latestRemovedXid);
 				tups_vacuumed += 1;
@@ -1240,7 +1404,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 		 * doing a second scan. Also we don't do that but forget dead tuples
 		 * when index cleanup is disabled.
 		 */
-		if (!vacrelstats->useindex && vacrelstats->num_dead_tuples > 0)
+		if (!vacrelstats->useindex && dead_tuples->num_tuples > 0)
 		{
 			if (nindexes == 0)
 			{
@@ -1269,7 +1433,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 			 * not to reset latestRemovedXid since we want that value to be
 			 * valid.
 			 */
-			vacrelstats->num_dead_tuples = 0;
+			dead_tuples->num_tuples = 0;
 
 			/*
 			 * Periodically do incremental FSM vacuuming to make newly-freed
@@ -1384,7 +1548,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 		 * page, so remember its free space as-is.  (This path will always be
 		 * taken if there are no indexes.)
 		 */
-		if (vacrelstats->num_dead_tuples == prev_dead_count)
+		if (dead_tuples->num_tuples == prev_dead_count)
 			RecordPageWithFreeSpace(onerel, blkno, freespace);
 	}
 
@@ -1418,7 +1582,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 
 	/* If any tuples need to be deleted, perform final vacuum cycle */
 	/* XXX put a threshold on min number of tuples here? */
-	if (vacrelstats->num_dead_tuples > 0)
+	if (dead_tuples->num_tuples > 0)
 	{
 		const int	hvp_index[] = {
 			PROGRESS_VACUUM_PHASE,
@@ -1434,10 +1598,8 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 									 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
 
 		/* Remove index entries */
-		for (i = 0; i < nindexes; i++)
-			lazy_vacuum_index(Irel[i],
-							  &indstats[i],
-							  vacrelstats);
+		lazy_vacuum_or_cleanup_indexes(vacrelstats, Irel, nindexes,
+										   indstats, lps, false);
 
 		/* Report that we are now vacuuming the heap */
 		hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP;
@@ -1463,11 +1625,20 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
 
-	/* Do post-vacuum cleanup and statistics update for each index */
+	/*
+	 * Do post-vacuum cleanup, and statistics update for each index if
+	 * we're not in parallel lazy vacuum. If in parallel lazy vacuum, do
+	 * only post-vacum cleanup and update statistics at the end of parallel
+	 * lazy vacuum.
+	 */
 	if (vacrelstats->useindex)
+		lazy_vacuum_or_cleanup_indexes(vacrelstats, Irel, nindexes,
+									   indstats, lps, true);
+
+	if (ParallelVacuumIsActive(lps))
 	{
-		for (i = 0; i < nindexes; i++)
-			lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+		/* End parallel mode and update index statistics */
+		end_parallel_vacuum(lps, Irel, nindexes);
 	}
 
 	/* If no indexes, make log report that lazy_vacuum_heap would've made */
@@ -1534,7 +1705,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 	npages = 0;
 
 	tupindex = 0;
-	while (tupindex < vacrelstats->num_dead_tuples)
+	while (tupindex < vacrelstats->dead_tuples->num_tuples)
 	{
 		BlockNumber tblk;
 		Buffer		buf;
@@ -1543,7 +1714,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 
 		vacuum_delay_point();
 
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples->itemptrs[tupindex]);
 		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
 								 vac_strategy);
 		if (!ConditionalLockBufferForCleanup(buf))
@@ -1591,6 +1762,7 @@ static int
 lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer)
 {
+	LVDeadTuples	*dead_tuples = vacrelstats->dead_tuples;
 	Page		page = BufferGetPage(buffer);
 	OffsetNumber unused[MaxOffsetNumber];
 	int			uncnt = 0;
@@ -1601,16 +1773,16 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 
 	START_CRIT_SECTION();
 
-	for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
+	for (; tupindex < dead_tuples->num_tuples; tupindex++)
 	{
 		BlockNumber tblk;
 		OffsetNumber toff;
 		ItemId		itemid;
 
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+		tblk = ItemPointerGetBlockNumber(&dead_tuples->itemptrs[tupindex]);
 		if (tblk != blkno)
 			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
+		toff = ItemPointerGetOffsetNumber(&dead_tuples->itemptrs[tupindex]);
 		itemid = PageGetItemId(page, toff);
 		ItemIdSetUnused(itemid);
 		unused[uncnt++] = toff;
@@ -1731,19 +1903,284 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup)
 	return false;
 }
 
+/*
+ * Vacuum or cleanup indexes with parallel workers. This function must be used
+ * by the parallel vacuum leader process.
+ */
+static void
+lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
+										int nindexes, IndexBulkDeleteResult **stats,
+										LVParallelState *lps, bool for_cleanup)
+{
+	StringInfoData buf;
+
+	Assert(!IsParallelWorker());
+	Assert(ParallelVacuumIsActive(lps));
+	Assert(nindexes > 0);
+
+	/* Set shared information to tell parallel workers */
+	lps->lvshared->for_cleanup = for_cleanup;
+	if (!for_cleanup)
+	{
+		/*
+		 * We can only provide an approximate value of num_heap_tuples in
+		 * vacuum cases.
+		 */
+		lps->lvshared->reltuples = vacrelstats->old_live_tuples;
+		lps->lvshared->estimated_count = true;
+	}
+	else
+	{
+		/*
+		 * Now we can provide a better estimate of total number of surviving
+		 * tuples (we assume indexes are more interested in that than in the
+		 * number of nominally live tuples).
+		 */
+		lps->lvshared->reltuples = vacrelstats->new_rel_tuples;
+		lps->lvshared->estimated_count =
+			(vacrelstats->tupcount_pages < vacrelstats->rel_pages);
+
+	}
+
+	LaunchParallelWorkers(lps->pcxt);
+
+	/* Create the log message to report */
+	initStringInfo(&buf);
+	if (lps->pcxt->nworkers_launched == 0)
+	{
+		/*
+		 * If no workers launched, the leader process vacuums all indexes alone.
+		 * Since there is hope that we can launch parallel workers in the next
+		 * index vacuuming time we don't end parallel mode yet.
+		 */
+		if (for_cleanup)
+		{
+			if (lps->nworkers_requested > 0)
+				appendStringInfo(&buf,
+								 gettext_noop("could not launch parallel vacuum worker for index cleanup (planned: %d, requested: %d)"),
+								 lps->pcxt->nworkers, lps->nworkers_requested);
+			else
+				appendStringInfo(&buf,
+								 gettext_noop("could not launch parallel vacuum worker for index cleanup (planned: %d)"),
+								 lps->pcxt->nworkers);
+		}
+		else
+		{
+			if (lps->nworkers_requested > 0)
+				appendStringInfo(&buf,
+								 gettext_noop("could not launch parallel vacuum worker for index vacuuming (planned: %d, requested: %d)"),
+								 lps->pcxt->nworkers, lps->nworkers_requested);
+			else
+				appendStringInfo(&buf,
+								 gettext_noop("could not launch parallel vacuum worker for index vacuuming (planned: %d)"),
+								 lps->pcxt->nworkers);
+		}
+	}
+	else
+	{
+		if (for_cleanup)
+		{
+			if (lps->nworkers_requested > 0)
+				appendStringInfo(&buf,
+								 ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d, requested %d)",
+										  "launched %d parallel vacuum workers for index cleanup (planned: %d, requsted %d)",
+										  lps->pcxt->nworkers_launched),
+								 lps->pcxt->nworkers_launched,
+								 lps->pcxt->nworkers,
+								 lps->nworkers_requested);
+			else
+				appendStringInfo(&buf,
+								 ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+										  "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+										  lps->pcxt->nworkers_launched),
+								 lps->pcxt->nworkers_launched,
+								 lps->pcxt->nworkers);
+		}
+		else
+		{
+			if (lps->nworkers_requested > 0)
+				appendStringInfo(&buf,
+								 ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d, requested %d)",
+										  "launched %d parallel vacuum workers for index vacuuming (planned: %d, requested %d)",
+										  lps->pcxt->nworkers_launched),
+								 lps->pcxt->nworkers_launched,
+								 lps->pcxt->nworkers,
+								 lps->nworkers_requested);
+			else
+				appendStringInfo(&buf,
+								 ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
+										  "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
+										  lps->pcxt->nworkers_launched),
+								 lps->pcxt->nworkers_launched,
+								 lps->pcxt->nworkers);
+		}
+	}
+
+	ereport(elevel, (errmsg("%s", buf.data)));
+
+	/*
+	 * Do index vacuuming or index cleanup with parallel workers or by
+	 * the leader process alone in case where no workers could launch.
+	 */
+	do_parallel_vacuum_or_cleanup_indexes(Irel, nindexes, stats,
+										  lps->lvshared,
+										  vacrelstats->dead_tuples);
+
+	/* Wait for all vacuum workers to finish */
+	WaitForParallelWorkersToFinish(lps->pcxt);
+
+	/*
+	 * If we are doing index cleanup, we don't need to reinitialize the
+	 * parallel context as no more index vacuuming and index cleanup will
+	 * be performed after that.
+	 */
+	if (!for_cleanup)
+	{
+		/* Reset the processing count */
+		pg_atomic_write_u32(&(lps->lvshared->nprocessed), 0);
+
+		/*
+		 * Reinitialize the parallel context to relaunch parallel workers
+		 * for the next execution.
+		 */
+		ReinitializeParallelDSM(lps->pcxt);
+	}
+}
+
+/*
+ * Parallel Index vacuuming and index cleanup routine used by both the leader
+ * process and worker processes. Unlike single process vacuum, we don't update
+ * index statistics after cleanup index since it is not allowed during
+ * parallel mode, instead copy index bulk-deletion results from the local
+ * memory to the DSM segment and update them at the end of parallel lazy
+ * vacuum.
+ */
+static void
+do_parallel_vacuum_or_cleanup_indexes(Relation *Irel, int nindexes,
+									  IndexBulkDeleteResult **stats,
+									  LVShared *lvshared,
+									  LVDeadTuples *dead_tuples)
+{
+	/* Loop until all indexes are vacuumed */
+	for (;;)
+	{
+		int idx;
+
+		/* Get an index number to process */
+		idx = pg_atomic_fetch_add_u32(&(lvshared->nprocessed), 1);
+
+		/* Done for all indexes? */
+		if (idx >= nindexes)
+			break;
+
+		/*
+		 * Update the pointer to the corresponding bulk-deletion result
+		 * if someone has already updated it.
+		 */
+		if (lvshared->indstats[idx].updated &&
+			stats[idx] == NULL)
+			stats[idx] = &(lvshared->indstats[idx].stats);
+
+		/* Do vacuum or cleanup one index */
+		if (!lvshared->for_cleanup)
+			lazy_vacuum_index(Irel[idx], &stats[idx], dead_tuples,
+							  lvshared->reltuples);
+		else
+			lazy_cleanup_index(Irel[idx], &stats[idx], lvshared->reltuples,
+							   lvshared->estimated_count);
+
+		/*
+		 * Copy the index bulk-deletion result returned from ambulkdelete and
+		 * amvacuumcleanup to the DSM segment if it's the first time to get it
+		 * from them, because they allocate it locally and it's possible that an
+		 * index will be vacuumed by the different vacuum process at the next
+		 * time. The copying the result normally happens only after the first
+		 * time of index vacuuming. From the second time, we pass the result on
+		 * the DSM segment so that they then update it directly.
+		 *
+		 * Since all vacuum workers write the bulk-deletion result at different
+		 * slots we can write them without locking.
+		 */
+		if (!lvshared->indstats[idx].updated &&
+			stats[idx] != NULL)
+		{
+			memcpy(&(lvshared->indstats[idx].stats),
+				   stats[idx], sizeof(IndexBulkDeleteResult));
+			lvshared->indstats[idx].updated = true;
+
+			/*
+			 * no longer need the locally allocated result and now stats[idx]
+			 * points to the DSM segment.
+			 */
+			pfree(stats[idx]);
+			stats[idx] = &(lvshared->indstats[idx].stats);
+		}
+	}
+}
+
+/*
+ * Vacuum or cleanup indexes. If we're ready for parallel lazy vacuum it's
+ * performed with parallel workers, otherwise do vacuum or cleanup indexes.
+ * In parallel vacuum case, this function must be used by the parallel
+ * vacuum leader process. for_cleanup is true if the caller requests index
+ * cleanup, otherwise index vacuuming.
+ */
+static void
+lazy_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
+							   int nindexes, IndexBulkDeleteResult **stats,
+							   LVParallelState *lps, bool for_cleanup)
+{
+	int		idx;
+
+	Assert(!IsParallelWorker());
+
+	/* no job if the table has no index */
+	if (nindexes <= 0)
+		return;
+
+	if (ParallelVacuumIsActive(lps))
+	{
+		/* Do parallel index vacuuming or index cleanup */
+		lazy_parallel_vacuum_or_cleanup_indexes(vacrelstats, Irel,
+												nindexes, stats,
+												lps, for_cleanup);
+		return;
+	}
+
+	/* We are in single process vacuum, do index vacuuming or index cleanup */
+	for (idx = 0; idx < nindexes; idx++)
+	{
+		if (!for_cleanup)
+			lazy_vacuum_index(Irel[idx], &stats[idx], vacrelstats->dead_tuples,
+							  vacrelstats->old_live_tuples);
+		else
+		{
+			/* Cleanup one index and update index statistics */
+			lazy_cleanup_index(Irel[idx], &stats[idx], vacrelstats->new_rel_tuples,
+							   vacrelstats->tupcount_pages < vacrelstats->rel_pages);
+
+			lazy_update_index_statistics(Irel[idx], stats[idx]);
+
+			if (stats[idx])
+				pfree(stats[idx]);
+		}
+	}
+}
 
 /*
  *	lazy_vacuum_index() -- vacuum one index relation.
  *
  *		Delete all the index entries pointing to tuples listed in
  *		vacrelstats->dead_tuples, and update running statistics.
+ *		reltuples is the number of heap tuples to be passed to the
+ *		bulk delete callback.
  */
 static void
-lazy_vacuum_index(Relation indrel,
-				  IndexBulkDeleteResult **stats,
-				  LVRelStats *vacrelstats)
+lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats,
+				  LVDeadTuples *dead_tuples, double reltuples)
 {
 	IndexVacuumInfo ivinfo;
+	char		*msgfmt;
 	PGRUsage	ru0;
 
 	pg_rusage_init(&ru0);
@@ -1753,30 +2190,38 @@ lazy_vacuum_index(Relation indrel,
 	ivinfo.report_progress = false;
 	ivinfo.estimated_count = true;
 	ivinfo.message_level = elevel;
-	/* We can only provide an approximate value of num_heap_tuples here */
-	ivinfo.num_heap_tuples = vacrelstats->old_live_tuples;
+	ivinfo.num_heap_tuples = reltuples;
 	ivinfo.strategy = vac_strategy;
 
 	/* Do bulk deletion */
 	*stats = index_bulk_delete(&ivinfo, *stats,
-							   lazy_tid_reaped, (void *) vacrelstats);
+							   lazy_tid_reaped, (void *) dead_tuples);
+
+	if (IsParallelWorker())
+		msgfmt = gettext_noop("scanned index \"%s\" to remove %d row versions by parallel vacuum worker");
+	else
+		msgfmt = gettext_noop("scanned index \"%s\" to remove %d row versions");
 
 	ereport(elevel,
-			(errmsg("scanned index \"%s\" to remove %d row versions",
+			(errmsg(msgfmt,
 					RelationGetRelationName(indrel),
-					vacrelstats->num_dead_tuples),
+					dead_tuples->num_tuples),
 			 errdetail_internal("%s", pg_rusage_show(&ru0))));
 }
 
 /*
  *	lazy_cleanup_index() -- do post-vacuum cleanup for one index relation.
+ *
+ *		reltuples is the number of heap tuples and estimated_count is true
+ *		if the reltuples is an estimated value.
  */
 static void
 lazy_cleanup_index(Relation indrel,
-				   IndexBulkDeleteResult *stats,
-				   LVRelStats *vacrelstats)
+				   IndexBulkDeleteResult **stats,
+				   double reltuples, bool estimated_count)
 {
 	IndexVacuumInfo ivinfo;
+	char		*msgfmt;
 	PGRUsage	ru0;
 
 	pg_rusage_init(&ru0);
@@ -1784,49 +2229,55 @@ lazy_cleanup_index(Relation indrel,
 	ivinfo.index = indrel;
 	ivinfo.analyze_only = false;
 	ivinfo.report_progress = false;
-	ivinfo.estimated_count = (vacrelstats->tupcount_pages < vacrelstats->rel_pages);
+	ivinfo.estimated_count = estimated_count;
 	ivinfo.message_level = elevel;
 
-	/*
-	 * Now we can provide a better estimate of total number of surviving
-	 * tuples (we assume indexes are more interested in that than in the
-	 * number of nominally live tuples).
-	 */
-	ivinfo.num_heap_tuples = vacrelstats->new_rel_tuples;
+	ivinfo.num_heap_tuples = reltuples;
 	ivinfo.strategy = vac_strategy;
 
-	stats = index_vacuum_cleanup(&ivinfo, stats);
+	*stats = index_vacuum_cleanup(&ivinfo, *stats);
 
-	if (!stats)
+	if (!(*stats))
 		return;
 
-	/*
-	 * Now update statistics in pg_class, but only if the index says the count
-	 * is accurate.
-	 */
-	if (!stats->estimated_count)
-		vac_update_relstats(indrel,
-							stats->num_pages,
-							stats->num_index_tuples,
-							0,
-							false,
-							InvalidTransactionId,
-							InvalidMultiXactId,
-							false);
+	if (IsParallelWorker())
+		msgfmt = gettext_noop("index \"%s\" now contains %.0f row versions in %u pages, reported by parallel vacuum worker");
+	else
+		msgfmt = gettext_noop("index \"%s\" now contains %.0f row versions in %u pages");
 
 	ereport(elevel,
-			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
+			(errmsg(msgfmt,
 					RelationGetRelationName(indrel),
-					stats->num_index_tuples,
-					stats->num_pages),
+					(*stats)->num_index_tuples,
+					(*stats)->num_pages),
 			 errdetail("%.0f index row versions were removed.\n"
 					   "%u index pages have been deleted, %u are currently reusable.\n"
 					   "%s.",
-					   stats->tuples_removed,
-					   stats->pages_deleted, stats->pages_free,
+					   (*stats)->tuples_removed,
+					   (*stats)->pages_deleted, (*stats)->pages_free,
 					   pg_rusage_show(&ru0))));
+}
+
+/*
+ * Update index statistics in pg_class if the statistics is accurate.
+ */
+static void
+lazy_update_index_statistics(Relation indrel, IndexBulkDeleteResult *stats)
+{
+	Assert(!IsInParallelMode());
+
+	if (!stats || stats->estimated_count)
+		return;
 
-	pfree(stats);
+	/* Update index statistics */
+	vac_update_relstats(indrel,
+						stats->num_pages,
+						stats->num_index_tuples,
+						0,
+						false,
+						InvalidTransactionId,
+						InvalidMultiXactId,
+						false);
 }
 
 /*
@@ -2134,19 +2585,17 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 }
 
 /*
- * lazy_space_alloc - space allocation decisions for lazy vacuum
- *
- * See the comments at the head of this file for rationale.
+ * Return the maximum number of dead tuples we can record.
  */
-static void
-lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+static long
+compute_max_dead_tuples(BlockNumber relblocks, bool useindex)
 {
 	long		maxtuples;
 	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
 	autovacuum_work_mem != -1 ?
 	autovacuum_work_mem : maintenance_work_mem;
 
-	if (vacrelstats->useindex)
+	if (useindex)
 	{
 		maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
 		maxtuples = Min(maxtuples, INT_MAX);
@@ -2160,34 +2609,49 @@ lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
 		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
 	}
 	else
-	{
 		maxtuples = MaxHeapTuplesPerPage;
-	}
 
-	vacrelstats->num_dead_tuples = 0;
-	vacrelstats->max_dead_tuples = (int) maxtuples;
-	vacrelstats->dead_tuples = (ItemPointer)
-		palloc(maxtuples * sizeof(ItemPointerData));
+	return maxtuples;
+}
+
+/*
+ * lazy_space_alloc - space allocation decisions for lazy vacuum
+ *
+ * See the comments at the head of this file for rationale.
+ */
+static void
+lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+{
+	LVDeadTuples	*dead_tuples = NULL;
+	long		maxtuples;
+
+	maxtuples = compute_max_dead_tuples(relblocks, vacrelstats->useindex);
+
+	dead_tuples = (LVDeadTuples *)
+		palloc(SizeOfLVDeadTuples + maxtuples * sizeof(ItemPointerData));
+	dead_tuples->num_tuples = 0;
+	dead_tuples->max_tuples = (int) maxtuples;
+
+	vacrelstats->dead_tuples = dead_tuples;
 }
 
 /*
  * lazy_record_dead_tuple - remember one deletable tuple
  */
 static void
-lazy_record_dead_tuple(LVRelStats *vacrelstats,
-					   ItemPointer itemptr)
+lazy_record_dead_tuple(LVDeadTuples *dead_tuples, ItemPointer itemptr)
 {
 	/*
 	 * The array shouldn't overflow under normal behavior, but perhaps it
 	 * could if we are given a really small maintenance_work_mem. In that
 	 * case, just forget the last few tuples (we'll get 'em next time).
 	 */
-	if (vacrelstats->num_dead_tuples < vacrelstats->max_dead_tuples)
+	if (dead_tuples->num_tuples < dead_tuples->max_tuples)
 	{
-		vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
-		vacrelstats->num_dead_tuples++;
+		dead_tuples->itemptrs[dead_tuples->num_tuples] = *itemptr;
+		dead_tuples->num_tuples++;
 		pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
-									 vacrelstats->num_dead_tuples);
+									 dead_tuples->num_tuples);
 	}
 }
 
@@ -2201,12 +2665,12 @@ lazy_record_dead_tuple(LVRelStats *vacrelstats,
 static bool
 lazy_tid_reaped(ItemPointer itemptr, void *state)
 {
-	LVRelStats *vacrelstats = (LVRelStats *) state;
+	LVDeadTuples	*dead_tuples = (LVDeadTuples *) state;
 	ItemPointer res;
 
 	res = (ItemPointer) bsearch((void *) itemptr,
-								(void *) vacrelstats->dead_tuples,
-								vacrelstats->num_dead_tuples,
+								(void *) dead_tuples->itemptrs,
+								dead_tuples->num_tuples,
 								sizeof(ItemPointerData),
 								vac_cmp_itemptr);
 
@@ -2354,3 +2818,229 @@ heap_page_is_all_visible(Relation rel, Buffer buf,
 
 	return all_visible;
 }
+
+/*
+ * Compute the number of parallel worker process to request. Both index
+ * vacuuming and index cleanup can be executed together with parallel workers
+ * if the table has more than one index. The relation sizes of table and
+ * indexes don't affect to the parallel degree for now. nrequested is the
+ * number of parallel workers that user requested and nindexes is the number
+ * of indexes that the table has.
+ */
+static int
+compute_parallel_workers(Relation onerel, int nrequested, int nindexes)
+{
+	int parallel_workers = 0;
+
+	Assert(nrequested >= 0);
+
+	if (nindexes <= 1)
+		return 0;
+
+	if (nrequested > 0)
+	{
+		/* At least one index is taken by the leader process */
+		parallel_workers = Min(nrequested, nindexes - 1);
+	}
+	else
+	{
+		/*
+		 * The parallel degree is not requested. Compute it based on the
+		 * number of indexes.
+		 */
+		parallel_workers = nindexes - 1;
+	}
+
+	/* cap by max_parallel_maintenace_workers */
+	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+
+	return parallel_workers;
+}
+
+/*
+ * Enter parallel mode, allocate and initialize the DSM segment.
+ */
+static LVParallelState *
+begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid, BlockNumber nblocks,
+					  int nindexes, int nrequested)
+{
+	LVParallelState *lps = (LVParallelState *) palloc(sizeof(LVParallelState));
+	LVShared	*shared;
+	ParallelContext *pcxt;
+	LVDeadTuples	*tidmap;
+	long	maxtuples;
+	char	*sharedquery;
+	Size	est_shared;
+	Size	est_deadtuples;
+	int		querylen;
+	int		keys = 0;
+
+	Assert(nrequested > 0);
+	Assert(nindexes > 0);
+
+	EnterParallelMode();
+	pcxt = CreateParallelContext("postgres", "heap_parallel_vacuum_main",
+								 nrequested);
+	lps->pcxt = pcxt;
+	Assert(pcxt->nworkers > 0);
+
+	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
+	est_shared = MAXALIGN(add_size(SizeOfLVShared,
+								   mul_size(sizeof(LVSharedIndStats), nindexes)));
+	shm_toc_estimate_chunk(&pcxt->estimator, est_shared);
+	keys++;
+
+	/* Estimate size for dead tuples -- PARALLEL_VACUUM_KEY_DEAD_TUPLES */
+	maxtuples = compute_max_dead_tuples(nblocks, true);
+	est_deadtuples = MAXALIGN(add_size(sizeof(LVDeadTuples),
+									   mul_size(sizeof(ItemPointerData), maxtuples)));
+	shm_toc_estimate_chunk(&pcxt->estimator, est_deadtuples);
+	keys++;
+
+	shm_toc_estimate_keys(&pcxt->estimator, keys);
+
+	/* Finally, estimate VACUUM_KEY_QUERY_TEXT space */
+	querylen = strlen(debug_query_string);
+	shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	InitializeParallelDSM(pcxt);
+
+	/* prepare shared information */
+	shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared);
+	shared->relid = relid;
+	shared->elevel = elevel;
+	pg_atomic_init_u32(&(shared->nprocessed), 0);
+	MemSet(shared->indstats, 0, sizeof(LVSharedIndStats) * nindexes);
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
+	lps->lvshared = shared;
+
+	/* prepare the dead tuple space */
+	tidmap = (LVDeadTuples *) shm_toc_allocate(pcxt->toc, est_deadtuples);
+	tidmap->max_tuples = maxtuples;
+	tidmap->num_tuples = 0;
+	MemSet(tidmap->itemptrs, 0, sizeof(ItemPointerData) * maxtuples);
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, tidmap);
+	vacrelstats->dead_tuples = tidmap;
+
+	/* Store query string for workers */
+	sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+	memcpy(sharedquery, debug_query_string, querylen + 1);
+	sharedquery[querylen] = '\0';
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
+
+	lps->nworkers_requested = 0;
+
+	return lps;
+}
+
+/*
+ * Shutdown workers, destroy the parallel context, and end parallel mode.
+ * Update index statistics after exited from parallel mode.
+ */
+static void
+end_parallel_vacuum(LVParallelState *lps, Relation *Irel, int nindexes)
+{
+	IndexBulkDeleteResult *copied_stats = NULL;
+	int *updated_idx = palloc(sizeof(int) * nindexes);
+	int nupdated = 0;
+	int i;
+
+	Assert(!IsParallelWorker());
+	Assert(Irel != NULL && nindexes > 0);
+
+	/*
+	 * All writes are not allowed during parallel mode and it might not be
+	 * safe to exit from parallel mode while keeping the parallel context.
+	 * So we copy the updated index statistics to a temporary space and
+	 * update them after exited from parallel mode.
+	 */
+	for (i = 0; i < nindexes; i++)
+	{
+		if (lps->lvshared->indstats[i].updated)
+			updated_idx[nupdated++] = i;
+	}
+
+	copied_stats = palloc(sizeof(IndexBulkDeleteResult) * nupdated);
+
+	for (i = 0; i < nupdated; i++)
+		memcpy(&(copied_stats[i]),
+			   &(lps->lvshared->indstats[updated_idx[i]].stats),
+			   sizeof(IndexBulkDeleteResult));
+
+	/* Shutdown worker processes and destroy the parallel context */
+	WaitForParallelWorkersToFinish(lps->pcxt);
+	DestroyParallelContext(lps->pcxt);
+	ExitParallelMode();
+
+	/* Update index statistics */
+	for (i = 0; i < nupdated; i++)
+		lazy_update_index_statistics(Irel[updated_idx[i]],
+									 &(copied_stats[i]));
+
+	pfree(copied_stats);
+	pfree(updated_idx);
+}
+
+/*
+ * Perform work within a launched parallel process.
+ *
+ * Since parallel vacuum workers work only within index vacuuming and index
+ * cleanup, no need to report the progress information.
+ */
+void
+heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
+{
+	Relation	onerel;
+	Relation	*indrels;
+	LVShared	*lvshared;
+	LVDeadTuples	*dead_tuples;
+	int			nindexes;
+	char		*sharedquery;
+	IndexBulkDeleteResult **stats;
+
+	lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED,
+										   false);
+	elevel = lvshared->elevel;
+
+	ereport(DEBUG1,
+			(errmsg("starting parallel lazy vacuum worker for %s",
+					lvshared->for_cleanup ? "cleanup" : "vacuuming")));
+
+	/* Set debug_query_string for individual workers */
+	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
+	debug_query_string = sharedquery;
+	pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+	/* Open table */
+	onerel = heap_open(lvshared->relid, ShareUpdateExclusiveLock);
+
+	/*
+	 * Open all indexes. indrels are sorted in order by OID, which should
+	 * be matched to the leader's one.
+	 */
+	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &indrels);
+	Assert(nindexes > 0);
+
+	/* Set dead tuple space */
+	dead_tuples = (LVDeadTuples *) shm_toc_lookup(toc,
+												  PARALLEL_VACUUM_KEY_DEAD_TUPLES,
+												  false);
+
+	/* Set cost-based vacuum delay */
+	VacuumCostActive = (VacuumCostDelay > 0);
+	VacuumCostBalance = 0;
+	VacuumPageHit = 0;
+	VacuumPageMiss = 0;
+	VacuumPageDirty = 0;
+
+	stats = (IndexBulkDeleteResult **)
+		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
+
+	/* Do either vacuuming indexes or cleaning indexes */
+	do_parallel_vacuum_or_cleanup_indexes(indrels, nindexes, stats,
+										  lvshared, dead_tuples);
+
+	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
+	heap_close(onerel, ShareUpdateExclusiveLock);
+}
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 55d129a..86511b2 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,6 +14,7 @@
 
 #include "postgres.h"
 
+#include "access/heapam.h"
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
@@ -140,6 +141,9 @@ static const struct
 	},
 	{
 		"_bt_parallel_build_main", _bt_parallel_build_main
+	},
+	{
+		"heap_parallel_vacuum_main", heap_parallel_vacuum_main
 	}
 };
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index e7b379d..89dfc3b 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -99,6 +99,7 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 	/* Set default value */
 	params.index_cleanup = VACOPT_TERNARY_DEFAULT;
 	params.truncate = VACOPT_TERNARY_DEFAULT;
+	params.nworkers = -1;
 
 	/* Parse options list */
 	foreach(lc, vacstmt->options)
@@ -129,6 +130,27 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 			params.index_cleanup = get_vacopt_ternary_value(opt);
 		else if (strcmp(opt->defname, "truncate") == 0)
 			params.truncate = get_vacopt_ternary_value(opt);
+		else if (strcmp(opt->defname, "parallel") == 0)
+		{
+			if (opt->arg == NULL)
+			{
+				/*
+				 * Parallel lazy vacuum is requested but user didn't specify
+				 * the parallel degree. The parallel degree will be determined
+				 * at the start of lazy vacuum.
+				 */
+				params.nworkers = 0;
+			}
+			else
+			{
+				params.nworkers = defGetInt32(opt);
+				if (params.nworkers < 1)
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("parallel vacuum degree must be at least 1"),
+							 parser_errposition(pstate, opt->location)));
+			}
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -170,6 +192,11 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 		}
 	}
 
+	if ((params.options & VACOPT_FULL) && params.nworkers >= 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot specify FULL option with PARALLEL option")));
+
 	/*
 	 * All freeze ages are zero if the FREEZE option is given; otherwise pass
 	 * them as -1 which means to use the default values.
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index fd85b9c..88e2fb0 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -2888,6 +2888,8 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
 			(!wraparound ? VACOPT_SKIP_LOCKED : 0);
 		tab->at_params.index_cleanup = VACOPT_TERNARY_DEFAULT;
 		tab->at_params.truncate = VACOPT_TERNARY_DEFAULT;
+		/* parallel lazy vacuum is not supported for autovacuum */
+		tab->at_params.nworkers = -1;
 		tab->at_params.freeze_min_age = freeze_min_age;
 		tab->at_params.freeze_table_age = freeze_table_age;
 		tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index b88bd8a..464d34d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -24,6 +24,8 @@
 #include "nodes/primnodes.h"
 #include "storage/bufpage.h"
 #include "storage/lockdefs.h"
+#include "storage/shm_toc.h"
+#include "storage/dsm.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
 
@@ -195,6 +197,7 @@ extern Size SyncScanShmemSize(void);
 struct VacuumParams;
 extern void heap_vacuum_rel(Relation onerel,
 							struct VacuumParams *params, BufferAccessStrategy bstrategy);
+extern void heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in heap/heapam_visibility.c */
 extern bool HeapTupleSatisfiesVisibility(HeapTuple stup, Snapshot snapshot,
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 128f7ae..43702f2 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -184,6 +184,11 @@ typedef struct VacuumParams
 										 * default value depends on reloptions */
 	VacOptTernaryValue truncate;	/* Truncate empty pages at the end,
 									 * default value depends on reloptions */
+	/*
+	 * The number of parallel vacuum workers. -1 by default for no workers
+	 * and 0 for choosing based on the number of indexes.
+	 */
+	int			nworkers;
 } VacuumParams;
 
 /* GUC parameters */
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index 21a167f..cbb0bbb 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -80,6 +80,16 @@ CONTEXT:  SQL function "do_analyze" statement 1
 SQL function "wrap_do_analyze" statement 1
 VACUUM FULL vactst;
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+-- PARALLEL option
+VACUUM (PARALLEL) vaccluster;
+VACUUM (PARALLEL 2) vaccluster;
+VACUUM (PARALLEL 0) vaccluster; -- error
+ERROR:  parallel vacuum degree must be at least 1
+LINE 1: VACUUM (PARALLEL 0) vaccluster;
+                ^
+VACUUM (PARALLEL 2, INDEX_CLEANUP FALSE) vaccluster;
+VACUUM (PARALLEL 2, FULL TRUE) vaccluster; -- error, cannot use both PARALLEL and FULL
+ERROR:  cannot specify FULL option with PARALLEL option
 -- INDEX_CLEANUP option
 CREATE TABLE no_index_cleanup (i INT PRIMARY KEY) WITH (vacuum_index_cleanup = false);
 VACUUM (INDEX_CLEANUP FALSE) vaccluster;
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index a558580..97a3140 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -62,6 +62,13 @@ VACUUM FULL vactst;
 
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
 
+-- PARALLEL option
+VACUUM (PARALLEL) vaccluster;
+VACUUM (PARALLEL 2) vaccluster;
+VACUUM (PARALLEL 0) vaccluster; -- error
+VACUUM (PARALLEL 2, INDEX_CLEANUP FALSE) vaccluster;
+VACUUM (PARALLEL 2, FULL TRUE) vaccluster; -- error, cannot use both PARALLEL and FULL
+
 -- INDEX_CLEANUP option
 CREATE TABLE no_index_cleanup (i INT PRIMARY KEY) WITH (vacuum_index_cleanup = false);
 VACUUM (INDEX_CLEANUP FALSE) vaccluster;
-- 
2.10.5

