From 08d642949a7f9700e5c2111605eccdeccd2737ae Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Tue, 5 Jan 2021 09:04:29 +0530
Subject: [PATCH v1] Track replication origin progress for rollbacks.

Commit 1eb6d6527a allowed to track replica origin replay progress for 2PC
but it was not complete. It misses to properly track the progress for
rollback prepared especially it missed to update the code for recovery.
Additionally, we need to allow tracking it on subscriber nodes where
wal_level might not be logical.

Author: Amit Kapila
Reviewed-by: Ajin Cherian
---
 src/backend/access/transam/twophase.c | 13 +++++++++++++
 src/backend/access/transam/xact.c     | 19 ++++++++++++++-----
 2 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index fc18b77..609cf02 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2277,6 +2277,14 @@ RecordTransactionAbortPrepared(TransactionId xid,
 							   const char *gid)
 {
 	XLogRecPtr	recptr;
+	bool		replorigin;
+
+	/*
+	 * Are we using the replication origins feature?  Or, in other words, are
+	 * we replaying remote actions?
+	 */
+	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+				  replorigin_session_origin != DoNotReplicateId);
 
 	/*
 	 * Catch the scenario where we aborted partway through
@@ -2299,6 +2307,11 @@ RecordTransactionAbortPrepared(TransactionId xid,
 								MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
 								xid, gid);
 
+	if (replorigin)
+		/* Move LSNs forward for this replication origin */
+		replorigin_session_advance(replorigin_session_origin_lsn,
+								   XactLastRecEnd);
+
 	/* Always flush, since we're about to remove the 2PC state file */
 	XLogFlush(recptr);
 
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index a2068e3..5619547 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5720,8 +5720,7 @@ XactLogAbortRecord(TimestampTz abort_time,
 
 	/* dump transaction origin information only for abort prepared */
 	if ((replorigin_session_origin != InvalidRepOriginId) &&
-		TransactionIdIsValid(twophase_xid) &&
-		XLogLogicalInfoActive())
+		TransactionIdIsValid(twophase_xid))
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
@@ -5927,7 +5926,8 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
  * because subtransaction commit is never WAL logged.
  */
 static void
-xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
+xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
+				XLogRecPtr lsn, RepOriginId origin_id)
 {
 	TransactionId max_xid;
 
@@ -5976,6 +5976,13 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
 			StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts);
 	}
 
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		/* recover apply progress */
+		replorigin_advance(origin_id, parsed->origin_lsn, lsn,
+						   false /* backward */, false /* WAL */);
+	}
+
 	/* Make sure files supposed to be dropped are dropped */
 	DropRelationFiles(parsed->xnodes, parsed->nrels, true);
 }
@@ -6017,7 +6024,8 @@ xact_redo(XLogReaderState *record)
 		xl_xact_parsed_abort parsed;
 
 		ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
-		xact_redo_abort(&parsed, XLogRecGetXid(record));
+		xact_redo_abort(&parsed, XLogRecGetXid(record),
+						record->EndRecPtr, XLogRecGetOrigin(record));
 	}
 	else if (info == XLOG_XACT_ABORT_PREPARED)
 	{
@@ -6025,7 +6033,8 @@ xact_redo(XLogReaderState *record)
 		xl_xact_parsed_abort parsed;
 
 		ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
-		xact_redo_abort(&parsed, parsed.twophase_xid);
+		xact_redo_abort(&parsed, parsed.twophase_xid,
+						record->EndRecPtr, XLogRecGetOrigin(record));
 
 		/* Delete TwoPhaseState gxact entry and/or 2PC file. */
 		LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
-- 
1.8.3.1

