From 7dbaa8c89085fbede1b21643635114810274613c Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Mon, 21 Dec 2020 20:01:02 +1100
Subject: [PATCH v5] WIP patch for the Solution1.

This patch applies onto the v30 patch set [1] from other 2PC thread:
[1] https://www.postgresql.org/message-id/CAFPTHDYA8yE6tEmQ2USYS68kNt%2BkM%3DSwKgj%3Djy4AvFD5e9-UTQ%40mail.gmail.com

====

Coded / WIP:

* tablesync slot is now permanent instead of temporary. The tablesync slot name is no longer tied to the Subscripotion slot name.

* the tablesync slot cleanup (drop) code is added for DropSubscription and for finish_sync_worker functions

* tablesync worked now allowing multiple tx instead of single tx

* a new state (SUBREL_STATE_COPYDONE) is persisted after a successful copy_table in LogicalRepSyncTableStart.

* if a relaunched tablesync finds the state is SUBREL_STATE_COPYDONE then it will bypass the initial copy_table phase.

* tablesync sets up replication origin tracking in LogicalRepSyncTableStart (similar as done for apply worker). The origin is advanced when first created.

* tablesync replication origin tracking is cleaned up during DropSubscription and/or process_syncing_tables_for_apply

TODO / Known Issues:

* I think if there are crashed tablesync workers they may not be current known to DropSubscription. This might be a problem to cleanup slots and/or origin tracking belonging to those unknown workers.

* Help / comments / cleanup

* There is temporary "!!>>" excessive logging of mine scattered around which I added to help my testing during development

* Address review comments
---
 src/backend/commands/subscriptioncmds.c     | 113 ++++++++++++++
 src/backend/replication/logical/origin.c    |   4 +-
 src/backend/replication/logical/tablesync.c | 219 ++++++++++++++++++++++++----
 src/backend/replication/logical/worker.c    |  21 +--
 src/include/catalog/pg_subscription_rel.h   |   1 +
 src/include/replication/slot.h              |   3 +
 6 files changed, 312 insertions(+), 49 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b0745d5..e6594c8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -37,6 +37,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -47,6 +48,8 @@
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 
+void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname);
+
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
  *
@@ -1070,6 +1073,44 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
+		/*
+		 * Is this a tablesync worker?
+		 *
+		 * If yes, drop the tablesync's slot, and clean-up and remove replication origin tracking.
+		 */
+		if (OidIsValid(w->relid))
+		{
+			/*
+			 * FIXME - Crashed tablesync workers may also have remaining slots because I don't think
+			 * such workers are even iterated by this loop, and nobody else is removing them.
+			 */
+			{
+				/* Calculate the name of the tablesync slot. */
+				char *syncslotname = ReplicationSlotNameForTablesync(subid, w->relid);
+
+				elog(LOG, "!!>> DropSubscription: now dropping the tablesync slot \"%s\".", syncslotname);
+				ReplicationSlotDropAtPubNode(
+								NULL,
+								conninfo, /* use conninfo to make a new connection. */
+								subname,
+								syncslotname);
+
+				pfree(syncslotname);
+			}
+
+			/* Remove the tablesync's origin tracking if exists. */
+			snprintf(originname, sizeof(originname), "pg_%u_%u", subid, w->relid);
+			originid = replorigin_by_name(originname, true);
+			if (originid != InvalidRepOriginId)
+			{
+				elog(LOG, "!!>> DropSubscription: dropping origin tracking for \"%s\"", originname);
+				replorigin_drop(originid, false);
+				elog(LOG, "!!>> DropSubscription: dropped origin tracking for \"%s\"", originname);
+			}
+
+		}
+
+		/* Stop the worker. */
 		logicalrep_worker_stop(w->subid, w->relid);
 	}
 	list_free(subworkers);
@@ -1144,6 +1185,78 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	table_close(rel, NoLock);
 }
 
+
+/*
+ * Drop the replication slot at the publisher node
+ * using the replication connection.
+ *
+ * If the connection is passed then just use that,
+ * otherwise connect/disconnect within this function.
+ */
+void
+ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname)
+{
+	StringInfoData cmd;
+
+	load_file("libpqwalreceiver", false);
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
+
+	/*
+ 	 * If the connection was passed then use it.
+ 	 * If the connection was not passed then make a new connection using the passed conninfo.
+ 	 */
+	if (wrconn_given != NULL)
+	{
+		Assert (conninfo == NULL);
+		wrconn = wrconn_given;
+	}
+	else
+	{
+		char	   *err = NULL;
+
+		Assert(conninfo != NULL);
+		wrconn = walrcv_connect(conninfo, true, subname, &err);
+
+		if (wrconn == NULL)
+			ereport(ERROR,
+					(errmsg("could not connect to publisher when attempting to "
+							"drop the replication slot \"%s\"", slotname),
+					 errdetail("The error was: %s", err)));
+	}
+
+	PG_TRY();
+	{
+		WalRcvExecResult *res;
+
+		res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+
+		if (res->status != WALRCV_OK_COMMAND)
+			ereport(ERROR,
+					(errmsg("could not drop the replication slot \"%s\" on publisher",
+							slotname),
+					 errdetail("The error was: %s", res->err)));
+		else
+			ereport(LOG,
+					(errmsg("dropped replication slot \"%s\" on publisher",
+							slotname)));
+
+		walrcv_clear_result(res);
+	}
+	PG_CATCH();
+	{
+		/* NOP. Just gobble any ERROR. */
+	}
+	PG_END_TRY();
+
+	/* Disconnect the connection (unless using one passed) */
+	if (wrconn_given == NULL)
+		walrcv_disconnect(wrconn);
+
+	pfree(cmd.data);
+}
+
 /*
  * Internal workhorse for changing a subscription owner
  */
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 15ab8e7..6b79dc6 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -843,7 +843,7 @@ replorigin_redo(XLogReaderState *record)
  * that originated at the LSN remote_commit on the remote node was replayed
  * successfully and that we don't need to do so again. In combination with
  * setting up replorigin_session_origin_lsn and replorigin_session_origin
- * that ensures we won't loose knowledge about that after a crash if the
+ * that ensures we won't lose knowledge about that after a crash if the
  * transaction had a persistent effect (think of asynchronous commits).
  *
  * local_commit needs to be a local LSN of the commit so that we can make sure
@@ -905,7 +905,7 @@ replorigin_advance(RepOriginId node,
 		LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
 
 		/* Make sure it's not used by somebody else */
-		if (replication_state->acquired_by != 0)
+		if (replication_state->acquired_by != 0 && replication_state->acquired_by != MyProcPid)
 		{
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_IN_USE),
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 1904f34..5b349f5 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -102,6 +102,8 @@
 #include "replication/logicalrelation.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
+#include "replication/origin.h"
 #include "storage/ipc.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -139,6 +141,24 @@ finish_sync_worker(void)
 					get_rel_name(MyLogicalRepWorker->relid))));
 	CommitTransactionCommand();
 
+	/*
+ 	 * Cleanup the tablesync slot.
+	 */
+	{
+		/* Calculate the name of the tablesync slot */
+		char *syncslotname = ReplicationSlotNameForTablesync(
+						MySubscription->oid,
+						MyLogicalRepWorker->relid);
+
+		elog(LOG, "!!>> finish_sync_worker: dropping the tablesync slot \"%s\".", syncslotname);
+		ReplicationSlotDropAtPubNode(
+						wrconn,
+						NULL, /* use the current connection. */
+						MySubscription->name, syncslotname);
+
+		pfree(syncslotname);
+	}
+
 	/* Find the main apply worker and signal it. */
 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
 
@@ -270,8 +290,6 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
-	Assert(IsTransactionState());
-
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
@@ -284,6 +302,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+		/*
+ 		 * UpdateSubscriptionRelState must be called within a transaction.
+ 		 * That transaction will be ended within the finish_sync_worker().
+ 		 */
+ 		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+		}
+
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
@@ -416,6 +443,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
 										   rstate->lsn);
+				/*
+				 * Remove the tablesync origin tracking if exists.
+				 *
+				 * The cleanup is done here instead of in the finish_sync_worker function because
+				 * if the tablesync worker process attempted to call replorigin_drop then that will
+				 * hang because the replorigin_drop considers the owning tablesync PID as "busy".
+				 */
+				{
+					char        originname[NAMEDATALEN];
+					RepOriginId originid;
+
+					snprintf(originname, sizeof(originname), "pg_%u_%u", MyLogicalRepWorker->subid, rstate->relid);
+					originid = replorigin_by_name(originname, true);
+					elog(LOG, "!!>> apply worker: find tablesync origin tracking for \"%s\".", originname);
+					if (OidIsValid(originid))
+					{
+						elog(LOG, "!!>> apply worker: dropping tablesync origin tracking for \"%s\".", originname);
+						replorigin_drop(originid, false);
+						elog(LOG, "!!>> apply worker: dropped tablesync origin tracking for \"%s\".", originname);
+					}
+				}
 			}
 		}
 		else
@@ -808,6 +856,32 @@ copy_table(Relation rel)
 	logicalrep_rel_close(relmapentry, NoLock);
 }
 
+
+/*
+ * Determine the tablesync slot name.
+ *
+ * The returned slot name is palloc'ed in current memory context.
+ */
+char *
+ReplicationSlotNameForTablesync(Oid suboid, Oid relid)
+{
+	char *syncslotname;
+
+	/*
+	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
+	 * 1 characters.
+	 *
+	 * The name is calculated as pg_%u_sync_%u (3 + 10 + 6 + 10 + '\0').
+	 * (It's actually the NAMEDATALEN on the remote that matters, but this
+	 * scheme will also work reasonably if that is different.)
+	 */
+	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");   /* for sanity */
+
+	syncslotname = psprintf("pg_%u_sync_%u", suboid, relid);
+
+	return syncslotname;
+}
+
 /*
  * Start syncing the table in the sync worker.
  *
@@ -825,6 +899,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	XLogRecPtr	relstate_lsn;
 	Relation	rel;
 	WalRcvExecResult *res;
+	bool		copied_ok;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -850,17 +925,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 			finish_sync_worker();	/* doesn't return */
 	}
 
-	/*
-	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
-	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
-	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
-	 * NAMEDATALEN on the remote that matters, but this scheme will also work
-	 * reasonably if that is different.)
-	 */
-	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
-	slotname = psprintf("%.*s_%u_sync_%u",
-						NAMEDATALEN - 28,
-						MySubscription->slotname,
+	/* Calculate the name of the tablesync slot. */
+	slotname = ReplicationSlotNameForTablesync(
 						MySubscription->oid,
 						MyLogicalRepWorker->relid);
 
@@ -875,7 +941,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 				(errmsg("could not connect to the publisher: %s", err)));
 
 	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
-		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE);
+
+	if (MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE)
+	{
+		/*
+		 * The COPY phase was previously done, but tablesync then crashed/etc
+		 * before it was able to finish normally.
+		 */
+		elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync relstate was SUBREL_STATE_COPYDONE.");
+		goto copy_table_done;
+	}
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -891,9 +968,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	/*
-	 * We want to do the table data sync in a single transaction.
-	 */
 	StartTransactionCommand();
 
 	/*
@@ -919,29 +993,110 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	walrcv_clear_result(res);
 
 	/*
-	 * Create a new temporary logical decoding slot.  This slot will be used
+	 * Create a new permanent logical decoding slot.  This slot will be used
 	 * for the catchup phase after COPY is done, so tell it to use the
 	 * snapshot to make the final data consistent.
 	 */
-	walrcv_create_slot(wrconn, slotname, true,
+	elog(LOG, "!!>> LogicalRepSyncTableStart: walrcv_create_slot for \"%s\".", slotname);
+	walrcv_create_slot(wrconn, slotname, false,
 					   CRS_USE_SNAPSHOT, origin_startpos);
 
-	/* Now do the initial data copy */
-	PushActiveSnapshot(GetTransactionSnapshot());
-	copy_table(rel);
-	PopActiveSnapshot();
+	/*
+	 * Be sure to remove the newly created tablesync slot if the COPY fails.
+	 */
+	copied_ok = false;
+	PG_TRY();
+	{
+		/* Now do the initial data copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_table(rel);
+		PopActiveSnapshot();
 
-	res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
-	if (res->status != WALRCV_OK_COMMAND)
-		ereport(ERROR,
-				(errmsg("table copy could not finish transaction on publisher"),
-				 errdetail("The error was: %s", res->err)));
-	walrcv_clear_result(res);
+		res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+		if (res->status != WALRCV_OK_COMMAND)
+			ereport(ERROR,
+					(errmsg("table copy could not finish transaction on publisher"),
+					 errdetail("The error was: %s", res->err)));
+		walrcv_clear_result(res);
+
+		table_close(rel, NoLock);
+
+		/* Make the copy visible. */
+		CommandCounterIncrement();
+
+		copied_ok = true;
+	}
+	PG_FINALLY();
+	{
+		/* If something failed during copy table then cleanup the created slot. */
+		if (!copied_ok)
+		{
+			extern void ReplicationSlotDropAtPubNode(
+				WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname);
+
+			elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync copy failed. Drop the tablesync slot \"%s\".", slotname);
+			ReplicationSlotDropAtPubNode(
+							wrconn,
+							NULL, /* use the current connection. */
+							MySubscription->name,
+							slotname);
+
+			pfree(slotname);
+		}
+	}
+	PG_END_TRY();
 
-	table_close(rel, NoLock);
+	CommitTransactionCommand();
 
-	/* Make the copy visible. */
-	CommandCounterIncrement();
+	/* Update the persisted state to indicate the COPY phase is done; make it visible to others. */
+	StartTransactionCommand();
+	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+							   MyLogicalRepWorker->relid,
+							   SUBREL_STATE_COPYDONE,
+							   MyLogicalRepWorker->relstate_lsn);
+	CommitTransactionCommand();
+
+copy_table_done:
+
+	/* Setup replication origin tracking. */
+	{
+		char		originname[NAMEDATALEN];
+		RepOriginId originid;
+
+		StartTransactionCommand();
+		snprintf(originname, sizeof(originname), "pg_%u_%u", MySubscription->oid, MyLogicalRepWorker->relid);
+		originid = replorigin_by_name(originname, true);
+		if (!OidIsValid(originid)) 
+		{
+			/* 
+			 * Origin tracking does not exist. Create it now, and advance to LSN got from walrcv_create_slot.
+			 */
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_create \"%s\".", originname);
+			originid = replorigin_create(originname);
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_session_setup \"%s\".", originname);
+			replorigin_session_setup(originid);
+			replorigin_session_origin = originid;
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_advance \"%s\".", originname);
+			replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+							   true /* go backward */ , true /* WAL log */ );
+		}
+		else
+		{
+			/*
+			 * Origin tracktrack already exists.
+			 */
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 2 replorigin_session_setup \"%s\".", originname);
+			replorigin_session_setup(originid);
+			replorigin_session_origin = originid;
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 2 replorigin_session_get_progress \"%s\".", originname);
+			*origin_startpos = replorigin_session_get_progress(false);
+		}
+		elog(LOG, "!!>> LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X", 
+				   originname,
+				   (uint32) (*origin_startpos >> 32),
+				   (uint32) *origin_startpos);
+		CommitTransactionCommand();
+	}
 
 	/*
 	 * We are done with the initial data synchronization, update the state.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9271f87..a60e9fd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -771,8 +771,7 @@ apply_handle_prepare_txn(LogicalRepPrepareData *prepare_data)
 
 	Assert(prepare_data->prepare_lsn == remote_final_lsn);
 
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * BeginTransactionBlock is necessary to balance the
@@ -1079,12 +1078,8 @@ apply_handle_stream_stop(StringInfo s)
 	/* We must be in a valid transaction state */
 	Assert(IsTransactionState());
 
-	/* The synchronization worker runs in single transaction. */
-	if (!am_tablesync_worker())
-	{
-		/* Commit the per-stream transaction */
-		CommitTransactionCommand();
-	}
+	/* Commit the per-stream transaction */
+	CommitTransactionCommand();
 
 	in_streamed_transaction = false;
 
@@ -1161,9 +1156,7 @@ apply_handle_stream_abort(StringInfo s)
 			/* Cleanup the subxact info */
 			cleanup_subxact_info();
 
-			/* The synchronization worker runs in single transaction */
-			if (!am_tablesync_worker())
-				CommitTransactionCommand();
+			CommitTransactionCommand();
 			return;
 		}
 
@@ -1190,8 +1183,7 @@ apply_handle_stream_abort(StringInfo s)
 		/* write the updated subxact list */
 		subxact_info_write(MyLogicalRepWorker->subid, xid);
 
-		if (!am_tablesync_worker())
-			CommitTransactionCommand();
+		CommitTransactionCommand();
 	}
 }
 
@@ -1350,8 +1342,7 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data)
 {
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * Update origin state so we can restart streaming from correct
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index acc2926..e9f2b3f 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -61,6 +61,7 @@ DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subs
 #define SUBREL_STATE_INIT		'i' /* initializing (sublsn NULL) */
 #define SUBREL_STATE_DATASYNC	'd' /* data is being synchronized (sublsn
 									 * NULL) */
+#define SUBREL_STATE_COPYDONE	'C' /* tablesync copy phase is completed */
 #define SUBREL_STATE_SYNCDONE	's' /* synchronization finished in front of
 									 * apply (sublsn set) */
 #define SUBREL_STATE_READY		'r' /* ready (sublsn set) */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 63bab69..bbc6b11 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,6 +15,7 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
+#include "replication/walreceiver.h"
 
 /*
  * Behaviour of replication slots, upon release or crash.
@@ -211,6 +212,8 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern char *ReplicationSlotNameForTablesync(Oid suboid, Oid relid);
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
-- 
1.8.3.1

