From 688498b003c8077bc516aaae06fdb8e8c89f5c53 Mon Sep 17 00:00:00 2001
From: Peter Geoghegan <pg@bowt.ie>
Date: Sat, 30 Jul 2022 10:47:59 -0700
Subject: [PATCH v4] Shrink freeze WAL records via deduplication.

Make heapam WAL records that describe freezing performed by VACUUM more
space efficient by storing each distinct "freeze plan" once, alongside
an array of associated page offset numbers (one per freeze plan).  The
freeze plans required for most heap pages tend to naturally have a great
deal of redundancy, so this technique is very effective.  It routinely
results in a WAL record that is 5x smaller than an equivalent record
generated using the previous approach.

The freeze plan concept was introduced by bugfix commit 3b97e6823b,
which fixed shortcomings in how VACUUM processed MultiXacts.  We retain
the general idea of freeze plans, but return to using space efficient
page offset number arrays.  This makes the WAL records about as space
efficient as they were before the bugfix, without losing flexibility.

In passing refactor some of the details around how recovery conflicts
are handled during REDO of freeze WAL records, as well as visibility map
WAL records.  This makes it clearer that latestRemovedXid format cutoffs
are used by both record types, just like in prune WAL records.  The name
latestRemovedXid is slightly inappropriate here because nothing is
actually removed, but that is arguably a preexisting issue.  It can be
dealt with separately by bulk renaming latestRemovedXid fields across
the entire tree.

Author: Peter Geoghegan <pg@bowt.ie>
Reviewed-By: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-By: Nathan Bossart <nathandbossart@gmail.com>
Reviewed-By: Justin Pryzby <pryzby@telsasoft.com>
Discussion: https://postgr.es/m/CAH2-Wz=XytErMnb8FAyFd+OQEbiipB0Q2FmFdXrggPL4VBnRYQ@mail.gmail.com
---
 src/include/access/heapam.h            |  25 ++
 src/include/access/heapam_xlog.h       |  42 ++-
 src/backend/access/heap/heapam.c       | 341 ++++++++++++++++++++-----
 src/backend/access/heap/vacuumlazy.c   |  38 +--
 src/backend/access/rmgrdesc/heapdesc.c |   8 +-
 5 files changed, 323 insertions(+), 131 deletions(-)

diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 5b0787574..9940663bc 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -99,6 +99,19 @@ typedef enum
 	HEAPTUPLE_DELETE_IN_PROGRESS	/* deleting xact is still in progress */
 } HTSV_Result;
 
+/* heap_prepare_freeze_tuple state describing how to freeze a tuple */
+typedef struct HeapFreezeTuple
+{
+	/* Fields describing how to process tuple */
+	uint16		t_infomask2;
+	uint16		t_infomask;
+	TransactionId xmax;
+	uint8		frzflags;
+
+	/* Page offset number for tuple */
+	OffsetNumber offset;
+} HeapFreezeTuple;
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -164,6 +177,18 @@ extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
 								 Buffer *buffer, struct TM_FailureData *tmfd);
 
 extern void heap_inplace_update(Relation relation, HeapTuple tuple);
+extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
+									  TransactionId relfrozenxid,
+									  TransactionId relminmxid,
+									  TransactionId cutoff_xid,
+									  TransactionId cutoff_multi,
+									  HeapFreezeTuple *frz,
+									  bool *totally_frozen,
+									  TransactionId *relfrozenxid_out,
+									  MultiXactId *relminmxid_out);
+extern void heap_freeze_prepared_tuples(Relation rel, Buffer buffer,
+										TransactionId FreezeLimit,
+										HeapFreezeTuple *tuples, int ntuples);
 extern bool heap_freeze_tuple(HeapTupleHeader tuple,
 							  TransactionId relfrozenxid, TransactionId relminmxid,
 							  TransactionId cutoff_xid, TransactionId cutoff_multi);
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 34220d93c..dee0a6864 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -315,34 +315,39 @@ typedef struct xl_heap_inplace
 
 /*
  * This struct represents a 'freeze plan', which is what we need to know about
- * a single tuple being frozen during vacuum.
+ * a group of tuples being frozen from the same page.
  */
 /* 0x01 was XLH_FREEZE_XMIN */
 #define		XLH_FREEZE_XVAC		0x02
 #define		XLH_INVALID_XVAC	0x04
 
-typedef struct xl_heap_freeze_tuple
+typedef struct xl_heap_freeze_plan
 {
 	TransactionId xmax;
-	OffsetNumber offset;
+	uint16		ntuples;
 	uint16		t_infomask2;
 	uint16		t_infomask;
 	uint8		frzflags;
-} xl_heap_freeze_tuple;
+} xl_heap_freeze_plan;
 
 /*
  * This is what we need to know about a block being frozen during vacuum
  *
- * Backup block 0's data contains an array of xl_heap_freeze_tuple structs,
- * one for each tuple.
+ * Backup block 0's data contains an array of xl_heap_freeze_plan structs
+ * (with nplans elements), followed by one or more page offset number arrays.
+ * Each such page offset number array corresponds to a single freeze plan
+ * (REDO routine freezes corresponding heap tuples using freeze plan).
  */
 typedef struct xl_heap_freeze_page
 {
-	TransactionId cutoff_xid;
-	uint16		ntuples;
+	TransactionId latestRemovedXid;
+	uint16		nplans;
+
+	/* FREEZE PLANS FOLLOW */
+	/* OFFSET NUMBER ARRAY FOLLOWS */
 } xl_heap_freeze_page;
 
-#define SizeOfHeapFreezePage (offsetof(xl_heap_freeze_page, ntuples) + sizeof(uint16))
+#define SizeOfHeapFreezePage (offsetof(xl_heap_freeze_page, nplans) + sizeof(uint16))
 
 /*
  * This is what we need to know about setting a visibility map bit
@@ -352,7 +357,7 @@ typedef struct xl_heap_freeze_page
  */
 typedef struct xl_heap_visible
 {
-	TransactionId cutoff_xid;
+	TransactionId latestRemovedXid;
 	uint8		flags;
 } xl_heap_visible;
 
@@ -401,21 +406,8 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record);
 extern const char *heap2_identify(uint8 info);
 extern void heap_xlog_logical_rewrite(XLogReaderState *r);
 
-extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
-								  TransactionId cutoff_xid, xl_heap_freeze_tuple *tuples,
-								  int ntuples);
-extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
-									  TransactionId relfrozenxid,
-									  TransactionId relminmxid,
-									  TransactionId cutoff_xid,
-									  TransactionId cutoff_multi,
-									  xl_heap_freeze_tuple *frz,
-									  bool *totally_frozen,
-									  TransactionId *relfrozenxid_out,
-									  MultiXactId *relminmxid_out);
-extern void heap_execute_freeze_tuple(HeapTupleHeader tuple,
-									  xl_heap_freeze_tuple *frz);
 extern XLogRecPtr log_heap_visible(RelFileLocator rlocator, Buffer heap_buffer,
-								   Buffer vm_buffer, TransactionId cutoff_xid, uint8 vmflags);
+								   Buffer vm_buffer, TransactionId latestRemovedXid,
+								   uint8 vmflags);
 
 #endif							/* HEAPAM_XLOG_H */
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 560f1c81a..b69bf28ba 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -110,6 +110,9 @@ static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 										bool *copy);
+static int	heap_xlog_freeze_plan(HeapFreezeTuple *tuples, int ntuples,
+								  xl_heap_freeze_plan *plans_out,
+								  OffsetNumber *offsets_out);
 
 
 /*
@@ -6439,7 +6442,9 @@ FreezeMultiXactId(MultiXactId multi, uint16 t_infomask,
  * will be totally frozen after these operations are performed and false if
  * more freezing will eventually be required.
  *
- * Caller must set frz->offset itself, before heap_execute_freeze_tuple call.
+ * VACUUM caller must assemble HeapFreezeTuple entries for every tuple that we
+ * returned true for when called.  A later heap_freeze_prepared_tuples call
+ * will execute freezing for caller's page as a whole.
  *
  * It is assumed that the caller has checked the tuple with
  * HeapTupleSatisfiesVacuum() and determined that it is not HEAPTUPLE_DEAD
@@ -6463,15 +6468,12 @@ FreezeMultiXactId(MultiXactId multi, uint16 t_infomask,
  * It will be set as tuple's new xmax when our *frz output is processed within
  * heap_execute_freeze_tuple later on.  If the tuple is in a shared buffer
  * then caller had better have an exclusive lock on it already.
- *
- * NB: It is not enough to set hint bits to indicate an XID committed/aborted.
- * The *frz WAL record we output completely removes all old XIDs during REDO.
  */
 bool
 heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 						  TransactionId relfrozenxid, TransactionId relminmxid,
 						  TransactionId cutoff_xid, TransactionId cutoff_multi,
-						  xl_heap_freeze_tuple *frz, bool *totally_frozen,
+						  HeapFreezeTuple *frz, bool *totally_frozen,
 						  TransactionId *relfrozenxid_out,
 						  MultiXactId *relminmxid_out)
 {
@@ -6762,10 +6764,12 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
  * tuple status.  Also, getting exclusive lock makes it safe to adjust the
  * infomask bits.
  *
- * NB: All code in here must be safe to execute during crash recovery!
+ * NB: All code in here must be safe to execute during crash recovery!  It is
+ * not enough to set hint bits to indicate an XID committed/aborted, either.
+ * Caller's REDO routine must reliably remove all old XIDs.
  */
-void
-heap_execute_freeze_tuple(HeapTupleHeader tuple, xl_heap_freeze_tuple *frz)
+static inline void
+heap_execute_freeze_tuple(HeapTupleHeader tuple, HeapFreezeTuple *frz)
 {
 	HeapTupleHeaderSetXmax(tuple, frz->xmax);
 
@@ -6779,6 +6783,90 @@ heap_execute_freeze_tuple(HeapTupleHeader tuple, xl_heap_freeze_tuple *frz)
 	tuple->t_infomask2 = frz->t_infomask2;
 }
 
+/*
+ * heap_freeze_prepared_tuples
+ *		Execute freezing of prepared tuple plans.
+ *
+ * If we need to freeze any tuples we'll mark the buffer dirty, and write a
+ * WAL record recording the changes.  We must log the changes to be crash-safe
+ * against future truncation of CLOG.
+ *
+ * Caller must always set 'offset' page offset number from each tuple plan.
+ *
+ * NB: We sort the tuples array in-place.  Caller should be finished with it
+ * by now.
+ */
+void
+heap_freeze_prepared_tuples(Relation rel, Buffer buffer,
+							TransactionId FreezeLimit,
+							HeapFreezeTuple *tuples, int ntuples)
+{
+	Page		page = BufferGetPage(buffer);
+
+	Assert(ntuples > 0);
+	Assert(TransactionIdIsValid(FreezeLimit));
+
+	START_CRIT_SECTION();
+
+	MarkBufferDirty(buffer);
+
+	/* execute collected freezes */
+	for (int i = 0; i < ntuples; i++)
+	{
+		HeapTupleHeader htup;
+		ItemId		itemid = PageGetItemId(page, tuples[i].offset);
+
+		htup = (HeapTupleHeader) PageGetItem(page, itemid);
+		heap_execute_freeze_tuple(htup, &tuples[i]);
+	}
+
+	/* Now WAL-log freezing if necessary */
+	if (RelationNeedsWAL(rel))
+	{
+		xl_heap_freeze_plan plans[MaxHeapTuplesPerPage];
+		OffsetNumber offsets[MaxHeapTuplesPerPage];
+		int			nplans;
+		xl_heap_freeze_page xlrec;
+		XLogRecPtr	recptr;
+		TransactionId latestRemovedXid;
+
+		/* Prepare deduplicated representation for use in WAL record */
+		nplans = heap_xlog_freeze_plan(tuples, ntuples, plans, offsets);
+
+		/*
+		 * latestRemovedXid describes the latest processed XID, whereas
+		 * FreezeLimit is (approximately) the first XID not frozen by VACUUM.
+		 * Back up caller's FreezeLimit to avoid false conflicts when
+		 * FreezeLimit is precisely equal to VACUUM's OldestXmin cutoff.
+		 */
+		latestRemovedXid = FreezeLimit;
+		TransactionIdRetreat(latestRemovedXid);
+
+		xlrec.latestRemovedXid = latestRemovedXid;
+		xlrec.nplans = nplans;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) &xlrec, SizeOfHeapFreezePage);
+
+		/*
+		 * The freeze plan array and offset array are not actually in the
+		 * buffer, but pretend that they are.  When XLogInsert stores the
+		 * whole buffer, the arrays need not be stored too.
+		 */
+		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+		XLogRegisterBufData(0, (char *) plans,
+							nplans * sizeof(xl_heap_freeze_plan));
+		XLogRegisterBufData(0, (char *) offsets,
+							ntuples * sizeof(OffsetNumber));
+
+		recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE);
+
+		PageSetLSN(page, recptr);
+	}
+
+	END_CRIT_SECTION();
+}
+
 /*
  * heap_freeze_tuple
  *		Freeze tuple in place, without WAL logging.
@@ -6790,7 +6878,7 @@ heap_freeze_tuple(HeapTupleHeader tuple,
 				  TransactionId relfrozenxid, TransactionId relminmxid,
 				  TransactionId cutoff_xid, TransactionId cutoff_multi)
 {
-	xl_heap_freeze_tuple frz;
+	HeapFreezeTuple frz;
 	bool		do_freeze;
 	bool		tuple_totally_frozen;
 	TransactionId relfrozenxid_out = cutoff_xid;
@@ -8151,54 +8239,23 @@ bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate)
 	return nblocksfavorable;
 }
 
-/*
- * Perform XLogInsert for a heap-freeze operation.  Caller must have already
- * modified the buffer and marked it dirty.
- */
-XLogRecPtr
-log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
-				xl_heap_freeze_tuple *tuples, int ntuples)
-{
-	xl_heap_freeze_page xlrec;
-	XLogRecPtr	recptr;
-
-	/* Caller should not call me on a non-WAL-logged relation */
-	Assert(RelationNeedsWAL(reln));
-	/* nor when there are no tuples to freeze */
-	Assert(ntuples > 0);
-
-	xlrec.cutoff_xid = cutoff_xid;
-	xlrec.ntuples = ntuples;
-
-	XLogBeginInsert();
-	XLogRegisterData((char *) &xlrec, SizeOfHeapFreezePage);
-
-	/*
-	 * The freeze plan array is not actually in the buffer, but pretend that
-	 * it is.  When XLogInsert stores the whole buffer, the freeze plan need
-	 * not be stored too.
-	 */
-	XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
-	XLogRegisterBufData(0, (char *) tuples,
-						ntuples * sizeof(xl_heap_freeze_tuple));
-
-	recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE);
-
-	return recptr;
-}
-
 /*
  * Perform XLogInsert for a heap-visible operation.  'block' is the block
  * being marked all-visible, and vm_buffer is the buffer containing the
  * corresponding visibility map block.  Both should have already been modified
  * and dirtied.
  *
+ * latestRemovedXid comes from the largest xmin on the page being marked
+ * all-visible.  REDO routines uses it as a latestRemovedXid to generate
+ * recovery conflicts in the standard way (even though nothing has been
+ * removed).  Passed to visibilitymap_set() as cutoff_xid argument by VACUUM.
+ *
  * If checksums are enabled, we also generate a full-page image of
  * heap_buffer, if necessary.
  */
 XLogRecPtr
 log_heap_visible(RelFileLocator rlocator, Buffer heap_buffer, Buffer vm_buffer,
-				 TransactionId cutoff_xid, uint8 vmflags)
+				 TransactionId latestRemovedXid, uint8 vmflags)
 {
 	xl_heap_visible xlrec;
 	XLogRecPtr	recptr;
@@ -8207,7 +8264,7 @@ log_heap_visible(RelFileLocator rlocator, Buffer heap_buffer, Buffer vm_buffer,
 	Assert(BufferIsValid(heap_buffer));
 	Assert(BufferIsValid(vm_buffer));
 
-	xlrec.cutoff_xid = cutoff_xid;
+	xlrec.latestRemovedXid = latestRemovedXid;
 	xlrec.flags = vmflags;
 	XLogBeginInsert();
 	XLogRegisterData((char *) &xlrec, SizeOfHeapVisible);
@@ -8810,7 +8867,7 @@ heap_xlog_visible(XLogReaderState *record)
 	 * rather than killing the transaction outright.
 	 */
 	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rlocator);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rlocator);
 
 	/*
 	 * Read the heap page, if it still exists. If the heap file has dropped or
@@ -8896,7 +8953,7 @@ heap_xlog_visible(XLogReaderState *record)
 		visibilitymap_pin(reln, blkno, &vmbuffer);
 
 		visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
-						  xlrec->cutoff_xid, xlrec->flags);
+						  xlrec->latestRemovedXid, xlrec->flags);
 
 		ReleaseBuffer(vmbuffer);
 		FreeFakeRelcacheEntry(reln);
@@ -8905,6 +8962,141 @@ heap_xlog_visible(XLogReaderState *record)
 		UnlockReleaseBuffer(vmbuffer);
 }
 
+/*
+ * qsort comparator used to deduplicate tuple-based freeze plans for WAL
+ */
+static int
+heap_xlog_freeze_cmp(const void *arg1, const void *arg2)
+{
+	HeapFreezeTuple *frz1 = (HeapFreezeTuple *) arg1;
+	HeapFreezeTuple *frz2 = (HeapFreezeTuple *) arg2;
+
+	/* Compare fields that describe actions required to freeze this tuple */
+	if (frz1->t_infomask2 < frz2->t_infomask2)
+		return -1;
+	else if (frz1->t_infomask2 > frz2->t_infomask2)
+		return 1;
+
+	if (frz1->t_infomask < frz2->t_infomask)
+		return -1;
+	else if (frz1->t_infomask > frz2->t_infomask)
+		return 1;
+
+	if (frz1->xmax < frz2->xmax)
+		return -1;
+	else if (frz1->xmax > frz2->xmax)
+		return 1;
+
+	if (frz1->frzflags < frz2->frzflags)
+		return -1;
+	else if (frz1->frzflags > frz2->frzflags)
+		return 1;
+
+	/*
+	 * heap_xlog_freeze_eq would report that this pair of tuple-wise freeze
+	 * plans as equal.  Their canonicalized freeze plan will only be stored
+	 * once in final WAL record.
+	 *
+	 * Tiebreak on page offset number, just to keep things tidy.
+	 */
+	if (frz1->offset < frz2->offset)
+		return -1;
+	else if (frz1->offset > frz2->offset)
+		return 1;
+
+	Assert(false);
+	return 0;
+}
+
+/*
+ * Compare fields that describe actions required to freeze tuple with caller's
+ * open plan.  If everything matches then the frz tuple plan is equivalent to
+ * caller's plan.
+ */
+static bool
+heap_xlog_freeze_eq(xl_heap_freeze_plan *plan, HeapFreezeTuple *frz)
+{
+	if (plan->t_infomask2 == frz->t_infomask2 &&
+		plan->t_infomask == frz->t_infomask &&
+		plan->xmax == frz->xmax && plan->frzflags == frz->frzflags)
+		return true;
+
+	/* Caller must call heap_xlog_new_freeze_plan again for frz */
+	return false;
+}
+
+/*
+ * Start new plan initialized using tuple-level actions.  At least one tuple
+ * will have steps required to freeze described by caller's plan during REDO.
+ */
+static void
+heap_xlog_new_freeze_plan(xl_heap_freeze_plan *plan, HeapFreezeTuple *frz)
+{
+	plan->xmax = frz->xmax;
+	plan->t_infomask = frz->t_infomask;
+	plan->t_infomask2 = frz->t_infomask2;
+	plan->frzflags = frz->frzflags;
+	plan->ntuples = 1;
+}
+
+/*
+ * Deduplicate tuple-based freeze plans so that each distinct set of
+ * processing steps is only stored once in final WAL record.
+ *
+ * Return value is number of plans set in *plans_out for caller.  *offsets_out
+ * is set to offset numbers for use in WAL record by caller.
+ */
+static int
+heap_xlog_freeze_plan(HeapFreezeTuple *tuples, int ntuples,
+					  xl_heap_freeze_plan *plans_out,
+					  OffsetNumber *offsets_out)
+{
+	int			nplans = 0;
+
+	/* Sort tuple-based freeze plans in the order required to deduplicate */
+	qsort(tuples, ntuples, sizeof(HeapFreezeTuple), heap_xlog_freeze_cmp);
+
+	for (int i = 0; i < ntuples; i++)
+	{
+		HeapFreezeTuple *frz = tuples + i;
+
+		if (i == 0)
+		{
+			/* New canonical freeze plan starting with first tup */
+			heap_xlog_new_freeze_plan(plans_out, frz);
+			nplans++;
+		}
+		else if (heap_xlog_freeze_eq(plans_out, frz))
+		{
+			/* tup matches open canonical plan -- include tup in it */
+			Assert(offsets_out[i - 1] < frz->offset);
+			plans_out->ntuples++;
+		}
+		else
+		{
+			/* Tup doesn't match current plan -- done with it now */
+			plans_out++;
+
+			/* New canonical freeze plan starting with this tup */
+			heap_xlog_new_freeze_plan(plans_out, frz);
+			nplans++;
+		}
+
+		/*
+		 * Save page offset number in dedicated buffer in passing.
+		 *
+		 * REDO routine relies on the record's offset numbers array grouping
+		 * offset numbers by freeze plan.  The sort order within each grouping
+		 * is ascending offset number order, just to keep things tidy.
+		 */
+		offsets_out[i] = frz->offset;
+	}
+
+	Assert(nplans > 0 && nplans <= ntuples);
+
+	return nplans;
+}
+
 /*
  * Replay XLOG_HEAP2_FREEZE_PAGE records
  */
@@ -8913,9 +9105,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
 {
 	XLogRecPtr	lsn = record->EndRecPtr;
 	xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) XLogRecGetData(record);
-	TransactionId cutoff_xid = xlrec->cutoff_xid;
 	Buffer		buffer;
-	int			ntup;
 
 	/*
 	 * In Hot Standby mode, ensure that there's no queries running which still
@@ -8924,33 +9114,50 @@ heap_xlog_freeze_page(XLogReaderState *record)
 	if (InHotStandby)
 	{
 		RelFileLocator rlocator;
-		TransactionId latestRemovedXid = cutoff_xid;
-
-		TransactionIdRetreat(latestRemovedXid);
 
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rlocator);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rlocator);
 	}
 
 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
 	{
 		Page		page = BufferGetPage(buffer);
-		xl_heap_freeze_tuple *tuples;
+		xl_heap_freeze_plan *plans;
+		OffsetNumber *offsets;
+		int			curoff = 0;
 
-		tuples = (xl_heap_freeze_tuple *) XLogRecGetBlockData(record, 0, NULL);
-
-		/* now execute freeze plan for each frozen tuple */
-		for (ntup = 0; ntup < xlrec->ntuples; ntup++)
+		plans = (xl_heap_freeze_plan *) XLogRecGetBlockData(record, 0, NULL);
+		offsets = (OffsetNumber *) ((char *) plans +
+									(xlrec->nplans *
+									 sizeof(xl_heap_freeze_plan)));
+		for (int p = 0; p < xlrec->nplans; p++)
 		{
-			xl_heap_freeze_tuple *xlrec_tp;
-			ItemId		lp;
-			HeapTupleHeader tuple;
+			xl_heap_freeze_plan plan;
+			HeapFreezeTuple frz;
 
-			xlrec_tp = &tuples[ntup];
-			lp = PageGetItemId(page, xlrec_tp->offset); /* offsets are one-based */
-			tuple = (HeapTupleHeader) PageGetItem(page, lp);
+			memcpy(&plan, &plans[p], sizeof(xl_heap_freeze_plan));
 
-			heap_execute_freeze_tuple(tuple, xlrec_tp);
+			/*
+			 * Convert freeze plan representation into the per-tuple format
+			 * used by heap_execute_freeze_tuple (matches original execution)
+			 */
+			frz.t_infomask2 = plan.t_infomask2;
+			frz.t_infomask = plan.t_infomask;
+			frz.xmax = plan.xmax;
+			frz.frzflags = plan.frzflags;
+			frz.offset = InvalidOffsetNumber;	/* unused */
+
+			/* Iterate through tuples for plan, then freeze each tuple */
+			for (int i = 0; i < plan.ntuples; i++)
+			{
+				ItemId		lp;
+				HeapTupleHeader tuple;
+				OffsetNumber offset = offsets[curoff++];
+
+				lp = PageGetItemId(page, offset);
+				tuple = (HeapTupleHeader) PageGetItem(page, lp);
+				heap_execute_freeze_tuple(tuple, &frz);
+			}
 		}
 
 		PageSetLSN(page, lsn);
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index dfbe37472..78192172f 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1566,7 +1566,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	TransactionId NewRelfrozenXid;
 	MultiXactId NewRelminMxid;
 	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
-	xl_heap_freeze_tuple frozen[MaxHeapTuplesPerPage];
+	HeapFreezeTuple frozen[MaxHeapTuplesPerPage];
 
 	Assert(BufferGetBlockNumber(buf) == blkno);
 
@@ -1825,40 +1825,8 @@ retry:
 
 		vacrel->frozen_pages++;
 
-		/*
-		 * At least one tuple with storage needs to be frozen -- execute that
-		 * now.
-		 *
-		 * If we need to freeze any tuples we'll mark the buffer dirty, and
-		 * write a WAL record recording the changes.  We must log the changes
-		 * to be crash-safe against future truncation of CLOG.
-		 */
-		START_CRIT_SECTION();
-
-		MarkBufferDirty(buf);
-
-		/* execute collected freezes */
-		for (int i = 0; i < tuples_frozen; i++)
-		{
-			HeapTupleHeader htup;
-
-			itemid = PageGetItemId(page, frozen[i].offset);
-			htup = (HeapTupleHeader) PageGetItem(page, itemid);
-
-			heap_execute_freeze_tuple(htup, &frozen[i]);
-		}
-
-		/* Now WAL-log freezing if necessary */
-		if (RelationNeedsWAL(vacrel->rel))
-		{
-			XLogRecPtr	recptr;
-
-			recptr = log_heap_freeze(vacrel->rel, buf, vacrel->FreezeLimit,
-									 frozen, tuples_frozen);
-			PageSetLSN(page, recptr);
-		}
-
-		END_CRIT_SECTION();
+		heap_freeze_prepared_tuples(vacrel->rel, buf, vacrel->FreezeLimit,
+									frozen, tuples_frozen);
 	}
 
 	/*
diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c
index 923d3bc43..bc86a62a9 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -140,15 +140,15 @@ heap2_desc(StringInfo buf, XLogReaderState *record)
 	{
 		xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) rec;
 
-		appendStringInfo(buf, "cutoff xid %u ntuples %u",
-						 xlrec->cutoff_xid, xlrec->ntuples);
+		appendStringInfo(buf, "latestRemovedXid %u nplans %u",
+						 xlrec->latestRemovedXid, xlrec->nplans);
 	}
 	else if (info == XLOG_HEAP2_VISIBLE)
 	{
 		xl_heap_visible *xlrec = (xl_heap_visible *) rec;
 
-		appendStringInfo(buf, "cutoff xid %u flags 0x%02X",
-						 xlrec->cutoff_xid, xlrec->flags);
+		appendStringInfo(buf, "latestRemovedXid %u flags 0x%02X",
+						 xlrec->latestRemovedXid, xlrec->flags);
 	}
 	else if (info == XLOG_HEAP2_MULTI_INSERT)
 	{
-- 
2.34.1

