From f77c06f60857b03086169aac5b0dd479fe046129 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Thu, 21 Mar 2024 18:00:47 +0000
Subject: [PATCH v26 4/4] Demonstrate reading unflushed WAL directly from WAL
 buffers

---
 src/backend/access/transam/xlogreader.c       |   3 +-
 .../read_wal_from_buffers--1.0.sql            |  23 ++
 .../read_wal_from_buffers.c                   | 266 +++++++++++++++++-
 .../read_wal_from_buffers/t/001_basic.pl      |  37 +++
 4 files changed, 327 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 75ea36c37f..3e1b814d54 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1033,7 +1033,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * record is.  This is so that we can check the additional identification
 	 * info that is present in the first page's "long" header.
 	 */
-	if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
+	if (state->seg.ws_segno != 0 &&
+		targetSegNo != state->seg.ws_segno && targetPageOff != 0)
 	{
 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
 
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
index 82fa097d10..72d05522fc 100644
--- a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
@@ -12,3 +12,26 @@ CREATE FUNCTION read_wal_from_buffers(IN lsn pg_lsn, IN bytes_to_read int,
     bytes_read OUT int)
 AS 'MODULE_PATHNAME', 'read_wal_from_buffers'
 LANGUAGE C STRICT;
+
+--
+-- get_wal_records_info_from_buffers()
+--
+-- SQL function returning one row of info per WAL record read from WAL buffers.
+--
+CREATE FUNCTION get_wal_records_info_from_buffers(IN start_lsn pg_lsn,
+    IN end_lsn pg_lsn,
+    OUT start_lsn pg_lsn,
+    OUT end_lsn pg_lsn,
+    OUT prev_lsn pg_lsn,
+    OUT xid xid,
+    OUT resource_manager text,
+    OUT record_type text,
+    OUT record_length int4,
+    OUT main_data_length int4,
+    OUT fpi_length int4,
+    OUT description text,
+    OUT block_ref text
+)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'get_wal_records_info_from_buffers'
+LANGUAGE C STRICT PARALLEL SAFE;
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
index 9df5c07b4b..ed33a14127 100644
--- a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
@@ -14,11 +14,27 @@
 #include "postgres.h"
 
 #include "access/xlog.h"
-#include "fmgr.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
 #include "utils/pg_lsn.h"
 
 PG_MODULE_MAGIC;
 
+static int	read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+								  int reqLen, XLogRecPtr targetRecPtr,
+								  char *cur_page);
+
+static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
+static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
+							 bool *nulls, uint32 ncols);
+static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
+							  XLogRecPtr start_lsn,
+							  XLogRecPtr end_lsn);
+
 /*
  * SQL function to read WAL from WAL buffers. Returns number of bytes read.
  */
@@ -52,3 +68,251 @@ read_wal_from_buffers(PG_FUNCTION_ARGS)
 
 	PG_RETURN_INT32(read);
 }
+
+/*
+ * XLogReaderRoutine->page_read callback for reading WAL from WAL buffers.
+ * Waits until the insert pointer reaches targetPagePtr + reqLen, then copies
+ * at most one page into cur_page.  Returns the number of bytes read.
+ */
+static int
+read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+					  int reqLen, XLogRecPtr targetRecPtr,
+					  char *cur_page)
+{
+	XLogRecPtr	loc = targetPagePtr + reqLen;
+	XLogRecPtr	read_upto;
+	TimeLineID	tli = GetWALInsertionTimeLine();
+	Size		count;
+	Size		read;
+
+	/* Loop waiting for xlog to be available if necessary */
+	while (1)
+	{
+		read_upto = GetXLogInsertRecPtr();
+
+		if (loc <= read_upto)
+			break;
+
+		WaitXLogInsertionsToFinish(loc);
+
+		CHECK_FOR_INTERRUPTS();
+		pg_usleep(1000L);
+	}
+
+	if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
+	{
+		/*
+		 * more than one block available; read only that block, have caller
+		 * come back if they need more.
+		 */
+		count = XLOG_BLCKSZ;
+	}
+	else if (targetPagePtr + reqLen > read_upto)
+	{
+		/* not enough data there */
+		return -1;
+	}
+	else
+	{
+		/* enough bytes available to satisfy the request */
+		count = read_upto - targetPagePtr;
+	}
+
+	/* read WAL from WAL buffers */
+	read = WALReadFromBuffers(cur_page, targetPagePtr, count, tli);
+
+	if (read != count)
+		ereport(ERROR,
+				errmsg("could not read fully from WAL buffers; expected %zu, read %zu",
+					   count, read));
+
+	return count;
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ *
+ * This function and its helpers below are similar to pg_walinspect's
+ * pg_get_wal_records_info() except that it will get info of WAL records
+ * available in WAL buffers.
+ */
+PG_FUNCTION_INFO_V1(get_wal_records_info_from_buffers);
+Datum
+get_wal_records_info_from_buffers(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	start_lsn = PG_GETARG_LSN(0);
+	XLogRecPtr	end_lsn = PG_GETARG_LSN(1);
+
+	/*
+	 * Validate start and end LSNs coming from the function inputs.
+	 *
+	 * Reading WAL below the first page of the first segment isn't allowed:
+	 * it is a bootstrap WAL page and the page_read callback fails to read it.
+	 */
+	if (start_lsn < XLOG_BLCKSZ)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not read WAL at LSN %X/%X",
+						LSN_FORMAT_ARGS(start_lsn))));
+
+	if (start_lsn > end_lsn)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("WAL start LSN must be less than end LSN")));
+
+	GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Read the next WAL record through the given reader.
+ *
+ * Returns the record; raises an ERROR instead of returning NULL on failure.
+ */
+static XLogRecord *
+ReadNextXLogRecord(XLogReaderState *xlogreader)
+{
+	char	   *errormsg;
+	XLogRecord *record = XLogReadRecord(xlogreader, &errormsg);
+
+	if (record != NULL)
+		return record;
+
+	if (errormsg == NULL)
+		ereport(ERROR,
+				errmsg("could not read WAL at %X/%X",
+					   LSN_FORMAT_ARGS(xlogreader->EndRecPtr)));
+
+	ereport(ERROR,
+			errmsg("could not read WAL at %X/%X: %s",
+				   LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg));
+
+	return NULL;				/* not reached; keeps the compiler quiet */
+}
+
+/*
+ * Fill values[] and nulls[] with the columns of one output row describing
+ * the WAL record currently held by the reader passed in as 'record'.
+ */
+static void
+GetWALRecordInfo(XLogReaderState *record, Datum *values,
+				 bool *nulls, uint32 ncols)
+{
+	RmgrData	rmgr = GetRmgr(XLogRecGetRmid(record));
+	const char *rec_type = rmgr.rm_identify(XLogRecGetInfo(record));
+	bool		has_blk_refs = XLogRecHasAnyBlockRefs(record);
+	StringInfoData desc_buf;
+	StringInfoData blk_ref_buf;
+	uint32		fpi_bytes = 0;
+	int			col = 0;
+
+	/* rm_identify gave no name; show the raw info bits instead */
+	if (rec_type == NULL)
+		rec_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
+
+	initStringInfo(&desc_buf);
+	rmgr.rm_desc(&desc_buf, record);
+
+	if (has_blk_refs)
+	{
+		initStringInfo(&blk_ref_buf);
+		XLogRecGetBlockRefInfo(record, false, true, &blk_ref_buf, &fpi_bytes);
+	}
+
+	values[col++] = LSNGetDatum(record->ReadRecPtr);
+	values[col++] = LSNGetDatum(record->EndRecPtr);
+	values[col++] = LSNGetDatum(XLogRecGetPrev(record));
+	values[col++] = TransactionIdGetDatum(XLogRecGetXid(record));
+	values[col++] = CStringGetTextDatum(rmgr.rm_name);
+	values[col++] = CStringGetTextDatum(rec_type);
+	values[col++] = UInt32GetDatum(XLogRecGetTotalLen(record));
+	values[col++] = UInt32GetDatum(XLogRecGetDataLen(record));
+	values[col++] = UInt32GetDatum(fpi_bytes);
+
+	if (desc_buf.len <= 0)
+		nulls[col++] = true;
+	else
+		values[col++] = CStringGetTextDatum(desc_buf.data);
+
+	if (!has_blk_refs)
+		nulls[col++] = true;
+	else
+		values[col++] = CStringGetTextDatum(blk_ref_buf.data);
+
+	Assert(col == ncols);
+}
+
+/*
+ * Add one row to the SRF result for every WAL record, starting at the record
+ * found by XLogFindNextRecord(start_lsn), that ends at or before end_lsn.
+ */
+static void
+GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
+				  XLogRecPtr end_lsn)
+{
+#define GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS 11
+	XLogReaderState *xlogreader;
+	XLogRecPtr	first_valid_record;
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	MemoryContext old_cxt;
+	MemoryContext tmp_cxt;
+
+	Assert(start_lsn <= end_lsn);
+	InitMaterializedSRF(fcinfo, 0);
+
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_from_wal_buffers,
+											   .segment_open = NULL,
+											   .segment_close = NULL),
+									NULL);
+
+	if (xlogreader == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+
+	/* first find a valid recptr to start from */
+	first_valid_record = XLogFindNextRecord(xlogreader, start_lsn);
+
+	if (XLogRecPtrIsInvalid(first_valid_record))
+	{
+		ereport(LOG,
+				(errmsg("could not find a valid record after %X/%X",
+						LSN_FORMAT_ARGS(start_lsn))));
+		XLogReaderFree(xlogreader); /* release the reader on this exit too */
+		return;
+	}
+
+	tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
+									"GetWALRecordsInfo temporary cxt",
+									ALLOCSET_DEFAULT_SIZES);
+
+	while (ReadNextXLogRecord(xlogreader) &&
+		   xlogreader->EndRecPtr <= end_lsn)
+	{
+		Datum		values[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+		bool		nulls[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+
+		/* Use the tmp context so we can clean up after each tuple is done */
+		old_cxt = MemoryContextSwitchTo(tmp_cxt);
+
+		GetWALRecordInfo(xlogreader, values, nulls,
+						 GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+							 values, nulls);
+
+		/* clean up and switch back */
+		MemoryContextSwitchTo(old_cxt);
+		MemoryContextReset(tmp_cxt);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	MemoryContextDelete(tmp_cxt);
+	XLogReaderFree(xlogreader);
+
+#undef GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS
+}
diff --git a/src/test/modules/read_wal_from_buffers/t/001_basic.pl b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
index 2360ff1171..15ef550c8c 100644
--- a/src/test/modules/read_wal_from_buffers/t/001_basic.pl
+++ b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
@@ -71,4 +71,41 @@ for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 }
 ok($result, 'waited until WAL is successfully read from WAL buffers');
 
+$result = 0;
+
+# Retry until the 10 Heap INSERT records for foo are read from WAL buffers.
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$node->safe_psql('postgres', "DROP TABLE IF EXISTS foo, bar;");
+	$node->safe_psql('postgres',
+		"CREATE TABLE foo AS SELECT * FROM generate_series(1, 2);");
+	my $start_lsn =
+	  $node->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();");
+	my $tbl_oid = $node->safe_psql('postgres',
+		"SELECT oid FROM pg_class WHERE relname = 'foo';");
+	$node->safe_psql('postgres',
+		"INSERT INTO foo SELECT * FROM generate_series(1, 10);");
+	my $end_lsn =
+	  $node->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();");
+	$node->safe_psql('postgres',
+		"CREATE TABLE bar AS SELECT * FROM generate_series(1, 2);");    # NOTE(review): presumably adds WAL past $end_lsn so the reader need not wait - confirm
+
+	my $res = $node->safe_psql(
+		'postgres',
+		"SELECT count(*) FROM get_wal_records_info_from_buffers('$start_lsn', '$end_lsn')
+					WHERE block_ref LIKE concat('%', '$tbl_oid', '%') AND
+						resource_manager = 'Heap' AND
+						record_type = 'INSERT';");
+
+	if ($res eq 10)    # expects 10 records, matching the 10 rows inserted into foo above
+	{
+		$result = 1;
+		last;
+	}
+
+	usleep(100_000);
+}
+ok($result,
+	'waited until we get info of WAL records available in WAL buffers.');
+
 done_testing();
-- 
2.34.1

