From 7ac49fc29af24bbd3f53c98576b0100cad356e04 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Thu, 24 Jul 2025 17:03:34 +0800
Subject: [PATCH v53] Support the conflict detection for update_deleted

This patch supports detecting update_deleted conflicts during update
operations. If the target row cannot be found when applying update operations,
we perform an additional scan of the table using snapshotAny. This scan aims to
locate the most recently deleted row that matches the old column values from
the remote update operation and has not yet been removed by VACUUM. If any such
tuples are found, we report the update_deleted conflict along with the origin
and transaction information that deleted the tuple.
---
 doc/src/sgml/catalogs.sgml                 |   3 +-
 doc/src/sgml/logical-replication.sgml      |  16 ++
 doc/src/sgml/monitoring.sgml               |  11 +
 doc/src/sgml/ref/create_subscription.sgml  |   5 +-
 src/backend/catalog/system_views.sql       |   1 +
 src/backend/executor/execReplication.c     | 241 ++++++++++++++++++++-
 src/backend/replication/logical/conflict.c |  22 ++
 src/backend/replication/logical/relation.c |   1 +
 src/backend/replication/logical/worker.c   | 145 +++++++++++--
 src/backend/utils/adt/pgstatfuncs.c        |  18 +-
 src/include/catalog/pg_proc.dat            |   6 +-
 src/include/executor/executor.h            |  12 +-
 src/include/replication/conflict.h         |   3 +
 src/include/replication/worker_internal.h  |   5 +-
 src/test/regress/expected/rules.out        |   3 +-
 src/test/subscription/t/035_conflicts.pl   |  65 +++++-
 16 files changed, 512 insertions(+), 45 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 97f547b3cc4..a855a89b388 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -8087,7 +8087,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        <structfield>subretaindeadtuples</structfield> <type>bool</type>
       </para>
       <para>
-       If true, the information (e.g., dead tuples, commit timestamps, and
+       If true, he detection of <xref linkend="conflict-update-deleted"/> is
+       enabled and the information (e.g., dead tuples, commit timestamps, and
        origins) on the subscriber that is useful for conflict detection is
        retained.
       </para></entry>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index fcac55aefe6..b75777c6e8d 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1775,6 +1775,22 @@ Publications:
       </para>
      </listitem>
     </varlistentry>
+    <varlistentry id="conflict-update-deleted" xreflabel="update_deleted">
+     <term><literal>update_deleted</literal></term>
+     <listitem>
+      <para>
+       The tuple to be updated was recently deleted by another origin. The update
+       will simply be skipped in this scenario. Note that this conflict can only
+       be detected when
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       and <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
+       are enabled. Note that if a tuple cannot be found due to the table being
+       truncated, only a <literal>update_missing</literal> conflict will
+       arise. Additionally, if the tuple was deleted by the same origin, an
+       <literal>update_missing</literal> conflict will arise.
+      </para>
+     </listitem>
+    </varlistentry>
     <varlistentry id="conflict-update-origin-differs" xreflabel="update_origin_differs">
      <term><literal>update_origin_differs</literal></term>
      <listitem>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 823afe1b30b..3d02400c870 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2199,6 +2199,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_deleted</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be updated was recently deleted by another
+       source during the application of changes. See <xref linkend="conflict-update-deleted"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>confl_update_origin_differs</structfield> <type>bigint</type>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index b8cd15f3280..e847e39fa84 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -445,8 +445,9 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           Specifies whether the information (e.g., dead tuples, commit
           timestamps, and origins) required for conflict detection on the
           subscriber is retained. The default is <literal>false</literal>.
-          If set to <literal>true</literal>, a physical replication slot named
-          <quote><literal>pg_conflict_detection</literal></quote> will be
+          If set to <literal>true</literal>, the detection of
+          <xref linkend="conflict-update-deleted"/> is enabled, and a physical
+          replication slot named <quote><literal>pg_conflict_detection</literal></quote>
           created on the subscriber to prevent the conflict information from
           being removed.
          </para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index f6eca09ee15..e1470e48bd6 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1397,6 +1397,7 @@ CREATE VIEW pg_stat_subscription_stats AS
         ss.apply_error_count,
         ss.sync_error_count,
         ss.confl_insert_exists,
+        ss.confl_update_deleted,
         ss.confl_update_origin_differs,
         ss.confl_update_exists,
         ss.confl_update_missing,
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index f262e7a66f7..9a95bfb3c52 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -14,12 +14,14 @@
 
 #include "postgres.h"
 
+#include "access/commit_ts.h"
 #include "access/genam.h"
 #include "access/gist.h"
 #include "access/relscan.h"
 #include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/heapam.h"
 #include "catalog/pg_am_d.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
@@ -27,6 +29,7 @@
 #include "replication/conflict.h"
 #include "replication/logicalrelation.h"
 #include "storage/lmgr.h"
+#include "storage/procarray.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
@@ -36,7 +39,7 @@
 
 
 static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
-						 TypeCacheEntry **eq);
+						 TypeCacheEntry **eq, Bitmapset *columns);
 
 /*
  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
@@ -221,7 +224,7 @@ retry:
 			if (eq == NULL)
 				eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
 
-			if (!tuples_equal(outslot, searchslot, eq))
+			if (!tuples_equal(outslot, searchslot, eq, NULL))
 				continue;
 		}
 
@@ -277,10 +280,13 @@ retry:
 
 /*
  * Compare the tuples in the slots by checking if they have equal values.
+ *
+ * If 'columns' is not null, only the columns specified within it will be
+ * considered for the equality check, ignoring all other columns.
  */
 static bool
 tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
-			 TypeCacheEntry **eq)
+			 TypeCacheEntry **eq, Bitmapset *columns)
 {
 	int			attrnum;
 
@@ -305,6 +311,14 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
+		/*
+		 * Ignore columns that are not listed for checking.
+		 */
+		if (columns &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   columns))
+			continue;
+
 		/*
 		 * If one value is NULL and other is not, then they are certainly not
 		 * equal
@@ -380,7 +394,7 @@ retry:
 	/* Try to find the tuple */
 	while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
 	{
-		if (!tuples_equal(scanslot, searchslot, eq))
+		if (!tuples_equal(scanslot, searchslot, eq, NULL))
 			continue;
 
 		found = true;
@@ -455,6 +469,225 @@ BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
 	}
 }
 
+/*
+ * If the tuple is identified as dead and was deleted by a transaction with a
+ * more recent commit timestamp, update the transaction ID, deletion time, and
+ * origin information associated with this tuple.
+ */
+static void
+update_recent_dead_tuple_info(TupleTableSlot *scanslot,
+							  TransactionId oldestxmin,
+							  TransactionId *delete_xid,
+							  TimestampTz *delete_time,
+							  RepOriginId *delete_origin)
+{
+	BufferHeapTupleTableSlot *hslot;
+	HeapTuple	tuple;
+	Buffer		buf;
+	bool		recently_dead = false;
+	TransactionId xmax;
+	TimestampTz localts;
+	RepOriginId localorigin;
+
+	hslot = (BufferHeapTupleTableSlot *) scanslot;
+
+	tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
+	buf = hslot->buffer;
+
+	LockBuffer(buf, BUFFER_LOCK_SHARE);
+
+	/*
+	 * We do not consider HEAPTUPLE_DEAD status because it indicates either
+	 * tuples whose inserting transaction was aborted (meaning there is no
+	 * commit timestamp or origin), or tuples deleted by a transaction older
+	 * than oldestxmin, making it safe to ignore them during conflict
+	 * detection (See comments atop worker.c for details).
+	 */
+	if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
+		recently_dead = true;
+
+	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+	if (!recently_dead)
+		return;
+
+	xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
+	if (!TransactionIdIsValid(xmax))
+		return;
+
+	/* Select the dead tuple with the most recent commit timestamp */
+	if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
+		(TimestampDifferenceExceeds(*delete_time, localts, 0) ||
+		 *delete_time == 0))
+	{
+		*delete_xid = xmax;
+		*delete_time = localts;
+		*delete_origin = localorigin;
+	}
+}
+
+/*
+ * Searches the relation 'rel' for the most recently deleted tuple that matches
+ * the values in 'searchslot' and is not yet removable by VACUUM. The function
+ * returns the transaction ID, origin, and commit timestamp of the transaction
+ * that deleted this tuple.
+ *
+ * We scan all matching dead tuples in the relation to find the most recently
+ * deleted one, rather than stopping at the first match. This is because only
+ * the latest deletion information is relevant for resolving conflicts.
+ * Returning solely the first, potentially outdated tuple can lead users to
+ * mistakenly apply remote changes using a last-update-win strategy, even when a
+ * more recent deleted tuple is available. See comments atop worker.c for
+ * details.
+ *
+ * The commit timestamp of the transaction that deleted the tuple is used to
+ * determine whether the tuple is the most recently deleted one.
+ *
+ * This function performs a full table scan instead of using indexes, and it
+ * should be used only when the index scans could miss deleted tuples, such as
+ * when an index has been re-indexed or re-created using CONCURRENTLY option
+ * during change applications. While this approach may be slow on large tables,
+ * it is considered acceptable because it is only used in rare conflict cases
+ * where the target row for an update cannot be found.
+ */
+bool
+FindRecentlyDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
+								TransactionId *delete_xid,
+								RepOriginId *delete_origin,
+								TimestampTz *delete_time)
+{
+	TupleTableSlot *scanslot;
+	TableScanDesc scan;
+	TypeCacheEntry **eq;
+	TransactionId oldestxmin;
+	Bitmapset  *indexbitmap;
+	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
+
+	Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
+
+	*delete_xid = InvalidTransactionId;
+	*delete_origin = InvalidRepOriginId;
+	*delete_time = 0;
+
+	/* Exit early if the commit timestamp data is not available */
+	if (!track_commit_timestamp)
+		return false;
+
+	/* Get the cutoff xmin for HeapTupleSatisfiesVacuum */
+	oldestxmin = GetOldestNonRemovableTransactionId(rel);
+
+	/* Get the index column bitmap for tuples_equal */
+	indexbitmap = RelationGetIndexAttrBitmap(rel,
+											 INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+	/* fallback to PK if no replica identity */
+	if (!indexbitmap)
+		indexbitmap = RelationGetIndexAttrBitmap(rel,
+												 INDEX_ATTR_BITMAP_PRIMARY_KEY);
+
+	eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts);
+
+	/*
+	 * Start a heap scan using SnapshotAny to identify dead tuples that are
+	 * not visible under a standard MVCC snapshot.
+	 */
+	scan = table_beginscan(rel, SnapshotAny, 0, NULL);
+	scanslot = table_slot_create(rel, NULL);
+
+	table_rescan(scan, NULL);
+
+	/* Try to find the tuple */
+	while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
+	{
+		if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
+			continue;
+
+		update_recent_dead_tuple_info(scanslot, oldestxmin, delete_xid,
+									  delete_time, delete_origin);
+	}
+
+	table_endscan(scan);
+	ExecDropSingleTupleTableSlot(scanslot);
+
+	return *delete_time != 0;
+}
+
+/*
+ * Similar to FindRecentlyDeletedTupleInfoSeq() but using index scan to locate
+ * the deleted tuple.
+ */
+bool
+FindRecentlyDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
+									TupleTableSlot *searchslot,
+									TransactionId *delete_xid,
+									RepOriginId *delete_origin,
+									TimestampTz *delete_time)
+{
+	Relation	idxrel;
+	ScanKeyData skey[INDEX_MAX_KEYS];
+	int			skey_attoff;
+	IndexScanDesc scan;
+	TupleTableSlot *scanslot;
+	TypeCacheEntry **eq;
+	TransactionId oldestxmin;
+	bool		isIdxSafeToSkipDuplicates;
+	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
+
+	Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
+	Assert(OidIsValid(idxoid));
+
+	*delete_xid = InvalidTransactionId;
+	*delete_time = 0;
+	*delete_origin = InvalidRepOriginId;
+
+	/* Return if the commit timestamp data is not available */
+	if (!track_commit_timestamp)
+		return false;
+
+	isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
+	oldestxmin = GetOldestNonRemovableTransactionId(rel);
+
+	scanslot = table_slot_create(rel, NULL);
+
+	idxrel = index_open(idxoid, RowExclusiveLock);
+
+	/* Build scan key. */
+	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+	/* Start an index scan. */
+	scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
+
+	index_rescan(scan, skey, skey_attoff, NULL, 0);
+
+	/* Try to find the tuple */
+	while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
+	{
+		/*
+		 * Avoid expensive equality check if the index is primary key or
+		 * replica identity index.
+		 */
+		if (!isIdxSafeToSkipDuplicates)
+		{
+			if (eq == NULL)
+				eq = palloc0(sizeof(*eq) * scanslot->tts_tupleDescriptor->natts);
+
+			if (!tuples_equal(scanslot, searchslot, eq, NULL))
+				continue;
+		}
+
+		update_recent_dead_tuple_info(scanslot, oldestxmin, delete_xid,
+									  delete_time, delete_origin);
+	}
+
+	index_endscan(scan);
+
+	index_close(idxrel, NoLock);
+
+	ExecDropSingleTupleTableSlot(scanslot);
+
+	return *delete_time != 0;
+}
+
 /*
  * Find the tuple that violates the passed unique index (conflictindex).
  *
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 97c4e26b586..24d0b4ada4d 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -25,6 +25,7 @@
 
 static const char *const ConflictTypeNames[] = {
 	[CT_INSERT_EXISTS] = "insert_exists",
+	[CT_UPDATE_DELETED] = "update_deleted",
 	[CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
 	[CT_UPDATE_EXISTS] = "update_exists",
 	[CT_UPDATE_MISSING] = "update_missing",
@@ -173,6 +174,7 @@ errcode_apply_conflict(ConflictType type)
 		case CT_UPDATE_EXISTS:
 		case CT_MULTIPLE_UNIQUE_CONFLICTS:
 			return errcode(ERRCODE_UNIQUE_VIOLATION);
+		case CT_UPDATE_DELETED:
 		case CT_UPDATE_ORIGIN_DIFFERS:
 		case CT_UPDATE_MISSING:
 		case CT_DELETE_ORIGIN_DIFFERS:
@@ -246,6 +248,26 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
 
 			break;
 
+		case CT_UPDATE_DELETED:
+			if (localts)
+			{
+				if (localorigin == InvalidRepOriginId)
+					appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."),
+									 localxmin, timestamptz_to_str(localts));
+				else if (replorigin_by_oid(localorigin, true, &origin_name))
+					appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."),
+									 origin_name, localxmin, timestamptz_to_str(localts));
+
+				/* The origin that modified this row has been removed. */
+				else
+					appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."),
+									 localxmin, timestamptz_to_str(localts));
+			}
+			else
+				appendStringInfo(&err_detail, _("The row to be updated was deleted."));
+
+			break;
+
 		case CT_UPDATE_ORIGIN_DIFFERS:
 			if (localorigin == InvalidRepOriginId)
 				appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index f59046ad620..c93d34923c4 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -25,6 +25,7 @@
 #include "executor/executor.h"
 #include "nodes/makefuncs.h"
 #include "replication/logicalrelation.h"
+#include "replication/slot.h"
 #include "replication/worker_internal.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b59221c4d06..695fad8f761 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -138,9 +138,9 @@
  * Each apply worker that enabled retain_dead_tuples option maintains a
  * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
  * prevent dead rows from being removed prematurely when the apply worker still
- * needs them to detect conflicts reliably. This helps to retain the required
- * commit_ts module information, which further helps to detect
- * update_origin_differs and delete_origin_differs conflicts reliably, as
+ * needs them to detect update_deleted conflicts. Additionally, this helps to
+ * retain the required commit_ts module information, which further helps to
+ * detect update_origin_differs and delete_origin_differs conflicts reliably, as
  * otherwise, vacuum freeze could remove the required information.
  *
  * The logical replication launcher manages an internal replication slot named
@@ -185,10 +185,10 @@
  * transactions that occurred concurrently with the tuple DELETE, any
  * subsequent UPDATE from a remote node should have a later timestamp. In such
  * cases, it is acceptable to detect an update_missing scenario and convert the
- * UPDATE to an INSERT when applying it. But, detecting concurrent remote
- * transactions with earlier timestamps than the DELETE is necessary, as the
- * UPDATEs in remote transactions should be ignored if their timestamp is
- * earlier than that of the dead tuples.
+ * UPDATE to an INSERT when applying it. But, for concurrent remote
+ * transactions with earlier timestamps than the DELETE, detecting
+ * update_deleted is necessary, as the UPDATEs in remote transactions should be
+ * ignored if their timestamp is earlier than that of the dead tuples.
  *
  * Note that advancing the non-removable transaction ID is not supported if the
  * publisher is also a physical standby. This is because the logical walsender
@@ -576,6 +576,12 @@ static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel
 									Oid localidxoid,
 									TupleTableSlot *remoteslot,
 									TupleTableSlot **localslot);
+static bool FindDeletedTupleInLocalRel(Relation localrel,
+									   Oid localidxoid,
+									   TupleTableSlot *remoteslot,
+									   TransactionId *delete_xid,
+									   RepOriginId *delete_origin,
+									   TimestampTz *delete_time);
 static void apply_handle_tuple_routing(ApplyExecutionData *edata,
 									   TupleTableSlot *remoteslot,
 									   LogicalRepTupleData *newtup,
@@ -2912,17 +2918,28 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 	}
 	else
 	{
+		ConflictType type;
 		TupleTableSlot *newslot = localslot;
 
+		if (MySubscription->retaindeadtuples &&
+			FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
+									   &conflicttuple.xmin,
+									   &conflicttuple.origin,
+									   &conflicttuple.ts) &&
+			conflicttuple.origin != replorigin_session_origin)
+			type = CT_UPDATE_DELETED;
+		else
+			type = CT_UPDATE_MISSING;
+
 		/* Store the new tuple for conflict reporting */
 		slot_store_data(newslot, relmapentry, newtup);
 
 		/*
-		 * The tuple to be updated could not be found.  Do nothing except for
-		 * emitting a log message.
+		 * The tuple to be updated could not be found or was deleted.  Do
+		 * nothing except for emitting a log message.
 		 */
-		ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
-							remoteslot, newslot, list_make1(&conflicttuple));
+		ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
+							list_make1(&conflicttuple));
 	}
 
 	/* Cleanup. */
@@ -3142,6 +3159,78 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
 	return found;
 }
 
+/*
+ * Check whether the index can reliably locate the deleted tuple in the local
+ * relation.
+ *
+ * Considering an index might omit all deleted tuples if it has been re-indexed
+ * or re-created using CONCURRENTLY option during change applications, an index
+ * is deemed usable only when the conflict detection slot.xmin exceeds the index
+ * tuple's xmin in pg_index. This ensures that any tuples deleted before the
+ * index creation or re-indexing are irrelevant for conflict detection.
+ *
+ * Note that this might also abandons some indexes that are updated due to other
+ * operations or without the CONCURRENTLY option. However, this is generally
+ * acceptable, as both the DDL commands that modify indexes and the need to scan
+ * dead tuples for the update_deleted are relatively rare occurrences.
+ */
+static bool
+IsIndexUsableForFindingDeletedTuple(Oid localindexoid)
+{
+	HeapTuple	index_tuple;
+	TransactionId index_xmin;
+	TransactionId conflict_detection_xmin;
+	ReplicationSlot *slot;
+
+	index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
+
+	/*
+	 * A frozen transaction ID indicates that it must fall behind the conflict
+	 * detection slot.xmin.
+	 */
+	if (HeapTupleHeaderXminFrozen(index_tuple->t_data))
+		return true;
+
+	slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true);
+
+	Assert(slot);
+
+	SpinLockAcquire(&slot->mutex);
+	conflict_detection_xmin = slot->data.xmin;
+	SpinLockRelease(&slot->mutex);
+
+	index_xmin = HeapTupleHeaderGetRawXmin(index_tuple->t_data);
+
+	return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
+}
+
+/*
+ * Try to find a deleted tuple in the local relation that matching the values of
+ * the tuple received from the publication side (in 'remoteslot'). The function
+ * uses either replica identity index, primary key, index or if needed,
+ * sequential scan.
+ *
+ * Return true if found the deleted tuple. The transaction ID, commit timestamp,
+ * and origin of the transaction for the deletion, if found, are
+ * stored in '*delete_xid', '*delete_origin', and '*delete_time' respectively.
+ */
+static bool
+FindDeletedTupleInLocalRel(Relation localrel,
+						   Oid localidxoid, TupleTableSlot *remoteslot,
+						   TransactionId *delete_xid, RepOriginId *delete_origin,
+						   TimestampTz *delete_time)
+{
+	if (OidIsValid(localidxoid) &&
+		IsIndexUsableForFindingDeletedTuple(localidxoid))
+		return FindRecentlyDeletedTupleInfoByIndex(localrel, localidxoid,
+												   remoteslot, delete_xid,
+												   delete_origin, delete_time);
+	else
+		return FindRecentlyDeletedTupleInfoSeq(localrel, remoteslot,
+											   delete_xid, delete_origin,
+											   delete_time);
+}
+
 /*
  * This handles insert, update, delete on a partitioned table.
  */
@@ -3260,18 +3349,31 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 												remoteslot_part, &localslot);
 				if (!found)
 				{
+					ConflictType type;
 					TupleTableSlot *newslot = localslot;
 
+					if (MySubscription->retaindeadtuples &&
+						FindDeletedTupleInLocalRel(partrel,
+												   part_entry->localindexoid,
+												   remoteslot_part,
+												   &conflicttuple.xmin,
+												   &conflicttuple.origin,
+												   &conflicttuple.ts) &&
+						conflicttuple.origin != replorigin_session_origin)
+						type = CT_UPDATE_DELETED;
+					else
+						type = CT_UPDATE_MISSING;
+
 					/* Store the new tuple for conflict reporting */
 					slot_store_data(newslot, part_entry, newtup);
 
 					/*
-					 * The tuple to be updated could not be found.  Do nothing
-					 * except for emitting a log message.
+					 * The tuple to be updated could not be found or was
+					 * deleted.  Do nothing except for emitting a log message.
 					 */
 					ReportApplyConflict(estate, partrelinfo, LOG,
-										CT_UPDATE_MISSING, remoteslot_part,
-										newslot, list_make1(&conflicttuple));
+										type, remoteslot_part, newslot,
+										list_make1(&conflicttuple));
 
 					return;
 				}
@@ -4172,8 +4274,8 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
 {
 	/*
 	 * It is sufficient to manage non-removable transaction ID for a
-	 * subscription by the main apply worker to detect conflicts reliably even
-	 * for table sync or parallel apply workers.
+	 * subscription by the main apply worker to detect update_deleted reliably
+	 * even for table sync or parallel apply workers.
 	 */
 	if (!am_leader_apply_worker())
 		return false;
@@ -4374,10 +4476,11 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
 	 * We expect the publisher and subscriber clocks to be in sync using time
 	 * sync service like NTP. Otherwise, we will advance this worker's
 	 * oldest_nonremovable_xid prematurely, leading to the removal of rows
-	 * required to detect conflicts reliably. This check primarily addresses
-	 * scenarios where the publisher's clock falls behind; if the publisher's
-	 * clock is ahead, subsequent transactions will naturally bear later
-	 * commit timestamps, conforming to the design outlined atop worker.c.
+	 * required to detect update_deleted reliably. This check primarily
+	 * addresses scenarios where the publisher's clock falls behind; if the
+	 * publisher's clock is ahead, subsequent transactions will naturally bear
+	 * later commit timestamps, conforming to the design outlined atop
+	 * worker.c.
 	 *
 	 * XXX Consider waiting for the publisher's clock to catch up with the
 	 * subscriber's before proceeding to the next phase.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 1c12ddbae49..2a084d3f3f0 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	11
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	12
 	Oid			subid = PG_GETARG_OID(0);
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
@@ -2193,19 +2193,21 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_deleted",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 3ee8fed7e53..1a3b17ea112 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5688,9 +5688,9 @@
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_deleted,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 104b059544d..6b1f89c1feb 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -14,6 +14,7 @@
 #ifndef EXECUTOR_H
 #define EXECUTOR_H
 
+#include "datatype/timestamp.h"
 #include "executor/execdesc.h"
 #include "fmgr.h"
 #include "nodes/lockoptions.h"
@@ -759,7 +760,16 @@ extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 										 TupleTableSlot *outslot);
 extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 									 TupleTableSlot *searchslot, TupleTableSlot *outslot);
-
+extern bool FindRecentlyDeletedTupleInfoSeq(Relation rel,
+											TupleTableSlot *searchslot,
+											TransactionId *delete_xid,
+											RepOriginId *delete_origin,
+											TimestampTz *delete_time);
+extern bool FindRecentlyDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
+												TupleTableSlot *searchslot,
+												TransactionId *delete_xid,
+												RepOriginId *delete_origin,
+												TimestampTz *delete_time);
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 									 EState *estate, TupleTableSlot *slot);
 extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 6c59125f256..cbd9656a60a 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -26,6 +26,9 @@ typedef enum
 	/* The row to be inserted violates unique constraint */
 	CT_INSERT_EXISTS,
 
+	/* The row to be updated was deleted by a different origin */
+	CT_UPDATE_DELETED,
+
 	/* The row to be updated was modified by a different origin */
 	CT_UPDATE_ORIGIN_DIFFERS,
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 0c7b8440a61..42f4fed38ff 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -87,8 +87,9 @@ typedef struct LogicalRepWorker
 	bool		parallel_apply;
 
 	/*
-	 * The changes made by this and later transactions must be retained to
-	 * ensure reliable conflict detection during the apply phase.
+	 * The changes made by this and later transactions shouldn't be removed.
+	 * This allows the detection of update_deleted conflicts when applying
+	 * changes in this logical replication worker.
 	 *
 	 * The logical replication launcher manages an internal replication slot
 	 * named "pg_conflict_detection". It asynchronously collects this ID to
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index dce8c672b40..56704426431 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2177,6 +2177,7 @@ pg_stat_subscription_stats| SELECT ss.subid,
     ss.apply_error_count,
     ss.sync_error_count,
     ss.confl_insert_exists,
+    ss.confl_update_deleted,
     ss.confl_update_origin_differs,
     ss.confl_update_exists,
     ss.confl_update_missing,
@@ -2185,7 +2186,7 @@ pg_stat_subscription_stats| SELECT ss.subid,
     ss.confl_multiple_unique_conflicts,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_deleted, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index 976d53a870e..0a4968f359d 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -150,7 +150,9 @@ pass('multiple_unique_conflicts detected on a leaf partition during insert');
 # Setup a bidirectional logical replication between node_A & node_B
 ###############################################################################
 
-# Initialize nodes.
+# Initialize nodes. Enable the track_commit_timestamp on both nodes to detect
+# the conflict when attempting to update a row that was previously modified by
+# a different origin.
 
 # node_A. Increase the log_min_messages setting to DEBUG2 to debug test
 # failures. Disable autovacuum to avoid generating xid that could affect the
@@ -158,7 +160,8 @@ pass('multiple_unique_conflicts detected on a leaf partition during insert');
 my $node_A = $node_publisher;
 $node_A->append_conf(
 	'postgresql.conf',
-	qq{autovacuum = off
+	qq{track_commit_timestamp = on
+	autovacuum = off
 	log_min_messages = 'debug2'});
 $node_A->restart;
 
@@ -270,6 +273,8 @@ $node_A->psql('postgres',
 ###############################################################################
 # Check that dead tuples on node A cannot be cleaned by VACUUM until the
 # concurrent transactions on Node B have been applied and flushed on Node A.
+# And check that an update_deleted conflict is detected when updating a row
+# that was deleted by a different origin.
 ###############################################################################
 
 # Insert a record
@@ -288,6 +293,8 @@ $node_A->poll_query_until('postgres',
 	"SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
 );
 
+my $log_location = -s $node_B->logfile;
+
 $node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;");
 $node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;");
 
@@ -299,10 +306,30 @@ ok( $stderr =~
 	  qr/1 are dead but not yet removable/,
 	'the deleted column is non-removable');
 
+# Ensure the DELETE is replayed on Node B
+$node_A->wait_for_catchup($subname_BA);
+
+# Check the conflict detected on Node B
+my $logfile = slurp_file($node_B->logfile(), $log_location);
+ok( $logfile =~
+	  qr/conflict detected on relation "public.tab": conflict=delete_origin_differs.*
+.*DETAIL:.* Deleting the row that was modified locally in transaction [0-9]+ at .*
+.*Existing local tuple \(1, 3\); replica identity \(a\)=\(1\)/,
+	'delete target row was modified in tab');
+
+$log_location = -s $node_A->logfile;
+
 $node_A->safe_psql(
 	'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
 $node_B->wait_for_catchup($subname_AB);
 
+$logfile = slurp_file($node_A->logfile(), $log_location);
+ok( $logfile =~
+	  qr/conflict detected on relation "public.tab": conflict=update_deleted.*
+.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .*
+.*Remote tuple \(1, 3\); replica identity \(a\)=\(1\)/,
+	'update target row was deleted in tab');
+
 # Remember the next transaction ID to be assigned
 my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
 
@@ -324,6 +351,40 @@ ok( $stderr =~
 	  qr/1 removed, 1 remain, 0 are dead but not yet removable/,
 	'the deleted column is removed');
 
+###############################################################################
+# Check that dead tuples can be found through a full table sequential scan
+###############################################################################
+
+# Drop the primary key from tab on node A and set REPLICA IDENTITY to FULL to
+# enforce sequential scanning of the table.
+$node_A->safe_psql('postgres', "ALTER TABLE tab REPLICA IDENTITY FULL");
+$node_B->safe_psql('postgres', "ALTER TABLE tab REPLICA IDENTITY FULL");
+$node_A->safe_psql('postgres', "ALTER TABLE tab DROP CONSTRAINT tab_pkey;");
+
+# Disable the logical replication from node B to node A
+$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE");
+
+# Wait for the apply worker to stop
+$node_A->poll_query_until('postgres',
+	"SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
+);
+
+$node_B->safe_psql('postgres', "UPDATE tab SET b = 4 WHERE a = 2;");
+$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 2;");
+
+$log_location = -s $node_A->logfile;
+
+$node_A->safe_psql(
+	'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
+$node_B->wait_for_catchup($subname_AB);
+
+$logfile = slurp_file($node_A->logfile(), $log_location);
+ok( $logfile =~
+	  qr/conflict detected on relation "public.tab": conflict=update_deleted.*
+.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .*
+.*Remote tuple \(2, 4\); replica identity full \(2, 2\)/,
+	'update target row was deleted in tab');
+
 ###############################################################################
 # Check that the replication slot pg_conflict_detection is dropped after
 # removing all the subscriptions.
-- 
2.50.1.windows.1

