From 60cd57abf3a161ccbe5c0e6cdd506fd0db965f04 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 27 Jan 2023 03:17:30 +0000
Subject: [PATCH v41 1/2] postgres_fdw: Allow postgres_fdw_get_connections() to
 check connection

If requested, postgres_fdw_get_connections() can verify the status of
connections that are establieshed by postgres_fdw. This check wil be
done by polling the socket. This feature is currently available only
on systems that support the non-standard POLLRDHUP extension to the
poll system call, including Linux.

"closed" column is set to false if existing connection is not closed
by the remote peer.
---
 contrib/postgres_fdw/Makefile                 |   2 +-
 contrib/postgres_fdw/connection.c             | 151 +++++++++++++++++-
 .../postgres_fdw/expected/postgres_fdw.out    |  65 +++++++-
 contrib/postgres_fdw/meson.build              |   1 +
 .../postgres_fdw/postgres_fdw--1.1--1.2.sql   |  22 +++
 contrib/postgres_fdw/postgres_fdw.control     |   2 +-
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  57 +++++++
 doc/src/sgml/postgres-fdw.sgml                |  71 ++++++--
 8 files changed, 344 insertions(+), 27 deletions(-)
 create mode 100644 contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index c1b0cad453..6d23768389 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
 SHLIB_LINK_INTERNAL = $(libpq)
 
 EXTENSION = postgres_fdw
-DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql
+DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql
 
 REGRESS = postgres_fdw
 
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 33e8054f64..edc2f5eb46 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -12,6 +12,10 @@
  */
 #include "postgres.h"
 
+#if HAVE_POLL_H
+#include <poll.h>
+#endif
+
 #include "access/htup_details.h"
 #include "access/xact.h"
 #include "catalog/pg_user_mapping.h"
@@ -107,12 +111,20 @@ static uint32 pgfdw_we_get_result = 0;
 					 (entry)->xact_depth, (entry)->xact_depth); \
 	} while(0)
 
+enum pgfdwVersion
+{
+	PGFDW_V1_0 = 0,
+	PGFDW_V1_2,
+}			pgfdwVersion;
+
 /*
  * SQL functions
  */
 PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
+PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
 PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
 PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
+PG_FUNCTION_INFO_V1(postgres_fdw_can_verify_connection);
 
 /* prototypes of private functions */
 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
@@ -159,6 +171,12 @@ static void pgfdw_security_check(const char **keywords, const char **values,
 								 UserMapping *user, PGconn *conn);
 static bool UserMappingPasswordRequired(UserMapping *user);
 static bool disconnect_cached_connections(Oid serverid);
+static Datum postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
+												   enum pgfdwVersion api_version);
+
+/* Low layer-like functions. They are used for verifying connections. */
+static int	pgfdw_conn_check(PGconn *conn);
+static bool pgfdw_conn_checkable(void);
 
 /*
  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
@@ -1978,22 +1996,31 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
 }
 
 /*
- * List active foreign server connections.
+ * Internal function for postgres_fdw_get_connections variants.
  *
- * This function takes no input parameter and returns setof record made of
- * following values:
+ * If the api_version is 1.0, this function takes no input parameter and
+ * returns setof record made of following values:
  * - server_name - server name of active connection. In case the foreign server
  *   is dropped but still the connection is active, then the server name will
  *   be NULL in output.
  * - valid - true/false representing whether the connection is valid or not.
  * 	 Note that the connections can get invalidated in pgfdw_inval_callback.
  *
+ * If the version is 1.2 and later, this function takes an input parameter,
+ * which indicates the need for a health check. Regarding the returned record,
+ * this returns two additional values:
+ * - used_in_xact - indicates whether the server has been used in a transaction
+ *   or not.
+ * - closed - true if the local session seems to be disconnected from other
+ *   servers.
+ *
  * No records are returned when there are no cached connections at all.
  */
-Datum
-postgres_fdw_get_connections(PG_FUNCTION_ARGS)
+static Datum
+postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
+									  enum pgfdwVersion api_version)
 {
-#define POSTGRES_FDW_GET_CONNECTIONS_COLS	2
+#define POSTGRES_FDW_GET_CONNECTIONS_COLS	4
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	HASH_SEQ_STATUS scan;
 	ConnCacheEntry *entry;
@@ -2061,12 +2088,55 @@ postgres_fdw_get_connections(PG_FUNCTION_ARGS)
 
 		values[1] = BoolGetDatum(!entry->invalidated);
 
+		/* Add additional attributes if the version is 1.2 or later */
+		if (api_version == PGFDW_V1_2)
+		{
+			bool		require_verify = PG_GETARG_BOOL(1);
+
+			/* Has this server been used in the transaction? */
+			values[2] = BoolGetDatum(entry->xact_depth > 0);
+
+			/*
+			 * If requested and the connection is not invalidated, check the
+			 * status of the remote connection from the backend process and
+			 * return the result. Otherwise returns NULL.
+			 */
+			if (require_verify && !entry->invalidated && entry->conn)
+			{
+				values[3] = BoolGetDatum(pgfdw_conn_checkable() ?
+										 pgfdw_conn_check(entry->conn) != 0 :
+										 false);
+			}
+			else
+				nulls[3] = true;
+		}
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
 	}
 
 	PG_RETURN_VOID();
 }
 
+/*
+ * List active foreign server connections.
+ *
+ * The SQL API of this function has changed in version, which could verify the
+ * status of remote connections. The actual implementation was moved to the
+ * internal function, and we could switch by the api_version to support the old
+ * SQL declaration.
+ */
+Datum
+postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
+{
+	return postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2);
+}
+
+Datum
+postgres_fdw_get_connections(PG_FUNCTION_ARGS)
+{
+	return postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_0);
+}
+
 /*
  * Disconnect the specified cached connections.
  *
@@ -2192,3 +2262,72 @@ disconnect_cached_connections(Oid serverid)
 
 	return result;
 }
+
+/*
+ * Check whether functions for verifying cached connections work well or not
+ */
+Datum
+postgres_fdw_can_verify_connection(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_BOOL(pgfdw_conn_checkable());
+}
+
+/*
+ * Check whether the socket peer closed the connection or not.
+ *
+ * Returns >0 if input connection is bad or remote peer seems to be closed,
+ * 0 if it is valid, and -1 if an error occurred.
+ */
+static int
+pgfdw_conn_check(PGconn *conn)
+{
+	int			sock = PQsocket(conn);
+
+	if (!pgfdw_conn_checkable())
+		return 0;
+
+	if (!conn || PQstatus(conn) != CONNECTION_OK || sock == PGINVALID_SOCKET)
+		return -1;
+
+#if (defined(HAVE_POLL) && defined(POLLRDHUP))
+	{
+		/*
+		 * This platform seems to have poll(2), and can wait POLLRDHUP event.
+		 * So construct pollfd and directly call it.
+		 */
+		struct pollfd input_fd;
+		int			result;
+
+		input_fd.fd = sock;
+		input_fd.events = POLLRDHUP;
+		input_fd.revents = 0;
+
+		do
+			result = poll(&input_fd, 1, 0);
+		while (result < 0 && errno == EINTR);
+
+		if (result < 0)
+			return -1;
+
+		return input_fd.revents;
+	}
+#else
+	/* Do not support socket checking on this platform, return 0 */
+	return 0;
+#endif
+}
+
+/*
+ * Check whether pgfdw_conn_check() can work on this platform.
+ *
+ * Returns true if this can use pgfdw_conn_check(), otherwise false.
+ */
+static bool
+pgfdw_conn_checkable(void)
+{
+#if (defined(HAVE_POLL) && defined(POLLRDHUP))
+	return true;
+#else
+	return false;
+#endif
+}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 0cc77190dc..4fcf73070c 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -10435,10 +10435,10 @@ drop cascades to foreign table ft7
 -- should be output as invalid connections. Also the server name for
 -- loopback3 should be NULL because the server was dropped.
 SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
- server_name | valid 
--------------+-------
- loopback    | f
-             | f
+ server_name | valid | used_in_xact | closed 
+-------------+-------+--------------+--------
+ loopback    | f     | t            | 
+             | f     | t            | 
 (2 rows)
 
 -- The invalid connections get closed in pgfdw_xact_callback during commit.
@@ -12257,3 +12257,60 @@ ANALYZE analyze_table;
 -- cleanup
 DROP FOREIGN TABLE analyze_ftable;
 DROP TABLE analyze_table;
+-- ===================================================================
+-- test for postgres_fdw_get_connections function with require_verify = true
+-- ===================================================================
+-- Disable debug_discard_caches in order to manage remote connections
+SET debug_discard_caches TO '0';
+-- -- The text of the error might vary across platforms, so only show SQLSTATE.
+\set VERBOSITY sqlstate
+-- Disconnect once and set application_name to an arbitrary value
+SELECT 1 FROM postgres_fdw_disconnect_all();
+ ?column? 
+----------
+        1
+(1 row)
+
+ALTER SERVER loopback OPTIONS (SET application_name 'healthcheck');
+-- Define procedure for testing verify functions
+CREATE PROCEDURE test_verify_function() AS $$
+DECLARE
+	can_verify boolean;
+	result boolean;
+BEGIN
+	PERFORM 1 FROM ft1 LIMIT 1;
+
+	-- Terminate the remote backend process
+	PERFORM pg_terminate_backend(pid, 180000) FROM pg_stat_activity
+		WHERE application_name = 'healthcheck';
+
+	-- Check whether we can do health check on this platform
+	SELECT INTO can_verify postgres_fdw_can_verify_connection();
+
+	-- If the checking can be done on this platform, call it
+	IF can_verify IS TRUE THEN
+		-- Set client_min_messages to ERROR temporary because the following
+		-- function only throws a WARNING on the supported platform.
+		SET LOCAL client_min_messages TO ERROR;
+
+		SELECT closed INTO result FROM postgres_fdw_get_connections(true);
+
+		RESET client_min_messages;
+	ELSE
+		result = true;
+	END IF;
+
+	-- If result is TRUE, we succeeded to detect the disconnection or it could
+	-- not be done on this platform. Raise an message.
+	IF result IS TRUE THEN
+		RAISE INFO 'postgres_fdw_get_connections could detect the disconnection, or health check cannot be used on this platform';
+	END IF;
+END;
+$$ LANGUAGE plpgsql;
+-- ..And call above function
+CALL test_verify_function();
+INFO:  00000
+ERROR:  08006
+-- Clean up
+\set VERBOSITY default
+RESET debug_discard_caches;
diff --git a/contrib/postgres_fdw/meson.build b/contrib/postgres_fdw/meson.build
index 2b86d8a6ee..5f60d8ef97 100644
--- a/contrib/postgres_fdw/meson.build
+++ b/contrib/postgres_fdw/meson.build
@@ -26,6 +26,7 @@ install_data(
   'postgres_fdw.control',
   'postgres_fdw--1.0.sql',
   'postgres_fdw--1.0--1.1.sql',
+  'postgres_fdw--1.1--1.2.sql',
   kwargs: contrib_data_args,
 )
 
diff --git a/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
new file mode 100644
index 0000000000..6af10110ca
--- /dev/null
+++ b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
@@ -0,0 +1,22 @@
+/* contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql */
+
+-- complain if script is sourced in psql, rather than via ALTER EXTENSION
+\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.2'" to load this file. \quit
+
+/* First we have to remove it from the extension */
+ALTER EXTENSION postgres_fdw DROP FUNCTION postgres_fdw_get_connections ();
+
+/* Then we can drop it */
+DROP FUNCTION postgres_fdw_get_connections ();
+
+CREATE FUNCTION postgres_fdw_get_connections
+    (IN require_verify boolean DEFAULT false, OUT server_name text,
+     OUT valid boolean, OUT used_in_xact boolean, OUT closed boolean)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'postgres_fdw_get_connections_1_2'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION postgres_fdw_can_verify_connection ()
+RETURNS bool
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT PARALLEL SAFE;
diff --git a/contrib/postgres_fdw/postgres_fdw.control b/contrib/postgres_fdw/postgres_fdw.control
index d489382064..a4b800be4f 100644
--- a/contrib/postgres_fdw/postgres_fdw.control
+++ b/contrib/postgres_fdw/postgres_fdw.control
@@ -1,5 +1,5 @@
 # postgres_fdw extension
 comment = 'foreign-data wrapper for remote PostgreSQL servers'
-default_version = '1.1'
+default_version = '1.2'
 module_pathname = '$libdir/postgres_fdw'
 relocatable = true
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index b57f8cfda6..fa9c4dd36f 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -4218,3 +4218,60 @@ ANALYZE analyze_table;
 -- cleanup
 DROP FOREIGN TABLE analyze_ftable;
 DROP TABLE analyze_table;
+
+-- ===================================================================
+-- test for postgres_fdw_get_connections function with require_verify = true
+-- ===================================================================
+
+-- Disable debug_discard_caches in order to manage remote connections
+SET debug_discard_caches TO '0';
+
+-- -- The text of the error might vary across platforms, so only show SQLSTATE.
+\set VERBOSITY sqlstate
+
+-- Disconnect once and set application_name to an arbitrary value
+SELECT 1 FROM postgres_fdw_disconnect_all();
+ALTER SERVER loopback OPTIONS (SET application_name 'healthcheck');
+
+-- Define procedure for testing verify functions
+CREATE PROCEDURE test_verify_function() AS $$
+DECLARE
+	can_verify boolean;
+	result boolean;
+BEGIN
+	PERFORM 1 FROM ft1 LIMIT 1;
+
+	-- Terminate the remote backend process
+	PERFORM pg_terminate_backend(pid, 180000) FROM pg_stat_activity
+		WHERE application_name = 'healthcheck';
+
+	-- Check whether we can do health check on this platform
+	SELECT INTO can_verify postgres_fdw_can_verify_connection();
+
+	-- If the checking can be done on this platform, call it
+	IF can_verify IS TRUE THEN
+		-- Set client_min_messages to ERROR temporary because the following
+		-- function only throws a WARNING on the supported platform.
+		SET LOCAL client_min_messages TO ERROR;
+
+		SELECT closed INTO result FROM postgres_fdw_get_connections(true);
+
+		RESET client_min_messages;
+	ELSE
+		result = true;
+	END IF;
+
+	-- If result is TRUE, we succeeded to detect the disconnection or it could
+	-- not be done on this platform. Raise an message.
+	IF result IS TRUE THEN
+		RAISE INFO 'postgres_fdw_get_connections could detect the disconnection, or health check cannot be used on this platform';
+	END IF;
+END;
+$$ LANGUAGE plpgsql;
+
+-- ..And call above function
+CALL test_verify_function();
+
+-- Clean up
+\set VERBOSITY default
+RESET debug_discard_caches;
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index 1a600382e2..fa2b50450d 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -777,27 +777,48 @@ OPTIONS (ADD password_required 'false');
 
   <variablelist>
    <varlistentry>
-    <term><function>postgres_fdw_get_connections(OUT server_name text, OUT valid boolean) returns setof record</function></term>
+    <term><function>postgres_fdw_get_connections(IN require_verify boolean DEFAULT false,
+     OUT server_name text, OUT valid boolean, OUT used_in_xact boolean, OUT closed boolean)
+     returns setof record</function></term>
     <listitem>
      <para>
       This function returns the foreign server names of all the open
       connections that <filename>postgres_fdw</filename> established from
-      the local session to the foreign servers. It also returns whether
-      each connection is valid or not. <literal>false</literal> is returned
-      if the foreign server connection is used in the current local
-      transaction but its foreign server or user mapping is changed or
-      dropped (Note that server name of an invalid connection will be
-      <literal>NULL</literal> if the server is dropped),
-      and then such invalid connection will be closed at
-      the end of that transaction. <literal>true</literal> is returned
-      otherwise. If there are no open connections, no record is returned.
+      the local session to the foreign servers. It also returns whether each
+      connection is valid and has been used within a transaction.
+      <literal>valid</literal> is returned as <literal>false</literal> if the
+      foreign server connection is used in the current local transaction but
+      its foreign server or user mapping is changed or dropped (Note that
+      server name of an invalid connection will be <literal>NULL</literal> if
+      the server is dropped), and then such invalid connection will be closed
+      at the end of that transaction. The attribute is set to <literal>true</literal>
+      otherwise.
+     </para>
+     <para>
+      If <literal>require_verify</literal> is set to <literal>true</literal>,
+      the function checks the status of remote connections from the local
+      session to the foreign server. This check is performed by polling the
+      socket, which allows long-running transactions to be aborted sooner if
+      the kernel reports that the connection is closed. This feature is
+      currently available only on systems that support the non-standard
+      <symbol>POLLRDHUP</symbol> extension to the <symbol>poll</symbol> system
+      call, including Linux. <literal>closed</literal> attribute indicates the
+      result of this check. <literal>false</literal> means the existing
+      connection is not closed by the remote peer. <literal>true</literal>
+      means if either checked connection has been closed.
+      <literal>NULL</literal> means a valid connection to the specified foreign
+      server is not established, this function is not available on this
+      platform, or <literal>require_verify</literal> is <literal>false</literal>.
+     </para>
+     <para>
+      If there are no open connections, no record is returned.
       Example usage of the function:
 <screen>
-postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
- server_name | valid
--------------+-------
- loopback1   | t
- loopback2   | f
+postgres=*# SELECT * FROM postgres_fdw_get_connections(true) ORDER BY 1;
+ server_name | valid | used_in_xact | closed 
+-------------+-------+--------------+--------
+ loopback1   | t     | t            | f
+ loopback2   | t     | t            | f
 </screen>
      </para>
     </listitem>
@@ -847,6 +868,26 @@ postgres=# SELECT postgres_fdw_disconnect_all();
      </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term><function>postgres_fdw_can_verify_connection() returns boolean</function></term>
+    <listitem>
+     <para>
+      This function checks whether <function>postgres_fdw_get_connections</function>
+      can verify the status of remote connections or not. This returns
+      <literal>true</literal> if it can be used, otherwise returns <literal>false</literal>.
+      Example usage of the function:
+
+<screen>
+postgres=# SELECT postgres_fdw_can_verify_connection();
+ postgres_fdw_can_verify_connection
+------------------------------------
+ t
+</screen>
+     </para>
+    </listitem>
+   </varlistentry>
+
    </variablelist>
 
 </sect2>
-- 
2.43.0

