From 11a7f20bef3c974c5fe7c0ce0f705ff911a58392 Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <lic@highgo.com>
Date: Tue, 30 Dec 2025 12:55:31 +0800
Subject: [PATCH v8 2/2] Consolidate replication origin session globals into a
 single struct.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit moves the separate global variables for replication origin
state into a single RepOriginXactState struct. This groups logically
related variables, which improves code readability and simplifies
state management (e.g., resetting the state) by handling them as a
unit.

Author: Chao Li <lic@highgo.com>
Suggested-by: Álvaro Herrera <alvherre@kurilemu.de
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Discussion: https://postgr.es/m/CAEoWx2=pYvfRthXHTzSrOsf5_FfyY4zJyK4zV2v4W=yjUij1cA@mail.gmail.com
---
 src/backend/access/transam/twophase.c         | 32 ++++++-------
 src/backend/access/transam/xact.c             | 44 +++++++++--------
 src/backend/access/transam/xloginsert.c       | 12 +++--
 .../replication/logical/applyparallelworker.c |  6 +--
 src/backend/replication/logical/origin.c      | 38 ++++++---------
 src/backend/replication/logical/tablesync.c   |  4 +-
 src/backend/replication/logical/worker.c      | 28 +++++------
 src/include/replication/origin.h              | 48 +++++++++++++++++--
 src/tools/pgindent/typedefs.list              |  1 +
 9 files changed, 122 insertions(+), 91 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index e50abb331cc..96fa86b9a0a 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1157,13 +1157,12 @@ EndPrepare(GlobalTransaction gxact)
 	Assert(hdr->magic == TWOPHASE_MAGIC);
 	hdr->total_len = records.total_len + sizeof(pg_crc32c);
 
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = replorigin_xact_origin_isvalid();
 
 	if (replorigin)
 	{
-		hdr->origin_lsn = replorigin_session_origin_lsn;
-		hdr->origin_timestamp = replorigin_session_origin_timestamp;
+		hdr->origin_lsn = replorigin_xact_state.origin_lsn;
+		hdr->origin_timestamp = replorigin_xact_state.origin_timestamp;
 	}
 
 	/*
@@ -1211,7 +1210,7 @@ EndPrepare(GlobalTransaction gxact)
 	if (replorigin)
 	{
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_xact_state.origin_lsn,
 								   gxact->prepare_end_lsn);
 	}
 
@@ -2330,8 +2329,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	 * Are we using the replication origins feature?  Or, in other words, are
 	 * we replaying remote actions?
 	 */
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = replorigin_xact_origin_isvalid();
 
 	/* Load the injection point before entering the critical section */
 	INJECTION_POINT_LOAD("commit-after-delay-checkpoint");
@@ -2376,23 +2374,22 @@ RecordTransactionCommitPrepared(TransactionId xid,
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_xact_state.origin_lsn,
 								   XactLastRecEnd);
 
 	/*
-	 * Record commit timestamp.  The value comes from plain commit timestamp
-	 * if replorigin is not enabled, or replorigin already set a value for us
-	 * in replorigin_session_origin_timestamp otherwise.
+	 * Record commit timestamp. Use one in replorigin_xact_state if set,
+	 * otherwise use plain commit timestamp.
 	 *
 	 * We don't need to WAL-log anything here, as the commit record written
 	 * above already contains the data.
 	 */
-	if (!replorigin || replorigin_session_origin_timestamp == 0)
-		replorigin_session_origin_timestamp = committs;
+	if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
+		replorigin_xact_state.origin_timestamp = committs;
 
 	TransactionTreeSetCommitTsData(xid, nchildren, children,
-								   replorigin_session_origin_timestamp,
-								   replorigin_session_origin);
+								   replorigin_xact_state.origin_timestamp,
+								   replorigin_xact_state.origin);
 
 	/*
 	 * We don't currently try to sleep before flush here ... nor is there any
@@ -2445,8 +2442,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 	 * Are we using the replication origins feature?  Or, in other words, are
 	 * we replaying remote actions?
 	 */
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = replorigin_xact_origin_isvalid();
 
 	/*
 	 * Catch the scenario where we aborted partway through
@@ -2472,7 +2468,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_xact_state.origin_lsn,
 								   XactLastRecEnd);
 
 	/* Always flush, since we're about to remove the 2PC state file */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c857e23552f..fb5147c4570 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1413,8 +1413,7 @@ RecordTransactionCommit(void)
 		 * Are we using the replication origins feature?  Or, in other words,
 		 * are we replaying remote actions?
 		 */
-		replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-					  replorigin_session_origin != DoNotReplicateId);
+		replorigin = replorigin_xact_origin_isvalid();
 
 		/*
 		 * Mark ourselves as within our "commit critical section".  This
@@ -1462,25 +1461,23 @@ RecordTransactionCommit(void)
 
 		if (replorigin)
 			/* Move LSNs forward for this replication origin */
-			replorigin_session_advance(replorigin_session_origin_lsn,
+			replorigin_session_advance(replorigin_xact_state.origin_lsn,
 									   XactLastRecEnd);
 
 		/*
-		 * Record commit timestamp.  The value comes from plain commit
-		 * timestamp if there's no replication origin; otherwise, the
-		 * timestamp was already set in replorigin_session_origin_timestamp by
-		 * replication.
+		 * Record commit timestamp. Use one in replorigin_xact_state if set,
+		 * otherwise use plain commit timestamp.
 		 *
 		 * We don't need to WAL-log anything here, as the commit record
 		 * written above already contains the data.
 		 */
 
-		if (!replorigin || replorigin_session_origin_timestamp == 0)
-			replorigin_session_origin_timestamp = GetCurrentTransactionStopTimestamp();
+		if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
+			replorigin_xact_state.origin_timestamp = GetCurrentTransactionStopTimestamp();
 
 		TransactionTreeSetCommitTsData(xid, nchildren, children,
-									   replorigin_session_origin_timestamp,
-									   replorigin_session_origin);
+									   replorigin_xact_state.origin_timestamp,
+									   replorigin_xact_state.origin);
 	}
 
 	/*
@@ -1810,8 +1807,7 @@ RecordTransactionAbort(bool isSubXact)
 	 * Are we using the replication origins feature?  Or, in other words, are
 	 * we replaying remote actions?
 	 */
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = replorigin_xact_origin_isvalid();
 
 	/* Fetch the data we need for the abort record */
 	nrels = smgrGetPendingDeletes(false, &rels);
@@ -1838,7 +1834,7 @@ RecordTransactionAbort(bool isSubXact)
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_xact_state.origin_lsn,
 								   XactLastRecEnd);
 
 	/*
@@ -5927,13 +5923,17 @@ XactLogCommitRecord(TimestampTz commit_time,
 			xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
 	}
 
-	/* dump transaction origin information */
-	if (replorigin_session_origin != InvalidRepOriginId)
+	/*
+	 * Dump transaction origin information
+	 *
+	 * Note that DoNotReplicateId is intentionally excluded here.
+	 */
+	if (replorigin_xact_state.origin != InvalidRepOriginId)
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
-		xl_origin.origin_lsn = replorigin_session_origin_lsn;
-		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+		xl_origin.origin_lsn = replorigin_xact_state.origin_lsn;
+		xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp;
 	}
 
 	if (xl_xinfo.xinfo != 0)
@@ -6080,13 +6080,15 @@ XactLogAbortRecord(TimestampTz abort_time,
 	/*
 	 * Dump transaction origin information. We need this during recovery to
 	 * update the replication origin progress.
+	 *
+	 * Note that DoNotReplicateId is intentionally excluded here.
 	 */
-	if (replorigin_session_origin != InvalidRepOriginId)
+	if (replorigin_xact_state.origin != InvalidRepOriginId)
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
-		xl_origin.origin_lsn = replorigin_session_origin_lsn;
-		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+		xl_origin.origin_lsn = replorigin_xact_state.origin_lsn;
+		xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp;
 	}
 
 	if (xl_xinfo.xinfo != 0)
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 92c48e768c3..64534e45216 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -859,13 +859,17 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		scratch += sizeof(BlockNumber);
 	}
 
-	/* followed by the record's origin, if any */
+	/*
+	 * followed by the record's origin, if any
+	 *
+	 * DoNotReplicateId is intentionally excluded here
+	 */
 	if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
-		replorigin_session_origin != InvalidRepOriginId)
+		replorigin_xact_state.origin != InvalidRepOriginId)
 	{
 		*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
-		memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
-		scratch += sizeof(replorigin_session_origin);
+		memcpy(scratch, &replorigin_xact_state.origin, sizeof(replorigin_xact_state.origin));
+		scratch += sizeof(replorigin_xact_state.origin);
 	}
 
 	/* followed by toplevel XID, if not already included in previous record */
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 5ebd2353fed..232f0447b30 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -962,7 +962,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	 * origin which was already acquired by its leader process.
 	 */
 	replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-	replorigin_session_origin = originid;
+	replorigin_xact_set_origin(originid);
 	CommitTransactionCommand();
 
 	/*
@@ -1430,8 +1430,8 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data)
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
 	 */
-	replorigin_session_origin_lsn = abort_data->abort_lsn;
-	replorigin_session_origin_timestamp = abort_data->abort_time;
+	replorigin_xact_set_lsn_timestamp(abort_data->abort_lsn,
+									  abort_data->abort_time);
 
 	/*
 	 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 09616641903..b014dbc9ccf 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -159,10 +159,12 @@ typedef struct ReplicationStateCtl
 	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
 } ReplicationStateCtl;
 
-/* external variables */
-RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
-XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
-TimestampTz replorigin_session_origin_timestamp = 0;
+/* Global variable for per-transaction replication origin state */
+RepOriginXactState replorigin_xact_state = {
+	.origin = InvalidRepOriginId,	/* assumed identify */
+	.origin_lsn = InvalidXLogRecPtr,
+	.origin_timestamp = 0
+};
 
 /*
  * Base address into a shared memory array of replication states of size
@@ -896,7 +898,7 @@ replorigin_redo(XLogReaderState *record)
  * Tell the replication origin progress machinery that a commit from 'node'
  * that originated at the LSN remote_commit on the remote node was replayed
  * successfully and that we don't need to do so again. In combination with
- * setting up replorigin_session_origin_lsn and replorigin_session_origin
+ * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
  * that ensures we won't lose knowledge about that after a crash if the
  * transaction had a persistent effect (think of asynchronous commits).
  *
@@ -1287,20 +1289,6 @@ replorigin_session_get_progress(bool flush)
 	return remote_lsn;
 }
 
-/*
- * Clear the per-transaction replication origin state.
- *
- * replorigin_session_origin is also cleared if clear_origin is set.
- */
-void
-replorigin_xact_clear(bool clear_origin)
-{
-	replorigin_session_origin_lsn = InvalidXLogRecPtr;
-	replorigin_session_origin_timestamp = 0;
-	if (clear_origin)
-		replorigin_session_origin = InvalidRepOriginId;
-}
-
 
 /* ---------------------------------------------------------------------------
  * SQL functions for working with replication origin.
@@ -1408,7 +1396,7 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 	pid = PG_GETARG_INT32(1);
 	replorigin_session_setup(origin, pid);
 
-	replorigin_session_origin = origin;
+	replorigin_xact_set_origin(origin);
 
 	pfree(name);
 
@@ -1438,7 +1426,7 @@ pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
 {
 	replorigin_check_prerequisites(false, false);
 
-	PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
+	PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidRepOriginId);
 }
 
 
@@ -1482,8 +1470,7 @@ pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("no replication origin is configured")));
 
-	replorigin_session_origin_lsn = location;
-	replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
+	replorigin_xact_set_lsn_timestamp(location, PG_GETARG_TIMESTAMPTZ(1));
 
 	PG_RETURN_VOID();
 }
@@ -1493,6 +1480,11 @@ pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
 {
 	replorigin_check_prerequisites(true, false);
 
+	if (session_replication_state == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("no replication origin is configured")));
+
 	/* Do not clear the session origin */
 	replorigin_xact_clear(false);
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 73a0d2838cf..e34377a559c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1318,7 +1318,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 */
 		originid = replorigin_by_name(originname, false);
 		replorigin_session_setup(originid, 0);
-		replorigin_session_origin = originid;
+		replorigin_xact_set_origin(originid);
 		*origin_startpos = replorigin_session_get_progress(false);
 
 		CommitTransactionCommand();
@@ -1405,7 +1405,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
 
 	replorigin_session_setup(originid, 0);
-	replorigin_session_origin = originid;
+	replorigin_xact_set_origin(originid);
 
 	/*
 	 * If the user did not opt to run as the owner of the subscription
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c1aceabbdf5..d5e23854250 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1318,8 +1318,7 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
 	 */
-	replorigin_session_origin_lsn = prepare_data->end_lsn;
-	replorigin_session_origin_timestamp = prepare_data->prepare_time;
+	replorigin_xact_set_lsn_timestamp(prepare_data->end_lsn, prepare_data->prepare_time);
 
 	PrepareTransactionBlock(gid);
 }
@@ -1421,8 +1420,7 @@ apply_handle_commit_prepared(StringInfo s)
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
 	 */
-	replorigin_session_origin_lsn = prepare_data.end_lsn;
-	replorigin_session_origin_timestamp = prepare_data.commit_time;
+	replorigin_xact_set_lsn_timestamp(prepare_data.end_lsn, prepare_data.commit_time);
 
 	FinishPreparedTransaction(gid, true);
 	end_replication_step();
@@ -1479,8 +1477,8 @@ apply_handle_rollback_prepared(StringInfo s)
 		 * Update origin state so we can restart streaming from correct
 		 * position in case of crash.
 		 */
-		replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
-		replorigin_session_origin_timestamp = rollback_data.rollback_time;
+		replorigin_xact_set_lsn_timestamp(rollback_data.rollback_end_lsn,
+										  rollback_data.rollback_time);
 
 		/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
 		begin_replication_step();
@@ -2526,8 +2524,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 		 * Update origin state so we can restart streaming from correct
 		 * position in case of crash.
 		 */
-		replorigin_session_origin_lsn = commit_data->end_lsn;
-		replorigin_session_origin_timestamp = commit_data->committime;
+		replorigin_xact_set_lsn_timestamp(commit_data->end_lsn,
+										  commit_data->committime);
 
 		CommitTransactionCommand();
 
@@ -2940,7 +2938,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 		 */
 		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 									&conflicttuple.origin, &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_session_origin)
+			conflicttuple.origin != replorigin_xact_state.origin)
 		{
 			TupleTableSlot *newslot;
 
@@ -2982,7 +2980,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 									   &conflicttuple.xmin,
 									   &conflicttuple.origin,
 									   &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_session_origin)
+			conflicttuple.origin != replorigin_xact_state.origin)
 			type = CT_UPDATE_DELETED;
 		else
 			type = CT_UPDATE_MISSING;
@@ -3135,7 +3133,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 		 */
 		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 									&conflicttuple.origin, &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_session_origin)
+			conflicttuple.origin != replorigin_xact_state.origin)
 		{
 			conflicttuple.slot = localslot;
 			ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
@@ -3477,7 +3475,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 												   &conflicttuple.xmin,
 												   &conflicttuple.origin,
 												   &conflicttuple.ts) &&
-						conflicttuple.origin != replorigin_session_origin)
+						conflicttuple.origin != replorigin_xact_state.origin)
 						type = CT_UPDATE_DELETED;
 					else
 						type = CT_UPDATE_MISSING;
@@ -3503,7 +3501,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 											&conflicttuple.origin,
 											&conflicttuple.ts) &&
-					conflicttuple.origin != replorigin_session_origin)
+					conflicttuple.origin != replorigin_xact_state.origin)
 				{
 					TupleTableSlot *newslot;
 
@@ -5652,7 +5650,7 @@ run_apply_worker(void)
 	if (!OidIsValid(originid))
 		originid = replorigin_create(originname);
 	replorigin_session_setup(originid, 0);
-	replorigin_session_origin = originid;
+	replorigin_xact_set_origin(originid);
 	origin_startpos = replorigin_session_get_progress(false);
 	CommitTransactionCommand();
 
@@ -5869,7 +5867,7 @@ InitializeLogRepWorker(void)
 }
 
 /*
- * Callback on exit to reset the origin state.
+ * Callback on exit to clear transaction-level replication origin state.
  */
 static void
 on_exit_clear_state(int code, Datum arg)
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
index 1eaabacde03..4d6989d180a 100644
--- a/src/include/replication/origin.h
+++ b/src/include/replication/origin.h
@@ -40,9 +40,49 @@ typedef struct xl_replorigin_drop
  */
 #define MAX_RONAME_LEN	512
 
-extern PGDLLIMPORT RepOriginId replorigin_session_origin;
-extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
-extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
+typedef struct RepOriginXactState
+{
+	RepOriginId origin;
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
+} RepOriginXactState;
+
+extern PGDLLIMPORT RepOriginXactState replorigin_xact_state;
+
+static inline bool
+replorigin_xact_origin_isvalid()
+{
+	return replorigin_xact_state.origin != InvalidRepOriginId &&
+		replorigin_xact_state.origin != DoNotReplicateId;
+}
+
+static inline void
+replorigin_xact_set_origin(RepOriginId origin)
+{
+	replorigin_xact_state.origin = origin;
+}
+
+static inline void
+replorigin_xact_set_lsn_timestamp(XLogRecPtr origin_lsn,
+								  TimestampTz origin_timestamp)
+{
+	replorigin_xact_state.origin_lsn = origin_lsn;
+	replorigin_xact_state.origin_timestamp = origin_timestamp;
+}
+
+/*
+ * Clear the per-transaction replication origin state.
+ *
+ * replorigin_session_origin is also cleared if clear_origin is set.
+ */
+static inline void
+replorigin_xact_clear(bool clear_origin)
+{
+	replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
+	replorigin_xact_state.origin_timestamp = 0;
+	if (clear_origin)
+		replorigin_xact_state.origin = InvalidRepOriginId;
+}
 
 /* GUCs */
 extern PGDLLIMPORT int max_active_replication_origins;
@@ -67,8 +107,6 @@ extern void replorigin_session_setup(RepOriginId node, int acquired_by);
 extern void replorigin_session_reset(void);
 extern XLogRecPtr replorigin_session_get_progress(bool flush);
 
-extern void replorigin_xact_clear(bool clear_origin);
-
 /* Checkpoint/Startup integration */
 extern void CheckPointReplicationOrigin(void);
 extern void StartupReplicationOrigin(void);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 09e7f1d420e..94a1dbed466 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2569,6 +2569,7 @@ ReorderBufferTupleCidKey
 ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 RepOriginId
+RepOriginXactState
 ReparameterizeForeignPathByChild_function
 ReplaceVarsFromTargetList_context
 ReplaceVarsNoMatchOption
-- 
2.39.5 (Apple Git-154)

