From a17061fb2fb64897e8aad13dc7f3ce34f2b2d815 Mon Sep 17 00:00:00 2001
From: ChangAo Chen <cca5507@qq.com>
Date: Sun, 21 Jan 2024 16:50:44 +0800
Subject: [PATCH] Track transactions committed in BUILDING_SNAPSHOT.

We previously didn't track the transaction that begin after BUILDING_SNAPSHOT
and commit before FULL_SNAPSHOT when building historic snapshot in logical
decoding. This can cause a transaction which begin after FULL_SNAPSHOT to take
an incorrect historic snapshot because transactions committed in BUILDING_SNAPSHOT
state will not be processed by SnapBuildCommitTxn().

To fix it, we track the transaction that begin after BUILDING_SNAPSHOT and
commit before FULL_SNAPSHOT forcely by using SnapBuildCommitTxn(). Note that
there is no need to track transactions that begin before BUILDING_SNAPSHOT
because they are all finished when we reach FULL_SNAPSHOT state.
---
 src/backend/replication/logical/decode.c    | 38 ++++++++++++++++++++-
 src/backend/replication/logical/snapbuild.c |  8 +++--
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 6a0f22b209..89a3f23ec5 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -54,6 +54,8 @@ static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
 static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						 xl_xact_parsed_commit *parsed, TransactionId xid,
 						 bool two_phase);
+static void DecodeCommitWhenBuildingSnapshot(LogicalDecodingContext *ctx,
+                                             XLogRecordBuffer *buf);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						xl_xact_parsed_abort *parsed, TransactionId xid,
 						bool two_phase);
@@ -210,10 +212,17 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	/*
 	 * If the snapshot isn't yet fully built, we cannot decode anything, so
-	 * bail out.
+	 * bail out. However, we need track some transactions committed in
+	 * BUILDING_SNAPSHOT or we may have an incorrect historic snapshot
+	 * when reaching FULL_SNAPSHOT state.
 	 */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+	{
+		if (SnapBuildCurrentState(builder) == SNAPBUILD_BUILDING_SNAPSHOT &&
+			(info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED))
+			DecodeCommitWhenBuildingSnapshot(ctx, buf);
 		return;
+	}
 
 	switch (info)
 	{
@@ -746,6 +755,33 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	UpdateDecodingStats(ctx);
 }
 
+/*
+ * Track the transaction that begin after BUILDING_SNAPSHOT and commit before
+ * FULL_SNAPSHOT. Should only be called when we are in BUILDING_SNAPSHOT state.
+ */
+static void
+DecodeCommitWhenBuildingSnapshot(LogicalDecodingContext *ctx,
+                                 XLogRecordBuffer *buf)
+{
+	TransactionId 	xid;
+	xl_xact_commit 	*xlrec;
+	xl_xact_parsed_commit parsed;
+	XLogReaderState *r = buf->record;
+
+	/* The record must be XLOG_XACT_COMMIT or XLOG_XACT_COMMIT_PREPARED */
+	xlrec = (xl_xact_commit *) XLogRecGetData(r);
+	ParseCommitRecord(XLogRecGetInfo(r), xlrec, &parsed);
+
+	if (!TransactionIdIsValid(parsed.twophase_xid))
+		xid = XLogRecGetXid(r);
+	else
+		xid = parsed.twophase_xid;
+
+	SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
+					   parsed.nsubxacts, parsed.subxacts,
+					   parsed.xinfo);
+}
+
 /*
  * Decode PREPARE record. Similar logic as in DecodeCommit.
  *
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index a0b7947d2f..e48557045f 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1054,10 +1054,12 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 			builder->start_decoding_at = lsn + 1;
 
 		/*
-		 * If building an exportable snapshot, force xid to be tracked, even
-		 * if the transaction didn't modify the catalog.
+		 * If building an exportable snapshot or we are in BUILDING_SNAPSHOT
+		 * state, force xid to be tracked, even if the transaction didn't
+		 * modify the catalog.
 		 */
-		if (builder->building_full_snapshot)
+		if (builder->building_full_snapshot ||
+		    builder->state == SNAPBUILD_BUILDING_SNAPSHOT)
 		{
 			needs_timetravel = true;
 		}
-- 
2.34.1

