From 9666ca6b02a66e2111a99fff1b42ce3af8c7eeb9 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 16 Oct 2023 07:34:41 +0000
Subject: [PATCH v51 2/2] support cross-version upgrade

---
 .../003_upgrade_logical_replication_slots.pl  | 510 +++++++++++-------
 1 file changed, 307 insertions(+), 203 deletions(-)

diff --git a/src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl
index f14a670b78..c44afe6be8 100644
--- a/src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_upgrade_logical_replication_slots.pl
@@ -12,216 +12,320 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+# Verify that logical replication slots can be migrated.  This function will
+# be executed when the old cluster is PG17 and later.  It takes the old and
+# new publisher nodes plus the pg_upgrade transfer-mode option, and stops
+# every node it started before returning.
+sub test_for_17_and_later
+{
+	my ($old_publisher, $new_publisher, $mode) = @_;
+
+	my $oldbindir = $old_publisher->config_data('--bindir');
+	my $newbindir = $new_publisher->config_data('--bindir');
+
+	# ------------------------------
+	# TEST: Confirm pg_upgrade fails when wrong GUC is set on new cluster
+	#
+	# There are two requirements for GUCs - wal_level and
+	# max_replication_slots, but only max_replication_slots will be tested here
+	# because it reduces the execution time of the test.
+
+	# Preparations for the subsequent test:
+	# 1. Create two slots on the old cluster
+	$old_publisher->start;
+	$old_publisher->safe_psql('postgres',
+		"SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding', false, true);"
+	);
+	$old_publisher->safe_psql('postgres',
+		"SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);"
+	);
+	$old_publisher->stop();
+
+	# 2. max_replication_slots is set to smaller than the number of slots (2)
+	#	 present on the old cluster
+	$new_publisher->append_conf('postgresql.conf',
+		"max_replication_slots = 1");
+
+	# pg_upgrade will fail because the new cluster has insufficient
+	# max_replication_slots
+	command_checks_all(
+		[
+			'pg_upgrade', '--no-sync',
+			'-d', $old_publisher->data_dir,
+			'-D', $new_publisher->data_dir,
+			'-b', $oldbindir,
+			'-B', $newbindir,
+			'-s', $new_publisher->host,
+			'-p', $old_publisher->port,
+			'-P', $new_publisher->port,
+			$mode,
+		],
+		1,
+		[
+			qr/max_replication_slots \(1\) must be greater than or equal to the number of logical replication slots \(2\) on the old cluster/
+		],
+		[qr//],
+		'run of pg_upgrade where the new cluster has insufficient max_replication_slots'
+	);
+	ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+		"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+	# Clean up
+	rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+	# Raise max_replication_slots to the number of slots (2) on the old
+	# cluster.  Both slots will be used for subsequent tests.
+	$new_publisher->append_conf('postgresql.conf',
+		"max_replication_slots = 2");
+
+	# ------------------------------
+	# TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records
+
+	# Preparations for the subsequent test:
+	# 1. Generate extra WAL records. Because these WAL records do not get
+	#	 consumed it will cause the upcoming pg_upgrade test to fail.
+	$old_publisher->start;
+	$old_publisher->safe_psql('postgres',
+		"CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;");
+
+	# 2. Advance the slot test_slot2 up to the current WAL location
+	$old_publisher->safe_psql('postgres',
+		"SELECT pg_replication_slot_advance('test_slot2', NULL);");
+
+	# 3. Emit a non-transactional message. test_slot2 detects the message so
+	#	 that the upcoming pg_upgrade will also report this slot.
+	$old_publisher->safe_psql('postgres',
+		"SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message');"
+	);
+	$old_publisher->stop;
+
+	# pg_upgrade will fail because the slot still has unconsumed WAL records
+	command_checks_all(
+		[
+			'pg_upgrade', '--no-sync',
+			'-d', $old_publisher->data_dir,
+			'-D', $new_publisher->data_dir,
+			'-b', $oldbindir,
+			'-B', $newbindir,
+			'-s', $new_publisher->host,
+			'-p', $old_publisher->port,
+			'-P', $new_publisher->port,
+			$mode,
+		],
+		1,
+		[
+			qr/Your installation contains logical replication slots that can't be upgraded./
+		],
+		[qr//],
+		'run of pg_upgrade of old cluster with slot having unconsumed WAL records'
+	);
+	ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+		"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+	# Verify the reason why the logical replication slot cannot be upgraded
+	my $slots_filename;
+
+	# Find a txt file that contains a list of logical replication slots that
+	# cannot be upgraded. We cannot predict the file's path because the output
+	# directory contains a milliseconds timestamp. File::Find::find must be
+	# used.
+	find(
+		sub {
+			if ($File::Find::name =~ m/invalid_logical_relication_slots\.txt/)
+			{
+				$slots_filename = $File::Find::name;
+			}
+		},
+		$new_publisher->data_dir . "/pg_upgrade_output.d");
+
+	# And check the content. Both slots must be reported as having
+	# unconsumed WAL after confirmed_flush_lsn.
+	like(
+		slurp_file($slots_filename),
+		qr/The slot \"test_slot1\" has not consumed the WAL yet/m,
+		'the previous test failed due to unconsumed WALs');
+	like(
+		slurp_file($slots_filename),
+		qr/The slot \"test_slot2\" has not consumed the WAL yet/m,
+		'the previous test failed due to unconsumed WALs');
+
+	# Clean up
+	rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+	# ------------------------------
+	# TEST: Successful upgrade
+
+	# Preparations for the subsequent test:
+	# 1. Setup logical replication
+	my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+
+	$old_publisher->start;
+
+	$old_publisher->safe_psql('postgres',
+		"SELECT * FROM pg_drop_replication_slot('test_slot1');");
+	$old_publisher->safe_psql('postgres',
+		"SELECT * FROM pg_drop_replication_slot('test_slot2');");
+
+	$old_publisher->safe_psql('postgres',
+		"CREATE PUBLICATION regress_pub FOR ALL TABLES;");
+
+	# Initialize subscriber cluster
+	my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+	$subscriber->init();
+
+	$subscriber->start;
+	$subscriber->safe_psql(
+		'postgres', qq[
+		CREATE TABLE tbl (a int);
+		CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true')
+	]);
+	$subscriber->wait_for_subscription_sync($old_publisher, 'regress_sub');
+
+	# 2. Temporarily disable the subscription
+	$subscriber->safe_psql('postgres',
+		"ALTER SUBSCRIPTION regress_sub DISABLE");
+	$old_publisher->stop;
+
+	# Actual run, successful upgrade is expected
+	command_ok(
+		[
+			'pg_upgrade', '--no-sync',
+			'-d', $old_publisher->data_dir,
+			'-D', $new_publisher->data_dir,
+			'-b', $oldbindir,
+			'-B', $newbindir,
+			'-s', $new_publisher->host,
+			'-p', $old_publisher->port,
+			'-P', $new_publisher->port,
+			$mode,
+		],
+		'run of pg_upgrade of old cluster');
+	ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+		"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+	# Check that the slot 'regress_sub' has migrated to the new cluster
+	$new_publisher->start;
+	my $result = $new_publisher->safe_psql('postgres',
+		"SELECT slot_name, two_phase FROM pg_replication_slots");
+	is($result, qq(regress_sub|t), 'check the slot exists on new cluster');
+
+	# Update the connection
+	my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+	$subscriber->safe_psql(
+		'postgres', qq[
+		ALTER SUBSCRIPTION regress_sub CONNECTION '$new_connstr';
+		ALTER SUBSCRIPTION regress_sub ENABLE;
+	]);
+
+	# Check whether changes on the new publisher get replicated to the
+	# subscriber
+	$new_publisher->safe_psql('postgres',
+		"INSERT INTO tbl VALUES (generate_series(11, 20))");
+	$new_publisher->wait_for_catchup('regress_sub');
+	$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+	is($result, qq(20), 'check changes are replicated to the subscriber');
+
+	# Clean up
+	$subscriber->stop();
+	$new_publisher->stop();
+}
+
+# Confirm that logical replication slots are not migrated.  This function
+# will be executed when the old cluster is PG16 and prior.
+sub test_for_16_and_prior
+{
+	my ($old_publisher, $new_publisher, $mode) = @_;
+
+	# Binary directories of the old and the new installation
+	my $old_bin = $old_publisher->config_data('--bindir');
+	my $new_bin = $new_publisher->config_data('--bindir');
+
+	# ------------------------------
+	# TEST: Confirm logical replication slots cannot be migrated
+
+	# Leave one logical slot behind on the (stopped) old cluster
+	$old_publisher->start;
+	$old_publisher->safe_psql('postgres',
+		"SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');"
+	);
+	$old_publisher->stop;
+
+	# Command line for the actual run
+	my @upgrade_cmd = (
+		'pg_upgrade', '--no-sync',
+		'-d', $old_publisher->data_dir,
+		'-D', $new_publisher->data_dir,
+		'-b', $old_bin,
+		'-B', $new_bin,
+		'-s', $new_publisher->host,
+		'-p', $old_publisher->port,
+		'-P', $new_publisher->port,
+		$mode);
+
+	# The upgrade is expected to succeed and to remove its output directory
+	command_ok(\@upgrade_cmd, 'run of pg_upgrade of old cluster');
+	my $output_dir = $new_publisher->data_dir . "/pg_upgrade_output.d";
+	ok(!-d $output_dir,
+		"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+	# The slot 'test_slot' must not have been carried over to the new cluster
+	$new_publisher->start;
+	my $slot_count = $new_publisher->safe_psql('postgres',
+		"SELECT count(*) FROM pg_replication_slots");
+	is($slot_count, qq(0), 'check the slot does not exist on new cluster');
+
+	# Clean up
+	$new_publisher->stop();
+}
+
 # Can be changed to test the other modes
 my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
 
-# Initialize old cluster
-my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
-$old_publisher->init(allows_streaming => 'logical');
+# Initialize old cluster.  Cross-version checks are also supported: if the
+# environment variable oldinstall is set, it is passed as the install_path of
+# the old node.
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher',
+	install_path => $ENV{oldinstall});
+
+# Skip tests if the old cluster does not support logical replication slots
+plan skip_all => 'Logical replication slots can be available since PG9.4'
+  if $old_publisher->pg_version < 9.4;
+
+# Parameters used to initialize the old cluster
+my %node_params = (allows_streaming => 'logical');
+
+# Set extra params if cross-version checks are required. This is needed to
+# avoid using a previously initdb'd cluster.
+if (defined($ENV{oldinstall}))
+{
+	$node_params{extra} = [
+		'--encoding', 'UTF-8',
+		'--locale', 'C',
+	];
+}
+
+$old_publisher->init(%node_params);
+
+# Set max_wal_senders to a lower value if the old cluster is prior to PG12.
+# Such clusters regard max_wal_senders as part of max_connections, but the
+# current TAP tester sets these GUCs to the same value.
+if ($old_publisher->pg_version < 12)
+{
+	$old_publisher->append_conf('postgresql.conf', "max_wal_senders = 5");
+}
 
 # Initialize new cluster
 my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
 $new_publisher->init(allows_streaming => 'logical');
 
-# Initialize subscriber cluster
-my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
-$subscriber->init();
-
-my $bindir = $new_publisher->config_data('--bindir');
-
-# ------------------------------
-# TEST: Confirm pg_upgrade fails when wrong GUC is set on new cluster
-#
-# There are two requirements for GUCs - wal_level and max_replication_slots,
-# but only max_replication_slots will be tested here. This is because to
-# reduce the execution time of the test.
-
-# Preparations for the subsequent test:
-# 1. Create two slots on the old cluster
-$old_publisher->start;
-$old_publisher->safe_psql('postgres',
-	"SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding', false, true);"
-);
-$old_publisher->safe_psql('postgres',
-	"SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);"
-);
-$old_publisher->stop();
-
-# 2. max_replication_slots is set to smaller than the number of slots (2)
-#	 present on the old cluster
-$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
-
-# pg_upgrade will fail because the new cluster has insufficient max_replication_slots
-command_checks_all(
-	[
-		'pg_upgrade', '--no-sync',
-		'-d', $old_publisher->data_dir,
-		'-D', $new_publisher->data_dir,
-		'-b', $bindir,
-		'-B', $bindir,
-		'-s', $new_publisher->host,
-		'-p', $old_publisher->port,
-		'-P', $new_publisher->port,
-		$mode,
-	],
-	1,
-	[
-		qr/max_replication_slots \(1\) must be greater than or equal to the number of logical replication slots \(2\) on the old cluster/
-	],
-	[qr//],
-	'run of pg_upgrade where the new cluster has insufficient max_replication_slots'
-);
-ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
-	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
-
-# Clean up
-rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
-# Set max_replication_slots to the same value as the number of slots. Both of
-# slots will be used for subsequent tests.
-$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
-
-
-# ------------------------------
-# TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records
-
-# Preparations for the subsequent test:
-# 1. Generate extra WAL records. Because these WAL records do not get consumed
-#	 it will cause the upcoming pg_upgrade test to fail.
-$old_publisher->start;
-$old_publisher->safe_psql('postgres',
-	"CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;");
-
-# 2. Advance the slot test_slot2 up to the current WAL location
-$old_publisher->safe_psql('postgres',
-	"SELECT pg_replication_slot_advance('test_slot2', NULL);");
-
-# 3. Emit a non-transactional message. test_slot2 detects the message so that
-#	 this slot will be also reported by upcoming pg_upgrade.
-$old_publisher->safe_psql('postgres',
-	"SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message');"
-);
-
-$old_publisher->stop;
-
-# pg_upgrade will fail because the slot still has unconsumed WAL records
-command_checks_all(
-	[
-		'pg_upgrade', '--no-sync',
-		'-d', $old_publisher->data_dir,
-		'-D', $new_publisher->data_dir,
-		'-b', $bindir,
-		'-B', $bindir,
-		'-s', $new_publisher->host,
-		'-p', $old_publisher->port,
-		'-P', $new_publisher->port,
-		$mode,
-	],
-	1,
-	[qr/Your installation contains logical replication slots that can't be upgraded./],
-	[qr//],
-	'run of pg_upgrade of old cluster with slot having unconsumed WAL records'
-);
-ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
-	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
-
-# Verify the reason why the logical replication slot cannot be upgraded
-my $slots_filename;
-
-# Find a txt file that contains a list of logical replication slots that cannot
-# be upgraded. We cannot predict the file's path because the output directory
-# contains a milliseconds timestamp. File::Find::find must be used.
-find(
-	sub {
-		if ($File::Find::name =~ m/invalid_logical_relication_slots\.txt/)
-		{
-			$slots_filename = $File::Find::name;
-		}
-	},
-	$new_publisher->data_dir . "/pg_upgrade_output.d");
-
-# And check the content. Both of slots must be reported that they have
-# unconsumed WALs after confirmed_flush_lsn.
-like(
-	slurp_file($slots_filename),
-	qr/The slot \"test_slot1\" has not consumed the WAL yet/m,
-	'the previous test failed due to unconsumed WALs');
-like(
-	slurp_file($slots_filename),
-	qr/The slot \"test_slot2\" has not consumed the WAL yet/m,
-	'the previous test failed due to unconsumed WALs');
-
-# Clean up
-rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
-
-
-# ------------------------------
-# TEST: Successful upgrade
-
-# Preparations for the subsequent test:
-# 1. Setup logical replication
-my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
-
-$old_publisher->start;
-
-$old_publisher->safe_psql('postgres',
-	"SELECT * FROM pg_drop_replication_slot('test_slot1');");
-$old_publisher->safe_psql('postgres',
-	"SELECT * FROM pg_drop_replication_slot('test_slot2');");
-
-$old_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION regress_pub FOR ALL TABLES;");
-$subscriber->start;
-$subscriber->safe_psql(
-	'postgres', qq[
-	CREATE TABLE tbl (a int);
-	CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true')
-]);
-$subscriber->wait_for_subscription_sync($old_publisher, 'regress_sub');
-
-# 2. Temporarily disable the subscription
-$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub DISABLE");
-$old_publisher->stop;
-
-# Actual run, successful upgrade is expected
-command_ok(
-	[
-		'pg_upgrade', '--no-sync',
-		'-d', $old_publisher->data_dir,
-		'-D', $new_publisher->data_dir,
-		'-b', $bindir,
-		'-B', $bindir,
-		'-s', $new_publisher->host,
-		'-p', $old_publisher->port,
-		'-P', $new_publisher->port,
-		$mode,
-	],
-	'run of pg_upgrade of old cluster');
-ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
-	"pg_upgrade_output.d/ removed after pg_upgrade success");
-
-# Check that the slot 'regress_sub' has migrated to the new cluster
-$new_publisher->start;
-my $result = $new_publisher->safe_psql('postgres',
-	"SELECT slot_name, two_phase FROM pg_replication_slots");
-is($result, qq(regress_sub|t), 'check the slot exists on new cluster');
-
-# Update the connection
-my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
-$subscriber->safe_psql(
-	'postgres', qq[
-	ALTER SUBSCRIPTION regress_sub CONNECTION '$new_connstr';
-	ALTER SUBSCRIPTION regress_sub ENABLE;
-]);
-
-# Check whether changes on the new publisher get replicated to the subscriber
-$new_publisher->safe_psql('postgres',
-	"INSERT INTO tbl VALUES (generate_series(11, 20))");
-$new_publisher->wait_for_catchup('regress_sub');
-$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
-is($result, qq(20), 'check changes are replicated to the subscriber');
-
-# Clean up
-$subscriber->stop();
-$new_publisher->stop();
+# Switch workloads depending on the major version of the old cluster.
+# Upgrading logical replication slots has been supported since PG17.
+my $workload =
+  ($old_publisher->pg_version <= 16)
+  ? \&test_for_16_and_prior
+  : \&test_for_17_and_later;
+
+# Run the chosen workload with the old node, the new node and the transfer
+# mode option for pg_upgrade.
+$workload->($old_publisher, $new_publisher, $mode);
 
 done_testing();
-- 
2.27.0

