From f7a61501dc990d21ce522cb9de749e5ecb331831 Mon Sep 17 00:00:00 2001
From: Mark Dilger <mark.dilger@enterprisedb.com>
Date: Fri, 19 Nov 2021 16:08:40 -0800
Subject: [PATCH v3] Respect permissions within logical replication

Prevent logical replication workers from performing insert, update,
delete, truncate, or copy commands on tables unless the subscription
owner has permission to do so.
---
 doc/src/sgml/logical-replication.sgml       |  30 ++++--
 src/backend/commands/subscriptioncmds.c     |   2 +
 src/backend/replication/logical/tablesync.c |  13 +++
 src/backend/replication/logical/worker.c    |  30 ++++++
 src/test/subscription/t/026_nosuperuser.pl  | 107 ++++++++++++++++++++
 5 files changed, 174 insertions(+), 8 deletions(-)
 create mode 100644 src/test/subscription/t/026_nosuperuser.pl

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 45b2e1e28f..7083c46d97 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -330,6 +330,13 @@
    will simply be skipped.
   </para>
 
+  <para>
+   Logical replication operations are performed with the privileges of the role
+   which owns the subscription.  Permissions failures may cause replication
+   conflicts.  (Note that <productname>PostgreSQL</productname> prior to
+   version 15.0 did no permissions checking when applying changes.)
+  </para>
+
   <para>
    A conflict will produce an error and will stop the replication; it must be
    resolved manually by the user.  Details about the conflict can be found in
@@ -337,7 +344,7 @@
   </para>
 
   <para>
-   The resolution can be done either by changing data on the subscriber so
+   The resolution can be done either by changing data or permissions on the subscriber so
    that it does not conflict with the incoming change or by skipping the
    transaction that conflicts with the existing data.  The transaction can be
    skipped by calling the <link linkend="pg-replication-origin-advance">
@@ -530,9 +537,9 @@
 
   <para>
    A user able to modify the schema of subscriber-side tables can execute
-   arbitrary code as a superuser.  Limit ownership
-   and <literal>TRIGGER</literal> privilege on such tables to roles that
-   superusers trust.  Moreover, if untrusted users can create tables, use only
+   arbitrary code as the role which owns any subscription which modifies those tables.  Limit ownership
+   and <literal>TRIGGER</literal> privilege on such tables to trusted roles.
+   Moreover, if untrusted users can create tables, use only
    publications that list tables explicitly.  That is to say, create a
    subscription <literal>FOR ALL TABLES</literal> or
    <literal>FOR ALL TABLES IN SCHEMA</literal> only when superusers trust
@@ -576,13 +583,20 @@
 
   <para>
    The subscription apply process will run in the local database with the
-   privileges of a superuser.
+   privileges of the subscription owner.
+  </para>
+
+  <para>
+   On the publisher, privileges are only checked once at the start of a
+   replication connection and are not re-checked as each change record is read.
   </para>
 
   <para>
-   Privileges are only checked once at the start of a replication connection.
-   They are not re-checked as each change record is read from the publisher,
-   nor are they re-checked for each change when applied.
+   On the subscriber, the subscription owner's privileges are re-checked for
+   each transaction when applied. If a worker is in the process of applying a
+   transaction when the ownership of the subscription is changed by a
+   concurrent transaction, the application of the current transaction will
+   continue under the old owner's privileges.
   </para>
  </sect1>
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index c47ba26369..27a782b6e1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1474,6 +1474,8 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId,
 							  form->oid, 0);
+
+	ApplyLauncherWakeupAtCommit();
 }
 
 /*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..2400ef8c45 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -111,6 +111,7 @@
 #include "replication/origin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -924,6 +925,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	char		relstate;
 	XLogRecPtr	relstate_lsn;
 	Relation	rel;
+	AclResult	aclresult;
 	WalRcvExecResult *res;
 	char		originname[NAMEDATALEN];
 	RepOriginId originid;
@@ -1042,6 +1044,17 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 */
 	rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
 
+	/*
+	 * Check that our table sync worker has permission to insert into the
+	 * target table.
+	 */
+	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
+								  ACL_INSERT);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult,
+					   get_relkind_objtype(rel->rd_rel->relkind),
+					   RelationGetRelationName(rel));
+
 	/*
 	 * Start a transaction in the remote node in REPEATABLE READ mode.  This
 	 * ensures that both the replication slot we create (see below) and the
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ae1b391bda..da57d8461c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -179,6 +179,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "tcop/tcopprot.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
 #include "utils/dynahash.h"
@@ -1540,6 +1541,7 @@ apply_handle_insert(StringInfo s)
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData newtup;
 	LogicalRepRelId relid;
+	AclResult	aclresult;
 	ApplyExecutionData *edata;
 	EState	   *estate;
 	TupleTableSlot *remoteslot;
@@ -1562,6 +1564,12 @@ apply_handle_insert(StringInfo s)
 		end_replication_step();
 		return;
 	}
+	aclresult = pg_class_aclcheck(RelationGetRelid(rel->localrel), GetUserId(),
+								  ACL_INSERT);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult,
+					   get_relkind_objtype(rel->localrel->rd_rel->relkind),
+					   get_rel_name(rel->localreloid));
 
 	/* Set relation for error callback */
 	apply_error_callback_arg.rel = rel;
@@ -1662,6 +1670,7 @@ apply_handle_update(StringInfo s)
 {
 	LogicalRepRelMapEntry *rel;
 	LogicalRepRelId relid;
+	AclResult	aclresult;
 	ApplyExecutionData *edata;
 	EState	   *estate;
 	LogicalRepTupleData oldtup;
@@ -1689,6 +1698,12 @@ apply_handle_update(StringInfo s)
 		end_replication_step();
 		return;
 	}
+	aclresult = pg_class_aclcheck(RelationGetRelid(rel->localrel), GetUserId(),
+								  ACL_UPDATE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult,
+					   get_relkind_objtype(rel->localrel->rd_rel->relkind),
+					   get_rel_name(rel->localreloid));
 
 	/* Set relation for error callback */
 	apply_error_callback_arg.rel = rel;
@@ -1829,6 +1844,7 @@ apply_handle_delete(StringInfo s)
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData oldtup;
 	LogicalRepRelId relid;
+	AclResult	aclresult;
 	ApplyExecutionData *edata;
 	EState	   *estate;
 	TupleTableSlot *remoteslot;
@@ -1851,6 +1867,12 @@ apply_handle_delete(StringInfo s)
 		end_replication_step();
 		return;
 	}
+	aclresult = pg_class_aclcheck(RelationGetRelid(rel->localrel), GetUserId(),
+								  ACL_DELETE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult,
+					   get_relkind_objtype(rel->localrel->rd_rel->relkind),
+					   get_rel_name(rel->localreloid));
 
 	/* Set relation for error callback */
 	apply_error_callback_arg.rel = rel;
@@ -2223,6 +2245,7 @@ apply_handle_truncate(StringInfo s)
 	{
 		LogicalRepRelId relid = lfirst_oid(lc);
 		LogicalRepRelMapEntry *rel;
+		AclResult	aclresult;
 
 		rel = logicalrep_rel_open(relid, lockmode);
 		if (!should_apply_changes_for_rel(rel))
@@ -2234,6 +2257,12 @@ apply_handle_truncate(StringInfo s)
 			logicalrep_rel_close(rel, lockmode);
 			continue;
 		}
+		aclresult = pg_class_aclcheck(RelationGetRelid(rel->localrel),
+									  GetUserId(), ACL_TRUNCATE);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult,
+						   get_relkind_objtype(rel->localrel->rd_rel->relkind),
+						   get_rel_name(rel->localreloid));
 
 		remote_rels = lappend(remote_rels, rel);
 		rels = lappend(rels, rel->localrel);
@@ -2915,6 +2944,7 @@ maybe_reread_subscription(void)
 		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
+		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
 		ereport(LOG,
diff --git a/src/test/subscription/t/026_nosuperuser.pl b/src/test/subscription/t/026_nosuperuser.pl
new file mode 100644
index 0000000000..05717d5111
--- /dev/null
+++ b/src/test/subscription/t/026_nosuperuser.pl
@@ -0,0 +1,107 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Basic logical replication test
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils qw(slurp_file);
+use Test::More tests => 3;
+use Time::HiRes qw(usleep gettimeofday tv_interval);
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+# Create identical structres on publisher and subscriber
+for my $node ($node_publisher, $node_subscriber)
+{
+	$node->safe_psql('postgres', qq(
+	CREATE ROLE regress_admin SUPERUSER LOGIN;
+	CREATE ROLE regress_alice NOSUPERUSER NOLOGIN;
+	GRANT CREATE ON DATABASE postgres TO regress_alice;
+	SET SESSION AUTHORIZATION regress_alice;
+	CREATE SCHEMA alice;
+	CREATE TABLE alice.tbl (i INTEGER);
+	GRANT USAGE ON SCHEMA alice TO regress_admin;
+	ALTER TABLE alice.tbl REPLICA IDENTITY FULL;
+	GRANT SELECT ON TABLE alice.tbl TO regress_admin;
+	));
+}
+
+# Alice creates data, adjusts privileges, and creates a publication on
+# publisher node
+$node_publisher->safe_psql('postgres', qq(
+	SET SESSION AUTHORIZATION regress_alice;
+	INSERT INTO alice.tbl (i) VALUES (1);
+	CREATE PUBLICATION alice FOR TABLE alice.tbl;
+));
+
+# Superuser creates a subscription on the subscriber node
+$node_subscriber->safe_psql('postgres', qq(
+	SET SESSION AUTHORIZATION regress_admin;
+	CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
+));
+
+# Wait for subscription catch-up on subscriber
+$node_publisher->wait_for_catchup('admin_sub');
+
+ok(1, "initial sync finished");
+
+my $result = $node_subscriber->safe_psql('postgres', qq(
+	SELECT COUNT(i), MAX(i) FROM alice.tbl));
+is ($result, '1|1', "superuser admin replicates data");
+
+# Revoke superuser from admin on the subscriber, without which the admin has no
+# authority to insert into alice's table
+$node_subscriber->safe_psql('postgres', "ALTER ROLE regress_admin NOSUPERUSER");
+
+# Alice inserts new data into her table on the publisher node
+$node_publisher->safe_psql('postgres', qq(
+SET SESSION AUTHORIZATION regress_alice;
+INSERT INTO alice.tbl (i) VALUES (2);
+));
+
+# We cannot wait for catchup, as that will hang.  Instead, we poll for the
+# error message in the logs.  To avoid hanging infinitely, retry for at most
+# 180 seconds.  Also, in case the data replicates rather than failing, check
+# for it rather than burning a full three minutes waiting.
+#
+my $log;
+my $t0 = [gettimeofday];
+RETRY: {
+	$log = slurp_file($node_subscriber->logfile);
+	unless($log =~ qr/ERROR:  permission denied for table tbl/msi)
+	{
+		my $elapsed = tv_interval ( $t0 );
+
+		# For the two seconds, retry every 1/10th second
+		if ($elapsed < 1.0)
+		{
+			usleep(100000);	# sleep 1/10th second
+			redo RETRY;
+		}
+		# For the remainder of the test, retry no more than once per second,
+		# and also check if the data has come through, as there is no point
+		# waiting for the error if it we know it won't happen.
+		elsif ($elapsed < 179.0)
+		{
+			$result = $node_subscriber->safe_psql('postgres', qq(
+				SELECT MAX(i) FROM alice.tbl));
+			last RETRY if $result > 1;
+			sleep 1;
+			redo RETRY;
+		}
+	}
+}
+
+like ($log, qr/ERROR:  permission denied for table tbl/msi,
+	'subscriber lacks permission after superuser is revoked');
-- 
2.21.1 (Apple Git-122.3)

