From 31b844ab11b6933421cd4ac2a56d6999b69a84ae Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Mon, 29 Jun 2020 09:08:02 +0530
Subject: [PATCH v1] Performance Improvement For Copy From Binary Files

Read data from the binary file RAW_BUF_SIZE bytes at a time for the
COPY FROM command, instead of issuing a separate read from the file
for each field count, field size, and field data. This significantly
reduces the number of fread calls and improves the performance of
COPY FROM with binary files.
---
 src/backend/commands/copy.c | 160 ++++++++++++++++++++++++++++--------
 1 file changed, 127 insertions(+), 33 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 6b1fd6d4cc..dd50643097 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -390,8 +390,6 @@ static int	CopyGetData(CopyState cstate, void *databuf,
 static void CopySendInt32(CopyState cstate, int32 val);
 static bool CopyGetInt32(CopyState cstate, int32 *val);
 static void CopySendInt16(CopyState cstate, int16 val);
-static bool CopyGetInt16(CopyState cstate, int16 *val);
-
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -760,24 +758,6 @@ CopySendInt16(CopyState cstate, int16 val)
 	CopySendData(cstate, &buf, sizeof(buf));
 }
 
-/*
- * CopyGetInt16 reads an int16 that appears in network byte order
- */
-static bool
-CopyGetInt16(CopyState cstate, int16 *val)
-{
-	uint16		buf;
-
-	if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
-	{
-		*val = 0;				/* suppress compiler warning */
-		return false;
-	}
-	*val = (int16) pg_ntoh16(buf);
-	return true;
-}
-
-
 /*
  * CopyLoadRawBuf loads some more data into raw_buf
  *
@@ -3367,7 +3347,12 @@ BeginCopyFrom(ParseState *pstate,
 	initStringInfo(&cstate->attribute_buf);
 	initStringInfo(&cstate->line_buf);
 	cstate->line_buf_converted = false;
-	cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
+
+	if (cstate->binary)
+		cstate->raw_buf = (char *) palloc0(RAW_BUF_SIZE);
+	else
+		cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
+
 	cstate->raw_buf_index = cstate->raw_buf_len = 0;
 
 	/* Assign range table, we'll need it in CopyFrom. */
@@ -3736,15 +3721,58 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 		/* binary */
 		int16		fld_count;
 		ListCell   *cur;
+		int readbytes = 0;
 
 		cstate->cur_lineno++;
 
-		if (!CopyGetInt16(cstate, &fld_count))
+		if (cstate->cur_lineno == 1)
 		{
-			/* EOF detected (end of file, or protocol-level EOF) */
-			return false;
+			/* This is the first time through, so read up to RAW_BUF_SIZE
+			 * bytes of data from the file into raw_buf.
+			 */
+			cstate->raw_buf_index = 0;
+
+			readbytes = CopyGetData(cstate, &cstate->raw_buf[0], 1, RAW_BUF_SIZE);
+
+			/* If true, then the file is empty, so return from here. */
+			if (cstate->reached_eof)
+				return false;
 		}
 
+		if (cstate->raw_buf_index + sizeof(fld_count) >= (RAW_BUF_SIZE - 1))
+		{
+			/* The fld_count may not lie entirely within the current raw_buf
+			 * data, so read the next set of data from the file.
+			 */
+			uint8 movebytes = 0;
+
+			/* Given the check above, movebytes can be 0, 1, 2, or 3. */
+			movebytes = RAW_BUF_SIZE - cstate->raw_buf_index;
+
+			if (movebytes > 0)
+				memmove(&cstate->raw_buf[0], &cstate->raw_buf[cstate->raw_buf_index], movebytes);
+
+			readbytes = CopyGetData(cstate, &cstate->raw_buf[movebytes], 1, (RAW_BUF_SIZE - movebytes));
+
+			if (cstate->reached_eof)
+				ereport(ERROR,
+					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+					errmsg("unexpected EOF in COPY data")));
+
+			/* If fewer bytes were read than requested, then initialize the
+			 * remaining bytes in the raw_buf to 0. The check for the error
+			 * "received copy data after EOF marker" relies on this.
+			 */
+			if (readbytes < (RAW_BUF_SIZE - movebytes))
+				MemSet(&cstate->raw_buf[readbytes], 0, (RAW_BUF_SIZE - movebytes));
+
+			cstate->raw_buf_index = 0;
+		}
+
+		memcpy(&fld_count, &cstate->raw_buf[cstate->raw_buf_index], sizeof(fld_count));
+
+		fld_count = (int16) pg_ntoh16(fld_count);
+
 		if (fld_count == -1)
 		{
 			/*
@@ -3759,10 +3787,8 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 			 * error if there's data after the EOF marker, for consistency
 			 * with the new-protocol case.
 			 */
-			char		dummy;
-
 			if (cstate->copy_dest != COPY_OLD_FE &&
-				CopyGetData(cstate, &dummy, 1, 1) > 0)
+				cstate->raw_buf[cstate->raw_buf_index + sizeof(fld_count)] != 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("received copy data after EOF marker")));
@@ -3775,6 +3801,8 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 					 errmsg("row field count is %d, expected %d",
 							(int) fld_count, attr_count)));
 
+		cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_count);
+
 		foreach(cur, cstate->attnumlist)
 		{
 			int			attnum = lfirst_int(cur);
@@ -4716,16 +4744,55 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo,
 {
 	int32		fld_size;
 	Datum		result;
+	int   		readbytes;
 
-	if (!CopyGetInt32(cstate, &fld_size))
-		ereport(ERROR,
+	if ((cstate->raw_buf_index + sizeof(fld_size)) >= (RAW_BUF_SIZE - 1))
+	{
+		/* The fld_size may not lie entirely within the current raw_buf
+		* data, so read the next set of data from the file.
+		*/
+		uint8 		movebytes = 0;
+
+		/* Given the check above, movebytes can be anywhere from 0 to 5. */
+		movebytes = RAW_BUF_SIZE - cstate->raw_buf_index;
+
+		if (movebytes > 0)
+			memmove(&cstate->raw_buf[0], &cstate->raw_buf[cstate->raw_buf_index], movebytes);
+
+		readbytes = CopyGetData(cstate, &cstate->raw_buf[movebytes], 1, (RAW_BUF_SIZE - movebytes));
+
+		if (cstate->reached_eof)
+			ereport(ERROR,
 				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-				 errmsg("unexpected EOF in COPY data")));
+				errmsg("unexpected EOF in COPY data")));
+
+		/* If fewer bytes were read than requested, then initialize the
+		* remaining bytes in the raw_buf to 0. The check for the error
+		* "received copy data after EOF marker" relies on this.
+		*/
+		if (readbytes < (RAW_BUF_SIZE - movebytes))
+			MemSet(&cstate->raw_buf[readbytes], 0, (RAW_BUF_SIZE - movebytes));
+
+		cstate->raw_buf_index = 0;
+	}
+
+	memcpy(&fld_size, &cstate->raw_buf[cstate->raw_buf_index], sizeof(fld_size));
+
+	cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_size);
+
+	fld_size = (int32) pg_ntoh32(fld_size);
+
 	if (fld_size == -1)
 	{
 		*isnull = true;
 		return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
 	}
+
+	if (fld_size == 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+				 errmsg("unexpected EOF in COPY data")));
+
 	if (fld_size < 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
@@ -4735,12 +4802,39 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo,
 	resetStringInfo(&cstate->attribute_buf);
 
 	enlargeStringInfo(&cstate->attribute_buf, fld_size);
-	if (CopyGetData(cstate, cstate->attribute_buf.data,
-					fld_size, fld_size) != fld_size)
-		ereport(ERROR,
+
+	if ((RAW_BUF_SIZE - cstate->raw_buf_index) >= fld_size)
+	{
+		/* The field/column lies entirely within the current
+		 * raw_buf data.
+		 */
+		memcpy(&cstate->attribute_buf.data[0], &cstate->raw_buf[cstate->raw_buf_index], fld_size);
+		cstate->raw_buf_index = cstate->raw_buf_index + fld_size;
+	}
+	else
+	{
+		/* The field/column does not lie entirely within the current
+		 * raw_buf data, so first take the available field data
+		 * from the current raw_buf data and then get the remaining
+		 * field data directly from the file.
+		 */
+		int32 remainingbytesincurrdatablock = RAW_BUF_SIZE - cstate->raw_buf_index;
+
+		memcpy(&cstate->attribute_buf.data[0], &cstate->raw_buf[cstate->raw_buf_index], remainingbytesincurrdatablock);
+
+		if (CopyGetData(cstate, &cstate->attribute_buf.data[remainingbytesincurrdatablock],
+					(fld_size - remainingbytesincurrdatablock), (fld_size - remainingbytesincurrdatablock)) != (fld_size - remainingbytesincurrdatablock))
+			ereport(ERROR,
 				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 				 errmsg("unexpected EOF in COPY data")));
 
+		/* After getting the complete field/column data, now refill the
+		 * raw_buf with the data read from the file.
+		 */
+		readbytes = CopyGetData(cstate, &cstate->raw_buf[0], 1, RAW_BUF_SIZE);
+		cstate->raw_buf_index = 0;
+	}
+
 	cstate->attribute_buf.len = fld_size;
 	cstate->attribute_buf.data[fld_size] = '\0';
 
-- 
2.25.1

