From 3ecf046af0d80d6e09c5d0af07f1f4e7d2327dfc Mon Sep 17 00:00:00 2001
From: yoli <yoli@ebay.com>
Date: Fri, 26 Jul 2024 00:36:46 -0700
Subject: [PATCH v2] Most access methods (i.e. nbtree and hash) use a separate
 file with "xlog" in its name for its WAL replay logic.  Heap is one exception
 of this convention.  To make it easier for newcomers to find the WAL replay
 logic for the heap access method, this patch isolates heap's replay logic in
 a new heapam_xlog.c file. This patch is a pure refactoring with no change to
 the logic.

---
 src/backend/access/heap/Makefile      |    1 +
 src/backend/access/heap/heapam.c      | 1339 -------------------------
 src/backend/access/heap/heapam_xlog.c | 1321 ++++++++++++++++++++++++
 src/backend/access/heap/meson.build   |    1 +
 src/include/access/heapam.h           |   25 +
 5 files changed, 1348 insertions(+), 1339 deletions(-)
 create mode 100644 src/backend/access/heap/heapam_xlog.c

diff --git a/src/backend/access/heap/Makefile b/src/backend/access/heap/Makefile
index af0bd1888e..394534172f 100644
--- a/src/backend/access/heap/Makefile
+++ b/src/backend/access/heap/Makefile
@@ -16,6 +16,7 @@ OBJS = \
 	heapam.o \
 	heapam_handler.o \
 	heapam_visibility.o \
+	heapam_xlog.o \
 	heaptoast.o \
 	hio.o \
 	pruneheap.o \
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 91b20147a0..f167107257 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -31,42 +31,24 @@
  */
 #include "postgres.h"
 
-#include "access/bufmask.h"
 #include "access/heapam.h"
-#include "access/heapam_xlog.h"
 #include "access/heaptoast.h"
 #include "access/hio.h"
 #include "access/multixact.h"
-#include "access/parallel.h"
-#include "access/relscan.h"
 #include "access/subtrans.h"
 #include "access/syncscan.h"
-#include "access/sysattr.h"
-#include "access/tableam.h"
-#include "access/transam.h"
 #include "access/valid.h"
 #include "access/visibilitymap.h"
-#include "access/xact.h"
-#include "access/xlog.h"
 #include "access/xloginsert.h"
-#include "access/xlogutils.h"
-#include "catalog/catalog.h"
 #include "commands/vacuum.h"
-#include "miscadmin.h"
 #include "pgstat.h"
-#include "port/atomics.h"
 #include "port/pg_bitutils.h"
-#include "storage/bufmgr.h"
-#include "storage/freespace.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
-#include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/injection_point.h"
 #include "utils/inval.h"
-#include "utils/relcache.h"
-#include "utils/snapmgr.h"
 #include "utils/spccache.h"
 
 
@@ -6811,30 +6793,6 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 	return freeze_xmin || replace_xvac || replace_xmax || freeze_xmax;
 }
 
-/*
- * heap_execute_freeze_tuple
- *		Execute the prepared freezing of a tuple with caller's freeze plan.
- *
- * Caller is responsible for ensuring that no other backend can access the
- * storage underlying this tuple, either by holding an exclusive lock on the
- * buffer containing it (which is what lazy VACUUM does), or by having it be
- * in private storage (which is what CLUSTER and friends do).
- */
-static inline void
-heap_execute_freeze_tuple(HeapTupleHeader tuple, HeapTupleFreeze *frz)
-{
-	HeapTupleHeaderSetXmax(tuple, frz->xmax);
-
-	if (frz->frzflags & XLH_FREEZE_XVAC)
-		HeapTupleHeaderSetXvac(tuple, FrozenTransactionId);
-
-	if (frz->frzflags & XLH_INVALID_XVAC)
-		HeapTupleHeaderSetXvac(tuple, InvalidTransactionId);
-
-	tuple->t_infomask = frz->t_infomask;
-	tuple->t_infomask2 = frz->t_infomask2;
-}
-
 /*
  * Perform xmin/xmax XID status sanity checks before actually executing freeze
  * plans.
@@ -8745,1303 +8703,6 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 	return key_tuple;
 }
 
-/*
- * Replay XLOG_HEAP2_PRUNE_* records.
- */
-static void
-heap_xlog_prune_freeze(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	char	   *maindataptr = XLogRecGetData(record);
-	xl_heap_prune xlrec;
-	Buffer		buffer;
-	RelFileLocator rlocator;
-	BlockNumber blkno;
-	XLogRedoAction action;
-
-	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
-	memcpy(&xlrec, maindataptr, SizeOfHeapPrune);
-	maindataptr += SizeOfHeapPrune;
-
-	/*
-	 * We will take an ordinary exclusive lock or a cleanup lock depending on
-	 * whether the XLHP_CLEANUP_LOCK flag is set.  With an ordinary exclusive
-	 * lock, we better not be doing anything that requires moving existing
-	 * tuple data.
-	 */
-	Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 ||
-		   (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0);
-
-	/*
-	 * We are about to remove and/or freeze tuples.  In Hot Standby mode,
-	 * ensure that there are no queries running for which the removed tuples
-	 * are still visible or which still consider the frozen xids as running.
-	 * The conflict horizon XID comes after xl_heap_prune.
-	 */
-	if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0)
-	{
-		TransactionId snapshot_conflict_horizon;
-
-		/* memcpy() because snapshot_conflict_horizon is stored unaligned */
-		memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId));
-		maindataptr += sizeof(TransactionId);
-
-		if (InHotStandby)
-			ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon,
-												(xlrec.flags & XLHP_IS_CATALOG_REL) != 0,
-												rlocator);
-	}
-
-	/*
-	 * If we have a full-page image, restore it and we're done.
-	 */
-	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
-										   (xlrec.flags & XLHP_CLEANUP_LOCK) != 0,
-										   &buffer);
-	if (action == BLK_NEEDS_REDO)
-	{
-		Page		page = (Page) BufferGetPage(buffer);
-		OffsetNumber *redirected;
-		OffsetNumber *nowdead;
-		OffsetNumber *nowunused;
-		int			nredirected;
-		int			ndead;
-		int			nunused;
-		int			nplans;
-		Size		datalen;
-		xlhp_freeze_plan *plans;
-		OffsetNumber *frz_offsets;
-		char	   *dataptr = XLogRecGetBlockData(record, 0, &datalen);
-
-		heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags,
-											   &nplans, &plans, &frz_offsets,
-											   &nredirected, &redirected,
-											   &ndead, &nowdead,
-											   &nunused, &nowunused);
-
-		/*
-		 * Update all line pointers per the record, and repair fragmentation
-		 * if needed.
-		 */
-		if (nredirected > 0 || ndead > 0 || nunused > 0)
-			heap_page_prune_execute(buffer,
-									(xlrec.flags & XLHP_CLEANUP_LOCK) == 0,
-									redirected, nredirected,
-									nowdead, ndead,
-									nowunused, nunused);
-
-		/* Freeze tuples */
-		for (int p = 0; p < nplans; p++)
-		{
-			HeapTupleFreeze frz;
-
-			/*
-			 * Convert freeze plan representation from WAL record into
-			 * per-tuple format used by heap_execute_freeze_tuple
-			 */
-			frz.xmax = plans[p].xmax;
-			frz.t_infomask2 = plans[p].t_infomask2;
-			frz.t_infomask = plans[p].t_infomask;
-			frz.frzflags = plans[p].frzflags;
-			frz.offset = InvalidOffsetNumber;	/* unused, but be tidy */
-
-			for (int i = 0; i < plans[p].ntuples; i++)
-			{
-				OffsetNumber offset = *(frz_offsets++);
-				ItemId		lp;
-				HeapTupleHeader tuple;
-
-				lp = PageGetItemId(page, offset);
-				tuple = (HeapTupleHeader) PageGetItem(page, lp);
-				heap_execute_freeze_tuple(tuple, &frz);
-			}
-		}
-
-		/* There should be no more data */
-		Assert((char *) frz_offsets == dataptr + datalen);
-
-		/*
-		 * Note: we don't worry about updating the page's prunability hints.
-		 * At worst this will cause an extra prune cycle to occur soon.
-		 */
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-
-	/*
-	 * If we released any space or line pointers, update the free space map.
-	 *
-	 * Do this regardless of a full-page image being applied, since the FSM
-	 * data is not in the page anyway.
-	 */
-	if (BufferIsValid(buffer))
-	{
-		if (xlrec.flags & (XLHP_HAS_REDIRECTIONS |
-						   XLHP_HAS_DEAD_ITEMS |
-						   XLHP_HAS_NOW_UNUSED_ITEMS))
-		{
-			Size		freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
-
-			UnlockReleaseBuffer(buffer);
-
-			XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
-		}
-		else
-			UnlockReleaseBuffer(buffer);
-	}
-}
-
-/*
- * Replay XLOG_HEAP2_VISIBLE record.
- *
- * The critical integrity requirement here is that we must never end up with
- * a situation where the visibility map bit is set, and the page-level
- * PD_ALL_VISIBLE bit is clear.  If that were to occur, then a subsequent
- * page modification would fail to clear the visibility map bit.
- */
-static void
-heap_xlog_visible(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
-	Buffer		vmbuffer = InvalidBuffer;
-	Buffer		buffer;
-	Page		page;
-	RelFileLocator rlocator;
-	BlockNumber blkno;
-	XLogRedoAction action;
-
-	Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);
-
-	XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno);
-
-	/*
-	 * If there are any Hot Standby transactions running that have an xmin
-	 * horizon old enough that this page isn't all-visible for them, they
-	 * might incorrectly decide that an index-only scan can skip a heap fetch.
-	 *
-	 * NB: It might be better to throw some kind of "soft" conflict here that
-	 * forces any index-only scan that is in flight to perform heap fetches,
-	 * rather than killing the transaction outright.
-	 */
-	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
-											xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
-											rlocator);
-
-	/*
-	 * Read the heap page, if it still exists. If the heap file has dropped or
-	 * truncated later in recovery, we don't need to update the page, but we'd
-	 * better still update the visibility map.
-	 */
-	action = XLogReadBufferForRedo(record, 1, &buffer);
-	if (action == BLK_NEEDS_REDO)
-	{
-		/*
-		 * We don't bump the LSN of the heap page when setting the visibility
-		 * map bit (unless checksums or wal_hint_bits is enabled, in which
-		 * case we must). This exposes us to torn page hazards, but since
-		 * we're not inspecting the existing page contents in any way, we
-		 * don't care.
-		 */
-		page = BufferGetPage(buffer);
-
-		PageSetAllVisible(page);
-
-		if (XLogHintBitIsNeeded())
-			PageSetLSN(page, lsn);
-
-		MarkBufferDirty(buffer);
-	}
-	else if (action == BLK_RESTORED)
-	{
-		/*
-		 * If heap block was backed up, we already restored it and there's
-		 * nothing more to do. (This can only happen with checksums or
-		 * wal_log_hints enabled.)
-		 */
-	}
-
-	if (BufferIsValid(buffer))
-	{
-		Size		space = PageGetFreeSpace(BufferGetPage(buffer));
-
-		UnlockReleaseBuffer(buffer);
-
-		/*
-		 * Since FSM is not WAL-logged and only updated heuristically, it
-		 * easily becomes stale in standbys.  If the standby is later promoted
-		 * and runs VACUUM, it will skip updating individual free space
-		 * figures for pages that became all-visible (or all-frozen, depending
-		 * on the vacuum mode,) which is troublesome when FreeSpaceMapVacuum
-		 * propagates too optimistic free space values to upper FSM layers;
-		 * later inserters try to use such pages only to find out that they
-		 * are unusable.  This can cause long stalls when there are many such
-		 * pages.
-		 *
-		 * Forestall those problems by updating FSM's idea about a page that
-		 * is becoming all-visible or all-frozen.
-		 *
-		 * Do this regardless of a full-page image being applied, since the
-		 * FSM data is not in the page anyway.
-		 */
-		if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
-			XLogRecordPageWithFreeSpace(rlocator, blkno, space);
-	}
-
-	/*
-	 * Even if we skipped the heap page update due to the LSN interlock, it's
-	 * still safe to update the visibility map.  Any WAL record that clears
-	 * the visibility map bit does so before checking the page LSN, so any
-	 * bits that need to be cleared will still be cleared.
-	 */
-	if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false,
-									  &vmbuffer) == BLK_NEEDS_REDO)
-	{
-		Page		vmpage = BufferGetPage(vmbuffer);
-		Relation	reln;
-		uint8		vmbits;
-
-		/* initialize the page if it was read as zeros */
-		if (PageIsNew(vmpage))
-			PageInit(vmpage, BLCKSZ, 0);
-
-		/* remove VISIBILITYMAP_XLOG_* */
-		vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
-
-		/*
-		 * XLogReadBufferForRedoExtended locked the buffer. But
-		 * visibilitymap_set will handle locking itself.
-		 */
-		LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK);
-
-		reln = CreateFakeRelcacheEntry(rlocator);
-		visibilitymap_pin(reln, blkno, &vmbuffer);
-
-		visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
-						  xlrec->snapshotConflictHorizon, vmbits);
-
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-	else if (BufferIsValid(vmbuffer))
-		UnlockReleaseBuffer(vmbuffer);
-}
-
-/*
- * Given an "infobits" field from an XLog record, set the correct bits in the
- * given infomask and infomask2 for the tuple touched by the record.
- *
- * (This is the reverse of compute_infobits).
- */
-static void
-fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
-{
-	*infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY |
-				   HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK);
-	*infomask2 &= ~HEAP_KEYS_UPDATED;
-
-	if (infobits & XLHL_XMAX_IS_MULTI)
-		*infomask |= HEAP_XMAX_IS_MULTI;
-	if (infobits & XLHL_XMAX_LOCK_ONLY)
-		*infomask |= HEAP_XMAX_LOCK_ONLY;
-	if (infobits & XLHL_XMAX_EXCL_LOCK)
-		*infomask |= HEAP_XMAX_EXCL_LOCK;
-	/* note HEAP_XMAX_SHR_LOCK isn't considered here */
-	if (infobits & XLHL_XMAX_KEYSHR_LOCK)
-		*infomask |= HEAP_XMAX_KEYSHR_LOCK;
-
-	if (infobits & XLHL_KEYS_UPDATED)
-		*infomask2 |= HEAP_KEYS_UPDATED;
-}
-
-static void
-heap_xlog_delete(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
-	Buffer		buffer;
-	Page		page;
-	ItemId		lp = NULL;
-	HeapTupleHeader htup;
-	BlockNumber blkno;
-	RelFileLocator target_locator;
-	ItemPointerData target_tid;
-
-	XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
-	ItemPointerSetBlockNumber(&target_tid, blkno);
-	ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
-	{
-		Relation	reln = CreateFakeRelcacheEntry(target_locator);
-		Buffer		vmbuffer = InvalidBuffer;
-
-		visibilitymap_pin(reln, blkno, &vmbuffer);
-		visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-	{
-		page = BufferGetPage(buffer);
-
-		if (PageGetMaxOffsetNumber(page) >= xlrec->offnum)
-			lp = PageGetItemId(page, xlrec->offnum);
-
-		if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp))
-			elog(PANIC, "invalid lp");
-
-		htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-		HeapTupleHeaderClearHotUpdated(htup);
-		fix_infomask_from_infobits(xlrec->infobits_set,
-								   &htup->t_infomask, &htup->t_infomask2);
-		if (!(xlrec->flags & XLH_DELETE_IS_SUPER))
-			HeapTupleHeaderSetXmax(htup, xlrec->xmax);
-		else
-			HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
-		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
-
-		/* Mark the page as a candidate for pruning */
-		PageSetPrunable(page, XLogRecGetXid(record));
-
-		if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
-			PageClearAllVisible(page);
-
-		/* Make sure t_ctid is set correctly */
-		if (xlrec->flags & XLH_DELETE_IS_PARTITION_MOVE)
-			HeapTupleHeaderSetMovedPartitions(htup);
-		else
-			htup->t_ctid = target_tid;
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_insert(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
-	Buffer		buffer;
-	Page		page;
-	union
-	{
-		HeapTupleHeaderData hdr;
-		char		data[MaxHeapTupleSize];
-	}			tbuf;
-	HeapTupleHeader htup;
-	xl_heap_header xlhdr;
-	uint32		newlen;
-	Size		freespace = 0;
-	RelFileLocator target_locator;
-	BlockNumber blkno;
-	ItemPointerData target_tid;
-	XLogRedoAction action;
-
-	XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
-	ItemPointerSetBlockNumber(&target_tid, blkno);
-	ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
-	{
-		Relation	reln = CreateFakeRelcacheEntry(target_locator);
-		Buffer		vmbuffer = InvalidBuffer;
-
-		visibilitymap_pin(reln, blkno, &vmbuffer);
-		visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	/*
-	 * If we inserted the first and only tuple on the page, re-initialize the
-	 * page from scratch.
-	 */
-	if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
-	{
-		buffer = XLogInitBufferForRedo(record, 0);
-		page = BufferGetPage(buffer);
-		PageInit(page, BufferGetPageSize(buffer), 0);
-		action = BLK_NEEDS_REDO;
-	}
-	else
-		action = XLogReadBufferForRedo(record, 0, &buffer);
-	if (action == BLK_NEEDS_REDO)
-	{
-		Size		datalen;
-		char	   *data;
-
-		page = BufferGetPage(buffer);
-
-		if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum)
-			elog(PANIC, "invalid max offset number");
-
-		data = XLogRecGetBlockData(record, 0, &datalen);
-
-		newlen = datalen - SizeOfHeapHeader;
-		Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize);
-		memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
-		data += SizeOfHeapHeader;
-
-		htup = &tbuf.hdr;
-		MemSet((char *) htup, 0, SizeofHeapTupleHeader);
-		/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
-		memcpy((char *) htup + SizeofHeapTupleHeader,
-			   data,
-			   newlen);
-		newlen += SizeofHeapTupleHeader;
-		htup->t_infomask2 = xlhdr.t_infomask2;
-		htup->t_infomask = xlhdr.t_infomask;
-		htup->t_hoff = xlhdr.t_hoff;
-		HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
-		HeapTupleHeaderSetCmin(htup, FirstCommandId);
-		htup->t_ctid = target_tid;
-
-		if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
-						true, true) == InvalidOffsetNumber)
-			elog(PANIC, "failed to add tuple");
-
-		freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
-
-		PageSetLSN(page, lsn);
-
-		if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
-			PageClearAllVisible(page);
-
-		/* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
-		if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
-			PageSetAllVisible(page);
-
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-
-	/*
-	 * If the page is running low on free space, update the FSM as well.
-	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
-	 * better than that without knowing the fill-factor for the table.
-	 *
-	 * XXX: Don't do this if the page was restored from full page image. We
-	 * don't bother to update the FSM in that case, it doesn't need to be
-	 * totally accurate anyway.
-	 */
-	if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
-		XLogRecordPageWithFreeSpace(target_locator, blkno, freespace);
-}
-
-/*
- * Handles MULTI_INSERT record type.
- */
-static void
-heap_xlog_multi_insert(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_multi_insert *xlrec;
-	RelFileLocator rlocator;
-	BlockNumber blkno;
-	Buffer		buffer;
-	Page		page;
-	union
-	{
-		HeapTupleHeaderData hdr;
-		char		data[MaxHeapTupleSize];
-	}			tbuf;
-	HeapTupleHeader htup;
-	uint32		newlen;
-	Size		freespace = 0;
-	int			i;
-	bool		isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
-	XLogRedoAction action;
-
-	/*
-	 * Insertion doesn't overwrite MVCC data, so no conflict processing is
-	 * required.
-	 */
-	xlrec = (xl_heap_multi_insert *) XLogRecGetData(record);
-
-	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
-
-	/* check that the mutually exclusive flags are not both set */
-	Assert(!((xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) &&
-			 (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)));
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
-	{
-		Relation	reln = CreateFakeRelcacheEntry(rlocator);
-		Buffer		vmbuffer = InvalidBuffer;
-
-		visibilitymap_pin(reln, blkno, &vmbuffer);
-		visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	if (isinit)
-	{
-		buffer = XLogInitBufferForRedo(record, 0);
-		page = BufferGetPage(buffer);
-		PageInit(page, BufferGetPageSize(buffer), 0);
-		action = BLK_NEEDS_REDO;
-	}
-	else
-		action = XLogReadBufferForRedo(record, 0, &buffer);
-	if (action == BLK_NEEDS_REDO)
-	{
-		char	   *tupdata;
-		char	   *endptr;
-		Size		len;
-
-		/* Tuples are stored as block data */
-		tupdata = XLogRecGetBlockData(record, 0, &len);
-		endptr = tupdata + len;
-
-		page = (Page) BufferGetPage(buffer);
-
-		for (i = 0; i < xlrec->ntuples; i++)
-		{
-			OffsetNumber offnum;
-			xl_multi_insert_tuple *xlhdr;
-
-			/*
-			 * If we're reinitializing the page, the tuples are stored in
-			 * order from FirstOffsetNumber. Otherwise there's an array of
-			 * offsets in the WAL record, and the tuples come after that.
-			 */
-			if (isinit)
-				offnum = FirstOffsetNumber + i;
-			else
-				offnum = xlrec->offsets[i];
-			if (PageGetMaxOffsetNumber(page) + 1 < offnum)
-				elog(PANIC, "invalid max offset number");
-
-			xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata);
-			tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
-
-			newlen = xlhdr->datalen;
-			Assert(newlen <= MaxHeapTupleSize);
-			htup = &tbuf.hdr;
-			MemSet((char *) htup, 0, SizeofHeapTupleHeader);
-			/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
-			memcpy((char *) htup + SizeofHeapTupleHeader,
-				   (char *) tupdata,
-				   newlen);
-			tupdata += newlen;
-
-			newlen += SizeofHeapTupleHeader;
-			htup->t_infomask2 = xlhdr->t_infomask2;
-			htup->t_infomask = xlhdr->t_infomask;
-			htup->t_hoff = xlhdr->t_hoff;
-			HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
-			HeapTupleHeaderSetCmin(htup, FirstCommandId);
-			ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
-			ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
-
-			offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
-			if (offnum == InvalidOffsetNumber)
-				elog(PANIC, "failed to add tuple");
-		}
-		if (tupdata != endptr)
-			elog(PANIC, "total tuple length mismatch");
-
-		freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
-
-		PageSetLSN(page, lsn);
-
-		if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
-			PageClearAllVisible(page);
-
-		/* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
-		if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
-			PageSetAllVisible(page);
-
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-
-	/*
-	 * If the page is running low on free space, update the FSM as well.
-	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
-	 * better than that without knowing the fill-factor for the table.
-	 *
-	 * XXX: Don't do this if the page was restored from full page image. We
-	 * don't bother to update the FSM in that case, it doesn't need to be
-	 * totally accurate anyway.
-	 */
-	if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
-		XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
-}
-
-/*
- * Handles UPDATE and HOT_UPDATE
- */
-static void
-heap_xlog_update(XLogReaderState *record, bool hot_update)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
-	RelFileLocator rlocator;
-	BlockNumber oldblk;
-	BlockNumber newblk;
-	ItemPointerData newtid;
-	Buffer		obuffer,
-				nbuffer;
-	Page		page;
-	OffsetNumber offnum;
-	ItemId		lp = NULL;
-	HeapTupleData oldtup;
-	HeapTupleHeader htup;
-	uint16		prefixlen = 0,
-				suffixlen = 0;
-	char	   *newp;
-	union
-	{
-		HeapTupleHeaderData hdr;
-		char		data[MaxHeapTupleSize];
-	}			tbuf;
-	xl_heap_header xlhdr;
-	uint32		newlen;
-	Size		freespace = 0;
-	XLogRedoAction oldaction;
-	XLogRedoAction newaction;
-
-	/* initialize to keep the compiler quiet */
-	oldtup.t_data = NULL;
-	oldtup.t_len = 0;
-
-	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &newblk);
-	if (XLogRecGetBlockTagExtended(record, 1, NULL, NULL, &oldblk, NULL))
-	{
-		/* HOT updates are never done across pages */
-		Assert(!hot_update);
-	}
-	else
-		oldblk = newblk;
-
-	ItemPointerSet(&newtid, newblk, xlrec->new_offnum);
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
-	{
-		Relation	reln = CreateFakeRelcacheEntry(rlocator);
-		Buffer		vmbuffer = InvalidBuffer;
-
-		visibilitymap_pin(reln, oldblk, &vmbuffer);
-		visibilitymap_clear(reln, oldblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	/*
-	 * In normal operation, it is important to lock the two pages in
-	 * page-number order, to avoid possible deadlocks against other update
-	 * operations going the other way.  However, during WAL replay there can
-	 * be no other update happening, so we don't need to worry about that. But
-	 * we *do* need to worry that we don't expose an inconsistent state to Hot
-	 * Standby queries --- so the original page can't be unlocked before we've
-	 * added the new tuple to the new page.
-	 */
-
-	/* Deal with old tuple version */
-	oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
-									  &obuffer);
-	if (oldaction == BLK_NEEDS_REDO)
-	{
-		page = BufferGetPage(obuffer);
-		offnum = xlrec->old_offnum;
-		if (PageGetMaxOffsetNumber(page) >= offnum)
-			lp = PageGetItemId(page, offnum);
-
-		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-			elog(PANIC, "invalid lp");
-
-		htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-		oldtup.t_data = htup;
-		oldtup.t_len = ItemIdGetLength(lp);
-
-		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-		if (hot_update)
-			HeapTupleHeaderSetHotUpdated(htup);
-		else
-			HeapTupleHeaderClearHotUpdated(htup);
-		fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
-								   &htup->t_infomask2);
-		HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
-		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
-		/* Set forward chain link in t_ctid */
-		htup->t_ctid = newtid;
-
-		/* Mark the page as a candidate for pruning */
-		PageSetPrunable(page, XLogRecGetXid(record));
-
-		if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
-			PageClearAllVisible(page);
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(obuffer);
-	}
-
-	/*
-	 * Read the page the new tuple goes into, if different from old.
-	 */
-	if (oldblk == newblk)
-	{
-		nbuffer = obuffer;
-		newaction = oldaction;
-	}
-	else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
-	{
-		nbuffer = XLogInitBufferForRedo(record, 0);
-		page = (Page) BufferGetPage(nbuffer);
-		PageInit(page, BufferGetPageSize(nbuffer), 0);
-		newaction = BLK_NEEDS_REDO;
-	}
-	else
-		newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
-	{
-		Relation	reln = CreateFakeRelcacheEntry(rlocator);
-		Buffer		vmbuffer = InvalidBuffer;
-
-		visibilitymap_pin(reln, newblk, &vmbuffer);
-		visibilitymap_clear(reln, newblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	/* Deal with new tuple */
-	if (newaction == BLK_NEEDS_REDO)
-	{
-		char	   *recdata;
-		char	   *recdata_end;
-		Size		datalen;
-		Size		tuplen;
-
-		recdata = XLogRecGetBlockData(record, 0, &datalen);
-		recdata_end = recdata + datalen;
-
-		page = BufferGetPage(nbuffer);
-
-		offnum = xlrec->new_offnum;
-		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
-			elog(PANIC, "invalid max offset number");
-
-		if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD)
-		{
-			Assert(newblk == oldblk);
-			memcpy(&prefixlen, recdata, sizeof(uint16));
-			recdata += sizeof(uint16);
-		}
-		if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD)
-		{
-			Assert(newblk == oldblk);
-			memcpy(&suffixlen, recdata, sizeof(uint16));
-			recdata += sizeof(uint16);
-		}
-
-		memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader);
-		recdata += SizeOfHeapHeader;
-
-		tuplen = recdata_end - recdata;
-		Assert(tuplen <= MaxHeapTupleSize);
-
-		htup = &tbuf.hdr;
-		MemSet((char *) htup, 0, SizeofHeapTupleHeader);
-
-		/*
-		 * Reconstruct the new tuple using the prefix and/or suffix from the
-		 * old tuple, and the data stored in the WAL record.
-		 */
-		newp = (char *) htup + SizeofHeapTupleHeader;
-		if (prefixlen > 0)
-		{
-			int			len;
-
-			/* copy bitmap [+ padding] [+ oid] from WAL record */
-			len = xlhdr.t_hoff - SizeofHeapTupleHeader;
-			memcpy(newp, recdata, len);
-			recdata += len;
-			newp += len;
-
-			/* copy prefix from old tuple */
-			memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
-			newp += prefixlen;
-
-			/* copy new tuple data from WAL record */
-			len = tuplen - (xlhdr.t_hoff - SizeofHeapTupleHeader);
-			memcpy(newp, recdata, len);
-			recdata += len;
-			newp += len;
-		}
-		else
-		{
-			/*
-			 * copy bitmap [+ padding] [+ oid] + data from record, all in one
-			 * go
-			 */
-			memcpy(newp, recdata, tuplen);
-			recdata += tuplen;
-			newp += tuplen;
-		}
-		Assert(recdata == recdata_end);
-
-		/* copy suffix from old tuple */
-		if (suffixlen > 0)
-			memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
-
-		newlen = SizeofHeapTupleHeader + tuplen + prefixlen + suffixlen;
-		htup->t_infomask2 = xlhdr.t_infomask2;
-		htup->t_infomask = xlhdr.t_infomask;
-		htup->t_hoff = xlhdr.t_hoff;
-
-		HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
-		HeapTupleHeaderSetCmin(htup, FirstCommandId);
-		HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
-		/* Make sure there is no forward chain link in t_ctid */
-		htup->t_ctid = newtid;
-
-		offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
-		if (offnum == InvalidOffsetNumber)
-			elog(PANIC, "failed to add tuple");
-
-		if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
-			PageClearAllVisible(page);
-
-		freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(nbuffer);
-	}
-
-	if (BufferIsValid(nbuffer) && nbuffer != obuffer)
-		UnlockReleaseBuffer(nbuffer);
-	if (BufferIsValid(obuffer))
-		UnlockReleaseBuffer(obuffer);
-
-	/*
-	 * If the new page is running low on free space, update the FSM as well.
-	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
-	 * better than that without knowing the fill-factor for the table.
-	 *
-	 * However, don't update the FSM on HOT updates, because after crash
-	 * recovery, either the old or the new tuple will certainly be dead and
-	 * prunable. After pruning, the page will have roughly as much free space
-	 * as it did before the update, assuming the new tuple is about the same
-	 * size as the old one.
-	 *
-	 * XXX: Don't do this if the page was restored from full page image. We
-	 * don't bother to update the FSM in that case, it doesn't need to be
-	 * totally accurate anyway.
-	 */
-	if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5)
-		XLogRecordPageWithFreeSpace(rlocator, newblk, freespace);
-}
-
-static void
-heap_xlog_confirm(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record);
-	Buffer		buffer;
-	Page		page;
-	OffsetNumber offnum;
-	ItemId		lp = NULL;
-	HeapTupleHeader htup;
-
-	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-	{
-		page = BufferGetPage(buffer);
-
-		offnum = xlrec->offnum;
-		if (PageGetMaxOffsetNumber(page) >= offnum)
-			lp = PageGetItemId(page, offnum);
-
-		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-			elog(PANIC, "invalid lp");
-
-		htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-		/*
-		 * Confirm tuple as actually inserted
-		 */
-		ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum);
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_lock(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
-	Buffer		buffer;
-	Page		page;
-	OffsetNumber offnum;
-	ItemId		lp = NULL;
-	HeapTupleHeader htup;
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
-	{
-		RelFileLocator rlocator;
-		Buffer		vmbuffer = InvalidBuffer;
-		BlockNumber block;
-		Relation	reln;
-
-		XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
-		reln = CreateFakeRelcacheEntry(rlocator);
-
-		visibilitymap_pin(reln, block, &vmbuffer);
-		visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
-
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-	{
-		page = (Page) BufferGetPage(buffer);
-
-		offnum = xlrec->offnum;
-		if (PageGetMaxOffsetNumber(page) >= offnum)
-			lp = PageGetItemId(page, offnum);
-
-		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-			elog(PANIC, "invalid lp");
-
-		htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-		fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
-								   &htup->t_infomask2);
-
-		/*
-		 * Clear relevant update flags, but only if the modified infomask says
-		 * there's no update.
-		 */
-		if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
-		{
-			HeapTupleHeaderClearHotUpdated(htup);
-			/* Make sure there is no forward chain link in t_ctid */
-			ItemPointerSet(&htup->t_ctid,
-						   BufferGetBlockNumber(buffer),
-						   offnum);
-		}
-		HeapTupleHeaderSetXmax(htup, xlrec->xmax);
-		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_lock_updated(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_lock_updated *xlrec;
-	Buffer		buffer;
-	Page		page;
-	OffsetNumber offnum;
-	ItemId		lp = NULL;
-	HeapTupleHeader htup;
-
-	xlrec = (xl_heap_lock_updated *) XLogRecGetData(record);
-
-	/*
-	 * The visibility map may need to be fixed even if the heap page is
-	 * already up-to-date.
-	 */
-	if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
-	{
-		RelFileLocator rlocator;
-		Buffer		vmbuffer = InvalidBuffer;
-		BlockNumber block;
-		Relation	reln;
-
-		XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
-		reln = CreateFakeRelcacheEntry(rlocator);
-
-		visibilitymap_pin(reln, block, &vmbuffer);
-		visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
-
-		ReleaseBuffer(vmbuffer);
-		FreeFakeRelcacheEntry(reln);
-	}
-
-	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-	{
-		page = BufferGetPage(buffer);
-
-		offnum = xlrec->offnum;
-		if (PageGetMaxOffsetNumber(page) >= offnum)
-			lp = PageGetItemId(page, offnum);
-
-		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-			elog(PANIC, "invalid lp");
-
-		htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-		fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
-								   &htup->t_infomask2);
-		HeapTupleHeaderSetXmax(htup, xlrec->xmax);
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_inplace(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
-	Buffer		buffer;
-	Page		page;
-	OffsetNumber offnum;
-	ItemId		lp = NULL;
-	HeapTupleHeader htup;
-	uint32		oldlen;
-	Size		newlen;
-
-	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-	{
-		char	   *newtup = XLogRecGetBlockData(record, 0, &newlen);
-
-		page = BufferGetPage(buffer);
-
-		offnum = xlrec->offnum;
-		if (PageGetMaxOffsetNumber(page) >= offnum)
-			lp = PageGetItemId(page, offnum);
-
-		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-			elog(PANIC, "invalid lp");
-
-		htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-		oldlen = ItemIdGetLength(lp) - htup->t_hoff;
-		if (oldlen != newlen)
-			elog(PANIC, "wrong tuple length");
-
-		memcpy((char *) htup + htup->t_hoff, newtup, newlen);
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-}
-
-void
-heap_redo(XLogReaderState *record)
-{
-	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
-	/*
-	 * These operations don't overwrite MVCC data so no conflict processing is
-	 * required. The ones in heap2 rmgr do.
-	 */
-
-	switch (info & XLOG_HEAP_OPMASK)
-	{
-		case XLOG_HEAP_INSERT:
-			heap_xlog_insert(record);
-			break;
-		case XLOG_HEAP_DELETE:
-			heap_xlog_delete(record);
-			break;
-		case XLOG_HEAP_UPDATE:
-			heap_xlog_update(record, false);
-			break;
-		case XLOG_HEAP_TRUNCATE:
-
-			/*
-			 * TRUNCATE is a no-op because the actions are already logged as
-			 * SMGR WAL records.  TRUNCATE WAL record only exists for logical
-			 * decoding.
-			 */
-			break;
-		case XLOG_HEAP_HOT_UPDATE:
-			heap_xlog_update(record, true);
-			break;
-		case XLOG_HEAP_CONFIRM:
-			heap_xlog_confirm(record);
-			break;
-		case XLOG_HEAP_LOCK:
-			heap_xlog_lock(record);
-			break;
-		case XLOG_HEAP_INPLACE:
-			heap_xlog_inplace(record);
-			break;
-		default:
-			elog(PANIC, "heap_redo: unknown op code %u", info);
-	}
-}
-
-void
-heap2_redo(XLogReaderState *record)
-{
-	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
-	switch (info & XLOG_HEAP_OPMASK)
-	{
-		case XLOG_HEAP2_PRUNE_ON_ACCESS:
-		case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
-		case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
-			heap_xlog_prune_freeze(record);
-			break;
-		case XLOG_HEAP2_VISIBLE:
-			heap_xlog_visible(record);
-			break;
-		case XLOG_HEAP2_MULTI_INSERT:
-			heap_xlog_multi_insert(record);
-			break;
-		case XLOG_HEAP2_LOCK_UPDATED:
-			heap_xlog_lock_updated(record);
-			break;
-		case XLOG_HEAP2_NEW_CID:
-
-			/*
-			 * Nothing to do on a real replay, only used during logical
-			 * decoding.
-			 */
-			break;
-		case XLOG_HEAP2_REWRITE:
-			heap_xlog_logical_rewrite(record);
-			break;
-		default:
-			elog(PANIC, "heap2_redo: unknown op code %u", info);
-	}
-}
-
-/*
- * Mask a heap page before performing consistency checks on it.
- */
-void
-heap_mask(char *pagedata, BlockNumber blkno)
-{
-	Page		page = (Page) pagedata;
-	OffsetNumber off;
-
-	mask_page_lsn_and_checksum(page);
-
-	mask_page_hint_bits(page);
-	mask_unused_space(page);
-
-	for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
-	{
-		ItemId		iid = PageGetItemId(page, off);
-		char	   *page_item;
-
-		page_item = (char *) (page + ItemIdGetOffset(iid));
-
-		if (ItemIdIsNormal(iid))
-		{
-			HeapTupleHeader page_htup = (HeapTupleHeader) page_item;
-
-			/*
-			 * If xmin of a tuple is not yet frozen, we should ignore
-			 * differences in hint bits, since they can be set without
-			 * emitting WAL.
-			 */
-			if (!HeapTupleHeaderXminFrozen(page_htup))
-				page_htup->t_infomask &= ~HEAP_XACT_MASK;
-			else
-			{
-				/* Still we need to mask xmax hint bits. */
-				page_htup->t_infomask &= ~HEAP_XMAX_INVALID;
-				page_htup->t_infomask &= ~HEAP_XMAX_COMMITTED;
-			}
-
-			/*
-			 * During replay, we set Command Id to FirstCommandId. Hence, mask
-			 * it. See heap_xlog_insert() for details.
-			 */
-			page_htup->t_choice.t_heap.t_field3.t_cid = MASK_MARKER;
-
-			/*
-			 * For a speculative tuple, heap_insert() does not set ctid in the
-			 * caller-passed heap tuple itself, leaving the ctid field to
-			 * contain a speculative token value - a per-backend monotonically
-			 * increasing identifier. Besides, it does not WAL-log ctid under
-			 * any circumstances.
-			 *
-			 * During redo, heap_xlog_insert() sets t_ctid to current block
-			 * number and self offset number. It doesn't care about any
-			 * speculative insertions on the primary. Hence, we set t_ctid to
-			 * current block number and self offset number to ignore any
-			 * inconsistency.
-			 */
-			if (HeapTupleHeaderIsSpeculative(page_htup))
-				ItemPointerSet(&page_htup->t_ctid, blkno, off);
-
-			/*
-			 * NB: Not ignoring ctid changes due to the tuple having moved
-			 * (i.e. HeapTupleHeaderIndicatesMovedPartitions), because that's
-			 * important information that needs to be in-sync between primary
-			 * and standby, and thus is WAL logged.
-			 */
-		}
-
-		/*
-		 * Ignore any padding bytes after the tuple, when the length of the
-		 * item is not MAXALIGNed.
-		 */
-		if (ItemIdHasStorage(iid))
-		{
-			int			len = ItemIdGetLength(iid);
-			int			padlen = MAXALIGN(len) - len;
-
-			if (padlen > 0)
-				memset(page_item + len, MASK_MARKER, padlen);
-		}
-	}
-}
-
 /*
  * HeapCheckForSerializableConflictOut
  *		We are reading a tuple.  If it's not visible, there may be a
diff --git a/src/backend/access/heap/heapam_xlog.c b/src/backend/access/heap/heapam_xlog.c
new file mode 100644
index 0000000000..af4976f382
--- /dev/null
+++ b/src/backend/access/heap/heapam_xlog.c
@@ -0,0 +1,1321 @@
+/*-------------------------------------------------------------------------
+ *
+ * heapam_xlog.c
+ *	  WAL replay logic for heaps.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/heap/heapam_xlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/bufmask.h"
+#include "access/heapam.h"
+#include "access/visibilitymap.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "storage/freespace.h"
+#include "storage/standby.h"
+
+
+/*
+ * Replay XLOG_HEAP2_PRUNE_* records.
+ */
+static void
+heap_xlog_prune_freeze(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	char	   *maindataptr = XLogRecGetData(record);
+	xl_heap_prune xlrec;
+	Buffer		buffer;
+	RelFileLocator rlocator;
+	BlockNumber blkno;
+	XLogRedoAction action;
+
+	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
+	memcpy(&xlrec, maindataptr, SizeOfHeapPrune);
+	maindataptr += SizeOfHeapPrune;
+
+	/*
+	 * We will take an ordinary exclusive lock or a cleanup lock depending on
+	 * whether the XLHP_CLEANUP_LOCK flag is set.  With an ordinary exclusive
+	 * lock, we better not be doing anything that requires moving existing
+	 * tuple data.
+	 */
+	Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 ||
+		   (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0);
+
+	/*
+	 * We are about to remove and/or freeze tuples.  In Hot Standby mode,
+	 * ensure that there are no queries running for which the removed tuples
+	 * are still visible or which still consider the frozen xids as running.
+	 * The conflict horizon XID comes after xl_heap_prune.
+	 */
+	if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0)
+	{
+		TransactionId snapshot_conflict_horizon;
+
+		/* memcpy() because snapshot_conflict_horizon is stored unaligned */
+		memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId));
+		maindataptr += sizeof(TransactionId);
+
+		if (InHotStandby)
+			ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon,
+												(xlrec.flags & XLHP_IS_CATALOG_REL) != 0,
+												rlocator);
+	}
+
+	/*
+	 * If we have a full-page image, restore it and we're done.
+	 */
+	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
+										   (xlrec.flags & XLHP_CLEANUP_LOCK) != 0,
+										   &buffer);
+	if (action == BLK_NEEDS_REDO)
+	{
+		Page		page = (Page) BufferGetPage(buffer);
+		OffsetNumber *redirected;
+		OffsetNumber *nowdead;
+		OffsetNumber *nowunused;
+		int			nredirected;
+		int			ndead;
+		int			nunused;
+		int			nplans;
+		Size		datalen;
+		xlhp_freeze_plan *plans;
+		OffsetNumber *frz_offsets;
+		char	   *dataptr = XLogRecGetBlockData(record, 0, &datalen);
+
+		heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags,
+											   &nplans, &plans, &frz_offsets,
+											   &nredirected, &redirected,
+											   &ndead, &nowdead,
+											   &nunused, &nowunused);
+
+		/*
+		 * Update all line pointers per the record, and repair fragmentation
+		 * if needed.
+		 */
+		if (nredirected > 0 || ndead > 0 || nunused > 0)
+			heap_page_prune_execute(buffer,
+									(xlrec.flags & XLHP_CLEANUP_LOCK) == 0,
+									redirected, nredirected,
+									nowdead, ndead,
+									nowunused, nunused);
+
+		/* Freeze tuples */
+		for (int p = 0; p < nplans; p++)
+		{
+			HeapTupleFreeze frz;
+
+			/*
+			 * Convert freeze plan representation from WAL record into
+			 * per-tuple format used by heap_execute_freeze_tuple
+			 */
+			frz.xmax = plans[p].xmax;
+			frz.t_infomask2 = plans[p].t_infomask2;
+			frz.t_infomask = plans[p].t_infomask;
+			frz.frzflags = plans[p].frzflags;
+			frz.offset = InvalidOffsetNumber;	/* unused, but be tidy */
+
+			for (int i = 0; i < plans[p].ntuples; i++)
+			{
+				OffsetNumber offset = *(frz_offsets++);
+				ItemId		lp;
+				HeapTupleHeader tuple;
+
+				lp = PageGetItemId(page, offset);
+				tuple = (HeapTupleHeader) PageGetItem(page, lp);
+				heap_execute_freeze_tuple(tuple, &frz);
+			}
+		}
+
+		/* There should be no more data */
+		Assert((char *) frz_offsets == dataptr + datalen);
+
+		/*
+		 * Note: we don't worry about updating the page's prunability hints.
+		 * At worst this will cause an extra prune cycle to occur soon.
+		 */
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+
+	/*
+	 * If we released any space or line pointers, update the free space map.
+	 *
+	 * Do this regardless of a full-page image being applied, since the FSM
+	 * data is not in the page anyway.
+	 */
+	if (BufferIsValid(buffer))
+	{
+		if (xlrec.flags & (XLHP_HAS_REDIRECTIONS |
+						   XLHP_HAS_DEAD_ITEMS |
+						   XLHP_HAS_NOW_UNUSED_ITEMS))
+		{
+			Size		freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
+
+			UnlockReleaseBuffer(buffer);
+
+			XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+		}
+		else
+			UnlockReleaseBuffer(buffer);
+	}
+}
+
+/*
+ * Replay XLOG_HEAP2_VISIBLE record.
+ *
+ * The critical integrity requirement here is that we must never end up with
+ * a situation where the visibility map bit is set, and the page-level
+ * PD_ALL_VISIBLE bit is clear.  If that were to occur, then a subsequent
+ * page modification would fail to clear the visibility map bit.
+ */
+static void
+heap_xlog_visible(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
+	Buffer		vmbuffer = InvalidBuffer;
+	Buffer		buffer;
+	Page		page;
+	RelFileLocator rlocator;
+	BlockNumber blkno;
+	XLogRedoAction action;
+
+	Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);
+
+	XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno);
+
+	/*
+	 * If there are any Hot Standby transactions running that have an xmin
+	 * horizon old enough that this page isn't all-visible for them, they
+	 * might incorrectly decide that an index-only scan can skip a heap fetch.
+	 *
+	 * NB: It might be better to throw some kind of "soft" conflict here that
+	 * forces any index-only scan that is in flight to perform heap fetches,
+	 * rather than killing the transaction outright.
+	 */
+	if (InHotStandby)
+		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
+											rlocator);
+
+	/*
+	 * Read the heap page, if it still exists. If the heap file has dropped or
+	 * truncated later in recovery, we don't need to update the page, but we'd
+	 * better still update the visibility map.
+	 */
+	action = XLogReadBufferForRedo(record, 1, &buffer);
+	if (action == BLK_NEEDS_REDO)
+	{
+		/*
+		 * We don't bump the LSN of the heap page when setting the visibility
+		 * map bit (unless checksums or wal_hint_bits is enabled, in which
+		 * case we must). This exposes us to torn page hazards, but since
+		 * we're not inspecting the existing page contents in any way, we
+		 * don't care.
+		 */
+		page = BufferGetPage(buffer);
+
+		PageSetAllVisible(page);
+
+		if (XLogHintBitIsNeeded())
+			PageSetLSN(page, lsn);
+
+		MarkBufferDirty(buffer);
+	}
+	else if (action == BLK_RESTORED)
+	{
+		/*
+		 * If heap block was backed up, we already restored it and there's
+		 * nothing more to do. (This can only happen with checksums or
+		 * wal_log_hints enabled.)
+		 */
+	}
+
+	if (BufferIsValid(buffer))
+	{
+		Size		space = PageGetFreeSpace(BufferGetPage(buffer));
+
+		UnlockReleaseBuffer(buffer);
+
+		/*
+		 * Since FSM is not WAL-logged and only updated heuristically, it
+		 * easily becomes stale in standbys.  If the standby is later promoted
+		 * and runs VACUUM, it will skip updating individual free space
+		 * figures for pages that became all-visible (or all-frozen, depending
+		 * on the vacuum mode,) which is troublesome when FreeSpaceMapVacuum
+		 * propagates too optimistic free space values to upper FSM layers;
+		 * later inserters try to use such pages only to find out that they
+		 * are unusable.  This can cause long stalls when there are many such
+		 * pages.
+		 *
+		 * Forestall those problems by updating FSM's idea about a page that
+		 * is becoming all-visible or all-frozen.
+		 *
+		 * Do this regardless of a full-page image being applied, since the
+		 * FSM data is not in the page anyway.
+		 */
+		if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
+			XLogRecordPageWithFreeSpace(rlocator, blkno, space);
+	}
+
+	/*
+	 * Even if we skipped the heap page update due to the LSN interlock, it's
+	 * still safe to update the visibility map.  Any WAL record that clears
+	 * the visibility map bit does so before checking the page LSN, so any
+	 * bits that need to be cleared will still be cleared.
+	 */
+	if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false,
+									  &vmbuffer) == BLK_NEEDS_REDO)
+	{
+		Page		vmpage = BufferGetPage(vmbuffer);
+		Relation	reln;
+		uint8		vmbits;
+
+		/* initialize the page if it was read as zeros */
+		if (PageIsNew(vmpage))
+			PageInit(vmpage, BLCKSZ, 0);
+
+		/* remove VISIBILITYMAP_XLOG_* */
+		vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
+
+		/*
+		 * XLogReadBufferForRedoExtended locked the buffer. But
+		 * visibilitymap_set will handle locking itself.
+		 */
+		LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK);
+
+		reln = CreateFakeRelcacheEntry(rlocator);
+		visibilitymap_pin(reln, blkno, &vmbuffer);
+
+		visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
+						  xlrec->snapshotConflictHorizon, vmbits);
+
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+	else if (BufferIsValid(vmbuffer))
+		UnlockReleaseBuffer(vmbuffer);
+}
+
+/*
+ * Given an "infobits" field from an XLog record, set the correct bits in the
+ * given infomask and infomask2 for the tuple touched by the record.
+ *
+ * (This is the reverse of compute_infobits).
+ */
+static void
+fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
+{
+	*infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY |
+				   HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK);
+	*infomask2 &= ~HEAP_KEYS_UPDATED;
+
+	if (infobits & XLHL_XMAX_IS_MULTI)
+		*infomask |= HEAP_XMAX_IS_MULTI;
+	if (infobits & XLHL_XMAX_LOCK_ONLY)
+		*infomask |= HEAP_XMAX_LOCK_ONLY;
+	if (infobits & XLHL_XMAX_EXCL_LOCK)
+		*infomask |= HEAP_XMAX_EXCL_LOCK;
+	/* note HEAP_XMAX_SHR_LOCK isn't considered here */
+	if (infobits & XLHL_XMAX_KEYSHR_LOCK)
+		*infomask |= HEAP_XMAX_KEYSHR_LOCK;
+
+	if (infobits & XLHL_KEYS_UPDATED)
+		*infomask2 |= HEAP_KEYS_UPDATED;
+}
+
+static void
+heap_xlog_delete(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	ItemId		lp = NULL;
+	HeapTupleHeader htup;
+	BlockNumber blkno;
+	RelFileLocator target_locator;
+	ItemPointerData target_tid;
+
+	XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
+	ItemPointerSetBlockNumber(&target_tid, blkno);
+	ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
+	{
+		Relation	reln = CreateFakeRelcacheEntry(target_locator);
+		Buffer		vmbuffer = InvalidBuffer;
+
+		visibilitymap_pin(reln, blkno, &vmbuffer);
+		visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(buffer);
+
+		if (PageGetMaxOffsetNumber(page) >= xlrec->offnum)
+			lp = PageGetItemId(page, xlrec->offnum);
+
+		if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "invalid lp");
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+		HeapTupleHeaderClearHotUpdated(htup);
+		fix_infomask_from_infobits(xlrec->infobits_set,
+								   &htup->t_infomask, &htup->t_infomask2);
+		if (!(xlrec->flags & XLH_DELETE_IS_SUPER))
+			HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+		else
+			HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
+		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+
+		/* Mark the page as a candidate for pruning */
+		PageSetPrunable(page, XLogRecGetXid(record));
+
+		if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
+
+		/* Make sure t_ctid is set correctly */
+		if (xlrec->flags & XLH_DELETE_IS_PARTITION_MOVE)
+			HeapTupleHeaderSetMovedPartitions(htup);
+		else
+			htup->t_ctid = target_tid;
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
+static void
+heap_xlog_insert(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	union
+	{
+		HeapTupleHeaderData hdr;
+		char		data[MaxHeapTupleSize];
+	}			tbuf;
+	HeapTupleHeader htup;
+	xl_heap_header xlhdr;
+	uint32		newlen;
+	Size		freespace = 0;
+	RelFileLocator target_locator;
+	BlockNumber blkno;
+	ItemPointerData target_tid;
+	XLogRedoAction action;
+
+	XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
+	ItemPointerSetBlockNumber(&target_tid, blkno);
+	ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+	{
+		Relation	reln = CreateFakeRelcacheEntry(target_locator);
+		Buffer		vmbuffer = InvalidBuffer;
+
+		visibilitymap_pin(reln, blkno, &vmbuffer);
+		visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	/*
+	 * If we inserted the first and only tuple on the page, re-initialize the
+	 * page from scratch.
+	 */
+	if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
+	{
+		buffer = XLogInitBufferForRedo(record, 0);
+		page = BufferGetPage(buffer);
+		PageInit(page, BufferGetPageSize(buffer), 0);
+		action = BLK_NEEDS_REDO;
+	}
+	else
+		action = XLogReadBufferForRedo(record, 0, &buffer);
+	if (action == BLK_NEEDS_REDO)
+	{
+		Size		datalen;
+		char	   *data;
+
+		page = BufferGetPage(buffer);
+
+		if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum)
+			elog(PANIC, "invalid max offset number");
+
+		data = XLogRecGetBlockData(record, 0, &datalen);
+
+		newlen = datalen - SizeOfHeapHeader;
+		Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize);
+		memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
+		data += SizeOfHeapHeader;
+
+		htup = &tbuf.hdr;
+		MemSet((char *) htup, 0, SizeofHeapTupleHeader);
+		/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+		memcpy((char *) htup + SizeofHeapTupleHeader,
+			   data,
+			   newlen);
+		newlen += SizeofHeapTupleHeader;
+		htup->t_infomask2 = xlhdr.t_infomask2;
+		htup->t_infomask = xlhdr.t_infomask;
+		htup->t_hoff = xlhdr.t_hoff;
+		HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
+		HeapTupleHeaderSetCmin(htup, FirstCommandId);
+		htup->t_ctid = target_tid;
+
+		if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
+						true, true) == InvalidOffsetNumber)
+			elog(PANIC, "failed to add tuple");
+
+		freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+		PageSetLSN(page, lsn);
+
+		if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
+
+		/* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
+		if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
+			PageSetAllVisible(page);
+
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	/*
+	 * If the page is running low on free space, update the FSM as well.
+	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+	 * better than that without knowing the fill-factor for the table.
+	 *
+	 * XXX: Don't do this if the page was restored from full page image. We
+	 * don't bother to update the FSM in that case, it doesn't need to be
+	 * totally accurate anyway.
+	 */
+	if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
+		XLogRecordPageWithFreeSpace(target_locator, blkno, freespace);
+}
+
+/*
+ * Handles MULTI_INSERT record type.
+ */
+static void
+heap_xlog_multi_insert(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_multi_insert *xlrec;
+	RelFileLocator rlocator;
+	BlockNumber blkno;
+	Buffer		buffer;
+	Page		page;
+	union
+	{
+		HeapTupleHeaderData hdr;
+		char		data[MaxHeapTupleSize];
+	}			tbuf;
+	HeapTupleHeader htup;
+	uint32		newlen;
+	Size		freespace = 0;
+	int			i;
+	bool		isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
+	XLogRedoAction action;
+
+	/*
+	 * Insertion doesn't overwrite MVCC data, so no conflict processing is
+	 * required.
+	 */
+	xlrec = (xl_heap_multi_insert *) XLogRecGetData(record);
+
+	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
+
+	/* check that the mutually exclusive flags are not both set */
+	Assert(!((xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) &&
+			 (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)));
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+	{
+		Relation	reln = CreateFakeRelcacheEntry(rlocator);
+		Buffer		vmbuffer = InvalidBuffer;
+
+		visibilitymap_pin(reln, blkno, &vmbuffer);
+		visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	if (isinit)
+	{
+		buffer = XLogInitBufferForRedo(record, 0);
+		page = BufferGetPage(buffer);
+		PageInit(page, BufferGetPageSize(buffer), 0);
+		action = BLK_NEEDS_REDO;
+	}
+	else
+		action = XLogReadBufferForRedo(record, 0, &buffer);
+	if (action == BLK_NEEDS_REDO)
+	{
+		char	   *tupdata;
+		char	   *endptr;
+		Size		len;
+
+		/* Tuples are stored as block data */
+		tupdata = XLogRecGetBlockData(record, 0, &len);
+		endptr = tupdata + len;
+
+		page = (Page) BufferGetPage(buffer);
+
+		for (i = 0; i < xlrec->ntuples; i++)
+		{
+			OffsetNumber offnum;
+			xl_multi_insert_tuple *xlhdr;
+
+			/*
+			 * If we're reinitializing the page, the tuples are stored in
+			 * order from FirstOffsetNumber. Otherwise there's an array of
+			 * offsets in the WAL record, and the tuples come after that.
+			 */
+			if (isinit)
+				offnum = FirstOffsetNumber + i;
+			else
+				offnum = xlrec->offsets[i];
+			if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+				elog(PANIC, "invalid max offset number");
+
+			xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata);
+			tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+
+			newlen = xlhdr->datalen;
+			Assert(newlen <= MaxHeapTupleSize);
+			htup = &tbuf.hdr;
+			MemSet((char *) htup, 0, SizeofHeapTupleHeader);
+			/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+			memcpy((char *) htup + SizeofHeapTupleHeader,
+				   (char *) tupdata,
+				   newlen);
+			tupdata += newlen;
+
+			newlen += SizeofHeapTupleHeader;
+			htup->t_infomask2 = xlhdr->t_infomask2;
+			htup->t_infomask = xlhdr->t_infomask;
+			htup->t_hoff = xlhdr->t_hoff;
+			HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
+			HeapTupleHeaderSetCmin(htup, FirstCommandId);
+			ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
+			ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+
+			offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+			if (offnum == InvalidOffsetNumber)
+				elog(PANIC, "failed to add tuple");
+		}
+		if (tupdata != endptr)
+			elog(PANIC, "total tuple length mismatch");
+
+		freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+		PageSetLSN(page, lsn);
+
+		if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
+
+		/* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
+		if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
+			PageSetAllVisible(page);
+
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	/*
+	 * If the page is running low on free space, update the FSM as well.
+	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+	 * better than that without knowing the fill-factor for the table.
+	 *
+	 * XXX: Don't do this if the page was restored from full page image. We
+	 * don't bother to update the FSM in that case, it doesn't need to be
+	 * totally accurate anyway.
+	 */
+	if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
+		XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+}
+
+/*
+ * Handles UPDATE and HOT_UPDATE
+ */
+static void
+heap_xlog_update(XLogReaderState *record, bool hot_update)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
+	RelFileLocator rlocator;
+	BlockNumber oldblk;
+	BlockNumber newblk;
+	ItemPointerData newtid;
+	Buffer		obuffer,
+				nbuffer;
+	Page		page;
+	OffsetNumber offnum;
+	ItemId		lp = NULL;
+	HeapTupleData oldtup;
+	HeapTupleHeader htup;
+	uint16		prefixlen = 0,
+				suffixlen = 0;
+	char	   *newp;
+	union
+	{
+		HeapTupleHeaderData hdr;
+		char		data[MaxHeapTupleSize];
+	}			tbuf;
+	xl_heap_header xlhdr;
+	uint32		newlen;
+	Size		freespace = 0;
+	XLogRedoAction oldaction;
+	XLogRedoAction newaction;
+
+	/* initialize to keep the compiler quiet */
+	oldtup.t_data = NULL;
+	oldtup.t_len = 0;
+
+	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &newblk);
+	if (XLogRecGetBlockTagExtended(record, 1, NULL, NULL, &oldblk, NULL))
+	{
+		/* HOT updates are never done across pages */
+		Assert(!hot_update);
+	}
+	else
+		oldblk = newblk;
+
+	ItemPointerSet(&newtid, newblk, xlrec->new_offnum);
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
+	{
+		Relation	reln = CreateFakeRelcacheEntry(rlocator);
+		Buffer		vmbuffer = InvalidBuffer;
+
+		visibilitymap_pin(reln, oldblk, &vmbuffer);
+		visibilitymap_clear(reln, oldblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	/*
+	 * In normal operation, it is important to lock the two pages in
+	 * page-number order, to avoid possible deadlocks against other update
+	 * operations going the other way.  However, during WAL replay there can
+	 * be no other update happening, so we don't need to worry about that. But
+	 * we *do* need to worry that we don't expose an inconsistent state to Hot
+	 * Standby queries --- so the original page can't be unlocked before we've
+	 * added the new tuple to the new page.
+	 */
+
+	/* Deal with old tuple version */
+	oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
+									  &obuffer);
+	if (oldaction == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(obuffer);
+		offnum = xlrec->old_offnum;
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
+
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "invalid lp");
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		oldtup.t_data = htup;
+		oldtup.t_len = ItemIdGetLength(lp);
+
+		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+		if (hot_update)
+			HeapTupleHeaderSetHotUpdated(htup);
+		else
+			HeapTupleHeaderClearHotUpdated(htup);
+		fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
+								   &htup->t_infomask2);
+		HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
+		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+		/* Set forward chain link in t_ctid */
+		htup->t_ctid = newtid;
+
+		/* Mark the page as a candidate for pruning */
+		PageSetPrunable(page, XLogRecGetXid(record));
+
+		if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(obuffer);
+	}
+
+	/*
+	 * Read the page the new tuple goes into, if different from old.
+	 */
+	if (oldblk == newblk)
+	{
+		nbuffer = obuffer;
+		newaction = oldaction;
+	}
+	else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
+	{
+		nbuffer = XLogInitBufferForRedo(record, 0);
+		page = (Page) BufferGetPage(nbuffer);
+		PageInit(page, BufferGetPageSize(nbuffer), 0);
+		newaction = BLK_NEEDS_REDO;
+	}
+	else
+		newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
+	{
+		Relation	reln = CreateFakeRelcacheEntry(rlocator);
+		Buffer		vmbuffer = InvalidBuffer;
+
+		visibilitymap_pin(reln, newblk, &vmbuffer);
+		visibilitymap_clear(reln, newblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	/* Deal with new tuple */
+	if (newaction == BLK_NEEDS_REDO)
+	{
+		char	   *recdata;
+		char	   *recdata_end;
+		Size		datalen;
+		Size		tuplen;
+
+		recdata = XLogRecGetBlockData(record, 0, &datalen);
+		recdata_end = recdata + datalen;
+
+		page = BufferGetPage(nbuffer);
+
+		offnum = xlrec->new_offnum;
+		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+			elog(PANIC, "invalid max offset number");
+
+		if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD)
+		{
+			Assert(newblk == oldblk);
+			memcpy(&prefixlen, recdata, sizeof(uint16));
+			recdata += sizeof(uint16);
+		}
+		if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD)
+		{
+			Assert(newblk == oldblk);
+			memcpy(&suffixlen, recdata, sizeof(uint16));
+			recdata += sizeof(uint16);
+		}
+
+		memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader);
+		recdata += SizeOfHeapHeader;
+
+		tuplen = recdata_end - recdata;
+		Assert(tuplen <= MaxHeapTupleSize);
+
+		htup = &tbuf.hdr;
+		MemSet((char *) htup, 0, SizeofHeapTupleHeader);
+
+		/*
+		 * Reconstruct the new tuple using the prefix and/or suffix from the
+		 * old tuple, and the data stored in the WAL record.
+		 */
+		newp = (char *) htup + SizeofHeapTupleHeader;
+		if (prefixlen > 0)
+		{
+			int			len;
+
+			/* copy bitmap [+ padding] [+ oid] from WAL record */
+			len = xlhdr.t_hoff - SizeofHeapTupleHeader;
+			memcpy(newp, recdata, len);
+			recdata += len;
+			newp += len;
+
+			/* copy prefix from old tuple */
+			memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
+			newp += prefixlen;
+
+			/* copy new tuple data from WAL record */
+			len = tuplen - (xlhdr.t_hoff - SizeofHeapTupleHeader);
+			memcpy(newp, recdata, len);
+			recdata += len;
+			newp += len;
+		}
+		else
+		{
+			/*
+			 * copy bitmap [+ padding] [+ oid] + data from record, all in one
+			 * go
+			 */
+			memcpy(newp, recdata, tuplen);
+			recdata += tuplen;
+			newp += tuplen;
+		}
+		Assert(recdata == recdata_end);
+
+		/* copy suffix from old tuple */
+		if (suffixlen > 0)
+			memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
+
+		newlen = SizeofHeapTupleHeader + tuplen + prefixlen + suffixlen;
+		htup->t_infomask2 = xlhdr.t_infomask2;
+		htup->t_infomask = xlhdr.t_infomask;
+		htup->t_hoff = xlhdr.t_hoff;
+
+		HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
+		HeapTupleHeaderSetCmin(htup, FirstCommandId);
+		HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
+		/* Make sure there is no forward chain link in t_ctid */
+		htup->t_ctid = newtid;
+
+		offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+		if (offnum == InvalidOffsetNumber)
+			elog(PANIC, "failed to add tuple");
+
+		if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
+
+		freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(nbuffer);
+	}
+
+	if (BufferIsValid(nbuffer) && nbuffer != obuffer)
+		UnlockReleaseBuffer(nbuffer);
+	if (BufferIsValid(obuffer))
+		UnlockReleaseBuffer(obuffer);
+
+	/*
+	 * If the new page is running low on free space, update the FSM as well.
+	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+	 * better than that without knowing the fill-factor for the table.
+	 *
+	 * However, don't update the FSM on HOT updates, because after crash
+	 * recovery, either the old or the new tuple will certainly be dead and
+	 * prunable. After pruning, the page will have roughly as much free space
+	 * as it did before the update, assuming the new tuple is about the same
+	 * size as the old one.
+	 *
+	 * XXX: Don't do this if the page was restored from full page image. We
+	 * don't bother to update the FSM in that case, it doesn't need to be
+	 * totally accurate anyway.
+	 */
+	if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5)
+		XLogRecordPageWithFreeSpace(rlocator, newblk, freespace);
+}
+
+static void
+heap_xlog_confirm(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	OffsetNumber offnum;
+	ItemId		lp = NULL;
+	HeapTupleHeader htup;
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(buffer);
+
+		offnum = xlrec->offnum;
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
+
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "invalid lp");
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		/*
+		 * Confirm tuple as actually inserted
+		 */
+		ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
+static void
+heap_xlog_lock(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	OffsetNumber offnum;
+	ItemId		lp = NULL;
+	HeapTupleHeader htup;
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
+	{
+		RelFileLocator rlocator;
+		Buffer		vmbuffer = InvalidBuffer;
+		BlockNumber block;
+		Relation	reln;
+
+		XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
+		reln = CreateFakeRelcacheEntry(rlocator);
+
+		visibilitymap_pin(reln, block, &vmbuffer);
+		visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
+
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = (Page) BufferGetPage(buffer);
+
+		offnum = xlrec->offnum;
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
+
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "invalid lp");
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+		fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
+								   &htup->t_infomask2);
+
+		/*
+		 * Clear relevant update flags, but only if the modified infomask says
+		 * there's no update.
+		 */
+		if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
+		{
+			HeapTupleHeaderClearHotUpdated(htup);
+			/* Make sure there is no forward chain link in t_ctid */
+			ItemPointerSet(&htup->t_ctid,
+						   BufferGetBlockNumber(buffer),
+						   offnum);
+		}
+		HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
+static void
+heap_xlog_lock_updated(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_lock_updated *xlrec;
+	Buffer		buffer;
+	Page		page;
+	OffsetNumber offnum;
+	ItemId		lp = NULL;
+	HeapTupleHeader htup;
+
+	xlrec = (xl_heap_lock_updated *) XLogRecGetData(record);
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
+	{
+		RelFileLocator rlocator;
+		Buffer		vmbuffer = InvalidBuffer;
+		BlockNumber block;
+		Relation	reln;
+
+		XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
+		reln = CreateFakeRelcacheEntry(rlocator);
+
+		visibilitymap_pin(reln, block, &vmbuffer);
+		visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
+
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(buffer);
+
+		offnum = xlrec->offnum;
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
+
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "invalid lp");
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+		fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
+								   &htup->t_infomask2);
+		HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
+static void
+heap_xlog_inplace(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	OffsetNumber offnum;
+	ItemId		lp = NULL;
+	HeapTupleHeader htup;
+	uint32		oldlen;
+	Size		newlen;
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		char	   *newtup = XLogRecGetBlockData(record, 0, &newlen);
+
+		page = BufferGetPage(buffer);
+
+		offnum = xlrec->offnum;
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
+
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "invalid lp");
+
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+		oldlen = ItemIdGetLength(lp) - htup->t_hoff;
+		if (oldlen != newlen)
+			elog(PANIC, "wrong tuple length");
+
+		memcpy((char *) htup + htup->t_hoff, newtup, newlen);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
+void
+heap_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	/*
+	 * These operations don't overwrite MVCC data so no conflict processing is
+	 * required. The ones in heap2 rmgr do.
+	 */
+
+	switch (info & XLOG_HEAP_OPMASK)
+	{
+		case XLOG_HEAP_INSERT:
+			heap_xlog_insert(record);
+			break;
+		case XLOG_HEAP_DELETE:
+			heap_xlog_delete(record);
+			break;
+		case XLOG_HEAP_UPDATE:
+			heap_xlog_update(record, false);
+			break;
+		case XLOG_HEAP_TRUNCATE:
+
+			/*
+			 * TRUNCATE is a no-op because the actions are already logged as
+			 * SMGR WAL records.  TRUNCATE WAL record only exists for logical
+			 * decoding.
+			 */
+			break;
+		case XLOG_HEAP_HOT_UPDATE:
+			heap_xlog_update(record, true);
+			break;
+		case XLOG_HEAP_CONFIRM:
+			heap_xlog_confirm(record);
+			break;
+		case XLOG_HEAP_LOCK:
+			heap_xlog_lock(record);
+			break;
+		case XLOG_HEAP_INPLACE:
+			heap_xlog_inplace(record);
+			break;
+		default:
+			elog(PANIC, "heap_redo: unknown op code %u", info);
+	}
+}
+
+void
+heap2_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info & XLOG_HEAP_OPMASK)
+	{
+		case XLOG_HEAP2_PRUNE_ON_ACCESS:
+		case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
+		case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
+			heap_xlog_prune_freeze(record);
+			break;
+		case XLOG_HEAP2_VISIBLE:
+			heap_xlog_visible(record);
+			break;
+		case XLOG_HEAP2_MULTI_INSERT:
+			heap_xlog_multi_insert(record);
+			break;
+		case XLOG_HEAP2_LOCK_UPDATED:
+			heap_xlog_lock_updated(record);
+			break;
+		case XLOG_HEAP2_NEW_CID:
+
+			/*
+			 * Nothing to do on a real replay, only used during logical
+			 * decoding.
+			 */
+			break;
+		case XLOG_HEAP2_REWRITE:
+			heap_xlog_logical_rewrite(record);
+			break;
+		default:
+			elog(PANIC, "heap2_redo: unknown op code %u", info);
+	}
+}
+
+/*
+ * Mask a heap page before performing consistency checks on it.
+ */
+void
+heap_mask(char *pagedata, BlockNumber blkno)
+{
+	Page		page = (Page) pagedata;
+	OffsetNumber off;
+
+	mask_page_lsn_and_checksum(page);
+
+	mask_page_hint_bits(page);
+	mask_unused_space(page);
+
+	for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
+	{
+		ItemId		iid = PageGetItemId(page, off);
+		char	   *page_item;
+
+		page_item = (char *) (page + ItemIdGetOffset(iid));
+
+		if (ItemIdIsNormal(iid))
+		{
+			HeapTupleHeader page_htup = (HeapTupleHeader) page_item;
+
+			/*
+			 * If xmin of a tuple is not yet frozen, we should ignore
+			 * differences in hint bits, since they can be set without
+			 * emitting WAL.
+			 */
+			if (!HeapTupleHeaderXminFrozen(page_htup))
+				page_htup->t_infomask &= ~HEAP_XACT_MASK;
+			else
+			{
+				/* Still we need to mask xmax hint bits. */
+				page_htup->t_infomask &= ~HEAP_XMAX_INVALID;
+				page_htup->t_infomask &= ~HEAP_XMAX_COMMITTED;
+			}
+
+			/*
+			 * During replay, we set Command Id to FirstCommandId. Hence, mask
+			 * it. See heap_xlog_insert() for details.
+			 */
+			page_htup->t_choice.t_heap.t_field3.t_cid = MASK_MARKER;
+
+			/*
+			 * For a speculative tuple, heap_insert() does not set ctid in the
+			 * caller-passed heap tuple itself, leaving the ctid field to
+			 * contain a speculative token value - a per-backend monotonically
+			 * increasing identifier. Besides, it does not WAL-log ctid under
+			 * any circumstances.
+			 *
+			 * During redo, heap_xlog_insert() sets t_ctid to current block
+			 * number and self offset number. It doesn't care about any
+			 * speculative insertions on the primary. Hence, we set t_ctid to
+			 * current block number and self offset number to ignore any
+			 * inconsistency.
+			 */
+			if (HeapTupleHeaderIsSpeculative(page_htup))
+				ItemPointerSet(&page_htup->t_ctid, blkno, off);
+
+			/*
+			 * NB: Not ignoring ctid changes due to the tuple having moved
+			 * (i.e. HeapTupleHeaderIndicatesMovedPartitions), because that's
+			 * important information that needs to be in-sync between primary
+			 * and standby, and thus is WAL logged.
+			 */
+		}
+
+		/*
+		 * Ignore any padding bytes after the tuple, when the length of the
+		 * item is not MAXALIGNed.
+		 */
+		if (ItemIdHasStorage(iid))
+		{
+			int			len = ItemIdGetLength(iid);
+			int			padlen = MAXALIGN(len) - len;
+
+			if (padlen > 0)
+				memset(page_item + len, MASK_MARKER, padlen);
+		}
+	}
+}
diff --git a/src/backend/access/heap/meson.build b/src/backend/access/heap/meson.build
index e00d5b4f0d..19a990208e 100644
--- a/src/backend/access/heap/meson.build
+++ b/src/backend/access/heap/meson.build
@@ -4,6 +4,7 @@ backend_sources += files(
   'heapam.c',
   'heapam_handler.c',
   'heapam_visibility.c',
+  'heapam_xlog.c',
   'heaptoast.c',
   'hio.c',
   'pruneheap.c',
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9e9aec88a6..b92eb506ec 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -14,6 +14,7 @@
 #ifndef HEAPAM_H
 #define HEAPAM_H
 
+#include "access/heapam_xlog.h"
 #include "access/relation.h"	/* for backward compatibility */
 #include "access/relscan.h"
 #include "access/sdir.h"
@@ -422,4 +423,28 @@ extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data,
 extern void HeapCheckForSerializableConflictOut(bool visible, Relation relation, HeapTuple tuple,
 												Buffer buffer, Snapshot snapshot);
 
+/*
+ * heap_execute_freeze_tuple
+ *		Execute the prepared freezing of a tuple with caller's freeze plan.
+ *
+ * Caller is responsible for ensuring that no other backend can access the
+ * storage underlying this tuple, either by holding an exclusive lock on the
+ * buffer containing it (which is what lazy VACUUM does), or by having it be
+ * in private storage (which is what CLUSTER and friends do).
+ */
+static inline void
+heap_execute_freeze_tuple(HeapTupleHeader tuple, HeapTupleFreeze *frz)
+{
+	HeapTupleHeaderSetXmax(tuple, frz->xmax);
+
+	if (frz->frzflags & XLH_FREEZE_XVAC)
+		HeapTupleHeaderSetXvac(tuple, FrozenTransactionId);
+
+	if (frz->frzflags & XLH_INVALID_XVAC)
+		HeapTupleHeaderSetXvac(tuple, InvalidTransactionId);
+
+	tuple->t_infomask = frz->t_infomask;
+	tuple->t_infomask2 = frz->t_infomask2;
+}
+
 #endif							/* HEAPAM_H */
-- 
2.34.1

