From a5ab1de277e70fc9a0975f8064fe7db2e7aeb338 Mon Sep 17 00:00:00 2001
From: dilipkumar <dilipbalaut@gmail.com>
Date: Thu, 16 Jul 2020 11:41:28 +0530
Subject: [PATCH v2] Provide a GUC to allow vacuum to continue on corrupted
 tuple

A new GUC to control whether the vacuum will error out immediately
on detection of a corrupted tuple or it will just emit a WARNING for
each such instance and complete the rest of the vacuuming.
---
 doc/src/sgml/config.sgml                      | 21 ++++
 src/backend/access/heap/heapam.c              | 95 ++++++++++++++++---
 src/backend/access/heap/vacuumlazy.c          | 24 ++++-
 src/backend/commands/vacuum.c                 | 15 +++
 src/backend/utils/misc/guc.c                  |  9 ++
 src/backend/utils/misc/postgresql.conf.sample |  1 +
 src/include/access/heapam_xlog.h              |  3 +-
 src/include/commands/vacuum.h                 |  2 +
 .../test_misc/t/002_vacuum_tolerate_damage.pl | 49 ++++++++++
 9 files changed, 202 insertions(+), 17 deletions(-)
 create mode 100644 src/test/modules/test_misc/t/002_vacuum_tolerate_damage.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index b353c61683..3c1e647e27 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -8250,6 +8250,27 @@ COPY postgres_log FROM '/full/path/to/logfile.csv' WITH csv;
       </listitem>
      </varlistentry>
 
+      <varlistentry id="guc-vacuum-tolerate-damage" xreflabel="vacuum_tolerate_damage">
+      <term><varname>vacuum_tolerate_damage</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>vacuum_tolerate_damage</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        When set to off, which is the default, the <command>VACUUM</command> will raise
+        a ERROR-level error if detected a corrupted tuple.  This causes the operation to
+        stop immediately.
+       </para>
+
+       <para>
+        If set to on, the <command>VACUUM</command> will instead emit a WARNING for each
+        such tuple but the operation will continue.
+        vacuuming.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-bytea-output" xreflabel="bytea_output">
       <term><varname>bytea_output</varname> (<type>enum</type>)
       <indexterm>
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e09c8101e7..ab7e38182e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -51,6 +51,7 @@
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/atomics.h"
@@ -5784,6 +5785,7 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 #define		FRM_RETURN_IS_XID		0x0004
 #define		FRM_RETURN_IS_MULTI		0x0008
 #define		FRM_MARK_COMMITTED		0x0010
+#define		FRM_CORRUPTED_XID		0x0020
 
 /*
  * FreezeMultiXactId
@@ -5805,6 +5807,12 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
  * FRM_RETURN_IS_MULTI
  *		The return value is a new MultiXactId to set as new Xmax.
  *		(caller must obtain proper infomask bits using GetMultiXactIdHintBits)
+ * FRM_CORRUPTED_XID
+ *		This flag will be set if a corrupted multi-xact is detected.
+ *		Ideally, in such cases it will report an error but based on the
+ *		vacuum_damage_elevel, it might just complain about the corruption
+ *		and allow vacuum to continue.  So if the flag is set the caller will
+ *		not do anything to this tuple.
  */
 static TransactionId
 FreezeMultiXactId(MultiXactId multi, uint16 t_infomask,
@@ -5836,10 +5844,14 @@ FreezeMultiXactId(MultiXactId multi, uint16 t_infomask,
 		return InvalidTransactionId;
 	}
 	else if (MultiXactIdPrecedes(multi, relminmxid))
-		ereport(ERROR,
+	{
+		ereport(vacuum_damage_elevel(),
 				(errcode(ERRCODE_DATA_CORRUPTED),
 				 errmsg_internal("found multixact %u from before relminmxid %u",
 								 multi, relminmxid)));
+		*flags |= FRM_CORRUPTED_XID;
+		return InvalidTransactionId;
+	}
 	else if (MultiXactIdPrecedes(multi, cutoff_multi))
 	{
 		/*
@@ -5850,10 +5862,14 @@ FreezeMultiXactId(MultiXactId multi, uint16 t_infomask,
 		 */
 		if (MultiXactIdIsRunning(multi,
 								 HEAP_XMAX_IS_LOCKED_ONLY(t_infomask)))
-			ereport(ERROR,
+		{
+			ereport(vacuum_damage_elevel(),
 					(errcode(ERRCODE_DATA_CORRUPTED),
 					 errmsg_internal("multixact %u from before cutoff %u found to be still running",
 									 multi, cutoff_multi)));
+			*flags |= FRM_CORRUPTED_XID;
+			return InvalidTransactionId;
+		}
 
 		if (HEAP_XMAX_IS_LOCKED_ONLY(t_infomask))
 		{
@@ -5869,10 +5885,14 @@ FreezeMultiXactId(MultiXactId multi, uint16 t_infomask,
 			Assert(TransactionIdIsValid(xid));
 
 			if (TransactionIdPrecedes(xid, relfrozenxid))
-				ereport(ERROR,
+			{
+				ereport(vacuum_damage_elevel(),
 						(errcode(ERRCODE_DATA_CORRUPTED),
 						 errmsg_internal("found update xid %u from before relfrozenxid %u",
 										 xid, relfrozenxid)));
+				*flags |= FRM_CORRUPTED_XID;
+				return InvalidTransactionId;
+			}
 
 			/*
 			 * If the xid is older than the cutoff, it has to have aborted,
@@ -5881,9 +5901,13 @@ FreezeMultiXactId(MultiXactId multi, uint16 t_infomask,
 			if (TransactionIdPrecedes(xid, cutoff_xid))
 			{
 				if (TransactionIdDidCommit(xid))
-					ereport(ERROR,
+				{
+					ereport(vacuum_damage_elevel(),
 							(errcode(ERRCODE_DATA_CORRUPTED),
 							 errmsg_internal("cannot freeze committed update xid %u", xid)));
+					*flags |= FRM_CORRUPTED_XID;
+					return InvalidTransactionId;
+				}
 				*flags |= FRM_INVALIDATE_XMAX;
 				xid = InvalidTransactionId; /* not strictly necessary */
 			}
@@ -5957,10 +5981,15 @@ FreezeMultiXactId(MultiXactId multi, uint16 t_infomask,
 
 			Assert(TransactionIdIsValid(xid));
 			if (TransactionIdPrecedes(xid, relfrozenxid))
-				ereport(ERROR,
+			{
+				ereport(vacuum_damage_elevel(),
 						(errcode(ERRCODE_DATA_CORRUPTED),
 						 errmsg_internal("found update xid %u from before relfrozenxid %u",
 										 xid, relfrozenxid)));
+				pfree(members);
+				*flags |= FRM_CORRUPTED_XID;
+				return InvalidTransactionId;
+			}
 
 			/*
 			 * It's an update; should we keep it?  If the transaction is known
@@ -6007,10 +6036,15 @@ FreezeMultiXactId(MultiXactId multi, uint16 t_infomask,
 			 */
 			if (TransactionIdIsValid(update_xid) &&
 				TransactionIdPrecedes(update_xid, cutoff_xid))
-				ereport(ERROR,
+			{
+				ereport(vacuum_damage_elevel(),
 						(errcode(ERRCODE_DATA_CORRUPTED),
 						 errmsg_internal("found update xid %u from before xid cutoff %u",
 										 update_xid, cutoff_xid)));
+				pfree(members);
+				*flags |= FRM_CORRUPTED_XID;
+				return InvalidTransactionId;
+			}
 
 			/*
 			 * If we determined that it's an Xid corresponding to an update
@@ -6110,7 +6144,8 @@ bool
 heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 						  TransactionId relfrozenxid, TransactionId relminmxid,
 						  TransactionId cutoff_xid, TransactionId cutoff_multi,
-						  xl_heap_freeze_tuple *frz, bool *totally_frozen_p)
+						  xl_heap_freeze_tuple *frz, bool *totally_frozen_p,
+						  bool *corrupted_xid)
 {
 	bool		changed = false;
 	bool		xmax_already_frozen = false;
@@ -6123,6 +6158,9 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 	frz->t_infomask = tuple->t_infomask;
 	frz->xmax = HeapTupleHeaderGetRawXmax(tuple);
 
+	*totally_frozen_p = false;
+	*corrupted_xid = false;
+
 	/*
 	 * Process xmin.  xmin_frozen has two slightly different meanings: in the
 	 * !XidIsNormal case, it means "the xmin doesn't need any freezing" (it's
@@ -6137,19 +6175,27 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 	else
 	{
 		if (TransactionIdPrecedes(xid, relfrozenxid))
-			ereport(ERROR,
+		{
+			ereport(vacuum_damage_elevel(),
 					(errcode(ERRCODE_DATA_CORRUPTED),
 					 errmsg_internal("found xmin %u from before relfrozenxid %u",
 									 xid, relfrozenxid)));
+			*corrupted_xid = true;
+			return false;
+		}
 
 		xmin_frozen = TransactionIdPrecedes(xid, cutoff_xid);
 		if (xmin_frozen)
 		{
 			if (!TransactionIdDidCommit(xid))
-				ereport(ERROR,
+			{
+				ereport(vacuum_damage_elevel(),
 						(errcode(ERRCODE_DATA_CORRUPTED),
 						 errmsg_internal("uncommitted xmin %u from before xid cutoff %u needs to be frozen",
 										 xid, cutoff_xid)));
+				*corrupted_xid = true;
+				return false;
+			}
 
 			frz->t_infomask |= HEAP_XMIN_FROZEN;
 			changed = true;
@@ -6175,6 +6221,15 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 		newxmax = FreezeMultiXactId(xid, tuple->t_infomask,
 									relfrozenxid, relminmxid,
 									cutoff_xid, cutoff_multi, &flags);
+		/*
+		 * Return as nothing changed if we have detected a corrupted
+		 * multi-xact.
+		 */
+		if (flags & FRM_CORRUPTED_XID)
+		{
+			*corrupted_xid = true;
+			return false;
+		}
 
 		freeze_xmax = (flags & FRM_INVALIDATE_XMAX);
 
@@ -6218,10 +6273,14 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 	else if (TransactionIdIsNormal(xid))
 	{
 		if (TransactionIdPrecedes(xid, relfrozenxid))
-			ereport(ERROR,
+		{
+			ereport(vacuum_damage_elevel(),
 					(errcode(ERRCODE_DATA_CORRUPTED),
 					 errmsg_internal("found xmax %u from before relfrozenxid %u",
 									 xid, relfrozenxid)));
+			*corrupted_xid = true;
+			return false;
+		}
 
 		if (TransactionIdPrecedes(xid, cutoff_xid))
 		{
@@ -6233,10 +6292,14 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 			 */
 			if (!HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask) &&
 				TransactionIdDidCommit(xid))
-				ereport(ERROR,
+			{
+				ereport(vacuum_damage_elevel(),
 						(errcode(ERRCODE_DATA_CORRUPTED),
 						 errmsg_internal("cannot freeze committed xmax %u",
 										 xid)));
+				*corrupted_xid = true;
+				return false;
+			}
 			freeze_xmax = true;
 		}
 		else
@@ -6249,10 +6312,14 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 		xmax_already_frozen = true;
 	}
 	else
-		ereport(ERROR,
+	{
+		ereport(vacuum_damage_elevel(),
 				(errcode(ERRCODE_DATA_CORRUPTED),
 				 errmsg_internal("found xmax %u (infomask 0x%04x) not frozen, not multi, not normal",
 								 xid, tuple->t_infomask)));
+		*corrupted_xid = true;
+		return false;
+	}
 
 	if (freeze_xmax)
 	{
@@ -6364,11 +6431,13 @@ heap_freeze_tuple(HeapTupleHeader tuple,
 	xl_heap_freeze_tuple frz;
 	bool		do_freeze;
 	bool		tuple_totally_frozen;
+	bool		corrupted_xid;
 
 	do_freeze = heap_prepare_freeze_tuple(tuple,
 										  relfrozenxid, relminmxid,
 										  cutoff_xid, cutoff_multi,
-										  &frz, &tuple_totally_frozen);
+										  &frz, &tuple_totally_frozen,
+										  &corrupted_xid);
 
 	/*
 	 * Note that because this is not a WAL-logged operation, we don't need to
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 1bbc4598f7..df5c765928 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -312,6 +312,7 @@ typedef struct LVRelStats
 	int			num_index_scans;
 	TransactionId latestRemovedXid;
 	bool		lock_waiter_detected;
+	bool		corrupted_tuple_detected;
 
 	/* Used for error callback */
 	char	   *indname;
@@ -492,6 +493,7 @@ heap_vacuum_rel(Relation onerel, VacuumParams *params,
 	vacrelstats->num_index_scans = 0;
 	vacrelstats->pages_removed = 0;
 	vacrelstats->lock_waiter_detected = false;
+	vacrelstats->corrupted_tuple_detected = false;
 
 	/* Open all indexes of the relation */
 	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
@@ -591,8 +593,20 @@ heap_vacuum_rel(Relation onerel, VacuumParams *params,
 	if (new_rel_allvisible > new_rel_pages)
 		new_rel_allvisible = new_rel_pages;
 
-	new_frozen_xid = scanned_all_unfrozen ? FreezeLimit : InvalidTransactionId;
-	new_min_multi = scanned_all_unfrozen ? MultiXactCutoff : InvalidMultiXactId;
+	/*
+	 * Don't advance the relfrozenxid and relminmxid, if we have detected a
+	 * corrupted xid while trying to freeze some tuple.
+	 */
+	if (scanned_all_unfrozen && !vacrelstats->corrupted_tuple_detected)
+	{
+		new_frozen_xid = FreezeLimit;
+		new_min_multi = MultiXactCutoff;
+	}
+	else
+	{
+		new_frozen_xid = InvalidTransactionId;
+		new_min_multi = InvalidMultiXactId;
+	}
 
 	vac_update_relstats(onerel,
 						new_rel_pages,
@@ -1444,6 +1458,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 			else
 			{
 				bool		tuple_totally_frozen;
+				bool		corrupted_xid;
 
 				num_tuples += 1;
 				hastup = true;
@@ -1456,8 +1471,11 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
 											  relfrozenxid, relminmxid,
 											  FreezeLimit, MultiXactCutoff,
 											  &frozen[nfrozen],
-											  &tuple_totally_frozen))
+											  &tuple_totally_frozen,
+											  &corrupted_xid))
 					frozen[nfrozen++].offset = offnum;
+				else if (corrupted_xid)
+					vacrelstats->corrupted_tuple_detected = true;
 
 				if (!tuple_totally_frozen)
 					all_frozen = false;
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 576c7e63e9..5a093c231c 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -63,6 +63,8 @@ int			vacuum_freeze_table_age;
 int			vacuum_multixact_freeze_min_age;
 int			vacuum_multixact_freeze_table_age;
 
+/* Whether to continue with the vacuuming after detecting a corruped tuple */
+bool vacuum_tolerate_damage = false;
 
 /* A few variables that don't seem worth passing around as parameters */
 static MemoryContext vac_context = NULL;
@@ -2102,3 +2104,16 @@ get_vacopt_ternary_value(DefElem *def)
 {
 	return defGetBoolean(def) ? VACOPT_TERNARY_ENABLED : VACOPT_TERNARY_DISABLED;
 }
+
+/*
+ * vacuum_damage_elevel
+ *
+ * Return the error level for reporting the corrupted tuple while trying to
+ * freeze the tuple during vacuum.  Based on the GUC value, it will return
+ * whether to report with ERROR or to report with WARNING.
+ */
+int
+vacuum_damage_elevel(void)
+{
+	return vacuum_tolerate_damage ? WARNING : ERROR;
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 031ca0327f..683812dd4a 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2041,6 +2041,15 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"vacuum_tolerate_damage", PGC_USERSET, ERROR_HANDLING_OPTIONS,
+			gettext_noop("Whether to continue running vacuum after detecting corrupted tuple."),
+		},
+		&vacuum_tolerate_damage,
+		false,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e430e33c7b..88654f4983 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -666,6 +666,7 @@
 #vacuum_cleanup_index_scale_factor = 0.1	# fraction of total number of tuples
 						# before index cleanup, 0 always performs
 						# index cleanup
+#vacuum_tolerate_damage = false
 #bytea_output = 'hex'			# hex, escape
 #xmlbinary = 'base64'
 #xmloption = 'content'
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index aa17f7df84..61def10e2a 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -412,7 +412,8 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 									  TransactionId cutoff_xid,
 									  TransactionId cutoff_multi,
 									  xl_heap_freeze_tuple *frz,
-									  bool *totally_frozen);
+									  bool *totally_frozen,
+									  bool *corrupted_xid);
 extern void heap_execute_freeze_tuple(HeapTupleHeader tuple,
 									  xl_heap_freeze_tuple *xlrec_tp);
 extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index a4cd721400..5e711832bc 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -237,6 +237,7 @@ extern int	vacuum_freeze_min_age;
 extern int	vacuum_freeze_table_age;
 extern int	vacuum_multixact_freeze_min_age;
 extern int	vacuum_multixact_freeze_table_age;
+extern bool vacuum_tolerate_damage;
 
 /* Variables for cost-based parallel vacuum */
 extern pg_atomic_uint32 *VacuumSharedCostBalance;
@@ -278,6 +279,7 @@ extern bool vacuum_is_relation_owner(Oid relid, Form_pg_class reltuple,
 									 int options);
 extern Relation vacuum_open_relation(Oid relid, RangeVar *relation,
 									 int options, bool verbose, LOCKMODE lmode);
+extern int vacuum_damage_elevel(void);
 
 /* in commands/analyze.c */
 extern void analyze_rel(Oid relid, RangeVar *relation,
diff --git a/src/test/modules/test_misc/t/002_vacuum_tolerate_damage.pl b/src/test/modules/test_misc/t/002_vacuum_tolerate_damage.pl
new file mode 100644
index 0000000000..acf1266ab1
--- /dev/null
+++ b/src/test/modules/test_misc/t/002_vacuum_tolerate_damage.pl
@@ -0,0 +1,49 @@
+# Verify the vacuum_tolerate_damage GUC
+
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+# Initialize a test cluster
+my $node = get_new_node('primary');
+$node->init();
+$node->start;
+
+# Run a SQL command and return psql's stderr
+sub run_sql_command
+{
+	my $sql = shift;
+	my $stderr;
+
+	$node->psql(
+		'postgres',
+		$sql,
+		stderr        => \$stderr);
+	return $stderr;
+}
+
+my $output;
+
+# Insert 2 tuple in the table and update the relfrozenxid for the table to
+# the future xid.
+run_sql_command(
+	"create table test_vac (a int) WITH (autovacuum_enabled = off);
+	 insert into test_vac values (1), (2);
+	 update pg_class set relfrozenxid=txid_current()::text::xid where relname='test_vac';");
+
+$output = run_sql_command('vacuum(freeze, disable_page_skipping) test_vac;');
+ok( $output =~ m/ERROR:.*found xmin.*before relfrozenxid/);
+
+# set the vacuum_tolerate_damage and run again
+$output = run_sql_command('set vacuum_tolerate_damage=true;
+						   vacuum(freeze, disable_page_skipping) test_vac;');
+
+
+# this time we should get WARNING for both the tuples
+ok( scalar( @{[ $output=~/WARNING:.*found xmin.*before relfrozenxid/gi ]}) == 2);
+
+run_sql_command('DROP TABLE test_vac;');
+
+$node->stop('fast');
-- 
2.23.0

