From cc69040e063a91acce0a7c7b9b4defe14500bb31 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 28 Jun 2021 13:18:58 +0900
Subject: [PATCH v1 1/3] Add ALTER SUBSCRIPTION SET SKIP TRANSACTION.

---
 src/backend/catalog/pg_subscription.c    |  10 ++
 src/backend/commands/subscriptioncmds.c  |  21 +++
 src/backend/parser/gram.y                |  17 ++
 src/backend/replication/logical/worker.c | 216 ++++++++++++++++++++---
 src/include/catalog/pg_subscription.h    |   4 +
 src/include/nodes/parsenodes.h           |   5 +-
 6 files changed, 243 insertions(+), 30 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 29fc4218cd..7b79de6351 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -103,6 +103,16 @@ GetSubscription(Oid subid, bool missing_ok)
 	Assert(!isnull);
 	sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
 
+	/* Get skip XID */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tup,
+							Anum_pg_subscription_subskipxid,
+							&isnull);
+	if (!isnull)
+		sub->skipxid = DatumGetTransactionId(datum);
+	else
+		sub->skipxid = InvalidTransactionId;
+
 	ReleaseSysCache(tup);
 
 	return sub;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b862e59f1d..97fd56e371 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -429,6 +429,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
 	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
+	nulls[Anum_pg_subscription_subskipxid - 1] = true;
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (slotname)
@@ -1020,6 +1021,26 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_SET_SKIP_XID:
+			{
+				if (sub->skipxid != stmt->skip_xid)
+				{
+					values[Anum_pg_subscription_subskipxid - 1] =
+						TransactionIdGetDatum(stmt->skip_xid);
+					replaces[Anum_pg_subscription_subskipxid - 1] = true;
+					update_tuple = true;
+				}
+
+				break;
+			}
+		case ALTER_SUBSCRIPTION_RESET_SKIP_XID:
+			{
+				nulls[Anum_pg_subscription_subskipxid - 1] = true;
+				replaces[Anum_pg_subscription_subskipxid - 1] = true;
+				update_tuple = true;
+				break;
+			}
+
 		default:
 			elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
 				 stmt->kind);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index eb24195438..ef9213570f 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9772,6 +9772,23 @@ AlterSubscriptionStmt:
 											(Node *)makeInteger(false), @1));
 					$$ = (Node *)n;
 				}
+			| ALTER SUBSCRIPTION name SET SKIP TRANSACTION Iconst
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+					n->kind = ALTER_SUBSCRIPTION_SET_SKIP_XID;
+					n->subname = $3;
+					n->skip_xid = $7;
+					$$ = (Node *)n;
+				}
+			| ALTER SUBSCRIPTION name RESET SKIP TRANSACTION
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+					n->kind = ALTER_SUBSCRIPTION_RESET_SKIP_XID;
+					n->subname = $3;
+					$$ = (Node *)n;
+				}
 		;
 
 /*****************************************************************************
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index bbb659dad0..b90a8df166 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -62,6 +62,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
+#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
@@ -181,6 +182,19 @@ static bool in_streamed_transaction = false;
 
 static TransactionId stream_xid = InvalidTransactionId;
 
+/*
+ * True if we're skipping changes of the specified transaction in
+ * MySubscription->skip_xid.  Please note that We don’t skip receiving those
+ * changes. We decide whether or not to skip applying the changes when starting
+ * to apply.  That is, for streamed transactions, we receive the streamed changes
+ * anyway and then cleanup streamed files when applying the stream-commit or
+ * stream-abort message.  When stopping the skipping behavior, we reset the skip
+ * XID (subskipxid) in the pg_subscription and associate origin status to the
+ * transaction that resets the skip XID so that we can start streaming from the
+ * next transaction.
+ */
+static bool skipping_changes = false;
+
 /*
  * Hash table for storing the streaming xid information along with shared file
  * set for streaming and subxact files.
@@ -236,8 +250,7 @@ static void maybe_reread_subscription(void);
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
-static void apply_handle_commit_internal(StringInfo s,
-										 LogicalRepCommitData *commit_data);
+static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot);
@@ -256,6 +269,13 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
 									   TupleTableSlot *remoteslot,
 									   LogicalRepTupleData *newtup,
 									   CmdType operation);
+static void apply_streamed_changes(TransactionId xid,
+								   LogicalRepCommitData *commit_data);
+
+static bool start_skipping_changes(TransactionId xid);
+static bool stop_skipping_changes(bool reset_xid,
+								  LogicalRepCommitData *commit_data);
+
 
 /*
  * Should this worker apply changes for given relation.
@@ -771,7 +791,9 @@ apply_handle_begin(StringInfo s)
 
 	remote_final_lsn = begin_data.final_lsn;
 
-	in_remote_transaction = true;
+	/* Start skipping all changes of this transaction if necessary */
+	if (!start_skipping_changes(begin_data.xid))
+		in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
 }
@@ -795,7 +817,12 @@ apply_handle_commit(StringInfo s)
 								 LSN_FORMAT_ARGS(commit_data.commit_lsn),
 								 LSN_FORMAT_ARGS(remote_final_lsn))));
 
-	apply_handle_commit_internal(s, &commit_data);
+	/*
+	 * Stop the skipping transaction if enabled. Otherwise, commit the
+	 * changes that are just applied.
+	 */
+	if (!stop_skipping_changes(true, &commit_data))
+		apply_handle_commit_internal(&commit_data);
 
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
@@ -813,9 +840,10 @@ apply_handle_origin(StringInfo s)
 {
 	/*
 	 * ORIGIN message can only come inside streaming transaction or inside
-	 * remote transaction and before any actual writes.
+	 * remote transaction and before any actual writes unless we're skipping
+	 * changes of the transaction.
 	 */
-	if (!in_streamed_transaction &&
+	if (!in_streamed_transaction && !skipping_changes &&
 		(!in_remote_transaction ||
 		 (IsTransactionState() && !am_tablesync_worker())))
 		ereport(ERROR,
@@ -837,6 +865,9 @@ apply_handle_stream_start(StringInfo s)
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("duplicate STREAM START message")));
 
+	/* extract XID of the top-level transaction */
+	stream_xid = logicalrep_read_stream_start(s, &first_segment);
+
 	/*
 	 * Start a transaction on stream start, this transaction will be committed
 	 * on the stream stop unless it is a tablesync worker in which case it
@@ -849,9 +880,6 @@ apply_handle_stream_start(StringInfo s)
 	/* notify handle methods we're processing a remote transaction */
 	in_streamed_transaction = true;
 
-	/* extract XID of the top-level transaction */
-	stream_xid = logicalrep_read_stream_start(s, &first_segment);
-
 	if (!TransactionIdIsValid(stream_xid))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1016,6 +1044,9 @@ apply_handle_stream_abort(StringInfo s)
 		end_replication_step();
 		CommitTransactionCommand();
 	}
+
+	/* Stop the skipping transaction if enabled */
+	stop_skipping_changes(true, NULL);
 }
 
 /*
@@ -1025,14 +1056,7 @@ static void
 apply_handle_stream_commit(StringInfo s)
 {
 	TransactionId xid;
-	StringInfoData s2;
-	int			nchanges;
-	char		path[MAXPGPATH];
-	char	   *buffer = NULL;
 	LogicalRepCommitData commit_data;
-	StreamXidHash *ent;
-	MemoryContext oldcxt;
-	BufFile    *fd;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1041,8 +1065,40 @@ apply_handle_stream_commit(StringInfo s)
 
 	xid = logicalrep_read_stream_commit(s, &commit_data);
 
+	remote_final_lsn = commit_data.commit_lsn;
+
 	elog(DEBUG1, "received commit for streamed transaction %u", xid);
 
+	/*
+	 * Stop skipping transaction information if enabled. Otherwise, apply
+	 * all streamed changes and commit the transaction.
+	 */
+	if (!stop_skipping_changes(true, &commit_data))
+		apply_streamed_changes(xid, &commit_data);
+
+	/* unlink the files with serialized changes and subxact info */
+	stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(commit_data.end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Apply all streamed changes with the xid.
+ */
+static void
+apply_streamed_changes(TransactionId xid, LogicalRepCommitData *commit_data)
+{
+	StringInfoData s2;
+	int			nchanges;
+	char		path[MAXPGPATH];
+	char	   *buffer = NULL;
+	StreamXidHash *ent;
+	MemoryContext oldcxt;
+	BufFile    *fd;
+
 	/* Make sure we have an open transaction */
 	begin_replication_step();
 
@@ -1074,8 +1130,6 @@ apply_handle_stream_commit(StringInfo s)
 
 	MemoryContextSwitchTo(oldcxt);
 
-	remote_final_lsn = commit_data.commit_lsn;
-
 	/*
 	 * Make sure the handle apply_dispatch methods are aware we're in a remote
 	 * transaction.
@@ -1153,22 +1207,15 @@ apply_handle_stream_commit(StringInfo s)
 	elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
 		 nchanges, path);
 
-	apply_handle_commit_internal(s, &commit_data);
-
-	/* unlink the files with serialized changes and subxact info */
-	stream_cleanup_files(MyLogicalRepWorker->subid, xid);
-
-	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(commit_data.end_lsn);
-
-	pgstat_report_activity(STATE_IDLE, NULL);
+	/* commit the streamed transaction */
+	apply_handle_commit_internal(commit_data);
 }
 
 /*
  * Helper function for apply_handle_commit and apply_handle_stream_commit.
  */
 static void
-apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data)
+apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 {
 	if (IsTransactionState())
 	{
@@ -2020,6 +2067,17 @@ apply_dispatch(StringInfo s)
 {
 	LogicalRepMsgType action = pq_getmsgbyte(s);
 
+	/*
+	 * Skip all data-modification changes if we're skipping changes of this
+	 * transaction.
+	 */
+	if (skipping_changes &&
+		(action == LOGICAL_REP_MSG_INSERT ||
+		 action == LOGICAL_REP_MSG_UPDATE ||
+		 action == LOGICAL_REP_MSG_DELETE ||
+		 action == LOGICAL_REP_MSG_TRUNCATE))
+		return;
+
 	switch (action)
 	{
 		case LOGICAL_REP_MSG_BEGIN:
@@ -2309,7 +2367,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		/* confirm all writes so far */
 		send_feedback(last_received, false, false);
 
-		if (!in_remote_transaction && !in_streamed_transaction)
+		if (!in_remote_transaction && !in_streamed_transaction &&
+			!skipping_changes)
 		{
 			/*
 			 * If we didn't get any transactions for a while there might be
@@ -3254,3 +3313,102 @@ IsLogicalWorker(void)
 {
 	return MyLogicalRepWorker != NULL;
 }
+
+/*
+ * Start skipping changes of the given transaction. Return true if we
+ * enabled the skipping behavior.
+ */
+static bool
+start_skipping_changes(TransactionId xid)
+{
+	Assert(!in_remote_transaction);
+	Assert(!in_streamed_transaction);
+
+	if (!TransactionIdIsValid(MySubscription->skipxid) ||
+		MySubscription->skipxid != xid)
+		return false;
+
+	skipping_changes = true;
+	ereport(LOG,
+			errmsg("start skipping logical replication transaction with xid %u",
+				   xid));
+
+	return true;
+}
+
+/*
+ * Stop skipping changes and reset the skip XID. Return true if we were
+ * skipping changes and have stopped it.
+ *
+ * reset_xid is true if the caller wants to reset the skip XID (subskipxid)
+ * after disabling the skipping behavior.  Also, if *commit_data is non-NULL,
+ * we set origin state to the transaction commit that resets the skip XID so
+ * that we can start streaming from the transaction next to the one that we
+ * just skipped.
+ */
+static bool
+stop_skipping_changes(bool reset_xid, LogicalRepCommitData *commit_data)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+
+	if (!skipping_changes)
+		return false;
+
+	Assert(TransactionIdIsValid(MySubscription->skipxid));
+	Assert(!in_remote_transaction);
+	Assert(!in_streamed_transaction);
+
+	/* Stop skipping changes */
+	skipping_changes = false;
+
+	ereport(LOG,
+			errmsg("done skipping logical replication transaction with xid %u",
+				   MySubscription->skipxid));
+
+	if (!reset_xid)
+		return true;
+
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Update the system catalog to reset the skip XID */
+	StartTransactionCommand();
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+	/* Fetch the existing tuple. */
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+							  TransactionIdGetDatum(MySubscription->oid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
+
+	/* Set subskipxid to null */
+	nulls[Anum_pg_subscription_subskipxid - 1] = true;
+	replaces[Anum_pg_subscription_subskipxid - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	heap_freetuple(tup);
+	table_close(rel, RowExclusiveLock);
+
+	if (commit_data)
+	{
+		/*
+		 * Update origin state so we can restart streaming from correct
+		 * position in case of crash.
+		 */
+		replorigin_session_origin_lsn = commit_data->end_lsn;
+		replorigin_session_origin_timestamp = commit_data->committime;
+	}
+
+	CommitTransactionCommand();
+
+	return true;
+}
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0060ebfb40..a13b936891 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -57,6 +57,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		substream;		/* Stream in-progress transactions. */
 
+	TransactionId	subskipxid	BKI_FORCE_NULL;	/* All changes associated with
+												   this XID are skipped */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -94,6 +97,7 @@ typedef struct Subscription
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
+	TransactionId skipxid;		/* All changes of the XID are skipped */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index def9651b34..49e5d3e1e5 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3664,7 +3664,9 @@ typedef enum AlterSubscriptionType
 	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
 	ALTER_SUBSCRIPTION_DROP_PUBLICATION,
 	ALTER_SUBSCRIPTION_REFRESH,
-	ALTER_SUBSCRIPTION_ENABLED
+	ALTER_SUBSCRIPTION_ENABLED,
+	ALTER_SUBSCRIPTION_SET_SKIP_XID,
+	ALTER_SUBSCRIPTION_RESET_SKIP_XID,
 } AlterSubscriptionType;
 
 typedef struct AlterSubscriptionStmt
@@ -3675,6 +3677,7 @@ typedef struct AlterSubscriptionStmt
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
+	TransactionId	skip_xid;	/* XID to skip */
 } AlterSubscriptionStmt;
 
 typedef struct DropSubscriptionStmt
-- 
2.24.3 (Apple Git-128)

