From 98a67a65eee6b7ee1275e48e5053ba8ab3055014 Mon Sep 17 00:00:00 2001
From: Jelte Fennema <jelte.fennema@microsoft.com>
Date: Wed, 12 Jan 2022 09:52:05 +0100
Subject: [PATCH 2/2] Add non-blocking version of PQcancel

This patch does two things:
1. Change PQrequestCancel to use the regular connection establishment,
   to address a few security issues.
2. Add PQrequestCancelStart which is a thread safe and non blocking
   version of this new PQrequestCancel implementation.

The existing PQcancel API is using blocking IO. This makes PQcancel
impossible to use in an event loop based codebase, without blocking the
event loop until the call returns.

This patch adds a new cancellation API to libpq which is called
PQrequestCancelStart. This API can be used to send cancellations in a
non-blocking fashion.

This patch also includes a test for all of the libpq cancellation APIs.
The test can be easily run like this:

    cd src/test/modules/libpq_pipeline
    make && ./libpq_pipeline cancel
---
 contrib/dblink/dblink.c                       |  12 +-
 contrib/postgres_fdw/connection.c             |  11 +-
 doc/src/sgml/libpq.sgml                       | 212 +++++++++++++----
 src/fe_utils/connect_utils.c                  |  10 +-
 src/interfaces/libpq/exports.txt              |   2 +
 src/interfaces/libpq/fe-connect.c             | 225 +++++++++++++++---
 src/interfaces/libpq/fe-misc.c                |  15 +-
 src/interfaces/libpq/fe-secure-openssl.c      |   2 +-
 src/interfaces/libpq/fe-secure.c              |   6 +
 src/interfaces/libpq/libpq-fe.h               |  13 +-
 src/interfaces/libpq/libpq-int.h              |   2 +
 src/test/isolation/isolationtester.c          |  29 +--
 .../modules/libpq_pipeline/libpq_pipeline.c   | 214 ++++++++++++++++-
 13 files changed, 627 insertions(+), 126 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index a06d4bd12d..30cbb22a22 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -1380,22 +1380,14 @@ PG_FUNCTION_INFO_V1(dblink_cancel_query);
 Datum
 dblink_cancel_query(PG_FUNCTION_ARGS)
 {
-	int			res;
 	PGconn	   *conn;
-	PGcancel   *cancel;
-	char		errbuf[256];
 
 	dblink_init();
 	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
-	cancel = PQgetCancel(conn);
-
-	res = PQcancel(cancel, errbuf, 256);
-	PQfreeCancel(cancel);
-
-	if (res == 1)
+	if (PQrequestCancel(conn))
 		PG_RETURN_TEXT_P(cstring_to_text("OK"));
 	else
-		PG_RETURN_TEXT_P(cstring_to_text(errbuf));
+		PG_RETURN_TEXT_P(cstring_to_text(PQerrorMessage(conn)));
 }
 
 
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 129ca79221..2e182645f7 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -1263,8 +1263,6 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
 static bool
 pgfdw_cancel_query(PGconn *conn)
 {
-	PGcancel   *cancel;
-	char		errbuf[256];
 	PGresult   *result = NULL;
 	TimestampTz endtime;
 	bool		timed_out;
@@ -1279,19 +1277,14 @@ pgfdw_cancel_query(PGconn *conn)
 	 * Issue cancel request.  Unfortunately, there's no good way to limit the
 	 * amount of time that we might block inside PQgetCancel().
 	 */
-	if ((cancel = PQgetCancel(conn)))
-	{
-		if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+		if (!PQrequestCancel(conn))
 		{
 			ereport(WARNING,
 					(errcode(ERRCODE_CONNECTION_FAILURE),
 					 errmsg("could not send cancel request: %s",
-							errbuf)));
-			PQfreeCancel(cancel);
+							pchomp(PQerrorMessage(conn)))));
 			return false;
 		}
-		PQfreeCancel(cancel);
-	}
 
 	/* Get and discard the result of the query. */
 	if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 0b2a8720f0..6b0683b9b0 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -265,7 +265,7 @@ PGconn *PQsetdb(char *pghost,
     <varlistentry id="libpq-PQconnectStartParams">
      <term><function>PQconnectStartParams</function><indexterm><primary>PQconnectStartParams</primary></indexterm></term>
      <term><function>PQconnectStart</function><indexterm><primary>PQconnectStart</primary></indexterm></term>
-     <term><function>PQconnectPoll</function><indexterm><primary>PQconnectPoll</primary></indexterm></term>
+     <term id="libpq-PQconnectPoll"><function>PQconnectPoll</function><indexterm><primary>PQconnectPoll</primary></indexterm></term>
      <listitem>
       <para>
        <indexterm><primary>nonblocking connection</primary></indexterm>
@@ -499,6 +499,30 @@ switch(PQstatus(conn))
      </listitem>
     </varlistentry>
 
+    <varlistentry id="libpq-PQconnectComplete">
+     <term><function>PQconnectComplete</function><indexterm><primary>PQconnectComplete</primary></indexterm></term>
+     <listitem>
+      <para>
+       Complete the connection attempt on a nonblocking connection and block
+       until it is completed.
+
+<synopsis>
+int PQconnectComplete(PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       This function can be used instead of
+       <xref linkend="libpq-PQconnectPoll"/>
+       to complete a connection that was initially started in a non blocking
+       manner. However, instead of continuing to complete the connection in a
+       non blocking way, calling this function will block until the connection
+       is completed. This is especially useful to complete connections that were
+       started by <xref linkend="libpq-PQrequestCancelStart"/>.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry id="libpq-PQconndefaults">
      <term><function>PQconndefaults</function><indexterm><primary>PQconndefaults</primary></indexterm></term>
      <listitem>
@@ -660,7 +684,7 @@ void PQreset(PGconn *conn);
 
     <varlistentry id="libpq-PQresetStart">
      <term><function>PQresetStart</function><indexterm><primary>PQresetStart</primary></indexterm></term>
-     <term><function>PQresetPoll</function><indexterm><primary>PQresetPoll</primary></indexterm></term>
+     <term id="libpq-PQresetPoll"><function>PQresetPoll</function><indexterm><primary>PQresetPoll</primary></indexterm></term>
      <listitem>
       <para>
        Reset the communication channel to the server, in a nonblocking manner.
@@ -5617,13 +5641,137 @@ int PQsetSingleRowMode(PGconn *conn);
    this section.
 
    <variablelist>
+    <varlistentry id="libpq-PQrequestCancel">
+     <term><function>PQrequestCancel</function><indexterm><primary>PQrequestCancel</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Requests that the server abandon processing of the current command.
+<synopsis>
+int PQrequestCancel(PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       This request is made over a connection that uses the same connection
+       options as the original <structname>PGconn</structname>. So when the
+       original connection is encrypted (using TLS or GSS), the connection for
+       the cancel request is encrypted in the same way. Any connection
+       options that only make sense for authentication or after authentication
+       are ignored though, because cancellation requests do not require
+       authentication.
+      </para>
+
+      <para>
+       This function operates directly on the <structname>PGconn</structname>
+       object, and in case of failure stores the error message in the
+       <structname>PGconn</structname> object (whence it can be retrieved
+       by <xref linkend="libpq-PQerrorMessage"/>). This behaviour makes this
+       function unsafe to call from within multi-threaded programs or
+       signal handlers, since it is possible that overwriting the
+       <structname>PGconn</structname>'s error message will
+       mess up the operation currently in progress on the connection in another
+       thread.
+      </para>
+
+      <para>
+       The return value is 1 if the cancel request was successfully
+       dispatched and 0 if not. Successful dispatch is no guarantee that the
+       request will have any effect, however. If the cancellation is effective,
+       the current command will terminate early and return an error result. If
+       the cancellation fails (say, because the server was already done
+       processing the command), then there will be no visible result at
+       all.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQrequestCancelStart">
+     <term><function>PQrequestCancelStart</function><indexterm><primary>PQrequestCancelStart</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       A version of
+       <xref linkend="libpq-PQrequestCancel"/>
+       that can be used in a thread-safe and/or non-blocking manner.
+<synopsis>
+PGconn *PQrequestCancelStart(PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       This function returns a new <structname>PGconn</structname>. This
+       connection object can be used to cancel the query that's running on the
+       original connection in a thread-safe way. To do so
+       <xref linkend="libpq-PQrequestCancelStart"/>
+       must be called while no other thread is using the original PGconn. Then
+       the returned <structname>PGconn</structname>
+       can be used at a later point in any thread to send a cancel request.
+       A cancel request can be sent using the returned PGconn in two ways,
+       non-blocking using <xref linkend="libpq-PQconnectPoll"/>
+       or blocking using <xref linkend="libpq-PQconnectComplete"/>.
+      </para>
+
+      <para>
+       In addition to all the statuses that a regular
+       <structname>PGconn</structname>
+       can have, the returned connection can have two additional statuses:
+
+       <variablelist>
+        <varlistentry id="libpq-connection-starting">
+         <term><symbol>CONNECTION_STARTING</symbol></term>
+         <listitem>
+          <para>
+           Waiting for the first call to <xref linkend="libpq-PQconnectPoll"/>,
+           to actually open the socket. This is the connection state right after
+           calling <xref linkend="libpq-PQrequestCancelStart"/>. No connection to the
+           server has been initiated yet at this point. To start sending the
+           cancel request use <xref linkend="libpq-PQconnectPoll"/>
+           for non-blocking behaviour and <xref linkend="libpq-PQconnectComplete"/>
+           for blocking behaviour.
+          </para>
+         </listitem>
+        </varlistentry>
+
+        <varlistentry id="libpq-connection-cancel-finished">
+         <term><symbol>CONNECTION_CANCEL_FINISHED</symbol></term>
+         <listitem>
+          <para>
+           Cancel request was successfully sent. It's not possible to continue
+           using the cancellation connection now, so it should be freed using
+           <xref linkend="libpq-PQfinish"/>. It's also possible to reset the
+           cancellation connection instead using
+           <xref linkend="libpq-PQresetStart"/>, that way it can be reused to
+           cancel a future query on the same connection.
+          </para>
+         </listitem>
+        </varlistentry>
+       </variablelist>
+      </para>
+
+      <para>
+       Since this object represents a connection only meant for cancellations it
+       can only be used with a limited subset of the functions that can be used
+       for a regular <structname>PGconn</structname> object. The functions that
+       this object can be passed to are
+       <xref linkend="libpq-PQstatus"/>,
+       <xref linkend="libpq-PQerrorMessage"/>,
+       <xref linkend="libpq-PQconnectComplete"/>,
+       <xref linkend="libpq-PQconnectPoll"/>,
+       <xref linkend="libpq-PQsocket"/>,
+       <xref linkend="libpq-PQresetStart"/>, and
+       <xref linkend="libpq-PQfinish"/>.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry id="libpq-PQgetCancel">
      <term><function>PQgetCancel</function><indexterm><primary>PQgetCancel</primary></indexterm></term>
 
      <listitem>
       <para>
        Creates a data structure containing the information needed to cancel
-       a command issued through a particular database connection.
+       a command using <xref linkend="libpq-PQcancel"/>.
 <synopsis>
 PGcancel *PQgetCancel(PGconn *conn);
 </synopsis>
@@ -5665,7 +5813,9 @@ void PQfreeCancel(PGcancel *cancel);
 
      <listitem>
       <para>
-       Requests that the server abandon processing of the current command.
+       A less secure version of
+       <xref linkend="libpq-PQrequestCancel"/>
+       that can be used safely from within a signal handler.
 <synopsis>
 int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize);
 </synopsis>
@@ -5679,15 +5829,6 @@ int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize);
        recommended size is 256 bytes).
       </para>
 
-      <para>
-       Successful dispatch is no guarantee that the request will have
-       any effect, however.  If the cancellation is effective, the current
-       command will terminate early and return an error result.  If the
-       cancellation fails (say, because the server was already done
-       processing the command), then there will be no visible result at
-       all.
-      </para>
-
       <para>
        <xref linkend="libpq-PQcancel"/> can safely be invoked from a signal
        handler, if the <parameter>errbuf</parameter> is a local variable in the
@@ -5696,33 +5837,24 @@ int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize);
        also be invoked from a thread that is separate from the one
        manipulating the <structname>PGconn</structname> object.
       </para>
-     </listitem>
-    </varlistentry>
-   </variablelist>
-
-   <variablelist>
-    <varlistentry id="libpq-PQrequestCancel">
-     <term><function>PQrequestCancel</function><indexterm><primary>PQrequestCancel</primary></indexterm></term>
-
-     <listitem>
-      <para>
-       <xref linkend="libpq-PQrequestCancel"/> is a deprecated variant of
-       <xref linkend="libpq-PQcancel"/>.
-<synopsis>
-int PQrequestCancel(PGconn *conn);
-</synopsis>
-      </para>
 
       <para>
-       Requests that the server abandon processing of the current
-       command.  It operates directly on the
-       <structname>PGconn</structname> object, and in case of failure stores the
-       error message in the <structname>PGconn</structname> object (whence it can
-       be retrieved by <xref linkend="libpq-PQerrorMessage"/>).  Although
-       the functionality is the same, this approach is not safe within
-       multiple-thread programs or signal handlers, since it is possible
-       that overwriting the <structname>PGconn</structname>'s error message will
-       mess up the operation currently in progress on the connection.
+       To achieve signal-safety, some concessions needed to be made in the
+       implementation of <xref linkend="libpq-PQcancel"/>. Not all connection
+       options of the original connection are used when establishing a
+       connection for the cancellation request. When calling this function a
+       connection is made to the postgres host using the same port. The only
+       connection options that are honored during this connection are
+       <varname>keepalives</varname>,
+       <varname>keepalives_idle</varname>,
+       <varname>keepalives_interval</varname>,
+       <varname>keepalives_count</varname>, and
+       <varname>tcp_user_timeout</varname>.
+       So, for example
+       <varname>connect_timeout</varname>,
+       <varname>gssencmode</varname>, and
+       <varname>sslmode</varname> are ignored. This means the connection
+       is never encrypted using TLS or GSS.
       </para>
      </listitem>
     </varlistentry>
@@ -8835,10 +8967,10 @@ int PQisthreadsafe();
   </para>
 
   <para>
-   The deprecated functions <xref linkend="libpq-PQrequestCancel"/> and
+   The functions <xref linkend="libpq-PQrequestCancel"/> and
    <xref linkend="libpq-PQoidStatus"/> are not thread-safe and should not be
    used in multithread programs.  <xref linkend="libpq-PQrequestCancel"/>
-   can be replaced by <xref linkend="libpq-PQcancel"/>.
+   can be replaced by <xref linkend="libpq-PQrequestCancelStart"/>.
    <xref linkend="libpq-PQoidStatus"/> can be replaced by
    <xref linkend="libpq-PQoidValue"/>.
   </para>
diff --git a/src/fe_utils/connect_utils.c b/src/fe_utils/connect_utils.c
index a30c66f13a..ff18dab043 100644
--- a/src/fe_utils/connect_utils.c
+++ b/src/fe_utils/connect_utils.c
@@ -162,19 +162,11 @@ connectMaintenanceDatabase(ConnParams *cparams,
 void
 disconnectDatabase(PGconn *conn)
 {
-	char		errbuf[256];
-
 	Assert(conn != NULL);
 
 	if (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
 	{
-		PGcancel   *cancel;
-
-		if ((cancel = PQgetCancel(conn)))
-		{
-			(void) PQcancel(cancel, errbuf, sizeof(errbuf));
-			PQfreeCancel(cancel);
-		}
+		(void) PQrequestCancel(conn);
 	}
 
 	PQfinish(conn);
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc88370..f7609d0c64 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,5 @@ PQpipelineStatus          183
 PQsetTraceFlags           184
 PQmblenBounded            185
 PQsendFlushRequest        186
+PQrequestCancelStart      187
+PQconnectComplete         188
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index cf554d389f..5462e1305c 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -378,6 +378,7 @@ static int	connectDBComplete(PGconn *conn);
 static PGPing internal_ping(PGconn *conn);
 static PGconn *makeEmptyPGconn(void);
 static bool fillPGconn(PGconn *conn, PQconninfoOption *connOptions);
+static bool copyPGconn(PGconn *srcConn, PGconn *dstConn);
 static void freePGconn(PGconn *conn);
 static void closePGconn(PGconn *conn);
 static void release_conn_addrinfo(PGconn *conn);
@@ -604,8 +605,11 @@ pqDropServerData(PGconn *conn)
 	if (conn->write_err_msg)
 		free(conn->write_err_msg);
 	conn->write_err_msg = NULL;
-	conn->be_pid = 0;
-	conn->be_key = 0;
+	if (!conn->cancelRequest)
+	{
+		conn->be_pid = 0;
+		conn->be_key = 0;
+	}
 }
 
 
@@ -737,6 +741,58 @@ PQping(const char *conninfo)
 	return ret;
 }
 
+/*
+ *		PQrequestCancelStart
+ *
+ * Set up (but do not yet open) a connection for cancelling the query running
+ * on conn; the caller must poll the returned PGconn to send the request.
+ * Returns NULL if allocation fails; a NULL/closed conn yields an errorMessage.
+ */
+PGconn *
+PQrequestCancelStart(PGconn *conn)
+{
+	PGconn	   *cancelConn = makeEmptyPGconn();
+
+	if (cancelConn == NULL)
+		return NULL;
+
+	/* Check we have an open connection */
+	if (!conn)
+	{
+		appendPQExpBufferStr(&cancelConn->errorMessage, libpq_gettext("passed connection was NULL\n"));
+		return cancelConn;
+	}
+
+	if (conn->sock == PGINVALID_SOCKET)
+	{
+		appendPQExpBufferStr(&cancelConn->errorMessage, libpq_gettext("passed connection is not open\n"));
+		return cancelConn;
+	}
+
+	/*
+	 * Indicate that this connection is used to send a cancellation
+	 */
+	cancelConn->cancelRequest = true;
+
+	if (!copyPGconn(conn, cancelConn))
+		return cancelConn;
+
+	/*
+	 * Copy over information needed to cancel
+	 */
+	cancelConn->be_pid = conn->be_pid;
+	cancelConn->be_key = conn->be_key;
+
+	/*
+	 * Compute derived options
+	 */
+	if (!connectOptions2(cancelConn))
+		return cancelConn;
+
+	cancelConn->status = CONNECTION_STARTING;
+	return cancelConn;
+}
+
 /*
  *		PQconnectStartParams
  *
@@ -914,6 +970,46 @@ fillPGconn(PGconn *conn, PQconninfoOption *connOptions)
 	return true;
 }
 
+/*
+ * Duplicate every non-NULL string option of srcConn into dstConn, replacing
+ * (and freeing) whatever dstConn held for that option.
+ *
+ * Keep this a plain copy --- derived settings belong in connectOptions2.
+ *
+ * Returns true on success.  On strdup failure, returns false with "out of
+ * memory" appended to dstConn's errorMessage; earlier copies stay in place.
+ */
+static bool
+copyPGconn(PGconn *srcConn, PGconn *dstConn)
+{
+	const internalPQconninfoOption *option;
+
+	/* walk the option table; connofs is used as a byte offset into PGconn */
+	for (option = PQconninfoOptions; option->keyword; option++)
+	{
+		if (option->connofs >= 0)
+		{
+			const char **tmp = (const char **) ((char *) srcConn + option->connofs);
+
+			if (*tmp)
+			{
+				char	  **dstConnmember = (char **) ((char *) dstConn + option->connofs);
+
+				if (*dstConnmember)
+					free(*dstConnmember);
+				*dstConnmember = strdup(*tmp);
+				if (*dstConnmember == NULL)
+				{
+					appendPQExpBufferStr(&dstConn->errorMessage,
+										 libpq_gettext("out of memory\n"));
+					return false;
+				}
+			}
+		}
+	}
+	return true;
+}
+
 /*
  *		connectOptions1
  *
@@ -2134,6 +2230,15 @@ connectDBComplete(PGconn *conn)
 	if (conn == NULL || conn->status == CONNECTION_BAD)
 		return 0;
 
+	if (conn->status == CONNECTION_STARTING)
+	{
+		if (!connectDBStart(conn))
+		{
+			conn->status = CONNECTION_BAD;
+			return 0;
+		}
+	}
+
 	/*
 	 * Set up a time limit, if connect_timeout isn't zero.
 	 */
@@ -2274,13 +2379,15 @@ PQconnectPoll(PGconn *conn)
 	switch (conn->status)
 	{
 			/*
-			 * We really shouldn't have been polled in these two cases, but we
-			 * can handle it.
+			 * We really shouldn't have been polled in these three cases, but
+			 * we can handle it.
 			 */
 		case CONNECTION_BAD:
 			return PGRES_POLLING_FAILED;
 		case CONNECTION_OK:
 			return PGRES_POLLING_OK;
+		case CONNECTION_CANCEL_FINISHED:
+			return PGRES_POLLING_OK;
 
 			/* These are reading states */
 		case CONNECTION_AWAITING_RESPONSE:
@@ -2292,6 +2399,17 @@ PQconnectPoll(PGconn *conn)
 				/* Load waiting data */
 				int			n = pqReadData(conn);
 
+				if (n == -2 && conn->cancelRequest)
+				{
+					/*
+					 * This is the expected end state for cancel connections.
+					 * They are closed once the cancel is processed by the
+					 * server.
+					 */
+					conn->status = CONNECTION_CANCEL_FINISHED;
+					resetPQExpBuffer(&conn->errorMessage);
+					return PGRES_POLLING_OK;
+				}
 				if (n < 0)
 					goto error_return;
 				if (n == 0)
@@ -2301,6 +2419,7 @@ PQconnectPoll(PGconn *conn)
 			}
 
 			/* These are writing states, so we just proceed. */
+		case CONNECTION_STARTING:
 		case CONNECTION_STARTED:
 		case CONNECTION_MADE:
 			break;
@@ -2758,6 +2877,16 @@ keep_going:						/* We will come back to here until there is
 				}
 			}
 
+		case CONNECTION_STARTING:
+			{
+				if (!connectDBStart(conn))
+				{
+					goto error_return;
+				}
+				conn->status = CONNECTION_STARTED;
+				return PGRES_POLLING_WRITING;
+			}
+
 		case CONNECTION_STARTED:
 			{
 				socklen_t	optlen = sizeof(optval);
@@ -2966,6 +3095,25 @@ keep_going:						/* We will come back to here until there is
 				}
 #endif							/* USE_SSL */
 
+				if (conn->cancelRequest)
+				{
+					CancelRequestPacket cancelpacket;
+
+					packetlen = sizeof(cancelpacket);
+					cancelpacket.cancelRequestCode = (MsgType) pg_hton32(CANCEL_REQUEST_CODE);
+					cancelpacket.backendPID = pg_hton32(conn->be_pid);
+					cancelpacket.cancelAuthCode = pg_hton32(conn->be_key);
+					if (pqPacketSend(conn, 0, &cancelpacket, packetlen) != STATUS_OK)
+					{
+						appendPQExpBuffer(&conn->errorMessage,
+										  libpq_gettext("could not send cancel packet: %s\n"),
+										  SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf)));
+						goto error_return;
+					}
+					conn->status = CONNECTION_AWAITING_RESPONSE;
+					return PGRES_POLLING_READING;
+				}
+
 				/*
 				 * Build the startup packet.
 				 */
@@ -4194,6 +4342,11 @@ release_conn_addrinfo(PGconn *conn)
 static void
 sendTerminateConn(PGconn *conn)
 {
+	if (conn->cancelRequest)
+	{
+		return;
+	}
+
 	/*
 	 * Note that the protocol doesn't allow us to send Terminate messages
 	 * during the startup phase.
@@ -4311,6 +4464,12 @@ PQresetStart(PGconn *conn)
 	{
 		closePGconn(conn);
 
+		if (conn->cancelRequest)
+		{
+			conn->status = CONNECTION_STARTING;
+			return 1;
+		}
+
 		return connectDBStart(conn);
 	}
 
@@ -4663,6 +4822,22 @@ cancel_errReturn:
 	return false;
 }
 
+/*
+ * PQconnectComplete: takes a non blocking cancel connection and completes it
+ * in a blocking manner.
+ *
+ * Returns 1 on success and 0 on failure; a NULL cancelConn counts as failure.
+ *
+ * This can be useful if you only care about the thread safety of
+ * PQrequestCancelStart and not about its non blocking functionality.
+ */
+int
+PQconnectComplete(PGconn *cancelConn)
+{
+	connectDBComplete(cancelConn);
+	return cancelConn != NULL && cancelConn->status != CONNECTION_BAD;
+}
+
 
 /*
  * PQrequestCancel: old, not thread-safe function for requesting query cancel
@@ -4679,45 +4854,35 @@ cancel_errReturn:
 int
 PQrequestCancel(PGconn *conn)
 {
-	int			r;
-	PGcancel   *cancel;
+	PGconn	   *cancelConn = NULL;
 
 	/* Check we have an open connection */
 	if (!conn)
 		return false;
 
-	if (conn->sock == PGINVALID_SOCKET)
+	cancelConn = PQrequestCancelStart(conn);
+	if (!cancelConn)
 	{
-		strlcpy(conn->errorMessage.data,
-				"PQrequestCancel() -- connection is not open\n",
-				conn->errorMessage.maxlen);
-		conn->errorMessage.len = strlen(conn->errorMessage.data);
-		conn->errorReported = 0;
-
+		appendPQExpBufferStr(&conn->errorMessage, libpq_gettext("out of memory\n"));
 		return false;
 	}
 
-	cancel = PQgetCancel(conn);
-	if (cancel)
-	{
-		r = PQcancel(cancel, conn->errorMessage.data,
-					 conn->errorMessage.maxlen);
-		PQfreeCancel(cancel);
-	}
-	else
+	if (cancelConn->status == CONNECTION_BAD)
 	{
-		strlcpy(conn->errorMessage.data, "out of memory",
-				conn->errorMessage.maxlen);
-		r = false;
+		appendPQExpBufferStr(&conn->errorMessage, PQerrorMessage(cancelConn));
+		freePGconn(cancelConn);
+		return false;
 	}
 
-	if (!r)
+	if (!PQconnectComplete(cancelConn))
 	{
-		conn->errorMessage.len = strlen(conn->errorMessage.data);
-		conn->errorReported = 0;
+		appendPQExpBufferStr(&conn->errorMessage, PQerrorMessage(cancelConn));
+		freePGconn(cancelConn);
+		return false;
 	}
 
-	return r;
+	freePGconn(cancelConn);
+	return true;
 }
 
 
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index d76bb3957a..a944cb2c12 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -558,8 +558,11 @@ pqPutMsgEnd(PGconn *conn)
  * Possible return values:
  *	 1: successfully loaded at least one more byte
  *	 0: no data is presently available, but no error detected
- *	-1: error detected (including EOF = connection closure);
+ *	-1: error detected (excluding EOF = connection closure);
  *		conn->errorMessage set
+ *	-2: EOF detected, connection is closed
+ *		conn->errorMessage set
+ *
  * NOTE: callers must not assume that pointers or indexes into conn->inBuffer
  * remain valid across this call!
  * ----------
@@ -642,7 +645,7 @@ retry3:
 
 			default:
 				/* pqsecure_read set the error message for us */
-				return -1;
+				return nread;
 		}
 	}
 	if (nread > 0)
@@ -737,7 +740,7 @@ retry4:
 
 			default:
 				/* pqsecure_read set the error message for us */
-				return -1;
+				return nread;
 		}
 	}
 	if (nread > 0)
@@ -755,13 +758,17 @@ definitelyEOF:
 						 libpq_gettext("server closed the connection unexpectedly\n"
 									   "\tThis probably means the server terminated abnormally\n"
 									   "\tbefore or while processing the request.\n"));
+	/* Do *not* drop any already-read data; caller still wants it */
+	pqDropConnection(conn, false);
+	conn->status = CONNECTION_BAD;	/* No more connection to backend */
+	return -2;
 
 	/* Come here if lower-level code already set a suitable errorMessage */
 definitelyFailed:
 	/* Do *not* drop any already-read data; caller still wants it */
 	pqDropConnection(conn, false);
 	conn->status = CONNECTION_BAD;	/* No more connection to backend */
-	return -1;
+	return nread < 0 ? nread : -1;
 }
 
 /*
diff --git a/src/interfaces/libpq/fe-secure-openssl.c b/src/interfaces/libpq/fe-secure-openssl.c
index d3bf57b850..4ffaea63c1 100644
--- a/src/interfaces/libpq/fe-secure-openssl.c
+++ b/src/interfaces/libpq/fe-secure-openssl.c
@@ -252,7 +252,7 @@ rloop:
 			appendPQExpBufferStr(&conn->errorMessage,
 								 libpq_gettext("SSL connection has been closed unexpectedly\n"));
 			result_errno = ECONNRESET;
-			n = -1;
+			n = -2;
 			break;
 		default:
 			appendPQExpBuffer(&conn->errorMessage,
diff --git a/src/interfaces/libpq/fe-secure.c b/src/interfaces/libpq/fe-secure.c
index a1dc7b796d..9771805dd3 100644
--- a/src/interfaces/libpq/fe-secure.c
+++ b/src/interfaces/libpq/fe-secure.c
@@ -201,6 +201,12 @@ pqsecure_close(PGconn *conn)
  * On failure, this function is responsible for appending a suitable message
  * to conn->errorMessage.  The caller must still inspect errno, but only
  * to determine whether to continue/retry after error.
+ *
+ * Returns -1 in case of failures, except in the case where a failure means
+ * that there was a clean connection closure, in those cases -2 is returned.
+ * Currently only the TLS implementation of pqsecure_read ever returns -2. For
+ * the other implementations a clean connection closure is detected in
+ * pqReadData instead.
  */
 ssize_t
 pqsecure_read(PGconn *conn, void *ptr, size_t len)
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 7986445f1a..42367d4886 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -59,12 +59,15 @@ typedef enum
 {
 	CONNECTION_OK,
 	CONNECTION_BAD,
+	CONNECTION_CANCEL_FINISHED,
 	/* Non-blocking mode only below here */
 
 	/*
 	 * The existence of these should never be relied upon - they should only
 	 * be used for user feedback or similar purposes.
 	 */
+	CONNECTION_STARTING,		/* Waiting for connection attempt to be
+								 * started.  */
 	CONNECTION_STARTED,			/* Waiting for connection to be made.  */
 	CONNECTION_MADE,			/* Connection OK; waiting to send.     */
 	CONNECTION_AWAITING_RESPONSE,	/* Waiting for a response from the
@@ -165,6 +168,10 @@ typedef enum
  */
 typedef struct pg_conn PGconn;
 
+/* PGcancelConn encapsulates a cancel connection to the backend.
+ * The contents of this struct are not supposed to be known to applications.
+ */
+
 /* PGresult encapsulates the result of a query (or more precisely, of a single
  * SQL command --- a query string given to PQsendQuery can contain multiple
  * commands and thus return multiple PGresult objects).
@@ -282,6 +289,7 @@ extern PGconn *PQconnectStart(const char *conninfo);
 extern PGconn *PQconnectStartParams(const char *const *keywords,
 									const char *const *values, int expand_dbname);
 extern PostgresPollingStatusType PQconnectPoll(PGconn *conn);
+extern int	PQconnectComplete(PGconn *conn);
 
 /* Synchronous (blocking) */
 extern PGconn *PQconnectdb(const char *conninfo);
@@ -330,9 +338,12 @@ extern void PQfreeCancel(PGcancel *cancel);
 /* issue a cancel request */
 extern int	PQcancel(PGcancel *cancel, char *errbuf, int errbufsize);
 
-/* backwards compatible version of PQcancel; not thread-safe */
+/* more secure version of PQcancel */
 extern int	PQrequestCancel(PGconn *conn);
 
+/* non blocking and thread safe version of PQrequestCancel */
+extern PGconn *PQrequestCancelStart(PGconn *conn);
+
 /* Accessor functions for PGconn objects */
 extern char *PQdb(const PGconn *conn);
 extern char *PQuser(const PGconn *conn);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index e0cee4b142..ff9555e263 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -394,6 +394,8 @@ struct pg_conn
 	char	   *ssl_max_protocol_version;	/* maximum TLS protocol version */
 	char	   *target_session_attrs;	/* desired session properties */
 
+	bool		cancelRequest;
+
 	/* Optional file to write trace info to */
 	FILE	   *Pfdebug;
 	int			traceFlags;
diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c
index 12179f2514..fe1ca168c8 100644
--- a/src/test/isolation/isolationtester.c
+++ b/src/test/isolation/isolationtester.c
@@ -948,26 +948,17 @@ try_complete_step(TestSpec *testspec, PermutationStep *pstep, int flags)
 			 */
 			if (td > max_step_wait && !canceled)
 			{
-				PGcancel   *cancel = PQgetCancel(conn);
-
-				if (cancel != NULL)
-				{
-					char		buf[256];
-
-					if (PQcancel(cancel, buf, sizeof(buf)))
-					{
-						/*
-						 * print to stdout not stderr, as this should appear
-						 * in the test case's results
-						 */
-						printf("isolationtester: canceling step %s after %d seconds\n",
-							   step->name, (int) (td / USECS_PER_SEC));
-						canceled = true;
-					}
-					else
-						fprintf(stderr, "PQcancel failed: %s\n", buf);
-					PQfreeCancel(cancel);
+				if (PQrequestCancel(conn)) {
+					/*
+					 * print to stdout not stderr, as this should appear
+					 * in the test case's results
+					 */
+					printf("isolationtester: canceling step %s after %d seconds\n",
+						   step->name, (int) (td / USECS_PER_SEC));
+					canceled = true;
 				}
+				else
+					fprintf(stderr, "PQcancel failed: %s\n", PQerrorMessage(conn));
 			}
 
 			/*
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 0ff563f59a..95f1d5eb2f 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -86,6 +86,215 @@ pg_fatal_impl(int line, const char *fmt,...)
 	exit(1);
 }
 
+/* Fetch the next result on conn; die unless it is SQLSTATE 57014 (cancel). */
+static void
+confirm_query_cancelled(PGconn *conn)
+{
+	PGresult   *res = PQgetResult(conn);
+	const char *sqlstate;
+
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("query did not fail when it was expected");
+	/* the SQLSTATE field may be absent (NULL); never hand that to strcmp */
+	sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+	if (sqlstate == NULL || strcmp(sqlstate, "57014") != 0)
+		pg_fatal("query failed with a different error than cancellation: %s", PQerrorMessage(conn));
+	PQclear(res);
+	while (PQisBusy(conn))
+		PQconsumeInput(conn);
+}
+
+/*
+ * Exercise the libpq cancel APIs against conn: PQcancel, PQrequestCancel,
+ * PQrequestCancelStart driven by PQconnectPoll and PQresetPoll, and
+ * PQconnectComplete.  Every step sends pg_sleep(3) and expects it cancelled.
+ */
+static void
+test_cancel(PGconn *conn)
+{
+	PGcancel   *cancel = NULL;
+	PGconn	   *cancelConn = NULL;
+	char		errorbuf[256];
+
+	fprintf(stderr, "test cancellations... ");
+
+	if (PQsetnonblocking(conn, 1) != 0)
+		pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
+
+	/* test PQcancel */
+	if (PQsendQuery(conn, "SELECT pg_sleep(3)") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	cancel = PQgetCancel(conn);
+	if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
+		pg_fatal("failed to run PQcancel: %s", errorbuf);
+	confirm_query_cancelled(conn);
+
+	/* PGcancel object can be reused for the next query */
+	if (PQsendQuery(conn, "SELECT pg_sleep(3)") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
+		pg_fatal("failed to run PQcancel: %s", errorbuf);
+	confirm_query_cancelled(conn);
+
+	PQfreeCancel(cancel);
+
+	/* test PQrequestCancel */
+	if (PQsendQuery(conn, "SELECT pg_sleep(3)") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	if (!PQrequestCancel(conn))
+		pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
+	confirm_query_cancelled(conn);
+
+	/* test PQrequestCancelStart and then polling with PQconnectPoll */
+	if (PQsendQuery(conn, "SELECT pg_sleep(3)") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	cancelConn = PQrequestCancelStart(conn);
+	if (PQstatus(cancelConn) == CONNECTION_BAD)
+		pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn));
+	while (true)
+	{
+		struct timeval tv;
+		fd_set		input_mask;
+		fd_set		output_mask;
+		PostgresPollingStatusType pollres = PQconnectPoll(cancelConn);
+		int			sock = PQsocket(cancelConn);
+
+		if (pollres == PGRES_POLLING_OK)
+			break;
+		if (pollres == PGRES_POLLING_FAILED)
+			pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn));
+
+		/* validate the socket before it is handed to FD_SET */
+		if (sock < 0)
+			pg_fatal("sock did not exist: %s", PQerrorMessage(cancelConn));
+
+		FD_ZERO(&input_mask);
+		FD_ZERO(&output_mask);
+		switch (pollres)
+		{
+			case PGRES_POLLING_READING:
+				pg_debug("polling for reads\n");
+				FD_SET(sock, &input_mask);
+				break;
+			case PGRES_POLLING_WRITING:
+				pg_debug("polling for writes\n");
+				FD_SET(sock, &output_mask);
+				break;
+			default:
+				pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn));
+		}
+
+		tv.tv_sec = 3;
+		tv.tv_usec = 0;
+
+		while (true)
+		{
+			if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
+			{
+				if (errno == EINTR)
+					continue;
+				pg_fatal("select() failed: %m");
+			}
+			break;
+		}
+	}
+	if (PQstatus(cancelConn) != CONNECTION_CANCEL_FINISHED)
+		pg_fatal("unexpected cancel connection status: %s", PQerrorMessage(cancelConn));
+	confirm_query_cancelled(conn);
+
+	/*
+	 * test PQresetStart works on the cancel connection and it can be reused
+	 * after
+	 */
+	if (!PQresetStart(cancelConn))
+		pg_fatal("cancel connection reset failed: %s", PQerrorMessage(cancelConn));
+
+	if (PQsendQuery(conn, "SELECT pg_sleep(3)") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	if (PQstatus(cancelConn) == CONNECTION_BAD)
+		pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn));
+	/* same wait loop as above, but driven by PQresetPoll */
+	while (true)
+	{
+		struct timeval tv;
+		fd_set		input_mask;
+		fd_set		output_mask;
+		PostgresPollingStatusType pollres = PQresetPoll(cancelConn);
+		int			sock = PQsocket(cancelConn);
+
+		if (pollres == PGRES_POLLING_OK)
+			break;
+		if (pollres == PGRES_POLLING_FAILED)
+			pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn));
+
+		/* validate the socket before it is handed to FD_SET */
+		if (sock < 0)
+			pg_fatal("sock did not exist: %s", PQerrorMessage(cancelConn));
+
+		FD_ZERO(&input_mask);
+		FD_ZERO(&output_mask);
+		switch (pollres)
+		{
+			case PGRES_POLLING_READING:
+				pg_debug("polling for reads\n");
+				FD_SET(sock, &input_mask);
+				break;
+			case PGRES_POLLING_WRITING:
+				pg_debug("polling for writes\n");
+				FD_SET(sock, &output_mask);
+				break;
+			default:
+				pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn));
+		}
+
+		tv.tv_sec = 3;
+		tv.tv_usec = 0;
+
+		while (true)
+		{
+			if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
+			{
+				if (errno == EINTR)
+					continue;
+				pg_fatal("select() failed: %m");
+			}
+			break;
+		}
+	}
+	if (PQstatus(cancelConn) != CONNECTION_CANCEL_FINISHED)
+		pg_fatal("unexpected cancel connection status: %s", PQerrorMessage(cancelConn));
+	confirm_query_cancelled(conn);
+
+	PQfinish(cancelConn);
+
+	/* test PQconnectComplete */
+	if (PQsendQuery(conn, "SELECT pg_sleep(3)") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	cancelConn = PQrequestCancelStart(conn);
+	if (PQstatus(cancelConn) == CONNECTION_BAD)
+		pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn));
+	if (!PQconnectComplete(cancelConn))
+		pg_fatal("failed to send cancel: %s", PQerrorMessage(cancelConn));
+	confirm_query_cancelled(conn);
+
+	/* test PQconnectComplete with reset connection */
+	if (!PQresetStart(cancelConn))
+		pg_fatal("cancel connection reset failed: %s", PQerrorMessage(cancelConn));
+
+	if (PQsendQuery(conn, "SELECT pg_sleep(3)") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	if (PQstatus(cancelConn) == CONNECTION_BAD)
+		pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn));
+	if (!PQconnectComplete(cancelConn))
+		pg_fatal("failed to send cancel: %s", PQerrorMessage(cancelConn));
+	confirm_query_cancelled(conn);
+	PQfinish(cancelConn);
+
+	fprintf(stderr, "ok\n");
+}
+
 static void
 test_disallowed_in_pipeline(PGconn *conn)
 {
@@ -1545,6 +1754,7 @@ usage(const char *progname)
 static void
 print_test_list(void)
 {
+	printf("cancel\n");
 	printf("disallowed_in_pipeline\n");
 	printf("multi_pipelines\n");
 	printf("nosync\n");
@@ -1642,7 +1852,9 @@ main(int argc, char **argv)
 						PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
 	}
 
-	if (strcmp(testname, "disallowed_in_pipeline") == 0)
+	if (strcmp(testname, "cancel") == 0)
+		test_cancel(conn);
+	else if (strcmp(testname, "disallowed_in_pipeline") == 0)
 		test_disallowed_in_pipeline(conn);
 	else if (strcmp(testname, "multi_pipelines") == 0)
 		test_multi_pipelines(conn);
-- 
2.17.1

