From ef94fceb7be1217f8f7c0d667a8b0945d5421535 Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Mon, 20 Nov 2023 11:20:52 +0300
Subject: [PATCH v4] Flush large data immediately in pqcomm

If the data is larger than send buffer size in pqcomm, we're sure that
the send buffer will be flushed at least once to fit the data depending
on how large the data is.

Instead of repeatedly calling memcpy and then flushing data larger than
the available space in the send buffer, this patch changes
internal_putbytes() logic to flush large data immediately if the buffer
is empty without unnecessarily copying it into the pqcomm send buffer.
---
 src/backend/libpq/pqcomm.c | 65 ++++++++++++++++++++++++++++----------
 1 file changed, 49 insertions(+), 16 deletions(-)

diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 6497100a1a..1bc31f3cb4 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -120,8 +120,8 @@ static List *sock_paths = NIL;
 
 static char *PqSendBuffer;
 static int	PqSendBufferSize;	/* Size send buffer */
-static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */
-static int	PqSendStart;		/* Next index to send a byte in PqSendBuffer */
+static size_t PqSendPointer;		/* Next index to store a byte in PqSendBuffer */
+static size_t PqSendStart;		/* Next index to send a byte in PqSendBuffer */
 
 static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
@@ -144,7 +144,8 @@ static bool socket_is_send_pending(void);
 static int	socket_putmessage(char msgtype, const char *s, size_t len);
 static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
 static int	internal_putbytes(const char *s, size_t len);
-static int	internal_flush(void);
+static inline int internal_flush(void);
+static int	internal_flush_buffer(const char *s, size_t *start, size_t *end);
 
 static int	Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath);
 static int	Setup_AF_UNIX(const char *sock_path);
@@ -1282,14 +1283,32 @@ internal_putbytes(const char *s, size_t len)
 			if (internal_flush())
 				return EOF;
 		}
-		amount = PqSendBufferSize - PqSendPointer;
-		if (amount > len)
-			amount = len;
-		memcpy(PqSendBuffer + PqSendPointer, s, amount);
-		PqSendPointer += amount;
-		s += amount;
-		len -= amount;
+
+		/*
+		 * If the buffer is empty and data length is larger than the buffer
+		 * size, send it without buffering. Otherwise, put as much data as
+		 * possible into the buffer.
+		 */
+		if (len >= PqSendBufferSize && PqSendStart == PqSendPointer)
+		{
+			size_t start = 0;
+
+			socket_set_nonblocking(false);
+			if (internal_flush_buffer(s, &start, &len))
+				return EOF;
+		}
+		else
+		{
+			amount = PqSendBufferSize - PqSendPointer;
+			if (amount > len)
+				amount = len;
+			memcpy(PqSendBuffer + PqSendPointer, s, amount);
+			PqSendPointer += amount;
+			s += amount;
+			len -= amount;
+		}
 	}
+
 	return 0;
 }
 
@@ -1321,13 +1340,27 @@ socket_flush(void)
  * and the socket is in non-blocking mode), or EOF if trouble.
  * --------------------------------
  */
-static int
+static inline int
 internal_flush(void)
+{
+	/* flush the pending output from send buffer. */
+	return internal_flush_buffer(PqSendBuffer, &PqSendStart, &PqSendPointer);
+}
+
+/* --------------------------------
+ *		internal_flush_buffer - flush the given buffer content
+ *
+ * Returns 0 if OK (meaning everything was sent, or operation would block
+ * and the socket is in non-blocking mode), or EOF if trouble.
+ * --------------------------------
+ */
+static inline int
+internal_flush_buffer(const char *s, size_t *start, size_t *end)
 {
 	static int	last_reported_send_errno = 0;
 
-	char	   *bufptr = PqSendBuffer + PqSendStart;
-	char	   *bufend = PqSendBuffer + PqSendPointer;
+	char	   *bufptr = (char*) s + *start;
+	char	   *bufend = (char*) s + *end;
 
 	while (bufptr < bufend)
 	{
@@ -1373,7 +1406,7 @@ internal_flush(void)
 			 * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
 			 * the connection.
 			 */
-			PqSendStart = PqSendPointer = 0;
+			*start = *end = 0;
 			ClientConnectionLost = 1;
 			InterruptPending = 1;
 			return EOF;
@@ -1381,10 +1414,10 @@ internal_flush(void)
 
 		last_reported_send_errno = 0;	/* reset after any successful send */
 		bufptr += r;
-		PqSendStart += r;
+		*start += r;
 	}
 
-	PqSendStart = PqSendPointer = 0;
+	*start = *end = 0;
 	return 0;
 }
 
-- 
2.34.1

