From 65ad6d794bb242e43232dd34a5f843d2f4c3c3dd Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Mon, 26 Sep 2022 12:59:43 +1000
Subject: [PATCH v3] Add common function ReplicationOriginNameForLogicalRep.

Make a common replication origin name formatting function to replace multiple
snprintf() expressions. This also includes logic previously done by
ReplicationOriginNameForTablesync().

Discussion: https://postgr.es/m/CAHut%2BPsa8hhfSE6ozUK-ih7GkQziAVAf4f3bqiXEj2nQiu-43g%40mail.gmail.com
---
 src/backend/commands/subscriptioncmds.c     | 11 ++++++-----
 src/backend/replication/logical/tablesync.c | 18 +++---------------
 src/backend/replication/logical/worker.c    | 29 +++++++++++++++++++++++++++--
 src/include/replication/worker_internal.h   |  4 ++--
 4 files changed, 38 insertions(+), 24 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f3bfcca..0793234 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -657,7 +657,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 	recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
-	snprintf(originname, sizeof(originname), "pg_%u", subid);
+	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
 	/*
@@ -946,7 +946,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 					 * origin and by this time the origin might be already
 					 * removed. For these reasons, passing missing_ok = true.
 					 */
-					ReplicationOriginNameForTablesync(sub->oid, relid, originname,
+					ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
 													  sizeof(originname));
 					replorigin_drop_by_name(originname, true, false);
 				}
@@ -1315,7 +1315,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					char		originname[NAMEDATALEN];
 					XLogRecPtr	remote_lsn;
 
-					snprintf(originname, sizeof(originname), "pg_%u", subid);
+					ReplicationOriginNameForLogicalRep(subid, InvalidOid,
+													   originname, sizeof(originname));
 					originid = replorigin_by_name(originname, false);
 					remote_lsn = replorigin_get_progress(originid, false);
 
@@ -1521,7 +1522,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 		 * worker so passing missing_ok = true. This can happen for the states
 		 * before SUBREL_STATE_FINISHEDCOPY.
 		 */
-		ReplicationOriginNameForTablesync(subid, relid, originname,
+		ReplicationOriginNameForLogicalRep(subid, relid, originname,
 										  sizeof(originname));
 		replorigin_drop_by_name(originname, true, false);
 	}
@@ -1533,7 +1534,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	RemoveSubscriptionRel(subid, InvalidOid);
 
 	/* Remove the origin tracking if exists. */
-	snprintf(originname, sizeof(originname), "pg_%u", subid);
+	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_drop_by_name(originname, true, false);
 
 	/*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 9e52fc4..ff0f359 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -353,7 +353,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 */
 		StartTransactionCommand();
 
-		ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
+		ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
 										  MyLogicalRepWorker->relid,
 										  originname,
 										  sizeof(originname));
@@ -505,7 +505,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * error while dropping we won't restart it to drop the
 				 * origin. So passing missing_ok = true.
 				 */
-				ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
+				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
 												  rstate->relid,
 												  originname,
 												  sizeof(originname));
@@ -1194,18 +1194,6 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
 }
 
 /*
- * Form the origin name for tablesync.
- *
- * Return the name in the supplied buffer.
- */
-void
-ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
-								  char *originname, Size szorgname)
-{
-	snprintf(originname, szorgname, "pg_%u_%u", suboid, relid);
-}
-
-/*
  * Start syncing the table in the sync worker.
  *
  * If nothing needs to be done to sync the table, we exit the worker without
@@ -1274,7 +1262,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
 
 	/* Assign the origin tracking record name. */
-	ReplicationOriginNameForTablesync(MySubscription->oid,
+	ReplicationOriginNameForLogicalRep(MySubscription->oid,
 									  MyLogicalRepWorker->relid,
 									  originname,
 									  sizeof(originname));
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 207a580..c2cd61a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -365,6 +365,30 @@ static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr ls
 static inline void reset_apply_error_context_info(void);
 
 /*
+ * Form the origin name for the subscription.
+ *
+ * This is a common function for tablesync and other workers. Tablesync workers
+ * must pass a valid relid. Other callers must pass relid = InvalidOid.
+ *
+ * Return the name in the supplied buffer.
+ */
+void
+ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
+								   char *originname, Size szoriginname)
+{
+	if (OidIsValid(relid))
+	{
+		/* Replication origin name for tablesync workers. */
+		snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
+	}
+	else
+	{
+		/* Replication origin name for non-tablesync workers. */
+		snprintf(originname, szoriginname, "pg_%u", suboid);
+	}
+}
+
+/*
  * Should this worker apply changes for given relation.
  *
  * This is mainly needed for initial relation data sync as that runs in
@@ -3679,7 +3703,7 @@ ApplyWorkerMain(Datum main_arg)
 		 * Allocate the origin name in long-lived context for error context
 		 * message.
 		 */
-		ReplicationOriginNameForTablesync(MySubscription->oid,
+		ReplicationOriginNameForLogicalRep(MySubscription->oid,
 										  MyLogicalRepWorker->relid,
 										  originname,
 										  sizeof(originname));
@@ -3707,7 +3731,8 @@ ApplyWorkerMain(Datum main_arg)
 
 		/* Setup replication origin tracking. */
 		StartTransactionCommand();
-		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
+		ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+										   originname, sizeof(originname));
 		originid = replorigin_by_name(originname, true);
 		if (!OidIsValid(originid))
 			originid = replorigin_create(originname);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f82bc51..40a0b06 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -92,8 +92,8 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int	logicalrep_sync_worker_count(Oid subid);
 
-extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
-											  char *originname, Size szorgname);
+extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
+											  char *originname, Size szoriginname);
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 
 extern bool AllTablesyncsReady(void);
-- 
1.8.3.1

