From ad669510cc546f30b03d15c088374fbae151e2fd Mon Sep 17 00:00:00 2001
From: Shi Yu <shiy.fnst@fujitsu.com>
Date: Thu, 22 Dec 2022 13:15:27 +0800
Subject: [PATCH v5 2/2] Convert streaming TAP tests to use
 logical_decoding_mode

With the new GUC logical_decoding_mode set to "immediate", each change is
streamed as soon as it is decoded, so the streaming TAP tests no longer need
to insert large amounts of data to exceed logical_decoding_work_mem. This
reduces the time needed to run the tests.
---
 src/test/subscription/t/016_stream_subxact.pl | 18 +++----
 .../t/018_stream_subxact_abort.pl             | 50 +++++++++----------
 .../t/019_stream_subxact_ddl_abort.pl         | 18 +++----
 .../subscription/t/022_twophase_cascade.pl    | 22 +++++---
 .../subscription/t/023_twophase_stream.pl     | 28 +++++------
 5 files changed, 71 insertions(+), 65 deletions(-)

diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl
index bc0a9cd053..db29f089a0 100644
--- a/src/test/subscription/t/016_stream_subxact.pl
+++ b/src/test/subscription/t/016_stream_subxact.pl
@@ -1,7 +1,7 @@
 
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test streaming of large transaction containing large subtransactions
+# Test streaming of transaction containing subtransactions
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
@@ -12,7 +12,7 @@ use Test::More;
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf('postgresql.conf',
-	'logical_decoding_work_mem = 64kB');
+	'logical_decoding_mode = immediate');
 $node_publisher->start;
 
 # Create subscriber node
@@ -49,27 +49,27 @@ my $result =
 	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
 is($result, qq(2|2|2), 'check initial data was copied to subscriber');
 
-# Insert, update and delete enough rows to exceed 64kB limit.
+# Insert, update and delete some rows.
 $node_publisher->safe_psql(
 	'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(    3,  500) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,  1000) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,  1500) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,  2000) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 SAVEPOINT s4;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 COMMIT;
@@ -80,7 +80,7 @@ $node_publisher->wait_for_catchup($appname);
 $result =
   $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(1667|1667|1667),
+is($result, qq(12|12|12),
 	'check data was copied to subscriber in streaming mode and extra columns contain local defaults'
 );
 
diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl
index 551f16df6d..1458c3a0fc 100644
--- a/src/test/subscription/t/018_stream_subxact_abort.pl
+++ b/src/test/subscription/t/018_stream_subxact_abort.pl
@@ -1,7 +1,7 @@
 
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test streaming of large transaction containing multiple subtransactions and rollbacks
+# Test streaming of transaction containing multiple subtransactions and rollbacks
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
@@ -12,7 +12,7 @@ use Test::More;
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf('postgresql.conf',
-	'logical_decoding_work_mem = 64kB');
+	'logical_decoding_mode = immediate');
 $node_publisher->start;
 
 # Create subscriber node
@@ -48,25 +48,25 @@ my $result =
 	"SELECT count(*), count(c) FROM test_tab");
 is($result, qq(2|0), 'check initial data was copied to subscriber');
 
-# large (streamed) transaction with DDL, DML and ROLLBACKs
+# streamed transaction with DDL, DML and ROLLBACKs
 $node_publisher->safe_psql(
 	'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+INSERT INTO test_tab VALUES (3, md5(3::text));
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i);
+INSERT INTO test_tab VALUES (4, md5(4::text));
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i);
+INSERT INTO test_tab VALUES (5, md5(5::text));
 SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i);
+INSERT INTO test_tab VALUES (6, md5(6::text));
 ROLLBACK TO s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i);
+INSERT INTO test_tab VALUES (7, md5(7::text));
 ROLLBACK TO s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i);
+INSERT INTO test_tab VALUES (8, md5(8::text));
 SAVEPOINT s4;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3001,3500) s(i);
+INSERT INTO test_tab VALUES (9, md5(9::text));
 SAVEPOINT s5;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3501,4000) s(i);
+INSERT INTO test_tab VALUES (10, md5(10::text));
 COMMIT;
 });
 
@@ -75,24 +75,24 @@ $node_publisher->wait_for_catchup($appname);
 $result =
   $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c) FROM test_tab");
-is($result, qq(2000|0),
+is($result, qq(6|0),
 	'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
 );
 
-# large (streamed) transaction with subscriber receiving out of order
-# subtransaction ROLLBACKs
+# streamed transaction with subscriber receiving out of order subtransaction
+# ROLLBACKs
 $node_publisher->safe_psql(
 	'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4001,4500) s(i);
+INSERT INTO test_tab VALUES (11, md5(11::text));
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001,5500) s(i);
+INSERT INTO test_tab VALUES (12, md5(12::text));
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6001,6500) s(i);
+INSERT INTO test_tab VALUES (13, md5(13::text));
 SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(7001,7500) s(i);
+INSERT INTO test_tab VALUES (14, md5(14::text));
 RELEASE s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8001,8500) s(i);
+INSERT INTO test_tab VALUES (15, md5(15::text));
 ROLLBACK TO s1;
 COMMIT;
 });
@@ -102,18 +102,18 @@ $node_publisher->wait_for_catchup($appname);
 $result =
   $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c) FROM test_tab");
-is($result, qq(2500|0),
+is($result, qq(7|0),
 	'check rollback to savepoint was reflected on subscriber');
 
-# large (streamed) transaction with subscriber receiving rollback
+# streamed transaction with subscriber receiving rollback
 $node_publisher->safe_psql(
 	'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8501,9000) s(i);
+INSERT INTO test_tab VALUES (16, md5(16::text));
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9001,9500) s(i);
+INSERT INTO test_tab VALUES (17, md5(17::text));
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9501,10000) s(i);
+INSERT INTO test_tab VALUES (18, md5(18::text));
 ROLLBACK;
 });
 
@@ -122,7 +122,7 @@ $node_publisher->wait_for_catchup($appname);
 $result =
   $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c) FROM test_tab");
-is($result, qq(2500|0), 'check rollback was reflected on subscriber');
+is($result, qq(7|0), 'check rollback was reflected on subscriber');
 
 $node_subscriber->stop;
 $node_publisher->stop;
diff --git a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
index 4d7da82b7a..c6719c1af8 100644
--- a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
+++ b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
@@ -1,7 +1,7 @@
 
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test streaming of large transaction with subtransactions, DDLs, DMLs, and
+# Test streaming of transaction with subtransactions, DDLs, DMLs, and
 # rollbacks
 use strict;
 use warnings;
@@ -13,7 +13,7 @@ use Test::More;
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf('postgresql.conf',
-	'logical_decoding_work_mem = 64kB');
+	'logical_decoding_mode = immediate');
 $node_publisher->start;
 
 # Create subscriber node
@@ -49,23 +49,23 @@ my $result =
 	"SELECT count(*), count(c) FROM test_tab");
 is($result, qq(2|0), 'check initial data was copied to subscriber');
 
-# large (streamed) transaction with DDL, DML and ROLLBACKs
+# streamed transaction with DDL, DML and ROLLBACKs
 $node_publisher->safe_psql(
 	'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+INSERT INTO test_tab VALUES (3, md5(3::text));
 ALTER TABLE test_tab ADD COLUMN c INT;
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i);
+INSERT INTO test_tab VALUES (4, md5(4::text), -4);
 ALTER TABLE test_tab ADD COLUMN d INT;
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i);
+INSERT INTO test_tab VALUES (5, md5(5::text), -5, 5*2);
 ALTER TABLE test_tab ADD COLUMN e INT;
 SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i);
+INSERT INTO test_tab VALUES (6, md5(6::text), -6, 6*2, -6*3);
 ALTER TABLE test_tab DROP COLUMN c;
 ROLLBACK TO s1;
-INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i);
+INSERT INTO test_tab VALUES (4, md5(4::text), 4);
 COMMIT;
 });
 
@@ -74,7 +74,7 @@ $node_publisher->wait_for_catchup($appname);
 $result =
   $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c) FROM test_tab");
-is($result, qq(1000|500),
+is($result, qq(4|1),
 	'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
 );
 
diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl
index 7a797f37ba..aeefb64447 100644
--- a/src/test/subscription/t/022_twophase_cascade.pl
+++ b/src/test/subscription/t/022_twophase_cascade.pl
@@ -23,7 +23,6 @@ $node_A->init(allows_streaming => 'logical');
 $node_A->append_conf(
 	'postgresql.conf', qq(
 max_prepared_transactions = 10
-logical_decoding_work_mem = 64kB
 ));
 $node_A->start;
 # node_B
@@ -32,7 +31,6 @@ $node_B->init(allows_streaming => 'logical');
 $node_B->append_conf(
 	'postgresql.conf', qq(
 max_prepared_transactions = 10
-logical_decoding_work_mem = 64kB
 ));
 $node_B->start;
 # node_C
@@ -41,7 +39,6 @@ $node_C->init(allows_streaming => 'logical');
 $node_C->append_conf(
 	'postgresql.conf', qq(
 max_prepared_transactions = 10
-logical_decoding_work_mem = 64kB
 ));
 $node_C->start;
 
@@ -260,6 +257,15 @@ is($result, qq(21), 'Rows committed are present on subscriber C');
 # 2PC + STREAMING TESTS
 # ---------------------
 
+# Set logical_decoding_mode to immediate, so each change will be streamed.
+$node_A->safe_psql('postgres',
+	'ALTER SYSTEM SET logical_decoding_mode = immediate');
+$node_A->reload;
+
+$node_B->safe_psql('postgres',
+	'ALTER SYSTEM SET logical_decoding_mode = immediate');
+$node_B->reload;
+
 my $oldpid_B = $node_A->safe_psql(
 	'postgres', "
 	SELECT pid FROM pg_stat_replication
@@ -301,12 +307,12 @@ $node_B->poll_query_until(
 # Expect all data is replicated on subscriber(s) after the commit.
 ###############################
 
-# Insert, update and delete enough rows to exceed the 64kB limit.
+# Insert, update and delete some rows.
 # Then 2PC PREPARE
 $node_A->safe_psql(
 	'postgres', q{
 	BEGIN;
-	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
 	PREPARE TRANSACTION 'test_prepared_tab';});
@@ -331,12 +337,12 @@ $node_B->wait_for_catchup($appname_C);
 # check that transaction was committed on subscriber(s)
 $result = $node_B->safe_psql('postgres',
 	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(3334|3334|3334),
+is($result, qq(4|4|4),
 	'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults'
 );
 $result = $node_C->safe_psql('postgres',
 	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(3334|3334|3334),
+is($result, qq(4|4|4),
 	'Rows inserted by 2PC have committed on subscriber C, and extra columns have local defaults'
 );
 
@@ -369,7 +375,7 @@ $node_A->safe_psql(
 	BEGIN;
 	INSERT INTO test_tab VALUES (9999, 'foobar');
 	SAVEPOINT sp_inner;
-	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
 	ROLLBACK TO SAVEPOINT sp_inner;
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
index 9b454106bd..a191129b9d 100644
--- a/src/test/subscription/t/023_twophase_stream.pl
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -18,7 +18,7 @@ $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
 max_prepared_transactions = 10
-logical_decoding_work_mem = 64kB
+logical_decoding_mode = immediate
 ));
 $node_publisher->start;
 
@@ -80,11 +80,11 @@ is($result, qq(2|2|2), 'check initial data was copied to subscriber');
 ###############################
 
 # check that 2PC gets replicated to subscriber
-# Insert, update and delete enough rows to exceed the 64kB limit.
+# Insert, update and delete some rows.
 $node_publisher->safe_psql(
 	'postgres', q{
 	BEGIN;
-	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
 	PREPARE TRANSACTION 'test_prepared_tab';});
@@ -105,7 +105,7 @@ $node_publisher->wait_for_catchup($appname);
 # check that transaction is committed on subscriber
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(3334|3334|3334),
+is($result, qq(4|4|4),
 	'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
 );
 $result = $node_subscriber->safe_psql('postgres',
@@ -124,11 +124,11 @@ is($result, qq(0), 'transaction is committed on subscriber');
 # First, delete the data except for 2 rows (will be replicated)
 $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
 
-# Then insert, update and delete enough rows to exceed the 64kB limit.
+# Then insert, update and delete some rows.
 $node_publisher->safe_psql(
 	'postgres', q{
 	BEGIN;
-	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
 	PREPARE TRANSACTION 'test_prepared_tab';});
@@ -158,7 +158,7 @@ is($result, qq(0), 'transaction is aborted on subscriber');
 
 ###############################
 # Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
-# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 1. insert, update and delete some rows.
 # 2. Then server crashes before the 2PC transaction is committed.
 # 3. After servers are restarted the pending transaction is committed.
 #
@@ -169,7 +169,7 @@ is($result, qq(0), 'transaction is aborted on subscriber');
 $node_publisher->safe_psql(
 	'postgres', q{
 	BEGIN;
-	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
 	PREPARE TRANSACTION 'test_prepared_tab';});
@@ -188,7 +188,7 @@ $node_publisher->wait_for_catchup($appname);
 # check inserts are visible
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(3334|3334|3334),
+is($result, qq(4|4|4),
 	'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
 );
 
@@ -206,11 +206,11 @@ is($result, qq(3334|3334|3334),
 # First, delete the data except for 2 rows (will be replicated)
 $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
 
-# Then insert, update and delete enough rows to exceed the 64kB limit.
+# Then insert, update and delete some rows.
 $node_publisher->safe_psql(
 	'postgres', q{
 	BEGIN;
-	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
 	PREPARE TRANSACTION 'test_prepared_tab';});
@@ -257,11 +257,11 @@ is($result, qq(0), 'transaction is aborted on subscriber');
 # First, delete the data except for 2 rows (will be replicated)
 $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
 
-# Then insert, update and delete enough rows to exceed the 64kB limit.
+# Then insert, update and delete some rows.
 $node_publisher->safe_psql(
 	'postgres', q{
 	BEGIN;
-	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
 	PREPARE TRANSACTION 'test_prepared_tab';});
@@ -287,7 +287,7 @@ $node_publisher->wait_for_catchup($appname);
 # check that transaction is committed on subscriber
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(3335|3335|3335),
+is($result, qq(5|5|5),
 	'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults'
 );
 
-- 
2.31.1

