From 84cdb2bf2aef276255971be47cf48c6dde7e0564 Mon Sep 17 00:00:00 2001
From: Mark Dilger <mark.dilger@enterprisedb.com>
Date: Thu, 17 Jun 2021 12:39:19 -0700
Subject: [PATCH v2] Optionally disabling subscriptions on error

Logical replication apply workers for a subscription can easily get
stuck in an infinite loop of attempting to apply a change,
triggering an error (such as a constraint violation), exiting with
an error written to the subscription worker log, and restarting.

To partially remedy the situation, adding a new
subscription_parameter named 'disable_on_error'.  To be consistent
with old behavior, the parameter defaults to false.  When true, the
apply worker catches errors thrown, and for errors that are deemed
not to be transient, disables the subscription in order to break the
loop.  A new column in the pg_subscription table helps diagnose the
situation:  when this has occurred, 'suberrmsg' includes the message
field of the error.  The error is still also written to the logs.

In addition to helping on production systems, this makes writing TAP
tests involving error conditions simpler.  Rather than having to
open and parse the apply worker's log file, the test can query the
pg_subscription table.  It also helps that the workers don't go into
an infinite loop during the test.
---
 doc/src/sgml/catalogs.sgml                    |  10 +
 doc/src/sgml/ref/create_subscription.sgml     |  12 +
 src/backend/catalog/pg_subscription.c         |   9 +
 src/backend/catalog/system_views.sql          |   8 +-
 src/backend/commands/subscriptioncmds.c       |  57 +++++
 src/backend/replication/logical/launcher.c    |   1 +
 src/backend/replication/logical/worker.c      | 189 +++++++++++++++-
 src/bin/pg_dump/pg_dump.c                     |   6 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/include/catalog/pg_subscription.h         |   8 +
 src/test/perl/PostgresNode.pm                 |  40 ++++
 .../subscription/t/022_disable_on_error.pl    | 205 ++++++++++++++++++
 12 files changed, 540 insertions(+), 6 deletions(-)
 create mode 100644 src/test/subscription/t/022_disable_on_error.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index f517a7d4af..6fd95cb7ca 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7671,6 +7671,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>suberrmsg</structfield> <type>text</type>
+      </para>
+      <para>
+       The message from the error which disabled the subscription, if it has
+       been automatically disabled.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subpublications</structfield> <type>text[]</type>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index e812beee37..6ec6524901 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -117,6 +117,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         </listitem>
        </varlistentry>
 
+       <varlistentry>
+        <term><literal>disable_on_error</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription should be automatically disabled
+          if replicating data from the publisher triggers non-transient errors
+          such as referential integrity or permissions errors.  The default is
+          <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
        <varlistentry>
         <term><literal>enabled</literal> (<type>boolean</type>)</term>
         <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 29fc4218cd..5781da062c 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
+	sub->disableonerr = subform->subdisableonerr;
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
 
@@ -95,6 +96,14 @@ GetSubscription(Oid subid, bool missing_ok)
 	Assert(!isnull);
 	sub->synccommit = TextDatumGetCString(datum);
 
+	/* Get errmsg */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tup,
+							Anum_pg_subscription_suberrmsg,
+							&isnull);
+	Assert(!isnull);
+	sub->errmsg = TextDatumGetCString(datum);
+
 	/* Get publications */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
 							tup,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 999d984068..0e840c1a34 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1252,8 +1252,10 @@ CREATE VIEW pg_replication_origin_status AS
 
 REVOKE ALL ON pg_replication_origin_status FROM public;
 
--- All columns of pg_subscription except subconninfo are publicly readable.
+-- All columns of pg_subscription except subconninfo and suberrmsg are publicly
+-- readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subslotname, subsynccommit, subpublications)
+GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subdisableonerr,
+			  subbinary, substream, subslotname, subsynccommit,
+			  subpublications)
     ON pg_subscription TO public;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 75e195f286..9f8adf529c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -63,6 +63,7 @@ static void
 parse_subscription_options(List *options,
 						   bool *connect,
 						   bool *enabled_given, bool *enabled,
+						   bool *disableonerr_given, bool *disableonerr,
 						   bool *create_slot,
 						   bool *slot_name_given, char **slot_name,
 						   bool *copy_data,
@@ -84,13 +85,24 @@ parse_subscription_options(List *options,
 		*connect = true;
 	if (enabled)
 	{
+		Assert(enabled_given);
+
 		*enabled_given = false;
 		*enabled = true;
 	}
+	if (disableonerr)
+	{
+		Assert(disableonerr_given);
+
+		*disableonerr_given = false;
+		*disableonerr = false;
+	}
 	if (create_slot)
 		*create_slot = true;
 	if (slot_name)
 	{
+		Assert(slot_name_given);
+
 		*slot_name_given = false;
 		*slot_name = NULL;
 	}
@@ -102,11 +114,15 @@ parse_subscription_options(List *options,
 		*refresh = true;
 	if (binary)
 	{
+		Assert(binary_given);
+
 		*binary_given = false;
 		*binary = false;
 	}
 	if (streaming)
 	{
+		Assert(streaming_given);
+
 		*streaming_given = false;
 		*streaming = false;
 	}
@@ -136,6 +152,16 @@ parse_subscription_options(List *options,
 			*enabled_given = true;
 			*enabled = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "disable_on_error") == 0 &&
+				 disableonerr)
+		{
+			if (*disableonerr_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			*disableonerr_given = true;
+			*disableonerr = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
 		{
 			if (create_slot_given)
@@ -334,6 +360,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	bool		connect;
 	bool		enabled_given;
 	bool		enabled;
+	bool		disableonerr_given;
+	bool		disableonerr;
 	bool		copy_data;
 	bool		streaming;
 	bool		streaming_given;
@@ -355,6 +383,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	parse_subscription_options(stmt->options,
 							   &connect,
 							   &enabled_given, &enabled,
+							   &disableonerr_given,
+							   &disableonerr,
 							   &create_slot,
 							   &slotname_given, &slotname,
 							   &copy_data,
@@ -427,6 +457,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
+	values[Anum_pg_subscription_subdisableonerr - 1] =
+		BoolGetDatum(disableonerr);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
 	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
 	values[Anum_pg_subscription_subconninfo - 1] =
@@ -438,6 +470,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 		nulls[Anum_pg_subscription_subslotname - 1] = true;
 	values[Anum_pg_subscription_subsynccommit - 1] =
 		CStringGetTextDatum(synchronous_commit);
+	values[Anum_pg_subscription_suberrmsg - 1] =
+		CStringGetTextDatum("");
 	values[Anum_pg_subscription_subpublications - 1] =
 		publicationListToArray(publications);
 
@@ -799,6 +833,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
 			{
+				bool		disableonerr;
+				bool		disableonerr_given;
 				char	   *slotname;
 				bool		slotname_given;
 				char	   *synchronous_commit;
@@ -810,6 +846,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
 										   NULL, NULL,	/* no "enabled" */
+										   &disableonerr_given,
+										   &disableonerr,
 										   NULL,	/* no "create_slot" */
 										   &slotname_given, &slotname,
 										   NULL,	/* no "copy_data" */
@@ -818,6 +856,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 										   &binary_given, &binary,
 										   &streaming_given, &streaming);
 
+				if (disableonerr_given)
+				{
+					values[Anum_pg_subscription_subdisableonerr - 1] =
+						BoolGetDatum(disableonerr);
+					replaces[Anum_pg_subscription_subdisableonerr - 1 ] =
+						true;
+				}
+
 				if (slotname_given)
 				{
 					if (sub->enabled && !slotname)
@@ -867,6 +913,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
 										   &enabled_given, &enabled,
+										   NULL, NULL,	/* no "disableonerr" */
 										   NULL,	/* no "create_slot" */
 										   NULL, NULL,	/* no "slot_name" */
 										   NULL,	/* no "copy_data" */
@@ -885,6 +932,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 					BoolGetDatum(enabled);
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
 
+				/*
+				 * Manually enabling or disabling a subscription clears any
+				 * error which automatically disabled it.
+				 */
+				values[Anum_pg_subscription_suberrmsg - 1] = CStringGetTextDatum("");
+				replaces[Anum_pg_subscription_suberrmsg - 1] = true;
+
 				if (enabled)
 					ApplyLauncherWakeupAtCommit();
 
@@ -912,6 +966,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
 										   NULL, NULL,	/* no "enabled" */
+										   NULL, NULL,	/* no "disableonerr" */
 										   NULL,	/* no "create_slot" */
 										   NULL, NULL,	/* no "slot_name" */
 										   &copy_data,
@@ -958,6 +1013,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
 										   NULL, NULL,	/* no "enabled" */
+										   NULL, NULL,	/* no "disableonerr" */
 										   NULL,	/* no "create_slot" */
 										   NULL, NULL,	/* no "slot_name" */
 										   isadd ? &copy_data : NULL,	/* for drop, no
@@ -1005,6 +1061,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
 										   NULL, NULL,	/* no "enabled" */
+										   NULL, NULL,	/* no "disableonerr" */
 										   NULL,	/* no "create_slot" */
 										   NULL, NULL,	/* no "slot_name" */
 										   &copy_data,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e3b11daa89..e7279ea86b 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -132,6 +132,7 @@ get_subscription_list(void)
 		sub->dbid = subform->subdbid;
 		sub->owner = subform->subowner;
 		sub->enabled = subform->subenabled;
+		sub->disableonerr = subform->subdisableonerr;
 		sub->name = pstrdup(NameStr(subform->subname));
 		/* We don't fill fields we are not interested in. */
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index bbb659dad0..54fd61afe5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -62,6 +62,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
+#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
@@ -2412,6 +2413,160 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
 }
 
+/*
+ * Errors which are transient, network protocol related, or resource exhaustion
+ * related, should not disable a subscription.  These may clear up without user
+ * intervention in the subscription, schema, or data being replicated.
+ */
+static bool
+IsSubscriptionDisablingError(void)
+{
+	switch (geterrcode())
+	{
+		case ERRCODE_CONNECTION_EXCEPTION:
+		case ERRCODE_CONNECTION_DOES_NOT_EXIST:
+		case ERRCODE_CONNECTION_FAILURE:
+		case ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION:
+		case ERRCODE_SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION:
+		case ERRCODE_TRANSACTION_RESOLUTION_UNKNOWN:
+		case ERRCODE_PROTOCOL_VIOLATION:
+		case ERRCODE_INSUFFICIENT_RESOURCES:
+		case ERRCODE_DISK_FULL:
+		case ERRCODE_OUT_OF_MEMORY:
+		case ERRCODE_TOO_MANY_CONNECTIONS:
+		case ERRCODE_CONFIGURATION_LIMIT_EXCEEDED:
+		case ERRCODE_PROGRAM_LIMIT_EXCEEDED:
+		case ERRCODE_STATEMENT_TOO_COMPLEX:
+		case ERRCODE_TOO_MANY_COLUMNS:
+		case ERRCODE_TOO_MANY_ARGUMENTS:
+		case ERRCODE_OPERATOR_INTERVENTION:
+		case ERRCODE_QUERY_CANCELED:
+		case ERRCODE_ADMIN_SHUTDOWN:
+		case ERRCODE_CRASH_SHUTDOWN:
+		case ERRCODE_CANNOT_CONNECT_NOW:
+		case ERRCODE_DATABASE_DROPPED:
+		case ERRCODE_IDLE_SESSION_TIMEOUT:
+			return false;
+		default:
+			break;
+	}
+
+	return true;
+}
+
+/*
+ * Recover from a possibly aborted transaction state and disable the current
+ * subscription.
+ */
+static ErrorData *
+DisableSubscriptionOnError(MemoryContext mcxt)
+{
+	Relation	rel;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+	HeapTuple	tup;
+	Oid			subid;
+	Form_pg_subscription subform;
+	ErrorData  *edata;
+
+	/*
+	 * Clean up from the error and get a fresh transaction in which to
+	 * disable the subscription.
+	 */
+	MemoryContextSwitchTo(mcxt);
+	edata = CopyErrorData();
+
+	ereport(LOG,
+			(errmsg("logical replication subscription \"%s\" will be disabled due to error: %s",
+					MySubscription->name, edata->message)));
+
+	AbortOutOfAnyTransaction();
+	FlushErrorState();
+
+	StartTransactionCommand();
+
+	/* Look up our subscription in the catalogs */
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
+							  CStringGetDatum(MySubscription->name));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("subscription \"%s\" does not exist",
+						MySubscription->name)));
+
+	subform = (Form_pg_subscription) GETSTRUCT(tup);
+	subid = subform->oid;
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
+	/*
+	 * We would not be here unless this subscription's disableonerr
+	 * field was true when our worker began applying changes, but check
+	 * whether that field has changed in the interim.
+	 */
+	if (!subform->subdisableonerr)
+		ReThrowError(edata);
+
+	/* Form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Set the subscription to disabled, and note the reason. */
+	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
+	replaces[Anum_pg_subscription_subenabled - 1] = true;
+	values[Anum_pg_subscription_suberrmsg - 1] =
+		CStringGetTextDatum(edata->message ? edata->message : "");
+	replaces[Anum_pg_subscription_suberrmsg - 1] = true;
+
+	/* Update the catalog */
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	CommitTransactionCommand();
+
+	return edata;
+}
+
+/*
+ * Apply main loop with logic to disable a stuck subscription
+ *
+ * Applies changes, and for non-transient errors, catches the error and
+ * disables the subscription before rethrowing.
+ */
+static void
+LogicalRepApplyLoopTry(XLogRecPtr last_received)
+{
+	MemoryContext mcxt = CurrentMemoryContext;
+	bool		did_error = false;
+
+	PG_TRY();
+	{
+		LogicalRepApplyLoop(last_received);
+	}
+	PG_CATCH();
+	{
+		if (IsSubscriptionDisablingError())
+			did_error = true;
+		else
+			PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	/*
+	 * If we caught an error above, disable the subscription and record copies
+	 * of the relevant error text, then rethrow the error we caught so it gets
+	 * logged and our process exits appropriately.
+	 */
+	if (did_error)
+		ReThrowError(DisableSubscriptionOnError(mcxt));
+}
+
 /*
  * Send a Standby Status Update message to server.
  *
@@ -3167,7 +3322,34 @@ ApplyWorkerMain(Datum main_arg)
 		char	   *syncslotname;
 
 		/* This is table synchronization worker, call initial sync. */
-		syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+		if (MySubscription->disableonerr)
+		{
+			MemoryContext mcxt = CurrentMemoryContext;
+			bool	did_error = false;
+
+			PG_TRY();
+			{
+				syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+			}
+			PG_CATCH();
+			{
+				if (IsSubscriptionDisablingError())
+					did_error = true;
+				else
+					PG_RE_THROW();
+			}
+			PG_END_TRY();
+
+			/*
+			 * If we caught an error above, disable the subscription and record copies
+			 * of the relevant error text, then rethrow the error we caught so it gets
+			 * logged and our process exits appropriately.
+			 */
+			if (did_error)
+				ReThrowError(DisableSubscriptionOnError(mcxt));
+		}
+		else
+			syncslotname = LogicalRepSyncTableStart(&origin_startpos);
 
 		/* allocate slot name in long-lived context */
 		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
@@ -3241,7 +3423,10 @@ ApplyWorkerMain(Datum main_arg)
 	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
 
 	/* Run the main loop. */
-	LogicalRepApplyLoop(origin_startpos);
+	if (MySubscription->disableonerr)
+		LogicalRepApplyLoopTry(origin_startpos);
+	else
+		LogicalRepApplyLoop(origin_startpos);
 
 	proc_exit(0);
 }
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 8f53cc7c3b..16d9d006d1 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4307,6 +4307,7 @@ getSubscriptions(Archive *fout)
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subsynccommit;
+	int			i_suberrmsg;
 	int			i_subpublications;
 	int			i_subbinary;
 	int			i,
@@ -4338,7 +4339,7 @@ getSubscriptions(Archive *fout)
 					  "SELECT s.tableoid, s.oid, s.subname,\n"
 					  " (%s s.subowner) AS rolname,\n"
 					  " s.subconninfo, s.subslotname, s.subsynccommit,\n"
-					  " s.subpublications,\n",
+					  " s.suberrmsg, s.subpublications,\n",
 					  username_subquery);
 
 	if (fout->remoteVersion >= 140000)
@@ -4367,6 +4368,7 @@ getSubscriptions(Archive *fout)
 	i_subconninfo = PQfnumber(res, "subconninfo");
 	i_subslotname = PQfnumber(res, "subslotname");
 	i_subsynccommit = PQfnumber(res, "subsynccommit");
+	i_suberrmsg = PQfnumber(res, "suberrmsg");
 	i_subpublications = PQfnumber(res, "subpublications");
 	i_subbinary = PQfnumber(res, "subbinary");
 	i_substream = PQfnumber(res, "substream");
@@ -4389,6 +4391,8 @@ getSubscriptions(Archive *fout)
 			subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
 		subinfo[i].subsynccommit =
 			pg_strdup(PQgetvalue(res, i, i_subsynccommit));
+		subinfo[i].suberrmsg =
+			pg_strdup(PQgetvalue(res, i, i_suberrmsg));
 		subinfo[i].subpublications =
 			pg_strdup(PQgetvalue(res, i, i_subpublications));
 		subinfo[i].subbinary =
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 49e1b0a09c..3c2a8dd38c 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -638,6 +638,7 @@ typedef struct _SubscriptionInfo
 	char	   *subbinary;
 	char	   *substream;
 	char	   *subsynccommit;
+	char	   *suberrmsg;
 	char	   *subpublications;
 } SubscriptionInfo;
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0060ebfb40..b094bd082f 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -52,6 +52,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
+	bool		subdisableonerr;	/* True if apply errors should
+									 * disable the subscription upon error */
+
 	bool		subbinary;		/* True if the subscription wants the
 								 * publisher to send data in binary */
 
@@ -67,6 +70,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	/* Synchronous commit setting for worker */
 	text		subsynccommit BKI_FORCE_NOT_NULL;
 
+	/* Message from error which disabled this subscription */
+	text		suberrmsg BKI_FORCE_NOT_NULL;
+
 	/* List of publications subscribed to */
 	text		subpublications[1] BKI_FORCE_NOT_NULL;
 #endif
@@ -91,12 +97,14 @@ typedef struct Subscription
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
+	bool		disableonerr;	/* Whether errors automatically disable */
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
+	char	   *errmsg;			/* Message from error which disabled */
 	List	   *publications;	/* List of publication names to subscribe to */
 } Subscription;
 
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 2027cbf43d..33dc9d5367 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -2442,6 +2442,46 @@ sub wait_for_slot_catchup
 	return;
 }
 
+=pot
+
+=item $node->wait_for_subscription($dbname, @subcriptions)
+
+Wait for the named subscriptions to catch up or to be disabled.
+
+=cut
+
+sub wait_for_subscriptions
+{
+	my ($self, $dbname, @subscriptions) = @_;
+
+	# Unique-ify the subscriptions passed by the caller
+	my %unique = map { $_ => 1 } @subscriptions;
+	my @unique = sort keys %unique;
+	my $unique_count = scalar(@unique);
+
+	# Construct a SQL list from the unique subscription names
+	my @escaped = map { s/'/''/g; s/\\/\\\\/g; $_ } @unique;
+	my $sublist = join(', ', map { "'$_'" } @escaped);
+
+	my $polling_sql = qq(
+		SELECT COUNT(1) = $unique_count FROM
+			(SELECT s.oid
+				FROM pg_catalog.pg_subscription s
+				LEFT JOIN pg_catalog.pg_subscription_rel sr
+				ON sr.srsubid = s.oid
+				WHERE (sr IS NULL OR sr.srsubstate IN ('s', 'r'))
+				  AND s.subname IN ($sublist)
+				  AND s.subenabled IS TRUE
+			 UNION
+			 SELECT s.oid
+				FROM pg_catalog.pg_subscription s
+				WHERE s.subname IN ($sublist)
+				  AND s.subenabled IS FALSE
+			) AS synced_or_disabled
+		);
+	return $self->poll_query_until($dbname, $polling_sql);
+}
+
 =pod
 
 =item $node->query_hash($dbname, $query, @columns)
diff --git a/src/test/subscription/t/022_disable_on_error.pl b/src/test/subscription/t/022_disable_on_error.pl
new file mode 100644
index 0000000000..5c10981a6b
--- /dev/null
+++ b/src/test/subscription/t/022_disable_on_error.pl
@@ -0,0 +1,205 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test of logical replication subscription self-disabling feature
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 13;
+
+my @schemas = qw(s1 s2);
+my ($schema, $cmd);
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create identical schema, table and index on both the publisher and
+# subscriber
+#
+for $schema (@schemas)
+{
+	$cmd = qq(
+CREATE SCHEMA $schema;
+CREATE TABLE $schema.tbl (i INT);
+ALTER TABLE $schema.tbl REPLICA IDENTITY FULL;
+CREATE INDEX ${schema}_tbl_idx ON $schema.tbl(i));
+	$node_publisher->safe_psql('postgres', $cmd);
+	$node_subscriber->safe_psql('postgres', $cmd);
+}
+
+# Create non-unique data in both schemas on the publisher.
+#
+for $schema (@schemas)
+{
+	$cmd = qq(INSERT INTO $schema.tbl (i) VALUES (1), (1), (1));
+	$node_publisher->safe_psql('postgres', $cmd);
+}
+
+# Create an additional unique index in schema s1 on the subscriber only.  When
+# we create subscriptions, below, this should cause subscription "s1" on the
+# subscriber to fail during initial synchronization and to get automatically
+# disabled.
+#
+$cmd = qq(CREATE UNIQUE INDEX s1_tbl_unique ON s1.tbl (i));
+$node_subscriber->safe_psql('postgres', $cmd);
+
+# Create publications and subscriptions linking the schemas on
+# the publisher with those on the subscriber.  This tests that the
+# uniqueness violations cause subscription "s1" to fail during
+# initial synchronization.
+#
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+for $schema (@schemas)
+{
+	# Create the publication for this table
+	$cmd = qq(
+CREATE PUBLICATION $schema FOR TABLE $schema.tbl);
+	$node_publisher->safe_psql('postgres', $cmd);
+
+	# Create the subscription for this table
+	$cmd = qq(
+CREATE SUBSCRIPTION $schema
+	CONNECTION '$publisher_connstr'
+	PUBLICATION $schema
+	WITH (disable_on_error = true));
+	$node_subscriber->safe_psql('postgres', $cmd);
+}
+
+# Wait for the initial subscription synchronizations to finish or fail.
+#
+$node_subscriber->wait_for_subscriptions('postgres', @schemas)
+	or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscription "s1" should have disabled itself due to error.
+#
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"f", "subscription s1 no longer enabled");
+$cmd = qq(
+SELECT suberrmsg FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	qq(duplicate key value violates unique constraint "s1_tbl_unique"),
+	"subscription s1 disabled during synchronization by unique key violation");
+
+# Subscription "s2" should have copied the initial data without incident.
+#
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's2');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"t", "subscription s2 still enabled");
+$cmd = qq(SELECT i, COUNT(*) FROM s2.tbl GROUP BY i);
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"1|3",
+	"subscription s2 replicated initial data");
+
+# Enter unique data for both schemas on the publisher.  This should succeed on
+# the publisher node, and not cause any additional problems on the subscriber
+# side either, though disabled subscription "s1" should not replicate anything.
+#
+for $schema (@schemas)
+{
+	$cmd = qq(INSERT INTO $schema.tbl (i) VALUES (2));
+	$node_publisher->safe_psql('postgres', $cmd);
+}
+
+# Wait for the data to replicate for the subscriptions.  This tests that the
+# problems encountered by subscription "s1" do not cause subscription "s2" to
+# get stuck.
+$node_subscriber->wait_for_subscriptions('postgres', @schemas)
+	or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscription "s1" should still be disabled and have replicated no data
+#
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"f", "subscription s1 still disabled");
+$cmd = qq(
+SELECT suberrmsg FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	qq(duplicate key value violates unique constraint "s1_tbl_unique"),
+	"subscription s1 still disabled by unique key violation");
+
+# Subscription "s2" should still be enabled and have replicated all changes
+#
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's2');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"t", "subscription s2 still enabled");
+$cmd = q(SELECT MAX(i), COUNT(*) FROM s2.tbl);
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"2|4", "subscription s2 replicated data");
+
+# Drop the unique index on "s1" which caused the subscription to be disabled
+#
+$cmd = qq(DROP INDEX s1.s1_tbl_unique);
+$node_subscriber->safe_psql('postgres', $cmd);
+
+# Re-enable the subscription "s1"
+#
+$cmd = q(ALTER SUBSCRIPTION s1 ENABLE);
+$node_subscriber->safe_psql('postgres', $cmd);
+
+# Wait for the data to replicate
+#
+$node_subscriber->wait_for_subscriptions('postgres', @schemas)
+	or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check that we have the new data in s1.tbl
+#
+$cmd = q(SELECT MAX(i), COUNT(*) FROM s1.tbl);
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"2|4", "subscription s1 replicated data");
+
+# Delete the data from the subscriber only, and recreate the unique index
+#
+$cmd = q(
+DELETE FROM s1.tbl;
+CREATE UNIQUE INDEX s1_tbl_unique ON s1.tbl (i));
+$node_subscriber->safe_psql('postgres', $cmd);
+
+# Add more non-unique data to the publisher
+for $schema (@schemas)
+{
+	$cmd = qq(INSERT INTO $schema.tbl (i) VALUES (3), (3), (3));
+	$node_publisher->safe_psql('postgres', $cmd);
+}
+
+# Wait for the data to replicate for the subscriptions.  This tests that
+# uniqueness violations encountered during replication cause s1 to be disabled.
+#
+$node_subscriber->wait_for_subscriptions('postgres', @schemas)
+	or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscription "s1" should have disabled itself due to error.
+#
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"f", "subscription s1 no longer enabled");
+$cmd = qq(
+SELECT suberrmsg FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	qq(duplicate key value violates unique constraint "s1_tbl_unique"),
+	"subscription s1 disabled during replication by unique key violation");
+
+# Subscription "s2" should have copied the initial data without incident.
+#
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's2');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"t", "subscription s2 still enabled");
+$cmd = qq(SELECT MAX(i), COUNT(*) FROM s2.tbl);
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"3|7",
+	"subscription s2 replicated additional data");
+
+$node_subscriber->stop;
+$node_publisher->stop;
-- 
2.21.1 (Apple Git-122.3)

