From ebbc80179d262f80eff31f5f7d6db218c5321354 Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Wed, 8 Jan 2025 16:39:20 +0500
Subject: [PATCH v6] Add whole-record WAL compression alongside FPI compression

Extends WAL compression to compress entire WAL records when their size
exceeds wal_compression_threshold (default 512 bytes), complementing the
existing per-FPI compression, which continues to work unchanged.

When wal_compression is enabled and a record exceeds the threshold, the
entire record body is compressed as a single unit.  This achieves better
ratios than per-FPI compression for workloads where multiple pages share
similar content, such as B-tree index builds.

Benchmark (CREATE INDEX on 10M rows of random float data, VACUUM +
CHECKPOINT before each run so B-tree pages are logged as FPIs):
  Method   No compression   FPI-only (HEAD)   Whole-record (Patched)
  ------   --------------   ---------------   ----------------------
  pglz          193 MB           193 MB              193 MB
  lz4           193 MB           160 MB              132 MB
  zstd          193 MB           125 MB               97 MB

When whole-record compression is active, per-FPI compression is skipped
during record assembly so that FPI data is not compressed twice.  If
whole-record compression fails (e.g. incompressible data) and the record
contains FPIs, the code falls back to per-FPI compression automatically.
Setting wal_compression_threshold >= wal_compression_buffer disables
whole-record compression and restores pure FPI-only behaviour.

Memory is controlled by the new wal_compression_buffer GUC.  Its default
equals MIN_WAL_COMPRESSION_BUFFER = XLR_MAX_BLOCK_ID * COMPRESS_BUFSIZE +
HEADER_SCRATCH_SIZE, sized to hold the largest possible WAL record
(32 full-page images plus headers) uncompressed.  This is the same total
memory previously occupied by per-block compressed_page arrays, so the
default overhead is unchanged.  Total memory per backend is approximately
2x wal_compression_buffer: one input buffer for flattening the record and
one output buffer sized to the compressor's worst-case compressed bound.

The XLR_COMPRESSED flag (0x04) in xl_info marks compressed records.  A
compressed record begins with an XLogCompressionHeader carrying the
compression method and the original decompressed length.

Author: Andrey Borodin
Discussion: https://postgr.es/m/4DC38068-976E-4A84-8EE6-4EFACBBD927A@yandex-team.ru
---
 src/backend/access/transam/xlog.c             |  18 +-
 src/backend/access/transam/xloginsert.c       | 504 +++++++++++++++---
 src/backend/access/transam/xlogreader.c       | 233 +++++++-
 src/backend/utils/misc/guc_parameters.dat     |  25 +
 src/backend/utils/misc/postgresql.conf.sample |   5 +-
 src/include/access/xlog.h                     |   2 +
 src/include/access/xlogreader.h               |   4 +
 src/include/access/xlogrecord.h               |  29 +-
 src/include/utils/guc_hooks.h                 |   2 +
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  15 +-
 src/test/recovery/Makefile                    |  10 +
 .../recovery/t/046_checkpoint_logical_slot.pl |   5 +-
 src/test/recovery/t/052_wal_compression.pl    |  93 ++++
 src/test/recovery/wal_compression.conf        |   3 +
 14 files changed, 846 insertions(+), 102 deletions(-)
 create mode 100644 src/test/recovery/t/052_wal_compression.pl
 create mode 100644 src/test/recovery/wal_compression.conf

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 81dc86847c0..0e5f96c5b57 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -138,6 +138,7 @@ int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
 int			wal_decode_buffer_size = 512 * 1024;
 bool		track_wal_io_timing = false;
+int			wal_compression_threshold = 512;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -717,6 +718,21 @@ static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
+#ifdef WAL_DEBUG
+/* Read length of a record, accounting for possible compression */
+static uint32
+XLogGetRecordTotalLen(XLogRecord *record)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionHeader *c = (XLogCompressionHeader *) record;
+		Assert(c->decompressed_length > 0);
+		return c->decompressed_length;
+	}
+	return record->xl_tot_len;
+}
+#endif
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks.  This is a low-level routine; to construct the WAL record header
@@ -1034,7 +1050,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* We also need temporary space to decode the record. */
 		record = (XLogRecord *) recordBuf.data;
 		decoded = (DecodedXLogRecord *)
-			palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len));
+			palloc(DecodeXLogRecordRequiredSpace(XLogGetRecordTotalLen(record)));
 
 		if (!debug_reader)
 			debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 92c48e768c3..16da53ba565 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -39,6 +39,7 @@
 #include "replication/origin.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
+#include "utils/guc_hooks.h"
 #include "utils/memutils.h"
 #include "utils/pgstat_internal.h"
 
@@ -63,6 +64,24 @@
 /* Buffer size required to store a compressed version of backup block image */
 #define COMPRESS_BUFSIZE	Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
 
+#define SizeOfXlogOrigin		(sizeof(RepOriginId) + sizeof(char))
+#define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))
+
+#define HEADER_SCRATCH_SIZE \
+	(SizeOfXLogRecord + \
+	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
+	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
+	 SizeOfXLogTransactionId)
+
+/*
+ * Minimum buffer size for wal_compression_buffer GUC.
+ *
+ * Must accommodate the largest WAL record we might want to compress as a
+ * whole: up to XLR_MAX_BLOCK_ID full-page images (each up to COMPRESS_BUFSIZE
+ * bytes when uncompressed) plus per-block and record headers.
+ */
+#define MIN_WAL_COMPRESSION_BUFFER	(XLR_MAX_BLOCK_ID * COMPRESS_BUFSIZE + HEADER_SCRATCH_SIZE)
+
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
  * a registered_buffer struct.
@@ -84,8 +103,8 @@ typedef struct
 	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
 								 * backup block data in XLogRecordAssemble() */
 
-	/* buffer to store a compressed version of backup block image */
-	char		compressed_page[COMPRESS_BUFSIZE];
+	/* pointer into compression_buf for compressed page image */
+	char	   *compressed_page;
 } registered_buffer;
 
 static registered_buffer *registered_buffers;
@@ -115,14 +134,62 @@ static uint8 curinsert_flags = 0;
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
-#define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
-#define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))
+/*
+ * GUC: Maximum memory per backend for WAL compression.
+ *
+ * Controls the size of the shared FPI compression buffer as well as the
+ * threshold for whole-record compression: records larger than this value
+ * will not be compressed as a whole (per-FPI compression still applies).
+ *
+ * The default equals MIN_WAL_COMPRESSION_BUFFER, which is the total memory
+ * previously used by embedded per-block compressed_page arrays.
+ *
+ * Actual memory consumption per backend is approximately 2x this value
+ * because we need both an input buffer and an output buffer for whole-record
+ * compression.
+ */
+int		wal_compression_buffer;
 
-#define HEADER_SCRATCH_SIZE \
-	(SizeOfXLogRecord + \
-	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
-	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
-	 SizeOfXLogTransactionId)
+static XLogRecData		compressed_rdt_hdr;
+
+/*
+ * Compression buffers, allocated once and grown with repalloc if
+ * wal_compression_buffer increases.  compression_buffers_size tracks the
+ * wal_compression_buffer value at which they were last allocated.
+ *
+ * compression_buf is a single wal_compression_buffer-sized staging area
+ * shared between two mutually exclusive uses:
+ *
+ *  - FPI compression (skip_fpi_compression = false): compressed block images
+ *    are packed into compression_buf one after another, with
+ *    compression_buf_offset tracking the fill level.  rdt nodes point
+ *    directly into this buffer.
+ *
+ *  - Whole-record compression (skip_fpi_compression = true): XLogCompressRdt
+ *    flattens the rdt chain into compression_buf before compressing into
+ *    compressed_data.  FPI compression is skipped in this path, so the two
+ *    uses never overlap.
+ *
+ * compressed_data holds the compressor output for whole-record compression.
+ * Its size is the maximum compressed output for wal_compression_buffer bytes.
+ */
+static char			   *compression_buf = NULL;
+static int				compression_buf_offset;		/* fill level for FPI packing */
+static char			   *compressed_data = NULL;
+static uint32			compressed_data_size;		/* allocated size of compressed_data */
+static int				compression_buffers_size;	/* wal_compression_buffer at last alloc */
+
+/*
+ * In assert builds (where MEMORY_CONTEXT_CHECKING is active), verify every
+ * palloc sentinel in the process after operations that write to compression
+ * buffers.  This catches overflows that corrupt adjacent palloc blocks.
+ * Compiles to nothing in non-assert builds.
+ */
+#ifdef MEMORY_CONTEXT_CHECKING
+#define CheckCompressionMemory() MemoryContextCheck(TopMemoryContext)
+#else
+#define CheckCompressionMemory() ((void) 0)
+#endif
 
 /*
  * An array of XLogRecData structs, to hold registered data.
@@ -136,11 +203,13 @@ static bool begininsert_called = false;
 /* Memory context to hold the registered buffer and data references. */
 static MemoryContext xloginsert_cxt;
 
+static void AllocCompressionBuffers(void);
 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 									   XLogRecPtr RedoRecPtr, bool doPageWrites,
-									   XLogRecPtr *fpw_lsn, int *num_fpi,
-									   uint64 *fpi_bytes,
-									   bool *topxid_included);
+								   XLogRecPtr *fpw_lsn, int *num_fpi,
+								   uint64 *fpi_bytes,
+								   bool *topxid_included, uint64 *rec_size,
+								   bool skip_fpi_compression);
 static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset,
 									uint16 hole_length, void *dest, uint16 *dlen);
 
@@ -155,6 +224,9 @@ XLogBeginInsert(void)
 	Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
 	Assert(mainrdata_len == 0);
 
+	/* Enforce that InitXLogInsert() was called before any WAL insertion */
+	Assert(xloginsert_cxt != NULL);
+
 	/* cross-check on whether we should be here or not */
 	if (!XLogInsertAllowed())
 		elog(ERROR, "cannot make new WAL entries during recovery");
@@ -234,6 +306,7 @@ XLogResetInsertion(void)
 	mainrdata_len = 0;
 	mainrdata_last = (XLogRecData *) &mainrdata_head;
 	curinsert_flags = 0;
+	compression_buf_offset = 0;
 	begininsert_called = false;
 }
 
@@ -463,6 +536,124 @@ XLogSetRecordFlags(uint8 flags)
 	curinsert_flags |= flags;
 }
 
+
+/* Compress assembled record on top of compression buffers */
+static XLogRecData *
+XLogCompressRdt(XLogRecData *rdt)
+{
+	XLogCompressionHeader *compressed_header;
+	XLogRecord *src_header;
+	uint32		flat_len = 0;
+	uint32		orig_len;
+	int32		compr_len = -1;
+
+	Assert(wal_compression != WAL_COMPRESSION_NONE);
+	Assert(compression_buf != NULL);
+
+	/*
+	 * Flatten the rdt chain into compression_buf.  FPI compression is always
+	 * skipped when whole-record compression is attempted (skip_fpi_compression
+	 * = true), so compression_buf is unused at this point and safe to
+	 * overwrite.
+	 *
+	 * Caller must have checked rec_size <= wal_compression_buffer before
+	 * calling us, and AllocCompressionBuffers() guarantees the buffer is at
+	 * least wal_compression_buffer bytes.  Verify the invariant here so that
+	 * any future drift between rec_size and the actual rdt chain length is
+	 * caught immediately rather than silently corrupting memory.
+	 */
+	for (const XLogRecData *r = rdt; r != NULL; r = r->next)
+	{
+		memcpy(compression_buf + flat_len, r->data, r->len);
+		flat_len += r->len;
+	}
+	Assert(flat_len <= (uint32) wal_compression_buffer);
+
+	src_header = (XLogRecord *) compression_buf;
+	compressed_header = (XLogCompressionHeader *) compressed_data;
+
+	compressed_header->record_header = *src_header;
+	compressed_header->decompressed_length = flat_len;
+
+	orig_len = src_header->xl_tot_len - SizeOfXLogRecord;
+
+	switch ((WalCompression) wal_compression)
+	{
+		case WAL_COMPRESSION_PGLZ:
+			compressed_header->method = XLR_COMPRESS_PGLZ;
+			compr_len = pglz_compress((char *) &src_header[1], orig_len, (char *) &compressed_header[1], PGLZ_strategy_default);
+			if (compr_len == -1)
+				return NULL;
+			break;
+
+		case WAL_COMPRESSION_LZ4:
+#ifdef USE_LZ4
+			compressed_header->method = XLR_COMPRESS_LZ4;
+		compr_len = LZ4_compress_default((char *) &src_header[1], (char *) &compressed_header[1],
+									   orig_len, compressed_data_size);
+			if (compr_len <= 0)
+				return NULL;
+#else
+			elog(ERROR, "LZ4 is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_ZSTD:
+#ifdef USE_ZSTD
+			compressed_header->method = XLR_COMPRESS_ZSTD;
+		compr_len = ZSTD_compress((char *) &compressed_header[1], compressed_data_size,
+								(char *) &src_header[1], orig_len, ZSTD_CLEVEL_DEFAULT);
+			if (ZSTD_isError(compr_len))
+				return NULL;
+#else
+			elog(ERROR, "zstd is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_NONE:
+			Assert(false);		/* cannot happen */
+			return NULL;
+			break;
+			/* no default case, so that compiler will warn */
+	}
+
+	Assert(compr_len > 0);
+
+	/* Verify compression did not overflow any palloc'd buffer. */
+	CheckCompressionMemory();
+
+	compressed_header->record_header.xl_tot_len = SizeOfXLogCompressedRecord + compr_len;
+
+	compressed_header->record_header.xl_info |= XLR_COMPRESSED;
+
+	compressed_rdt_hdr.data = compressed_data;
+	compressed_rdt_hdr.len = compressed_header->record_header.xl_tot_len;
+	compressed_rdt_hdr.next = NULL;
+
+	return &compressed_rdt_hdr;
+}
+
+/* Checksum assembled record (which may be compressed). */
+static void
+XLogChecksumRecord(XLogRecData *rdt)
+{
+	pg_crc32c	rdata_crc;
+	XLogRecord *rechdr = (XLogRecord *) rdt->data;
+	/*
+	 * Calculate CRC of the data
+	 *
+	 * Note that the record header isn't added into the CRC initially since we
+	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
+	 * the whole record in the order: rdata, then backup blocks, then record
+	 * header.
+	 */
+	INIT_CRC32C(rdata_crc);
+	COMP_CRC32C(rdata_crc, ((char *)rdt->data) + SizeOfXLogRecord, rdt->len - SizeOfXLogRecord);
+	for (rdt = rdt->next; rdt != NULL; rdt = rdt->next)
+		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+	rechdr->xl_crc = rdata_crc;
+}
+
 /*
  * Insert an XLOG record having the specified RMID and info bytes, with the
  * body of the record being the data and buffer references registered earlier
@@ -478,6 +669,15 @@ XLogRecPtr
 XLogInsert(RmgrId rmid, uint8 info)
 {
 	XLogRecPtr	EndPos;
+	/*
+	 * When whole-record compression can apply (threshold is within the buffer
+	 * size limit), skip per-FPI compression during assembly to avoid
+	 * compressing FPI data twice.  When threshold >= buffer_size, whole-record
+	 * compression can never trigger, so FPI compression runs normally.
+	 */
+	bool		try_whole_record = (wal_compression != WAL_COMPRESSION_NONE &&
+									wal_compression_threshold <
+									wal_compression_buffer);
 
 	/* XLogBeginInsert() must have been called. */
 	if (!begininsert_called)
@@ -514,6 +714,15 @@ XLogInsert(RmgrId rmid, uint8 info)
 		XLogRecData *rdt;
 		int			num_fpi = 0;
 		uint64		fpi_bytes = 0;
+		uint64		rec_size;
+
+		/*
+		 * Reset the FPI compression offset at the start of each iteration.
+		 * The do-while loop retries when XLogInsertRecord returns
+		 * InvalidXLogRecPtr (e.g. because doPageWrites changed), so we must
+		 * not accumulate FPI data across iterations.
+		 */
+		compression_buf_offset = 0;
 
 		/*
 		 * Get values needed to decide whether to do full-page writes. Since
@@ -524,7 +733,53 @@ XLogInsert(RmgrId rmid, uint8 info)
 
 		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
 								 &fpw_lsn, &num_fpi, &fpi_bytes,
-								 &topxid_included);
+								 &topxid_included, &rec_size,
+								 try_whole_record);
+
+		/*
+		 * When try_whole_record is true, FPI compression was skipped during
+		 * assembly.  Attempt whole-record compression if the record is in the
+		 * right size range.  Neither XLogCompressRdt() nor the compressors it
+		 * calls allocate memory, so this is safe inside a critical section.
+		 *
+		 * If whole-record compression was not used for any reason (record too
+		 * small, too large for our buffer, or compressor rejected it), fall
+		 * back to per-FPI compression by reassembling.  This ensures FPIs are
+		 * never silently left uncompressed because we optimistically skipped
+		 * them during the first assembly pass.
+		 */
+		if (try_whole_record)
+		{
+			bool		whole_record_compressed = false;
+
+			if (rec_size > wal_compression_threshold &&
+				rec_size <= (uint32) wal_compression_buffer)
+			{
+				XLogRecData *rdt_compressed = XLogCompressRdt(rdt);
+
+				if (rdt_compressed != NULL)
+				{
+					rdt = rdt_compressed;
+					whole_record_compressed = true;
+				}
+			}
+
+			if (!whole_record_compressed && num_fpi > 0)
+			{
+				/*
+				 * Fall back to per-FPI compression.  XLogRecordAssemble()
+				 * overwrites the same static buffers each call, so no extra
+				 * memory is required.
+				 */
+				compression_buf_offset = 0;
+				rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
+										 &fpw_lsn, &num_fpi, &fpi_bytes,
+										 &topxid_included, &rec_size,
+										 false);
+			}
+		}
+
+		XLogChecksumRecord(rdt);
 
 		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
 								  fpi_bytes, topxid_included);
@@ -547,6 +802,87 @@ XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value)
 	return XLogInsert(rmid, info);
 }
 
+/*
+ * Allocate (or grow) the two compression buffers.
+ *
+ * Must be called outside a critical section.  InitXLogInsert() calls this at
+ * backend start when wal_compression is enabled.  The GUC assign hooks for
+ * wal_compression and wal_compression_buffer call it when those GUCs change
+ * at runtime, which also always happens outside critical sections.
+ *
+ * If the buffers are already sized to wal_compression_buffer this is a no-op.
+ */
+static void
+AllocCompressionBuffers(void)
+{
+	uint32		new_size = wal_compression_buffer;
+	uint32		compressed_buf_size;
+
+	Assert(CritSectionCount == 0);
+	Assert(xloginsert_cxt != NULL);
+
+	if (compression_buffers_size >= new_size)
+		return;					/* already big enough */
+
+	compressed_buf_size = PGLZ_MAX_OUTPUT(new_size);
+#ifdef USE_LZ4
+	compressed_buf_size = Max(compressed_buf_size,
+							  LZ4_COMPRESSBOUND(new_size));
+#endif
+#ifdef USE_ZSTD
+	compressed_buf_size = Max(compressed_buf_size,
+							  ZSTD_COMPRESSBOUND(new_size));
+#endif
+	compressed_buf_size += SizeOfXLogCompressedRecord;
+
+	if (compression_buf == NULL)
+	{
+		compression_buf = MemoryContextAlloc(xloginsert_cxt, new_size);
+		compressed_data = MemoryContextAlloc(xloginsert_cxt, compressed_buf_size);
+	}
+	else
+	{
+		compression_buf = repalloc(compression_buf, new_size);
+		compressed_data = repalloc(compressed_data, compressed_buf_size);
+	}
+
+	compressed_data_size = compressed_buf_size;
+	compression_buffers_size = new_size;
+	CheckCompressionMemory();
+}
+
+/*
+ * GUC assign hook for wal_compression.
+ *
+ * Allocates compression buffers immediately when compression is first enabled
+ * so that XLogInsert() never needs to allocate inside a critical section.
+ * If xloginsert_cxt is not yet set up (very early startup), the allocation is
+ * deferred to InitXLogInsert().
+ */
+void
+assign_wal_compression(int newval, void *extra)
+{
+	if (newval != WAL_COMPRESSION_NONE &&
+		xloginsert_cxt != NULL &&
+		compression_buf == NULL)
+		AllocCompressionBuffers();
+}
+
+/*
+ * GUC assign hook for wal_compression_buffer.
+ *
+ * Grows the compression buffers immediately when the GUC is increased so that
+ * XLogInsert() never needs to repalloc inside a critical section.
+ */
+void
+assign_wal_compression_buffer(int newval, void *extra)
+{
+	if (wal_compression != WAL_COMPRESSION_NONE &&
+		xloginsert_cxt != NULL &&
+		compression_buf != NULL)
+		AllocCompressionBuffers();
+}
+
 /*
  * Assemble a WAL record from the registered data and buffers into an
  * XLogRecData chain, ready for insertion with XLogInsertRecord().
@@ -566,12 +902,11 @@ static XLogRecData *
 XLogRecordAssemble(RmgrId rmid, uint8 info,
 				   XLogRecPtr RedoRecPtr, bool doPageWrites,
 				   XLogRecPtr *fpw_lsn, int *num_fpi, uint64 *fpi_bytes,
-				   bool *topxid_included)
+				   bool *topxid_included, uint64 *rec_size,
+				   bool skip_fpi_compression)
 {
-	XLogRecData *rdt;
 	uint64		total_len = 0;
 	int			block_id;
-	pg_crc32c	rdata_crc;
 	registered_buffer *prev_regbuf = NULL;
 	XLogRecData *rdt_datas_last;
 	XLogRecord *rechdr;
@@ -599,6 +934,18 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	if (wal_consistency_checking[rmid])
 		info |= XLR_CHECK_CONSISTENCY;
 
+	/*
+	 * Compression buffers must already be allocated by this point.
+	 * InitXLogInsert() allocates them at backend start when wal_compression
+	 * is enabled; the assign hooks for wal_compression and
+	 * wal_compression_buffer handle later changes outside critical sections.
+	 * We must never allocate or repalloc here because XLogInsert() is
+	 * routinely called inside critical sections.
+	 */
+	Assert(wal_compression == WAL_COMPRESSION_NONE ||
+		   (compression_buf != NULL &&
+			compression_buffers_size >= wal_compression_buffer));
+
 	/*
 	 * Make an rdata chain containing all the data portions of all block
 	 * references. This includes the data for full-page images. Also append
@@ -613,6 +960,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		XLogRecordBlockHeader bkpb;
 		XLogRecordBlockImageHeader bimg;
 		XLogRecordBlockCompressHeader cbimg = {0};
+		uint16		hole_length;
 		bool		samerel;
 		bool		is_compressed = false;
 		bool		include_image;
@@ -701,18 +1049,37 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 				cbimg.hole_length = 0;
 			}
 
-			/*
-			 * Try to compress a block image if wal_compression is enabled
-			 */
-			if (wal_compression != WAL_COMPRESSION_NONE)
+		/*
+		 * Try to compress a block image if wal_compression is enabled,
+		 * we have space in the shared FPI compression buffer, and the caller
+		 * has not requested that FPI compression be skipped (because
+		 * whole-record compression will be applied instead).
+		 */
+		if (!skip_fpi_compression &&
+			wal_compression != WAL_COMPRESSION_NONE &&
+			compression_buf != NULL &&
+			compression_buf_offset + COMPRESS_BUFSIZE <= wal_compression_buffer)
 			{
+				/* Assign pointer into shared buffer for this FPI */
+				regbuf->compressed_page = compression_buf + compression_buf_offset;
+
 				is_compressed =
 					XLogCompressBackupBlock(page, bimg.hole_offset,
 											cbimg.hole_length,
 											regbuf->compressed_page,
 											&compressed_len);
+
+				if (is_compressed)
+				{
+					compression_buf_offset += compressed_len;
+					/* Verify FPI compression did not overrun compression_buf. */
+					CheckCompressionMemory();
+				}
 			}
 
+			/* for uncompressed images, use hole_length from cbimg */
+			hole_length = cbimg.hole_length;
+
 			/*
 			 * Fill in the remaining fields in the XLogRecordBlockHeader
 			 * struct
@@ -778,9 +1145,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			}
 			else
 			{
-				bimg.length = BLCKSZ - cbimg.hole_length;
+				bimg.length = BLCKSZ - hole_length;
 
-				if (cbimg.hole_length == 0)
+				if (hole_length == 0)
 				{
 					rdt_datas_last->data = page;
 					rdt_datas_last->len = BLCKSZ;
@@ -795,9 +1162,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					rdt_datas_last = rdt_datas_last->next;
 
 					rdt_datas_last->data =
-						page + (bimg.hole_offset + cbimg.hole_length);
+						page + (bimg.hole_offset + hole_length);
 					rdt_datas_last->len =
-						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+						BLCKSZ - (bimg.hole_offset + hole_length);
 				}
 			}
 
@@ -914,19 +1281,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	hdr_rdt.len = (scratch - hdr_scratch);
 	total_len += hdr_rdt.len;
 
-	/*
-	 * Calculate CRC of the data
-	 *
-	 * Note that the record header isn't added into the CRC initially since we
-	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
-	 * the whole record in the order: rdata, then backup blocks, then record
-	 * header.
-	 */
-	INIT_CRC32C(rdata_crc);
-	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
-	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
-		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
-
 	/*
 	 * Ensure that the XLogRecord is not too large.
 	 *
@@ -950,7 +1304,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
 	rechdr->xl_prev = InvalidXLogRecPtr;
-	rechdr->xl_crc = rdata_crc;
+	rechdr->xl_crc = 0;
+
+	*rec_size = rechdr->xl_tot_len;
 
 	return &hdr_rdt;
 }
@@ -1365,50 +1721,60 @@ log_newpage_range(Relation rel, ForkNumber forknum,
 
 /*
  * Allocate working buffers needed for WAL record construction.
+ *
+ * Must be called exactly once per process before any XLogBeginInsert() call.
+ * Both regular backends (via InitPostgres) and auxiliary processes (via
+ * AuxiliaryProcessMainCommon) must call this.
  */
 void
 InitXLogInsert(void)
 {
 #ifdef USE_ASSERT_CHECKING
+	{
+		size_t		max_required;
 
-	/*
-	 * Check that any records assembled can be decoded.  This is capped based
-	 * on what XLogReader would require at its maximum bound.  The XLOG_BLCKSZ
-	 * addend covers the larger allocate_recordbuf() demand.  This code path
-	 * is called once per backend, more than enough for this check.
-	 */
-	size_t		max_required =
-		DecodeXLogRecordRequiredSpace(XLogRecordMaxSize + XLOG_BLCKSZ);
+		/* Detect extraneous calls */
+		Assert(xloginsert_cxt == NULL);
 
-	Assert(AllocSizeIsValid(max_required));
+		/*
+		 * Check that any records assembled can be decoded.  This is capped
+		 * based on what XLogReader would require at its maximum bound.  The
+		 * XLOG_BLCKSZ addend covers the larger allocate_recordbuf() demand.
+		 * This code path is called once per backend, more than enough for
+		 * this check.
+		 */
+		max_required = DecodeXLogRecordRequiredSpace(XLogRecordMaxSize + XLOG_BLCKSZ);
+		Assert(AllocSizeIsValid(max_required));
+	}
 #endif
 
 	/* Initialize the working areas */
-	if (xloginsert_cxt == NULL)
-	{
-		xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
-											   "WAL record construction",
-											   ALLOCSET_DEFAULT_SIZES);
-	}
+	xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
+										   "WAL record construction",
+										   ALLOCSET_DEFAULT_SIZES);
 
-	if (registered_buffers == NULL)
-	{
-		registered_buffers = (registered_buffer *)
-			MemoryContextAllocZero(xloginsert_cxt,
-								   sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
-		max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
-	}
-	if (rdatas == NULL)
-	{
-		rdatas = MemoryContextAlloc(xloginsert_cxt,
-									sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
-		max_rdatas = XLR_NORMAL_RDATAS;
-	}
+	registered_buffers = (registered_buffer *)
+		MemoryContextAllocZero(xloginsert_cxt,
+							   sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
+	max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
+
+	rdatas = MemoryContextAlloc(xloginsert_cxt,
+								sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
+	max_rdatas = XLR_NORMAL_RDATAS;
 
 	/*
 	 * Allocate a buffer to hold the header information for a WAL record.
 	 */
-	if (hdr_scratch == NULL)
-		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
-											 HEADER_SCRATCH_SIZE);
+	hdr_scratch = MemoryContextAllocZero(xloginsert_cxt, HEADER_SCRATCH_SIZE);
+
+	/*
+	 * Pre-allocate compression buffers if compression is already enabled.
+	 * This ensures the buffers exist before the first XLogInsert() call,
+	 * which may happen inside a critical section where palloc is unsafe.
+	 * If wal_compression is later turned on or wal_compression_buffer is
+	 * increased, the GUC assign hooks handle allocation at that point
+	 * (also always outside critical sections).
+	 */
+	if (wal_compression != WAL_COMPRESSION_NONE)
+		AllocCompressionBuffers();
 }
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index c60aa9a51e9..080ab2aa0da 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -32,6 +32,7 @@
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
 #include "replication/origin.h"
+#include "utils/memutils.h"
 
 #ifndef FRONTEND
 #include "pgstat.h"
@@ -54,6 +55,8 @@ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 static void ResetDecoder(XLogReaderState *state);
 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
+static XLogRecord *XLogDecompressRecordIfNeeded(XLogReaderState *state, XLogRecord *record,
+												XLogRecPtr recptr);
 
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
@@ -170,6 +173,8 @@ XLogReaderFree(XLogReaderState *state)
 	pfree(state->errormsg_buf);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
+	if (state->decompression_buffer)
+		pfree(state->decompression_buffer);
 	pfree(state->readBuf);
 	pfree(state);
 }
@@ -533,7 +538,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	XLogRecPtr	targetPagePtr;
 	bool		randAccess;
 	uint32		len,
-				total_len;
+				total_len_decomp,
+				total_len_physical;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
 	bool		assembled;
@@ -644,8 +650,67 @@ restart:
 	 * whole header.
 	 */
 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
+	total_len_physical = record->xl_tot_len;
 
+	/*
+	 * Determine total_len_decomp for pre-validation allocation.
+	 *
+	 * Only xl_tot_len (the very first field) is guaranteed to be on this page.
+	 * Any other XLogRecord field, including xl_info, may be on the next page
+	 * when the record starts near the end of a WAL page.  Only read xl_info
+	 * after confirming the full header fits here.
+	 */
+	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+	{
+		/* Full header is on this page; safe to read xl_info. */
+		if (record->xl_info & XLR_COMPRESSED)
+		{
+			if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogCompressedRecord)
+			{
+				total_len_decomp = -1; /* XLogCompressionHeader spans pages */
+			}
+			else if (total_len_physical < SizeOfXLogCompressedRecord)
+			{
+				total_len_decomp = -1; /* Cannot be a valid compressed record */
+			}
+			else
+			{
+				XLogCompressionHeader *c = (XLogCompressionHeader *) record;
+				uint32		dlen = c->decompressed_length;
+				char		method = c->method;
+				bool		valid_method;
+
+				valid_method = (method == XLR_COMPRESS_PGLZ)
+#ifdef USE_LZ4
+					|| (method == XLR_COMPRESS_LZ4)
+#endif
+#ifdef USE_ZSTD
+					|| (method == XLR_COMPRESS_ZSTD)
+#endif
+					;
+
+				/* Sanity-check before using for allocation */
+				if (dlen <= SizeOfXLogRecord ||
+					dlen > MaxAllocSize ||
+					!valid_method)
+					total_len_decomp = -1;
+				else
+					total_len_decomp = dlen;
+			}
+		}
+		else
+			total_len_decomp = record->xl_tot_len;
+	}
+	else
+	{
+		/*
+		 * Header spans pages; cannot read xl_info yet.  Use xl_tot_len as
+		 * a conservative pre-allocation size.  If the record turns out to be
+		 * compressed after assembly, total_len_decomp will be corrected from
+		 * XLogCompressionHeader.decompressed_length below.
+		 */
+		total_len_decomp = record->xl_tot_len;
+	}
 	/*
 	 * If the whole record header is on this page, validate it immediately.
 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
@@ -660,16 +725,18 @@ restart:
 								   randAccess))
 			goto err;
 		gotheader = true;
+		if (record->xl_info & XLR_COMPRESSED)
+			gotheader = targetRecOff <= XLOG_BLCKSZ - SizeOfXLogCompressedRecord;
 	}
 	else
 	{
 		/* There may be no next page if it's too small. */
-		if (total_len < SizeOfXLogRecord)
+		if (total_len_physical < SizeOfXLogRecord)
 		{
 			report_invalid_record(state,
 								  "invalid record length at %X/%08X: expected at least %u, got %u",
 								  LSN_FORMAT_ARGS(RecPtr),
-								  (uint32) SizeOfXLogRecord, total_len);
+								  (uint32) SizeOfXLogRecord, total_len_physical);
 			goto err;
 		}
 		/* We'll validate the header once we have the next page. */
@@ -681,9 +748,11 @@ restart:
 	 * calling palloc.  If we can't, we'll try again below after we've
 	 * validated that total_len isn't garbage bytes from a recycled WAL page.
 	 */
-	decoded = XLogReadRecordAlloc(state,
-								  total_len,
-								  false /* allow_oversized */ );
+	if (total_len_decomp != -1)
+		decoded = XLogReadRecordAlloc(state,
+									  total_len_decomp,
+									  false /* allow_oversized */ );
+
 	if (decoded == NULL && nonblocking)
 	{
 		/*
@@ -695,7 +764,7 @@ restart:
 	}
 
 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-	if (total_len > len)
+	if (total_len_physical > len)
 	{
 		/* Need to reassemble record */
 		char	   *contdata;
@@ -728,7 +797,7 @@ restart:
 			 * can handle the case where the previous record ended as being a
 			 * partial one.
 			 */
-			readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogShortPHD);
+			readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogShortPHD);
 			if (readOff == XLREAD_WOULDBLOCK)
 				return XLREAD_WOULDBLOCK;
 			else if (readOff < 0)
@@ -767,19 +838,19 @@ restart:
 			 * we expect there to be left.
 			 */
 			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+				total_len_physical != (pageHeader->xlp_rem_len + gotlen))
 			{
 				report_invalid_record(state,
 									  "invalid contrecord length %u (expected %lld) at %X/%08X",
 									  pageHeader->xlp_rem_len,
-									  ((long long) total_len) - gotlen,
+									  ((long long) total_len_physical) - gotlen,
 									  LSN_FORMAT_ARGS(RecPtr));
 				goto err;
 			}
 
 			/* Wait for the next page to become available */
 			readOff = ReadPageInternal(state, targetPagePtr,
-									   Min(total_len - gotlen + SizeOfXLogShortPHD,
+									   Min(total_len_physical - gotlen + SizeOfXLogShortPHD,
 										   XLOG_BLCKSZ));
 			if (readOff == XLREAD_WOULDBLOCK)
 				return XLREAD_WOULDBLOCK;
@@ -824,7 +895,7 @@ restart:
 			 * also cross-checked total_len against xlp_rem_len on the second
 			 * page, and verified xlp_pageaddr on both.
 			 */
-			if (total_len > state->readRecordBufSize)
+			if (total_len_physical > state->readRecordBufSize)
 			{
 				char		save_copy[XLOG_BLCKSZ * 2];
 
@@ -835,11 +906,11 @@ restart:
 				Assert(gotlen <= lengthof(save_copy));
 				Assert(gotlen <= state->readRecordBufSize);
 				memcpy(save_copy, state->readRecordBuf, gotlen);
-				allocate_recordbuf(state, total_len);
+				allocate_recordbuf(state, total_len_physical);
 				memcpy(state->readRecordBuf, save_copy, gotlen);
 				buffer = state->readRecordBuf + gotlen;
 			}
-		} while (gotlen < total_len);
+		} while (gotlen < total_len_physical);
 		Assert(gotheader);
 
 		record = (XLogRecord *) state->readRecordBuf;
@@ -854,8 +925,9 @@ restart:
 	else
 	{
 		/* Wait for the record data to become available */
+		Assert(targetRecOff + total_len_physical <= XLOG_BLCKSZ);
 		readOff = ReadPageInternal(state, targetPagePtr,
-								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
+								   targetRecOff + total_len_physical);
 		if (readOff == XLREAD_WOULDBLOCK)
 			return XLREAD_WOULDBLOCK;
 		else if (readOff < 0)
@@ -865,7 +937,7 @@ restart:
 		if (!ValidXLogRecord(state, record, RecPtr))
 			goto err;
 
-		state->NextRecPtr = RecPtr + MAXALIGN(total_len);
+		state->NextRecPtr = RecPtr + MAXALIGN(total_len_physical);
 
 		state->DecodeRecPtr = RecPtr;
 	}
@@ -888,8 +960,28 @@ restart:
 	if (decoded == NULL)
 	{
 		Assert(!nonblocking);
+
+		/* total_len_decomp may not yet reflect the actual decompressed size */
+		if (record->xl_info & XLR_COMPRESSED)
+		{
+			XLogCompressionHeader *c = (XLogCompressionHeader *) record;
+
+			/* Same bounds as the pre-allocation sanity check above */
+			if (c->decompressed_length <= SizeOfXLogRecord ||
+				c->decompressed_length > MaxAllocSize)
+			{
+				report_invalid_record(state,
+									  "invalid decompressed record length %u at %X/%08X",
+									  c->decompressed_length,
+									  LSN_FORMAT_ARGS(RecPtr));
+				goto err;
+			}
+			total_len_decomp = c->decompressed_length;
+		}
+		else
+			total_len_decomp = record->xl_tot_len;
 		decoded = XLogReadRecordAlloc(state,
-									  total_len,
+									  total_len_decomp,
 									  true /* allow_oversized */ );
 		/* allocation should always happen under allow_oversized */
 		Assert(decoded != NULL);
@@ -1666,6 +1749,110 @@ DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
 	return size;
 }
 
+static XLogRecord *
+XLogDecompressRecordIfNeeded(XLogReaderState *state,
+							 XLogRecord *record,
+							 XLogRecPtr recptr)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionHeader *src = (XLogCompressionHeader *) record;
+		/* decompressed_length covers the XLogRecord header + body */
+		uint32		body_len = src->decompressed_length - SizeOfXLogRecord;
+		uint32		srclen = src->record_header.xl_tot_len - SizeOfXLogCompressedRecord;
+		bool		decomp_success = true;
+		char	   *dst;
+		XLogRecord *dst_h;
+
+		/*
+		 * Grow the decompression buffer if needed, rounding up to BLCKSZ to
+		 * avoid frequent small reallocations.  Since the buffer content is
+		 * always fully overwritten, we simply pfree and reallocate.
+		 */
+		if (state->decompression_buffer_size < src->decompressed_length)
+		{
+			uint32		new_size = (uint32) TYPEALIGN(BLCKSZ, src->decompressed_length);
+
+			if (state->decompression_buffer)
+				pfree(state->decompression_buffer);
+			state->decompression_buffer =
+				palloc_extended(new_size, MCXT_ALLOC_NO_OOM);
+			if (!state->decompression_buffer)
+			{
+				state->decompression_buffer_size = 0;
+				report_invalid_record(state,
+									  "out of memory while decompressing record at %X/%08X",
+									  LSN_FORMAT_ARGS(recptr));
+				return NULL;
+			}
+			state->decompression_buffer_size = new_size;
+		}
+
+		dst_h = (XLogRecord *) state->decompression_buffer;
+		*dst_h = src->record_header;
+		dst_h->xl_tot_len = src->decompressed_length;
+		dst = (char *) &dst_h[1];
+
+		if (src->method == XLR_COMPRESS_PGLZ)
+		{
+			/*
+			 * pglz_decompress with check_complete=true verifies that the
+			 * output length exactly equals rawsize, so pass body_len, not the
+			 * total buffer size.
+			 */
+			if (pglz_decompress((char *) &src[1], srclen, dst,
+								body_len, true) < 0)
+				decomp_success = false;
+		}
+		else if (src->method == XLR_COMPRESS_LZ4)
+		{
+#ifdef USE_LZ4
+			if (LZ4_decompress_safe((char *) &src[1], dst,
+									srclen, body_len) != (int) body_len)
+				decomp_success = false;
+#else
+			report_invalid_record(state,
+								  "record at %X/%08X compressed with %s not supported by build",
+								  LSN_FORMAT_ARGS(recptr), "lz4");
+			return NULL;
+#endif
+		}
+		else if (src->method == XLR_COMPRESS_ZSTD)
+		{
+#ifdef USE_ZSTD
+			size_t		decomp_result = ZSTD_decompress(dst, body_len,
+														(char *) &src[1], srclen);
+
+			if (ZSTD_isError(decomp_result) || decomp_result != body_len)
+				decomp_success = false;
+#else
+			report_invalid_record(state,
+								  "record at %X/%08X compressed with %s not supported by build",
+								  LSN_FORMAT_ARGS(recptr), "zstd");
+			return NULL;
+#endif
+		}
+		else
+		{
+			report_invalid_record(state,
+								  "record at %X/%08X compressed with unknown method",
+								  LSN_FORMAT_ARGS(recptr));
+			return NULL;
+		}
+
+		if (!decomp_success)
+		{
+			report_invalid_record(state,
+								  "could not decompress record at %X/%08X",
+								  LSN_FORMAT_ARGS(recptr));
+			return NULL;
+		}
+
+		return (XLogRecord *) state->decompression_buffer;
+	}
+	return record;
+}
+
 /*
  * Decode a record.  "decoded" must point to a MAXALIGNed memory area that has
  * space for at least DecodeXLogRecordRequiredSpace(record) bytes.  On
@@ -1704,6 +1891,14 @@ DecodeXLogRecord(XLogReaderState *state,
 	RelFileLocator *rlocator = NULL;
 	uint8		block_id;
 
+	record = XLogDecompressRecordIfNeeded(state, record, lsn);
+
+	if (!record)
+	{
+		/* Decompression failed, error must be reported already */
+		return false;
+	}
+
 	decoded->header = *record;
 	decoded->lsn = lsn;
 	decoded->next = NULL;
@@ -1878,8 +2073,8 @@ DecodeXLogRecord(XLogReaderState *state,
 					blk->bimg_len != BLCKSZ)
 				{
 					report_invalid_record(state,
-										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %d at %X/%08X",
-										  blk->data_len,
+										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%08X",
+										  (unsigned int) blk->bimg_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
 					goto err;
 				}
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 7c60b125564..4958e9016a0 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -3323,6 +3323,31 @@
   variable => 'wal_compression',
   boot_val => 'WAL_COMPRESSION_NONE',
   options => 'wal_compression_options',
+  assign_hook => 'assign_wal_compression',
+},
+
+{ name => 'wal_compression_buffer', type => 'int', context => 'PGC_SUSET', group => 'WAL_SETTINGS',
+  short_desc => 'Buffer size for WAL compression.',
+  long_desc => 'Sets the buffer size for WAL compression, used for both FPI compression '
+             . 'and whole-record compression. Must be large enough to hold compressed '
+             . 'full page images for up to 32 blocks (XLR_MAX_BLOCK_ID). '
+             . 'Note: actual memory consumption per backend is approximately 2x this value '
+             . 'plus ~5% overhead, as additional buffers are needed for whole-record compression.',
+  flags => 'GUC_UNIT_BYTE',
+  variable => 'wal_compression_buffer',
+  boot_val => 'MIN_WAL_COMPRESSION_BUFFER',
+  min => 'MIN_WAL_COMPRESSION_BUFFER',
+  max => '1073741824',
+  assign_hook => 'assign_wal_compression_buffer',
+},
+
+{ name => 'wal_compression_threshold', type => 'int', context => 'PGC_SUSET', group => 'WAL_SETTINGS',
+  short_desc => 'Minimum WAL record length to engage compression.',
+  flags => 'GUC_UNIT_BYTE',
+  variable => 'wal_compression_threshold',
+  boot_val => '512',
+  min => '32',
+  max => 'INT_MAX',
 },
 
 { name => 'wal_consistency_checking', type => 'string', context => 'PGC_SUSET', group => 'DEVELOPER_OPTIONS',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index dc9e2255f8a..c1f02bc013a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -245,8 +245,11 @@
 #full_page_writes = on                  # recover from partial page writes
 #wal_log_hints = off                    # also do full page writes of non-critical updates
                                         # (change requires restart)
-#wal_compression = off                  # enables compression of full-page writes;
+#wal_compression = off                  # enables compression of full-page writes;
                                         # off, pglz, lz4, zstd, or on
+#wal_compression_threshold = 512        # min 32, minimal record length to be compressed
+#wal_compression_buffer = 295972        # buffer for FPI and whole-record compression;
+                                        # actual memory usage is ~2x this value
 #wal_init_zero = on                     # zero-fill new WAL files
 #wal_recycle = on                       # recycle WAL files
 #wal_buffers = -1                       # min 32kB, -1 sets based on shared_buffers
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0591a885dd1..a6e759847f2 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -57,6 +57,8 @@ extern PGDLLIMPORT int CommitDelay;
 extern PGDLLIMPORT int CommitSiblings;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
+extern PGDLLIMPORT int wal_compression_threshold;
+extern PGDLLIMPORT int wal_compression_buffer;
 
 extern PGDLLIMPORT int CheckPointSegments;
 
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 9b63b6aff75..207d58dcf54 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -251,6 +251,10 @@ struct XLogReaderState
 	char	   *decode_buffer_head; /* data is read from the head */
 	char	   *decode_buffer_tail; /* new data is written at the tail */
 
+	/* Buffer for decompressing whole-record compressed WAL records */
+	char	   *decompression_buffer;
+	uint32		decompression_buffer_size;
+
 	/*
 	 * Queue of records that have been decoded.  This is a linked list that
 	 * usually consists of consecutive records in decode_buffer, but may also
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index e8999d3fe91..c3e05bb86bf 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -48,7 +48,7 @@ typedef struct XLogRecord
 	/* 2 bytes of padding here, initialize to zero */
 	pg_crc32c	xl_crc;			/* CRC for this record */
 
-	/* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */
+	/* XLogRecordBlockHeaders, XLogRecordDataHeader or compression header follow, no padding */
 
 } XLogRecord;
 
@@ -90,6 +90,9 @@ typedef struct XLogRecord
  */
 #define XLR_CHECK_CONSISTENCY	0x02
 
+/* This bit in xl_info means the record is compressed */
+#define XLR_COMPRESSED	0x04
+
 /*
  * Header info for block data appended to an XLOG record.
  *
@@ -143,11 +146,6 @@ typedef struct XLogRecordBlockImageHeader
 	uint16		length;			/* number of page image bytes */
 	uint16		hole_offset;	/* number of bytes before "hole" */
 	uint8		bimg_info;		/* flag bits, see below */
-
-	/*
-	 * If BKPIMAGE_HAS_HOLE and BKPIMAGE_COMPRESSED(), an
-	 * XLogRecordBlockCompressHeader struct follows.
-	 */
 } XLogRecordBlockImageHeader;
 
 #define SizeOfXLogRecordBlockImageHeader	\
@@ -157,13 +155,13 @@ typedef struct XLogRecordBlockImageHeader
 #define BKPIMAGE_HAS_HOLE		0x01	/* page image has "hole" */
 #define BKPIMAGE_APPLY			0x02	/* page image should be restored
 										 * during replay */
-/* compression methods supported */
+/* Compression methods supported for FPI (stored in bimg_info) */
 #define BKPIMAGE_COMPRESS_PGLZ	0x04
 #define BKPIMAGE_COMPRESS_LZ4	0x08
 #define BKPIMAGE_COMPRESS_ZSTD	0x10
 
 #define	BKPIMAGE_COMPRESSED(info) \
-	((info & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \
+	(((info) & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \
 			  BKPIMAGE_COMPRESS_ZSTD)) != 0)
 
 /*
@@ -178,6 +176,21 @@ typedef struct XLogRecordBlockCompressHeader
 #define SizeOfXLogRecordBlockCompressHeader \
 	sizeof(XLogRecordBlockCompressHeader)
 
+/* compression methods supported for whole-record compression */
+#define XLR_COMPRESS_PGLZ	0x04
+#define XLR_COMPRESS_LZ4	0x08
+#define XLR_COMPRESS_ZSTD	0x10
+
+/* Header prepended to a whole-record compressed WAL record */
+typedef struct XLogCompressionHeader
+{
+	XLogRecord	record_header;
+	uint8		method;			/* XLR_COMPRESS_* */
+	uint32		decompressed_length;
+} XLogCompressionHeader;
+
+#define SizeOfXLogCompressedRecord	(offsetof(XLogCompressionHeader, decompressed_length) + sizeof(uint32))
+
 /*
  * Maximum size of the header for a block reference. This is used to size a
  * temporary buffer for constructing the header.
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index f723668da9e..6b5f60c9b4c 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -169,6 +169,8 @@ extern const char *show_unix_socket_permissions(void);
 extern bool check_wal_buffers(int *newval, void **extra, GucSource source);
 extern bool check_wal_consistency_checking(char **newval, void **extra,
 										   GucSource source);
+extern void assign_wal_compression(int newval, void *extra);
+extern void assign_wal_compression_buffer(int newval, void *extra);
 extern void assign_wal_consistency_checking(const char *newval, void *extra);
 extern bool check_wal_segment_size(int *newval, void **extra, GucSource source);
 extern void assign_wal_sync_method(int new_wal_sync_method, void *extra);
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 955dfc0e7f8..55e4aa74f49 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -720,6 +720,13 @@ sub init
 	# TEMP_CONFIG.  Otherwise, print it before TEMP_CONFIG, thereby permitting
 	# overrides.  Settings that merely improve performance or ease debugging
 	# belong before TEMP_CONFIG.
+
+	# Prevent whole-record WAL compression from triggering in tests that are
+	# not specifically testing it.  Tests that want whole-record compression
+	# can either lower this with append_conf() after init(), or supply a lower
+	# value via TEMP_CONFIG.
+	print $conf "wal_compression_threshold = '1GB'\n";
+
 	print $conf PostgreSQL::Test::Utils::slurp_file($ENV{TEMP_CONFIG})
 	  if defined $ENV{TEMP_CONFIG};
 
@@ -3145,10 +3152,16 @@ sub emit_wal
 {
 	my ($self, $size) = @_;
 
+	# Disable whole-record WAL compression for this emission.  Tests call
+	# emit_wal() to place an exact number of bytes in WAL (e.g. to reach a
+	# specific page offset); whole-record compression would shrink the record
+	# and break those position-sensitive calculations.
 	return int(
 		$self->safe_psql(
 			'postgres',
-			"SELECT pg_logical_emit_message(true, '', repeat('a', $size)) - '0/0'"
+			"SET wal_compression_threshold = 2147483647;
+			 SELECT pg_logical_emit_message(true, '', repeat('a', $size)) - '0/0';
+			 RESET wal_compression_threshold"
 		));
 }
 
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index d41aaaf8ae1..f4c9cd89811 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -24,6 +24,16 @@ export enable_injection_points
 REGRESS_SHLIB=$(abs_top_builddir)/src/test/regress/regress$(DLSUFFIX)
 export REGRESS_SHLIB
 
+# Exercise WAL compression in recovery tests.  Set USE_WAL_COMPRESSION_CONFIG=
+# to disable, or TEMP_CONFIG=/path to use a different config.
+USE_WAL_COMPRESSION_CONFIG ?= 1
+ifneq ($(USE_WAL_COMPRESSION_CONFIG),)
+TEMP_CONFIG ?= $(srcdir)/wal_compression.conf
+export TEMP_CONFIG
+else
+unexport TEMP_CONFIG
+endif
+
 check:
 	$(prove_check)
 
diff --git a/src/test/recovery/t/046_checkpoint_logical_slot.pl b/src/test/recovery/t/046_checkpoint_logical_slot.pl
index 0bf7f024177..633b7a6ac9a 100644
--- a/src/test/recovery/t/046_checkpoint_logical_slot.pl
+++ b/src/test/recovery/t/046_checkpoint_logical_slot.pl
@@ -116,10 +116,9 @@ $node->safe_psql('postgres',
 	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
 );
 
-# Generate a long WAL record, spawning at least two pages for the follow-up
+# Generate a long WAL record, spanning at least two pages for the follow-up
 # post-recovery check.
-$node->safe_psql('postgres',
-	q{select pg_logical_emit_message(false, '', repeat('123456789', 1000))});
+$node->emit_wal(9000);
 
 # Continue the checkpoint and wait for its completion.
 my $log_offset = -s $node->logfile;
diff --git a/src/test/recovery/t/052_wal_compression.pl b/src/test/recovery/t/052_wal_compression.pl
new file mode 100644
index 00000000000..ea772c7eb5e
--- /dev/null
+++ b/src/test/recovery/t/052_wal_compression.pl
@@ -0,0 +1,93 @@
+
+# Copyright (c) 2025-2026, PostgreSQL Global Development Group
+#
+# Test whole-record WAL compression via wal_compression and
+# wal_compression_threshold.  Exercises compression during replication
+# (walsender path) and decompression during crash recovery (startup path).
+#
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Determine which compression methods are compiled in; pglz is always
+# available, while lz4 and zstd depend on build options (USE_* flags).
+my @methods = ();
+push @methods, 'zstd' if check_pg_config('#define USE_ZSTD 1');
+push @methods, 'lz4'  if check_pg_config('#define USE_LZ4 1');
+push @methods, 'pglz';
+
+# Test whole-record WAL compression with a specific method.
+#
+# Creates a primary with the method enabled and a low threshold so that
+# every non-trivial WAL record is compressed.  A streaming standby is
+# used to verify that compressed WAL is transmitted and decoded correctly.
+# Then the primary is stopped immediately (simulating a crash) and
+# restarted to verify that startup recovery can decompress WAL records.
+sub test_wal_compression
+{
+	my ($method) = @_;
+
+	note "testing wal_compression = $method";
+
+	my $primary = PostgreSQL::Test::Cluster->new("primary_$method");
+	$primary->init(allows_streaming => 1);
+
+	# Use the minimum threshold so virtually every record gets compressed,
+	# and leave wal_compression_buffer at its default (set by Cluster.pm).
+	$primary->append_conf(
+		'postgresql.conf',
+		"wal_compression = '$method'\n"
+		  . "wal_compression_threshold = 32\n");
+	$primary->start;
+
+	my $backup_name = "backup_$method";
+	$primary->backup($backup_name);
+
+	my $standby = PostgreSQL::Test::Cluster->new("standby_$method");
+	$standby->init_from_backup($primary, $backup_name, has_streaming => 1);
+	$standby->start;
+
+	# Generate WAL with records that exceed the compression threshold.
+	# Each row's WAL record includes 200-byte payload well above 32 bytes.
+	$primary->safe_psql(
+		'postgres',
+		"CREATE TABLE t AS
+		 SELECT g, repeat('x', 200) AS d
+		 FROM generate_series(1, 100) AS g");
+
+	$primary->wait_for_replay_catchup($standby);
+
+	is( $standby->safe_psql('postgres', 'SELECT count(*) FROM t'),
+		'100',
+		"compressed WAL replicated via streaming ($method)");
+
+	# Insert another batch that will be recovered after the simulated crash.
+	$primary->safe_psql(
+		'postgres',
+		"INSERT INTO t
+		 SELECT g, repeat('y', 200)
+		 FROM generate_series(101, 200) AS g");
+
+	# Stop without a clean shutdown.  On restart PostgreSQL will replay WAL
+	# from the last checkpoint, exercising XLogDecompressRecordIfNeeded for
+	# every compressed record generated since that checkpoint.
+	$primary->stop('immediate');
+	$primary->start;
+
+	is( $primary->safe_psql('postgres', 'SELECT count(*) FROM t'),
+		'200',
+		"crash recovery replays compressed WAL ($method)");
+
+	$primary->stop;
+	$standby->stop;
+}
+
+foreach my $method (@methods)
+{
+	test_wal_compression($method);
+}
+
+done_testing();
diff --git a/src/test/recovery/wal_compression.conf b/src/test/recovery/wal_compression.conf
new file mode 100644
index 00000000000..6867f370ef2
--- /dev/null
+++ b/src/test/recovery/wal_compression.conf
@@ -0,0 +1,3 @@
+# Enable WAL compression for recovery tests.
+# lz4 is used here; 052_wal_compression.pl separately tests all methods.
+wal_compression = 'lz4'
-- 
2.51.2

