From caef25c19ed8b9bc9ddd3bd1107c1a40d02d49e1 Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <akorotkov@postgresql.org>
Date: Tue, 28 Mar 2023 18:04:15 +0300
Subject: [PATCH 2/2] Revise changes for tuple_update() and tuple_delete()

11470f544e allowed locking updated tuples in tuple_update() and tuple_delete().

This commit improves things in the following ways:
 * tuple_update() and tuple_delete() can now fetch the old tuple version not
   only after a lock, but also after a successful UPDATE/DELETE.  That allows
   us to avoid tuple re-fetching in the more general case.
 * Avoid tuple re-fetching for triggers.
 * Improve the code API for heap_lock_tuple().  Automatically detect a
   previously fetched tuple in the slot.
---
 src/backend/access/heap/heapam.c         | 194 +++++++++++++----
 src/backend/access/heap/heapam_handler.c | 131 +++++-------
 src/backend/access/table/tableam.c       |  34 ++-
 src/backend/commands/trigger.c           |  31 +--
 src/backend/executor/execReplication.c   |  15 +-
 src/backend/executor/nodeModifyTable.c   | 258 ++++++++++++++++-------
 src/include/access/heapam.h              |  19 +-
 src/include/access/tableam.h             |  71 +++++--
 8 files changed, 484 insertions(+), 269 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 8abc101c8cb..985b95cf1aa 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2449,10 +2449,11 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
 }
 
 /*
- *	heap_delete - delete a tuple
+ *	heap_delete - delete a tuple, optionally fetching it into a slot
  *
  * See table_tuple_delete() for an explanation of the parameters, except that
- * this routine directly takes a tuple rather than a slot.
+ * this routine directly takes a tuple rather than a slot.  Also, we don't
+ * place a lock on the tuple in this function, just fetch the existing version.
  *
  * In the failure cases, the routine fills *tmfd with the tuple's t_ctid,
  * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax (the last
@@ -2461,8 +2462,9 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
  */
 TM_Result
 heap_delete(Relation relation, ItemPointer tid,
-			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, bool changingPart)
+			CommandId cid, Snapshot crosscheck, int options,
+			TM_FailureData *tmfd, bool changingPart,
+			LazyTupleTableSlot *oldSlot)
 {
 	TM_Result	result;
 	TransactionId xid = GetCurrentTransactionId();
@@ -2540,7 +2542,7 @@ l1:
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("attempted to delete invisible tuple")));
 	}
-	else if (result == TM_BeingModified && wait)
+	else if (result == TM_BeingModified && (options & TABLE_MODIFY_WAIT))
 	{
 		TransactionId xwait;
 		uint16		infomask;
@@ -2676,7 +2678,30 @@ l1:
 			tmfd->cmax = HeapTupleHeaderGetCmax(tp.t_data);
 		else
 			tmfd->cmax = InvalidCommandId;
-		UnlockReleaseBuffer(buffer);
+
+		/*
+		 * If we're asked to lock the updated tuple, we just fetch the
+		 * existing tuple.  That lets the caller save some resources on
+		 * placing the lock.
+		 */
+		if (result == TM_Updated &&
+			(options & TABLE_MODIFY_LOCK_UPDATED))
+		{
+			BufferHeapTupleTableSlot *bslot;
+
+			/* Evaluate the lazy slot first; asserting on bslot before it is
+			 * assigned would read an uninitialized pointer. */
+			bslot = (BufferHeapTupleTableSlot *) LazyTupleTableSlotEval(oldSlot);
+			Assert(TTS_IS_BUFFERTUPLE(&bslot->base.base));
+
+			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+			bslot->base.tupdata = tp;
+			ExecStorePinnedBufferHeapTuple(&bslot->base.tupdata,
+										   &bslot->base.base,
+										   buffer);
+		}
+		else
+		{
+			UnlockReleaseBuffer(buffer);
+		}
 		if (have_tuple_lock)
 			UnlockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive);
 		if (vmbuffer != InvalidBuffer)
@@ -2850,8 +2875,24 @@ l1:
 	 */
 	CacheInvalidateHeapTuple(relation, &tp, NULL);
 
-	/* Now we can release the buffer */
-	ReleaseBuffer(buffer);
+	/* Fetch the old tuple version if we're asked for that. */
+	if (options & TABLE_MODIFY_FETCH_OLD_TUPLE)
+	{
+		BufferHeapTupleTableSlot *bslot;
+
+		/* Evaluate the lazy slot before asserting on it; the previous order
+		 * dereferenced bslot while it was still uninitialized. */
+		bslot = (BufferHeapTupleTableSlot *) LazyTupleTableSlotEval(oldSlot);
+		Assert(TTS_IS_BUFFERTUPLE(&bslot->base.base));
+
+		bslot->base.tupdata = tp;
+		ExecStorePinnedBufferHeapTuple(&bslot->base.tupdata,
+									   &bslot->base.base,
+									   buffer);
+	}
+	else
+	{
+		/* Now we can release the buffer */
+		ReleaseBuffer(buffer);
+	}
 
 	/*
 	 * Release the lmgr tuple lock, if we had it.
@@ -2884,7 +2925,7 @@ simple_heap_delete(Relation relation, ItemPointer tid)
 	result = heap_delete(relation, tid,
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &tmfd, false /* changingPart */ );
+						 &tmfd, false /* changingPart */ , NULL);
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -2911,10 +2952,11 @@ simple_heap_delete(Relation relation, ItemPointer tid)
 }
 
 /*
- *	heap_update - replace a tuple
+ *	heap_update - replace a tuple, optionally fetching it into a slot
  *
  * See table_tuple_update() for an explanation of the parameters, except that
- * this routine directly takes a tuple rather than a slot.
+ * this routine directly takes a tuple rather than a slot.  Also, we don't
+ * place a lock on the tuple in this function, just fetch the existing version.
  *
  * In the failure cases, the routine fills *tmfd with the tuple's t_ctid,
  * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax (the last
@@ -2923,9 +2965,9 @@ simple_heap_delete(Relation relation, ItemPointer tid)
  */
 TM_Result
 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
-			CommandId cid, Snapshot crosscheck, bool wait,
+			CommandId cid, Snapshot crosscheck, int options,
 			TM_FailureData *tmfd, LockTupleMode *lockmode,
-			TU_UpdateIndexes *update_indexes)
+			TU_UpdateIndexes *update_indexes, LazyTupleTableSlot *oldSlot)
 {
 	TM_Result	result;
 	TransactionId xid = GetCurrentTransactionId();
@@ -3102,7 +3144,7 @@ l2:
 	result = HeapTupleSatisfiesUpdate(&oldtup, cid, buffer);
 
 	/* see below about the "no wait" case */
-	Assert(result != TM_BeingModified || wait);
+	Assert(result != TM_BeingModified || (options & TABLE_MODIFY_WAIT));
 
 	if (result == TM_Invisible)
 	{
@@ -3111,7 +3153,7 @@ l2:
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("attempted to update invisible tuple")));
 	}
-	else if (result == TM_BeingModified && wait)
+	else if (result == TM_BeingModified && (options & TABLE_MODIFY_WAIT))
 	{
 		TransactionId xwait;
 		uint16		infomask;
@@ -3313,7 +3355,30 @@ l2:
 			tmfd->cmax = HeapTupleHeaderGetCmax(oldtup.t_data);
 		else
 			tmfd->cmax = InvalidCommandId;
-		UnlockReleaseBuffer(buffer);
+
+		/*
+		 * If we're asked to lock the updated tuple, we just fetch the
+		 * existing tuple.  That lets the caller save some resources on
+		 * placing the lock.
+		 */
+		if (result == TM_Updated &&
+			(options & TABLE_MODIFY_LOCK_UPDATED))
+		{
+			BufferHeapTupleTableSlot *bslot;
+
+			/* Evaluate the lazy slot first; asserting on bslot before it is
+			 * assigned would read an uninitialized pointer. */
+			bslot = (BufferHeapTupleTableSlot *) LazyTupleTableSlotEval(oldSlot);
+			Assert(TTS_IS_BUFFERTUPLE(&bslot->base.base));
+
+			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+			bslot->base.tupdata = oldtup;
+			ExecStorePinnedBufferHeapTuple(&bslot->base.tupdata,
+										   &bslot->base.base,
+										   buffer);
+		}
+		else
+		{
+			UnlockReleaseBuffer(buffer);
+		}
 		if (have_tuple_lock)
 			UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
 		if (vmbuffer != InvalidBuffer)
@@ -3791,7 +3856,26 @@ l2:
 	/* Now we can release the buffer(s) */
 	if (newbuf != buffer)
 		ReleaseBuffer(newbuf);
-	ReleaseBuffer(buffer);
+
+	/* Fetch the old tuple version if we're asked for that. */
+	if (options & TABLE_MODIFY_FETCH_OLD_TUPLE)
+	{
+		BufferHeapTupleTableSlot *bslot;
+
+		/* Evaluate the lazy slot before asserting on it; the previous order
+		 * dereferenced bslot while it was still uninitialized. */
+		bslot = (BufferHeapTupleTableSlot *) LazyTupleTableSlotEval(oldSlot);
+		Assert(TTS_IS_BUFFERTUPLE(&bslot->base.base));
+
+		bslot->base.tupdata = oldtup;
+		ExecStorePinnedBufferHeapTuple(&bslot->base.tupdata,
+									   &bslot->base.base,
+									   buffer);
+	}
+	else
+	{
+		/* Now we can release the buffer */
+		ReleaseBuffer(buffer);
+	}
+
 	if (BufferIsValid(vmbuffer_new))
 		ReleaseBuffer(vmbuffer_new);
 	if (BufferIsValid(vmbuffer))
@@ -4000,7 +4084,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup,
 	result = heap_update(relation, otid, tup,
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &tmfd, &lockmode, update_indexes);
+						 &tmfd, &lockmode, update_indexes, NULL);
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -4079,15 +4163,14 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update)
  * See README.tuplock for a thorough explanation of this mechanism.
  */
 TM_Result
-heap_lock_tuple(Relation relation, HeapTuple tuple,
+heap_lock_tuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
 				CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
-				bool follow_updates,
-				Buffer *buffer, TM_FailureData *tmfd)
+				bool follow_updates, TM_FailureData *tmfd)
 {
 	TM_Result	result;
-	ItemPointer tid = &(tuple->t_self);
 	ItemId		lp;
 	Page		page;
+	Buffer		buffer;
 	Buffer		vmbuffer = InvalidBuffer;
 	BlockNumber block;
 	TransactionId xid,
@@ -4099,8 +4182,24 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
 	bool		skip_tuple_lock = false;
 	bool		have_tuple_lock = false;
 	bool		cleared_all_frozen = false;
+	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+	HeapTuple	tuple = &bslot->base.tupdata;
 
-	*buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
+	/*
+	 * If slot already contains the relevant tuple, re-use the buffer in slot.
+	 */
+	if (TTS_EMPTY(slot) ||
+		slot->tts_tableOid != relation->rd_id ||
+		ItemPointerCompare(&slot->tts_tid, tid) != 0 ||
+		!BufferIsValid(bslot->buffer))
+	{
+		buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
+	}
+	else
+	{
+		buffer = bslot->buffer;
+		IncrBufferRefCount(buffer);
+	}
 	block = ItemPointerGetBlockNumber(tid);
 
 	/*
@@ -4109,21 +4208,22 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
 	 * in the middle of changing this, so we'll need to recheck after we have
 	 * the lock.
 	 */
-	if (PageIsAllVisible(BufferGetPage(*buffer)))
+	if (PageIsAllVisible(BufferGetPage(buffer)))
 		visibilitymap_pin(relation, block, &vmbuffer);
 
-	LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-	page = BufferGetPage(*buffer);
+	page = BufferGetPage(buffer);
 	lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
 	Assert(ItemIdIsNormal(lp));
 
+	tuple->t_self = *tid;
 	tuple->t_data = (HeapTupleHeader) PageGetItem(page, lp);
 	tuple->t_len = ItemIdGetLength(lp);
 	tuple->t_tableOid = RelationGetRelid(relation);
 
 l3:
-	result = HeapTupleSatisfiesUpdate(tuple, cid, *buffer);
+	result = HeapTupleSatisfiesUpdate(tuple, cid, buffer);
 
 	if (result == TM_Invisible)
 	{
@@ -4152,7 +4252,7 @@ l3:
 		infomask2 = tuple->t_data->t_infomask2;
 		ItemPointerCopy(&tuple->t_data->t_ctid, &t_ctid);
 
-		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
 		/*
 		 * If any subtransaction of the current top transaction already holds
@@ -4304,12 +4404,12 @@ l3:
 					{
 						result = res;
 						/* recovery code expects to have buffer lock held */
-						LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+						LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 						goto failed;
 					}
 				}
 
-				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 				/*
 				 * Make sure it's still an appropriate lock, else start over.
@@ -4344,7 +4444,7 @@ l3:
 			if (HEAP_XMAX_IS_LOCKED_ONLY(infomask) &&
 				!HEAP_XMAX_IS_EXCL_LOCKED(infomask))
 			{
-				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 				/*
 				 * Make sure it's still an appropriate lock, else start over.
@@ -4372,7 +4472,7 @@ l3:
 					 * No conflict, but if the xmax changed under us in the
 					 * meantime, start over.
 					 */
-					LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+					LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 					if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
 						!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
 											 xwait))
@@ -4384,7 +4484,7 @@ l3:
 			}
 			else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
 			{
-				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 				/* if the xmax changed in the meantime, start over */
 				if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
@@ -4412,7 +4512,7 @@ l3:
 			TransactionIdIsCurrentTransactionId(xwait))
 		{
 			/* ... but if the xmax changed in the meantime, start over */
-			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 			if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
 				!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
 									 xwait))
@@ -4434,7 +4534,7 @@ l3:
 		 */
 		if (require_sleep && (result == TM_Updated || result == TM_Deleted))
 		{
-			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 			goto failed;
 		}
 		else if (require_sleep)
@@ -4459,7 +4559,7 @@ l3:
 				 */
 				result = TM_WouldBlock;
 				/* recovery code expects to have buffer lock held */
-				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 				goto failed;
 			}
 
@@ -4485,7 +4585,7 @@ l3:
 						{
 							result = TM_WouldBlock;
 							/* recovery code expects to have buffer lock held */
-							LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+							LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 							goto failed;
 						}
 						break;
@@ -4525,7 +4625,7 @@ l3:
 						{
 							result = TM_WouldBlock;
 							/* recovery code expects to have buffer lock held */
-							LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+							LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 							goto failed;
 						}
 						break;
@@ -4551,12 +4651,12 @@ l3:
 				{
 					result = res;
 					/* recovery code expects to have buffer lock held */
-					LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+					LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 					goto failed;
 				}
 			}
 
-			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 			/*
 			 * xwait is done, but if xwait had just locked the tuple then some
@@ -4578,7 +4678,7 @@ l3:
 				 * don't check for this in the multixact case, because some
 				 * locker transactions might still be running.
 				 */
-				UpdateXmaxHintBits(tuple->t_data, *buffer, xwait);
+				UpdateXmaxHintBits(tuple->t_data, buffer, xwait);
 			}
 		}
 
@@ -4637,9 +4737,9 @@ failed:
 	 */
 	if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
 	{
-		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 		visibilitymap_pin(relation, block, &vmbuffer);
-		LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 		goto l3;
 	}
 
@@ -4702,7 +4802,7 @@ failed:
 		cleared_all_frozen = true;
 
 
-	MarkBufferDirty(*buffer);
+	MarkBufferDirty(buffer);
 
 	/*
 	 * XLOG stuff.  You might think that we don't need an XLOG record because
@@ -4722,7 +4822,7 @@ failed:
 		XLogRecPtr	recptr;
 
 		XLogBeginInsert();
-		XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD);
+		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 
 		xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
 		xlrec.locking_xid = xid;
@@ -4743,7 +4843,7 @@ failed:
 	result = TM_Ok;
 
 out_locked:
-	LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
 out_unlocked:
 	if (BufferIsValid(vmbuffer))
@@ -4761,6 +4861,8 @@ out_unlocked:
 	if (have_tuple_lock)
 		UnlockTupleTuplock(relation, tid, mode);
 
+	ExecStorePinnedBufferHeapTuple(tuple, slot, buffer);
+
 	return result;
 }
 
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 2bddf562c2f..9686ecc3cde 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -45,12 +45,6 @@
 #include "utils/builtins.h"
 #include "utils/rel.h"
 
-static TM_Result heapam_tuple_lock_internal(Relation relation, ItemPointer tid,
-											Snapshot snapshot, TupleTableSlot *slot,
-											CommandId cid, LockTupleMode mode,
-											LockWaitPolicy wait_policy, uint8 flags,
-											TM_FailureData *tmfd, bool updated);
-
 static void reform_and_rewrite_tuple(HeapTuple tuple,
 									 Relation OldHeap, Relation NewHeap,
 									 Datum *values, bool *isnull, RewriteState rwstate);
@@ -304,9 +298,9 @@ heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
 
 static TM_Result
 heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
-					Snapshot snapshot, Snapshot crosscheck, bool wait,
+					Snapshot snapshot, Snapshot crosscheck, int options,
 					TM_FailureData *tmfd, bool changingPart,
-					LazyTupleTableSlot *lockedSlot)
+					LazyTupleTableSlot *oldSlot)
 {
 	TM_Result	result;
 
@@ -315,33 +309,31 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
 	 * the storage itself is cleaning the dead tuples by itself, it is the
 	 * time to call the index tuple deletion also.
 	 */
-	result = heap_delete(relation, tid, cid, crosscheck, wait,
-						 tmfd, changingPart);
+	result = heap_delete(relation, tid, cid, crosscheck, options,
+						 tmfd, changingPart, oldSlot);
 
 	/*
 	 * If the tuple has been concurrently updated, then get the lock on it.
-	 * (Do this if caller asked for tat by providing a 'lockedSlot'.) With the
-	 * lock held retry of delete should succeed even if there are more
-	 * concurrent update attempts.
+	 * (Do this if the caller asked for that by setting the
+	 * TABLE_MODIFY_LOCK_UPDATED option and providing 'oldSlot'.)  With the
+	 * lock held, a retry of the delete should succeed even if there are more
+	 * concurrent update attempts.
 	 */
-	if (result == TM_Updated && lockedSlot)
+	if (result == TM_Updated && (options & TABLE_MODIFY_LOCK_UPDATED))
 	{
 		TupleTableSlot *evalSlot;
 
-		Assert(wait);
-
-		evalSlot = LazyTupleTableSlotEval(lockedSlot);
+		evalSlot = LazyTupleTableSlotEval(oldSlot);
 		result = heapam_tuple_lock_internal(relation, tid, snapshot,
 											evalSlot, cid, LockTupleExclusive,
-											LockWaitBlock,
+											(options & TABLE_MODIFY_WAIT) ?
+											LockWaitBlock :
+											LockWaitSkip,
 											TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
-											tmfd, true);
+											tmfd);
 
 		if (result == TM_Ok)
-		{
-			tmfd->traversed = true;
 			return TM_Updated;
-		}
 	}
 
 	return result;
@@ -351,9 +343,9 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
 static TM_Result
 heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 					CommandId cid, Snapshot snapshot, Snapshot crosscheck,
-					bool wait, TM_FailureData *tmfd,
+					int options, TM_FailureData *tmfd,
 					LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes,
-					LazyTupleTableSlot *lockedSlot)
+					LazyTupleTableSlot *oldSlot)
 {
 	bool		shouldFree = true;
 	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
@@ -363,8 +355,8 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 	slot->tts_tableOid = RelationGetRelid(relation);
 	tuple->t_tableOid = slot->tts_tableOid;
 
-	result = heap_update(relation, otid, tuple, cid, crosscheck, wait,
-						 tmfd, lockmode, update_indexes);
+	result = heap_update(relation, otid, tuple, cid, crosscheck, options,
+						 tmfd, lockmode, update_indexes, oldSlot);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	/*
@@ -373,8 +365,8 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 	 * Note: heap_update returns the tid (location) of the new tuple in the
 	 * t_self field.
 	 *
-	 * If the update is not HOT, we must update all indexes. If the update
-	 * is HOT, it could be that we updated summarized columns, so we either
+	 * If the update is not HOT, we must update all indexes. If the update is
+	 * HOT, it could be that we updated summarized columns, so we either
 	 * update only summarized indexes, or none at all.
 	 */
 	if (result != TM_Ok)
@@ -393,28 +385,28 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 
 	/*
 	 * If the tuple has been concurrently updated, then get the lock on it.
-	 * (Do this if caller asked for tat by providing a 'lockedSlot'.) With the
-	 * lock held retry of update should succeed even if there are more
-	 * concurrent update attempts.
+	 * (Do this if the caller asked for that by setting the
+	 * TABLE_MODIFY_LOCK_UPDATED option and providing 'oldSlot'.)  With the
+	 * lock held, a retry of the update should succeed even if there are more
+	 * concurrent update attempts.
 	 */
-	if (result == TM_Updated && lockedSlot)
+	if (result == TM_Updated && (options & TABLE_MODIFY_LOCK_UPDATED))
 	{
 		TupleTableSlot *evalSlot;
 
-		Assert(wait);
+		Assert(options & TABLE_MODIFY_WAIT);
+		evalSlot = LazyTupleTableSlotEval(oldSlot);
 
-		evalSlot = LazyTupleTableSlotEval(lockedSlot);
 		result = heapam_tuple_lock_internal(relation, otid, snapshot,
 											evalSlot, cid, *lockmode,
-											LockWaitBlock,
+											(options & TABLE_MODIFY_WAIT) ?
+											LockWaitBlock :
+											LockWaitSkip,
 											TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
-											tmfd, true);
+											tmfd);
 
 		if (result == TM_Ok)
-		{
-			tmfd->traversed = true;
 			return TM_Updated;
-		}
 	}
 
 	return result;
@@ -425,22 +417,6 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
 				  TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
 				  LockWaitPolicy wait_policy, uint8 flags,
 				  TM_FailureData *tmfd)
-{
-	return heapam_tuple_lock_internal(relation, tid, snapshot, slot, cid,
-									  mode, wait_policy, flags, tmfd, false);
-}
-
-/*
- * This routine does the work for heapam_tuple_lock(), but also support
- * `updated` argument to re-use the work done by heapam_tuple_update() or
- * heapam_tuple_delete() on figuring out that tuple was concurrently updated.
- */
-static TM_Result
-heapam_tuple_lock_internal(Relation relation, ItemPointer tid,
-						   Snapshot snapshot, TupleTableSlot *slot,
-						   CommandId cid, LockTupleMode mode,
-						   LockWaitPolicy wait_policy, uint8 flags,
-						   TM_FailureData *tmfd, bool updated)
 {
 	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
 	TM_Result	result;
@@ -454,27 +430,14 @@ heapam_tuple_lock_internal(Relation relation, ItemPointer tid,
 	Assert(TTS_IS_BUFFERTUPLE(slot));
 
 tuple_lock_retry:
-	tuple->t_self = *tid;
-	if (!updated)
-		result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
-								 follow_updates, &buffer, tmfd);
-	else
-		result = TM_Updated;
+	result = heap_lock_tuple(relation, tid, slot, cid, mode, wait_policy,
+							 follow_updates, tmfd);
 
 	if (result == TM_Updated &&
 		(flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION))
 	{
-		if (!updated)
-		{
-			/* Should not encounter speculative tuple on recheck */
-			Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data));
-
-			ReleaseBuffer(buffer);
-		}
-		else
-		{
-			updated = false;
-		}
+		/* Should not encounter speculative tuple on recheck */
+		Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data));
 
 		if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self))
 		{
@@ -505,6 +468,8 @@ tuple_lock_retry:
 				tuple->t_self = *tid;
 				if (heap_fetch(relation, &SnapshotDirty, tuple, &buffer, true))
 				{
+					ExecStorePinnedBufferHeapTuple(tuple, slot, buffer);
+
 					/*
 					 * If xmin isn't what we're expecting, the slot must have
 					 * been recycled and reused for an unrelated tuple.  This
@@ -518,7 +483,7 @@ tuple_lock_retry:
 					if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
 											 priorXmax))
 					{
-						ReleaseBuffer(buffer);
+						ExecClearTuple(slot);
 						return TM_Deleted;
 					}
 
@@ -538,7 +503,7 @@ tuple_lock_retry:
 					 */
 					if (TransactionIdIsValid(SnapshotDirty.xmax))
 					{
-						ReleaseBuffer(buffer);
+						ExecClearTuple(slot);
 						switch (wait_policy)
 						{
 							case LockWaitBlock:
@@ -584,16 +549,19 @@ tuple_lock_retry:
 						 * above.
 						 */
 						tmfd->cmax = HeapTupleHeaderGetCmin(tuple->t_data);
-						ReleaseBuffer(buffer);
 						return TM_SelfModified;
 					}
 
 					/*
 					 * This is a live tuple, so try to lock it again.
 					 */
-					ReleaseBuffer(buffer);
 					goto tuple_lock_retry;
 				}
+				else
+				{
+					if (BufferIsValid(buffer))
+						ReleaseBuffer(buffer);
+				}
 
 				/*
 				 * If the referenced slot was actually empty, the latest
@@ -602,7 +570,7 @@ tuple_lock_retry:
 				 */
 				if (tuple->t_data == NULL)
 				{
-					Assert(!BufferIsValid(buffer));
+					ExecClearTuple(slot);
 					return TM_Deleted;
 				}
 
@@ -612,7 +580,7 @@ tuple_lock_retry:
 				if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
 										 priorXmax))
 				{
-					ReleaseBuffer(buffer);
+					ExecClearTuple(slot);
 					return TM_Deleted;
 				}
 
@@ -633,7 +601,7 @@ tuple_lock_retry:
 				if (ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid))
 				{
 					/* deleted, so forget about it */
-					ReleaseBuffer(buffer);
+					ExecClearTuple(slot);
 					return TM_Deleted;
 				}
 
@@ -641,13 +609,13 @@ tuple_lock_retry:
 				*tid = tuple->t_data->t_ctid;
 				/* updated row should have xmin matching this xmax */
 				priorXmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
-				ReleaseBuffer(buffer);
 				/* loop back to fetch next in chain */
 			}
 		}
 		else
 		{
 			/* tuple was deleted, so give up */
+			ExecClearTuple(slot);
 			return TM_Deleted;
 		}
 	}
@@ -655,9 +623,6 @@ tuple_lock_retry:
 	slot->tts_tableOid = RelationGetRelid(relation);
 	tuple->t_tableOid = slot->tts_tableOid;
 
-	/* store in slot, transferring existing pin */
-	ExecStorePinnedBufferHeapTuple(tuple, slot, buffer);
-
 	return result;
 }
 
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 2a1a6ced3c7..223ab8e8f63 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -297,17 +297,28 @@ simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
  * via ereport().
  */
 void
-simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
+simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot,
+						  TupleTableSlot *oldSlot)
 {
 	TM_Result	result;
 	TM_FailureData tmfd;
+	int			options = TABLE_MODIFY_WAIT;	/* wait for commit */
+	LazyTupleTableSlot *lazyOldSlot = NULL,
+				lazyOldSlotData;
+
+	if (oldSlot)
+	{
+		lazyOldSlot = &lazyOldSlotData;
+		MakeLazyTupleTableSlotWithSlot(lazyOldSlot, oldSlot);
+		options |= TABLE_MODIFY_FETCH_OLD_TUPLE;
+	}
 
 	result = table_tuple_delete(rel, tid,
 								GetCurrentCommandId(true),
 								snapshot, InvalidSnapshot,
-								true /* wait for commit */ ,
+								options,
 								&tmfd, false /* changingPart */ ,
-								NULL);
+								lazyOldSlot);
 
 	switch (result)
 	{
@@ -346,18 +357,29 @@ void
 simple_table_tuple_update(Relation rel, ItemPointer otid,
 						  TupleTableSlot *slot,
 						  Snapshot snapshot,
-						  TU_UpdateIndexes *update_indexes)
+						  TU_UpdateIndexes *update_indexes,
+						  TupleTableSlot *oldSlot)
 {
 	TM_Result	result;
 	TM_FailureData tmfd;
 	LockTupleMode lockmode;
+	int			options = TABLE_MODIFY_WAIT;	/* wait for commit */
+	LazyTupleTableSlot *lazyOldSlot = NULL,
+				lazyOldSlotData;
+
+	if (oldSlot)
+	{
+		lazyOldSlot = &lazyOldSlotData;
+		MakeLazyTupleTableSlotWithSlot(lazyOldSlot, oldSlot);
+		options |= TABLE_MODIFY_FETCH_OLD_TUPLE;
+	}
 
 	result = table_tuple_update(rel, otid, slot,
 								GetCurrentCommandId(true),
 								snapshot, InvalidSnapshot,
-								true /* wait for commit */ ,
+								options,
 								&tmfd, &lockmode, update_indexes,
-								NULL);
+								lazyOldSlot);
 
 	switch (result)
 	{
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 0f2de7e2e01..fd693b6779f 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -2796,18 +2796,9 @@ ExecARDeleteTriggers(EState *estate,
 	{
 		TupleTableSlot *slot = ExecGetTriggerOldSlot(estate, relinfo);
 
-		Assert(HeapTupleIsValid(fdw_trigtuple) ^ ItemPointerIsValid(tupleid));
-		if (fdw_trigtuple == NULL)
-			GetTupleForTrigger(estate,
-							   NULL,
-							   relinfo,
-							   tupleid,
-							   LockTupleExclusive,
-							   slot,
-							   NULL,
-							   NULL,
-							   NULL);
-		else
+		Assert(!ItemPointerIsValid(tupleid) || !TTS_EMPTY(slot) || fdw_trigtuple != NULL);
+
+		if (fdw_trigtuple != NULL)
 			ExecForceStoreHeapTuple(fdw_trigtuple, slot, false);
 
 		AfterTriggerSaveEvent(estate, relinfo, NULL, NULL,
@@ -3119,19 +3110,11 @@ ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo,
 		tupsrc = src_partinfo ? src_partinfo : relinfo;
 		oldslot = ExecGetTriggerOldSlot(estate, tupsrc);
 
-		if (fdw_trigtuple == NULL && ItemPointerIsValid(tupleid))
-			GetTupleForTrigger(estate,
-							   NULL,
-							   tupsrc,
-							   tupleid,
-							   LockTupleExclusive,
-							   oldslot,
-							   NULL,
-							   NULL,
-							   NULL);
-		else if (fdw_trigtuple != NULL)
+		Assert(!ItemPointerIsValid(tupleid) || !TTS_EMPTY(oldslot) || fdw_trigtuple != NULL);
+
+		if (fdw_trigtuple != NULL)
 			ExecForceStoreHeapTuple(fdw_trigtuple, oldslot, false);
-		else
+		else if (!ItemPointerIsValid(tupleid))
 			ExecClearTuple(oldslot);
 
 		AfterTriggerSaveEvent(estate, relinfo,
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 9dd71684615..774f59717bb 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -545,6 +545,7 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 	{
 		List	   *recheckIndexes = NIL;
 		TU_UpdateIndexes update_indexes;
+		TupleTableSlot *oldSlot = NULL;
 
 		/* Compute stored generated columns */
 		if (rel->rd_att->constr &&
@@ -558,8 +559,12 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 		if (rel->rd_rel->relispartition)
 			ExecPartitionCheck(resultRelInfo, slot, estate, true);
 
+		if (resultRelInfo->ri_TrigDesc &&
+			resultRelInfo->ri_TrigDesc->trig_update_after_row)
+			oldSlot = ExecGetTriggerOldSlot(estate, resultRelInfo);
+
 		simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
-								  &update_indexes);
+								  &update_indexes, oldSlot);
 
 		if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
 			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
@@ -604,8 +609,14 @@ ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
 
 	if (!skip_tuple)
 	{
+		TupleTableSlot *oldSlot = NULL;
+
+		if (resultRelInfo->ri_TrigDesc &&
+			resultRelInfo->ri_TrigDesc->trig_delete_after_row)
+			oldSlot = ExecGetTriggerOldSlot(estate, resultRelInfo);
+
 		/* OK, delete the tuple */
-		simple_table_tuple_delete(rel, tid, estate->es_snapshot);
+		simple_table_tuple_delete(rel, tid, estate->es_snapshot, oldSlot);
 
 		/* AFTER ROW DELETE Triggers */
 		ExecARDeleteTriggers(estate, resultRelInfo,
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 2f6c97a8404..3f21283c031 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -111,7 +111,7 @@ typedef struct UpdateContext
 {
 	bool		updated;		/* did UPDATE actually occur? */
 	bool		crossPartUpdate;	/* was it a cross-partition update? */
-	TU_UpdateIndexes updateIndexes;	/* Which index updates are required? */
+	TU_UpdateIndexes updateIndexes; /* Which index updates are required? */
 
 	/*
 	 * Lock mode to acquire on the latest tuple version before performing
@@ -1356,30 +1356,19 @@ GetEPQSlot(void *arg)
  */
 static TM_Result
 ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
-			  ItemPointer tupleid, bool changingPart, bool lockUpdated)
+			  ItemPointer tupleid, bool changingPart, int options,
+			  LazyTupleTableSlot *oldSlot)
 {
 	EState	   *estate = context->estate;
-	GetEPQSlotArg slotArg = {context->epqstate, resultRelInfo};
-	LazyTupleTableSlot lazyEPQSlot,
-			   *lazyEPQSlotPtr;
 
-	if (lockUpdated)
-	{
-		MakeLazyTupleTableSlotWithCallback(&lazyEPQSlot, GetEPQSlot, &slotArg);
-		lazyEPQSlotPtr = &lazyEPQSlot;
-	}
-	else
-	{
-		lazyEPQSlotPtr = NULL;
-	}
 	return table_tuple_delete(resultRelInfo->ri_RelationDesc, tupleid,
 							  estate->es_output_cid,
 							  estate->es_snapshot,
 							  estate->es_crosscheck_snapshot,
-							  true /* wait for commit */ ,
+							  options /* bitmask of TABLE_MODIFY_* flags */ ,
 							  &context->tmfd,
 							  changingPart,
-							  lazyEPQSlotPtr);
+							  oldSlot);
 }
 
 /*
@@ -1425,6 +1414,35 @@ ExecDeleteEpilogue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 						 ar_delete_trig_tcs, changingPart);
 }
 
+/*
+ * Check if DELETE needs to save the old tuple into ri_TrigOldSlot.
+ */
+static bool
+ExecDeleteNeedsOldTupleForTrigger(ModifyTableContext *context,
+								  ResultRelInfo *relinfo,
+								  bool changingPart)
+{
+	TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
+	TransitionCaptureState *transition_capture = context->mtstate->mt_transition_capture;
+
+	if ((trigdesc && trigdesc->trig_delete_after_row) ||
+		(transition_capture && transition_capture->tcs_delete_old_table))
+		return true;
+
+	if (changingPart)
+	{
+		TriggerDesc *rootTrigdesc = relinfo->ri_TrigDesc;	/* XXX(review): named "root" but read from the leaf relinfo — confirm */
+
+		if ((rootTrigdesc && rootTrigdesc->trig_update_after_row) ||
+			(transition_capture &&
+			 (transition_capture->tcs_update_old_table ||
+			  transition_capture->tcs_update_new_table)))
+			return true;
+	}
+
+	return false;
+}
+
 /* ----------------------------------------------------------------
  *		ExecDelete
  *
@@ -1514,6 +1532,35 @@ ExecDelete(ModifyTableContext *context,
 	}
 	else
 	{
+		GetEPQSlotArg slotArg = {context->epqstate, resultRelInfo};
+		LazyTupleTableSlot oldSlot;
+		int			options = TABLE_MODIFY_WAIT;
+
+		/* Initialize the slot to store old tuple */
+		if (ExecDeleteNeedsOldTupleForTrigger(context, resultRelInfo, changingPart))
+		{
+			TupleTableSlot *triggerOldSlot = ExecGetTriggerOldSlot(estate, resultRelInfo);
+
+			ExecClearTuple(triggerOldSlot);
+			MakeLazyTupleTableSlotWithSlot(&oldSlot, triggerOldSlot);
+			options |= TABLE_MODIFY_FETCH_OLD_TUPLE;
+		}
+		else if (processReturning &&
+				 resultRelInfo->ri_projectReturning &&
+				 !resultRelInfo->ri_FdwRoutine)
+		{
+			MakeLazyTupleTableSlotWithSlot(&oldSlot,
+										   ExecGetReturningSlot(estate, resultRelInfo));
+			options |= TABLE_MODIFY_FETCH_OLD_TUPLE;
+		}
+		else if (!IsolationUsesXactSnapshot())
+		{
+			MakeLazyTupleTableSlotWithCallback(&oldSlot, GetEPQSlot, &slotArg);
+		}
+
+		if (!IsolationUsesXactSnapshot())
+			options |= TABLE_MODIFY_LOCK_UPDATED;
+
 		/*
 		 * delete the tuple
 		 *
@@ -1525,7 +1572,7 @@ ExecDelete(ModifyTableContext *context,
 		 */
 ldelete:
 		result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart,
-							   !IsolationUsesXactSnapshot());
+							   options, &oldSlot);
 
 		switch (result)
 		{
@@ -1565,47 +1612,53 @@ ldelete:
 				return NULL;
 
 			case TM_Ok:
+				if (options & TABLE_MODIFY_FETCH_OLD_TUPLE)
+				{
+					TupleTableSlot *oldSlotEval = LazyTupleTableSlotEval(&oldSlot);
+
+					ExecMaterializeSlot(oldSlotEval);
+					if (processReturning &&
+						resultRelInfo->ri_projectReturning &&
+						!resultRelInfo->ri_FdwRoutine)
+					{
+						TupleTableSlot *returningSlot = ExecGetReturningSlot(estate, resultRelInfo);
+
+						if (returningSlot != oldSlotEval)
+							ExecCopySlot(returningSlot, oldSlotEval);
+					}
+				}
 				break;
 
 			case TM_Updated:
 				{
-					TupleTableSlot *inputslot;
 					TupleTableSlot *epqslot;
+					TupleTableSlot *oldSlotEval = LazyTupleTableSlotEval(&oldSlot);	/* XXX(review): oldSlot may be uninitialized here under serializable isolation — eval after the ereport? */
 
 					if (IsolationUsesXactSnapshot())
 						ereport(ERROR,
 								(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 								 errmsg("could not serialize access due to concurrent update")));
 
-					/*
-					 * ExecDeleteAct() has already locked the old tuple for
-					 * us. Now we need to copy it to the right slot.
-					 */
-					EvalPlanQualBegin(context->epqstate);
-					inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
-												 resultRelInfo->ri_RangeTableIndex);
-
 					/*
 					 * Save locked table for further processing for RETURNING
 					 * clause.
 					 */
+					ExecMaterializeSlot(oldSlotEval);
 					if (processReturning &&
 						resultRelInfo->ri_projectReturning &&
 						!resultRelInfo->ri_FdwRoutine)
 					{
-						TupleTableSlot *returningSlot;
+						TupleTableSlot *returningSlot = ExecGetReturningSlot(estate, resultRelInfo);
 
-						returningSlot = ExecGetReturningSlot(estate,
-															 resultRelInfo);
-						ExecCopySlot(returningSlot, inputslot);
-						ExecMaterializeSlot(returningSlot);
+						if (returningSlot != oldSlotEval)
+							ExecCopySlot(returningSlot, oldSlotEval);
 					}
 
 					Assert(context->tmfd.traversed);
 					epqslot = EvalPlanQual(context->epqstate,
 										   resultRelationDesc,
 										   resultRelInfo->ri_RangeTableIndex,
-										   inputslot);
+										   oldSlotEval);
 					if (TupIsNull(epqslot))
 						/* Tuple not passing quals anymore, exiting... */
 						return NULL;
@@ -1682,12 +1735,7 @@ ldelete:
 			{
 				ExecForceStoreHeapTuple(oldtuple, slot, false);
 			}
-			else if (TupIsNull(slot))
-			{
-				if (!table_tuple_fetch_row_version(resultRelationDesc, tupleid,
-												   SnapshotAny, slot))
-					elog(ERROR, "failed to fetch deleted tuple for DELETE RETURNING");
-			}
+			Assert(!TupIsNull(slot));
 		}
 
 		rslot = ExecProcessReturning(resultRelInfo, slot, context->planSlot);
@@ -1965,15 +2013,13 @@ ExecUpdatePrepareSlot(ResultRelInfo *resultRelInfo,
 static TM_Result
 ExecUpdateAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 			  ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot,
-			  bool canSetTag, bool lockUpdated, UpdateContext *updateCxt)
+			  bool canSetTag, int options, LazyTupleTableSlot *oldSlot,
+			  UpdateContext *updateCxt)
 {
 	EState	   *estate = context->estate;
 	Relation	resultRelationDesc = resultRelInfo->ri_RelationDesc;
 	bool		partition_constraint_failed;
 	TM_Result	result;
-	GetEPQSlotArg slotArg = {context->epqstate, resultRelInfo};
-	LazyTupleTableSlot lazyEPQSlot,
-			   *lazyEPQSlotPtr;
 
 	updateCxt->crossPartUpdate = false;
 
@@ -2099,23 +2145,14 @@ lreplace:
 	 * for referential integrity updates in transaction-snapshot mode
 	 * transactions.
 	 */
-	if (lockUpdated)
-	{
-		MakeLazyTupleTableSlotWithCallback(&lazyEPQSlot, GetEPQSlot, &slotArg);
-		lazyEPQSlotPtr = &lazyEPQSlot;
-	}
-	else
-	{
-		lazyEPQSlotPtr = NULL;
-	}
 	result = table_tuple_update(resultRelationDesc, tupleid, slot,
 								estate->es_output_cid,
 								estate->es_snapshot,
 								estate->es_crosscheck_snapshot,
-								true /* wait for commit */ ,
+								options /* bitmask of TABLE_MODIFY_* flags */ ,
 								&context->tmfd, &updateCxt->lockmode,
 								&updateCxt->updateIndexes,
-								lazyEPQSlotPtr);
+								oldSlot);
 	if (result == TM_Ok)
 		updateCxt->updated = true;
 
@@ -2239,6 +2276,34 @@ ExecCrossPartitionUpdateForeignKey(ModifyTableContext *context,
 						 tupleid, NULL, newslot, NIL, NULL, true);
 }
 
+/*
+ * Check if UPDATE needs to save the old tuple into ri_TrigOldSlot.
+ */
+static bool
+ExecUpdateNeedsOldTupleForTrigger(ModifyTableContext *context,
+								  ResultRelInfo *relinfo)
+{
+	TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
+	TransitionCaptureState *transition_capture;
+
+	transition_capture = context->mtstate->mt_transition_capture;
+	if ((trigdesc && trigdesc->trig_update_after_row) ||
+		(transition_capture && transition_capture->tcs_delete_old_table))	/* XXX(review): copy-pasted from the DELETE variant? expected tcs_update_old_table */
+		return true;
+
+	if (context->mtstate->operation == CMD_INSERT)
+		transition_capture = context->mtstate->mt_oc_transition_capture;
+
+	if (transition_capture &&
+		(transition_capture->tcs_update_old_table ||
+		 transition_capture->tcs_update_new_table))
+	{
+		return true;
+	}
+
+	return false;
+}
+
 /* ----------------------------------------------------------------
  *		ExecUpdate
  *
@@ -2322,6 +2387,34 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 	}
 	else
 	{
+		LazyTupleTableSlot oldSlotData,
+				   *oldSlot = NULL;
+		int			options = TABLE_MODIFY_WAIT;
+
+		/* Initialize the slot to store old tuple */
+		if (ExecUpdateNeedsOldTupleForTrigger(context, resultRelInfo))
+		{
+			TupleTableSlot *triggerOldSlot = ExecGetTriggerOldSlot(estate, resultRelInfo);
+
+			ExecClearTuple(triggerOldSlot);
+			oldSlot = &oldSlotData;
+			MakeLazyTupleTableSlotWithSlot(oldSlot, triggerOldSlot);
+			options |= TABLE_MODIFY_FETCH_OLD_TUPLE;
+		}
+		else if (!locked)
+		{
+			if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
+				ExecInitUpdateProjection(context->mtstate,
+										 resultRelInfo);
+
+			oldSlot = &oldSlotData;
+			MakeLazyTupleTableSlotWithSlot(oldSlot,
+										   resultRelInfo->ri_oldTupleSlot);
+		}
+
+		if (!locked && !IsolationUsesXactSnapshot())
+			options |= TABLE_MODIFY_LOCK_UPDATED;
+
 		/*
 		 * If we generate a new candidate tuple after EvalPlanQual testing, we
 		 * must loop back here to try again.  (We don't need to redo triggers,
@@ -2331,8 +2424,7 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 		 */
 redo_act:
 		result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot,
-							   canSetTag, !IsolationUsesXactSnapshot(),
-							   &updateCxt);
+							   canSetTag, options, oldSlot, &updateCxt);
 
 		/*
 		 * If ExecUpdateAct reports that a cross-partition update was done,
@@ -2379,50 +2471,44 @@ redo_act:
 				return NULL;
 
 			case TM_Ok:
+				/* Materialize the tuple for trigger */
+				if (options & TABLE_MODIFY_FETCH_OLD_TUPLE)
+				{
+					TupleTableSlot *oldSlotEval = LazyTupleTableSlotEval(oldSlot);
+
+					ExecMaterializeSlot(oldSlotEval);
+				}
 				break;
 
 			case TM_Updated:
 				{
-					TupleTableSlot *inputslot;
 					TupleTableSlot *epqslot;
-					TupleTableSlot *oldSlot;
+					TupleTableSlot *oldSlotEval = LazyTupleTableSlotEval(oldSlot);
 
 					if (IsolationUsesXactSnapshot())
 						ereport(ERROR,
 								(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 								 errmsg("could not serialize access due to concurrent update")));
 					Assert(!locked);
+					Assert(context->tmfd.traversed);
 
-					/*
-					 * ExecUpdateAct() has already locked the old tuple for
-					 * us. Now we need to copy it to the right slot.
-					 */
-					inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
-												 resultRelInfo->ri_RangeTableIndex);
+					ExecMaterializeSlot(oldSlotEval);
 
-					/* Make sure ri_oldTupleSlot is initialized. */
 					if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
 						ExecInitUpdateProjection(context->mtstate,
 												 resultRelInfo);
-
-					/*
-					 * Save the locked tuple for further calculation of the
-					 * new tuple.
-					 */
-					oldSlot = resultRelInfo->ri_oldTupleSlot;
-					ExecCopySlot(oldSlot, inputslot);
-					ExecMaterializeSlot(oldSlot);
-					Assert(context->tmfd.traversed);
+					if (oldSlotEval != resultRelInfo->ri_oldTupleSlot)
+						ExecCopySlot(resultRelInfo->ri_oldTupleSlot, oldSlotEval);
 
 					epqslot = EvalPlanQual(context->epqstate,
 										   resultRelationDesc,
 										   resultRelInfo->ri_RangeTableIndex,
-										   inputslot);
+										   oldSlotEval);
 					if (TupIsNull(epqslot))
 						/* Tuple not passing quals anymore, exiting... */
 						return NULL;
 					slot = ExecGetUpdateNewTuple(resultRelInfo,
-												 epqslot, oldSlot);
+												 epqslot, oldSlotEval);
 					goto redo_act;
 				}
 
@@ -2776,6 +2862,7 @@ ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 	bool		isNull;
 	EPQState   *epqstate = &mtstate->mt_epqstate;
 	ListCell   *l;
+	int			options = TABLE_MODIFY_WAIT;
 
 	/*
 	 * If there are no WHEN MATCHED actions, we are done.
@@ -2803,7 +2890,6 @@ lmerge_matched:
 	 * EvalPlanQual returns us a new tuple, which may not be visible to our
 	 * MVCC snapshot.
 	 */
-
 	if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
 									   tupleid,
 									   SnapshotAny,
@@ -2868,9 +2954,18 @@ lmerge_matched:
 					break;		/* concurrent update/delete */
 				}
 				result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL,
-									   newslot, false, false, &updateCxt);
+									   newslot, false, options, NULL,
+									   &updateCxt);
 				if (result == TM_Ok && updateCxt.updated)
 				{
+					if (ExecUpdateNeedsOldTupleForTrigger(context, resultRelInfo))
+					{
+						TupleTableSlot *triggerOldSlot = ExecGetTriggerOldSlot(estate, resultRelInfo);
+
+						ExecCopySlot(triggerOldSlot, resultRelInfo->ri_oldTupleSlot);
+						ExecMaterializeSlot(triggerOldSlot);
+					}
+
 					ExecUpdateEpilogue(context, &updateCxt, resultRelInfo,
 									   tupleid, NULL, newslot);
 					mtstate->mt_merge_updated += 1;
@@ -2887,9 +2982,16 @@ lmerge_matched:
 					break;		/* concurrent update/delete */
 				}
 				result = ExecDeleteAct(context, resultRelInfo, tupleid,
-									   false, false);
+									   false, options, NULL);
 				if (result == TM_Ok)
 				{
+					if (ExecDeleteNeedsOldTupleForTrigger(context, resultRelInfo, false))
+					{
+						TupleTableSlot *triggerOldSlot = ExecGetTriggerOldSlot(estate, resultRelInfo);
+
+						ExecCopySlot(triggerOldSlot, resultRelInfo->ri_oldTupleSlot);
+						ExecMaterializeSlot(triggerOldSlot);
+					}
 					ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL,
 									   false);
 					mtstate->mt_merge_deleted += 1;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index faf50265191..79cc528506d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -242,19 +242,22 @@ extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
-							 CommandId cid, Snapshot crosscheck, bool wait,
-							 struct TM_FailureData *tmfd, bool changingPart);
+							 CommandId cid, Snapshot crosscheck, int options,
+							 struct TM_FailureData *tmfd, bool changingPart,
+							 LazyTupleTableSlot *oldSlot);
 extern void heap_finish_speculative(Relation relation, ItemPointer tid);
 extern void heap_abort_speculative(Relation relation, ItemPointer tid);
 extern TM_Result heap_update(Relation relation, ItemPointer otid,
 							 HeapTuple newtup,
-							 CommandId cid, Snapshot crosscheck, bool wait,
+							 CommandId cid, Snapshot crosscheck, int options,
 							 struct TM_FailureData *tmfd, LockTupleMode *lockmode,
-							 TU_UpdateIndexes *update_indexes);
-extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
-								 CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
-								 bool follow_updates,
-								 Buffer *buffer, struct TM_FailureData *tmfd);
+							 TU_UpdateIndexes *update_indexes,
+							 LazyTupleTableSlot *oldSlot);
+extern TM_Result heap_lock_tuple(Relation relation, ItemPointer tid,
+								 TupleTableSlot *slot,
+								 CommandId cid, LockTupleMode mode,
+								 LockWaitPolicy wait_policy, bool follow_updates,
+								 struct TM_FailureData *tmfd);
 
 extern void heap_inplace_update(Relation relation, HeapTuple tuple);
 extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 7159365e652..445e5ba595a 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -259,6 +259,11 @@ typedef struct TM_IndexDeleteOp
 /* Follow update chain and lock latest version of tuple */
 #define TUPLE_LOCK_FLAG_FIND_LAST_VERSION		(1 << 1)
 
+/* "options" flag bits for table_tuple_update and table_tuple_delete */
+#define TABLE_MODIFY_WAIT			0x0001
+#define TABLE_MODIFY_FETCH_OLD_TUPLE 0x0002
+#define TABLE_MODIFY_LOCK_UPDATED	0x0004
+
 
 /* Typedef for callback function for table_index_build_scan */
 typedef void (*IndexBuildCallback) (Relation index,
@@ -528,10 +533,10 @@ typedef struct TableAmRoutine
 								 CommandId cid,
 								 Snapshot snapshot,
 								 Snapshot crosscheck,
-								 bool wait,
+								 int options,
 								 TM_FailureData *tmfd,
 								 bool changingPart,
-								 LazyTupleTableSlot *lockedSlot);
+								 LazyTupleTableSlot *oldSlot);
 
 	/* see table_tuple_update() for reference about parameters */
 	TM_Result	(*tuple_update) (Relation rel,
@@ -540,11 +545,11 @@ typedef struct TableAmRoutine
 								 CommandId cid,
 								 Snapshot snapshot,
 								 Snapshot crosscheck,
-								 bool wait,
+								 int options,
 								 TM_FailureData *tmfd,
 								 LockTupleMode *lockmode,
 								 TU_UpdateIndexes *update_indexes,
-								 LazyTupleTableSlot *lockedSlot);
+								 LazyTupleTableSlot *oldSlot);
 
 	/* see table_tuple_lock() for reference about parameters */
 	TM_Result	(*tuple_lock) (Relation rel,
@@ -1459,7 +1464,7 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 }
 
 /*
- * Delete a tuple (or lock last tuple version if lockedSlot is given).
+ * Delete a tuple (and optionally lock the last tuple version).
  *
  * NB: do not call this directly unless prepared to deal with
  * concurrent-update conditions.  Use simple_table_tuple_delete instead.
@@ -1470,36 +1475,46 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
  *	cid - delete command ID (used for visibility test, and stored into
  *		cmax if successful)
  *	crosscheck - if not InvalidSnapshot, also check tuple against this
- *	wait - true if should wait for any conflicting update to commit/abort
+ *	options - bitmask of options, see TABLE_MODIFY_* flags above
  * Output parameters:
  *	tmfd - filled in failure cases (see below)
  *	changingPart - true iff the tuple is being moved to another partition
  *		table due to an update of the partition key. Otherwise, false.
- *	lockedSlot - lazy slot to save the locked tuple if should lock the last
- *		row version during the concurrent update. NULL if not needed.
+ *	oldSlot - lazy slot to save the deleted or locked tuple. Can be
+ *		NULL if none of TABLE_MODIFY_FETCH_OLD_TUPLE or
+ *		TABLE_MODIFY_LOCK_UPDATED is specified.
  *
  * Normal, successful return value is TM_Ok, which means we did actually
  * delete it.  Failure return codes are TM_SelfModified, TM_Updated, and
  * TM_BeingModified (the last only possible if wait == false).
  *
+ * When TABLE_MODIFY_WAIT option is given, waits for any conflicting update to
+ * commit/abort.
+ *
+ * When TABLE_MODIFY_FETCH_OLD_TUPLE option is given, the existing tuple is
+ * fetched into oldSlot when the deletion is successful.
+ *
+ * When TABLE_MODIFY_LOCK_UPDATED option is given and the tuple is concurrently
+ * updated, then the last tuple version is locked and fetched into oldSlot.
+ *
  * In the failure cases, the routine fills *tmfd with the tuple's t_ctid,
  * t_xmax, and, if possible, and, if possible, t_cmax.  See comments for
  * struct TM_FailureData for additional info.
  */
 static inline TM_Result
 table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
-				   Snapshot snapshot, Snapshot crosscheck, bool wait,
+				   Snapshot snapshot, Snapshot crosscheck, int options,
 				   TM_FailureData *tmfd, bool changingPart,
-				   LazyTupleTableSlot *lockedSlot)
+				   LazyTupleTableSlot *oldSlot)
 {
 	return rel->rd_tableam->tuple_delete(rel, tid, cid,
 										 snapshot, crosscheck,
-										 wait, tmfd, changingPart,
-										 lockedSlot);
+										 options, tmfd, changingPart,
+										 oldSlot);
 }
 
 /*
- * Update a tuple (or lock last tuple version if lockedSlot is given).
+ * Update a tuple (and optionally lock the last tuple version).
  *
  * NB: do not call this directly unless you are prepared to deal with
  * concurrent-update conditions.  Use simple_table_tuple_update instead.
@@ -1511,19 +1526,29 @@ table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
  *	cid - update command ID (used for visibility test, and stored into
  *		cmax/cmin if successful)
  *	crosscheck - if not InvalidSnapshot, also check old tuple against this
- *	wait - true if should wait for any conflicting update to commit/abort
+ *	options - bitmask of options, see TABLE_MODIFY_* flags above
  * Output parameters:
  *	tmfd - filled in failure cases (see below)
  *	lockmode - filled with lock mode acquired on tuple
  *  update_indexes - in success cases this is set to true if new index entries
  *		are required for this tuple
- * 	lockedSlot - lazy slot to save the locked tuple if should lock the last
- *		row version during the concurrent update. NULL if not needed.
+ *	oldSlot - lazy slot to save the old or locked tuple. Can be
+ *		NULL if none of TABLE_MODIFY_FETCH_OLD_TUPLE or
+ *		TABLE_MODIFY_LOCK_UPDATED is specified.
 
  * Normal, successful return value is TM_Ok, which means we did actually
  * update it.  Failure return codes are TM_SelfModified, TM_Updated, and
  * TM_BeingModified (the last only possible if wait == false).
  *
+ * When TABLE_MODIFY_WAIT option is given, waits for any conflicting update to
+ * commit/abort.
+ *
+ * When TABLE_MODIFY_FETCH_OLD_TUPLE option is given, the existing tuple is
+ * fetched into oldSlot when the update is successful.
+ *
+ * When TABLE_MODIFY_LOCK_UPDATED option is given and the tuple is concurrently
+ * updated, then the last tuple version is locked and fetched into oldSlot.
+ *
  * On success, the slot's tts_tid and tts_tableOid are updated to match the new
  * stored tuple; in particular, slot->tts_tid is set to the TID where the
  * new tuple was inserted, and its HEAP_ONLY_TUPLE flag is set iff a HOT
@@ -1537,15 +1562,15 @@ table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
 static inline TM_Result
 table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
 				   CommandId cid, Snapshot snapshot, Snapshot crosscheck,
-				   bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
+				   int options, TM_FailureData *tmfd, LockTupleMode *lockmode,
 				   TU_UpdateIndexes *update_indexes,
-				   LazyTupleTableSlot *lockedSlot)
+				   LazyTupleTableSlot *oldSlot)
 {
 	return rel->rd_tableam->tuple_update(rel, otid, slot,
 										 cid, snapshot, crosscheck,
-										 wait, tmfd,
+										 options, tmfd,
 										 lockmode, update_indexes,
-										 lockedSlot);
+										 oldSlot);
 }
 
 /*
@@ -2061,10 +2086,12 @@ table_scan_sample_next_tuple(TableScanDesc scan,
 
 extern void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot);
 extern void simple_table_tuple_delete(Relation rel, ItemPointer tid,
-									  Snapshot snapshot);
+									  Snapshot snapshot,
+									  TupleTableSlot *oldSlot);
 extern void simple_table_tuple_update(Relation rel, ItemPointer otid,
 									  TupleTableSlot *slot, Snapshot snapshot,
-									  TU_UpdateIndexes *update_indexes);
+									  TU_UpdateIndexes *update_indexes,
+									  TupleTableSlot *oldSlot);
 
 
 /* ----------------------------------------------------------------------------
-- 
2.37.1 (Apple Git-137.1)

