From 3ee78ef282e11ef05025ace175e2b4a214b7955a Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Wed, 8 Jan 2025 16:39:20 +0500
Subject: [PATCH v5] Compress big WAL records

Compress whole WAL records larger than wal_compression_threshold.
This approach replaces FPI compression.
---
 contrib/pg_walinspect/pg_walinspect.c         |   6 -
 src/backend/access/rmgrdesc/xlogdesc.c        |  44 +-
 src/backend/access/transam/xlog.c             |  19 +-
 src/backend/access/transam/xloginsert.c       | 395 +++++++++---------
 src/backend/access/transam/xlogreader.c       | 249 +++++++----
 src/backend/utils/misc/guc_parameters.dat     |   9 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 src/include/access/xloginsert.h               |   1 +
 src/include/access/xlogreader.h               |   6 +
 src/include/access/xlogrecord.h               |  37 +-
 .../recovery/t/026_overwrite_contrecord.pl    |   2 +-
 12 files changed, 418 insertions(+), 352 deletions(-)

diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 716a0922c6b..d9e3153c449 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -314,12 +314,6 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
 				flags[cnt++] = CStringGetTextDatum("HAS_HOLE");
 			if (blk->apply_image)
 				flags[cnt++] = CStringGetTextDatum("APPLY");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_PGLZ");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_LZ4");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_ZSTD");
 
 			Assert(cnt <= bitcnt);
 			block_fpi_info = construct_array_builtin(flags, cnt, TEXTOID);
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index ff078f22264..b05633a12bb 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -278,46 +278,18 @@ XLogRecGetBlockRefInfo(XLogReaderState *record, bool pretty,
 
 			if (XLogRecHasBlockImage(record, block_id))
 			{
-				uint8		bimg_info = XLogRecGetBlock(record, block_id)->bimg_info;
-
 				/* Calculate the amount of FPI data in the record. */
 				if (fpi_len)
 					*fpi_len += XLogRecGetBlock(record, block_id)->bimg_len;
 
-				if (BKPIMAGE_COMPRESSED(bimg_info))
-				{
-					const char *method;
-
-					if ((bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-						method = "pglz";
-					else if ((bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-						method = "lz4";
-					else if ((bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-						method = "zstd";
-					else
-						method = "unknown";
-
-					appendStringInfo(buf,
-									 " (FPW%s); hole: offset: %u, length: %u, "
-									 "compression saved: %u, method: %s",
-									 XLogRecBlockImageApply(record, block_id) ?
-									 "" : " for WAL verification",
-									 XLogRecGetBlock(record, block_id)->hole_offset,
-									 XLogRecGetBlock(record, block_id)->hole_length,
-									 BLCKSZ -
-									 XLogRecGetBlock(record, block_id)->hole_length -
-									 XLogRecGetBlock(record, block_id)->bimg_len,
-									 method);
-				}
-				else
-				{
-					appendStringInfo(buf,
-									 " (FPW%s); hole: offset: %u, length: %u",
-									 XLogRecBlockImageApply(record, block_id) ?
-									 "" : " for WAL verification",
-									 XLogRecGetBlock(record, block_id)->hole_offset,
-									 XLogRecGetBlock(record, block_id)->hole_length);
-				}
+				
+				appendStringInfo(buf,
+								 " (FPW%s); hole: offset: %u, length: %u",
+								 XLogRecBlockImageApply(record, block_id) ?
+								 "" : " for WAL verification",
+								 XLogRecGetBlock(record, block_id)->hole_offset,
+								 XLogRecGetBlock(record, block_id)->hole_length);
+				
 			}
 
 			if (pretty)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 81dc86847c0..c9c7a1b255b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -138,6 +138,7 @@ int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
 int			wal_decode_buffer_size = 512 * 1024;
 bool		track_wal_io_timing = false;
+int			wal_compression_threshold = 512;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -717,6 +718,22 @@ static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
+static uint32 XLogGetRecordTotalLen(XLogRecord *record);
+
+
+/* Length of a record once decompressed; xl_tot_len if it is not compressed */
+static uint32
+XLogGetRecordTotalLen(XLogRecord *record)
+{
+	XLogCompressionHeader *chdr = (XLogCompressionHeader *) record;
+
+	if ((record->xl_info & XLR_COMPRESSED) == 0)
+		return record->xl_tot_len;
+
+	Assert(((int32_t) chdr->decompressed_length) > 0);
+	return chdr->decompressed_length;
+}
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks.  This is a low-level routine; to construct the WAL record header
@@ -1034,7 +1051,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* We also need temporary space to decode the record. */
 		record = (XLogRecord *) recordBuf.data;
 		decoded = (DecodedXLogRecord *)
-			palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len));
+			palloc(DecodeXLogRecordRequiredSpace(XLogGetRecordTotalLen(record)));
 
 		if (!debug_reader)
 			debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 92c48e768c3..2e6a55a48b2 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -42,27 +42,6 @@
 #include "utils/memutils.h"
 #include "utils/pgstat_internal.h"
 
-/*
- * Guess the maximum buffer size required to store a compressed version of
- * backup block image.
- */
-#ifdef USE_LZ4
-#define	LZ4_MAX_BLCKSZ		LZ4_COMPRESSBOUND(BLCKSZ)
-#else
-#define LZ4_MAX_BLCKSZ		0
-#endif
-
-#ifdef USE_ZSTD
-#define ZSTD_MAX_BLCKSZ		ZSTD_COMPRESSBOUND(BLCKSZ)
-#else
-#define ZSTD_MAX_BLCKSZ		0
-#endif
-
-#define PGLZ_MAX_BLCKSZ		PGLZ_MAX_OUTPUT(BLCKSZ)
-
-/* Buffer size required to store a compressed version of backup block image */
-#define COMPRESS_BUFSIZE	Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
-
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
  * a registered_buffer struct.
@@ -83,9 +62,6 @@ typedef struct
 
 	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
 								 * backup block data in XLogRecordAssemble() */
-
-	/* buffer to store a compressed version of backup block image */
-	char		compressed_page[COMPRESS_BUFSIZE];
 } registered_buffer;
 
 static registered_buffer *registered_buffers;
@@ -115,6 +91,16 @@ static uint8 curinsert_flags = 0;
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
+/*
+ * Compression buffers are kept in StringInfo for the prototype.
+ * compression_buffer_current_size == -1 means we entered a critical section
+ * before we could prepare the buffers; such a record is left uncompressed.
+ */
+static int32			compression_buffer_current_size;
+static XLogRecData		compressed_rdt_hdr;	/* chain returned by XLogCompressRdt() */
+static StringInfo		data_before_compression = NULL;	/* flattened record */
+static StringInfo		compressed_data = NULL;
+
 #define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
 #define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))
 
@@ -140,9 +126,7 @@ static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 									   XLogRecPtr RedoRecPtr, bool doPageWrites,
 									   XLogRecPtr *fpw_lsn, int *num_fpi,
 									   uint64 *fpi_bytes,
-									   bool *topxid_included);
-static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset,
-									uint16 hole_length, void *dest, uint16 *dlen);
+									   bool *topxid_included, uint64 *rec_size);
 
 /*
  * Begin constructing a WAL record. This must be called before the
@@ -163,6 +147,11 @@ XLogBeginInsert(void)
 		elog(ERROR, "XLogBeginInsert was already called");
 
 	begininsert_called = true;
+
+	if (data_before_compression)
+		resetStringInfo(data_before_compression);
+	if (compressed_data)
+		resetStringInfo(compressed_data);
 }
 
 /*
@@ -234,6 +223,7 @@ XLogResetInsertion(void)
 	mainrdata_len = 0;
 	mainrdata_last = (XLogRecData *) &mainrdata_head;
 	curinsert_flags = 0;
+	compression_buffer_current_size = 0;
 	begininsert_called = false;
 }
 
@@ -303,6 +293,8 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
 #endif
 
 	regbuf->in_use = true;
+
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -356,6 +348,7 @@ XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
 #endif
 
 	regbuf->in_use = true;
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -390,6 +383,7 @@ XLogRegisterData(const void *data, uint32 len)
 	mainrdata_last = rdata;
 
 	mainrdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -444,6 +438,7 @@ XLogRegisterBufData(uint8 block_id, const void *data, uint32 len)
 	regbuf->rdata_tail->next = rdata;
 	regbuf->rdata_tail = rdata;
 	regbuf->rdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -463,6 +458,144 @@ XLogSetRecordFlags(uint8 flags)
 	curinsert_flags |= flags;
 }
 
+/*
+ * Make sure the compression buffers can hold extraLen more bytes of record data.
+ * We cannot do it at insertion time, because we may be in a critical section.
+ */
+void XLogEnsureCompressionBuffer(uint32 extraLen)
+{
+	uint64 compressed_buffer_size;
+	uint64 desired_buffer_size;
+
+	if (wal_compression == WAL_COMPRESSION_NONE)
+		return;
+
+	if (CritSectionCount > 0 || compression_buffer_current_size == -1)
+	{
+		/* Cannot prepare buffers in a critical section; -1 sticks until reset */
+		compression_buffer_current_size = -1;
+		return;
+	}
+
+	compression_buffer_current_size += extraLen;
+	desired_buffer_size = compression_buffer_current_size + SizeOfXLogRecord;
+	Assert(data_before_compression->len == 0);
+	enlargeStringInfo(data_before_compression, desired_buffer_size);
+
+	compressed_buffer_size = PGLZ_MAX_OUTPUT(desired_buffer_size);
+
+#ifdef USE_LZ4
+	compressed_buffer_size = Max(compressed_buffer_size, LZ4_COMPRESSBOUND(desired_buffer_size));
+#endif
+#ifdef USE_ZSTD
+	compressed_buffer_size = Max(compressed_buffer_size, ZSTD_COMPRESSBOUND(desired_buffer_size));
+#endif
+	compressed_buffer_size = compressed_buffer_size + SizeOfXLogCompressedRecord;
+	Assert(compressed_data->len == 0);
+	enlargeStringInfo(compressed_data, compressed_buffer_size);
+}
+
+/*
+ * Compress the assembled record into the compression buffers.  Returns NULL if
+ * they are not ready, compression fails, or nothing would be saved.
+ */
+static XLogRecData*
+XLogCompressRdt(XLogRecData *rdt)
+{
+	XLogCompressionHeader *compressed_header;
+	XLogRecord *src_header;
+	uint32 orig_len;
+	uint32 dst_cap;
+	int32 compr_len = -1;
+
+	if (compression_buffer_current_size == -1)
+		return NULL;
+
+	Assert(wal_compression != WAL_COMPRESSION_NONE);
+	Assert(compression_buffer_current_size <= data_before_compression->maxlen);
+
+	/* The caller may assemble the same record again, so start from scratch */
+	resetStringInfo(data_before_compression);
+	for (; rdt != NULL; rdt = rdt->next)
+		appendBinaryStringInfoNT(data_before_compression, rdt->data, rdt->len);
+
+	src_header = (XLogRecord*) data_before_compression->data;
+	compressed_header = (XLogCompressionHeader*) compressed_data->data;
+	compressed_header->record_header = *src_header;
+	compressed_header->decompressed_length = data_before_compression->len;
+
+	/* Output is written after the header, which reduces the room for it */
+	dst_cap = compressed_data->maxlen - sizeof(XLogCompressionHeader);
+	orig_len = src_header->xl_tot_len - SizeOfXLogRecord;
+
+	switch ((WalCompression) wal_compression)
+	{
+		case WAL_COMPRESSION_PGLZ:
+			compressed_header->method = XLR_COMPRESS_PGLZ;
+			compr_len = pglz_compress((char*)&src_header[1], orig_len, (char*)&compressed_header[1], PGLZ_strategy_default);
+			if (compr_len == -1)
+				return NULL;
+			break;
+
+		case WAL_COMPRESSION_LZ4:
+#ifdef USE_LZ4
+			compressed_header->method = XLR_COMPRESS_LZ4;
+			compr_len = LZ4_compress_default((char*)&src_header[1], (char*)&compressed_header[1], orig_len, dst_cap);
+			if (compr_len <= 0)
+				return NULL;
+#else
+			elog(ERROR, "LZ4 is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_ZSTD:
+#ifdef USE_ZSTD
+			compressed_header->method = XLR_COMPRESS_ZSTD;
+			compr_len = ZSTD_compress((char*)&compressed_header[1], dst_cap, (char*)&src_header[1], orig_len, ZSTD_CLEVEL_DEFAULT);
+			if (ZSTD_isError(compr_len))
+				return NULL;
+#else
+			elog(ERROR, "zstd is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_NONE:
+			Assert(false);		/* cannot happen */
+			return NULL;
+			/* no default case, so that compiler will warn */
+	}
+
+	/* Keep the uncompressed record unless compression actually shrinks it */
+	if (compr_len <= 0 || SizeOfXLogCompressedRecord + compr_len >= src_header->xl_tot_len)
+		return NULL;
+	compressed_header->record_header.xl_tot_len = SizeOfXLogCompressedRecord + compr_len;
+	compressed_header->record_header.xl_info |= XLR_COMPRESSED;
+	compressed_rdt_hdr.data = compressed_data->data;
+	compressed_rdt_hdr.len = compressed_header->record_header.xl_tot_len;
+	compressed_rdt_hdr.next = NULL;
+	return &compressed_rdt_hdr;
+}
+
+/* Checksum an assembled record, possibly compressed; result goes to xl_crc */
+static void XLogChecksumRecord(XLogRecData *rdt)
+{
+	pg_crc32c	rdata_crc;
+	XLogRecord *rechdr = (XLogRecord*) rdt->data;
+	/*
+	 * Calculate CRC of everything after the fixed-size header of the first
+	 * chunk, followed by all remaining chunks in chain order.
+	 *
+	 * The record header isn't added into the CRC here since we don't know
+	 * the prev-link yet.  Thus, the CRC will represent the CRC of the whole
+	 * record in the order: rdata, then backup blocks, then record header.
+	 */
+	INIT_CRC32C(rdata_crc);
+	COMP_CRC32C(rdata_crc, ((char *)rdt->data) + SizeOfXLogRecord, rdt->len - SizeOfXLogRecord);
+	for (rdt = rdt->next; rdt != NULL; rdt = rdt->next)
+		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+	rechdr->xl_crc = rdata_crc;
+}
+
 /*
  * Insert an XLOG record having the specified RMID and info bytes, with the
  * body of the record being the data and buffer references registered earlier
@@ -514,6 +647,7 @@ XLogInsert(RmgrId rmid, uint8 info)
 		XLogRecData *rdt;
 		int			num_fpi = 0;
 		uint64		fpi_bytes = 0;
+		uint64		rec_size;
 
 		/*
 		 * Get values needed to decide whether to do full-page writes. Since
@@ -524,7 +658,16 @@ XLogInsert(RmgrId rmid, uint8 info)
 
 		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
 								 &fpw_lsn, &num_fpi, &fpi_bytes,
-								 &topxid_included);
+								 &topxid_included, &rec_size);
+
+		if (rec_size > wal_compression_threshold && wal_compression != WAL_COMPRESSION_NONE)
+		{
+			XLogRecData *rdt_compressed = XLogCompressRdt(rdt);
+			if (rdt_compressed != NULL)
+				rdt = rdt_compressed;
+		}
+
+		XLogChecksumRecord(rdt);
 
 		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
 								  fpi_bytes, topxid_included);
@@ -566,12 +709,10 @@ static XLogRecData *
 XLogRecordAssemble(RmgrId rmid, uint8 info,
 				   XLogRecPtr RedoRecPtr, bool doPageWrites,
 				   XLogRecPtr *fpw_lsn, int *num_fpi, uint64 *fpi_bytes,
-				   bool *topxid_included)
+				   bool *topxid_included, uint64 *rec_size)
 {
-	XLogRecData *rdt;
 	uint64		total_len = 0;
 	int			block_id;
-	pg_crc32c	rdata_crc;
 	registered_buffer *prev_regbuf = NULL;
 	XLogRecData *rdt_datas_last;
 	XLogRecord *rechdr;
@@ -612,9 +753,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		bool		needs_data;
 		XLogRecordBlockHeader bkpb;
 		XLogRecordBlockImageHeader bimg;
-		XLogRecordBlockCompressHeader cbimg = {0};
+		uint32		hole_length;
 		bool		samerel;
-		bool		is_compressed = false;
 		bool		include_image;
 
 		if (!regbuf->in_use)
@@ -667,8 +807,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
 		if (include_image)
 		{
-			const PageData *page = regbuf->page;
-			uint16		compressed_len = 0;
+			const char *page = regbuf->page;
 
 			/*
 			 * The page needs to be backed up, so calculate its hole length
@@ -685,32 +824,20 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					upper <= BLCKSZ)
 				{
 					bimg.hole_offset = lower;
-					cbimg.hole_length = upper - lower;
+					hole_length = upper - lower;
 				}
 				else
 				{
 					/* No "hole" to remove */
 					bimg.hole_offset = 0;
-					cbimg.hole_length = 0;
+					hole_length = 0;
 				}
 			}
 			else
 			{
 				/* Not a standard page header, don't try to eliminate "hole" */
 				bimg.hole_offset = 0;
-				cbimg.hole_length = 0;
-			}
-
-			/*
-			 * Try to compress a block image if wal_compression is enabled
-			 */
-			if (wal_compression != WAL_COMPRESSION_NONE)
-			{
-				is_compressed =
-					XLogCompressBackupBlock(page, bimg.hole_offset,
-											cbimg.hole_length,
-											regbuf->compressed_page,
-											&compressed_len);
+				hole_length = 0;
 			}
 
 			/*
@@ -728,7 +855,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
 			rdt_datas_last = rdt_datas_last->next;
 
-			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+			bimg.bimg_info = (hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
 
 			/*
 			 * If WAL consistency checking is enabled for the resource manager
@@ -739,48 +866,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			if (needs_backup)
 				bimg.bimg_info |= BKPIMAGE_APPLY;
 
-			if (is_compressed)
 			{
-				/* The current compression is stored in the WAL record */
-				bimg.length = compressed_len;
+				bimg.length = BLCKSZ - hole_length;
 
-				/* Set the compression method used for this block */
-				switch ((WalCompression) wal_compression)
-				{
-					case WAL_COMPRESSION_PGLZ:
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
-						break;
-
-					case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
-#else
-						elog(ERROR, "LZ4 is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
-#else
-						elog(ERROR, "zstd is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_NONE:
-						Assert(false);	/* cannot happen */
-						break;
-						/* no default case, so that compiler will warn */
-				}
-
-				rdt_datas_last->data = regbuf->compressed_page;
-				rdt_datas_last->len = compressed_len;
-			}
-			else
-			{
-				bimg.length = BLCKSZ - cbimg.hole_length;
-
-				if (cbimg.hole_length == 0)
+				if (hole_length == 0)
 				{
 					rdt_datas_last->data = page;
 					rdt_datas_last->len = BLCKSZ;
@@ -795,9 +884,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					rdt_datas_last = rdt_datas_last->next;
 
 					rdt_datas_last->data =
-						page + (bimg.hole_offset + cbimg.hole_length);
+						page + (bimg.hole_offset + hole_length);
 					rdt_datas_last->len =
-						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+						BLCKSZ - (bimg.hole_offset + hole_length);
 				}
 			}
 
@@ -843,12 +932,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		{
 			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
 			scratch += SizeOfXLogRecordBlockImageHeader;
-			if (cbimg.hole_length != 0 && is_compressed)
-			{
-				memcpy(scratch, &cbimg,
-					   SizeOfXLogRecordBlockCompressHeader);
-				scratch += SizeOfXLogRecordBlockCompressHeader;
-			}
 		}
 		if (!samerel)
 		{
@@ -914,19 +997,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	hdr_rdt.len = (scratch - hdr_scratch);
 	total_len += hdr_rdt.len;
 
-	/*
-	 * Calculate CRC of the data
-	 *
-	 * Note that the record header isn't added into the CRC initially since we
-	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
-	 * the whole record in the order: rdata, then backup blocks, then record
-	 * header.
-	 */
-	INIT_CRC32C(rdata_crc);
-	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
-	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
-		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
-
 	/*
 	 * Ensure that the XLogRecord is not too large.
 	 *
@@ -950,92 +1020,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
 	rechdr->xl_prev = InvalidXLogRecPtr;
-	rechdr->xl_crc = rdata_crc;
+	rechdr->xl_crc = 0;
 
-	return &hdr_rdt;
-}
+	*rec_size = rechdr->xl_tot_len;
 
-/*
- * Create a compressed version of a backup block image.
- *
- * Returns false if compression fails (i.e., compressed result is actually
- * bigger than original). Otherwise, returns true and sets 'dlen' to
- * the length of compressed block image.
- */
-static bool
-XLogCompressBackupBlock(const PageData *page, uint16 hole_offset, uint16 hole_length,
-						void *dest, uint16 *dlen)
-{
-	int32		orig_len = BLCKSZ - hole_length;
-	int32		len = -1;
-	int32		extra_bytes = 0;
-	const void *source;
-	PGAlignedBlock tmp;
-
-	if (hole_length != 0)
-	{
-		/* must skip the hole */
-		memcpy(tmp.data, page, hole_offset);
-		memcpy(tmp.data + hole_offset,
-			   page + (hole_offset + hole_length),
-			   BLCKSZ - (hole_length + hole_offset));
-		source = tmp.data;
-
-		/*
-		 * Extra data needs to be stored in WAL record for the compressed
-		 * version of block image if the hole exists.
-		 */
-		extra_bytes = SizeOfXLogRecordBlockCompressHeader;
-	}
-	else
-		source = page;
-
-	switch ((WalCompression) wal_compression)
-	{
-		case WAL_COMPRESSION_PGLZ:
-			len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
-			break;
-
-		case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-			len = LZ4_compress_default(source, dest, orig_len,
-									   COMPRESS_BUFSIZE);
-			if (len <= 0)
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "LZ4 is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-			len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
-								ZSTD_CLEVEL_DEFAULT);
-			if (ZSTD_isError(len))
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "zstd is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_NONE:
-			Assert(false);		/* cannot happen */
-			break;
-			/* no default case, so that compiler will warn */
-	}
-
-	/*
-	 * We recheck the actual size even if compression reports success and see
-	 * if the number of bytes saved by compression is larger than the length
-	 * of extra data needed for the compressed version of block image.
-	 */
-	if (len >= 0 &&
-		len + extra_bytes < orig_len)
-	{
-		*dlen = (uint16) len;	/* successful compression */
-		return true;
-	}
-	return false;
+	return &hdr_rdt;
 }
 
 /*
@@ -1411,4 +1400,18 @@ InitXLogInsert(void)
 	if (hdr_scratch == NULL)
 		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
 											 HEADER_SCRATCH_SIZE);
+
+	/*
+	 * Allocate compression buffers in xloginsert_cxt to ensure they persist
+	 * for the lifetime of the backend, matching other WAL insertion buffers.
+	 */
+	if (data_before_compression == NULL)
+	{
+		MemoryContext oldcxt = MemoryContextSwitchTo(xloginsert_cxt);
+
+		data_before_compression = makeStringInfo();
+		compressed_data = makeStringInfo();
+		MemoryContextSwitchTo(oldcxt);
+	}
+	XLogEnsureCompressionBuffer(SizeOfXLogRecord);
 }
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index c60aa9a51e9..3facf39842f 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -32,6 +32,7 @@
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
 #include "replication/origin.h"
+#include "utils/memutils.h"
 
 #ifndef FRONTEND
 #include "pgstat.h"
@@ -54,6 +55,8 @@ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 static void ResetDecoder(XLogReaderState *state);
 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogReaderState *state, XLogRecord *record,
+												XLogRecPtr recptr);
 
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
@@ -170,6 +173,8 @@ XLogReaderFree(XLogReaderState *state)
 	pfree(state->errormsg_buf);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
+	if (state->decompression_buffer)
+		pfree(state->decompression_buffer);
 	pfree(state->readBuf);
 	pfree(state);
 }
@@ -533,7 +538,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	XLogRecPtr	targetPagePtr;
 	bool		randAccess;
 	uint32		len,
-				total_len;
+				total_len_decomp,
+				total_len_physical;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
 	bool		assembled;
@@ -644,8 +650,27 @@ restart:
 	 * whole header.
 	 */
 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
+	total_len_physical = record->xl_tot_len;
 
+	/*
+	 * Work out how much space the decoded record will need.  xl_info is only
+	 * known to be on this page when the whole XLogRecord header is, and the
+	 * decompressed length only when the whole compression header is; otherwise
+	 * use -1 to defer the decision until the record has been reassembled.
+	 * TODO: Actually, we should not trust this compression bit too...
+	 */
+	if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogRecord)
+		total_len_decomp = -1;	/* We are not sure record is not compressed */
+	else if ((record->xl_info & XLR_COMPRESSED) == 0)
+		total_len_decomp = record->xl_tot_len;
+	else if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogCompressedRecord)
+		total_len_decomp = -1;	/* Need reassemble to know the size */
+	else
+	{
+		XLogCompressionHeader *c = (XLogCompressionHeader *) record;
+		/* No Assert on the length here: it might still be garbage */
+		total_len_decomp = c->decompressed_length;
+	}
 	/*
 	 * If the whole record header is on this page, validate it immediately.
 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
@@ -660,16 +685,18 @@ restart:
 								   randAccess))
 			goto err;
 		gotheader = true;
+		if (record->xl_info & XLR_COMPRESSED)
+			gotheader = targetRecOff <= XLOG_BLCKSZ - SizeOfXLogCompressedRecord;
 	}
 	else
 	{
 		/* There may be no next page if it's too small. */
-		if (total_len < SizeOfXLogRecord)
+		if (total_len_physical < SizeOfXLogRecord)
 		{
 			report_invalid_record(state,
 								  "invalid record length at %X/%08X: expected at least %u, got %u",
 								  LSN_FORMAT_ARGS(RecPtr),
-								  (uint32) SizeOfXLogRecord, total_len);
+								  (uint32) SizeOfXLogRecord, total_len_physical);
 			goto err;
 		}
 		/* We'll validate the header once we have the next page. */
@@ -681,9 +708,11 @@ restart:
 	 * calling palloc.  If we can't, we'll try again below after we've
 	 * validated that total_len isn't garbage bytes from a recycled WAL page.
 	 */
-	decoded = XLogReadRecordAlloc(state,
-								  total_len,
+	if (total_len_decomp != -1)
+		decoded = XLogReadRecordAlloc(state,
+								  total_len_decomp,
 								  false /* allow_oversized */ );
+
 	if (decoded == NULL && nonblocking)
 	{
 		/*
@@ -695,7 +724,7 @@ restart:
 	}
 
 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-	if (total_len > len)
+	if (total_len_physical > len)
 	{
 		/* Need to reassemble record */
 		char	   *contdata;
@@ -728,7 +757,9 @@ restart:
 			 * can handle the case where the previous record ended as being a
 			 * partial one.
 			 */
-			readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogShortPHD);
+			readOff = ReadPageInternal(state, targetPagePtr,
+									   Min(total_len_physical - gotlen + SizeOfXLogShortPHD,
+										   XLOG_BLCKSZ));
 			if (readOff == XLREAD_WOULDBLOCK)
 				return XLREAD_WOULDBLOCK;
 			else if (readOff < 0)
@@ -767,19 +798,19 @@ restart:
 			 * we expect there to be left.
 			 */
 			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+				total_len_physical != (pageHeader->xlp_rem_len + gotlen))
 			{
 				report_invalid_record(state,
 									  "invalid contrecord length %u (expected %lld) at %X/%08X",
 									  pageHeader->xlp_rem_len,
-									  ((long long) total_len) - gotlen,
+									  ((long long) total_len_physical) - gotlen,
 									  LSN_FORMAT_ARGS(RecPtr));
 				goto err;
 			}
 
 			/* Wait for the next page to become available */
 			readOff = ReadPageInternal(state, targetPagePtr,
-									   Min(total_len - gotlen + SizeOfXLogShortPHD,
+									   Min(total_len_physical - gotlen + SizeOfXLogShortPHD,
 										   XLOG_BLCKSZ));
 			if (readOff == XLREAD_WOULDBLOCK)
 				return XLREAD_WOULDBLOCK;
@@ -824,7 +855,7 @@ restart:
 			 * also cross-checked total_len against xlp_rem_len on the second
 			 * page, and verified xlp_pageaddr on both.
 			 */
-			if (total_len > state->readRecordBufSize)
+			if (total_len_physical > state->readRecordBufSize)
 			{
 				char		save_copy[XLOG_BLCKSZ * 2];
 
@@ -835,11 +866,11 @@ restart:
 				Assert(gotlen <= lengthof(save_copy));
 				Assert(gotlen <= state->readRecordBufSize);
 				memcpy(save_copy, state->readRecordBuf, gotlen);
-				allocate_recordbuf(state, total_len);
+				allocate_recordbuf(state, total_len_physical);
 				memcpy(state->readRecordBuf, save_copy, gotlen);
 				buffer = state->readRecordBuf + gotlen;
 			}
-		} while (gotlen < total_len);
+		} while (gotlen < total_len_physical);
 		Assert(gotheader);
 
 		record = (XLogRecord *) state->readRecordBuf;
@@ -854,8 +885,9 @@ restart:
 	else
 	{
 		/* Wait for the record data to become available */
+		Assert(targetRecOff + total_len_physical <= XLOG_BLCKSZ);
 		readOff = ReadPageInternal(state, targetPagePtr,
-								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
+								   targetRecOff + total_len_physical);
 		if (readOff == XLREAD_WOULDBLOCK)
 			return XLREAD_WOULDBLOCK;
 		else if (readOff < 0)
@@ -865,7 +897,7 @@ restart:
 		if (!ValidXLogRecord(state, record, RecPtr))
 			goto err;
 
-		state->NextRecPtr = RecPtr + MAXALIGN(total_len);
+		state->NextRecPtr = RecPtr + MAXALIGN(total_len_physical);
 
 		state->DecodeRecPtr = RecPtr;
 	}
@@ -888,8 +920,19 @@ restart:
 	if (decoded == NULL)
 	{
 		Assert(!nonblocking);
+
+		/* total_len_decomp might be not actual */
+		if (record->xl_info & XLR_COMPRESSED)
+		{
+			XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+			Assert(((int32_t)c->decompressed_length) > 0);
+			Assert(((int32_t)c->decompressed_length) < MaxAllocSize);
+			total_len_decomp = c->decompressed_length;
+		}
+		else
+			total_len_decomp = record->xl_tot_len;
 		decoded = XLogReadRecordAlloc(state,
-									  total_len,
+									  total_len_decomp,
 									  true /* allow_oversized */ );
 		/* allocation should always happen under allow_oversized */
 		Assert(decoded != NULL);
@@ -1666,6 +1709,122 @@ DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
 	return size;
 }
 
+/*
+ * Return the record that should be decoded.
+ *
+ * A record without XLR_COMPRESSED in xl_info is returned as is.  Otherwise
+ * the payload is decompressed into state->decompression_buffer, which is
+ * grown on demand and kept for later calls, behind a copy of the record
+ * header whose xl_tot_len is set to the uncompressed length.  On failure the
+ * problem is reported with report_invalid_record() and NULL is returned.
+ */
+static XLogRecord *
+XLogDecompressRecordIfNeeded(XLogReaderState *state,
+							 XLogRecord *record,
+							 XLogRecPtr recptr)
+{
+	XLogCompressionHeader *src = (XLogCompressionHeader *) record;
+	XLogRecord *dst_h;
+	char	   *dst;
+	uint32		srclen;
+	uint32		dstlen;
+	bool		decomp_success = true;
+
+	if ((record->xl_info & XLR_COMPRESSED) == 0)
+		return record;
+
+	/*
+	 * Both lengths are read from WAL: reject values that would make the
+	 * subtractions below wrap around or the buffer size overflow.
+	 */
+	if (record->xl_tot_len < SizeOfXLogCompressedRecord ||
+		src->decompressed_length < SizeOfXLogRecord ||
+		src->decompressed_length > XLogRecordMaxSize)
+	{
+		report_invalid_record(state,
+							  "invalid compressed record length at %X/%X",
+							  LSN_FORMAT_ARGS(recptr));
+		return NULL;
+	}
+	srclen = record->xl_tot_len - SizeOfXLogCompressedRecord;
+	dstlen = src->decompressed_length - SizeOfXLogRecord;
+
+	/* The buffer holds the record header followed by dstlen payload bytes */
+	if (state->decompression_buffer_size < src->decompressed_length)
+	{
+		if (state->decompression_buffer)
+			pfree(state->decompression_buffer);
+		/* Avoid small steps in growths, we compress only big records */
+		state->decompression_buffer_size = TYPEALIGN(BLCKSZ, src->decompressed_length);
+		state->decompression_buffer = palloc_extended(state->decompression_buffer_size,
+													  MCXT_ALLOC_NO_OOM);
+		if (!state->decompression_buffer)
+		{
+			state->decompression_buffer_size = 0;
+			report_invalid_record(state,
+								  "out of memory while decompressing record at %X/%X",
+								  LSN_FORMAT_ARGS(recptr));
+			return NULL;
+		}
+	}
+	dst_h = (XLogRecord *) state->decompression_buffer;
+	*dst_h = src->record_header;
+	dst_h->xl_tot_len = src->decompressed_length;
+	dst = (char *) &dst_h[1];
+
+	/*
+	 * Decompress what follows the compression header.  The output limit is
+	 * the room left after the copied header, not the whole buffer, and every
+	 * method has to produce exactly dstlen bytes.
+	 */
+	if (src->method == XLR_COMPRESS_PGLZ)
+	{
+		if (pglz_decompress((char *) &src[1], srclen, dst, dstlen, true) < 0)
+			decomp_success = false;
+	}
+	else if (src->method == XLR_COMPRESS_LZ4)
+	{
+#ifdef USE_LZ4
+		if (LZ4_decompress_safe((char *) &src[1], dst,
+								srclen, dstlen) != (int) dstlen)
+			decomp_success = false;
+#else
+		report_invalid_record(state, "could not decompress record at %X/%X compressed with %s not supported by build",
+							  LSN_FORMAT_ARGS(recptr), "lz4");
+		return NULL;
+#endif
+	}
+	else if (src->method == XLR_COMPRESS_ZSTD)
+	{
+#ifdef USE_ZSTD
+		size_t		decomp_result = ZSTD_decompress(dst, dstlen,
+													(char *) &src[1], srclen);
+
+		if (ZSTD_isError(decomp_result) || decomp_result != dstlen)
+			decomp_success = false;
+#else
+		report_invalid_record(state, "could not decompress record at %X/%X compressed with %s not supported by build",
+							  LSN_FORMAT_ARGS(recptr), "zstd");
+		return NULL;
+#endif
+	}
+	else
+	{
+		report_invalid_record(state, "could not decompress record at %X/%X compressed with unknown method",
+							  LSN_FORMAT_ARGS(recptr));
+		return NULL;
+	}
+
+	if (!decomp_success)
+	{
+		report_invalid_record(state, "could not decompress record at %X/%X",
+							  LSN_FORMAT_ARGS(recptr));
+		return NULL;
+	}
+
+	return dst_h;
+}
+
 /*
  * Decode a record.  "decoded" must point to a MAXALIGNed memory area that has
  * space for at least DecodeXLogRecordRequiredSpace(record) bytes.  On
@@ -1704,6 +1834,14 @@ DecodeXLogRecord(XLogReaderState *state,
 	RelFileLocator *rlocator = NULL;
 	uint8		block_id;
 
+	record = XLogDecompressRecordIfNeeded(state, record, lsn);
+
+	if (!record)
+	{
+		/* Decompression failed, error must be reported already */
+		return false;
+	}
+
 	decoded->header = *record;
 	decoded->lsn = lsn;
 	decoded->next = NULL;
@@ -1811,16 +1949,7 @@ DecodeXLogRecord(XLogReaderState *state,
 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
 
 				blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
-
-				if (BKPIMAGE_COMPRESSED(blk->bimg_info))
-				{
-					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
-						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
-					else
-						blk->hole_length = 0;
-				}
-				else
-					blk->hole_length = BLCKSZ - blk->bimg_len;
+				blk->hole_length = BLCKSZ - blk->bimg_len;
 				datatotal += blk->bimg_len;
 
 				/*
@@ -1878,8 +2007,8 @@ DecodeXLogRecord(XLogReaderState *state,
 					blk->bimg_len != BLCKSZ)
 				{
 					report_invalid_record(state,
-										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %d at %X/%08X",
-										  blk->data_len,
+										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%08X",
+										  (unsigned int) blk->bimg_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
 					goto err;
 				}
@@ -2077,7 +2206,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 {
 	DecodedBkpBlock *bkpb;
 	char	   *ptr;
-	PGAlignedBlock tmp;
 
 	if (block_id > record->record->max_block_id ||
 		!record->record->blocks[block_id].in_use)
@@ -2099,67 +2227,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 	bkpb = &record->record->blocks[block_id];
 	ptr = bkpb->bkp_image;
 
-	if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
-	{
-		/* If a backup block image is compressed, decompress it */
-		bool		decomp_success = true;
-
-		if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-		{
-			if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
-								BLCKSZ - bkpb->hole_length, true) < 0)
-				decomp_success = false;
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-		{
-#ifdef USE_LZ4
-			if (LZ4_decompress_safe(ptr, tmp.data,
-									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%08X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "LZ4",
-								  block_id);
-			return false;
-#endif
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-		{
-#ifdef USE_ZSTD
-			size_t		decomp_result = ZSTD_decompress(tmp.data,
-														BLCKSZ - bkpb->hole_length,
-														ptr, bkpb->bimg_len);
-
-			if (ZSTD_isError(decomp_result))
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%08X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "zstd",
-								  block_id);
-			return false;
-#endif
-		}
-		else
-		{
-			report_invalid_record(record, "could not restore image at %X/%08X compressed with unknown method, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		if (!decomp_success)
-		{
-			report_invalid_record(record, "could not decompress image at %X/%08X, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		ptr = tmp.data;
-	}
-
 	/* generate page, taking into account hole if necessary */
 	if (bkpb->hole_length == 0)
 	{
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 7c60b125564..182f45e4de6 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -3325,6 +3325,15 @@
   options => 'wal_compression_options',
 },
 
+{ name => 'wal_compression_threshold', type => 'int', context => 'PGC_SIGHUP', group => 'WAL_SETTINGS',
+  short_desc => 'Minimum WAL record length to engage compression.',
+  flags => 'GUC_UNIT_BYTE',
+  variable => 'wal_compression_threshold',
+  boot_val => '512',
+  min => '32',
+  max => 'INT_MAX',
+},
+
 { name => 'wal_consistency_checking', type => 'string', context => 'PGC_SUSET', group => 'DEVELOPER_OPTIONS',
   short_desc => 'Sets the WAL resource managers for which WAL consistency checks are done.',
   long_desc => 'Full-page images will be logged for all data blocks and cross-checked against the results of WAL replay.',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index dc9e2255f8a..3693f3bf5a7 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -247,6 +247,7 @@
                                         # (change requires restart)
 #wal_compression = off                  # enables compression of full-page writes;
                                         # off, pglz, lz4, zstd, or on
+#wal_compression_threshold = 512        # min 32, minimal record length to be compressed
 #wal_init_zero = on                     # zero-fill new WAL files
 #wal_recycle = on                       # recycle WAL files
 #wal_buffers = -1                       # min 32kB, -1 sets based on shared_buffers
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0591a885dd1..11c7c9e5719 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -57,6 +57,7 @@ extern PGDLLIMPORT int CommitDelay;
 extern PGDLLIMPORT int CommitSiblings;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
+extern PGDLLIMPORT int wal_compression_threshold;
 
 extern PGDLLIMPORT int CheckPointSegments;
 
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 16ebc76e743..c06e8037c1b 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -46,6 +46,7 @@ extern void XLogSetRecordFlags(uint8 flags);
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
 extern XLogRecPtr XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value);
 extern void XLogEnsureRecordSpace(int max_block_id, int ndatas);
+extern void XLogEnsureCompressionBuffer(uint32 extraLen);
 extern void XLogRegisterData(const void *data, uint32 len);
 extern void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags);
 extern void XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator,
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 9b63b6aff75..97066eecf29 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -251,6 +251,12 @@ struct XLogReaderState
 	char	   *decode_buffer_head; /* data is read from the head */
 	char	   *decode_buffer_tail; /* new data is written at the tail */
 
+	/*
+	 * Buffer to decompress records
+	 */
+	char	   *decompression_buffer;
+	uint32 		decompression_buffer_size;
+
 	/*
 	 * Queue of records that have been decoded.  This is a linked list that
 	 * usually consists of consecutive records in decode_buffer, but may also
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index e8999d3fe91..05aa8e4b46e 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -48,7 +48,7 @@ typedef struct XLogRecord
 	/* 2 bytes of padding here, initialize to zero */
 	pg_crc32c	xl_crc;			/* CRC for this record */
 
-	/* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */
+	/* XLogRecordBlockHeaders, XLogRecordDataHeader or compression header follow, no padding */
 
 } XLogRecord;
 
@@ -90,6 +90,9 @@ typedef struct XLogRecord
  */
 #define XLR_CHECK_CONSISTENCY	0x02
 
+/* This bit in xl_info means the record is compressed */
+#define XLR_COMPRESSED	0x04
+
 /*
  * Header info for block data appended to an XLOG record.
  *
@@ -143,11 +146,6 @@ typedef struct XLogRecordBlockImageHeader
 	uint16		length;			/* number of page image bytes */
 	uint16		hole_offset;	/* number of bytes before "hole" */
 	uint8		bimg_info;		/* flag bits, see below */
-
-	/*
-	 * If BKPIMAGE_HAS_HOLE and BKPIMAGE_COMPRESSED(), an
-	 * XLogRecordBlockCompressHeader struct follows.
-	 */
 } XLogRecordBlockImageHeader;
 
 #define SizeOfXLogRecordBlockImageHeader	\
@@ -158,25 +156,23 @@ typedef struct XLogRecordBlockImageHeader
 #define BKPIMAGE_APPLY			0x02	/* page image should be restored
 										 * during replay */
 /* compression methods supported */
-#define BKPIMAGE_COMPRESS_PGLZ	0x04
-#define BKPIMAGE_COMPRESS_LZ4	0x08
-#define BKPIMAGE_COMPRESS_ZSTD	0x10
+#define XLR_COMPRESS_PGLZ	0x04
+#define XLR_COMPRESS_LZ4	0x08
+#define XLR_COMPRESS_ZSTD	0x10
 
 #define	BKPIMAGE_COMPRESSED(info) \
-	((info & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \
-			  BKPIMAGE_COMPRESS_ZSTD)) != 0)
+	(((info) & (XLR_COMPRESS_PGLZ | XLR_COMPRESS_LZ4 | \
+			  XLR_COMPRESS_ZSTD)) != 0)
 
-/*
- * Extra header information used when page image has "hole" and
- * is compressed.
- */
-typedef struct XLogRecordBlockCompressHeader
+/* Header of a compressed record: the regular record header plus compression info */
+typedef struct XLogCompressionHeader
 {
-	uint16		hole_length;	/* number of bytes in "hole" */
-} XLogRecordBlockCompressHeader;
+	XLogRecord record_header;
+	char	method;
+	uint32	decompressed_length;
+} XLogCompressionHeader;
 
-#define SizeOfXLogRecordBlockCompressHeader \
-	sizeof(XLogRecordBlockCompressHeader)
+#define SizeOfXLogCompressedRecord	(offsetof(XLogCompressionHeader, decompressed_length) + sizeof(uint32))
 
 /*
  * Maximum size of the header for a block reference. This is used to size a
@@ -185,7 +181,6 @@ typedef struct XLogRecordBlockCompressHeader
 #define MaxSizeOfXLogRecordBlockHeader \
 	(SizeOfXLogRecordBlockHeader + \
 	 SizeOfXLogRecordBlockImageHeader + \
-	 SizeOfXLogRecordBlockCompressHeader + \
 	 sizeof(RelFileLocator) + \
 	 sizeof(BlockNumber))
 
diff --git a/src/test/recovery/t/026_overwrite_contrecord.pl b/src/test/recovery/t/026_overwrite_contrecord.pl
index 4f501169f42..a8044752b92 100644
--- a/src/test/recovery/t/026_overwrite_contrecord.pl
+++ b/src/test/recovery/t/026_overwrite_contrecord.pl
@@ -56,7 +56,7 @@ $$;
 my $initfile = $node->safe_psql('postgres',
 	'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
 $node->safe_psql('postgres',
-	qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
+	qq{SET wal_compression to off; SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
 );
 #$node->safe_psql('postgres', qq{create table foo ()});
 my $endfile = $node->safe_psql('postgres',
-- 
2.51.2

