From c865e25a6367eda8165db9028786b6ea36e855ff Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Mon, 11 Mar 2019 15:34:01 +0200
Subject: [PATCH 4/6] Move the page deletion logic to separate function v22

If a VACUUM does multiple index passes, I think we only want to do the
empty page deletion after the final pass. That saves effort, since we
only need to scan the internal pages once. But even if we wanted to do
it on every pass, I think having a separate function makes it more
readable.
---
 src/backend/access/gist/gistvacuum.c | 304 ++++++++++++++-------------
 1 file changed, 157 insertions(+), 147 deletions(-)

diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index 99dbf1cbb7..9439dfb49c 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -23,25 +23,34 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 
-/* Working state needed by gistbulkdelete */
 typedef struct
 {
+	IndexBulkDeleteResult stats;
+
 	IndexVacuumInfo *info;
-	IndexBulkDeleteResult *stats;
+	BlockNumber numEmptyPages;
+	BlockSet	internalPagesMap;
+	BlockSet	emptyLeafPagesMap;
+} GistBulkDeleteResult;
+
+/* Working state needed by gistbulkdelete */
+typedef struct
+{
+	GistBulkDeleteResult *stats;
 	IndexBulkDeleteCallback callback;
 	void	   *callback_state;
 	GistNSN		startNSN;
 	BlockNumber totFreePages;	/* true total # of free pages */
-	BlockNumber emptyPages;
-
-	BlockSet	internalPagesMap;
-	BlockSet	emptyLeafPagesMap;
 } GistVacState;
 
-static void gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
+static bool gistdeletepage(GistBulkDeleteResult *stats,
+			   Buffer buffer, Page page, OffsetNumber downlink,
+			   Buffer leafBuffer, Page leafPage, TransactionId txid);
+static void gistvacuumscan(IndexVacuumInfo *info, GistBulkDeleteResult *stats,
 			   IndexBulkDeleteCallback callback, void *callback_state);
 static void gistvacuumpage(GistVacState *vstate, BlockNumber blkno,
 			   BlockNumber orig_blkno);
+static void gistvacuum_recycle_pages(GistBulkDeleteResult *stats);
 
 /*
  * VACUUM bulkdelete stage: remove index entries.
@@ -50,13 +59,15 @@ IndexBulkDeleteResult *
 gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 			   IndexBulkDeleteCallback callback, void *callback_state)
 {
+	GistBulkDeleteResult *gist_stats = (GistBulkDeleteResult *) stats;
+
 	/* allocate stats if first time through, else re-use existing struct */
-	if (stats == NULL)
-		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+	if (gist_stats == NULL)
+		gist_stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
 
-	gistvacuumscan(info, stats, callback, callback_state);
+	gistvacuumscan(info, gist_stats, callback, callback_state);
 
-	return stats;
+	return (IndexBulkDeleteResult *) gist_stats;
 }
 
 /*
@@ -65,6 +76,8 @@ gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 IndexBulkDeleteResult *
 gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 {
+	GistBulkDeleteResult *gist_stats = (GistBulkDeleteResult *) stats;
+
 	/* No-op in ANALYZE ONLY mode */
 	if (info->analyze_only)
 		return stats;
@@ -74,10 +87,10 @@ gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	 * stats from the latest gistbulkdelete call.  If it wasn't called, we
 	 * still need to do a pass over the index, to obtain index statistics.
 	 */
-	if (stats == NULL)
+	if (gist_stats == NULL)
 	{
-		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
-		gistvacuumscan(info, stats, NULL, NULL);
+		gist_stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
+		gistvacuumscan(info, gist_stats, NULL, NULL);
 	}
 
 	/*
@@ -88,11 +101,11 @@ gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	 */
 	if (!info->estimated_count)
 	{
-		if (stats->num_index_tuples > info->num_heap_tuples)
-			stats->num_index_tuples = info->num_heap_tuples;
+		if (gist_stats->stats.num_index_tuples > info->num_heap_tuples)
+			gist_stats->stats.num_index_tuples = info->num_heap_tuples;
 	}
 
-	return stats;
+	return (IndexBulkDeleteResult *) gist_stats;
 }
 
 /*
@@ -101,7 +114,7 @@ gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
  * Does not remove last downlink.
  */
 static bool
-gistdeletepage(GistVacState *vstate,
+gistdeletepage(GistBulkDeleteResult *stats,
 			   Buffer buffer, Page page, OffsetNumber downlink,
 			   Buffer leafBuffer, Page leafPage, TransactionId txid)
 {
@@ -127,18 +140,17 @@ gistdeletepage(GistVacState *vstate,
 	GistPageSetDeleteXid(leafPage,txid);
 	GistPageSetDeleted(leafPage);
 	MarkBufferDirty(leafBuffer);
-	vstate->stats->pages_deleted++;
-	vstate->emptyPages--;
+	stats->stats.pages_deleted++;
+	stats->numEmptyPages--;
 
 	MarkBufferDirty(buffer);
 	/* Offsets are changed as long as we delete tuples from internal page */
 	PageIndexTupleDelete(page, downlink);
 
-	if (RelationNeedsWAL(vstate->info->index))
-		recptr 	= gistXLogSetDeleted(vstate->info->index->rd_node, leafBuffer,
-										txid, buffer, downlink);
+	if (RelationNeedsWAL(stats->info->index))
+		recptr 	= gistXLogPageDelete(leafBuffer, txid, buffer, downlink);
 	else
-		recptr = gistGetFakeLSN(vstate->info->index);
+		recptr = gistGetFakeLSN(stats->info->index);
 	PageSetLSN(page, recptr);
 	PageSetLSN(leafPage, recptr);
 
@@ -167,7 +179,7 @@ gistdeletepage(GistVacState *vstate,
  * whether any given heap tuple (identified by ItemPointer) is being deleted.
  */
 static void
-gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
+gistvacuumscan(IndexVacuumInfo *info, GistBulkDeleteResult *stats,
 			   IndexBulkDeleteCallback callback, void *callback_state)
 {
 	Relation	rel = info->index;
@@ -180,12 +192,12 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	 * Reset counts that will be incremented during the scan; needed in case
 	 * of multiple scans during a single VACUUM command.
 	 */
-	stats->estimated_count = false;
-	stats->num_index_tuples = 0;
-	stats->pages_deleted = 0;
+	stats->stats.estimated_count = false;
+	stats->stats.num_index_tuples = 0;
+	stats->stats.pages_deleted = 0;
 
 	/* Set up info to pass down to gistvacuumpage */
-	vstate.info = info;
+	stats->info = info;
 	vstate.stats = stats;
 	vstate.callback = callback;
 	vstate.callback_state = callback_state;
@@ -194,9 +206,6 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	else
 		vstate.startNSN = gistGetFakeLSN(rel);
 	vstate.totFreePages = 0;
-	vstate.emptyPages = 0;
-	vstate.internalPagesMap = NULL;
-	vstate.emptyLeafPagesMap = NULL;
 
 	/*
 	 * The outer loop iterates over all index pages, in physical order (we
@@ -257,114 +266,15 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	if (vstate.totFreePages > 0)
 		IndexFreeSpaceMapVacuum(rel);
 
-	/* update statistics */
-	stats->num_pages = num_pages;
-	stats->pages_free = vstate.totFreePages;
-
-	/* rescan all inner pages to find those that have empty child pages */
-	if (vstate.emptyPages > 0)
-	{
-		BlockNumber			x;
-
-		x = InvalidBlockNumber;
-		while (vstate.emptyPages > 0 &&
-			   (x = blockset_next(vstate.internalPagesMap, x)) != InvalidBlockNumber)
-		{
-			Buffer		buffer;
-			Page		page;
-			OffsetNumber off,
-				maxoff;
-			IndexTuple  idxtuple;
-			ItemId	    iid;
-			OffsetNumber todelete[MaxOffsetNumber];
-			Buffer		buftodelete[MaxOffsetNumber];
-			int			ntodelete = 0;
-
-			blkno = (BlockNumber) x;
-
-			buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-										info->strategy);
-
-			LockBuffer(buffer, GIST_EXCLUSIVE);
-			page = (Page) BufferGetPage(buffer);
-			if (PageIsNew(page) || GistPageIsDeleted(page) || GistPageIsLeaf(page))
-			{
-				UnlockReleaseBuffer(buffer);
-				continue;
-			}
-
-			maxoff = PageGetMaxOffsetNumber(page);
-			/* Check that leafs are still empty and decide what to delete */
-			for (off = FirstOffsetNumber; off <= maxoff && ntodelete < maxoff-1; off = OffsetNumberNext(off))
-			{
-				Buffer		leafBuffer;
-				BlockNumber leafBlockNo;
-
-				/* We must keep at least one leaf page per each */
-				if (ntodelete >= maxoff-1)
-					continue;
-
-				iid = PageGetItemId(page, off);
-				idxtuple = (IndexTuple) PageGetItem(page, iid);
-				/* if this page was not empty in previous scan - we do not consider it */
-				leafBlockNo = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
-				if (!blockset_get(leafBlockNo, vstate.emptyLeafPagesMap))
-					continue;
-
-				leafBuffer = ReadBufferExtended(rel, MAIN_FORKNUM, leafBlockNo,
-												RBM_NORMAL, info->strategy);
- 
-				buftodelete[ntodelete] = leafBuffer;
-				todelete[ntodelete++] = off;
-			}
-
-			/*
-			 * We will have to relock internal page in case of deletes:
-			 * we cannot lock child while holding parent lock without risk
-			 * of a deadlock
-			 */
-			LockBuffer(buffer, GIST_UNLOCK);
-
-			if (ntodelete)
-			{
-				/*
-				 * Like in _bt_unlink_halfdead_page we need an upper bound on xid
-				 * that could hold downlinks to this page. We use
-				 * ReadNewTransactionId() to instead of GetCurrentTransactionId
-				 * since we are in a VACUUM.
-				 */
-				TransactionId	txid = ReadNewTransactionId();
-
-				int deleted = 0;
-
-				for (off = 0; off < ntodelete; off++)
-				{
-					Buffer	leafBuffer = buftodelete[off];
-					Page	leafPage;
-					LockBuffer(leafBuffer, GIST_EXCLUSIVE);
-					gistcheckpage(rel, leafBuffer);
-					leafPage = (Page) BufferGetPage(leafBuffer);
-					if (GistPageIsLeaf(leafPage) /* not a leaf anymore */ 
-						&& PageGetMaxOffsetNumber(leafPage) == InvalidOffsetNumber /* Page is not empry */
-						&& !(GistFollowRight(leafPage) || GistPageGetNSN(page) > GistPageGetNSN(leafPage)) /* No follow-right */
-						)
-					{
-						LockBuffer(buffer, GIST_EXCLUSIVE);
-						page = (Page) BufferGetPage(buffer);
-						if (gistdeletepage(&vstate, buffer, page, todelete[off] - deleted, leafBuffer, leafPage, txid))
-							deleted++;
-						LockBuffer(buffer, GIST_UNLOCK);
-					}
-					UnlockReleaseBuffer(leafBuffer);
-				}
-			}
 
-			ReleaseBuffer(buffer);
-		}
-	}
+	/* Recycle empty pages */
+	gistvacuum_recycle_pages(stats);
 
-	blockset_free(vstate.emptyLeafPagesMap);
-	blockset_free(vstate.internalPagesMap);
+	blockset_free(stats->emptyLeafPagesMap);
+	blockset_free(stats->internalPagesMap);
+	/* update statistics */
+	stats->stats.num_pages = num_pages;
+	stats->stats.pages_free = vstate.totFreePages;
 }
 
 /*
@@ -381,8 +291,8 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 static void
 gistvacuumpage(GistVacState *vstate, BlockNumber blkno, BlockNumber orig_blkno)
 {
-	IndexVacuumInfo *info = vstate->info;
-	IndexBulkDeleteResult *stats = vstate->stats;
+	GistBulkDeleteResult *stats = vstate->stats;
+	IndexVacuumInfo *info = stats->info;
 	IndexBulkDeleteCallback callback = vstate->callback;
 	void	   *callback_state = vstate->callback_state;
 	Relation	rel = info->index;
@@ -411,7 +321,7 @@ restart:
 		/* Okay to recycle this page */
 		RecordFreeIndexPage(rel, blkno);
 		vstate->totFreePages++;
-		stats->pages_deleted++;
+		stats->stats.pages_deleted++;
 	}
 	else if (GistPageIsLeaf(page))
 	{
@@ -486,7 +396,7 @@ restart:
 
 			END_CRIT_SECTION();
 
-			stats->tuples_removed += ntodelete;
+			stats->stats.tuples_removed += ntodelete;
 			/* must recompute maxoff */
 			maxoff = PageGetMaxOffsetNumber(page);
 		}
@@ -494,16 +404,14 @@ restart:
 		nremain = maxoff - FirstOffsetNumber + 1;
 		if (nremain == 0)
 		{
-			vstate->emptyLeafPagesMap = blockset_set(vstate->emptyLeafPagesMap, blkno);
-			vstate->emptyPages++;
+			stats->emptyLeafPagesMap = blockset_set(stats->emptyLeafPagesMap, blkno);
+			stats->numEmptyPages++;
 		}
 		else
-			stats->num_index_tuples += nremain;
+			stats->stats.num_index_tuples += nremain;
 	}
 	else
 	{
-		vstate->internalPagesMap = blockset_set(vstate->internalPagesMap, blkno);
-
 		/*
 		 * On an internal page, check for "invalid tuples", left behind by an
 		 * incomplete page split on PostgreSQL 9.0 or below.  These are not
@@ -528,6 +436,8 @@ restart:
 						 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
 						 errhint("Please REINDEX it.")));
 		}
+
+		stats->internalPagesMap = blockset_set(stats->internalPagesMap, blkno);
 	}
 
 	UnlockReleaseBuffer(buffer);
@@ -545,3 +455,103 @@ restart:
 		goto restart;
 	}
 }
+
+static void
+gistvacuum_recycle_pages(GistBulkDeleteResult *stats)
+{
+	IndexVacuumInfo *info = stats->info;
+	Relation	rel = info->index;
+	BlockNumber blkno;
+
+	/* rescan all inner pages to find those that have empty child pages */
+	blkno = InvalidBlockNumber;
+	while (stats->numEmptyPages > 0 &&
+		   (blkno = blockset_next(stats->internalPagesMap, blkno)) != InvalidBlockNumber)
+	{
+		Buffer		buffer;
+		Page		page;
+		OffsetNumber off,
+			maxoff;
+		IndexTuple  idxtuple;
+		ItemId	    iid;
+		OffsetNumber todelete[MaxOffsetNumber];
+		Buffer		buftodelete[MaxOffsetNumber];
+		int			ntodelete = 0;
+
+		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
+									info->strategy);
+
+		LockBuffer(buffer, GIST_EXCLUSIVE);
+		page = (Page) BufferGetPage(buffer);
+		if (PageIsNew(page) || GistPageIsDeleted(page) || GistPageIsLeaf(page))
+		{
+			UnlockReleaseBuffer(buffer);
+			continue;
+		}
+
+		maxoff = PageGetMaxOffsetNumber(page);
+		/* Check that leafs are still empty and decide what to delete */
+		for (off = FirstOffsetNumber; off <= maxoff && ntodelete < maxoff-1; off = OffsetNumberNext(off))
+		{
+			Buffer		leafBuffer;
+			BlockNumber leafBlockNo;
+
+			iid = PageGetItemId(page, off);
+			idxtuple = (IndexTuple) PageGetItem(page, iid);
+			/* if this page was not empty in previous scan - we do not consider it */
+			leafBlockNo = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
+			if (!blockset_get(leafBlockNo, stats->emptyLeafPagesMap))
+				continue;
+
+			leafBuffer = ReadBufferExtended(rel, MAIN_FORKNUM, leafBlockNo,
+											RBM_NORMAL, info->strategy);
+
+			buftodelete[ntodelete] = leafBuffer;
+			todelete[ntodelete++] = off;
+		}
+
+		
+		/*
+		 * We will have to relock internal page in case of deletes:
+		 * we cannot lock child while holding parent lock without risk
+		 * of a deadlock
+		 */
+		LockBuffer(buffer, GIST_UNLOCK);
+
+		if (ntodelete)
+		{
+			/*
+			 * Like in _bt_unlink_halfdead_page we need an upper bound on xid
+			 * that could hold downlinks to this page. We use
+			 * ReadNewTransactionId() instead of GetCurrentTransactionId(),
+			 * since we are in a VACUUM.
+			 */
+			TransactionId	txid = ReadNewTransactionId();
+
+			int deleted = 0;
+
+			for (off = 0; off < ntodelete; off++)
+			{
+				Buffer	leafBuffer = buftodelete[off];
+				Page	leafPage;
+				LockBuffer(leafBuffer, GIST_EXCLUSIVE);
+				gistcheckpage(rel, leafBuffer);
+				leafPage = (Page) BufferGetPage(leafBuffer);
+				if (GistPageIsLeaf(leafPage) /* still a leaf */
+					&& PageGetMaxOffsetNumber(leafPage) == InvalidOffsetNumber /* page is still empty */
+					&& !(GistFollowRight(leafPage) || GistPageGetNSN(page) > GistPageGetNSN(leafPage)) /* no follow-right */
+					)
+				{
+					LockBuffer(buffer, GIST_EXCLUSIVE);
+					page = (Page) BufferGetPage(buffer);
+					if (gistdeletepage(stats, buffer, page, todelete[off] - deleted, leafBuffer, leafPage, txid))
+						deleted++;
+					LockBuffer(buffer, GIST_UNLOCK);
+				}
+				UnlockReleaseBuffer(leafBuffer);
+			}
+		}
+
+		ReleaseBuffer(buffer);
+	}
+}
-- 
2.20.1

