From be75ad4924834d4018c222125eeeae4a9d6d9a26 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Wed, 27 Dec 2023 10:22:39 +0530
Subject: [PATCH v2] Track conflict_cause in pg_replication_slots

This patch changes the existing 'conflicting' field to 'conflict_cause'
in pg_replication_slots. This new field indicates the cause of a logical
slot's conflict with recovery. It is always NULL for physical slots,
as well as for logical slots which are not invalidated. The non-NULL
values indicate that the slot is marked as invalidated. Possible values
are:
    wal_removed = required WAL has been removed.
    rows_removed = required rows have been removed.
    wal_level_insufficient = wal_level insufficient on the primary server.
---
 doc/src/sgml/system-views.sgml                | 29 ++++++--
 src/backend/catalog/system_views.sql          |  2 +-
 src/backend/replication/slotfuncs.c           | 22 ++++--
 src/bin/pg_upgrade/info.c                     |  4 +-
 src/include/catalog/pg_proc.dat               |  4 +-
 .../t/035_standby_logical_decoding.pl         | 72 ++++++++++---------
 src/test/regress/expected/rules.out           |  4 +-
 7 files changed, 87 insertions(+), 50 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 0ef1745631..ca0cc0d739 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2525,11 +2525,30 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>conflicting</structfield> <type>bool</type>
-      </para>
-      <para>
-       True if this logical slot conflicted with recovery (and so is now
-       invalidated). Always NULL for physical slots.
+       <structfield>conflict_cause</structfield> <type>text</type>
+      </para>
+      <para>
+       Cause of the logical slot's conflict with recovery. It is always NULL
+       for physical slots, as well as for logical slots which are not
+       invalidated. The non-NULL values indicate that the slot is marked
+       as invalidated. Possible values are:
+        <itemizedlist spacing="compact">
+         <listitem>
+          <para>
+           <literal>wal_removed</literal> = required WAL has been removed.
+          </para>
+         </listitem>
+         <listitem>
+          <para>
+           <literal>rows_removed</literal> = required rows have been removed.
+          </para>
+         </listitem>
+         <listitem>
+          <para>
+           <literal>wal_level_insufficient</literal> = wal_level insufficient on the primary server.
+          </para>
+         </listitem>
+        </itemizedlist>
       </para></entry>
      </row>
     </tbody>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 058fc47c91..9eb10efe09 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,7 +1023,7 @@ CREATE VIEW pg_replication_slots AS
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
-            L.conflicting
+            L.conflict_cause
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 4b694a03d0..8e3d27adc8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -406,10 +406,24 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			nulls[i++] = true;
 		else
 		{
-			if (slot_contents.data.invalidated != RS_INVAL_NONE)
-				values[i++] = BoolGetDatum(true);
-			else
-				values[i++] = BoolGetDatum(false);
+			switch (slot_contents.data.invalidated)
+			{
+				case RS_INVAL_NONE:
+					nulls[i++] = true;
+					break;
+
+				case RS_INVAL_WAL_REMOVED:
+					values[i++] = CStringGetTextDatum("wal_removed");
+					break;
+
+				case RS_INVAL_HORIZON:
+					values[i++] = CStringGetTextDatum("rows_removed");
+					break;
+
+				case RS_INVAL_WAL_LEVEL:
+					values[i++] = CStringGetTextDatum("wal_level_insufficient");
+					break;
+			}
 		}
 
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 4878aa22bf..0e0828c890 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -662,13 +662,13 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 	 * removed.
 	 */
 	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, "
-							"%s as caught_up, conflicting as invalid "
+							"%s as caught_up, conflict_cause IS NOT NULL as invalid "
 							"FROM pg_catalog.pg_replication_slots "
 							"WHERE slot_type = 'logical' AND "
 							"database = current_database() AND "
 							"temporary IS FALSE;",
 							live_check ? "FALSE" :
-							"(CASE WHEN conflicting THEN FALSE "
+							"(CASE WHEN conflict_cause IS NOT NULL THEN FALSE "
 							"ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
 							"END)");
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9052f5262a..e88ab3e0ff 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11115,9 +11115,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text}',
   proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_cause}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 5d7c278d01..3bedac32ac 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -168,27 +168,25 @@ sub change_hot_standby_feedback_and_wait_for_xmins
 	}
 }
 
-# Check conflicting status in pg_replication_slots.
-sub check_slots_conflicting_status
+# Check conflict_cause of the active and inactive slots for a slot prefix.
+sub check_slots_conflict_cause
 {
-	my ($conflicting) = @_;
+	my ($slot_prefix, $cause) = @_;
 
-	if ($conflicting)
-	{
-		$res = $node_standby->safe_psql(
-			'postgres', qq(
-				 select bool_and(conflicting) from pg_replication_slots;));
+	my $active_slot = $slot_prefix . 'activeslot';
+	my $inactive_slot = $slot_prefix . 'inactiveslot';
 
-		is($res, 't', "Logical slots are reported as conflicting");
-	}
-	else
-	{
-		$res = $node_standby->safe_psql(
-			'postgres', qq(
-				select bool_or(conflicting) from pg_replication_slots;));
+	$res = $node_standby->safe_psql(
+		'postgres', qq(
+			 select conflict_cause from pg_replication_slots where slot_name = '$active_slot';));
 
-		is($res, 'f', "Logical slots are reported as non conflicting");
-	}
+	is($res, $cause, "$active_slot conflict_cause is $cause");
+
+	$res = $node_standby->safe_psql(
+		'postgres', qq(
+			 select conflict_cause from pg_replication_slots where slot_name = '$inactive_slot';));
+
+	is($res, $cause, "$inactive_slot conflict_cause is $cause");
 }
 
 # Drop the slots, re-create them, change hot_standby_feedback,
@@ -260,13 +258,13 @@ $node_primary->safe_psql('testdb',
 	qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]
 );
 
-# Check conflicting is NULL for physical slot
+# Check conflict_cause is NULL for physical slot
 $res = $node_primary->safe_psql(
 	'postgres', qq[
-		 SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
+		 SELECT conflict_cause is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
 );
 
-is($res, 't', "Physical slot reports conflicting as NULL");
+is($res, 't', "Physical slot reports conflict_cause as NULL");
 
 my $backup_name = 'b1';
 $node_primary->backup($backup_name);
@@ -483,8 +481,8 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');
 
-# Verify slots are reported as conflicting in pg_replication_slots
-check_slots_conflicting_status(1);
+# Verify conflict_cause is 'rows_removed' in pg_replication_slots
+check_slots_conflict_cause('vacuum_full_', 'rows_removed');
 
 $handle =
   make_slot_active($node_standby, 'vacuum_full_', 0, \$stdout, \$stderr);
@@ -502,8 +500,8 @@ change_hot_standby_feedback_and_wait_for_xmins(1, 1);
 ##################################################
 $node_standby->restart;
 
-# Verify slots are reported as conflicting in pg_replication_slots
-check_slots_conflicting_status(1);
+# Verify conflict_cause is retained across a restart.
+check_slots_conflict_cause('vacuum_full_', 'rows_removed');
 
 ##################################################
 # Verify that invalidated logical slots do not lead to retaining WAL.
@@ -511,7 +509,7 @@ check_slots_conflicting_status(1);
 
 # Get the restart_lsn from an invalidated slot
 my $restart_lsn = $node_standby->safe_psql('postgres',
-	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'vacuum_full_activeslot' and conflicting is true;"
+	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'vacuum_full_activeslot' and conflict_cause is not null;"
 );
 
 chomp($restart_lsn);
@@ -565,8 +563,8 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');
 
-# Verify slots are reported as conflicting in pg_replication_slots
-check_slots_conflicting_status(1);
+# Verify conflict_cause is 'rows_removed' in pg_replication_slots
+check_slots_conflict_cause('row_removal_', 'rows_removed');
 
 $handle =
   make_slot_active($node_standby, 'row_removal_', 0, \$stdout, \$stderr);
@@ -604,8 +602,8 @@ $node_primary->wait_for_replay_catchup($node_standby);
 check_for_invalidation('shared_row_removal_', $logstart,
 	'with vacuum on pg_authid');
 
-# Verify slots are reported as conflicting in pg_replication_slots
-check_slots_conflicting_status(1);
+# Verify conflict_cause is 'rows_removed' in pg_replication_slots
+check_slots_conflict_cause('shared_row_removal_', 'rows_removed');
 
 $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
 	\$stderr);
@@ -657,7 +655,13 @@ ok( $node_standby->poll_query_until(
 ) or die "Timed out waiting confl_active_logicalslot to be updated";
 
 # Verify slots are reported as non conflicting in pg_replication_slots
-check_slots_conflicting_status(0);
+is( $node_standby->safe_psql(
+		'postgres',
+		q[select bool_or(conflicting) from
+		  (select conflict_cause is not NULL as conflicting
+		   from pg_replication_slots WHERE slot_type = 'logical')]),
+	'f',
+	'Logical slots are reported as non conflicting');
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 0);
@@ -693,8 +697,8 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('pruning_', $logstart, 'with on-access pruning');
 
-# Verify slots are reported as conflicting in pg_replication_slots
-check_slots_conflicting_status(1);
+# Verify conflict_cause is 'rows_removed' in pg_replication_slots
+check_slots_conflict_cause('pruning_', 'rows_removed');
 
 $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
 
@@ -737,8 +741,8 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('wal_level_', $logstart, 'due to wal_level');
 
-# Verify slots are reported as conflicting in pg_replication_slots
-check_slots_conflicting_status(1);
+# Verify conflict_cause is 'wal_level_insufficient' in pg_replication_slots
+check_slots_conflict_cause('wal_level_', 'wal_level_insufficient');
 
 $handle =
   make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index f645e8486b..692cdbd781 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1473,8 +1473,8 @@ pg_replication_slots| SELECT l.slot_name,
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
-    l.conflicting
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
+    l.conflict_cause
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_cause)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1

