From 68d7fc042e3611ef70e90df057c3dee6ed513727 Mon Sep 17 00:00:00 2001
From: Pavel Borisov <pashkin.elfe@gmail.com>
Date: Mon, 8 Feb 2021 12:26:08 +0400
Subject: [PATCH v1 1/2] Make amcheck check UNIQUE constraint for btree
 index. For an index with a unique constraint, check that only one table
 entry for equal keys (including all posting list entries) is visible.
 Report an error if not, and show all index entries violating the
 constraint at WARNING level.

Authors: Anastasia Lubennikova <a.lubennikova@postgrespro.ru>, Pavel Borisov <pashkin.elfe@gmail.com>
---
 contrib/amcheck/verify_nbtree.c | 267 +++++++++++++++++++++++++++++++-
 1 file changed, 263 insertions(+), 4 deletions(-)

diff --git a/contrib/amcheck/verify_nbtree.c b/contrib/amcheck/verify_nbtree.c
index b8c7793d9e0..8a5809f017e 100644
--- a/contrib/amcheck/verify_nbtree.c
+++ b/contrib/amcheck/verify_nbtree.c
@@ -83,6 +83,13 @@ typedef struct BtreeCheckState
 	/* Buffer access strategy */
 	BufferAccessStrategy checkstrategy;
 
+	/*
+	 * Info for uniqueness checking.
+	 * Fill these fields once per index check.
+	 */
+	IndexInfo  *indexinfo;
+	Snapshot	snapshot;
+
 	/*
 	 * Mutable state, for verification of particular page:
 	 */
@@ -148,8 +155,20 @@ static BtreeLevel bt_check_level_from_leftmost(BtreeCheckState *state,
 static void bt_recheck_sibling_links(BtreeCheckState *state,
 									 BlockNumber btpo_prev_from_target,
 									 BlockNumber leftcurrent);
+static bool heap_entry_is_visible(BtreeCheckState *state, ItemPointer tid);
+static void bt_report_duplicate(BtreeCheckState *state, ItemPointer tid,
+								BlockNumber block, OffsetNumber offset,
+								int posting, ItemPointer nexttid,
+								BlockNumber nblock, OffsetNumber noffset,
+								int nposting);
+static void bt_entry_unique_check(BtreeCheckState *state, IndexTuple itup,
+								  OffsetNumber offset, int *lVis_i,
+								  ItemPointer *lVis_tid,
+								  OffsetNumber *lVis_offset,
+								  BlockNumber *lVis_block);
 static void bt_target_page_check(BtreeCheckState *state);
-static BTScanInsert bt_right_page_check_scankey(BtreeCheckState *state);
+static BTScanInsert bt_right_page_check_scankey(BtreeCheckState *state,
+												OffsetNumber *rightfirstoffset);
 static void bt_child_check(BtreeCheckState *state, BTScanInsert targetkey,
 						   OffsetNumber downlinkoffnum);
 static void bt_child_highkey_check(BtreeCheckState *state,
@@ -187,6 +206,7 @@ static ItemId PageGetItemIdCareful(BtreeCheckState *state, BlockNumber block,
 static inline ItemPointer BTreeTupleGetHeapTIDCareful(BtreeCheckState *state,
 													  IndexTuple itup, bool nonpivot);
 static inline ItemPointer BTreeTupleGetPointsToTID(IndexTuple itup);
+static bool errflag; /* Output ERROR at the end of amcheck */
 
 /*
  * bt_index_check(index regclass, heapallindexed boolean)
@@ -449,6 +469,15 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
 	state->readonly = readonly;
 	state->heapallindexed = heapallindexed;
 	state->rootdescend = rootdescend;
+	state->indexinfo = BuildIndexInfo(state->rel);
+	/*
+	 * We need a snapshot to check uniqueness of the index.
+	 * For better performance, take it once per index check.
+	 */
+	if (state->indexinfo->ii_Unique)
+		state->snapshot = RegisterSnapshot(GetTransactionSnapshot());
+	else
+		state->snapshot = InvalidSnapshot;
 
 	if (state->heapallindexed)
 	{
@@ -632,7 +661,16 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
 	}
 
 	/* Be tidy: */
+	if (state->snapshot != InvalidSnapshot)
+		UnregisterSnapshot(state->snapshot);
 	MemoryContextDelete(state->targetcontext);
+
+	if (errflag == true)
+		ereport(ERROR,
+				(errcode(ERRCODE_INDEX_CORRUPTED),
+				errmsg("index \"%s\" is corrupted. There are tuples violating UNIQUE constraint",
+						RelationGetRelationName(state->rel)),
+				errdetail_internal("Details are in the previous log messages under WARNING priority")));
 }
 
 /*
@@ -1006,6 +1044,142 @@ bt_recheck_sibling_links(BtreeCheckState *state,
 								btpo_prev_from_target)));
 }
 
+/* Tell whether the heap tuple at tid is visible under state->snapshot */
+static bool heap_entry_is_visible(BtreeCheckState *state, ItemPointer tid)
+{
+	TupleTableSlot *heapslot = table_slot_create(state->heaprel, NULL);
+	bool		visible;
+
+	visible = table_tuple_fetch_row_version(state->heaprel, tid,
+											state->snapshot, heapslot);
+	if (heapslot != NULL)
+		ExecDropSingleTupleTableSlot(heapslot);
+
+	return visible;
+}
+
+/*
+ * Emit a WARNING naming two index entries that violate the unique constraint
+ * and set errflag so that an ERROR is raised at the end of the check.
+ * tid/nexttid are the heap TIDs, block/offset and nblock/noffset locate the
+ * index tuples, and posting/nposting are posting list positions (negative
+ * when the index tuple has no posting list).
+ */
+static void bt_report_duplicate(BtreeCheckState *state,
+				 ItemPointer tid, BlockNumber block, OffsetNumber offset,
+				 int posting,
+				 ItemPointer nexttid, BlockNumber nblock, OffsetNumber noffset,
+				 int nposting)
+{
+	char	   *htid,
+			   *nhtid,
+			   *itid,
+			   *nitid = "",
+			   *pposting = "",
+			   *pnposting = "";
+
+	errflag = true;
+	htid = psprintf("tid=(%u,%u)",
+					ItemPointerGetBlockNumberNoCheck(tid),
+					ItemPointerGetOffsetNumberNoCheck(tid));
+	nhtid = psprintf("tid=(%u,%u)",
+					 ItemPointerGetBlockNumberNoCheck(nexttid),
+					 ItemPointerGetOffsetNumberNoCheck(nexttid));
+	itid = psprintf("tid=(%u,%u)", block, offset);
+
+	if (nblock != block || noffset != offset)
+		nitid = psprintf(" tid=(%u,%u)", nblock, noffset);
+	if (posting >= 0)
+		pposting = psprintf(" posting %d", posting);
+	if (nposting >= 0)
+		pnposting = psprintf(" posting %d", nposting);
+
+	ereport(WARNING,
+			(errcode(ERRCODE_INDEX_CORRUPTED),
+			 errmsg("index uniqueness is violated for index \"%s\": "
+					"Index %s%s and%s%s "
+					"(point to heap %s and %s) page lsn=%X/%X.",
+					RelationGetRelationName(state->rel),
+					itid, pposting, nitid, pnposting, htid, nhtid,
+					(uint32) (state->targetlsn >> 32),
+					(uint32) state->targetlsn)));
+}
+
+/*
+ * Check one leaf tuple for unique constraint violations: every visible heap
+ * TID found while *lVis_tid is valid is reported, then recorded in lVis_*.
+ */
+static void bt_entry_unique_check(BtreeCheckState *state, IndexTuple itup,
+		OffsetNumber offset, int *lVis_i, ItemPointer *lVis_tid,
+		OffsetNumber *lVis_offset, BlockNumber *lVis_block)
+{
+	ItemPointer tid;
+	bool has_visible_entry = false;
+
+	/* Posting list tuple: test every heap TID in the posting list. */
+	if (BTreeTupleIsPosting(itup))
+	{
+		for (int i = 0; i < BTreeTupleGetNPosting(itup); i++)
+		{
+			tid = BTreeTupleGetPostingN(itup, i);
+			if (heap_entry_is_visible(state, tid))
+			{
+				has_visible_entry = true;
+				if (ItemPointerIsValid (*lVis_tid))
+				{
+					bt_report_duplicate(state,
+											*lVis_tid, *lVis_block,
+											*lVis_offset, *lVis_i,
+											tid, state->targetblock,
+											offset, i);
+				}
+					*lVis_i = i;
+					*lVis_tid = tid;
+					*lVis_offset = offset;
+					*lVis_block = state->targetblock;
+			}
+		}
+	}
+
+	/*
+	 * Plain tuple: test its single heap TID. The lVis_* values are owned by
+	 * the caller, bt_target_page_check(), which carries them across tuples.
+	 */
+	else
+	{
+		tid = BTreeTupleGetHeapTID(itup);
+		if (heap_entry_is_visible(state, tid))
+		{
+			has_visible_entry = true;
+			if (ItemPointerIsValid (*lVis_tid))
+			{
+				bt_report_duplicate(state,
+											*lVis_tid, *lVis_block,
+											*lVis_offset, *lVis_i,
+											tid, state->targetblock,
+											offset, -1);
+			}
+			*lVis_i = -1;
+			*lVis_tid = tid;
+			*lVis_offset = offset;
+			*lVis_block = state->targetblock;
+		}
+	}
+
+	/* Nothing visible here while the remembered entry is on another block */
+	if (!has_visible_entry && *lVis_block != InvalidBlockNumber &&
+									   *lVis_block != state->targetblock)
+		ereport(WARNING,
+			(errcode(ERRCODE_INDEX_CORRUPTED),
+			errmsg("index uniqueness may be violated for index \"%s\": "
+					"First key on an index page %u is equal to the key on the "
+					"previous page %u and is invisible. Cross-page unique "
+					"constraint violation can be missed. Vacuum the table "
+					"and repeat the check.",
+					RelationGetRelationName(state->rel),
+					state->targetblock, *lVis_block)));
+}
+
 /*
  * Function performs the following checks on target page, or pages ancillary to
  * target page:
@@ -1026,6 +1200,9 @@ bt_recheck_sibling_links(BtreeCheckState *state,
  * - Various checks on the structure of tuples themselves.  For example, check
  *	 that non-pivot tuples have no truncated attributes.
  *
+ * - For an index with a unique constraint, check that at most one of the
+ *   table entries for equal keys is visible.
+ *
  * Furthermore, when state passed shows ShareLock held, function also checks:
  *
  * - That all child pages respect strict lower bound from parent's pivot
@@ -1047,6 +1224,13 @@ bt_target_page_check(BtreeCheckState *state)
 	OffsetNumber offset;
 	OffsetNumber max;
 	BTPageOpaque topaque;
+	/* last visible entry info for checking indexes with unique constraint */
+	int			 lVis_i = -1; /* the position of last visible item for posting
+							   * tuple. for non-posting tuple (-1)
+							   */
+	ItemPointer	 lVis_tid = NULL;
+	BlockNumber	 lVis_block = InvalidBlockNumber;
+	OffsetNumber lVis_offset = InvalidOffsetNumber;
 
 	topaque = (BTPageOpaque) PageGetSpecialPointer(state->target);
 	max = PageGetMaxOffsetNumber(state->target);
@@ -1446,6 +1630,39 @@ bt_target_page_check(BtreeCheckState *state)
 										(uint32) state->targetlsn)));
 		}
 
+		/*
+		 * If the index is unique, verify entries uniqueness by checking
+		 * heap tuples visibility.
+		 */
+		if (state->indexinfo->ii_Unique && P_ISLEAF(topaque))
+			bt_entry_unique_check(state, itup, offset,
+					&lVis_i, &lVis_tid, &lVis_offset, &lVis_block);
+
+		if (state->indexinfo->ii_Unique && P_ISLEAF(topaque) &&
+				 OffsetNumberNext(offset) <= max)
+		{
+			/* Save current scankey tid */
+			scantid = skey->scantid;
+			/* Invalidate scankey tid to make _bt_compare compare only keys
+			 * in the item to report equality even if heap TIDs are different
+			 */
+			skey->scantid = NULL;
+
+			/*
+			 * If next key tuple is different, invalidate last visible entry
+			 * data (whole index tuple or last posting in index tuple).
+			 */
+			if (_bt_compare(state->rel, skey, state->target,
+						OffsetNumberNext(offset)) != 0)
+			{
+				lVis_i = -1;
+				lVis_tid = NULL;
+				lVis_block = InvalidBlockNumber;
+				lVis_offset = InvalidOffsetNumber;
+			}
+			skey->scantid = scantid; /* Restore saved scan key state */
+		}
+
 		/*
 		 * * Last item check *
 		 *
@@ -1463,12 +1680,14 @@ bt_target_page_check(BtreeCheckState *state)
 		 * available from sibling for various reasons, though (e.g., target is
 		 * the rightmost page on level).
 		 */
-		else if (offset == max)
+		if (offset == max)
 		{
 			BTScanInsert rightkey;
+			/* offset of the first data item on the right index page */
+			OffsetNumber rightfirstoffset = InvalidOffsetNumber;
 
 			/* Get item in next/right page */
-			rightkey = bt_right_page_check_scankey(state);
+			rightkey = bt_right_page_check_scankey(state, &rightfirstoffset);
 
 			if (rightkey &&
 				!invariant_g_offset(state, rightkey, max))
@@ -1503,6 +1722,43 @@ bt_target_page_check(BtreeCheckState *state)
 											(uint32) (state->targetlsn >> 32),
 											(uint32) state->targetlsn)));
 			}
+
+			/*
+			 * For an index with a unique constraint, check that at most one of
+			 * the equal items found is visible.
+			 */
+			if (state->indexinfo->ii_Unique && rightkey && P_ISLEAF(topaque))
+			{
+				elog(DEBUG2, "check cross page unique condition");
+
+				/*
+				 * Make _bt_compare compare only index keys without heap TIDs.
+				 * rightkey->scantid is modified destructively but it is ok
+				 * for it is not used later
+				 */
+				rightkey->scantid = NULL;
+
+				/* First key on next page is same */
+				if (_bt_compare(state->rel, rightkey, state->target, max) == 0)
+				{
+					elog(DEBUG2, "cross page equal keys");
+					state->target = palloc_btree_page(state,
+													  state->targetblock + 1);
+					topaque = (BTPageOpaque) PageGetSpecialPointer(state->target);
+
+					if (P_IGNORE(topaque) || !P_ISLEAF(topaque))
+							break;
+
+					itemid = PageGetItemIdCareful(state, state->targetblock + 1,
+												  state->target,
+												  rightfirstoffset);
+					itup = (IndexTuple) PageGetItem(state->target, itemid);
+
+					bt_entry_unique_check(state, itup, rightfirstoffset,
+									&lVis_i, &lVis_tid, &lVis_offset,
+									&lVis_block);
+				}
+			}
 		}
 
 		/*
@@ -1548,9 +1804,11 @@ bt_target_page_check(BtreeCheckState *state)
  *
  * Note that !readonly callers must reverify that target page has not
  * been concurrently deleted.
+ *
+ * Also store the right page's first data key offset in *rightfirstoffset.
  */
 static BTScanInsert
-bt_right_page_check_scankey(BtreeCheckState *state)
+bt_right_page_check_scankey(BtreeCheckState *state, OffsetNumber *rightfirstoffset)
 {
 	BTPageOpaque opaque;
 	ItemId		rightitem;
@@ -1713,6 +1971,7 @@ bt_right_page_check_scankey(BtreeCheckState *state)
 		/* Return first data item (if any) */
 		rightitem = PageGetItemIdCareful(state, targetnext, rightpage,
 										 P_FIRSTDATAKEY(opaque));
+		*rightfirstoffset = P_FIRSTDATAKEY(opaque);
 	}
 	else if (!P_ISLEAF(opaque) &&
 			 nline >= OffsetNumberNext(P_FIRSTDATAKEY(opaque)))
-- 
2.28.0

