From 2a1bc0bb9fa94bd929adc1a408900cb925ebcdd5 Mon Sep 17 00:00:00 2001
From: Mark Dilger <mark.dilger@enterprisedb.com>
Date: Mon, 20 Apr 2020 08:05:58 -0700
Subject: [PATCH v2] Adding heapcheck contrib module.

The heapcheck module introduces a new function for checking a heap
relation and associated toast relation, if any, for corruption.

The postgres backend already defends against certain forms of
corruption, by checking the page header of each page before allowing
it into the page cache, and by checking the page checksum, if enabled.
Experience shows that broken or ill-conceived backup and restore
mechanisms can result in a page, or an entire file, being overwritten
with an earlier version of itself, restored from backup.  Pages thus
overwritten will appear to have valid page headers and checksums,
while potentially containing xmin, xmax, and toast pointers that are
invalid.

contrib/heapcheck introduces a function, heapcheck_relation, that
takes a regclass argument, scans the given heap relation, and returns
rows containing information about corruption found within the table.
The main focus of the scan is to find invalid xmin, xmax, and toast
pointer values.  It also checks for structural corruption within the
page (such as invalid t_hoff values) that could lead to the backend
aborting should the function blindly trust the data as it finds it.
---
 contrib/Makefile                              |    1 +
 contrib/heapcheck/.gitignore                  |    4 +
 contrib/heapcheck/Makefile                    |   25 +
 .../expected/001_create_extension.out         |    1 +
 .../expected/002_disallowed_reltypes.out      |   27 +
 contrib/heapcheck/heapcheck--1.0.sql          |   21 +
 contrib/heapcheck/heapcheck.c                 | 1167 +++++++++++++++++
 contrib/heapcheck/heapcheck.control           |    5 +
 .../heapcheck/sql/001_create_extension.sql    |    1 +
 .../heapcheck/sql/002_disallowed_reltypes.sql |   29 +
 contrib/heapcheck/t/003_heapcheck_relation.pl |  361 +++++
 doc/src/sgml/contrib.sgml                     |    1 +
 doc/src/sgml/filelist.sgml                    |    1 +
 doc/src/sgml/heapcheck.sgml                   |  133 ++
 14 files changed, 1777 insertions(+)
 create mode 100644 contrib/heapcheck/.gitignore
 create mode 100644 contrib/heapcheck/Makefile
 create mode 100644 contrib/heapcheck/expected/001_create_extension.out
 create mode 100644 contrib/heapcheck/expected/002_disallowed_reltypes.out
 create mode 100644 contrib/heapcheck/heapcheck--1.0.sql
 create mode 100644 contrib/heapcheck/heapcheck.c
 create mode 100644 contrib/heapcheck/heapcheck.control
 create mode 100644 contrib/heapcheck/sql/001_create_extension.sql
 create mode 100644 contrib/heapcheck/sql/002_disallowed_reltypes.sql
 create mode 100644 contrib/heapcheck/t/003_heapcheck_relation.pl
 create mode 100644 doc/src/sgml/heapcheck.sgml

diff --git a/contrib/Makefile b/contrib/Makefile
index 1846d415b6..27ac131526 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -20,6 +20,7 @@ SUBDIRS = \
 		earthdistance	\
 		file_fdw	\
 		fuzzystrmatch	\
+		heapcheck \
 		hstore		\
 		intagg		\
 		intarray	\
diff --git a/contrib/heapcheck/.gitignore b/contrib/heapcheck/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/contrib/heapcheck/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/contrib/heapcheck/Makefile b/contrib/heapcheck/Makefile
new file mode 100644
index 0000000000..8d780a41ab
--- /dev/null
+++ b/contrib/heapcheck/Makefile
@@ -0,0 +1,25 @@
+# contrib/heapcheck/Makefile
+
+MODULE_big = heapcheck
+OBJS = \
+	$(WIN32RES) \
+	heapcheck.o
+
+EXTENSION = heapcheck
+DATA = heapcheck--1.0.sql
+PGFILEDESC = "heapcheck - page corruption information"
+
+REGRESS = 001_create_extension 002_disallowed_reltypes
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/heapcheck
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/heapcheck/expected/001_create_extension.out b/contrib/heapcheck/expected/001_create_extension.out
new file mode 100644
index 0000000000..0ca79c22be
--- /dev/null
+++ b/contrib/heapcheck/expected/001_create_extension.out
@@ -0,0 +1 @@
+create extension heapcheck;
diff --git a/contrib/heapcheck/expected/002_disallowed_reltypes.out b/contrib/heapcheck/expected/002_disallowed_reltypes.out
new file mode 100644
index 0000000000..8e0b18dfc3
--- /dev/null
+++ b/contrib/heapcheck/expected/002_disallowed_reltypes.out
@@ -0,0 +1,27 @@
+--
+-- check that using the module's functions with unsupported relations will fail
+--
+-- partitioned tables (the parent ones) don't have visibility maps
+create table test_partitioned (a int, b text default repeat('x', 5000)) partition by list (a);
+-- these should all fail
+select * from heapcheck_relation('test_partitioned');
+ERROR:  "test_partitioned" is not a table, materialized view, or TOAST table
+create table test_partition partition of test_partitioned for values in (1);
+create index test_index on test_partition (a);
+-- indexes do not, so these all fail
+select * from heapcheck_relation('test_index');
+ERROR:  "test_index" is not a table, materialized view, or TOAST table
+create view test_view as select 1;
+-- views do not have vms, so these all fail
+select * from heapcheck_relation('test_view');
+ERROR:  "test_view" is not a table, materialized view, or TOAST table
+create sequence test_sequence;
+-- sequences do not have vms, so these all fail
+select * from heapcheck_relation('test_sequence');
+ERROR:  "test_sequence" is not a table, materialized view, or TOAST table
+create foreign data wrapper dummy;
+create server dummy_server foreign data wrapper dummy;
+create foreign table test_foreign_table () server dummy_server;
+-- foreign tables do not have vms, so these all fail
+select * from heapcheck_relation('test_foreign_table');
+ERROR:  "test_foreign_table" is not a table, materialized view, or TOAST table
diff --git a/contrib/heapcheck/heapcheck--1.0.sql b/contrib/heapcheck/heapcheck--1.0.sql
new file mode 100644
index 0000000000..48251e6781
--- /dev/null
+++ b/contrib/heapcheck/heapcheck--1.0.sql
@@ -0,0 +1,21 @@
+/* contrib/heapcheck/heapcheck--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION heapcheck" to load this file. \quit
+
+-- Check a heap relation, and its toast relation if any, for corruption.
+CREATE FUNCTION heapcheck_relation(regclass,
+								  blkno OUT bigint,
+								  offnum OUT integer,
+								  lp_off OUT smallint,
+								  lp_flags OUT smallint,
+								  lp_len OUT smallint,
+								  attnum OUT integer,
+								  chunk OUT integer,
+								  msg OUT text
+								  )
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'heapcheck_relation'
+LANGUAGE C STRICT;
+REVOKE ALL ON FUNCTION heapcheck_relation(regclass) FROM PUBLIC;
+GRANT EXECUTE ON FUNCTION heapcheck_relation(regclass) TO pg_stat_scan_tables;
diff --git a/contrib/heapcheck/heapcheck.c b/contrib/heapcheck/heapcheck.c
new file mode 100644
index 0000000000..7cd4690f98
--- /dev/null
+++ b/contrib/heapcheck/heapcheck.c
@@ -0,0 +1,1167 @@
+/*-------------------------------------------------------------------------
+ *
+ * heapcheck.c
+ *	  Functions to check postgresql relations for corruption
+ *
+ * Copyright (c) 2016-2020, PostgreSQL Global Development Group
+ *
+ *	  contrib/heapcheck/heapcheck.c
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/detoast.h"
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/heaptoast.h"
+#include "access/htup_details.h"
+#include "access/multixact.h"
+#include "access/toast_internals.h"
+#include "access/visibilitymap.h"
+#include "access/xact.h"
+#include "catalog/pg_am.h"
+#include "catalog/pg_type.h"
+#include "catalog/storage_xlog.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/procarray.h"
+#include "storage/smgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(heapcheck_relation);
+
+typedef struct CorruptionInfo	/* one corruption, built by record_corruption */
+{
+	BlockNumber blkno;			/* heap block being checked */
+	OffsetNumber offnum;		/* offset number of the line pointer */
+	int16		lp_off;			/* ItemIdGetOffset() of that line pointer */
+	int16		lp_flags;		/* ItemIdGetFlags() of that line pointer */
+	int16		lp_len;			/* ItemIdGetLength() of that line pointer */
+	int32		attnum;			/* attribute being checked, or -1 */
+	int32		chunk;			/* toast chunk being checked, or -1 */
+	char	   *msg;			/* description; pointer stored, not copied */
+}			CorruptionInfo;
+
+typedef struct HeapCheckContext
+{
+	/* Values concerning the heap relation being checked */
+	Oid			relid;
+	Relation	rel;
+	TupleDesc	relDesc;		/* attributes are looked up here by attnum */
+	TransactionId relfrozenxid;
+	MultiXactId relminmxid;
+	int			rel_natts;
+	bool		has_toastrel;
+	Relation	toastrel;
+	Relation   *toast_indexes;
+	Relation	valid_toast_index;	/* index used for ordered toast scans */
+	int			num_toast_indexes;
+
+	/* Values for iterating over pages in the relation */
+	BlockNumber nblocks;		/* block count sampled when iteration begins */
+	BlockNumber blkno;			/* current block; InvalidBlockNumber at start */
+	BufferAccessStrategy bstrategy;
+	Buffer		buffer;			/* share-locked buffer for blkno, if any */
+	Page		page;
+
+	/* Values for iterating over tuples within a page */
+	OffsetNumber offnum;
+	OffsetNumber maxoff;
+	ItemId		itemid;
+	uint16		lp_len;
+	HeapTupleHeader tuphdr;
+	TransactionId xmin;
+	TransactionId xmax;			/* raw xmax, as stored in the tuple header */
+	uint16		infomask;
+	int			natts;			/* attribute count claimed by the tuple */
+	bool		hasnulls;
+
+	/* Values for iterating over attributes within the tuple */
+	uint32		offset;			/* offset in tuple data */
+	AttrNumber	attnum;			/* zero-based; -1 when not on an attribute */
+	char	   *tp;				/* pointer to the tuple data */
+	bits8	   *bp;				/* ptr to null bitmap in tuple */
+	Form_pg_attribute thisatt;
+
+	/* Values for iterating over toast for the attribute */
+	ScanKeyData toastkey;
+	SysScanDesc toastscan;
+	SnapshotData SnapshotToast;
+	int32		chunkno;		/* next expected chunk; -1 when not in toast */
+	HeapTuple	toasttup;
+	int32		attrsize;		/* va_extsize from the toast pointer */
+	int32		endchunk;		/* sequence number of the last chunk */
+	int32		totalchunks;
+	TupleDesc	toasttupDesc;
+	bool		found_toasttup;	/* set once any chunk has been examined */
+
+	/* List of CorruptionInfo */
+	List	   *corruption;
+}			HeapCheckContext;
+
+/* Public API: state kept in user_fctx across heapcheck_relation calls */
+typedef struct CheckRelCtx
+{
+	List	   *corruption;		/* CorruptionInfo list from check_relation */
+	int			idx;			/* index of the next list entry to return */
+}			CheckRelCtx;
+
+Datum		heapcheck_relation(PG_FUNCTION_ARGS);
+
+/* Internal implementation: file-local helpers, so give them static linkage */
+static void record_corruption(HeapCheckContext * ctx, char *msg);
+static TupleDesc heapcheck_relation_tupdesc(void);
+
+static void beginRelBlockIteration(HeapCheckContext * ctx);
+static bool relBlockIteration_next(HeapCheckContext * ctx);
+static void endRelBlockIteration(HeapCheckContext * ctx);
+
+static void beginPageTupleIteration(HeapCheckContext * ctx);
+static bool pageTupleIteration_next(HeapCheckContext * ctx);
+static void endPageTupleIteration(HeapCheckContext * ctx);
+
+static void beginTupleAttributeIteration(HeapCheckContext * ctx);
+static bool tupleAttributeIteration_next(HeapCheckContext * ctx);
+static void endTupleAttributeIteration(HeapCheckContext * ctx);
+
+static void beginToastTupleIteration(HeapCheckContext * ctx,
+									 struct varatt_external *toast_pointer);
+static void endToastTupleIteration(HeapCheckContext * ctx);
+static bool toastTupleIteration_next(HeapCheckContext * ctx);
+
+static bool TransactionIdStillValid(TransactionId xid, FullTransactionId *fxid);
+static bool HeapTupleIsVisible(HeapTupleHeader tuphdr, HeapCheckContext * ctx);
+static void check_toast_tuple(HeapCheckContext * ctx);
+static bool check_tuple_attribute(HeapCheckContext * ctx);
+static void check_tuple(HeapCheckContext * ctx);
+
+static List *check_relation(Oid relid);
+static void check_relation_relkind(Relation rel);
+
+/*
+ * record_corruption
+ *
+ *   Append one CorruptionInfo to ctx->corruption, stamped with the block,
+ *   line pointer, attribute and toast chunk that ctx currently points at.
+ */
+void
+record_corruption(HeapCheckContext * ctx, char *msg)
+{
+	ItemId		lp = ctx->itemid;
+	CorruptionInfo *entry = (CorruptionInfo *) palloc0(sizeof(CorruptionInfo));
+
+	entry->msg = msg;			/* stored by reference, not copied */
+	entry->chunk = ctx->chunkno;
+	entry->attnum = ctx->attnum;
+	entry->lp_len = ItemIdGetLength(lp);
+	entry->lp_flags = ItemIdGetFlags(lp);
+	entry->lp_off = ItemIdGetOffset(lp);
+	entry->offnum = ctx->offnum;
+	entry->blkno = ctx->blkno;
+	ctx->corruption = lappend(ctx->corruption, entry);
+}
+
+/*
+ * Build and bless the eight-column result TupleDesc of heapcheck_relation.
+ */
+TupleDesc
+heapcheck_relation_tupdesc()
+{
+	static const struct
+	{
+		const char *name;
+		Oid			typid;
+	}			cols[] = {
+		{"blkno", INT8OID}, {"offnum", INT4OID}, {"lp_off", INT2OID},
+		{"lp_flags", INT2OID}, {"lp_len", INT2OID}, {"attnum", INT4OID},
+		{"chunk", INT4OID}, {"msg", TEXTOID}
+	};
+	TupleDesc	result = CreateTemplateTupleDesc((int) lengthof(cols));
+	int			i;
+
+	for (i = 0; i < (int) lengthof(cols); i++)
+		TupleDescInitEntry(result, (AttrNumber) (i + 1), cols[i].name,
+						   cols[i].typid, -1, 0);
+	return BlessTupleDesc(result);
+}
+
+/*
+ * heapcheck_relation
+ *   Set-returning function.  The first call checks the whole relation given
+ *   as argument 0; every call then returns one recorded corruption as a row.
+ */
+Datum
+heapcheck_relation(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	CheckRelCtx *ctx;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		Oid			relid = PG_GETARG_OID(0);
+		MemoryContext oldcontext;
+
+		/*
+		 * Scan the entire relation, building up a list of corruption found in
+		 * ctx->corruption, for returning later.  The scan must be performed
+		 * in a memory context that will survive until after all rows are
+		 * returned.
+		 */
+		funcctx = SRF_FIRSTCALL_INIT();
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+		funcctx->tuple_desc = heapcheck_relation_tupdesc();
+		ctx = (CheckRelCtx *) palloc0(sizeof(CheckRelCtx));
+		ctx->corruption = check_relation(relid);
+		ctx->idx = 0;			/* start the iterator at the beginning */
+		funcctx->user_fctx = (void *) ctx;
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	funcctx = SRF_PERCALL_SETUP();
+	ctx = (CheckRelCtx *) funcctx->user_fctx;
+
+	/*
+	 * Return the next corruption message from the list, if any.  Our location
+	 * in the list is recorded in ctx->idx.  The special value -1 is used in
+	 * the list of corruptions to represent NULL; we check for negative
+	 * numbers when setting the nulls[] values.
+	 */
+	if (ctx->idx < list_length(ctx->corruption))
+	{
+		Datum		values[8];
+		bool		nulls[8];
+		HeapTuple	tuple;
+		CorruptionInfo *info = list_nth(ctx->corruption, ctx->idx);
+
+		MemSet(values, 0, sizeof(values));	/* clear the whole Datum array */
+		MemSet(nulls, 0, sizeof(nulls));
+		values[0] = Int64GetDatum(info->blkno);
+		values[1] = Int32GetDatum(info->offnum);
+		nulls[1] = (info->offnum < 0);
+		values[2] = Int16GetDatum(info->lp_off);
+		nulls[2] = (info->lp_off < 0);
+		values[3] = Int16GetDatum(info->lp_flags);
+		nulls[3] = (info->lp_flags < 0);
+		values[4] = Int16GetDatum(info->lp_len);
+		nulls[4] = (info->lp_len < 0);
+		values[5] = Int32GetDatum(info->attnum);
+		nulls[5] = (info->attnum < 0);
+		values[6] = Int32GetDatum(info->chunk);
+		nulls[6] = (info->chunk < 0);
+		values[7] = CStringGetTextDatum(info->msg);
+		ctx->idx++;
+
+		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * beginRelBlockIteration
+ *
+ *   Prepares ctx for a front-to-back walk over the main fork of ctx->rel,
+ *   which the caller must already have opened.  No page is read until the
+ *   first relBlockIteration_next call; the block count is sampled once here.
+ *   A BAS_BULKREAD buffer access strategy is obtained for the later reads.
+ */
+void
+beginRelBlockIteration(HeapCheckContext * ctx)
+{
+	ctx->page = NULL;
+	ctx->buffer = InvalidBuffer;
+	ctx->blkno = InvalidBlockNumber;	/* first increment yields block 0 */
+	ctx->bstrategy = GetAccessStrategy(BAS_BULKREAD);
+	ctx->nblocks = RelationGetNumberOfBlocks(ctx->rel);
+}
+
+/*
+ * endRelBlockIteration
+ *   Releases the buffer, if any, still held from relBlockIteration_next.
+ */
+void
+endRelBlockIteration(HeapCheckContext * ctx)
+{
+	/*
+	 * A completed iteration has already released its buffer; an early
+	 * bail-out has not.  Forget the buffer so it is never released twice.
+	 */
+	if (InvalidBuffer != ctx->buffer)
+	{
+		UnlockReleaseBuffer(ctx->buffer);
+		ctx->buffer = InvalidBuffer;
+	}
+}
+
+/*
+ * relBlockIteration_next
+ *
+ *   Releases the previous page, if any, then reads and share-locks the next
+ *   one.  Returns false, holding no buffer, once every block was visited.
+ *   Call only after beginRelBlockIteration, and stop at the first false.
+ */
+bool
+relBlockIteration_next(HeapCheckContext * ctx)
+{
+	/* We must unlock the page from the prior iteration, if any */
+	Assert(ctx->blkno == InvalidBlockNumber || ctx->buffer != InvalidBuffer);
+	if (InvalidBuffer != ctx->buffer)
+	{
+		UnlockReleaseBuffer(ctx->buffer);
+		ctx->buffer = InvalidBuffer;
+	}
+
+	/* No buffer lock is held here, so it is safe to honor a query cancel */
+	CHECK_FOR_INTERRUPTS();
+
+	/* We rely on this math property for the first iteration */
+	StaticAssertStmt(InvalidBlockNumber + 1 == 0,
+					 "InvalidBlockNumber increments to zero");
+	ctx->blkno++;
+	if (ctx->blkno >= ctx->nblocks)
+		return false;
+
+	/* Read and lock the next page. */
+	ctx->buffer = ReadBufferExtended(ctx->rel, MAIN_FORKNUM, ctx->blkno,
+									 RBM_NORMAL, ctx->bstrategy);
+	LockBuffer(ctx->buffer, BUFFER_LOCK_SHARE);
+	ctx->page = BufferGetPage(ctx->buffer);
+	return true;
+}
+
+/*
+ * beginPageTupleIteration
+ *
+ *   For the given page being visited, as stored in ctx, sets up variables for
+ *   iterating over the tuples on the page.
+ */
+void
+beginPageTupleIteration(HeapCheckContext * ctx)
+{
+	/* We rely on this math property for the first iteration */
+	StaticAssertStmt(InvalidOffsetNumber + 1 == FirstOffsetNumber,
+					 "InvalidOffsetNumber increments to FirstOffsetNumber");
+
+	ctx->offnum = InvalidOffsetNumber;
+	ctx->maxoff = PageGetMaxOffsetNumber(ctx->page);
+	ctx->itemid = NULL;
+	ctx->lp_len = 0;
+	ctx->tuphdr = NULL;
+	ctx->xmin = InvalidTransactionId;	/* these are xids, not oids */
+	ctx->xmax = InvalidTransactionId;
+	ctx->infomask = 0;
+	ctx->natts = 0;
+	ctx->hasnulls = false;
+}
+
+/*
+ * endPageTupleIteration
+ *
+ *   Resets the per-tuple iteration state in ctx; nothing is released here.
+ *   ctx->page is read again in the process, to recompute maxoff.
+ */
+void
+endPageTupleIteration(HeapCheckContext * ctx)
+{
+	/* Abuse beginPageTupleIteration to reset the tuple iteration variables */
+	beginPageTupleIteration(ctx);
+}
+
+/*
+ * pageTupleIteration_next
+ *
+ *   Steps ctx forward to the next line pointer on the page that carries a
+ *   tuple and caches that tuple's header fields in ctx.  Returns false once
+ *   the page's max offset has been passed.
+ *
+ *   Requires a prior beginPageTupleIteration; do not call again after false.
+ */
+bool
+pageTupleIteration_next(HeapCheckContext * ctx)
+{
+	HeapTupleHeader htup;
+
+	/*
+	 * Skip ahead to a line pointer that is used and is neither dead nor a
+	 * redirect; the others are of no interest to us.
+	 */
+	for (;;)
+	{
+		ctx->offnum = OffsetNumberNext(ctx->offnum);
+		if (ctx->offnum > ctx->maxoff)
+			return false;
+		ctx->itemid = PageGetItemId(ctx->page, ctx->offnum);
+		if (ItemIdIsUsed(ctx->itemid) && !ItemIdIsDead(ctx->itemid) &&
+			!ItemIdIsRedirected(ctx->itemid))
+			break;
+	}
+
+	/* Cache the line pointer length and the tuple header fields */
+	htup = (HeapTupleHeader) PageGetItem(ctx->page, ctx->itemid);
+	ctx->tuphdr = htup;
+	ctx->lp_len = ItemIdGetLength(ctx->itemid);
+	ctx->xmin = HeapTupleHeaderGetXmin(htup);
+	ctx->xmax = HeapTupleHeaderGetRawXmax(htup);
+	ctx->infomask = htup->t_infomask;
+	ctx->natts = HeapTupleHeaderGetNatts(htup);
+	ctx->hasnulls = (ctx->infomask & HEAP_HASNULL) != 0;
+
+	/* No attribute or toast chunk yet; -1 shows as NULL in the report */
+	ctx->chunkno = -1;
+	ctx->attnum = -1;
+
+	return true;
+}
+
+/*
+ * beginTupleAttributeIteration
+ *   Positions ctx before the first attribute of the tuple in ctx->tuphdr.
+ */
+void
+beginTupleAttributeIteration(HeapCheckContext * ctx)
+{
+	HeapTupleHeader htup = ctx->tuphdr;
+
+	ctx->bp = htup->t_bits;
+	ctx->tp = (char *) htup + htup->t_hoff;
+	ctx->attnum = -1;			/* tupleAttributeIteration_next bumps to 0 */
+	ctx->offset = 0;
+}
+
+/*
+ * tupleAttributeIteration_next
+ *
+ *   Moves ctx on to the following attribute and looks up its pg_attribute
+ *   entry in ctx->relDesc.  Returns false, leaving thisatt untouched, once
+ *   attnum reaches the tuple's natts.
+ *
+ *   Requires beginTupleAttributeIteration; do not call again after false.
+ */
+bool
+tupleAttributeIteration_next(HeapCheckContext * ctx)
+{
+	bool		more = (++ctx->attnum < ctx->natts);
+
+	if (more)
+		ctx->thisatt = TupleDescAttr(ctx->relDesc, ctx->attnum);
+	return more;
+}
+
+/*
+ * endTupleAttributeIteration
+ *
+ *   Resets state tracked in ctx after iteration over attributes ends.
+ */
+void
+endTupleAttributeIteration(HeapCheckContext * ctx)
+{
+	ctx->offset = -1;			/* offset is uint32, so this wraps to its max */
+	ctx->attnum = -1;			/* -1 is reported as a NULL attnum */
+}
+
+/*
+ * beginToastTupleIteration
+ *
+ *   For the attribute being visited, derives the expected chunk counts from
+ *   the toast pointer and opens an ordered scan over its toast chunks.
+ */
+void
+beginToastTupleIteration(HeapCheckContext * ctx,
+						 struct varatt_external *toast_pointer)
+{
+	int32		extsize = toast_pointer->va_extsize;
+	int32		lastchunk = (extsize - 1) / TOAST_MAX_CHUNK_SIZE;
+
+	/* Per-value bookkeeping that check_toast_tuple consults */
+	ctx->found_toasttup = false;
+	ctx->chunkno = 0;
+	ctx->attrsize = extsize;
+	ctx->endchunk = lastchunk;
+	ctx->totalchunks = lastchunk + 1;
+	ctx->toasttupDesc = ctx->toastrel->rd_att;
+
+	/* Match toast tuples whose first column equals the pointer's va_valueid */
+	ScanKeyInit(&ctx->toastkey,
+				(AttrNumber) 1,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(toast_pointer->va_valueid));
+
+	/*
+	 * Ordered scan of the toast relation via its valid index, toast snapshot
+	 */
+	init_toast_snapshot(&ctx->SnapshotToast);
+	ctx->toastscan = systable_beginscan_ordered(ctx->toastrel,
+												ctx->valid_toast_index,
+												&ctx->SnapshotToast, 1,
+												&ctx->toastkey);
+}
+
+/*
+ * toastTupleIteration_next
+ *
+ *   Fetches the following toast tuple, scanning forward, for the attribute
+ *   being checked and stores it in ctx->toasttup.  Returns false when the
+ *   scan is exhausted, in which case ctx->toasttup is left NULL.
+ *   Requires beginToastTupleIteration; stop calling at the first false.
+ */
+bool
+toastTupleIteration_next(HeapCheckContext * ctx)
+{
+	HeapTuple	next;
+
+	next = systable_getnext_ordered(ctx->toastscan, ForwardScanDirection);
+	ctx->toasttup = next;
+	return next != NULL;
+}
+
+/*
+ * endToastTupleIteration
+ *
+ *   Ends the ordered toast scan that beginToastTupleIteration stored in
+ *   ctx->toastscan.  The ctx->toastscan pointer itself is not cleared.
+ */
+void
+endToastTupleIteration(HeapCheckContext * ctx)
+{
+	systable_endscan_ordered(ctx->toastscan);
+}
+
+/*
+ * Given a TransactionId, attempt to interpret it as a valid
+ * FullTransactionId, neither in the future nor overlong in
+ * the past.  Stores the inferred FullTransactionId in *fxid.
+ *
+ * Returns false for InvalidTransactionId, for xids at or beyond the next
+ * xid, and for xids older than oldestClogXid; true otherwise, including for
+ * the other special xids.  Caller must hold CLogTruncationLock.
+ */
+bool
+TransactionIdStillValid(TransactionId xid, FullTransactionId *fxid)
+{
+	FullTransactionId fnow;
+	uint32		epoch;
+
+	/* Initialize fxid; we'll overwrite this later if needed */
+	*fxid = FullTransactionIdFromEpochAndXid(0, xid);
+	if (!TransactionIdIsValid(xid))
+		return false;
+	if (!TransactionIdIsNormal(xid))
+		return true;			/* valid special xid: nothing to look up */
+
+	/* Charitably infer the full xid as being within one epoch ago */
+	fnow = ReadNextFullTransactionId();
+	epoch = EpochFromFullTransactionId(fnow);
+	*fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+	if (!FullTransactionIdPrecedes(*fxid, fnow))
+		*fxid = FullTransactionIdFromEpochAndXid(epoch - 1, xid);
+	if (!FullTransactionIdPrecedes(*fxid, fnow))
+		return false;			/* only reachable when epoch - 1 wrapped */
+
+	/* In modular terms, an xid not preceding the next xid is in the future */
+	if (!TransactionIdPrecedes(xid, XidFromFullTransactionId(fnow)))
+		return false;
+
+	/* The oldestClogXid is protected by CLogTruncationLock */
+	Assert(LWLockHeldByMe(CLogTruncationLock));
+	return !TransactionIdPrecedes(xid, ShmemVariableCache->oldestClogXid);
+}
+
+/*
+ * HeapTupleIsVisible
+ *
+ *  Determine whether tuples are visible for heapcheck.  Similar to
+ *  HeapTupleSatisfiesVacuum, but with critical differences.
+ *  Returns true if heapcheck should examine the tuple; on some false
+ *  returns a corruption has been recorded in ctx.
+ *  1) Does not touch hint bits.  It seems imprudent to write hint bits
+ *     to a table during a corruption check.
+ *  2) Gracefully handles xids that are too old by calling
+ *     TransactionIdStillValid before TransactionLogFetch, thus avoiding
+ *     a backend abort.
+ *  3) Only makes a boolean determination of whether heapcheck should
+ *     see the tuple, rather than doing extra work for vacuum-related
+ *     categorization.
+ */
+bool
+HeapTupleIsVisible(HeapTupleHeader tuphdr, HeapCheckContext * ctx)
+{
+	FullTransactionId fxmin,
+				fxmax;
+	uint16		infomask = tuphdr->t_infomask;
+	TransactionId xmin = HeapTupleHeaderGetXmin(tuphdr);
+
+	if (!HeapTupleHeaderXminCommitted(tuphdr))
+	{
+		if (HeapTupleHeaderXminInvalid(tuphdr))
+		{
+			return false;		/* HEAPTUPLE_DEAD */
+		}
+		/* Used by pre-9.0 binary upgrades */
+		else if (infomask & HEAP_MOVED_OFF)
+		{
+			TransactionId xvac = HeapTupleHeaderGetXvac(tuphdr);
+
+			if (TransactionIdIsCurrentTransactionId(xvac))
+				return false;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
+			if (TransactionIdIsInProgress(xvac))
+				return false;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
+			if (TransactionIdDidCommit(xvac))
+				return false;	/* HEAPTUPLE_DEAD */
+		}
+		/* Used by pre-9.0 binary upgrades */
+		else if (infomask & HEAP_MOVED_IN)
+		{
+			TransactionId xvac = HeapTupleHeaderGetXvac(tuphdr);
+
+			if (TransactionIdIsCurrentTransactionId(xvac))
+				return false;	/* HEAPTUPLE_INSERT_IN_PROGRESS */
+			if (TransactionIdIsInProgress(xvac))
+				return false;	/* HEAPTUPLE_INSERT_IN_PROGRESS */
+			if (!TransactionIdDidCommit(xvac))
+				return false;	/* HEAPTUPLE_DEAD */
+		}
+		else if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmin(tuphdr)))
+			return false;		/* insert or delete in progress */
+		else if (TransactionIdIsInProgress(HeapTupleHeaderGetRawXmin(tuphdr)))
+			return false;		/* HEAPTUPLE_INSERT_IN_PROGRESS */
+
+		/*
+		 * The tuple appears to either be or to have been visible to us, but
+		 * the xmin may be too far in the past to be used.  We have to check
+		 * that before calling TransactionIdDidCommit to avoid an Assertion.
+		 */
+		LWLockAcquire(CLogTruncationLock, LW_SHARED);
+		if (!TransactionIdStillValid(xmin, &fxmin))
+		{
+			LWLockRelease(CLogTruncationLock);
+			record_corruption(ctx, psprintf("tuple xmin = %u (interpreted as "
+											UINT64_FORMAT
+											") not or no longer valid",
+											xmin, fxmin.value));
+			return false;
+		}
+		else if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmin(tuphdr)))
+		{
+			LWLockRelease(CLogTruncationLock);
+			return false;		/* HEAPTUPLE_DEAD */
+		}
+		LWLockRelease(CLogTruncationLock);
+	}
+
+	if (!(infomask & HEAP_XMAX_INVALID) && !HEAP_XMAX_IS_LOCKED_ONLY(infomask))
+	{
+		if (infomask & HEAP_XMAX_IS_MULTI)
+		{
+			TransactionId xmax = HeapTupleGetUpdateXid(tuphdr);
+
+			/* not LOCKED_ONLY, so it has to have an xmax */
+			if (!TransactionIdIsValid(xmax))
+			{
+				record_corruption(ctx, _("heap tuple with XMAX_IS_MULTI is "
+										 "neither LOCKED_ONLY nor has a "
+										 "valid xmax"));
+				return false;
+			}
+			if (TransactionIdIsInProgress(xmax))
+				return false;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
+
+			LWLockAcquire(CLogTruncationLock, LW_SHARED);
+			if (!TransactionIdStillValid(xmax, &fxmax))
+			{
+				LWLockRelease(CLogTruncationLock);
+				record_corruption(ctx, psprintf("tuple xmax = %u (interpreted "
+												"as " UINT64_FORMAT
+												") not or no longer valid",
+												xmax, fxmax.value));
+				return false;
+			}
+			else if (TransactionIdDidCommit(xmax))
+			{
+				LWLockRelease(CLogTruncationLock);
+				return false;	/* HEAPTUPLE_RECENTLY_DEAD or HEAPTUPLE_DEAD */
+			}
+			LWLockRelease(CLogTruncationLock);
+			/* Ok, the tuple is live */
+		}
+		else if (!(infomask & HEAP_XMAX_COMMITTED))
+		{
+			if (TransactionIdIsInProgress(HeapTupleHeaderGetRawXmax(tuphdr)))
+				return false;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
+			/* Treated as live.  NOTE(review): clog is not consulted here */
+		}
+		else
+			return false;		/* HEAPTUPLE_RECENTLY_DEAD or HEAPTUPLE_DEAD */
+	}
+	return true;
+}
+
+/*
+ * check_toast_tuple
+ *
+ *   Checks the current toast tuple as tracked in ctx for corruption.  Records
+ *   any corruption found in ctx->corruption.  ctx->chunkno advances only when
+ *   the chunk passes every check.
+ *   The caller should have iterated to a tuple via toastTupleIteration_next.
+ */
+void
+check_toast_tuple(HeapCheckContext * ctx)
+{
+	int32		curchunk;
+	Pointer		chunk;
+	bool		isnull;
+	char	   *chunkdata;
+	int32		chunksize;
+	int32		expected_size;
+
+	ctx->found_toasttup = true;
+
+	/*
+	 * Have a chunk, extract the sequence number and the data
+	 */
+	curchunk = DatumGetInt32(fastgetattr(ctx->toasttup, 2,
+										 ctx->toasttupDesc, &isnull));
+	if (isnull)
+	{
+		record_corruption(ctx, _("toast chunk sequencenumber is null"));
+		return;
+	}
+	chunk = DatumGetPointer(fastgetattr(ctx->toasttup, 3,
+										ctx->toasttupDesc, &isnull));
+	if (isnull)
+	{
+		record_corruption(ctx, _("toast chunk data is null"));
+		return;
+	}
+	if (!VARATT_IS_EXTENDED(chunk))
+	{
+		chunksize = VARSIZE(chunk) - VARHDRSZ;
+		chunkdata = VARDATA(chunk);
+	}
+	else if (VARATT_IS_SHORT(chunk))
+	{
+		/*
+		 * could happen due to heap_form_tuple doing its thing
+		 */
+		chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
+		chunkdata = VARDATA_SHORT(chunk);
+	}
+	else
+	{
+		/* should never happen */
+		record_corruption(ctx, _("toast chunk is neither short nor extended"));
+		return;
+	}
+
+	/*
+	 * Some checks on the data we've found.  All values are int32: use %d
+	 */
+	if (curchunk != ctx->chunkno)
+	{
+		record_corruption(ctx, psprintf("toast chunk sequence number %d "
+										"not the expected sequence number %d",
+										curchunk, ctx->chunkno));
+		return;
+	}
+	if (curchunk > ctx->endchunk)
+	{
+		record_corruption(ctx, psprintf("toast chunk sequence number %d "
+										"exceeds the end chunk sequence "
+										"number %d",
+										curchunk, ctx->endchunk));
+		return;
+	}
+
+	expected_size = curchunk < ctx->totalchunks - 1 ? TOAST_MAX_CHUNK_SIZE
+		: ctx->attrsize - ((ctx->totalchunks - 1) * TOAST_MAX_CHUNK_SIZE);
+	if (chunksize != expected_size)
+	{
+		record_corruption(ctx, psprintf("chunk size %d differs from "
+										"expected size %d",
+										chunksize, expected_size));
+		return;
+	}
+
+	ctx->chunkno++;
+}
+
+/*
+ * check_tuple_attribute
+ *
+ *   Checks the current attribute as tracked in ctx for corruption.  Records
+ *   any corruption found in ctx->corruption.
+ *
+ *   Returns true if the caller may continue with the tuple's next attribute,
+ *   or false if the rest of the tuple's attributes cannot be examined.
+ *
+ *   The caller should have iterated to an attribute via
+ *   tupleAttributeIteration_next.
+ */
+bool
+check_tuple_attribute(HeapCheckContext * ctx)
+{
+	Datum		attdatum;
+	struct varlena *attr;
+
+	if (ctx->tuphdr->t_hoff + ctx->offset > ctx->lp_len)
+	{
+		record_corruption(ctx, psprintf("t_hoff + offset > lp_len (%u + %u > %u)",
+										ctx->tuphdr->t_hoff, ctx->offset,
+										ctx->lp_len));
+		return false;
+	}
+
+	/* Skip null values */
+	if (ctx->hasnulls && att_isnull(ctx->attnum, ctx->bp))
+		return true;
+
+	/* Skip non-varlena values, but update offset first */
+	if (ctx->thisatt->attlen != -1)
+	{
+		ctx->offset = att_align_nominal(ctx->offset, ctx->thisatt->attalign);
+		ctx->offset = att_addlength_pointer(ctx->offset, ctx->thisatt->attlen,
+											ctx->tp + ctx->offset);
+		return true;
+	}
+
+	/* Ok, we're looking at a varlena attribute. */
+	ctx->offset = att_align_pointer(ctx->offset, ctx->thisatt->attalign, -1,
+									ctx->tp + ctx->offset);
+
+	/* Get the (possibly corrupt) varlena datum */
+	attdatum = fetchatt(ctx->thisatt, ctx->tp + ctx->offset);
+
+	/*
+	 * We have the datum, but we cannot decode it carelessly, as it may still
+	 * be corrupt.  Check that VARTAG_SIZE won't hit a TrapMacro on a corrupt
+	 * va_tag before risking a call into att_addlength_pointer.
+	 */
+	if (VARATT_IS_1B_E(ctx->tp + ctx->offset))
+	{
+		uint8		va_tag = VARTAG_EXTERNAL(ctx->tp + ctx->offset);
+
+		if (va_tag != VARTAG_ONDISK)
+		{
+			record_corruption(ctx, psprintf("unexpected TOAST vartag %u for "
+											"attribute #%u at t_hoff = %u, "
+											"offset = %u",
+											va_tag, ctx->attnum,
+											ctx->tuphdr->t_hoff, ctx->offset));
+			return false;		/* We can't know where the next attribute
+								 * begins */
+		}
+	}
+
+	/* Ok, should be safe now */
+	ctx->offset = att_addlength_pointer(ctx->offset, ctx->thisatt->attlen,
+										ctx->tp + ctx->offset);
+
+	/*
+	 * heap_deform_tuple would be done with this attribute at this point,
+	 * having stored it in values[], and would continue to the next attribute.
+	 * We go further, because we need to check if the toast datum is corrupt.
+	 */
+
+	attr = (struct varlena *) DatumGetPointer(attdatum);
+
+	/*
+	 * Now we follow the logic of detoast_external_attr(), with the same
+	 * caveats about being paranoid about corruption.
+	 */
+
+	/* Skip values that are not external */
+	if (!VARATT_IS_EXTERNAL(attr))
+		return true;
+
+	/* It is external, and we're looking at a page on disk */
+	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
+	{
+		record_corruption(ctx,
+						  _("attribute is external but not marked as on disk"));
+		return true;
+	}
+
+	/* The tuple header better claim to contain toasted values */
+	if (!(ctx->infomask & HEAP_HASEXTERNAL))
+	{
+		record_corruption(ctx, _("attribute is external but tuple header "
+								 "flag HEAP_HASEXTERNAL not set"));
+		return true;
+	}
+
+	/* The relation better have a toast table */
+	if (!ctx->has_toastrel)
+	{
+		record_corruption(ctx, _("attribute is external but relation has "
+								 "no toast relation"));
+		return true;
+	}
+
+	/*
+	 * Dereference indirect toast pointers before checking them.  NOTE(review):
+	 * non-ONDISK pointers already returned above, so this looks unreachable.
+	 */
+	if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+	{
+		struct varatt_indirect redirect;
+
+		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
+		attr = (struct varlena *) redirect.pointer;
+
+		/* nested indirect Datums aren't allowed */
+		if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+		{
+			record_corruption(ctx, _("attribute has nested external "
+									 "indirect toast pointer"));
+			return true;
+		}
+	}
+
+	if (VARATT_IS_EXTERNAL_ONDISK(attr))
+	{
+		struct varatt_external toast_pointer;
+
+		/*
+		 * Must copy attr into toast_pointer for alignment considerations
+		 */
+		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+		beginToastTupleIteration(ctx, &toast_pointer);
+
+		while (toastTupleIteration_next(ctx))
+			check_toast_tuple(ctx);
+
+		if (ctx->chunkno != (ctx->endchunk + 1))
+			record_corruption(ctx, psprintf("final chunk number differs from "
+											"expected (%u vs. %u)",
+											ctx->chunkno, (ctx->endchunk + 1)));
+		if (!ctx->found_toasttup)
+			record_corruption(ctx, _("toasted value missing from "
+									 "toast table"));
+		endToastTupleIteration(ctx);
+	}
+	return true;
+}
+
+/*
+ * check_tuple
+ *
+ *   Checks the current tuple as tracked in ctx for corruption.  Records any
+ *   corruption found in ctx->corruption.
+ *
+ *   The caller should have iterated to a tuple via pageTupleIteration_next.
+ */
+void
+check_tuple(HeapCheckContext * ctx)
+{
+	bool		fatal = false;
+
+	/* Check relminmxid against mxid, if any */
+	if ((ctx->infomask & HEAP_XMAX_IS_MULTI) &&
+		MultiXactIdPrecedes(ctx->xmax, ctx->relminmxid))
+	{
+		record_corruption(ctx, psprintf("tuple xmax = %u precedes relation "
+										"relminmxid = %u",
+										ctx->xmax, ctx->relminmxid));
+	}
+
+	/* Check xmin against relfrozenxid */
+	if (TransactionIdIsNormal(ctx->relfrozenxid) &&
+		TransactionIdIsNormal(ctx->xmin) &&
+		TransactionIdPrecedes(ctx->xmin, ctx->relfrozenxid))
+	{
+		record_corruption(ctx, psprintf("tuple xmin = %u precedes relation "
+										"relfrozenxid = %u",
+										ctx->xmin, ctx->relfrozenxid));
+	}
+
+	/* Check xmax against relfrozenxid, unless xmax is a MultiXactId */
+	if (!(ctx->infomask & HEAP_XMAX_IS_MULTI) &&
+		TransactionIdIsNormal(ctx->relfrozenxid) &&
+		TransactionIdIsNormal(ctx->xmax) &&
+		TransactionIdPrecedes(ctx->xmax, ctx->relfrozenxid))
+	{
+		record_corruption(ctx, psprintf("tuple xmax = %u precedes relation "
+										"relfrozenxid = %u",
+										ctx->xmax, ctx->relfrozenxid));
+	}
+
+	/* Check for tuple header corruption */
+	if (ctx->tuphdr->t_hoff < SizeofHeapTupleHeader)
+	{
+		record_corruption(ctx, psprintf("t_hoff < SizeofHeapTupleHeader (%u < %u)",
+										ctx->tuphdr->t_hoff,
+										(unsigned) SizeofHeapTupleHeader));
+		fatal = true;
+	}
+	if (ctx->tuphdr->t_hoff > ctx->lp_len)
+	{
+		record_corruption(ctx, psprintf("t_hoff > lp_len (%u > %u)",
+										ctx->tuphdr->t_hoff, ctx->lp_len));
+		fatal = true;
+	}
+	if (ctx->tuphdr->t_hoff != MAXALIGN(ctx->tuphdr->t_hoff))
+	{
+		record_corruption(ctx, psprintf("t_hoff not max-aligned (%u)",
+										ctx->tuphdr->t_hoff));
+		fatal = true;
+	}
+
+	/*
+	 * If the tuple has nulls, check that the implied length of the variable
+	 * length nulls bitmap field t_bits does not overflow the allowed space.
+	 * We don't know if the corruption is in the natts field or the infomask
+	 * bit HEAP_HASNULL.
+	 */
+	if (ctx->hasnulls &&
+		SizeofHeapTupleHeader + BITMAPLEN(ctx->natts) > ctx->tuphdr->t_hoff)
+	{
+		record_corruption(ctx, psprintf("SizeofHeapTupleHeader + "
+										"BITMAPLEN(natts) > t_hoff "
+										"(%u + %u > %u)",
+										(unsigned) SizeofHeapTupleHeader,
+										BITMAPLEN(ctx->natts),
+										ctx->tuphdr->t_hoff));
+		fatal = true;
+	}
+
+	/* Cannot process tuple data if tuple header was corrupt */
+	if (fatal)
+		return;
+
+	/*
+	 * Skip tuples that are invisible, as we cannot assume the TupleDesc we
+	 * are using is appropriate.
+	 */
+	if (!HeapTupleIsVisible(ctx->tuphdr, ctx))
+		return;
+
+	/*
+	 * If we get this far, the tuple is visible to us, so it must not be
+	 * incompatible with our relDesc.  The natts field could be legitimately
+	 * shorter than rel_natts, but it cannot be longer than rel_natts.
+	 */
+	if (ctx->rel_natts < ctx->natts)
+	{
+		record_corruption(ctx, psprintf("relation natts < tuple natts (%u < %u)",
+										ctx->rel_natts, ctx->natts));
+		return;
+	}
+
+	/*
+	 * Iterate over the attributes looking for broken toast values. This
+	 * roughly follows the logic of heap_deform_tuple, except that it doesn't
+	 * bother building up isnull[] and values[] arrays, since nobody wants
+	 * them, and it unrolls anything that might trip over an Assert when
+	 * processing corrupt data.
+	 */
+	beginTupleAttributeIteration(ctx);
+	while (tupleAttributeIteration_next(ctx) && check_tuple_attribute(ctx))
+		;
+	endTupleAttributeIteration(ctx);
+}
+
+/*
+ * check_relation
+ *
+ *   Opens the relation identified by relid (and its toast relation, if it has
+ *   one), checks every tuple on every block, and returns the list of
+ *   corruption reports collected in the process.
+ *
+ *   The caller chooses the memory context beforehand and owns the result.
+ */
+List *
+check_relation(Oid relid)
+{
+	HeapCheckContext ctx;
+	Oid			toastrelid;
+
+	memset(&ctx, 0, sizeof(ctx));
+	ctx.corruption = NIL;
+	ctx.relid = relid;
+
+	/* Open the target and refuse anything we do not know how to check */
+	ctx.rel = relation_open(relid, AccessShareLock);
+	check_relation_relkind(ctx.rel);
+
+	/* Remember the descriptor and the xid/mxid horizons of the relation */
+	ctx.relDesc = RelationGetDescr(ctx.rel);
+	ctx.rel_natts = ctx.relDesc->natts;
+	ctx.relminmxid = ctx.rel->rd_rel->relminmxid;
+	ctx.relfrozenxid = ctx.rel->rd_rel->relfrozenxid;
+	toastrelid = ctx.rel->rd_rel->reltoastrelid;
+
+	if (!toastrelid)
+	{
+		/* No toast relation: leave the toast-related fields empty */
+		ctx.num_toast_indexes = 0;
+		ctx.toast_indexes = NULL;
+		ctx.has_toastrel = false;
+	}
+	else
+	{
+		int			valid_idx;
+
+		/* Open the toast relation together with its indexes */
+		ctx.has_toastrel = true;
+		ctx.toastrel = table_open(toastrelid, AccessShareLock);
+		valid_idx = toast_open_indexes(ctx.toastrel,
+									   AccessShareLock,
+									   &ctx.toast_indexes,
+									   &ctx.num_toast_indexes);
+		ctx.valid_toast_index = ctx.toast_indexes[valid_idx];
+	}
+
+	/* Walk the relation block by block, checking each tuple we come across */
+	beginRelBlockIteration(&ctx);
+	while (relBlockIteration_next(&ctx))
+	{
+		beginPageTupleIteration(&ctx);
+		while (pageTupleIteration_next(&ctx))
+			check_tuple(&ctx);
+		endPageTupleIteration(&ctx);
+	}
+	endRelBlockIteration(&ctx);
+
+	/* Release the toast indexes and toast relation, if we opened them */
+	if (ctx.has_toastrel)
+	{
+		toast_close_indexes(ctx.toast_indexes, ctx.num_toast_indexes,
+							AccessShareLock);
+		table_close(ctx.toastrel, AccessShareLock);
+	}
+
+	/* Finally release the relation itself and hand back what we found */
+	relation_close(ctx.rel, AccessShareLock);
+
+	return ctx.corruption;
+}
+
+/*
+ * check_relation_relkind
+ *
+ *   raises an error unless rel is a heap table, matview, or TOAST table.
+ */
+void
+check_relation_relkind(Relation rel)
+{
+	char		relkind = rel->rd_rel->relkind;
+
+	if (relkind != RELKIND_RELATION && relkind != RELKIND_MATVIEW &&
+		relkind != RELKIND_TOASTVALUE)
+		ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						errmsg("\"%s\" is not a table, materialized view, "
+							   "or TOAST table",
+							   RelationGetRelationName(rel))));
+	/* Beyond the relkind, the relation must use the heap table AM */
+	if (rel->rd_rel->relam != HEAP_TABLE_AM_OID)
+		ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						errmsg("\"%s\" is not a heap AM",
+							   RelationGetRelationName(rel))));
+}
diff --git a/contrib/heapcheck/heapcheck.control b/contrib/heapcheck/heapcheck.control
new file mode 100644
index 0000000000..23b076169e
--- /dev/null
+++ b/contrib/heapcheck/heapcheck.control
@@ -0,0 +1,5 @@
+# heapcheck extension
+comment = 'examine relations for corruption'
+default_version = '1.0'
+module_pathname = '$libdir/heapcheck'
+relocatable = true
diff --git a/contrib/heapcheck/sql/001_create_extension.sql b/contrib/heapcheck/sql/001_create_extension.sql
new file mode 100644
index 0000000000..0ca79c22be
--- /dev/null
+++ b/contrib/heapcheck/sql/001_create_extension.sql
@@ -0,0 +1 @@
+create extension heapcheck;
diff --git a/contrib/heapcheck/sql/002_disallowed_reltypes.sql b/contrib/heapcheck/sql/002_disallowed_reltypes.sql
new file mode 100644
index 0000000000..782e2c7039
--- /dev/null
+++ b/contrib/heapcheck/sql/002_disallowed_reltypes.sql
@@ -0,0 +1,29 @@
+--
+-- check that using the module's functions with unsupported relations will fail
+--
+
+-- partitioned tables (the parent ones) don't have visibility maps
+create table test_partitioned (a int, b text default repeat('x', 5000)) partition by list (a);
+-- these should all fail
+select * from heapcheck_relation('test_partitioned');
+
+create table test_partition partition of test_partitioned for values in (1);
+create index test_index on test_partition (a);
+-- indexes do not, so these all fail
+select * from heapcheck_relation('test_index');
+
+create view test_view as select 1;
+-- views do not have vms, so these all fail
+select * from heapcheck_relation('test_view');
+
+create sequence test_sequence;
+-- sequences do not have vms, so these all fail
+select * from heapcheck_relation('test_sequence');
+
+create foreign data wrapper dummy;
+create server dummy_server foreign data wrapper dummy;
+create foreign table test_foreign_table () server dummy_server;
+-- foreign tables do not have vms, so these all fail
+select * from heapcheck_relation('test_foreign_table');
+
+
diff --git a/contrib/heapcheck/t/003_heapcheck_relation.pl b/contrib/heapcheck/t/003_heapcheck_relation.pl
new file mode 100644
index 0000000000..8630ac798b
--- /dev/null
+++ b/contrib/heapcheck/t/003_heapcheck_relation.pl
@@ -0,0 +1,361 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+
+use Test::More tests => 1;
+
+# This regression test demonstrates that the heapcheck_relation() function
+# supplied with this contrib module correctly identifies specific kinds of
+# corruption within pages.  To test this, we need a mechanism to create corrupt
+# pages with predictable, repeatable corruption.  The postgres backend cannot be
+# expected to help us with this, as its design is not consistent with the goal
+# of intentionally corrupting pages.
+#
+# Instead, we create a table to corrupt, and with careful consideration of how
+# postgresql lays out heap pages, we seek to offsets within the page and
+# overwrite deliberately chosen bytes with specific values calculated to
+# corrupt the page in expected ways.  We then verify that heapcheck_relation
+# reports the corruption, and that it runs without crashing.  Note that the
+# backend cannot simply be started to run queries against the corrupt table, as
+# the backend will crash, at least for some of the corruption types we
+# generate.
+#
+# Autovacuum potentially touching the table in the background makes the exact
+# behavior of this test harder to reason about.  We turn it off to keep things
+# simpler.  We use a "belt and suspenders" approach, turning it off for the
+# system generally in postgresql.conf, and turning it off specifically for the
+# test table.
+#
+# This test depends on the table being written to the heap file exactly as we
+# expect it to be, so we take care to arrange the columns of the table, and
+# insert rows of the table, that give predictable sizes and locations within
+# the table page.
+#
+# The HeapTupleHeaderData has 23 bytes of fixed size fields before the variable
+# length t_bits[] array.  We have exactly 3 columns in the table, so natts = 3,
+# t_bits is 1 byte long, and t_hoff = MAXALIGN(23 + 1) = 24.
+#
+# We're not too fussy about which datatypes we use for the test, but we do care
+# about some specific properties.  We'd like to test both fixed size and
+# varlena types.  We'd like some varlena data inline and some toasted.  And
+# we'd like the layout of the table such that the datums land at predictable
+# offsets within the tuple.  We choose a structure without padding on all
+# supported architectures:
+#
+# 	a BIGINT
+#	b TEXT
+#	c TEXT
+#
+# We always insert a 7-ascii character string into field 'b', which with a
+# 1-byte varlena header gives an 8 byte inline value.  We always insert a long
+# text string in field 'c', long enough to force toast storage.
+#
+# This formatting produces heap pages where each tuple is 58 bytes long, padded
+# out to 64 bytes for alignment, with the first one on the page starting at
+# offset 8128, as follows:
+#
+#    [ lp_off: 8128 lp_len:   58]
+#    [ lp_off: 8064 lp_len:   58]
+#    [ lp_off: 8000 lp_len:   58]
+#    [ lp_off: 7936 lp_len:   58]
+#    [ lp_off: 7872 lp_len:   58]
+#    [ lp_off: 7808 lp_len:   58]
+#               ...
+#
+
+use constant LP_OFF_BEGIN => 8128;
+use constant LP_OFF_DELTA => 64;
+
+# We choose to read and write binary copies of our table's tuples, using perl's
+# pack() and unpack() functions.  Perl uses a packing code system in which:
+#
+#	L = "Unsigned 32-bit Long",
+#	S = "Unsigned 16-bit Short",
+#	C = "Unsigned 8-bit Octet",
+#	c = "signed 8-bit octet",
+#	q = "signed 64-bit quadword"
+#	
+# Each tuple in our table has a layout as follows:
+#
+#    xx xx xx xx            t_xmin: xxxx		offset = 0		L
+#    xx xx xx xx            t_xmax: xxxx		offset = 4		L
+#    xx xx xx xx          t_field3: xxxx		offset = 8		L
+#    xx xx                   bi_hi: xx			offset = 12		S
+#    xx xx                   bi_lo: xx			offset = 14		S
+#    xx xx                ip_posid: xx			offset = 16		S
+#    xx xx             t_infomask2: xx			offset = 18		S
+#    xx xx              t_infomask: xx			offset = 20		S
+#    xx                     t_hoff: x			offset = 22		C
+#    xx                     t_bits: x			offset = 23		C
+#    xx xx xx xx xx xx xx xx   'a': xxxxxxxx	offset = 24		q
+#    xx xx xx xx xx xx xx xx   'b': xxxxxxxx	offset = 32		Cccccccc
+#    xx xx xx xx xx xx xx xx   'c': xxxxxxxx	offset = 40		SSSS
+#    xx xx xx xx xx xx xx xx      : xxxxxxxx	 ...continued	SSSS
+#    xx xx                        : xx      	 ...continued	S
+#	
+# We could choose to read and write columns 'b' and 'c' in other ways, but
+# it is convenient enough to do it this way.  We define packing code
+# constants here, where they can be compared easily against the layout.
+
+use constant HEAPTUPLE_PACK_CODE => 'LLLSSSSSCCqCcccccccSSSSSSSSS';
+use constant HEAPTUPLE_PACK_LENGTH => 58;     # Total size
+
+# Read a tuple of our table from a heap page.
+#
+# Takes an open filehandle to the heap file, and the offset of the tuple.
+# Dies if the tuple cannot be read in full.
+#
+# Rather than returning the binary data from the file, unpacks the data into a
+# perl hash with named fields.  These fields exactly match the ones understood
+# by write_tuple(), below.  Returns a reference to this hash.
+sub read_tuple ($$)
+{
+	my ($fh, $offset) = @_;
+	my ($buffer, $nread, %tup);
+	sysseek($fh, $offset, 0) or die "sysseek to offset $offset failed: $!";
+	$nread = sysread($fh, $buffer, HEAPTUPLE_PACK_LENGTH);
+	($nread || 0) == HEAPTUPLE_PACK_LENGTH or die "could not read tuple at offset $offset: $!";
+	@_ = unpack(HEAPTUPLE_PACK_CODE, $buffer);
+	%tup = (t_xmin => shift,
+			t_xmax => shift,
+			t_field3 => shift,
+			bi_hi => shift,
+			bi_lo => shift,
+			ip_posid => shift,
+			t_infomask2 => shift,
+			t_infomask => shift,
+			t_hoff => shift,
+			t_bits => shift,
+			a => shift,
+			b_header => shift,
+			b_body1 => shift,
+			b_body2 => shift,
+			b_body3 => shift,
+			b_body4 => shift,
+			b_body5 => shift,
+			b_body6 => shift,
+			b_body7 => shift,
+			c1 => shift,
+			c2 => shift,
+			c3 => shift,
+			c4 => shift,
+			c5 => shift,
+			c6 => shift,
+			c7 => shift,
+			c8 => shift,
+			c9 => shift);
+	# Stitch together the text for column 'b'
+	$tup{b} = join('', map { chr($tup{"b_body$_"}) } (1..7));
+	return \%tup;
+}
+
+# Write a tuple of our table to a heap page.
+#
+# Takes an open filehandle to the heap file, the offset of the tuple, and a
+# reference to a hash with the tuple values, as returned by read_tuple().
+# Writes the tuple fields from the hash into the heap file.
+#
+# The purpose of this function is to write a tuple back to disk with some
+# subset of fields modified.  The field values themselves are not validated;
+# only a failure to seek or to write the whole tuple is fatal.  Use cautiously.
+#
+sub write_tuple($$$)
+{
+	my ($fh, $offset, $tup) = @_;
+	my $buffer = pack(HEAPTUPLE_PACK_CODE,
+					$tup->{t_xmin},
+					$tup->{t_xmax},
+					$tup->{t_field3},
+					$tup->{bi_hi}, $tup->{bi_lo},
+					$tup->{ip_posid},
+					$tup->{t_infomask2},
+					$tup->{t_infomask},
+					$tup->{t_hoff},
+					$tup->{t_bits},
+					$tup->{a},
+					$tup->{b_header},
+					$tup->{b_body1},
+					$tup->{b_body2},
+					$tup->{b_body3},
+					$tup->{b_body4},
+					$tup->{b_body5},
+					$tup->{b_body6},
+					$tup->{b_body7},
+					$tup->{c1},
+					$tup->{c2},
+					$tup->{c3},
+					$tup->{c4},
+					$tup->{c5},
+					$tup->{c6},
+					$tup->{c7},
+					$tup->{c8},
+					$tup->{c9});
+	sysseek($fh, $offset, 0) or die "sysseek to offset $offset failed: $!";
+	my $nwritten = syswrite($fh, $buffer, HEAPTUPLE_PACK_LENGTH);
+	($nwritten || 0) == HEAPTUPLE_PACK_LENGTH or die "could not write tuple at offset $offset: $!";
+	return;
+}
+
+# Set umask so test directories and files are created with default permissions
+umask(0077);
+
+my ($result, $node);
+
+# Set up the node (autovacuum off in postgresql.conf) and the test table.
+$node = get_new_node('test');
+$node->init;
+$node->append_conf('postgresql.conf', 'autovacuum=off');
+$node->start;
+my $pgdata = $node->data_dir;
+$node->safe_psql('postgres', "CREATE EXTENSION heapcheck");
+
+$node->safe_psql(
+	'postgres', qq(
+		CREATE TABLE public.test (a BIGINT, b TEXT, c TEXT);
+		ALTER TABLE public.test SET (autovacuum_enabled=false);
+		ALTER TABLE public.test ALTER COLUMN c SET STORAGE EXTERNAL;
+	));
+
+my $rel = $node->safe_psql('postgres', qq(SELECT pg_relation_filepath('public.test')));
+my $relpath = "$pgdata/$rel";
+# Insert ROWCOUNT identical rows, one per call, freezing after each insert.
+use constant ROWCOUNT => 12;
+$node->safe_psql('postgres', qq(
+	INSERT INTO public.test (a, b, c)
+		VALUES (
+			12345678,
+			repeat('f', 7),
+			repeat('w', 10000)
+		);
+	VACUUM FREEZE public.test
+	)) for (1..ROWCOUNT);
+
+my $relfrozenxid = $node->safe_psql('postgres',
+	q(select relfrozenxid from pg_class where relname = 'test'));
+# Stop the node before touching the relation file directly.
+$node->stop;
+
+# Some #define constants from access/htup_details.h for use while corrupting.
+use constant HEAP_HASNULL            => 0x0001;
+use constant HEAP_XMIN_COMMITTED     => 0x0100;
+use constant HEAP_XMIN_INVALID       => 0x0200;
+use constant HEAP_XMAX_INVALID       => 0x0800;
+use constant HEAP_NATTS_MASK         => 0x07FF;
+
+# Corrupt the tuples, one type of corruption per tuple.  Some types of
+# corruption cause heapcheck_relation to skip to the next tuple without
+# performing any remaining checks, so we can't exercise the system properly if
+# we focus all our corruption on a single tuple.
+#
+my $file;
+open($file, '+<', $relpath) or die "could not open $relpath: $!";
+binmode $file;
+for (my $offset = LP_OFF_BEGIN, my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++, $offset -= LP_OFF_DELTA)
+{
+	my $tup = read_tuple($file, $offset);
+
+	if ($tupidx == 0)
+	{
+		# Corruptly set xmin < relfrozenxid
+		$tup->{t_xmin} = 3;
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+		$tup->{t_infomask} &= ~HEAP_XMIN_INVALID;
+	}
+	elsif ($tupidx == 1)
+	{
+		# Corruptly set xmin < relfrozenxid, further back
+		$tup->{t_xmin} = 4026531839;		# Note circularity of xid comparison
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+		$tup->{t_infomask} &= ~HEAP_XMIN_INVALID;
+	}
+	elsif ($tupidx == 2)
+	{
+		# Corruptly set xmax < relfrozenxid
+		$tup->{t_xmax} = 4026531839;		# Note circularity of xid comparison
+		$tup->{t_infomask} &= ~HEAP_XMAX_INVALID;
+	}
+	elsif ($tupidx == 3)
+	{
+		# Corrupt the tuple t_hoff, but keep it aligned properly
+		$tup->{t_hoff} += 128;
+	}
+	elsif ($tupidx == 4)
+	{
+		# Corrupt the tuple t_hoff, wrong alignment
+		$tup->{t_hoff} += 3;
+	}
+	elsif ($tupidx == 5)
+	{
+		# Corrupt the tuple t_hoff, underflow but correct alignment
+		$tup->{t_hoff} -= 8;
+	}
+	elsif ($tupidx == 6)
+	{
+		# Corrupt the tuple t_hoff, underflow and wrong alignment
+		$tup->{t_hoff} -= 3;
+	}
+	elsif ($tupidx == 7)
+	{
+		# Corrupt the tuple to look like it has lots of attributes, not just 3
+		$tup->{t_infomask2} |= HEAP_NATTS_MASK;
+	}
+	elsif ($tupidx == 8)
+	{
+		# Corrupt the tuple to look like it has lots of attributes, some of
+		# them null.  This falsely creates the impression that the t_bits
+		# array is longer than just one byte, but t_hoff still says otherwise.
+		$tup->{t_infomask} |= HEAP_HASNULL;
+		$tup->{t_infomask2} |= HEAP_NATTS_MASK;
+		$tup->{t_bits} = 0xAA;
+	}
+	elsif ($tupidx == 9)
+	{
+		# Same as above, but this time t_hoff plays along
+		$tup->{t_infomask} |= HEAP_HASNULL;
+		$tup->{t_infomask2} |= (HEAP_NATTS_MASK & 0x40);
+		$tup->{t_bits} = 0xAA;
+		$tup->{t_hoff} = 32;
+	}
+	elsif ($tupidx == 10)
+	{
+		# Corrupt the bits in column 'b' 1-byte varlena header
+		$tup->{b_header} = 0x80;
+	}
+	elsif ($tupidx == 11)
+	{
+		# Corrupt the bits in column 'c' toast pointer
+		$tup->{c6} = 41;
+		$tup->{c7} = 41;
+	}
+	write_tuple($file, $offset, $tup);
+}
+close($file) or die "could not close $relpath: $!";
+
+# Run heapcheck_relation on the corrupted file
+$node->start;
+# Expected rows are blkno|offnum|lp_off|lp_flags|lp_len|attnum|chunk|msg
+$result = $node->safe_psql('postgres', q(SELECT * FROM heapcheck_relation('test')));
+is ($result,
+"0|1|8128|1|58|||tuple xmin = 3 precedes relation relfrozenxid = $relfrozenxid
+0|1|8128|1|58|||tuple xmin = 3 (interpreted as 3) not or no longer valid
+0|2|8064|1|58|||tuple xmin = 4026531839 precedes relation relfrozenxid = $relfrozenxid
+0|2|8064|1|58|||tuple xmin = 4026531839 (interpreted as 18446744073441116159) not or no longer valid
+0|3|8000|1|58|||tuple xmax = 4026531839 precedes relation relfrozenxid = $relfrozenxid
+0|4|7936|1|58|||t_hoff > lp_len (152 > 58)
+0|5|7872|1|58|||t_hoff not max-aligned (27)
+0|6|7808|1|58|||t_hoff < SizeofHeapTupleHeader (16 < 23)
+0|7|7744|1|58|||t_hoff < SizeofHeapTupleHeader (21 < 23)
+0|7|7744|1|58|||t_hoff not max-aligned (21)
+0|8|7680|1|58|||relation natts < tuple natts (3 < 2047)
+0|9|7616|1|58|||SizeofHeapTupleHeader + BITMAPLEN(natts) > t_hoff (23 + 256 > 24)
+0|10|7552|1|58|||relation natts < tuple natts (3 < 67)
+0|11|7488|1|58|2||t_hoff + offset > lp_len (24 + 429496744 > 58)
+0|12|7424|1|58|2|0|final chunk number differs from expected (0 vs. 6)
+0|12|7424|1|58|2|0|toasted value missing from toast table",
+"Expected heapcheck_relation output");
+
+$node->teardown_node;
+$node->clean_node;
+
diff --git a/doc/src/sgml/contrib.sgml b/doc/src/sgml/contrib.sgml
index 261a559e81..f32b8ac5ef 100644
--- a/doc/src/sgml/contrib.sgml
+++ b/doc/src/sgml/contrib.sgml
@@ -110,6 +110,7 @@ CREATE EXTENSION <replaceable>module_name</replaceable>;
  &earthdistance;
  &file-fdw;
  &fuzzystrmatch;
+ &heapcheck;
  &hstore;
  &intagg;
  &intarray;
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index 68179f71cd..b43d72b8bb 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -122,6 +122,7 @@
 <!ENTITY earthdistance   SYSTEM "earthdistance.sgml">
 <!ENTITY file-fdw        SYSTEM "file-fdw.sgml">
 <!ENTITY fuzzystrmatch   SYSTEM "fuzzystrmatch.sgml">
+<!ENTITY heapcheck       SYSTEM "heapcheck.sgml">
 <!ENTITY hstore          SYSTEM "hstore.sgml">
 <!ENTITY intagg          SYSTEM "intagg.sgml">
 <!ENTITY intarray        SYSTEM "intarray.sgml">
diff --git a/doc/src/sgml/heapcheck.sgml b/doc/src/sgml/heapcheck.sgml
new file mode 100644
index 0000000000..0a9942a452
--- /dev/null
+++ b/doc/src/sgml/heapcheck.sgml
@@ -0,0 +1,133 @@
+<!-- doc/src/sgml/heapcheck.sgml -->
+
+<sect1 id="heapcheck" xreflabel="heapcheck">
+ <title>heapcheck</title>
+
+ <indexterm zone="heapcheck">
+  <primary>heapcheck</primary>
+ </indexterm>
+
+ <para>
+  The <filename>heapcheck</filename> module provides a means for examining the
+  integrity of a table relation.
+ </para>
+
+ <sect2>
+  <title>Functions</title>
+
+  <variablelist>
+   <varlistentry>
+    <term>
+     <function>
+      heapcheck_relation(relation regclass,
+                         blkno OUT bigint,
+                         offnum OUT integer,
+                         lp_off OUT smallint,
+                         lp_flags OUT smallint,
+                         lp_len OUT smallint,
+                         attnum OUT integer,
+                         chunk OUT integer,
+                         msg OUT text)
+      returns record
+     </function>
+    </term>
+    <listitem>
+     <para>
+      Checks for "logical" corruption, where the page is valid but inconsistent
+      with the rest of the database cluster. This can happen due to faulty or
+      ill-conceived backup and restore tools, or bad storage, or user error, or
+      bugs in the server itself.  It checks xmin and xmax values against
+      relfrozenxid and relminmxid, and also validates TOAST pointers.
+     </para>
+
+     <para>
+      Returns one row for each corruption detected in the relation.  Each row
+      contains the following fields:
+     </para>
+     <variablelist>
+      <varlistentry>
+       <term>blkno</term>
+       <listitem>
+        <para>
+         The number of the block (page) in which the corruption was found.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>offnum</term>
+       <listitem>
+        <para>
+         The OffsetNumber of the corrupt tuple.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>lp_off</term>
+       <listitem>
+        <para>
+         The tuple's offset within the page, as recorded in its line pointer.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>lp_flags</term>
+       <listitem>
+        <para>
+         The flags in the line pointer for the corrupt tuple.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>lp_len</term>
+       <listitem>
+        <para>
+         The length of the corrupt tuple as recorded in the line pointer.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>attnum</term>
+       <listitem>
+        <para>
+         The attribute number of the corrupt column in the tuple, if the corruption
+         is specific to a column and not the tuple as a whole.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>chunk</term>
+       <listitem>
+        <para>
+         The chunk number of the corrupt toasted attribute, if the corruption
+         is specific to a toasted value.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term>msg</term>
+       <listitem>
+        <para>
+         A human readable message describing the corruption in the page.
+        </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <para>
+   By default, this function is executable only by superusers and members
+   of the <literal>pg_stat_scan_tables</literal> role.
+  </para>
+ </sect2>
+
+ <sect2>
+  <title>Author</title>
+
+  <para>
+   Mark Dilger <email>mark.dilger@enterprisedb.com</email>
+  </para>
+ </sect2>
+
+</sect1>
-- 
2.21.1 (Apple Git-122.3)

