From 533aa256916a83ac9907c513bed765e8d8e13c6f Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Fri, 14 Jul 2023 09:58:04 +0530
Subject: [PATCH 2/4] Reuse Tablesync Workers

This commit allows reusing tablesync workers for syncing more than one
table sequentially during their lifetime, instead of exiting after
only syncing one table.

Before this commit, tablesync workers were capable of syncing only one
table. For each table, a new sync worker was launched and that worker would
exit when done processing the table.

Now, tablesync workers are not limited to processing only one
table. When done, they can move to processing another table in
the same subscription.

If there is a table that needs to be synced, an available tablesync
worker picks up that table and syncs it. Each tablesync worker
continues to pick new tables to sync until there are no tables left
requiring synchronization. If there was no available worker to
process the table, then a new tablesync worker will be launched,
provided the number of tablesync workers for the subscription does not
exceed max_sync_workers_per_subscription.

Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com
---
 src/backend/replication/logical/launcher.c  |   1 +
 src/backend/replication/logical/tablesync.c |  50 ++++++--
 src/backend/replication/logical/worker.c    | 121 ++++++++++++++++++--
 src/include/replication/worker_internal.h   |   7 ++
 4 files changed, 158 insertions(+), 21 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7f95..72e5ef8a78 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -440,6 +440,7 @@ retry:
 	worker->stream_fileset = NULL;
 	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
 	worker->parallel_apply = is_parallel_apply_worker;
+	worker->is_sync_completed = false;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 8125bbd170..605c5bd4ec 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -129,15 +129,14 @@ static bool FetchTableStates(bool *started_tx);
 static StringInfo copybuf = NULL;
 
 /*
- * Exit routine for synchronization worker.
+ * Prepares the synchronization worker for reuse or exit.
  */
 void
-pg_attribute_noreturn()
-finish_sync_worker(void)
+clean_sync_worker(void)
 {
 	/*
-	 * Commit any outstanding transaction. This is the usual case, unless
-	 * there was nothing to do for the table.
+	 * Commit any outstanding transaction. This is the usual case, unless there
+	 * was nothing to do for the table.
 	 */
 	if (IsTransactionState())
 	{
@@ -145,19 +144,38 @@ finish_sync_worker(void)
 		pgstat_report_stat(true);
 	}
 
+	/*
+	 * Disconnect from publisher. Otherwise reused sync workers causes
+	 * exceeding max_wal_senders
+	 */
+	if (LogRepWorkerWalRcvConn != NULL)
+	{
+		walrcv_disconnect(LogRepWorkerWalRcvConn);
+		LogRepWorkerWalRcvConn = NULL;
+	}
+
+	/* Find the leader apply worker and signal it. */
+	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+}
+
+/*
+ * Exit routine for synchronization worker.
+ */
+void
+pg_attribute_noreturn()
+finish_sync_worker(void)
+{
+	clean_sync_worker();
+
 	/* And flush all writes. */
 	XLogFlush(GetXLogWriteRecPtr());
 
 	StartTransactionCommand();
 	ereport(LOG,
-			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
-					MySubscription->name,
-					get_rel_name(MyLogicalRepWorker->relid))));
+			(errmsg("logical replication table synchronization worker for subscription \"%s\" has finished",
+					MySubscription->name)));
 	CommitTransactionCommand();
 
-	/* Find the leader apply worker and signal it. */
-	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
-
 	/* Stop gracefully */
 	proc_exit(0);
 }
@@ -379,7 +397,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 */
 		replorigin_drop_by_name(originname, true, false);
 
-		finish_sync_worker();
+		/* Sync worker has completed synchronization of the current table. */
+		MyLogicalRepWorker->is_sync_completed = true;
+
+		ereport(LOG,
+				(errmsg("logical replication table synchronization worker for subscription \"%s\", relation \"%s\" with relid %u has finished",
+						MySubscription->name,
+						get_rel_name(MyLogicalRepWorker->relid),
+						MyLogicalRepWorker->relid)));
+		CommitTransactionCommand();
 	}
 	else
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 1a530d3bb1..81f7a6de66 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3612,6 +3612,20 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					MemoryContextReset(ApplyMessageContext);
 				}
 
+				/*
+				 * apply_dispatch() may have gone into apply_handle_commit()
+				 * which can call process_syncing_tables_for_sync.
+				 *
+				 * process_syncing_tables_for_sync decides whether the sync of
+				 * the current table is completed. If it is completed,
+				 * streaming must be already ended. So, we can break the loop.
+				 */
+				if (MyLogicalRepWorker->is_sync_completed)
+				{
+					endofstream = true;
+					break;
+				}
+
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 			}
 		}
@@ -3631,6 +3645,15 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 			/* Process any table synchronization changes. */
 			process_syncing_tables(last_received);
+
+			/*
+			 * If is_sync_completed is true, this means that the tablesync
+			 * worker is done with synchronization. Streaming has already been
+			 * ended by process_syncing_tables_for_sync. We should move to the
+			 * next table if needed, or exit.
+			 */
+			if (MyLogicalRepWorker->is_sync_completed)
+				endofstream = true;
 		}
 
 		/* Cleanup the memory. */
@@ -3733,8 +3756,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	error_context_stack = errcallback.previous;
 	apply_error_context_stack = error_context_stack;
 
-	/* All done */
-	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+	/*
+	 * Tablesync workers should end streaming before exiting the main loop to
+	 * drop replication slot. Only end streaming here for apply workers.
+	 */
+	if (!am_tablesync_worker())
+		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
 }
 
 /*
@@ -4506,6 +4533,8 @@ run_tablesync_worker(WalRcvStreamOptions *options,
 					 int originname_size,
 					 XLogRecPtr *origin_startpos)
 {
+	MyLogicalRepWorker->is_sync_completed = false;
+
 	/* Start table synchronization. */
 	start_table_sync(origin_startpos, &slotname);
 
@@ -4683,10 +4712,11 @@ InitializeLogRepWorker(void)
 
 	if (am_tablesync_worker())
 		ereport(LOG,
-				(errmsg("%s for subscription \"%s\", table \"%s\" has started",
+				(errmsg("%s for subscription \"%s\", table \"%s\" with relid %u has started",
 						get_worker_name(),
 						MySubscription->name,
-						get_rel_name(MyLogicalRepWorker->relid))));
+						get_rel_name(MyLogicalRepWorker->relid),
+						MyLogicalRepWorker->relid)));
 	else
 		ereport(LOG,
 				(errmsg("logical replication apply worker for subscription \"%s\" has started",
@@ -4793,11 +4823,84 @@ TablesyncWorkerMain(Datum main_arg)
 								  invalidate_syncing_table_states,
 								  (Datum) 0);
 
-	run_tablesync_worker(&options,
-						 myslotname,
-						 originname,
-						 sizeof(originname),
-						 &origin_startpos);
+	/*
+	 * The loop where worker does its job. It loops until there is no relation
+	 * left to sync.
+	 */
+	for (;;)
+	{
+		List	   *rstates;
+		ListCell   *lc;
+		bool 	is_table_found = false;
+
+		run_tablesync_worker(&options,
+							 myslotname,
+							 originname,
+							 sizeof(originname),
+							 &origin_startpos);
+
+		if (IsTransactionState())
+			CommitTransactionCommand();
+
+		if (MyLogicalRepWorker->is_sync_completed)
+		{
+			/* This transaction will be committed by clean_sync_worker. */
+			StartTransactionCommand();
+
+			/*
+			 * Check if there is any table whose relation state is still INIT.
+			 * If a table in INIT state is found, the worker will not be
+			 * finished, it will be reused instead.
+			 */
+			rstates = GetSubscriptionRelations(MySubscription->oid, true);
+
+			foreach(lc, rstates)
+			{
+				SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+
+				if (rstate->state == SUBREL_STATE_SYNCDONE)
+					continue;
+
+				/*
+				 * Take exclusive lock to prevent any other sync worker from
+				 * picking the same table.
+				 */
+				LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+				/*
+				 * Pick the table for the next run if it is not already picked
+				 * up by another worker.
+				 */
+				if (!logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
+				{
+					/* Update worker state for the next table */
+					MyLogicalRepWorker->relid = rstate->relid;
+					MyLogicalRepWorker->relstate = rstate->state;
+					MyLogicalRepWorker->relstate_lsn = rstate->lsn;
+					LWLockRelease(LogicalRepWorkerLock);
+
+					/* Found a table for next iteration */
+					is_table_found = true;
+					clean_sync_worker();
+
+					StartTransactionCommand();
+					ereport(LOG,
+							(errmsg("%s for subscription \"%s\" has moved to sync table \"%s\" with relid %u.",
+									get_worker_name(),
+									MySubscription->name,
+									get_rel_name(MyLogicalRepWorker->relid),
+									MyLogicalRepWorker->relid)));
+					CommitTransactionCommand();
+
+					break;
+				}
+				LWLockRelease(LogicalRepWorkerLock);
+			}
+
+			if (!is_table_found)
+				break;
+		}
+	}
 
 	finish_sync_worker();
 }
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 7aba034774..1e9f8e6e72 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -56,6 +56,12 @@ typedef struct LogicalRepWorker
 	XLogRecPtr	relstate_lsn;
 	slock_t		relmutex;
 
+	/*
+	 * Indicates whether tablesync worker has completed sycning its assigned
+	 * table. If true, no need to continue with that table.
+	 */
+	bool		is_sync_completed;
+
 	/*
 	 * Used to create the changes and subxact files for the streaming
 	 * transactions.  Upon the arrival of the first streaming transaction or
@@ -308,6 +314,7 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
 
 extern void finish_sync_worker(void);
+extern void clean_sync_worker(void);
 
 static inline bool
 am_tablesync_worker(void)
-- 
2.34.1

