From a6bbab22a5958e13203c63b70189a7bd4ba9484d Mon Sep 17 00:00:00 2001
From: Mark Dilger <mark.dilger@enterprisedb.com>
Date: Mon, 15 Mar 2021 17:26:47 -0700
Subject: [PATCH v9 2/2] pg_amcheck: provide additional toast corruption
 information

Modifying amcheck to provide additional information about corrupted
toast, and adjusting pg_amcheck to expect the new amcheck output
format.  Part of the additional toast information was known to
amcheck all along but unhelpfully not included in the corruption
reports, but this commit also adds more thorough checking, including
whether toasted data matches the rawsize and extsize claimed in the
main table's toast pointer, and whether compressed toasted data is
corrupted.  These additional checks were quite intentionally not
performed in the original amcheck version due to performance and
stability concerns, but on reflection those concerns could better be
addressed by adding options to turn the checks off if desired.

While at it, fixing some amcheck messages not to include the
attribute number.  Doing so is redundant, given that attnum is
already one of the returned columns, and just makes the message text
needlessly longer.
---
 contrib/amcheck/amcheck--1.2--1.3.sql     |   4 +
 contrib/amcheck/expected/check_heap.out   |  88 +++++------
 contrib/amcheck/verify_heapam.c           | 175 ++++++++++++++++++----
 src/bin/pg_amcheck/pg_amcheck.c           |  20 ++-
 src/bin/pg_amcheck/t/004_verify_heapam.pl | 131 ++++++++++++++--
 5 files changed, 322 insertions(+), 96 deletions(-)

diff --git a/contrib/amcheck/amcheck--1.2--1.3.sql b/contrib/amcheck/amcheck--1.2--1.3.sql
index 7237ab738c..c77090b5e9 100644
--- a/contrib/amcheck/amcheck--1.2--1.3.sql
+++ b/contrib/amcheck/amcheck--1.2--1.3.sql
@@ -15,6 +15,10 @@ CREATE FUNCTION verify_heapam(relation regclass,
 							  blkno OUT bigint,
 							  offnum OUT integer,
 							  attnum OUT integer,
+							  rawsize OUT integer,
+							  extsize OUT integer,
+							  valueid OUT oid,
+							  toastrelid OUT oid,
 							  msg OUT text)
 RETURNS SETOF record
 AS 'MODULE_PATHNAME', 'verify_heapam'
diff --git a/contrib/amcheck/expected/check_heap.out b/contrib/amcheck/expected/check_heap.out
index 1fb3823142..54c327583e 100644
--- a/contrib/amcheck/expected/check_heap.out
+++ b/contrib/amcheck/expected/check_heap.out
@@ -6,60 +6,60 @@ ERROR:  invalid skip option
 HINT:  Valid skip options are "all-visible", "all-frozen", and "none".
 -- Check specifying invalid block ranges when verifying an empty table
 SELECT * FROM verify_heapam(relation := 'heaptest', startblock := 0, endblock := 0);
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', startblock := 5, endblock := 8);
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 -- Check that valid options are not rejected nor corruption reported
 -- for an empty table, and that skip enum-like parameter is case-insensitive
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'none');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'all-frozen');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'all-visible');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'None');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'All-Frozen');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'All-Visible');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'NONE');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'ALL-FROZEN');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'ALL-VISIBLE');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 -- Add some data so subsequent tests are not entirely trivial
@@ -69,23 +69,23 @@ INSERT INTO heaptest (a, b)
 -- Check that valid options are not rejected nor corruption reported
 -- for a non-empty table
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'none');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'all-frozen');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'all-visible');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', startblock := 0, endblock := 0);
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 CREATE ROLE regress_heaptest_role;
@@ -98,8 +98,8 @@ GRANT EXECUTE ON FUNCTION verify_heapam(regclass, boolean, boolean, text, bigint
 -- verify permissions are now sufficient
 SET ROLE regress_heaptest_role;
 SELECT * FROM verify_heapam(relation := 'heaptest');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 RESET ROLE;
@@ -113,23 +113,23 @@ VACUUM (FREEZE, DISABLE_PAGE_SKIPPING) heaptest;
 -- Check that valid options are not rejected nor corruption reported
 -- for a non-empty frozen table
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'none');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'all-frozen');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', skip := 'all-visible');
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 SELECT * FROM verify_heapam(relation := 'heaptest', startblock := 0, endblock := 0);
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 -- Check that partitioned tables (the parent ones) which don't have visibility
@@ -146,8 +146,8 @@ CREATE TABLE test_partition partition OF test_partitioned FOR VALUES IN (1);
 SELECT * FROM verify_heapam('test_partition',
 							startblock := NULL,
 							endblock := NULL);
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 -- Check that valid options are not rejected nor corruption reported
@@ -156,8 +156,8 @@ INSERT INTO test_partitioned (a) (SELECT 1 FROM generate_series(1,1000) gs);
 SELECT * FROM verify_heapam('test_partition',
 							startblock := NULL,
 							endblock := NULL);
- blkno | offnum | attnum | msg 
--------+--------+--------+-----
+ blkno | offnum | attnum | rawsize | extsize | valueid | toastrelid | msg 
+-------+--------+--------+---------+---------+---------+------------+-----
 (0 rows)
 
 -- Check that indexes are rejected
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index dc57fe5774..50edc2d6a8 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -18,6 +18,7 @@
 #include "access/toast_internals.h"
 #include "access/visibilitymap.h"
 #include "catalog/pg_am.h"
+#include "common/pg_lzcompress.h"
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
@@ -28,7 +29,7 @@
 PG_FUNCTION_INFO_V1(verify_heapam);
 
 /* The number of columns in tuples returned by verify_heapam */
-#define HEAPCHECK_RELATION_COLS 4
+#define HEAPCHECK_RELATION_COLS 8
 
 /*
  * Despite the name, we use this for reporting problems with both XIDs and
@@ -114,6 +115,8 @@ typedef struct HeapCheckContext
 	AttrNumber	attnum;
 
 	/* Values for iterating over toast for the attribute */
+	struct varatt_external toast_pointer;
+	bool		checking_toastptr;
 	int32		chunkno;
 	int32		attrsize;
 	int32		endchunk;
@@ -130,7 +133,7 @@ typedef struct HeapCheckContext
 /* Internal implementation */
 static void sanity_check_relation(Relation rel);
 static void check_tuple(HeapCheckContext *ctx);
-static void check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx);
+static int32 check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx);
 
 static bool check_tuple_attribute(HeapCheckContext *ctx);
 static bool check_tuple_header_and_visibilty(HeapTupleHeader tuphdr,
@@ -343,6 +346,7 @@ verify_heapam(PG_FUNCTION_ARGS)
 	if (TransactionIdIsNormal(ctx.relfrozenxid))
 		ctx.oldest_xid = ctx.relfrozenxid;
 
+	ctx.checking_toastptr = false;
 	for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
 	{
 		OffsetNumber maxoff;
@@ -517,7 +521,21 @@ report_corruption(HeapCheckContext *ctx, char *msg)
 	values[1] = Int32GetDatum(ctx->offnum);
 	values[2] = Int32GetDatum(ctx->attnum);
 	nulls[2] = (ctx->attnum < 0);
-	values[3] = CStringGetTextDatum(msg);
+	if (ctx->checking_toastptr)
+	{
+		values[3] = Int32GetDatum(ctx->toast_pointer.va_rawsize);
+		values[4] = Int32GetDatum(ctx->toast_pointer.va_extsize);
+		values[5] = ObjectIdGetDatum(ctx->toast_pointer.va_valueid);
+		values[6] = ObjectIdGetDatum(ctx->toast_pointer.va_toastrelid);
+	}
+	else
+	{
+		nulls[3] = true;
+		nulls[4] = true;
+		nulls[5] = true;
+		nulls[6] = true;
+	}
+	values[7] = CStringGetTextDatum(msg);
 
 	/*
 	 * In principle, there is nothing to prevent a scan over a large, highly
@@ -548,6 +566,10 @@ verify_heapam_tupdesc(void)
 	TupleDescInitEntry(tupdesc, ++a, "blkno", INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, ++a, "offnum", INT4OID, -1, 0);
 	TupleDescInitEntry(tupdesc, ++a, "attnum", INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, ++a, "rawsize", INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, ++a, "extsize", INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, ++a, "valueid", OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, ++a, "toastrelid", OIDOID, -1, 0);
 	TupleDescInitEntry(tupdesc, ++a, "msg", TEXTOID, -1, 0);
 	Assert(a == HEAPCHECK_RELATION_COLS);
 
@@ -819,8 +841,11 @@ check_tuple_header_and_visibilty(HeapTupleHeader tuphdr, HeapCheckContext *ctx)
  * tuples that store the toasted value are retrieved and checked in order, with
  * each toast tuple being checked against where we are in the sequence, as well
  * as each toast tuple having its varlena structure sanity checked.
+ *
+ * Returns the size of the chunk, not including the header, or zero if it
+ * cannot be determined due to corruption.
  */
-static void
+static int32
 check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 {
 	int32		curchunk;
@@ -838,7 +863,7 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 	{
 		report_corruption(ctx,
 						  pstrdup("toast chunk sequence number is null"));
-		return;
+		return 0;
 	}
 	chunk = DatumGetPointer(fastgetattr(toasttup, 3,
 										ctx->toast_rel->rd_att, &isnull));
@@ -846,7 +871,7 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 	{
 		report_corruption(ctx,
 						  pstrdup("toast chunk data is null"));
-		return;
+		return 0;
 	}
 	if (!VARATT_IS_EXTENDED(chunk))
 		chunksize = VARSIZE(chunk) - VARHDRSZ;
@@ -865,7 +890,7 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 		report_corruption(ctx,
 						  psprintf("corrupt extended toast chunk has invalid varlena header: %0x (sequence number %d)",
 								   header, curchunk));
-		return;
+		return 0;
 	}
 
 	/*
@@ -876,14 +901,14 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 		report_corruption(ctx,
 						  psprintf("toast chunk sequence number %u does not match the expected sequence number %u",
 								   curchunk, ctx->chunkno));
-		return;
+		return chunksize;
 	}
 	if (curchunk > ctx->endchunk)
 	{
 		report_corruption(ctx,
 						  psprintf("toast chunk sequence number %u exceeds the end chunk sequence number %u",
 								   curchunk, ctx->endchunk));
-		return;
+		return chunksize;
 	}
 
 	expected_size = curchunk < ctx->totalchunks - 1 ? TOAST_MAX_CHUNK_SIZE
@@ -893,8 +918,10 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 		report_corruption(ctx,
 						  psprintf("toast chunk size %u differs from the expected size %u",
 								   chunksize, expected_size));
-		return;
+		return chunksize;
 	}
+
+	return chunksize;
 }
 
 /*
@@ -920,11 +947,11 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 static bool
 check_tuple_attribute(HeapCheckContext *ctx)
 {
-	struct varatt_external toast_pointer;
 	ScanKeyData toastkey;
 	SysScanDesc toastscan;
 	SnapshotData SnapshotToast;
 	HeapTuple	toasttup;
+	int64		toastsize;		/* corrupt toast could overflow 32 bits */
 	bool		found_toasttup;
 	Datum		attdatum;
 	struct varlena *attr;
@@ -932,6 +959,8 @@ check_tuple_attribute(HeapCheckContext *ctx)
 	uint16		infomask;
 	Form_pg_attribute thisatt;
 
+	Assert(! ctx->checking_toastptr);
+
 	infomask = ctx->tuphdr->t_infomask;
 	thisatt = TupleDescAttr(RelationGetDescr(ctx->rel), ctx->attnum);
 
@@ -940,8 +969,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 	if (ctx->tuphdr->t_hoff + ctx->offset > ctx->lp_len)
 	{
 		report_corruption(ctx,
-						  psprintf("attribute %u with length %u starts at offset %u beyond total tuple length %u",
-								   ctx->attnum,
+						  psprintf("attribute with length %u starts at offset %u beyond total tuple length %u",
 								   thisatt->attlen,
 								   ctx->tuphdr->t_hoff + ctx->offset,
 								   ctx->lp_len));
@@ -961,8 +989,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 		if (ctx->tuphdr->t_hoff + ctx->offset > ctx->lp_len)
 		{
 			report_corruption(ctx,
-							  psprintf("attribute %u with length %u ends at offset %u beyond total tuple length %u",
-									   ctx->attnum,
+							  psprintf("attribute with length %u ends at offset %u beyond total tuple length %u",
 									   thisatt->attlen,
 									   ctx->tuphdr->t_hoff + ctx->offset,
 									   ctx->lp_len));
@@ -994,8 +1021,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 		if (va_tag != VARTAG_ONDISK)
 		{
 			report_corruption(ctx,
-							  psprintf("toasted attribute %u has unexpected TOAST tag %u",
-									   ctx->attnum,
+							  psprintf("toasted attribute has unexpected TOAST tag %u",
 									   va_tag));
 			/* We can't know where the next attribute begins */
 			return false;
@@ -1009,8 +1035,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 	if (ctx->tuphdr->t_hoff + ctx->offset > ctx->lp_len)
 	{
 		report_corruption(ctx,
-						  psprintf("attribute %u with length %u ends at offset %u beyond total tuple length %u",
-								   ctx->attnum,
+						  psprintf("attribute with length %u ends at offset %u beyond total tuple length %u",
 								   thisatt->attlen,
 								   ctx->tuphdr->t_hoff + ctx->offset,
 								   ctx->lp_len));
@@ -1037,12 +1062,18 @@ check_tuple_attribute(HeapCheckContext *ctx)
 
 	/* It is external, and we're looking at a page on disk */
 
+	/*
+	 * Must copy attr into toast_pointer for alignment considerations
+	 */
+	VARATT_EXTERNAL_GET_POINTER(ctx->toast_pointer, attr);
+	ctx->checking_toastptr = true;
+
 	/* The tuple header better claim to contain toasted values */
 	if (!(infomask & HEAP_HASEXTERNAL))
 	{
 		report_corruption(ctx,
-						  psprintf("attribute %u is external but tuple header flag HEAP_HASEXTERNAL not set",
-								   ctx->attnum));
+						  pstrdup("attribute is external but tuple header flag HEAP_HASEXTERNAL not set"));
+		ctx->checking_toastptr = false;
 		return true;
 	}
 
@@ -1050,21 +1081,33 @@ check_tuple_attribute(HeapCheckContext *ctx)
 	if (!ctx->rel->rd_rel->reltoastrelid)
 	{
 		report_corruption(ctx,
-						  psprintf("attribute %u is external but relation has no toast relation",
-								   ctx->attnum));
+						  pstrdup("attribute is external but relation has no toast relation"));
+		ctx->checking_toastptr = false;
 		return true;
 	}
 
 	/* If we were told to skip toast checking, then we're done. */
 	if (ctx->toast_rel == NULL)
+	{
+		ctx->checking_toastptr = false;
 		return true;
+	}
 
-	/*
-	 * Must copy attr into toast_pointer for alignment considerations
-	 */
-	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+	if (ctx->toast_pointer.va_extsize > ctx->toast_pointer.va_rawsize - VARHDRSZ)
+		report_corruption(ctx,
+						  pstrdup("toast pointer external size exceeds maximum expected for rawsize"));
+
+
+	if (ctx->toast_pointer.va_toastrelid != ctx->rel->rd_rel->reltoastrelid)
+	{
+		report_corruption(ctx,
+						  psprintf("toast pointer relation oid differs from expected value %u",
+								   ctx->rel->rd_rel->reltoastrelid));
+		ctx->checking_toastptr = false;
+		return true;
+	}
 
-	ctx->attrsize = toast_pointer.va_extsize;
+	ctx->attrsize = ctx->toast_pointer.va_extsize;
 	ctx->endchunk = (ctx->attrsize - 1) / TOAST_MAX_CHUNK_SIZE;
 	ctx->totalchunks = ctx->endchunk + 1;
 
@@ -1074,7 +1117,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 	ScanKeyInit(&toastkey,
 				(AttrNumber) 1,
 				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(toast_pointer.va_valueid));
+				ObjectIdGetDatum(ctx->toast_pointer.va_valueid));
 
 	/*
 	 * Check if any chunks for this toasted object exist in the toast table,
@@ -1087,24 +1130,92 @@ check_tuple_attribute(HeapCheckContext *ctx)
 										   &toastkey);
 	ctx->chunkno = 0;
 	found_toasttup = false;
+	toastsize = 0;
 	while ((toasttup =
 			systable_getnext_ordered(toastscan,
 									 ForwardScanDirection)) != NULL)
 	{
 		found_toasttup = true;
-		check_toast_tuple(toasttup, ctx);
+		toastsize += check_toast_tuple(toasttup, ctx);
 		ctx->chunkno++;
 	}
 	if (!found_toasttup)
 		report_corruption(ctx,
-						  psprintf("toasted value for attribute %u missing from toast table",
-								   ctx->attnum));
+						  pstrdup("toasted value missing from toast table"));
 	else if (ctx->chunkno != (ctx->endchunk + 1))
 		report_corruption(ctx,
 						  psprintf("final toast chunk number %u differs from expected value %u",
 								   ctx->chunkno, (ctx->endchunk + 1)));
 	systable_endscan_ordered(toastscan);
+	if (toastsize != ctx->toast_pointer.va_extsize)
+		report_corruption(ctx,
+						  psprintf("total toast size " INT64_FORMAT " differs from expected extsize",
+								   toastsize));
+	else
+	{
+		Size			allocsize;
+
+		if (!AllocSizeIsValid(ctx->toast_pointer.va_rawsize))
+			report_corruption(ctx,
+							  pstrdup("rawsize too large for attribute to be allocated"));
+
+		allocsize = ctx->toast_pointer.va_extsize + VARHDRSZ;
+		if (!AllocSizeIsValid(allocsize))
+			report_corruption(ctx,
+							  pstrdup("extsize too large for attribute to be allocated"));
+		else
+		{
+			struct varlena *attr;
+
+			/* Fetch all chunks */
+			attr = (struct varlena *) palloc(allocsize);
+			if (VARATT_EXTERNAL_IS_COMPRESSED(ctx->toast_pointer))
+				SET_VARSIZE_COMPRESSED(attr, allocsize);
+			else
+				SET_VARSIZE(attr, allocsize);
+
+			table_relation_fetch_toast_slice(ctx->toast_rel, ctx->toast_pointer.va_valueid,
+											 toastsize, 0, toastsize, attr);
+
+			if (VARATT_IS_COMPRESSED(attr))
+			{
+				struct varlena *uncompressed;
+
+				allocsize = TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ;
+				if (allocsize != ctx->toast_pointer.va_rawsize)
+					report_corruption(ctx,
+									  psprintf("toast data rawsize %zu differs from expected rawsize",
+									  allocsize));
+				else if (AllocSizeIsValid(allocsize))
+				{
+					uncompressed = (struct varlena *) palloc(allocsize);
+					SET_VARSIZE(uncompressed, allocsize);
+					if (pglz_decompress(TOAST_COMPRESS_RAWDATA(attr),
+										TOAST_COMPRESS_SIZE(attr),
+										VARDATA(uncompressed),
+										TOAST_COMPRESS_RAWSIZE(attr), true) < 0)
+					{
+						report_corruption(ctx,
+										  pstrdup("compressed toast data is corrupted"));
+					}
+					else if (VARSIZE(uncompressed) != ctx->toast_pointer.va_rawsize)
+						report_corruption(ctx,
+										  psprintf("decompressed toast size %u differs from expected rawsize",
+												   VARSIZE(attr)));
+
+					pfree(uncompressed);
+				}
+			}
+			else if (VARSIZE(attr) != ctx->toast_pointer.va_rawsize)
+				report_corruption(ctx,
+								  psprintf("detoasted attribute size %u differs from expected rawsize",
+								  VARSIZE(attr)));
+
+			pfree(attr);
+		}
+	}
 
+	ctx->checking_toastptr = false;
 	return true;
 }
 
diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c
index c9d9900693..e5ec7bf2e9 100644
--- a/src/bin/pg_amcheck/pg_amcheck.c
+++ b/src/bin/pg_amcheck/pg_amcheck.c
@@ -799,7 +799,7 @@ prepare_heap_command(PQExpBuffer sql, RelationInfo *rel, PGconn *conn)
 {
 	resetPQExpBuffer(sql);
 	appendPQExpBuffer(sql,
-					  "SELECT blkno, offnum, attnum, msg FROM %s.verify_heapam("
+					  "SELECT blkno, offnum, attnum, rawsize, extsize, valueid, toastrelid, msg FROM %s.verify_heapam("
 					  "\nrelation := %u, on_error_stop := %s, check_toast := %s, skip := '%s'",
 					  rel->datinfo->amcheck_schema,
 					  rel->reloid,
@@ -990,12 +990,24 @@ verify_heap_slot_handler(PGresult *res, PGconn *conn, void *context)
 			const char *msg;
 
 			/* The message string should never be null, but check */
-			if (PQgetisnull(res, i, 3))
+			if (PQgetisnull(res, i, 7))
 				msg = "NO MESSAGE";
 			else
-				msg = PQgetvalue(res, i, 3);
+				msg = PQgetvalue(res, i, 7);
 
-			if (!PQgetisnull(res, i, 2))
+			if (!PQgetisnull(res, i, 6))
+				printf("heap table \"%s\".\"%s\".\"%s\", block %s, offset %s, attribute %s, rawsize %s, extsize %s, valueid %s, toastrelid %s:\n    %s\n",
+					   rel->datinfo->datname, rel->nspname, rel->relname,
+					   PQgetvalue(res, i, 0),	/* blkno */
+					   PQgetvalue(res, i, 1),	/* offnum */
+					   PQgetvalue(res, i, 2),	/* attnum */
+					   PQgetvalue(res, i, 3),	/* toast rawsize */
+					   PQgetvalue(res, i, 4),	/* toast extsize */
+					   PQgetvalue(res, i, 5),	/* toast valueid */
+					   PQgetvalue(res, i, 6),	/* toast relid */
+					   msg);
+
+			else if (!PQgetisnull(res, i, 2))
 				printf("heap table \"%s\".\"%s\".\"%s\", block %s, offset %s, attribute %s:\n    %s\n",
 					   rel->datinfo->datname, rel->nspname, rel->relname,
 					   PQgetvalue(res, i, 0),	/* blkno */
diff --git a/src/bin/pg_amcheck/t/004_verify_heapam.pl b/src/bin/pg_amcheck/t/004_verify_heapam.pl
index 16574cb1f8..243b8fc71d 100644
--- a/src/bin/pg_amcheck/t/004_verify_heapam.pl
+++ b/src/bin/pg_amcheck/t/004_verify_heapam.pl
@@ -224,7 +224,7 @@ my $rel = $node->safe_psql('postgres', qq(SELECT pg_relation_filepath('public.te
 my $relpath = "$pgdata/$rel";
 
 # Insert data and freeze public.test
-use constant ROWCOUNT => 16;
+use constant ROWCOUNT => 21;
 $node->safe_psql('postgres', qq(
 	INSERT INTO public.test (a, b, c)
 		VALUES (
@@ -259,6 +259,13 @@ select lp_off from heap_page_items(get_raw_page('test', 'main', 0))
 	offset $tup limit 1)));
 }
 
+# Find our toast relation id
+my $toastrelid = $node->safe_psql('postgres', qq(
+	SELECT c.reltoastrelid
+		FROM pg_catalog.pg_class c
+		WHERE c.oid = 'public.test'::regclass
+		));
+
 # Sanity check that our 'test' table on disk layout matches expectations.  If
 # this is not so, we will have to skip the test until somebody updates the test
 # to work on this platform.
@@ -296,7 +303,7 @@ close($file)
 $node->start;
 
 # Ok, Xids and page layout look ok.  We can run corruption tests.
-plan tests => 19;
+plan tests => 31;
 
 # Check that pg_amcheck runs against the uncorrupted table without error.
 $node->command_ok(['pg_amcheck', '-p', $port, 'postgres'],
@@ -310,6 +317,7 @@ $node->stop;
 
 # Some #define constants from access/htup_details.h for use while corrupting.
 use constant HEAP_HASNULL            => 0x0001;
+use constant HEAP_HASEXTERNAL        => 0x0004;
 use constant HEAP_XMAX_LOCK_ONLY     => 0x0080;
 use constant HEAP_XMIN_COMMITTED     => 0x0100;
 use constant HEAP_XMIN_INVALID       => 0x0200;
@@ -323,7 +331,22 @@ use constant HEAP_KEYS_UPDATED       => 0x2000;
 # expect verify_heapam() to return given which fields we expect to be non-null.
 sub header
 {
-	my ($blkno, $offnum, $attnum) = @_;
+	my %fields = @_;
+	my $blkno = $fields{blkno};
+	my $offnum = $fields{offnum};
+	my $attnum = $fields{attnum};
+
+	if (exists $fields{rawsize} ||
+		exists $fields{extsize} ||
+		exists $fields{valueid} ||
+		exists $fields{toastrelid})
+	{
+		my $rawsize = defined $fields{rawsize} ? $fields{rawsize} : '\d+';
+		my $extsize = defined $fields{extsize} ? $fields{extsize} : '\d+';
+		my $valueid = defined $fields{valueid} ? $fields{valueid} : '\d+';
+		my $toastrelid = defined $fields{toastrelid} ? $fields{toastrelid} : '\d+';
+		return qr/heap table "postgres"\."public"\."test", block $blkno, offset $offnum, attribute $attnum, rawsize $rawsize, extsize $extsize, valueid $valueid, toastrelid $toastrelid:\s+/ms;
+	}
 	return qr/heap table "postgres"\."public"\."test", block $blkno, offset $offnum, attribute $attnum:\s+/ms
 		if (defined $attnum);
 	return qr/heap table "postgres"\."public"\."test", block $blkno, offset $offnum:\s+/ms
@@ -349,7 +372,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 	my $offset = $lp_off[$tupidx];
 	my $tup = read_tuple($file, $offset);
 
-	my $header = header(0, $offnum, undef);
+	my $header = header(blkno => 0, offnum => $offnum);
 	if ($offnum == 1)
 	{
 		# Corruptly set xmin < relfrozenxid
@@ -459,6 +482,20 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 			qr/${$header}number of attributes 67 exceeds maximum expected for table 3/;
 	}
 	elsif ($offnum == 12)
+	{
+		# Corrupt infomask to claim there are no external attributes, which conflicts
+		# with column 'c' which is toasted
+		$tup->{t_infomask} &= ~HEAP_HASEXTERNAL;
+		$header = header(blkno => 0,
+						 offnum => $offnum,
+						 attnum => 2,
+						 rawsize => 10004,
+						 extsize => 10000,
+						 toastrelid => $toastrelid);
+		push @expected,
+			qr/${header}attribute is external but tuple header flag HEAP_HASEXTERNAL not set/;
+	}
+	elsif ($offnum == 13)
 	{
 		# Overwrite column 'b' 1-byte varlena header and initial characters to
 		# look like a long 4-byte varlena
@@ -478,18 +515,9 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		$tup->{b_body2} = 0xFF;
 		$tup->{b_body3} = 0xFF;
 
-		$header = header(0, $offnum, 1);
+		$header = header(blkno => 0, offnum => $offnum, attnum => 1);
 		push @expected,
-			qr/${header}attribute \d+ with length \d+ ends at offset \d+ beyond total tuple length \d+/;
-	}
-	elsif ($offnum == 13)
-	{
-		# Corrupt the bits in column 'c' toast pointer
-		$tup->{c_va_valueid} = 0xFFFFFFFF;
-
-		$header = header(0, $offnum, 2);
-		push @expected,
-			qr/${header}toasted value for attribute 2 missing from toast table/;
+			qr/${header}attribute with length \d+ ends at offset \d+ beyond total tuple length \d+/;
 	}
 	elsif ($offnum == 14)
 	{
@@ -501,7 +529,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 			qr/${header}multitransaction ID 4 equals or exceeds next valid multitransaction ID 1/;
 	}
-	elsif ($offnum == 15)	# Last offnum must equal ROWCOUNT
+	elsif ($offnum == 15)
 	{
 		# Set both HEAP_XMAX_COMMITTED and HEAP_XMAX_IS_MULTI
 		$tup->{t_infomask} |= HEAP_XMAX_COMMITTED;
@@ -511,6 +539,77 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 			qr/${header}multitransaction ID 4000000000 precedes relation minimum multitransaction ID threshold 1/;
 	}
+	elsif ($offnum == 16)
+	{
+		# Corrupt column c's toast pointer va_vartag field
+		$tup->{c_va_vartag} = 42;
+		$header = header(blkno => 0,
+						 offnum => $offnum,
+						 attnum => 2);
+		push @expected,
+			qr/$header/,
+			qr/toasted attribute has unexpected TOAST tag 42/;
+	}
+	elsif ($offnum == 17)
+	{
+		# Corrupt column c's toast pointer va_rawsize field to corruptly
+		# make the toast data appear to be compressed, though it is not.
+		$tup->{c_va_rawsize} = 10005;
+		$header = header(blkno => 0,
+						 offnum => $offnum,
+						 attnum => 2,
+						 rawsize => 10005,
+						 extsize => 10000,
+						 toastrelid => $toastrelid);
+		push @expected,
+			qr/${header}/,
+			qr/toast data rawsize \d+ differs from expected rawsize/;
+	}
+	elsif ($offnum == 18)
+	{
+		# Corrupt column c's toast pointer va_extsize field
+		$tup->{c_va_extsize} = 9999999;
+		$header = header(blkno => 0,
+						 offnum => $offnum,
+						 attnum => 2,
+						 rawsize => 10004,
+						 extsize => $tup->{c_va_extsize},
+						 toastrelid => $toastrelid);
+		push @expected,
+			qr/$header/,
+			qr/toast pointer external size exceeds maximum expected for rawsize/,
+			qr/toast chunk size \d+ differs from the expected size \d+/,
+			qr/toast chunk number \d+ differs from expected value \d+/,
+			qr/total toast size 10000 differs from expected extsize/;
+
+	}
+	elsif ($offnum == 19)
+	{
+		# Corrupt column c's toast pointer va_valueid field.  We have not
+		# consumed enough oids for any valueid in the toast table to be large.
+		# Use a large oid for the corruption to avoid colliding with an
+		# existent entry in the toast.
+		my $corrupt = $tup->{c_va_valueid} + 100000000;
+		$tup->{c_va_valueid} = $corrupt;
+		$header = header(blkno => 0,
+						 offnum => $offnum,
+						 attnum => 2,
+						 rawsize => 10004,
+						 extsize => 10000,
+						 valueid => $corrupt,
+						 toastrelid => $toastrelid);
+		push @expected,
+			qr/${header}/,
+			qr/toasted value missing from toast table/;
+	}
+	elsif ($offnum == 20)	# Last offnum must less than or equal to ROWCOUNT-1
+	{
+		# Corrupt column c's toast pointer va_toastrelid field
+		my $otherid = $toastrelid + 1;
+		$tup->{c_va_toastrelid} = $otherid;
+		push @expected,
+			qr/toast pointer relation oid differs from expected value $toastrelid/;
+	}
 	write_tuple($file, $offset, $tup);
 }
 close($file)
-- 
2.21.1 (Apple Git-122.3)

