diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index bfb1ea0d25..16deb328bb 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -153,6 +153,7 @@
 #define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
 #define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
 #define PARALLEL_VACUUM_KEY_WAL_USAGE		5
+#define PARALLEL_VACUUM_KEY_INDEX_STATS		6
 
 /*
  * Macro to check if we are in a parallel vacuum.  If true, we are in the
@@ -206,14 +207,6 @@ typedef struct LVShared
 	Oid			relid;
 	int			elevel;
 
-	/*
-	 * An indication for vacuum workers to perform either index vacuum or
-	 * index cleanup.  first_time is true only if for_cleanup is true and
-	 * bulk-deletion is not performed yet.
-	 */
-	bool		for_cleanup;
-	bool		first_time;
-
 	/*
 	 * Fields for both index vacuum and cleanup.
 	 *
@@ -251,23 +244,18 @@ typedef struct LVShared
 	 */
 	pg_atomic_uint32 active_nworkers;
 
-	/*
-	 * Variables to control parallel vacuum.  We have a bitmap to indicate
-	 * which index has stats in shared memory.  The set bit in the map
-	 * indicates that the particular index supports a parallel vacuum.
-	 */
-	pg_atomic_uint32 idx;		/* counter for vacuuming and clean up */
-	uint32		offset;			/* sizeof header incl. bitmap */
-	bits8		bitmap[FLEXIBLE_ARRAY_MEMBER];	/* bit map of NULLs */
-
-	/* Shared index statistics data follows at end of struct */
+	/* Counter for vacuuming and cleanup */
+	pg_atomic_uint32 idx;
 } LVShared;
 
-#define SizeOfLVShared (offsetof(LVShared, bitmap) + sizeof(bits8))
-#define GetSharedIndStats(s) \
-	((LVSharedIndStats *)((char *)(s) + ((LVShared *)(s))->offset))
-#define IndStatsIsNull(s, i) \
-	(!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07))))
+/* Status used during parallel index vacuum or cleanup */
+typedef enum LVIndVacStatus
+{
+	INDVAC_STATUS_INITIAL = 0,
+	INDVAC_STATUS_NEED_BULKDELETE,
+	INDVAC_STATUS_NEED_CLEANUP,
+	INDVAC_STATUS_COMPLETED,
+} LVIndVacStatus;
 
 /*
  * Struct for an index bulk-deletion statistic used for parallel vacuum.  This
@@ -275,7 +263,15 @@ typedef struct LVShared
  */
 typedef struct LVSharedIndStats
 {
-	bool		updated;		/* are the stats updated? */
+	LVIndVacStatus status;
+
+	/*
+	 * True if both leader and worker can process the index, otherwise only
+	 * leader can process it.
+	 */
+	bool	parallel_safe;
+
+	bool	istat_updated;		/* are the stats updated? */
 	IndexBulkDeleteResult istat;
 } LVSharedIndStats;
 
@@ -287,6 +283,9 @@ typedef struct LVParallelState
 	/* Shared information among parallel vacuum workers */
 	LVShared   *lvshared;
 
+	/* Shared index statistics among parallel vacuum workers */
+	LVSharedIndStats *lvsharedindstats;
+
 	/* Points to buffer usage area in DSM */
 	BufferUsage *buffer_usage;
 
@@ -416,18 +415,14 @@ static int	lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup,
 									LVRelState *vacrel);
 static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
-static void do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel);
-static void do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel);
-static void do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers);
-static void do_parallel_processing(LVRelState *vacrel,
-								   LVShared *lvshared);
-static void do_serial_processing_for_unsafe_indexes(LVRelState *vacrel,
-													LVShared *lvshared);
-static IndexBulkDeleteResult *parallel_process_one_index(Relation indrel,
-														 IndexBulkDeleteResult *istat,
-														 LVShared *lvshared,
-														 LVSharedIndStats *shared_indstats,
-														 LVRelState *vacrel);
+static void parallel_lazy_vacuum_or_cleanup_all_indexes(LVRelState *vacrel, bool vacuum);
+static void prepare_parallel_index_processing(LVRelState *vacrel, bool vacuum);
+static void lazy_serial_process_indexes(LVRelState *vacrel);
+static void lazy_parallel_process_indexes(LVRelState *vacrel, LVShared *lvshared,
+										  LVSharedIndStats *indstats);
+static void lazy_parallel_process_one_index(LVRelState *vacrel, Relation indrel,
+											LVShared *lvshared,
+											LVSharedIndStats *stats);
 static void lazy_cleanup_all_indexes(LVRelState *vacrel);
 static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
 													IndexBulkDeleteResult *istat,
@@ -450,16 +445,14 @@ static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
 static int	vac_cmp_itemptr(const void *left, const void *right);
 static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 									 TransactionId *visibility_cutoff_xid, bool *all_frozen);
-static int	compute_parallel_vacuum_workers(LVRelState *vacrel,
-											int nrequested,
-											bool *will_parallel_vacuum);
+static int	compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested);
 static void update_index_statistics(LVRelState *vacrel);
 static LVParallelState *begin_parallel_vacuum(LVRelState *vacrel,
 											  BlockNumber nblocks,
 											  int nrequested);
 static void end_parallel_vacuum(LVRelState *vacrel);
-static LVSharedIndStats *parallel_stats_for_idx(LVShared *lvshared, int getidx);
-static bool parallel_processing_is_safe(Relation indrel, LVShared *lvshared);
+static bool parallel_processing_is_safe(LVRelState *vacrel, Relation indrel,
+										bool vacuum);
 static void vacuum_error_callback(void *arg);
 static void update_vacuum_error_info(LVRelState *vacrel,
 									 LVSavedErrInfo *saved_vacrel,
@@ -2251,7 +2244,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	else
 	{
 		/* Outsource everything to parallel variant */
-		do_parallel_lazy_vacuum_all_indexes(vacrel);
+		parallel_lazy_vacuum_or_cleanup_all_indexes(vacrel, true);
 
 		/*
 		 * Do a postcheck to consider applying wraparound failsafe now.  Note
@@ -2625,78 +2618,32 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 	return false;
 }
 
-/*
- * Perform lazy_vacuum_all_indexes() steps in parallel
- */
-static void
-do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel)
-{
-	/* Tell parallel workers to do index vacuuming */
-	vacrel->lps->lvshared->for_cleanup = false;
-	vacrel->lps->lvshared->first_time = false;
-
-	/*
-	 * We can only provide an approximate value of num_heap_tuples, at least
-	 * for now.  Matches serial VACUUM case.
-	 */
-	vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
-	vacrel->lps->lvshared->estimated_count = true;
-
-	do_parallel_vacuum_or_cleanup(vacrel,
-								  vacrel->lps->nindexes_parallel_bulkdel);
-}
-
-/*
- * Perform lazy_cleanup_all_indexes() steps in parallel
- */
-static void
-do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel)
-{
-	int			nworkers;
-
-	/*
-	 * If parallel vacuum is active we perform index cleanup with parallel
-	 * workers.
-	 *
-	 * Tell parallel workers to do index cleanup.
-	 */
-	vacrel->lps->lvshared->for_cleanup = true;
-	vacrel->lps->lvshared->first_time = (vacrel->num_index_scans == 0);
-
-	/*
-	 * Now we can provide a better estimate of total number of surviving
-	 * tuples (we assume indexes are more interested in that than in the
-	 * number of nominally live tuples).
-	 */
-	vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
-	vacrel->lps->lvshared->estimated_count =
-		(vacrel->tupcount_pages < vacrel->rel_pages);
-
-	/* Determine the number of parallel workers to launch */
-	if (vacrel->lps->lvshared->first_time)
-		nworkers = vacrel->lps->nindexes_parallel_cleanup +
-			vacrel->lps->nindexes_parallel_condcleanup;
-	else
-		nworkers = vacrel->lps->nindexes_parallel_cleanup;
-
-	do_parallel_vacuum_or_cleanup(vacrel, nworkers);
-}
-
 /*
  * Perform index vacuum or index cleanup with parallel workers.  This function
- * must be used by the parallel vacuum leader process.  The caller must set
- * lps->lvshared->for_cleanup to indicate whether to perform vacuum or
- * cleanup.
+ * must be used by the parallel vacuum leader process.
  */
 static void
-do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
+parallel_lazy_vacuum_or_cleanup_all_indexes(LVRelState *vacrel, bool vacuum)
 {
 	LVParallelState *lps = vacrel->lps;
+	int	nworkers;
 
 	Assert(!IsParallelWorker());
 	Assert(ParallelVacuumIsActive(vacrel));
 	Assert(vacrel->nindexes > 0);
 
+	/* Determine the number of parallel workers to launch */
+	if (vacuum)
+		nworkers = vacrel->lps->nindexes_parallel_bulkdel;
+	else
+	{
+		nworkers = vacrel->lps->nindexes_parallel_cleanup;
+
+		/* Add conditionally parallel-aware indexes if in the first time call */
+		if (vacrel->num_index_scans == 0)
+			nworkers += vacrel->lps->nindexes_parallel_condcleanup;
+	}
+
 	/* The leader process will participate */
 	nworkers--;
 
@@ -2707,17 +2654,18 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
 	 */
 	nworkers = Min(nworkers, lps->pcxt->nworkers);
 
+	/* Set data required for parallel index vacuum or cleanup */
+	prepare_parallel_index_processing(vacrel, vacuum);
+
+	/* Reset the parallel index processing counter */
+	pg_atomic_write_u32(&(lps->lvshared->idx), 0);
+
 	/* Setup the shared cost-based vacuum delay and launch workers */
 	if (nworkers > 0)
 	{
+		/* Reinitialize the parallel context to relaunch parallel workers */
 		if (vacrel->num_index_scans > 0)
-		{
-			/* Reset the parallel index processing counter */
-			pg_atomic_write_u32(&(lps->lvshared->idx), 0);
-
-			/* Reinitialize the parallel context to relaunch parallel workers */
 			ReinitializeParallelDSM(lps->pcxt);
-		}
 
 		/*
 		 * Set up shared cost balance and the number of active workers for
@@ -2750,28 +2698,28 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
 			VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
 		}
 
-		if (lps->lvshared->for_cleanup)
+		if (vacuum)
 			ereport(elevel,
-					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
-									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
+									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
 									 lps->pcxt->nworkers_launched),
 							lps->pcxt->nworkers_launched, nworkers)));
 		else
 			ereport(elevel,
-					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
-									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
+					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
 									 lps->pcxt->nworkers_launched),
 							lps->pcxt->nworkers_launched, nworkers)));
 	}
 
 	/* Process the indexes that can be processed by only leader process */
-	do_serial_processing_for_unsafe_indexes(vacrel, lps->lvshared);
+	lazy_serial_process_indexes(vacrel);
 
 	/*
-	 * Join as a parallel worker.  The leader process alone processes all the
-	 * indexes in the case where no workers are launched.
+	 * Join as a parallel worker.  The leader process alone processes all
+	 * parallel-safe indexes in the case where no workers are launched.
 	 */
-	do_parallel_processing(vacrel, lps->lvshared);
+	lazy_parallel_process_indexes(vacrel, lps->lvshared, vacrel->lps->lvsharedindstats);
 
 	/*
 	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
@@ -2786,6 +2734,18 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
 			InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
 	}
 
+	/*
+	 * Reset all index status back to invalid (while checking that we have
+	 * processed all indexes).
+	 */
+	for (int i = 0; i < vacrel->nindexes; i++)
+	{
+		LVSharedIndStats *stats = &(lps->lvsharedindstats[i]);
+
+		Assert(stats->status == INDVAC_STATUS_COMPLETED);
+		stats->status = INDVAC_STATUS_INITIAL;
+	}
+
 	/*
 	 * Carry the shared balance value to heap scan and disable shared costing
 	 */
@@ -2797,12 +2757,62 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
 	}
 }
 
+
+/*
+ * This function prepares the shared data for parallel index vacuum or cleanup,
+ * and set index vacuum status accordingly.
+ */
+static void
+prepare_parallel_index_processing(LVRelState *vacrel, bool vacuum)
+{
+	LVIndVacStatus next_status;
+
+	if (vacuum)
+	{
+		/*
+		 * We can only provide an approximate value of num_heap_tuples, at least
+		 * for now.  Matches serial VACUUM case.
+		 */
+		vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
+		vacrel->lps->lvshared->estimated_count = true;
+
+		next_status = INDVAC_STATUS_NEED_BULKDELETE;
+	}
+	else
+	{
+		/*
+		 * We can provide a better estimate of total number of surviving
+		 * tuples (we assume indexes are more interested in that than in the
+		 * number of nominally live tuples).
+		 */
+		vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
+		vacrel->lps->lvshared->estimated_count =
+			(vacrel->tupcount_pages < vacrel->rel_pages);
+
+		next_status = INDVAC_STATUS_NEED_CLEANUP;
+	}
+
+	/* Set index vacuum status and mark as parallel safe or not */
+	for (int i = 0; i < vacrel->nindexes; i++)
+	{
+		LVSharedIndStats *stats = &(vacrel->lps->lvsharedindstats[i]);
+
+		Assert(stats->status == INDVAC_STATUS_INITIAL);
+
+		stats->status = next_status;
+		stats->parallel_safe = parallel_processing_is_safe(vacrel,
+														   vacrel->indrels[i],
+														   vacuum);
+	}
+}
+
 /*
  * Index vacuum/cleanup routine used by the leader process and parallel
  * vacuum worker processes to process the indexes in parallel.
  */
 static void
-do_parallel_processing(LVRelState *vacrel, LVShared *lvshared)
+lazy_parallel_process_indexes(LVRelState *vacrel, LVShared *lvshared,
+							  LVSharedIndStats *indstats)
 {
 	/*
 	 * Increment the active worker count if we are able to launch any worker.
@@ -2810,13 +2820,10 @@ do_parallel_processing(LVRelState *vacrel, LVShared *lvshared)
 	if (VacuumActiveNWorkers)
 		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
 
-	/* Loop until all indexes are vacuumed */
 	for (;;)
 	{
-		int			idx;
-		LVSharedIndStats *shared_istat;
-		Relation	indrel;
-		IndexBulkDeleteResult *istat;
+		int idx;
+		LVSharedIndStats *stats;
 
 		/* Get an index number to process */
 		idx = pg_atomic_fetch_add_u32(&(lvshared->idx), 1);
@@ -2825,28 +2832,17 @@ do_parallel_processing(LVRelState *vacrel, LVShared *lvshared)
 		if (idx >= vacrel->nindexes)
 			break;
 
-		/* Get the index statistics space from DSM, if any */
-		shared_istat = parallel_stats_for_idx(lvshared, idx);
-
-		/* Skip indexes not participating in parallelism */
-		if (shared_istat == NULL)
-			continue;
-
-		indrel = vacrel->indrels[idx];
+		stats = &(indstats[idx]);
 
 		/*
-		 * Skip processing indexes that are unsafe for workers (these are
-		 * processed in do_serial_processing_for_unsafe_indexes() by leader)
+		 * Parallel unsafe indexes can be processed only by leader (these are
+		 * processed in lazy_serial_process_indexes() by leader.
 		 */
-		if (!parallel_processing_is_safe(indrel, lvshared))
+		if (!stats->parallel_safe)
 			continue;
 
-		/* Do vacuum or cleanup of the index */
-		istat = vacrel->indstats[idx];
-		vacrel->indstats[idx] = parallel_process_one_index(indrel, istat,
-														   lvshared,
-														   shared_istat,
-														   vacrel);
+		lazy_parallel_process_one_index(vacrel, vacrel->indrels[idx],
+										lvshared, stats);
 	}
 
 	/*
@@ -2861,16 +2857,16 @@ do_parallel_processing(LVRelState *vacrel, LVShared *lvshared)
  * Perform parallel processing of indexes in leader process.
  *
  * Handles index vacuuming (or index cleanup) for indexes that are not
- * parallel safe.  It's possible that this will vary for a given index, based
- * on details like whether we're performing for_cleanup processing right now.
+ * parallel safe.
  *
  * Also performs processing of smaller indexes that fell under the size cutoff
- * enforced by compute_parallel_vacuum_workers().  These indexes never get a
- * slot for statistics in DSM.
+ * enforced by compute_parallel_vacuum_workers().
  */
 static void
-do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared)
+lazy_serial_process_indexes(LVRelState *vacrel)
 {
+	LVParallelState *lps = vacrel->lps;
+
 	Assert(!IsParallelWorker());
 
 	/*
@@ -2879,30 +2875,16 @@ do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared)
 	if (VacuumActiveNWorkers)
 		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
 
-	for (int idx = 0; idx < vacrel->nindexes; idx++)
+	for (int i = 0; i < vacrel->nindexes; i++)
 	{
-		LVSharedIndStats *shared_istat;
-		Relation	indrel;
-		IndexBulkDeleteResult *istat;
-
-		shared_istat = parallel_stats_for_idx(lvshared, idx);
-		indrel = vacrel->indrels[idx];
+		LVSharedIndStats *stats = &(lps->lvsharedindstats[i]);
 
-		/*
-		 * We're only here for the indexes that parallel workers won't
-		 * process.  Note that the shared_istat test ensures that we process
-		 * indexes that fell under initial size cutoff.
-		 */
-		if (shared_istat != NULL &&
-			parallel_processing_is_safe(indrel, lvshared))
+		/* Skip, safe indexes as they are processed by workers */
+		if (stats->parallel_safe)
 			continue;
 
-		/* Do vacuum or cleanup of the index */
-		istat = vacrel->indstats[idx];
-		vacrel->indstats[idx] = parallel_process_one_index(indrel, istat,
-														   lvshared,
-														   shared_istat,
-														   vacrel);
+		lazy_parallel_process_one_index(vacrel, vacrel->indrels[i],
+										lps->lvshared, stats);
 	}
 
 	/*
@@ -2919,29 +2901,33 @@ do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared)
  * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
  * segment.
  */
-static IndexBulkDeleteResult *
-parallel_process_one_index(Relation indrel,
-						   IndexBulkDeleteResult *istat,
-						   LVShared *lvshared,
-						   LVSharedIndStats *shared_istat,
-						   LVRelState *vacrel)
+static void
+lazy_parallel_process_one_index(LVRelState *vacrel, Relation indrel, LVShared *lvshared,
+								LVSharedIndStats *stats)
 {
+	IndexBulkDeleteResult *istat = NULL;
 	IndexBulkDeleteResult *istat_res;
 
-	/*
-	 * Update the pointer to the corresponding bulk-deletion result if someone
-	 * has already updated it
-	 */
-	if (shared_istat && shared_istat->updated && istat == NULL)
-		istat = &shared_istat->istat;
+	/* Get the index statistics space, if already updated */
+	if (stats->istat_updated)
+		istat = &(stats->istat);
 
-	/* Do vacuum or cleanup of the index */
-	if (lvshared->for_cleanup)
-		istat_res = lazy_cleanup_one_index(indrel, istat, lvshared->reltuples,
-										   lvshared->estimated_count, vacrel);
-	else
-		istat_res = lazy_vacuum_one_index(indrel, istat, lvshared->reltuples,
-										  vacrel);
+	switch (stats->status)
+	{
+		case INDVAC_STATUS_NEED_BULKDELETE:
+			istat_res = lazy_vacuum_one_index(indrel, istat,
+											  lvshared->reltuples, vacrel);
+			break;
+		case INDVAC_STATUS_NEED_CLEANUP:
+			istat_res = lazy_cleanup_one_index(indrel, istat,
+											   lvshared->reltuples,
+											   lvshared->estimated_count,
+											   vacrel);
+				break;
+		default:
+			elog(ERROR, "unexpected parallel vacuum index status %d",
+				 stats->status);
+	}
 
 	/*
 	 * Copy the index bulk-deletion result returned from ambulkdelete and
@@ -2955,19 +2941,18 @@ parallel_process_one_index(Relation indrel,
 	 * Since all vacuum workers write the bulk-deletion result at different
 	 * slots we can write them without locking.
 	 */
-	if (shared_istat && !shared_istat->updated && istat_res != NULL)
+	if (!stats->istat_updated && istat_res != NULL)
 	{
-		memcpy(&shared_istat->istat, istat_res, sizeof(IndexBulkDeleteResult));
-		shared_istat->updated = true;
-
-		/* Free the locally-allocated bulk-deletion result */
+		memcpy(&(stats->istat), istat_res, sizeof(IndexBulkDeleteResult));
+		stats->istat_updated = true;
 		pfree(istat_res);
-
-		/* return the pointer to the result from shared memory */
-		return &shared_istat->istat;
 	}
 
-	return istat_res;
+	/*
+	 * Update the status to completed. No need to lock here since each
+	 * worker touches different indexes.
+	 */
+	stats->status = INDVAC_STATUS_COMPLETED;
 }
 
 /*
@@ -3002,7 +2987,7 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 	else
 	{
 		/* Outsource everything to parallel variant */
-		do_parallel_lazy_cleanup_all_indexes(vacrel);
+		parallel_lazy_vacuum_or_cleanup_all_indexes(vacrel, false);
 	}
 }
 
@@ -3734,13 +3719,10 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
  *
  * nrequested is the number of parallel workers that user requested.  If
  * nrequested is 0, we compute the parallel degree based on nindexes, that is
- * the number of indexes that support parallel vacuum.  This function also
- * sets will_parallel_vacuum to remember indexes that participate in parallel
- * vacuum.
+ * the number of indexes that support parallel vacuum.
  */
 static int
-compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested,
-								bool *will_parallel_vacuum)
+compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested)
 {
 	int			nindexes_parallel = 0;
 	int			nindexes_parallel_bulkdel = 0;
@@ -3766,8 +3748,6 @@ compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested,
 			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
 			continue;
 
-		will_parallel_vacuum[idx] = true;
-
 		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
 			nindexes_parallel_bulkdel++;
 		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
@@ -3840,14 +3820,15 @@ begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks,
 	Relation   *indrels = vacrel->indrels;
 	int			nindexes = vacrel->nindexes;
 	ParallelContext *pcxt;
+	LVSharedIndStats	*indstats;
 	LVShared   *shared;
 	LVDeadTuples *dead_tuples;
 	BufferUsage *buffer_usage;
 	WalUsage   *wal_usage;
-	bool	   *will_parallel_vacuum;
 	long		maxtuples;
-	Size		est_shared;
-	Size		est_deadtuples;
+	Size		est_indstats = 0;
+	Size		est_shared = 0;
+	Size		est_deadtuples = 0;
 	int			nindexes_mwm = 0;
 	int			parallel_workers = 0;
 	int			querylen;
@@ -3862,17 +3843,11 @@ begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks,
 	/*
 	 * Compute the number of parallel vacuum workers to launch
 	 */
-	will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
-	parallel_workers = compute_parallel_vacuum_workers(vacrel,
-													   nrequested,
-													   will_parallel_vacuum);
+	parallel_workers = compute_parallel_vacuum_workers(vacrel, nrequested);
 
 	/* Can't perform vacuum in parallel */
 	if (parallel_workers <= 0)
-	{
-		pfree(will_parallel_vacuum);
 		return lps;
-	}
 
 	lps = (LVParallelState *) palloc0(sizeof(LVParallelState));
 
@@ -3882,41 +3857,13 @@ begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks,
 	Assert(pcxt->nworkers > 0);
 	lps->pcxt = pcxt;
 
-	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
-	est_shared = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
-	for (int idx = 0; idx < nindexes; idx++)
-	{
-		Relation	indrel = indrels[idx];
-		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
-		/*
-		 * Cleanup option should be either disabled, always performing in
-		 * parallel or conditionally performing in parallel.
-		 */
-		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
-			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
-		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
-
-		/* Skip indexes that don't participate in parallel vacuum */
-		if (!will_parallel_vacuum[idx])
-			continue;
-
-		if (indrel->rd_indam->amusemaintenanceworkmem)
-			nindexes_mwm++;
-
-		est_shared = add_size(est_shared, sizeof(LVSharedIndStats));
+	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_STATS */
+	est_indstats = mul_size(sizeof(LVSharedIndStats), nindexes);
+	shm_toc_estimate_chunk(&pcxt->estimator, est_indstats);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
-		/*
-		 * Remember the number of indexes that support parallel operation for
-		 * each phase.
-		 */
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
-			lps->nindexes_parallel_bulkdel++;
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
-			lps->nindexes_parallel_cleanup++;
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
-			lps->nindexes_parallel_condcleanup++;
-	}
+	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
+	est_shared = MAXALIGN(sizeof(LVShared));
 	shm_toc_estimate_chunk(&pcxt->estimator, est_shared);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
@@ -3953,6 +3900,45 @@ begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks,
 
 	InitializeParallelDSM(pcxt);
 
+	/* Prepare index vacuum stats */
+	indstats = (LVSharedIndStats *) shm_toc_allocate(pcxt->toc, est_indstats);
+	MemSet(indstats, 0, est_indstats);
+	for (int idx = 0; idx < nindexes; idx++)
+	{
+		Relation	indrel = indrels[idx];
+		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+		/*
+		 * Cleanup option should be either disabled, always performing in
+		 * parallel or conditionally performing in parallel.
+		 */
+		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
+			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
+		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
+
+		/* Skip indexes that don't participate in parallel vacuum */
+		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
+			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
+			continue;
+
+		if (indrel->rd_indam->amusemaintenanceworkmem)
+			nindexes_mwm++;
+
+		/*
+		 * Remember the number of indexes that support parallel operation for
+		 * each phase.
+		 */
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+			lps->nindexes_parallel_bulkdel++;
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
+			lps->nindexes_parallel_cleanup++;
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
+			lps->nindexes_parallel_condcleanup++;
+	}
+
+	shm_toc_insert(pcxt->toc,PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
+	lps->lvsharedindstats = indstats;
+
 	/* Prepare shared information */
 	shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared);
 	MemSet(shared, 0, est_shared);
@@ -3966,21 +3952,6 @@ begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks,
 	pg_atomic_init_u32(&(shared->cost_balance), 0);
 	pg_atomic_init_u32(&(shared->active_nworkers), 0);
 	pg_atomic_init_u32(&(shared->idx), 0);
-	shared->offset = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
-
-	/*
-	 * Initialize variables for shared index statistics, set NULL bitmap and
-	 * the size of stats for each index.
-	 */
-	memset(shared->bitmap, 0x00, BITMAPLEN(nindexes));
-	for (int idx = 0; idx < nindexes; idx++)
-	{
-		if (!will_parallel_vacuum[idx])
-			continue;
-
-		/* Set NOT NULL as this index does support parallelism */
-		shared->bitmap[idx >> 3] |= 1 << (idx & 0x07);
-	}
 
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
 	lps->lvshared = shared;
@@ -4018,7 +3989,6 @@ begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks,
 					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
 	}
 
-	pfree(will_parallel_vacuum);
 	return lps;
 }
 
@@ -4043,21 +4013,12 @@ end_parallel_vacuum(LVRelState *vacrel)
 	/* Copy the updated statistics */
 	for (int idx = 0; idx < nindexes; idx++)
 	{
-		LVSharedIndStats *shared_istat;
-
-		shared_istat = parallel_stats_for_idx(lps->lvshared, idx);
+		LVSharedIndStats *stats = &(lps->lvsharedindstats[idx]);
 
-		/*
-		 * Skip index -- it must have been processed by the leader, from
-		 * inside do_serial_processing_for_unsafe_indexes()
-		 */
-		if (shared_istat == NULL)
-			continue;
-
-		if (shared_istat->updated)
+		if (stats->istat_updated)
 		{
 			indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
-			memcpy(indstats[idx], &shared_istat->istat, sizeof(IndexBulkDeleteResult));
+			memcpy(indstats[idx], &stats->istat, sizeof(IndexBulkDeleteResult));
 		}
 		else
 			indstats[idx] = NULL;
@@ -4071,68 +4032,42 @@ end_parallel_vacuum(LVRelState *vacrel)
 	vacrel->lps = NULL;
 }
 
-/*
- * Return shared memory statistics for index at offset 'getidx', if any
- *
- * Returning NULL indicates that compute_parallel_vacuum_workers() determined
- * that the index is a totally unsuitable target for all parallel processing
- * up front.  For example, the index could be < min_parallel_index_scan_size
- * cutoff.
- */
-static LVSharedIndStats *
-parallel_stats_for_idx(LVShared *lvshared, int getidx)
-{
-	char	   *p;
-
-	if (IndStatsIsNull(lvshared, getidx))
-		return NULL;
-
-	p = (char *) GetSharedIndStats(lvshared);
-	for (int idx = 0; idx < getidx; idx++)
-	{
-		if (IndStatsIsNull(lvshared, idx))
-			continue;
-
-		p += sizeof(LVSharedIndStats);
-	}
-
-	return (LVSharedIndStats *) p;
-}
-
 /*
  * Returns false, if the given index can't participate in parallel index
- * vacuum or parallel index cleanup
+ * vacuum or parallel index cleanup.
  */
 static bool
-parallel_processing_is_safe(Relation indrel, LVShared *lvshared)
+parallel_processing_is_safe(LVRelState *vacrel, Relation indrel, bool vacuum)
 {
-	uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+	uint8	vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+	/*
+	 * Check if the index is a totally unsuitable target for all parallel
+	 * processing up front. For example, the index could be
+	 * < min_parallel_index_scan_size cufoff.
+	 */
+	if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
+		RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
+		return false;
 
-	/* first_time must be true only if for_cleanup is true */
-	Assert(lvshared->for_cleanup || !lvshared->first_time);
+	/* In parallel vacuum case, check if it supports parallel bulk-deletion */
+	if (vacuum)
+		return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
 
-	if (lvshared->for_cleanup)
-	{
-		/* Skip, if the index does not support parallel cleanup */
-		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
-			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
-			return true;
+	/* Not safe, if the index does not support parallel cleanup */
+	if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
+		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
+		return false;
 
-		/*
-		 * Skip, if the index supports parallel cleanup conditionally, but we
-		 * have already processed the index (for bulkdelete).  See the
-		 * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know
-		 * when indexes support parallel cleanup conditionally.
-		 */
-		if (!lvshared->first_time &&
-			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
-			return false;
-	}
-	else if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) == 0)
-	{
-		/* Skip if the index does not support parallel bulk deletion */
+	/*
+	 * Not safe, if the index supports parallel cleanup conditionally,
+	 * but we have already processed the index (for bulkdelete).  See the
+	 * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know
+	 * when indexes support parallel cleanup conditionally.
+	 */
+	if (vacrel->num_index_scans > 0 &&
+		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
 		return false;
-	}
 
 	return true;
 }
@@ -4148,6 +4083,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 {
 	Relation	rel;
 	Relation   *indrels;
+	LVSharedIndStats *lvindstats;
 	LVShared   *lvshared;
 	LVDeadTuples *dead_tuples;
 	BufferUsage *buffer_usage;
@@ -4161,10 +4097,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 										   false);
 	elevel = lvshared->elevel;
 
-	if (lvshared->for_cleanup)
-		elog(DEBUG1, "starting parallel vacuum worker for cleanup");
-	else
-		elog(DEBUG1, "starting parallel vacuum worker for bulk delete");
+	elog(DEBUG1, "starting parallel vacuum worker");
 
 	/* Set debug_query_string for individual workers */
 	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
@@ -4185,6 +4118,11 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
 	Assert(nindexes > 0);
 
+	/* Set index statistics */
+	lvindstats = (LVSharedIndStats *) shm_toc_lookup(toc,
+													 PARALLEL_VACUUM_KEY_INDEX_STATS,
+													 false);
+
 	/* Set dead tuple space */
 	dead_tuples = (LVDeadTuples *) shm_toc_lookup(toc,
 												  PARALLEL_VACUUM_KEY_DEAD_TUPLES,
@@ -4230,7 +4168,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	InstrStartParallelQuery();
 
 	/* Process indexes to perform vacuum/cleanup */
-	do_parallel_processing(&vacrel, lvshared);
+	lazy_parallel_process_indexes(&vacrel, lvshared, lvindstats);
 
 	/* Report buffer/WAL usage during parallel execution */
 	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
diff --git a/src/test/regress/expected/vacuum_parallel.out b/src/test/regress/expected/vacuum_parallel.out
index ddf0ee544b..a07f5b2b73 100644
--- a/src/test/regress/expected/vacuum_parallel.out
+++ b/src/test/regress/expected/vacuum_parallel.out
@@ -45,5 +45,25 @@ VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table;
 INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 10000) i;
 RESET max_parallel_maintenance_workers;
 RESET min_parallel_index_scan_size;
+CREATE TABLE parallel_vacuum_table2 (a int, b int4[]) WITH (autovacuum_enabled = off);
+INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 10000) g;
+CREATE INDEX pv_bt_index ON parallel_vacuum_table2 USING btree (a);
+CREATE INDEX pv_hash_index ON parallel_vacuum_table2 USING hash (a);
+CREATE INDEX pv_gin_index ON parallel_vacuum_table2 USING gin (b);
+CREATE INDEX pv_brin_index ON parallel_vacuum_table2 USING brin (a);
+CREATE INDEX pv_small_index ON parallel_vacuum_table2 USING btree ((1));
+DELETE FROM parallel_vacuum_table2;
+-- Parallel index vacuum for various types of indexes.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+-- Parallel index cleanup.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 200000) g;
+DELETE FROM parallel_vacuum_table2;
+SET maintenance_work_mem TO 1024;
+-- Parallel index vacuum for various types of indexes.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+-- Parallel index cleanup.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+RESET maintenance_work_mem;
 -- Deliberately don't drop table, to get further coverage from tools like
 -- pg_amcheck in some testing scenarios
diff --git a/src/test/regress/sql/vacuum_parallel.sql b/src/test/regress/sql/vacuum_parallel.sql
index 1d23f33e39..49f4f4ce6d 100644
--- a/src/test/regress/sql/vacuum_parallel.sql
+++ b/src/test/regress/sql/vacuum_parallel.sql
@@ -42,5 +42,40 @@ INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 10000) i;
 RESET max_parallel_maintenance_workers;
 RESET min_parallel_index_scan_size;
 
+CREATE TABLE parallel_vacuum_table2 (a int, b int4[]) WITH (autovacuum_enabled = off);
+INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 10000) g;
+
+-- Create different types of indexes, i.g. having different parallelvacuumoptions.
+-- Also create a small index same as above.
+CREATE INDEX pv_bt_index ON parallel_vacuum_table2 USING btree (a);
+CREATE INDEX pv_hash_index ON parallel_vacuum_table2 USING hash (a);
+CREATE INDEX pv_gin_index ON parallel_vacuum_table2 USING gin (b);
+CREATE INDEX pv_brin_index ON parallel_vacuum_table2 USING brin (a);
+CREATE INDEX pv_small_index ON parallel_vacuum_table2 USING btree ((1));
+
+
+-- Parallel index vacuum for various types of indexes.
+DELETE FROM parallel_vacuum_table2;
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+
+-- Parallel index cleanup.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+
+-- XXX: in order to execute index scan twice, we need about 200,000 garbage tuples
+-- with the minimum maintenance_work_mem.  However, it takes a long time to load.
+INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 200000) g;
+
+DELETE FROM parallel_vacuum_table2;
+
+SET maintenance_work_mem TO 1024;
+
+-- Parallel index vacuum for various types of indexes.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+
+-- Parallel index cleanup.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+
+RESET maintenance_work_mem;
+
 -- Deliberately don't drop table, to get further coverage from tools like
 -- pg_amcheck in some testing scenarios
