From 0f64083628f8b7edd057b996372700f0c2e79c23 Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <lic@highgo.com>
Date: Tue, 30 Dec 2025 12:55:31 +0800
Subject: [PATCH v7 2/2] Consolidate replication origin session globals into a
 single struct.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit moves the separate global variables for replication origin
state into a single RepOriginXactState struct. This groups logically
related variables, which improves code readability and simplifies
state management (e.g., resetting the state) by handling them as a
unit.

Author: Chao Li <lic@highgo.com>
Suggested-by: Álvaro Herrera <alvherre@kurilemu.de
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/CAEoWx2=pYvfRthXHTzSrOsf5_FfyY4zJyK4zV2v4W=yjUij1cA@mail.gmail.com
---
 src/backend/access/transam/twophase.c         | 32 ++++++++---------
 src/backend/access/transam/xact.c             | 36 +++++++++----------
 src/backend/access/transam/xloginsert.c       |  6 ++--
 .../replication/logical/applyparallelworker.c |  6 ++--
 src/backend/replication/logical/origin.c      | 34 +++++++++---------
 src/backend/replication/logical/tablesync.c   |  6 ++--
 src/backend/replication/logical/worker.c      | 32 ++++++++---------
 src/include/replication/origin.h              | 14 +++++---
 src/tools/pgindent/typedefs.list              |  1 +
 9 files changed, 88 insertions(+), 79 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index e50abb331cc..329c6bbf3c7 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1157,13 +1157,13 @@ EndPrepare(GlobalTransaction gxact)
 	Assert(hdr->magic == TWOPHASE_MAGIC);
 	hdr->total_len = records.total_len + sizeof(pg_crc32c);
 
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = (replorigin_xact_state.origin != InvalidRepOriginId &&
+				  replorigin_xact_state.origin != DoNotReplicateId);
 
 	if (replorigin)
 	{
-		hdr->origin_lsn = replorigin_session_origin_lsn;
-		hdr->origin_timestamp = replorigin_session_origin_timestamp;
+		hdr->origin_lsn = replorigin_xact_state.origin_lsn;
+		hdr->origin_timestamp = replorigin_xact_state.origin_timestamp;
 	}
 
 	/*
@@ -1211,7 +1211,7 @@ EndPrepare(GlobalTransaction gxact)
 	if (replorigin)
 	{
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_xact_state.origin_lsn,
 								   gxact->prepare_end_lsn);
 	}
 
@@ -2330,8 +2330,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	 * Are we using the replication origins feature?  Or, in other words, are
 	 * we replaying remote actions?
 	 */
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = (replorigin_xact_state.origin != InvalidRepOriginId &&
+				  replorigin_xact_state.origin != DoNotReplicateId);
 
 	/* Load the injection point before entering the critical section */
 	INJECTION_POINT_LOAD("commit-after-delay-checkpoint");
@@ -2376,23 +2376,23 @@ RecordTransactionCommitPrepared(TransactionId xid,
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_xact_state.origin_lsn,
 								   XactLastRecEnd);
 
 	/*
 	 * Record commit timestamp.  The value comes from plain commit timestamp
 	 * if replorigin is not enabled, or replorigin already set a value for us
-	 * in replorigin_session_origin_timestamp otherwise.
+	 * in replorigin_xact_state.origin_timestamp otherwise.
 	 *
 	 * We don't need to WAL-log anything here, as the commit record written
 	 * above already contains the data.
 	 */
-	if (!replorigin || replorigin_session_origin_timestamp == 0)
-		replorigin_session_origin_timestamp = committs;
+	if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
+		replorigin_xact_state.origin_timestamp = committs;
 
 	TransactionTreeSetCommitTsData(xid, nchildren, children,
-								   replorigin_session_origin_timestamp,
-								   replorigin_session_origin);
+								   replorigin_xact_state.origin_timestamp,
+								   replorigin_xact_state.origin);
 
 	/*
 	 * We don't currently try to sleep before flush here ... nor is there any
@@ -2445,8 +2445,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
 	 * Are we using the replication origins feature?  Or, in other words, are
 	 * we replaying remote actions?
 	 */
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = (replorigin_xact_state.origin != InvalidRepOriginId &&
+				  replorigin_xact_state.origin != DoNotReplicateId);
 
 	/*
 	 * Catch the scenario where we aborted partway through
@@ -2472,7 +2472,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_xact_state.origin_lsn,
 								   XactLastRecEnd);
 
 	/* Always flush, since we're about to remove the 2PC state file */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c857e23552f..c779ce71f07 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1413,8 +1413,8 @@ RecordTransactionCommit(void)
 		 * Are we using the replication origins feature?  Or, in other words,
 		 * are we replaying remote actions?
 		 */
-		replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-					  replorigin_session_origin != DoNotReplicateId);
+		replorigin = (replorigin_xact_state.origin != InvalidRepOriginId &&
+					  replorigin_xact_state.origin != DoNotReplicateId);
 
 		/*
 		 * Mark ourselves as within our "commit critical section".  This
@@ -1462,25 +1462,25 @@ RecordTransactionCommit(void)
 
 		if (replorigin)
 			/* Move LSNs forward for this replication origin */
-			replorigin_session_advance(replorigin_session_origin_lsn,
+			replorigin_session_advance(replorigin_xact_state.origin_lsn,
 									   XactLastRecEnd);
 
 		/*
 		 * Record commit timestamp.  The value comes from plain commit
 		 * timestamp if there's no replication origin; otherwise, the
-		 * timestamp was already set in replorigin_session_origin_timestamp by
-		 * replication.
+		 * timestamp was already set in replorigin_xact_state.origin_timestamp
+		 * by replication.
 		 *
 		 * We don't need to WAL-log anything here, as the commit record
 		 * written above already contains the data.
 		 */
 
-		if (!replorigin || replorigin_session_origin_timestamp == 0)
-			replorigin_session_origin_timestamp = GetCurrentTransactionStopTimestamp();
+		if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
+			replorigin_xact_state.origin_timestamp = GetCurrentTransactionStopTimestamp();
 
 		TransactionTreeSetCommitTsData(xid, nchildren, children,
-									   replorigin_session_origin_timestamp,
-									   replorigin_session_origin);
+									   replorigin_xact_state.origin_timestamp,
+									   replorigin_xact_state.origin);
 	}
 
 	/*
@@ -1810,8 +1810,8 @@ RecordTransactionAbort(bool isSubXact)
 	 * Are we using the replication origins feature?  Or, in other words, are
 	 * we replaying remote actions?
 	 */
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = (replorigin_xact_state.origin != InvalidRepOriginId &&
+				  replorigin_xact_state.origin != DoNotReplicateId);
 
 	/* Fetch the data we need for the abort record */
 	nrels = smgrGetPendingDeletes(false, &rels);
@@ -1838,7 +1838,7 @@ RecordTransactionAbort(bool isSubXact)
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_xact_state.origin_lsn,
 								   XactLastRecEnd);
 
 	/*
@@ -5928,12 +5928,12 @@ XactLogCommitRecord(TimestampTz commit_time,
 	}
 
 	/* dump transaction origin information */
-	if (replorigin_session_origin != InvalidRepOriginId)
+	if (replorigin_xact_state.origin != InvalidRepOriginId)
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
-		xl_origin.origin_lsn = replorigin_session_origin_lsn;
-		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+		xl_origin.origin_lsn = replorigin_xact_state.origin_lsn;
+		xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp;
 	}
 
 	if (xl_xinfo.xinfo != 0)
@@ -6081,12 +6081,12 @@ XactLogAbortRecord(TimestampTz abort_time,
 	 * Dump transaction origin information. We need this during recovery to
 	 * update the replication origin progress.
 	 */
-	if (replorigin_session_origin != InvalidRepOriginId)
+	if (replorigin_xact_state.origin != InvalidRepOriginId)
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
-		xl_origin.origin_lsn = replorigin_session_origin_lsn;
-		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+		xl_origin.origin_lsn = replorigin_xact_state.origin_lsn;
+		xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp;
 	}
 
 	if (xl_xinfo.xinfo != 0)
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 92c48e768c3..7f9485e08e7 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -861,11 +861,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
 	/* followed by the record's origin, if any */
 	if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
-		replorigin_session_origin != InvalidRepOriginId)
+		replorigin_xact_state.origin != InvalidRepOriginId)
 	{
 		*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
-		memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
-		scratch += sizeof(replorigin_session_origin);
+		memcpy(scratch, &replorigin_xact_state.origin, sizeof(replorigin_xact_state.origin));
+		scratch += sizeof(replorigin_xact_state.origin);
 	}
 
 	/* followed by toplevel XID, if not already included in previous record */
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 5ebd2353fed..fbc113b099e 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -962,7 +962,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	 * origin which was already acquired by its leader process.
 	 */
 	replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-	replorigin_session_origin = originid;
+	replorigin_xact_state.origin = originid;
 	CommitTransactionCommand();
 
 	/*
@@ -1430,8 +1430,8 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data)
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
 	 */
-	replorigin_session_origin_lsn = abort_data->abort_lsn;
-	replorigin_session_origin_timestamp = abort_data->abort_time;
+	replorigin_xact_state.origin_lsn = abort_data->abort_lsn;
+	replorigin_xact_state.origin_timestamp = abort_data->abort_time;
 
 	/*
 	 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 6800a1f755d..5b09ac86f9b 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -159,10 +159,12 @@ typedef struct ReplicationStateCtl
 	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
 } ReplicationStateCtl;
 
-/* external variables */
-RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
-XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
-TimestampTz replorigin_session_origin_timestamp = 0;
+/* Global variable for per-transaction replication origin state */
+RepOriginXactState replorigin_xact_state = {
+	.origin = InvalidRepOriginId,	/* assumed identify */
+	.origin_lsn = InvalidXLogRecPtr,
+	.origin_timestamp = 0
+};
 
 /*
  * Base address into a shared memory array of replication states of size
@@ -896,7 +898,7 @@ replorigin_redo(XLogReaderState *record)
  * Tell the replication origin progress machinery that a commit from 'node'
  * that originated at the LSN remote_commit on the remote node was replayed
  * successfully and that we don't need to do so again. In combination with
- * setting up replorigin_session_origin_lsn and replorigin_session_origin
+ * setting up replorigin_xact_state.origin_lsn and replorigin_xact_state.origin
  * that ensures we won't lose knowledge about that after a crash if the
  * transaction had a persistent effect (think of asynchronous commits).
  *
@@ -1288,17 +1290,17 @@ replorigin_session_get_progress(bool flush)
 }
 
 /*
- * Clear session replication origin state.
+ * Clear the per-transaction replication origin state.
  *
  * replorigin_session_origin is also cleared if clear_origin is set.
  */
 void
-replorigin_session_clear(bool clear_origin)
+replorigin_xact_clear(bool clear_origin)
 {
-	replorigin_session_origin_lsn = InvalidXLogRecPtr;
-	replorigin_session_origin_timestamp = 0;
+	replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
+	replorigin_xact_state.origin_timestamp = 0;
 	if (clear_origin)
-		replorigin_session_origin = InvalidRepOriginId;
+		replorigin_xact_state.origin = InvalidRepOriginId;
 }
 
 
@@ -1408,7 +1410,7 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 	pid = PG_GETARG_INT32(1);
 	replorigin_session_setup(origin, pid);
 
-	replorigin_session_origin = origin;
+	replorigin_xact_state.origin = origin;
 
 	pfree(name);
 
@@ -1425,7 +1427,7 @@ pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
 
 	replorigin_session_reset();
 
-	replorigin_session_clear(true);
+	replorigin_xact_clear(true);
 
 	PG_RETURN_VOID();
 }
@@ -1438,7 +1440,7 @@ pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
 {
 	replorigin_check_prerequisites(false, false);
 
-	PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
+	PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidRepOriginId);
 }
 
 
@@ -1482,8 +1484,8 @@ pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("no replication origin is configured")));
 
-	replorigin_session_origin_lsn = location;
-	replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
+	replorigin_xact_state.origin_lsn = location;
+	replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
 
 	PG_RETURN_VOID();
 }
@@ -1494,7 +1496,7 @@ pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
 	replorigin_check_prerequisites(true, false);
 
 	/* Clear only origin_lsn and origin_timestamp */
-	replorigin_session_clear(false);
+	replorigin_xact_clear(false);
 
 	PG_RETURN_VOID();
 }
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 72e45c2de19..864476b4a72 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -323,7 +323,7 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
 		 * This is needed to allow the origin to be dropped.
 		 */
 		replorigin_session_reset();
-		replorigin_session_clear(true);
+		replorigin_xact_clear(true);
 
 		/*
 		 * Drop the tablesync's origin tracking if exists.
@@ -1318,7 +1318,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 */
 		originid = replorigin_by_name(originname, false);
 		replorigin_session_setup(originid, 0);
-		replorigin_session_origin = originid;
+		replorigin_xact_state.origin = originid;
 		*origin_startpos = replorigin_session_get_progress(false);
 
 		CommitTransactionCommand();
@@ -1405,7 +1405,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
 
 	replorigin_session_setup(originid, 0);
-	replorigin_session_origin = originid;
+	replorigin_xact_state.origin = originid;
 
 	/*
 	 * If the user did not opt to run as the owner of the subscription
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6ad99cc5afd..1f21b56ace6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1318,8 +1318,8 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
 	 */
-	replorigin_session_origin_lsn = prepare_data->end_lsn;
-	replorigin_session_origin_timestamp = prepare_data->prepare_time;
+	replorigin_xact_state.origin_lsn = prepare_data->end_lsn;
+	replorigin_xact_state.origin_timestamp = prepare_data->prepare_time;
 
 	PrepareTransactionBlock(gid);
 }
@@ -1421,8 +1421,8 @@ apply_handle_commit_prepared(StringInfo s)
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
 	 */
-	replorigin_session_origin_lsn = prepare_data.end_lsn;
-	replorigin_session_origin_timestamp = prepare_data.commit_time;
+	replorigin_xact_state.origin_lsn = prepare_data.end_lsn;
+	replorigin_xact_state.origin_timestamp = prepare_data.commit_time;
 
 	FinishPreparedTransaction(gid, true);
 	end_replication_step();
@@ -1479,8 +1479,8 @@ apply_handle_rollback_prepared(StringInfo s)
 		 * Update origin state so we can restart streaming from correct
 		 * position in case of crash.
 		 */
-		replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
-		replorigin_session_origin_timestamp = rollback_data.rollback_time;
+		replorigin_xact_state.origin_lsn = rollback_data.rollback_end_lsn;
+		replorigin_xact_state.origin_timestamp = rollback_data.rollback_time;
 
 		/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
 		begin_replication_step();
@@ -2526,8 +2526,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 		 * Update origin state so we can restart streaming from correct
 		 * position in case of crash.
 		 */
-		replorigin_session_origin_lsn = commit_data->end_lsn;
-		replorigin_session_origin_timestamp = commit_data->committime;
+		replorigin_xact_state.origin_lsn = commit_data->end_lsn;
+		replorigin_xact_state.origin_timestamp = commit_data->committime;
 
 		CommitTransactionCommand();
 
@@ -2940,7 +2940,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 		 */
 		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 									&conflicttuple.origin, &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_session_origin)
+			conflicttuple.origin != replorigin_xact_state.origin)
 		{
 			TupleTableSlot *newslot;
 
@@ -2982,7 +2982,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 									   &conflicttuple.xmin,
 									   &conflicttuple.origin,
 									   &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_session_origin)
+			conflicttuple.origin != replorigin_xact_state.origin)
 			type = CT_UPDATE_DELETED;
 		else
 			type = CT_UPDATE_MISSING;
@@ -3135,7 +3135,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 		 */
 		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 									&conflicttuple.origin, &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_session_origin)
+			conflicttuple.origin != replorigin_xact_state.origin)
 		{
 			conflicttuple.slot = localslot;
 			ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
@@ -3477,7 +3477,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 												   &conflicttuple.xmin,
 												   &conflicttuple.origin,
 												   &conflicttuple.ts) &&
-						conflicttuple.origin != replorigin_session_origin)
+						conflicttuple.origin != replorigin_xact_state.origin)
 						type = CT_UPDATE_DELETED;
 					else
 						type = CT_UPDATE_MISSING;
@@ -3503,7 +3503,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 											&conflicttuple.origin,
 											&conflicttuple.ts) &&
-					conflicttuple.origin != replorigin_session_origin)
+					conflicttuple.origin != replorigin_xact_state.origin)
 				{
 					TupleTableSlot *newslot;
 
@@ -5594,7 +5594,7 @@ start_apply(XLogRecPtr origin_startpos)
 		 * transaction loss as that transaction won't be sent again by the
 		 * server.
 		 */
-		replorigin_session_clear(true);
+		replorigin_xact_clear(true);
 
 		if (MySubscription->disableonerr)
 			DisableSubscriptionAndExit();
@@ -5652,7 +5652,7 @@ run_apply_worker(void)
 	if (!OidIsValid(originid))
 		originid = replorigin_create(originname);
 	replorigin_session_setup(originid, 0);
-	replorigin_session_origin = originid;
+	replorigin_xact_state.origin = originid;
 	origin_startpos = replorigin_session_get_progress(false);
 	CommitTransactionCommand();
 
@@ -5874,7 +5874,7 @@ InitializeLogRepWorker(void)
 static void
 on_exit_clear_state(int code, Datum arg)
 {
-	replorigin_session_clear(true);
+	replorigin_xact_clear(true);
 }
 
 /*
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
index 309b800bd5f..56e01fb4ba3 100644
--- a/src/include/replication/origin.h
+++ b/src/include/replication/origin.h
@@ -40,9 +40,14 @@ typedef struct xl_replorigin_drop
  */
 #define MAX_RONAME_LEN	512
 
-extern PGDLLIMPORT RepOriginId replorigin_session_origin;
-extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
-extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
+typedef struct RepOriginXactState
+{
+	RepOriginId origin;
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
+} RepOriginXactState;
+
+extern PGDLLIMPORT RepOriginXactState replorigin_xact_state;
 
 /* GUCs */
 extern PGDLLIMPORT int max_active_replication_origins;
@@ -65,9 +70,10 @@ extern void replorigin_session_advance(XLogRecPtr remote_commit,
 									   XLogRecPtr local_commit);
 extern void replorigin_session_setup(RepOriginId node, int acquired_by);
 extern void replorigin_session_reset(void);
-extern void replorigin_session_clear(bool clear_origin);
 extern XLogRecPtr replorigin_session_get_progress(bool flush);
 
+extern void replorigin_xact_clear(bool clear_origin);
+
 /* Checkpoint/Startup integration */
 extern void CheckPointReplicationOrigin(void);
 extern void StartupReplicationOrigin(void);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 09e7f1d420e..94a1dbed466 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2569,6 +2569,7 @@ ReorderBufferTupleCidKey
 ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 RepOriginId
+RepOriginXactState
 ReparameterizeForeignPathByChild_function
 ReplaceVarsFromTargetList_context
 ReplaceVarsNoMatchOption
-- 
2.47.3

