Hi,
The discussion of this $subject actually started in the 'Confine
vacuum skip logic to lazy_scan_skip' [1] thread, and it already has a
commitfest entry [2]. I wanted to create a separate thread to make the
discussion easier.
The latest patches needed a rebase, so I took the patches from [3] and
rebased them on top of the current master. There were also new
read_stream_* calls, so I updated them too.
0001 LGTM. One comment about 0002:
The read_stream_put_value() macro doesn't accept literal constant
values; because it takes the address of its argument, we need to pass a
variable (an lvalue) to it. Otherwise, compilation fails with:
```
../../postgres/src/include/storage/read_stream.h:169:36: error: lvalue
required as unary ‘&’ operand
169 | memcpy((per_buffer_data), &(value), sizeof(value)))
| ^
../../postgres/src/backend/access/heap/vacuumlazy.c:1703:17: note: in
expansion of macro ‘read_stream_put_value’
1703 | read_stream_put_value(stream, per_buffer_data, false);
```
If that is not intentional, I think it would be better to change
read_stream_put_value() so that it accepts rvalues.
[1]
https://postgr.es/m/CA%2BhUKG%2BSWMtu9D1eevnbdzf%3DvurfuDjdFVM5WnX28Fxp-H3mYg%40mail.gmail.com
[2] https://commitfest.postgresql.org/patch/5617/
[3]
https://postgr.es/m/CA%2BhUKGLa7ba7USyT%2BJR7uRiawWeCVJ96wyRsoEXk7r2gngPv%3DA%40mail.gmail.com
--
Regards,
Nazir Bilal Yavuz
Microsoft
From e4363ce317040a810261befc1c1c047110148e50 Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <[email protected]>
Date: Fri, 27 Mar 2026 11:48:17 +0300
Subject: [PATCH v3 1/2] Improve API for retrieving data from read streams.
Dealing with the per_buffer_data argument to read_stream_next_buffer()
has proven a bit clunky. Provide some new wrapper functions/macros:
buffer = read_stream_get_buffer(rs);
buffer = read_stream_get_buffer_and_value(rs, &my_int);
buffer = read_stream_get_buffer_and_pointer(rs, &my_pointer_to_int);
These improve readability and type safety via assertions.
---
contrib/amcheck/verify_heapam.c | 2 +-
contrib/bloom/blscan.c | 4 +-
contrib/bloom/blvacuum.c | 8 ++--
contrib/pg_prewarm/autoprewarm.c | 2 +-
contrib/pg_prewarm/pg_prewarm.c | 4 +-
contrib/pg_visibility/pg_visibility.c | 6 +--
contrib/pgstattuple/pgstatapprox.c | 2 +-
contrib/pgstattuple/pgstatindex.c | 8 ++--
src/backend/access/brin/brin.c | 2 +-
src/backend/access/gin/ginvacuum.c | 4 +-
src/backend/access/gist/gistvacuum.c | 2 +-
src/backend/access/hash/hash.c | 4 +-
src/backend/access/heap/heapam.c | 2 +-
src/backend/access/heap/heapam_handler.c | 2 +-
src/backend/access/heap/vacuumlazy.c | 6 +--
src/backend/access/nbtree/nbtree.c | 2 +-
src/backend/access/spgist/spgvacuum.c | 2 +-
src/backend/storage/aio/read_stream.c | 12 ++++++
src/backend/storage/buffer/bufmgr.c | 4 +-
src/include/storage/read_stream.h | 55 ++++++++++++++++++++++++
20 files changed, 99 insertions(+), 34 deletions(-)
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 20ff58aa782..4ab50522a08 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -479,7 +479,7 @@ verify_heapam(PG_FUNCTION_ARGS)
stream_data,
0);
- while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+ while ((ctx.buffer = read_stream_get_buffer(stream)) != InvalidBuffer)
{
OffsetNumber maxoff;
OffsetNumber predecessor[MaxOffsetNumber];
diff --git a/contrib/bloom/blscan.c b/contrib/bloom/blscan.c
index 1a0e42021ec..aeacce5e484 100644
--- a/contrib/bloom/blscan.c
+++ b/contrib/bloom/blscan.c
@@ -145,7 +145,7 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
Buffer buffer;
Page page;
- buffer = read_stream_next_buffer(stream, NULL);
+ buffer = read_stream_get_buffer(stream);
LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = BufferGetPage(buffer);
@@ -182,7 +182,7 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
CHECK_FOR_INTERRUPTS();
}
- Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(read_stream_get_buffer(stream) == InvalidBuffer);
read_stream_end(stream);
FreeAccessStrategy(bas);
diff --git a/contrib/bloom/blvacuum.c b/contrib/bloom/blvacuum.c
index 6beb1c20ebb..d934abcdfbc 100644
--- a/contrib/bloom/blvacuum.c
+++ b/contrib/bloom/blvacuum.c
@@ -81,7 +81,7 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
vacuum_delay_point(false);
- buffer = read_stream_next_buffer(stream, NULL);
+ buffer = read_stream_get_buffer(stream);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
gxlogState = GenericXLogStart(index);
@@ -154,7 +154,7 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
UnlockReleaseBuffer(buffer);
}
- Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(read_stream_get_buffer(stream) == InvalidBuffer);
read_stream_end(stream);
/*
@@ -233,7 +233,7 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
vacuum_delay_point(false);
- buffer = read_stream_next_buffer(stream, NULL);
+ buffer = read_stream_get_buffer(stream);
LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = BufferGetPage(buffer);
@@ -250,7 +250,7 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
UnlockReleaseBuffer(buffer);
}
- Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(read_stream_get_buffer(stream) == InvalidBuffer);
read_stream_end(stream);
IndexFreeSpaceMapVacuum(info->index);
diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index ba0bc8e6d4a..967048dcba4 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -635,7 +635,7 @@ autoprewarm_database_main(Datum main_arg)
* read stream callback will check that we still have free buffers
* before requesting each block from the read stream API.
*/
- while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+ while ((buf = read_stream_get_buffer(stream)) != InvalidBuffer)
{
apw_state->prewarmed_blocks++;
ReleaseBuffer(buf);
diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c
index c2716086693..ba07fd3eeb8 100644
--- a/contrib/pg_prewarm/pg_prewarm.c
+++ b/contrib/pg_prewarm/pg_prewarm.c
@@ -263,11 +263,11 @@ pg_prewarm(PG_FUNCTION_ARGS)
Buffer buf;
CHECK_FOR_INTERRUPTS();
- buf = read_stream_next_buffer(stream, NULL);
+ buf = read_stream_get_buffer(stream);
ReleaseBuffer(buf);
++blocks_done;
}
- Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(read_stream_get_buffer(stream) == InvalidBuffer);
read_stream_end(stream);
}
diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index dfab0b64cf5..e50e3b65894 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -546,7 +546,7 @@ collect_visibility_data(Oid relid, bool include_pd)
Buffer buffer;
Page page;
- buffer = read_stream_next_buffer(stream, NULL);
+ buffer = read_stream_get_buffer(stream);
LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = BufferGetPage(buffer);
@@ -559,7 +559,7 @@ collect_visibility_data(Oid relid, bool include_pd)
if (include_pd)
{
- Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(read_stream_get_buffer(stream) == InvalidBuffer);
read_stream_end(stream);
}
@@ -742,7 +742,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
0);
/* Loop over every block in the relation. */
- while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+ while ((buffer = read_stream_get_buffer(stream)) != InvalidBuffer)
{
bool check_frozen = all_frozen;
bool check_visible = all_visible;
diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index 21e0b50fb4b..31e3c266d73 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -156,7 +156,7 @@ statapprox_heap(Relation rel, output_type *stat)
maxoff;
BlockNumber blkno;
- buf = read_stream_next_buffer(stream, NULL);
+ buf = read_stream_get_buffer(stream);
if (buf == InvalidBuffer)
break;
diff --git a/contrib/pgstattuple/pgstatindex.c b/contrib/pgstattuple/pgstatindex.c
index 3a3f2637bd9..f7ca4c9c30b 100644
--- a/contrib/pgstattuple/pgstatindex.c
+++ b/contrib/pgstattuple/pgstatindex.c
@@ -306,7 +306,7 @@ pgstatindex_impl(Relation rel, FunctionCallInfo fcinfo)
CHECK_FOR_INTERRUPTS();
- buffer = read_stream_next_buffer(stream, NULL);
+ buffer = read_stream_get_buffer(stream);
LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = BufferGetPage(buffer);
@@ -345,7 +345,7 @@ pgstatindex_impl(Relation rel, FunctionCallInfo fcinfo)
UnlockReleaseBuffer(buffer);
}
- Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(read_stream_get_buffer(stream) == InvalidBuffer);
read_stream_end(stream);
relation_close(rel, AccessShareLock);
@@ -694,7 +694,7 @@ pgstathashindex(PG_FUNCTION_ARGS)
CHECK_FOR_INTERRUPTS();
- buf = read_stream_next_buffer(stream, NULL);
+ buf = read_stream_get_buffer(stream);
LockBuffer(buf, BUFFER_LOCK_SHARE);
page = BufferGetPage(buf);
@@ -739,7 +739,7 @@ pgstathashindex(PG_FUNCTION_ARGS)
UnlockReleaseBuffer(buf);
}
- Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(read_stream_get_buffer(stream) == InvalidBuffer);
read_stream_end(stream);
/* Done accessing the index */
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 2a0f8c8e3b8..941f1f92025 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -2198,7 +2198,7 @@ brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
* Scan the index in physical order, and clean up any possible mess in
* each page.
*/
- while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+ while ((buf = read_stream_get_buffer(stream)) != InvalidBuffer)
{
CHECK_FOR_INTERRUPTS();
diff --git a/src/backend/access/gin/ginvacuum.c b/src/backend/access/gin/ginvacuum.c
index 840543eb664..a6d3500d7ec 100644
--- a/src/backend/access/gin/ginvacuum.c
+++ b/src/backend/access/gin/ginvacuum.c
@@ -808,7 +808,7 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
vacuum_delay_point(false);
- buffer = read_stream_next_buffer(stream, NULL);
+ buffer = read_stream_get_buffer(stream);
LockBuffer(buffer, GIN_SHARE);
page = BufferGetPage(buffer);
@@ -834,7 +834,7 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
UnlockReleaseBuffer(buffer);
}
- Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(read_stream_get_buffer(stream) == InvalidBuffer);
read_stream_end(stream);
/* Update the metapage with accurate page and entry counts */
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index 686a0418054..b4517342122 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -247,7 +247,7 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
/* call vacuum_delay_point while not holding any buffer lock */
vacuum_delay_point(false);
- buf = read_stream_next_buffer(stream, NULL);
+ buf = read_stream_get_buffer(stream);
if (!BufferIsValid(buf))
break;
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 8d8cd30dc38..c8d9cc9517b 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -569,7 +569,7 @@ bucket_loop:
* We need to acquire a cleanup lock on the primary bucket page to out
* wait concurrent scans before deleting the dead tuples.
*/
- buf = read_stream_next_buffer(stream, NULL);
+ buf = read_stream_get_buffer(stream);
Assert(BufferIsValid(buf));
LockBufferForCleanup(buf);
_hash_checkpage(rel, buf, LH_BUCKET_PAGE);
@@ -653,7 +653,7 @@ bucket_loop:
}
/* Stream should be exhausted since we processed all buckets */
- Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(read_stream_get_buffer(stream) == InvalidBuffer);
read_stream_end(stream);
/* Okay, we're really done. Update tuple count in metapage. */
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 044f385e477..77d759fbba0 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -736,7 +736,7 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
scan->rs_dir = dir;
- scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
+ scan->rs_cbuf = read_stream_get_buffer(scan->rs_read_stream);
if (BufferIsValid(scan->rs_cbuf))
scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
}
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index d40878928e1..7229dc3bd51 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -1038,7 +1038,7 @@ heapam_scan_analyze_next_block(TableScanDesc scan, ReadStream *stream)
* re-acquire sharelock for each tuple, but since we aren't doing much
* work per tuple, the extra lock traffic is probably better avoided.
*/
- hscan->rs_cbuf = read_stream_next_buffer(stream, NULL);
+ hscan->rs_cbuf = read_stream_get_buffer(stream);
if (!BufferIsValid(hscan->rs_cbuf))
return false;
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index f698c2d899b..df4ef84f6ff 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1313,7 +1313,6 @@ lazy_scan_heap(LVRelState *vacrel)
bool was_eager_scanned = false;
int ndeleted = 0;
bool has_lpdead_items;
- void *per_buffer_data = NULL;
bool vm_page_frozen = false;
bool got_cleanup_lock = false;
@@ -1373,13 +1372,12 @@ lazy_scan_heap(LVRelState *vacrel)
PROGRESS_VACUUM_PHASE_SCAN_HEAP);
}
- buf = read_stream_next_buffer(stream, &per_buffer_data);
+ buf = read_stream_get_buffer_and_value(stream, &was_eager_scanned);
/* The relation is exhausted. */
if (!BufferIsValid(buf))
break;
- was_eager_scanned = *((bool *) per_buffer_data);
CheckBufferIsPinnedOnce(buf);
page = BufferGetPage(buf);
blkno = BufferGetBlockNumber(buf);
@@ -2677,7 +2675,7 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
vacuum_delay_point(false);
- buf = read_stream_next_buffer(stream, (void **) &iter_result);
+ buf = read_stream_get_buffer_and_pointer(stream, &iter_result);
/* The relation is exhausted */
if (!BufferIsValid(buf))
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 6d870e4ebe7..9bff4a99054 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -1355,7 +1355,7 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
/* call vacuum_delay_point while not holding any buffer lock */
vacuum_delay_point(false);
- buf = read_stream_next_buffer(stream, NULL);
+ buf = read_stream_get_buffer(stream);
if (!BufferIsValid(buf))
break;
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index c461f8dc02d..97423400b26 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -868,7 +868,7 @@ spgvacuumscan(spgBulkDeleteState *bds)
/* call vacuum_delay_point while not holding any buffer lock */
vacuum_delay_point(false);
- buf = read_stream_next_buffer(stream, NULL);
+ buf = read_stream_get_buffer(stream);
if (!BufferIsValid(buf))
break;
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index cd54c1a74ac..bb2b1b54338 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -788,6 +788,9 @@ read_stream_begin_smgr_relation(int flags,
* valid until the next call to read_stream_next_buffer(). When the stream
* runs out of data, InvalidBuffer is returned. The caller may decide to end
* the stream early at any time by calling read_stream_end().
+ *
+ * See read_stream.h for read_stream_get_buffer() and variants that provide
+ * some degree of type safety for the per_buffer_data argument.
*/
Buffer
read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
@@ -1060,6 +1063,15 @@ read_stream_resume(ReadStream *stream)
stream->distance = stream->resume_distance;
}
+/*
+ * Return the stream's per-buffer data size in bytes, for read_stream.h asserts.
+ */
+size_t
+read_stream_per_buffer_data_size(ReadStream *stream)
+{
+ return stream->per_buffer_data_size;
+}
+
/*
* Reset a read stream by releasing any queued up buffers, allowing the stream
* to be used again for different blocks. This can be used to clear an
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index e212f6110f2..b11d50891a8 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -5356,7 +5356,7 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator,
CHECK_FOR_INTERRUPTS();
/* Read block from source relation. */
- srcBuf = read_stream_next_buffer(src_stream, NULL);
+ srcBuf = read_stream_get_buffer(src_stream);
LockBuffer(srcBuf, BUFFER_LOCK_SHARE);
srcPage = BufferGetPage(srcBuf);
@@ -5381,7 +5381,7 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator,
UnlockReleaseBuffer(dstBuf);
UnlockReleaseBuffer(srcBuf);
}
- Assert(read_stream_next_buffer(src_stream, NULL) == InvalidBuffer);
+ Assert(read_stream_get_buffer(src_stream) == InvalidBuffer);
read_stream_end(src_stream);
FreeAccessStrategy(bstrategy_src);
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index c9359b29b0f..b86450d5b39 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -91,6 +91,7 @@ extern ReadStream *read_stream_begin_relation(int flags,
extern Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data);
extern BlockNumber read_stream_next_block(ReadStream *stream,
BufferAccessStrategy *strategy);
+extern size_t read_stream_per_buffer_data_size(ReadStream *stream);
extern ReadStream *read_stream_begin_smgr_relation(int flags,
BufferAccessStrategy strategy,
SMgrRelation smgr,
@@ -104,4 +105,58 @@ extern void read_stream_resume(ReadStream *stream);
extern void read_stream_reset(ReadStream *stream);
extern void read_stream_end(ReadStream *stream);
+/*
+ * Get the next buffer (or InvalidBuffer) from a stream without per-buffer data.
+ */
+static inline Buffer
+read_stream_get_buffer(ReadStream *stream)
+{
+ Assert(read_stream_per_buffer_data_size(stream) == 0);
+ return read_stream_next_buffer(stream, NULL);
+}
+
+/*
+ * Helper for read_stream_get_buffer_and_value(); no copy at end-of-stream.
+ */
+static inline Buffer
+read_stream_get_buffer_and_value_with_size(ReadStream *stream,
+ void *output_data,
+ size_t output_data_size)
+{
+ Buffer buffer;
+ void *per_buffer_data;
+
+ Assert(read_stream_per_buffer_data_size(stream) == output_data_size);
+ buffer = read_stream_next_buffer(stream, &per_buffer_data);
+ if (buffer != InvalidBuffer)
+ memcpy(output_data, per_buffer_data, output_data_size);
+
+ return buffer;
+}
+
+/*
+ * Get the next buffer and copy its per-buffer data into *vp.  sizeof(*vp) must
+ * equal the stream's per-buffer data size (asserted).  InvalidBuffer means
+ * end-of-stream, and in that case *vp should not be used.  Example of use:
+ *
+ * int my_int;
+ *
+ * buf = read_stream_get_buffer_and_value(stream, &my_int);
+ */
+#define read_stream_get_buffer_and_value(stream, vp) \
+ read_stream_get_buffer_and_value_with_size((stream), (vp), sizeof(*(vp)))
+
+/*
+ * Get the next buffer and a pointer to the associated per-buffer data. This
+ * avoids casts in the calling code, and asserts that *pointer points to a type
+ * no larger than the storage size.  Note that "stream" is evaluated twice
+ * when assertions are enabled, so it must not have side effects.  Example:
+ *
+ * int *my_int_p;
+ * buf = read_stream_get_buffer_and_pointer(stream, &my_int_p);
+ */
+#define read_stream_get_buffer_and_pointer(stream, pointer) \
+ (AssertMacro(sizeof(**(pointer)) <= read_stream_per_buffer_data_size(stream)), \
+ read_stream_next_buffer((stream), ((void **) (pointer))))
+
#endif /* READ_STREAM_H */
--
2.47.3
From 25211563d5f3cf90fd9dfe9f06a0890d782ff981 Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <[email protected]>
Date: Fri, 27 Mar 2026 14:54:40 +0300
Subject: [PATCH v3 2/2] Improve API for storing data in read streams.
Read stream callbacks receive a void pointer into the per-buffer data
queue so that they can store data there for later retrieval by the
buffer consumer. We can improve readability and safety a bit by changing
cast-and-assign or raw memcpy() to:
read_stream_put_value(stream, per_buffer_data, my_int);
This form infers the size and asserts that the storage space matches,
generally mirroring the read_stream_get_buffer_and_value() call used for
retrieving the streamed data later.
---
src/backend/access/heap/heapam_handler.c | 9 ++-------
src/backend/access/heap/vacuumlazy.c | 9 ++++++---
src/include/storage/read_stream.h | 9 +++++++++
3 files changed, 17 insertions(+), 10 deletions(-)
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 7229dc3bd51..ead3ad3ecb3 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2490,7 +2490,6 @@ BitmapHeapScanNextBlock(TableScanDesc scan,
BitmapHeapScanDesc bscan = (BitmapHeapScanDesc) scan;
HeapScanDesc hscan = (HeapScanDesc) bscan;
BlockNumber block;
- void *per_buffer_data;
Buffer buffer;
Snapshot snapshot;
int ntup;
@@ -2511,8 +2510,7 @@ BitmapHeapScanNextBlock(TableScanDesc scan,
hscan->rs_cbuf = InvalidBuffer;
}
- hscan->rs_cbuf = read_stream_next_buffer(hscan->rs_read_stream,
- &per_buffer_data);
+ hscan->rs_cbuf = read_stream_get_buffer_and_pointer(hscan->rs_read_stream, &tbmres);
if (BufferIsInvalid(hscan->rs_cbuf))
{
@@ -2520,10 +2518,7 @@ BitmapHeapScanNextBlock(TableScanDesc scan,
return false;
}
- Assert(per_buffer_data);
-
- tbmres = per_buffer_data;
-
+ Assert(tbmres);
Assert(BlockNumberIsValid(tbmres->blockno));
Assert(BufferGetBlockNumber(hscan->rs_cbuf) == tbmres->blockno);
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index df4ef84f6ff..cda3247cc69 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1693,6 +1693,9 @@ heap_vac_scan_next_block(ReadStream *stream,
/* Now we must be in one of the two remaining states: */
if (next_block < vacrel->next_unskippable_block)
{
+ /* read_stream_put_value() requires an lvalue, not a literal */
+ bool temp = false;
+
/*
* 2. We are processing a range of blocks that we could have skipped
* but chose not to. We know that they are all-visible in the VM,
@@ -1700,7 +1703,7 @@ heap_vac_scan_next_block(ReadStream *stream,
*/
vacrel->current_block = next_block;
/* Block was not eager scanned */
- *((bool *) per_buffer_data) = false;
+ read_stream_put_value(stream, per_buffer_data, temp);
return vacrel->current_block;
}
else
@@ -1712,7 +1715,7 @@ heap_vac_scan_next_block(ReadStream *stream,
Assert(next_block == vacrel->next_unskippable_block);
vacrel->current_block = next_block;
- *((bool *) per_buffer_data) = vacrel->next_unskippable_eager_scanned;
+ read_stream_put_value(stream, per_buffer_data, vacrel->next_unskippable_eager_scanned);
return vacrel->current_block;
}
}
@@ -2600,7 +2603,7 @@ vacuum_reap_lp_read_stream_next(ReadStream *stream,
* Save the TidStoreIterResult for later, so we can extract the offsets.
* It is safe to copy the result, according to TidStoreIterateNext().
*/
- memcpy(per_buffer_data, iter_result, sizeof(*iter_result));
+ read_stream_put_value(stream, per_buffer_data, *iter_result);
return iter_result->blkno;
}
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index b86450d5b39..fb3a47e74cc 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -159,4 +159,13 @@ read_stream_get_buffer_and_value_with_size(ReadStream *stream,
(AssertMacro(sizeof(**(pointer)) <= read_stream_per_buffer_data_size(stream)), \
read_stream_next_buffer((stream), ((void **) (pointer))))
+/*
+ * Set the per-buffer data by value from inside a block-number callback,
+ * asserting that sizeof(value) matches the available space.  "value" must be
+ * an lvalue, since its address is taken: a literal like false won't compile.
+ */
+#define read_stream_put_value(stream, per_buffer_data, value) \
+ (AssertMacro(sizeof(value) == read_stream_per_buffer_data_size(stream)), \
+ memcpy((per_buffer_data), &(value), sizeof(value)))
+
#endif /* READ_STREAM_H */
--
2.47.3