From a25bd7013ba491ebc894f5c5dfba3a751ab501c0 Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <jelte.fennema@microsoft.com>
Date: Sat, 6 Apr 2024 19:19:28 +0200
Subject: [PATCH v6] Faster internal_putbytes

---
 src/backend/libpq/pqcomm.c | 69 ++++++++++++++++++++++++--------------
 1 file changed, 44 insertions(+), 25 deletions(-)

diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 6497100a1a4..df40ef0a5e3 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -119,9 +119,9 @@ static List *sock_paths = NIL;
 #define PQ_RECV_BUFFER_SIZE 8192
 
 static char *PqSendBuffer;
-static int	PqSendBufferSize;	/* Size send buffer */
-static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */
-static int	PqSendStart;		/* Next index to send a byte in PqSendBuffer */
+static size_t PqSendBufferSize; /* Size send buffer */
+static size_t PqSendPointer;	/* Next index to store a byte in PqSendBuffer */
+static size_t PqSendStart;		/* Next index to send a byte in PqSendBuffer */
 
 static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
@@ -133,6 +133,7 @@ static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 static bool PqCommBusy;			/* busy sending data to the client */
 static bool PqCommReadingMsg;	/* in the middle of reading a message */
 
+#define internal_flush()	internal_flush_buffer(PqSendBuffer, &PqSendStart, &PqSendPointer)
 
 /* Internal functions */
 static void socket_comm_reset(void);
@@ -143,8 +144,9 @@ static int	socket_flush_if_writable(void);
 static bool socket_is_send_pending(void);
 static int	socket_putmessage(char msgtype, const char *s, size_t len);
 static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
-static int	internal_putbytes(const char *s, size_t len);
-static int	internal_flush(void);
+static inline int internal_putbytes(const char *s, size_t len);
+static pg_noinline int internal_flush_buffer(const char *s, size_t *start,
+											 size_t *end);
 
 static int	Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath);
 static int	Setup_AF_UNIX(const char *sock_path);
@@ -1268,11 +1270,9 @@ pq_getmessage(StringInfo s, int maxlen)
 }
 
 
-static int
+static inline int
 internal_putbytes(const char *s, size_t len)
 {
-	size_t		amount;
-
 	while (len > 0)
 	{
 		/* If buffer is full, then flush it out */
@@ -1282,14 +1282,33 @@ internal_putbytes(const char *s, size_t len)
 			if (internal_flush())
 				return EOF;
 		}
-		amount = PqSendBufferSize - PqSendPointer;
-		if (amount > len)
-			amount = len;
-		memcpy(PqSendBuffer + PqSendPointer, s, amount);
-		PqSendPointer += amount;
-		s += amount;
-		len -= amount;
+
+		/*
+		 * If the buffer is empty and data length is larger than the buffer
+		 * size, send it without buffering. Otherwise, put as much data as
+		 * possible into the buffer.
+		 */
+		if (len >= PqSendBufferSize && PqSendStart == PqSendPointer)
+		{
+			size_t		start = 0;
+
+			socket_set_nonblocking(false);
+			if (internal_flush_buffer(s, &start, &len))
+				return EOF;
+		}
+		else
+		{
+			size_t		amount = PqSendBufferSize - PqSendPointer;
+
+			if (amount > len)
+				amount = len;
+			memcpy(PqSendBuffer + PqSendPointer, s, amount);
+			PqSendPointer += amount;
+			s += amount;
+			len -= amount;
+		}
 	}
+
 	return 0;
 }
 
@@ -1315,25 +1334,25 @@ socket_flush(void)
 }
 
 /* --------------------------------
- *		internal_flush - flush pending output
+ *		internal_flush_buffer - flush the given buffer content
  *
  * Returns 0 if OK (meaning everything was sent, or operation would block
  * and the socket is in non-blocking mode), or EOF if trouble.
  * --------------------------------
  */
-static int
-internal_flush(void)
+static pg_noinline int
+internal_flush_buffer(const char *s, size_t *start, size_t *end)
 {
 	static int	last_reported_send_errno = 0;
 
-	char	   *bufptr = PqSendBuffer + PqSendStart;
-	char	   *bufend = PqSendBuffer + PqSendPointer;
+	const char *bufptr = s + *start;
+	const char *bufend = s + *end;
 
 	while (bufptr < bufend)
 	{
 		int			r;
 
-		r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+		r = secure_write(MyProcPort, (char *) bufptr, bufend - bufptr);
 
 		if (r <= 0)
 		{
@@ -1373,7 +1392,7 @@ internal_flush(void)
 			 * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
 			 * the connection.
 			 */
-			PqSendStart = PqSendPointer = 0;
+			*start = *end = 0;
 			ClientConnectionLost = 1;
 			InterruptPending = 1;
 			return EOF;
@@ -1381,10 +1400,10 @@ internal_flush(void)
 
 		last_reported_send_errno = 0;	/* reset after any successful send */
 		bufptr += r;
-		PqSendStart += r;
+		*start += r;
 	}
 
-	PqSendStart = PqSendPointer = 0;
+	*start = *end = 0;
 	return 0;
 }
 
@@ -1487,7 +1506,7 @@ static void
 socket_putmessage_noblock(char msgtype, const char *s, size_t len)
 {
 	int			res PG_USED_FOR_ASSERTS_ONLY;
-	int			required;
+	size_t		required;
 
 	/*
 	 * Ensure we have enough space in the output buffer for the message header

base-commit: f956ecd0353b2960f8322b2211142113fe2b6f67
-- 
2.34.1

