Hi,

On 2026-03-18 16:16:14 -0400, Andres Freund wrote:
> > I don't think we'll be able to easily test READ_BUFFER_ALREADY_DONE
> > (though perhaps we aren't testing it for shared buffers either?).
>
> We do reach the READ_BUFFER_ALREADY_DONE in PrepareHeadBufferReadIO(), but
> only due to io_method=sync peculiarities (as that only actually performs the
> IO later when waiting, it's easy to have two IOs for the same block).
>
>
> It's probably worth adding tests for that, although I suspect it should be in
> 001_aio.pl - no read stream required to hit it.  I can give it a shot, if you
> want?

I started writing some tests and realized that these commits had made that
somewhat harder - by not using StartBufferIO() anymore, the existing test
infrastructure for StartBufferIO() (in test_aio) became less useful.  In
effect it actually reduced coverage some.

Thinking about it more, I also got worried about the duplication of
logic. It's perhaps acceptable with the patches as-is, but we'll soon need
something very similar for AIO writes. Then we'd end up with like 5 variants,
because we'd still need the existing StartBufferIO() for some cases where we
do want to wait (e.g. the edge case in ExtendBufferedRelShared()).

In the attached prototype I replaced your patch introducing
PrepareHeadBufferReadIO() / PrepareAdditionalBufferReadIO() with one that
instead revises StartBufferIO() to have the enum return value you introduced
and a PgAioWaitRef * argument, which callers that would like to asynchronously
wait for the IO to complete can pass (others pass NULL). There are some other
cleanups in it too, see the commit message for more details.

Your final patch doesn't change a whole lot due to this: instead of calling
PrepareHeadBufferReadIO() it now calls StartBufferIO(wait=true,
&operation->io_wref) and instead of PrepareAdditionalBufferReadIO() it does
StartBufferIO(wait=false, NULL).  That means it has to set
operation->foreign_io to true itself, but that doesn't seem problematic.
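
To make the calling convention above a bit more concrete, here is a small
standalone toy model. It is only a sketch and not the code from the patch:
every Toy* / toy_* name is made up for illustration, the enum values other
than the one modeling READ_BUFFER_ALREADY_DONE are assumed, and the exact
behaviour when an IO is already in progress is my reading of the description
rather than something taken from bufmgr.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for PgAioWaitRef; only carries an IO id. */
typedef struct ToyWaitRef
{
	int			io_id;
} ToyWaitRef;

/* Hypothetical result enum; only ALREADY_DONE is named in the thread. */
typedef enum ToyStartIOResult
{
	TOY_IO_ALREADY_DONE,		/* models READ_BUFFER_ALREADY_DONE */
	TOY_IO_IN_PROGRESS,			/* assumed: another IO is in flight */
	TOY_IO_READY				/* assumed: caller now owns the IO */
} ToyStartIOResult;

typedef struct ToyBuffer
{
	bool		valid;
	bool		io_in_progress;
	int			io_id;
} ToyBuffer;

/* Hypothetical subset of the read operation state. */
typedef struct ToyReadOperation
{
	ToyWaitRef	io_wref;
	bool		foreign_io;
} ToyReadOperation;

/*
 * Toy model of a StartBufferIO()-like function: one entry point, an enum
 * result, and an optional wait reference for asynchronous waiting.
 */
static ToyStartIOResult
toy_start_buffer_io(ToyBuffer *buf, bool wait, ToyWaitRef *io_wref)
{
	if (buf->valid)
		return TOY_IO_ALREADY_DONE;

	if (buf->io_in_progress)
	{
		if (io_wref != NULL)
		{
			/* Hand back a reference, the caller waits on it later. */
			io_wref->io_id = buf->io_id;
			return TOY_IO_IN_PROGRESS;
		}
		if (!wait)
			return TOY_IO_IN_PROGRESS;

		/* A blocking wait, modeled here as the IO finishing at once. */
		buf->io_in_progress = false;
		buf->valid = true;
		return TOY_IO_ALREADY_DONE;
	}

	buf->io_in_progress = true;
	return TOY_IO_READY;
}

/* Head buffer: wait=true plus a wait reference; caller sets foreign_io. */
static ToyStartIOResult
toy_prepare_head(ToyReadOperation *op, ToyBuffer *buf)
{
	ToyStartIOResult r = toy_start_buffer_io(buf, true, &op->io_wref);

	if (r == TOY_IO_IN_PROGRESS)
		op->foreign_io = true;
	return r;
}
```

Additional buffers would go through toy_start_buffer_io(buf, false, NULL) in
this model, i.e. they neither block nor collect a wait reference. The point
of the sketch is just that one function can serve both callers, so there is
no separate head / additional variant to keep in sync.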


As part of this I replaced the following comment:
+    /*
+     * Submit any staged IO before checking for in-progress IO. Without this,
+     * the wref check below could find IO that this backend staged but hasn't
+     * submitted yet. Waiting on that would PANIC because the owner can't wait
+     * on its own staged IO.
+     */
+    pgaio_submit_staged();

as I am pretty sure that can't be reached. I added an assert + explanation.



I also updated "Restructure AsyncReadBuffers()" to move
pgstat_prepare_report_checksum_failure() and the computation of flags to
before the ReadBuffersCanStartIO() calls, and added a comment explaining why
little should be added between the ReadBuffersCanStartIO() calls.


Thoughts?


I still want to expand the tests a bit, but I thought that we should resolve
this structural issue first.


Greetings,

Andres Freund
From 0f675832ceecc1da99fdd9ec772127370a3cdd3c Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 9 Sep 2025 10:14:34 -0400
Subject: [PATCH v7a 1/8] aio: Refactor tests in preparation for more tests

In a future commit more AIO related tests are due to be introduced. However
001_aio.pl already is fairly large.

This commit introduces a new TestAio package with helpers for writing AIO
related tests. Then it uses the new helpers to simplify the existing
001_aio.pl by iterating over all supported io_methods. This will be
particularly helpful because additional methods already have been submitted.

Additionally this commit splits out testing of initdb using a non-default
method into its own test. While that test is somewhat important, it's fairly
slow and doesn't break that often. For development velocity it's helpful for
001_aio.pl to be faster.

While particularly the latter could benefit from being its own commit,
splitting it out seems to introduce more back-and-forth than it's worth.

Author: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/
---
 src/test/modules/test_aio/meson.build     |   1 +
 src/test/modules/test_aio/t/001_aio.pl    | 145 +++++++---------------
 src/test/modules/test_aio/t/003_initdb.pl |  71 +++++++++++
 src/test/modules/test_aio/t/TestAio.pm    |  90 ++++++++++++++
 4 files changed, 206 insertions(+), 101 deletions(-)
 create mode 100644 src/test/modules/test_aio/t/003_initdb.pl
 create mode 100644 src/test/modules/test_aio/t/TestAio.pm

diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index fefa25bc5ab..18a797f3a3b 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -32,6 +32,7 @@ tests += {
     'tests': [
       't/001_aio.pl',
       't/002_io_workers.pl',
+      't/003_initdb.pl',
     ],
   },
 }
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 5c634ec3ca9..e18b2a2b8ae 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -7,53 +7,48 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+use FindBin;
+use lib $FindBin::RealBin;
 
-###
-# Test io_method=worker
-###
-my $node_worker = create_node('worker');
-$node_worker->start();
-
-test_generic('worker', $node_worker);
-SKIP:
-{
-	skip 'Injection points not supported by this build', 1
-	  unless $ENV{enable_injection_points} eq 'yes';
-	test_inject_worker('worker', $node_worker);
-}
+use TestAio;
 
-$node_worker->stop();
+my @methods = TestAio::supported_io_methods();
+my %nodes;
 
 
 ###
-# Test io_method=io_uring
+# Create and configure one instance for each io_method
 ###
 
-if (have_io_uring())
+foreach my $method (@methods)
 {
-	my $node_uring = create_node('io_uring');
-	$node_uring->start();
-	test_generic('io_uring', $node_uring);
-	$node_uring->stop();
-}
+	my $node = PostgreSQL::Test::Cluster->new($method);
 
+	$nodes{$method} = $node;
+	$node->init();
+	$node->append_conf('postgresql.conf', "io_method=$method");
+	TestAio::configure($node);
+}
 
-###
-# Test io_method=sync
-###
-
-my $node_sync = create_node('sync');
-
-# just to have one test not use the default auto-tuning
-
-$node_sync->append_conf(
+# Just to have one test not use the default auto-tuning
+$nodes{'sync'}->append_conf(
 	'postgresql.conf', qq(
-io_max_concurrency=4
+ io_max_concurrency=4
 ));
 
-$node_sync->start();
-test_generic('sync', $node_sync);
-$node_sync->stop();
+
+###
+# Execute the tests for each io_method
+###
+
+foreach my $method (@methods)
+{
+	my $node = $nodes{$method};
+
+	$node->start();
+	test_io_method($method, $node);
+	$node->stop();
+}
 
 done_testing();
 
@@ -62,71 +57,6 @@ done_testing();
 # Test Helpers
 ###
 
-sub create_node
-{
-	local $Test::Builder::Level = $Test::Builder::Level + 1;
-
-	my $io_method = shift;
-
-	my $node = PostgreSQL::Test::Cluster->new($io_method);
-
-	# Want to test initdb for each IO method, otherwise we could just reuse
-	# the cluster.
-	#
-	# Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the
-	# options specified by ->extra, if somebody puts -c io_method=xyz in
-	# PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we
-	# detect it.
-	local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS};
-	if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS}
-		&& $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/)
-	{
-		$ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method";
-	}
-
-	$node->init(extra => [ '-c', "io_method=$io_method" ]);
-
-	$node->append_conf(
-		'postgresql.conf', qq(
-shared_preload_libraries=test_aio
-log_min_messages = 'DEBUG3'
-log_statement=all
-log_error_verbosity=default
-restart_after_crash=false
-temp_buffers=100
-));
-
-	# Even though we used -c io_method=... above, if TEMP_CONFIG sets
-	# io_method, it'd override the setting persisted at initdb time. While
-	# using (and later verifying) the setting from initdb provides some
-	# verification of having used the io_method during initdb, it's probably
-	# not worth the complication of only appending if the variable is set in
-	# in TEMP_CONFIG.
-	$node->append_conf(
-		'postgresql.conf', qq(
-io_method=$io_method
-));
-
-	ok(1, "$io_method: initdb");
-
-	return $node;
-}
-
-sub have_io_uring
-{
-	# To detect if io_uring is supported, we look at the error message for
-	# assigning an invalid value to an enum GUC, which lists all the valid
-	# options. We need to use -C to deal with running as administrator on
-	# windows, the superuser check is omitted if -C is used.
-	my ($stdout, $stderr) =
-	  run_command [qw(postgres -C invalid -c io_method=invalid)];
-	die "can't determine supported io_method values"
-	  unless $stderr =~ m/Available values: ([^\.]+)\./;
-	my $methods = $1;
-	note "supported io_method values are: $methods";
-
-	return ($methods =~ m/io_uring/) ? 1 : 0;
-}
 
 sub psql_like
 {
@@ -1490,8 +1420,8 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
 }
 
 
-# Run all tests that are supported for all io_methods
-sub test_generic
+# Run all tests for the specified node / io_method
+sub test_io_method
 {
 	my $io_method = shift;
 	my $node = shift;
@@ -1526,10 +1456,23 @@ CHECKPOINT;
 	test_ignore_checksum($io_method, $node);
 	test_checksum_createdb($io_method, $node);
 
+	# generic injection tests
   SKIP:
 	{
 		skip 'Injection points not supported by this build', 1
 		  unless $ENV{enable_injection_points} eq 'yes';
 		test_inject($io_method, $node);
 	}
+
+	# worker specific injection tests
+	if ($io_method eq 'worker')
+	{
+	  SKIP:
+		{
+			skip 'Injection points not supported by this build', 1
+			  unless $ENV{enable_injection_points} eq 'yes';
+
+			test_inject_worker($io_method, $node);
+		}
+	}
 }
diff --git a/src/test/modules/test_aio/t/003_initdb.pl b/src/test/modules/test_aio/t/003_initdb.pl
new file mode 100644
index 00000000000..c03ae58d00a
--- /dev/null
+++ b/src/test/modules/test_aio/t/003_initdb.pl
@@ -0,0 +1,71 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+#
+# Test initdb for each IO method. This is done separately from 001_aio.pl, as
+# it isn't fast. This way the more commonly failing / hacked-on 001_aio.pl can
+# be iterated on more quickly.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+	test_create_node($method);
+}
+
+done_testing();
+
+
+sub test_create_node
+{
+	local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+	my $io_method = shift;
+
+	my $node = PostgreSQL::Test::Cluster->new($io_method);
+
+	# Want to test initdb for each IO method, otherwise we could just reuse
+	# the cluster.
+	#
+	# Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the
+	# options specified by ->extra, if somebody puts -c io_method=xyz in
+	# PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we
+	# detect it.
+	local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS};
+	if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS}
+		&& $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/)
+	{
+		$ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method";
+	}
+
+	$node->init(extra => [ '-c', "io_method=$io_method" ]);
+
+	TestAio::configure($node);
+
+	# Even though we used -c io_method=... above, if TEMP_CONFIG sets
+	# io_method, it'd override the setting persisted at initdb time. While
+	# using (and later verifying) the setting from initdb provides some
+	# verification of having used the io_method during initdb, it's probably
+	# not worth the complication of only appending if the variable is set in
+	# TEMP_CONFIG.
+	$node->append_conf(
+		'postgresql.conf', qq(
+io_method=$io_method
+));
+
+	ok(1, "$io_method: initdb");
+
+	$node->start();
+	$node->stop();
+	ok(1, "$io_method: start & stop");
+
+	return $node;
+}
diff --git a/src/test/modules/test_aio/t/TestAio.pm b/src/test/modules/test_aio/t/TestAio.pm
new file mode 100644
index 00000000000..5bc80a9b130
--- /dev/null
+++ b/src/test/modules/test_aio/t/TestAio.pm
@@ -0,0 +1,90 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+=pod
+
+=head1 NAME
+
+TestAio - helpers for writing AIO related tests
+
+=cut
+
+package TestAio;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+
+=pod
+
+=head1 METHODS
+
+=over
+
+=item TestAio::supported_io_methods()
+
+Return an array of all the supported values for the io_method GUC
+
+=cut
+
+sub supported_io_methods()
+{
+	my @io_methods = ('worker');
+
+	push(@io_methods, "io_uring") if have_io_uring();
+
+	# Return sync last, as it will least commonly fail
+	push(@io_methods, "sync");
+
+	return @io_methods;
+}
+
+
+=item TestAio::configure()
+
+Prepare a cluster for AIO test
+
+=cut
+
+sub configure
+{
+	my $node = shift;
+
+	$node->append_conf(
+		'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+));
+
+}
+
+
+=pod
+
+=item TestAio::have_io_uring()
+
+Return whether io_uring is supported
+
+=cut
+
+sub have_io_uring
+{
+	# To detect if io_uring is supported, we look at the error message for
+	# assigning an invalid value to an enum GUC, which lists all the valid
+	# options. We need to use -C to deal with running as administrator on
+	# windows, the superuser check is omitted if -C is used.
+	my ($stdout, $stderr) =
+	  run_command [qw(postgres -C invalid -c io_method=invalid)];
+	die "can't determine supported io_method values"
+	  unless $stderr =~ m/Available values: ([^\.]+)\./;
+	my $methods = $1;
+	note "supported io_method values are: $methods";
+
+	return ($methods =~ m/io_uring/) ? 1 : 0;
+}
+
+1;
-- 
2.53.0.1.gb2826b52eb

From dd56873189f5e0c9677626adcae475b194d27da0 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 10 Sep 2025 14:00:02 -0400
Subject: [PATCH v7a 2/8] test_aio: Add read_stream test infrastructure & tests

Author: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv
---
 src/test/modules/test_aio/meson.build         |   1 +
 .../modules/test_aio/t/004_read_stream.pl     | 286 +++++++++++++++
 src/test/modules/test_aio/test_aio--1.0.sql   |  24 +-
 src/test/modules/test_aio/test_aio.c          | 344 +++++++++++++++---
 src/tools/pgindent/typedefs.list              |   1 +
 5 files changed, 606 insertions(+), 50 deletions(-)
 create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl

diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 18a797f3a3b..909f81d96c1 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -33,6 +33,7 @@ tests += {
       't/001_aio.pl',
       't/002_io_workers.pl',
       't/003_initdb.pl',
+      't/004_read_stream.pl',
     ],
   },
 }
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
new file mode 100644
index 00000000000..17a68e35c1d
--- /dev/null
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -0,0 +1,286 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init();
+
+$node->append_conf(
+	'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+max_connections=8
+io_method=worker
+));
+
+$node->start();
+test_setup($node);
+$node->stop();
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+	$node->adjust_conf('postgresql.conf', 'io_method', $method);
+	$node->start();
+	test_io_method($method, $node);
+	$node->stop();
+}
+
+done_testing();
+
+
+sub test_setup
+{
+	my $node = shift;
+
+	$node->safe_psql(
+		'postgres', qq(
+CREATE EXTENSION test_aio;
+
+CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10);
+INSERT INTO largeish(k) SELECT generate_series(1, 10000);
+));
+	ok(1, "setup");
+}
+
+
+sub test_repeated_blocks
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	# Preventing larger reads makes testing easier
+	$psql->query_safe(
+		qq/ SET io_combine_limit = 1; /);
+
+	# test miss of the same block twice in a row
+	$psql->query_safe(
+		qq/ SELECT evict_rel('largeish'); /);
+
+	# block 0 grows the distance enough that the stream will look ahead and try
+	# to start a pending read for block 2 (and later block 4) twice before
+	# returning any buffers.
+	$psql->query_safe(
+		qq/ SELECT * FROM read_stream_for_blocks('largeish',
+			ARRAY[0, 2, 2, 4, 4]); /);
+
+	ok(1, "$io_method: stream missing the same block repeatedly");
+
+	$psql->query_safe(
+		qq/ SELECT * FROM read_stream_for_blocks('largeish',
+			ARRAY[0, 2, 2, 4, 4]); /);
+	ok(1, "$io_method: stream hitting the same block repeatedly");
+
+	# test hit of the same block twice in a row
+	$psql->query_safe(
+		qq/ SELECT evict_rel('largeish'); /);
+	$psql->query_safe(
+		qq/ SELECT * FROM read_stream_for_blocks('largeish',
+			ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]); /);
+	ok(1, "$io_method: stream accessing same block");
+
+	# Test repeated blocks with a temp table, using invalidate_rel_block()
+	# to evict individual local buffers.
+	$psql->query_safe(
+		qq/ CREATE TEMP TABLE largeish_temp(k int not null) WITH (FILLFACTOR=10);
+			INSERT INTO largeish_temp(k) SELECT generate_series(1, 200); /);
+
+	# Evict the specific blocks we'll request to force misses
+	$psql->query_safe(
+		qq/ SELECT invalidate_rel_block('largeish_temp', 0); /);
+	$psql->query_safe(
+		qq/ SELECT invalidate_rel_block('largeish_temp', 2); /);
+	$psql->query_safe(
+		qq/ SELECT invalidate_rel_block('largeish_temp', 4); /);
+
+	$psql->query_safe(
+		qq/ SELECT * FROM read_stream_for_blocks('largeish_temp',
+			ARRAY[0, 2, 2, 4, 4]); /);
+	ok(1, "$io_method: temp stream missing the same block repeatedly");
+
+	# Now the blocks are cached, so repeated access should be hits
+	$psql->query_safe(
+		qq/ SELECT * FROM read_stream_for_blocks('largeish_temp',
+			ARRAY[0, 2, 2, 4, 4]); /);
+	ok(1, "$io_method: temp stream hitting the same block repeatedly");
+
+	$psql->quit();
+}
+
+
+sub test_inject_foreign
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+	my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+	my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/);
+
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads succeeding.
+	###
+	$psql_a->query_safe(
+		qq/ SELECT evict_rel('largeish'); /);
+
+	$psql_b->query_safe(
+		qq/ SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+			relfilenode=>pg_relation_filenode('largeish')); /);
+
+	$psql_b->{stdin} .= qq/ SELECT read_rel_block_ll('largeish',
+		blockno=>5, nblocks=>1);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/ SELECT wait_event FROM pg_stat_activity
+			WHERE wait_event = 'completion_wait'; /,
+		'completion_wait');
+
+	# Block 5 is undergoing IO in session b, so session a will move on to start
+	# a new IO for block 7.
+	$psql_a->{stdin} .= qq/ SELECT array_agg(blocknum) FROM
+		read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	ok(1, qq/$io_method: read stream encounters succeeding IO by another backend/);
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads failing.
+	###
+	$psql_a->query_safe(
+		qq/ SELECT evict_rel('largeish'); /);
+
+	$psql_b->query_safe(
+		qq/ SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+			relfilenode=>pg_relation_filenode('largeish')); /);
+
+	$psql_b->query_safe(
+		qq/ SELECT inj_io_short_read_attach(-errno_from_string('EIO'),
+			pid=>pg_backend_pid(),
+			relfilenode=>pg_relation_filenode('largeish')); /);
+
+	$psql_b->{stdin} .= qq/ SELECT read_rel_block_ll('largeish',
+		blockno=>5, nblocks=>1);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq/ SELECT wait_event FROM pg_stat_activity
+			WHERE wait_event = 'completion_wait'; /,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/ SELECT array_agg(blocknum) FROM
+		read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	pump_until(
+		$psql_b->{run}, $psql_b->{timeout},
+		\$psql_b->{stderr}, qr/ERROR.*could not read blocks 5\.\.5/);
+	ok(1, "$io_method: injected error occurred");
+	$psql_b->{stderr} = '';
+	$psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
+
+	ok(1,
+		qq/$io_method: read stream encounters failing IO by another backend/);
+
+
+	###
+	# Test read stream encountering two buffers that are undergoing the same
+	# IO, started by another backend.
+	###
+	$psql_a->query_safe(
+		qq/ SELECT evict_rel('largeish'); /);
+
+	$psql_b->query_safe(
+		qq/ SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+			relfilenode=>pg_relation_filenode('largeish')); /);
+
+	$psql_b->{stdin} .= qq/ SELECT read_rel_block_ll('largeish',
+		blockno=>2, nblocks=>3);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq/ SELECT wait_event FROM pg_stat_activity
+			WHERE wait_event = 'completion_wait'; /,
+		'completion_wait');
+
+	# Blocks 2 and 4 are undergoing IO initiated by session b
+	$psql_a->{stdin} .= qq/ SELECT array_agg(blocknum) FROM
+		read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres',
+		qq/ SELECT inj_io_completion_continue() /);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,4\}/);
+
+	ok(1, qq/$io_method: read stream encounters two buffers read in one IO/);
+
+	$psql_a->quit();
+	$psql_b->quit();
+}
+
+
+sub test_io_method
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	is($node->safe_psql('postgres', 'SHOW io_method'),
+		$io_method, "$io_method: io_method set correctly");
+
+	test_repeated_blocks($io_method, $node);
+
+  SKIP:
+	{
+		skip 'Injection points not supported by this build', 1
+		  unless $ENV{enable_injection_points} eq 'yes';
+		test_inject_foreign($io_method, $node);
+	}
+}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..1cc4734a746 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll(
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+CREATE FUNCTION evict_rel(rel regclass)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
 CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int)
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
@@ -50,6 +54,14 @@ RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
 
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4)
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
 
 /*
  * Handle related functions
@@ -91,8 +103,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 /*
  * Injection point related functions
  */
-CREATE FUNCTION inj_io_short_read_attach(result int)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_completion_continue()
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
 CREATE FUNCTION inj_io_short_read_detach()
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index b1aa8af9ec0..061f0c9f92a 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -19,17 +19,26 @@
 #include "postgres.h"
 
 #include "access/relation.h"
+#include "catalog/pg_type.h"
 #include "fmgr.h"
+#include "funcapi.h"
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/checksum.h"
+#include "storage/condition_variable.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/read_stream.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/rel.h"
+#include "utils/tuplestore.h"
+#include "utils/wait_event.h"
 
 
 PG_MODULE_MAGIC;
@@ -37,13 +46,30 @@ PG_MODULE_MAGIC;
 
 typedef struct InjIoErrorState
 {
+	ConditionVariable cv;
+
 	bool		enabled_short_read;
 	bool		enabled_reopen;
 
+	bool		enabled_completion_wait;
+	Oid			completion_wait_relfilenode;
+	pid_t		completion_wait_pid;
+	uint32		completion_wait_event;
+
 	bool		short_read_result_set;
+	Oid			short_read_relfilenode;
+	pid_t		short_read_pid;
 	int			short_read_result;
 } InjIoErrorState;
 
+typedef struct BlocksReadStreamData
+{
+	int			nblocks;
+	int			curblock;
+	uint32	   *blocks;
+} BlocksReadStreamData;
+
+
 static InjIoErrorState *inj_io_error_state;
 
 /* Shared memory init callbacks */
@@ -85,10 +111,13 @@ test_aio_shmem_startup(void)
 		inj_io_error_state->enabled_short_read = false;
 		inj_io_error_state->enabled_reopen = false;
 
+		ConditionVariableInit(&inj_io_error_state->cv);
+		inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
+
 #ifdef USE_INJECTION_POINTS
 		InjectionPointAttach("aio-process-completion-before-shared",
 							 "test_aio",
-							 "inj_io_short_read",
+							 "inj_io_completion_hook",
 							 NULL,
 							 0);
 		InjectionPointLoad("aio-process-completion-before-shared");
@@ -384,7 +413,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	if (nblocks <= 0 || nblocks > PG_IOV_MAX)
 		elog(ERROR, "nblocks is out of range");
 
-	rel = relation_open(relid, AccessExclusiveLock);
+	rel = relation_open(relid, AccessShareLock);
 
 	for (int i = 0; i < nblocks; i++)
 	{
@@ -458,6 +487,27 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+PG_FUNCTION_INFO_V1(evict_rel);
+Datum
+evict_rel(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	Relation	rel;
+	int32		buffers_evicted,
+				buffers_flushed,
+				buffers_skipped;
+
+	rel = relation_open(relid, AccessExclusiveLock);
+
+	EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed,
+							&buffers_skipped);
+
+	relation_close(rel, AccessExclusiveLock);
+
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(invalidate_rel_block);
 Datum
 invalidate_rel_block(PG_FUNCTION_ARGS)
@@ -610,6 +660,86 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+
+static BlockNumber
+read_stream_for_blocks_cb(ReadStream *stream,
+						  void *callback_private_data,
+						  void *per_buffer_data)
+{
+	BlocksReadStreamData *stream_data = callback_private_data;
+
+	if (stream_data->curblock >= stream_data->nblocks)
+		return InvalidBlockNumber;
+	return stream_data->blocks[stream_data->curblock++];
+}
+
+PG_FUNCTION_INFO_V1(read_stream_for_blocks);
+Datum
+read_stream_for_blocks(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	ArrayType  *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Relation	rel;
+	BlocksReadStreamData stream_data;
+	ReadStream *stream;
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	/*
+	 * We expect the input to be an N-element int4 array; verify that. We
+	 * don't need to use deconstruct_array() since the array data is just
+	 * going to look like a C array of N int4 values.
+	 */
+	if (ARR_NDIM(blocksarray) != 1 ||
+		ARR_HASNULL(blocksarray) ||
+		ARR_ELEMTYPE(blocksarray) != INT4OID)
+		elog(ERROR, "expected 1 dimensional int4 array");
+
+	stream_data.curblock = 0;
+	stream_data.nblocks = ARR_DIMS(blocksarray)[0];
+	stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
+
+	rel = relation_open(relid, AccessShareLock);
+
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										read_stream_for_blocks_cb,
+										&stream_data,
+										0);
+
+	for (int i = 0; i < stream_data.nblocks; i++)
+	{
+		Buffer		buf = read_stream_next_buffer(stream, NULL);
+		Datum		values[3] = {0};
+		bool		nulls[3] = {0};
+
+		if (!BufferIsValid(buf))
+			elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
+
+		values[0] = Int32GetDatum(i);
+		values[1] = UInt32GetDatum(stream_data.blocks[i]);
+		values[2] = UInt32GetDatum(buf);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+		ReleaseBuffer(buf);
+	}
+
+	if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
+		elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
+			 stream_data.nblocks + 1);
+
+	read_stream_end(stream);
+
+	relation_close(rel, NoLock);
+
+	return (Datum) 0;
+}
+
+
 PG_FUNCTION_INFO_V1(handle_get);
 Datum
 handle_get(PG_FUNCTION_ARGS)
@@ -680,15 +810,98 @@ batch_end(PG_FUNCTION_ARGS)
 }
 
 #ifdef USE_INJECTION_POINTS
-extern PGDLLEXPORT void inj_io_short_read(const char *name,
-										  const void *private_data,
-										  void *arg);
+extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
+											   const void *private_data,
+											   void *arg);
 extern PGDLLEXPORT void inj_io_reopen(const char *name,
 									  const void *private_data,
 									  void *arg);
 
-void
-inj_io_short_read(const char *name, const void *private_data, void *arg)
+static bool
+inj_io_short_read_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *owner_proc;
+	int32		owner_pid;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_short_read)
+		return false;
+
+	if (!inj_io_error_state->short_read_result_set)
+		return false;
+
+	owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	owner_pid = owner_proc->pid;
+
+	if (inj_io_error_state->short_read_pid != 0 &&
+		inj_io_error_state->short_read_pid != owner_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
+		return false;
+
+	/*
+	 * Only shorten reads that are actually longer than the target size,
+	 * otherwise we can trigger over-reads.
+	 */
+	if (inj_io_error_state->short_read_result >= ioh->result)
+		return false;
+
+	return true;
+}
+
+static bool
+inj_io_completion_wait_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *owner_proc;
+	int32		owner_pid;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_completion_wait)
+		return false;
+
+	owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	owner_pid = owner_proc->pid;
+
+	if (inj_io_error_state->completion_wait_pid != owner_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	if (inj_io_error_state->completion_wait_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode)
+		return false;
+
+	return true;
+}
+
+static void
+inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
+{
+	PgAioHandle *ioh = (PgAioHandle *) arg;
+
+	if (!inj_io_completion_wait_matches(ioh))
+		return;
+
+	ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
+
+	while (true)
+	{
+		if (!inj_io_completion_wait_matches(ioh))
+			break;
+
+		ConditionVariableSleep(&inj_io_error_state->cv,
+							   inj_io_error_state->completion_wait_event);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+static void
+inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
 {
 	PgAioHandle *ioh = (PgAioHandle *) arg;
 
@@ -697,58 +910,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg)
 				   inj_io_error_state->enabled_reopen),
 			errhidestmt(true), errhidecontext(true));
 
-	if (inj_io_error_state->enabled_short_read)
+	if (inj_io_short_read_matches(ioh))
 	{
+		struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+		int32		old_result = ioh->result;
+		int32		new_result = inj_io_error_state->short_read_result;
+		int32		processed = 0;
+
+		ereport(LOG,
+				errmsg("short read inject point, changing result from %d to %d",
+					   old_result, new_result),
+				errhidestmt(true), errhidecontext(true));
+
 		/*
-		 * Only shorten reads that are actually longer than the target size,
-		 * otherwise we can trigger over-reads.
+		 * The underlying IO actually completed OK, and thus the "invalid"
+		 * portion of the IOV actually contains valid data. That can hide a
+		 * lot of problems, e.g. if we were to wrongly mark a buffer, that
+		 * wasn't read according to the shortened-read, IO as valid, the
+		 * contents would look valid and we might miss a bug.
+		 *
+		 * To avoid that, iterate through the IOV and zero out the "failed"
+		 * portion of the IO.
 		 */
-		if (inj_io_error_state->short_read_result_set
-			&& ioh->op == PGAIO_OP_READV
-			&& inj_io_error_state->short_read_result <= ioh->result)
+		for (int i = 0; i < ioh->op_data.read.iov_length; i++)
 		{
-			struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
-			int32		old_result = ioh->result;
-			int32		new_result = inj_io_error_state->short_read_result;
-			int32		processed = 0;
-
-			ereport(LOG,
-					errmsg("short read inject point, changing result from %d to %d",
-						   old_result, new_result),
-					errhidestmt(true), errhidecontext(true));
-
-			/*
-			 * The underlying IO actually completed OK, and thus the "invalid"
-			 * portion of the IOV actually contains valid data. That can hide
-			 * a lot of problems, e.g. if we were to wrongly mark a buffer,
-			 * that wasn't read according to the shortened-read, IO as valid,
-			 * the contents would look valid and we might miss a bug.
-			 *
-			 * To avoid that, iterate through the IOV and zero out the
-			 * "failed" portion of the IO.
-			 */
-			for (int i = 0; i < ioh->op_data.read.iov_length; i++)
+			if (processed + iov[i].iov_len <= new_result)
+				processed += iov[i].iov_len;
+			else if (processed <= new_result)
 			{
-				if (processed + iov[i].iov_len <= new_result)
-					processed += iov[i].iov_len;
-				else if (processed <= new_result)
-				{
-					uint32		ok_part = new_result - processed;
+				uint32		ok_part = new_result - processed;
 
-					memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
-					processed += iov[i].iov_len;
-				}
-				else
-				{
-					memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
-				}
+				memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
+				processed += iov[i].iov_len;
+			}
+			else
+			{
+				memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
 			}
-
-			ioh->result = new_result;
 		}
+
+		ioh->result = new_result;
 	}
 }
 
+void
+inj_io_completion_hook(const char *name, const void *private_data, void *arg)
+{
+	inj_io_completion_wait_hook(name, private_data, arg);
+	inj_io_short_read_hook(name, private_data, arg);
+}
+
 void
 inj_io_reopen(const char *name, const void *private_data, void *arg)
 {
@@ -762,6 +973,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg)
 }
 #endif
 
+PG_FUNCTION_INFO_V1(inj_io_completion_wait);
+Datum
+inj_io_completion_wait(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_io_error_state->enabled_completion_wait = true;
+	inj_io_error_state->completion_wait_pid =
+		PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0);
+	inj_io_error_state->completion_wait_relfilenode =
+		PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(inj_io_completion_continue);
+Datum
+inj_io_completion_continue(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_io_error_state->enabled_completion_wait = false;
+	inj_io_error_state->completion_wait_pid = 0;
+	inj_io_error_state->completion_wait_relfilenode = InvalidOid;
+	ConditionVariableBroadcast(&inj_io_error_state->cv);
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
 Datum
 inj_io_short_read_attach(PG_FUNCTION_ARGS)
@@ -771,6 +1015,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS)
 	inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
 	if (inj_io_error_state->short_read_result_set)
 		inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
+	inj_io_error_state->short_read_pid =
+		PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1);
+	inj_io_error_state->short_read_relfilenode =
+		PG_ARGISNULL(2) ? InvalidOid : PG_GETARG_OID(2);
 #else
 	elog(ERROR, "injection points not supported");
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 4673eca9cd6..4d2e370804c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -305,6 +305,7 @@ BlockSampler
 BlockSamplerData
 BlockedProcData
 BlockedProcsData
+BlocksReadStreamData
 BlocktableEntry
 BloomBuildState
 BloomFilter
-- 
2.53.0.1.gb2826b52eb

>From 2725c524a3f057e6cd4f16fe9e2393bf6546932b Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Mon, 16 Mar 2026 16:50:56 -0400
Subject: [PATCH v7a 3/8] Fix off-by-one error in read IO tracing

AsyncReadBuffers()'s no-IO-needed path passed
TRACE_POSTGRESQL_BUFFER_READ_DONE the wrong block number because it had
already incremented operation->nblocks_done. Fix by folding the
nblocks_done offset into the blocknum local variable at initialization.

Author: Melanie Plageman <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Discussion: https://postgr.es/m/u73un3xeljr4fiidzwi4ikcr6vm7oqugn4fo5vqpstjio6anl2%40hph6fvdiiria
---
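Note, not part of the commit: a minimal standalone sketch of the bug class
described above. ToyOperation, traced_block_old() and traced_block_new() are
hypothetical names for illustration only, not bufmgr.c code. It assumes, as
the commit message says, that nblocks_done had already been advanced by the
time the trace point ran.

```c
#include <stdint.h>

typedef uint32_t BlockNumber;

/* Hypothetical stand-in for the relevant ReadBuffersOperation fields. */
typedef struct ToyOperation
{
	BlockNumber blocknum;		/* first block of the whole operation */
	int16_t		nblocks_done;	/* blocks already completed */
} ToyOperation;

/* Old shape: the offset is re-read from the struct after it was advanced. */
static BlockNumber
traced_block_old(ToyOperation *op)
{
	BlockNumber blocknum = op->blocknum;

	op->nblocks_done += 1;		/* head buffer turned out to be valid */
	return blocknum + op->nblocks_done;	/* one block too far */
}

/* New shape: fold the offset into the local before anything is advanced. */
static BlockNumber
traced_block_new(ToyOperation *op)
{
	BlockNumber blocknum = op->blocknum + op->nblocks_done;

	op->nblocks_done += 1;
	return blocknum;
}
```

With blocknum = 100 and nblocks_done = 2, the old shape reports block 103
while the new shape reports block 102, the block that was actually examined.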
 src/backend/storage/buffer/bufmgr.c | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 00bc609529a..10afae1990b 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1875,10 +1875,10 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 {
 	Buffer	   *buffers = &operation->buffers[0];
 	int			flags = operation->flags;
-	BlockNumber blocknum = operation->blocknum;
 	ForkNumber	forknum = operation->forknum;
 	char		persistence = operation->persistence;
 	int16		nblocks_done = operation->nblocks_done;
+	BlockNumber blocknum = operation->blocknum + nblocks_done;
 	Buffer	   *io_buffers = &operation->buffers[nblocks_done];
 	int			io_buffers_len = 0;
 	PgAioHandle *ioh;
@@ -1990,7 +1990,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		 * must have started out as a miss in PinBufferForBlock(). The other
 		 * backend will track this as a 'read'.
 		 */
-		TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done,
+		TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum,
 										  operation->smgr->smgr_rlocator.locator.spcOid,
 										  operation->smgr->smgr_rlocator.locator.dbOid,
 										  operation->smgr->smgr_rlocator.locator.relNumber,
@@ -2062,7 +2062,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		 */
 		io_start = pgstat_prepare_io_time(track_io_timing);
 		smgrstartreadv(ioh, operation->smgr, forknum,
-					   blocknum + nblocks_done,
+					   blocknum,
 					   io_pages, io_buffers_len);
 		pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
 								io_start, 1, io_buffers_len * BLCKSZ);
-- 
2.53.0.1.gb2826b52eb

>From eed59fd9260463c272b6ec74dfff365881451b62 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Tue, 17 Mar 2026 15:49:52 -0400
Subject: [PATCH v7a 4/8] Pass io_object and io_context through to
 PinBufferForBlock()

PinBufferForBlock() is always_inline and called in a loop in
StartReadBuffersImpl(). Previously it computed io_context and io_object
internally, which required calling IOContextForStrategy() -- a non-inline
function the compiler cannot prove is side-effect-free. This could
potentially cause redundant function calls.

Compute io_context and io_object in the callers instead, allowing
StartReadBuffersImpl() to do so once before entering the loop.

Suggested-by: Andres Freund <[email protected]>
---
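Note, not part of the commit: a small standalone sketch of the hoisting
described above. All toy_* names and the lookup counter are hypothetical. The
counter gives the callee a visible side effect purely so the calls can be
counted; in the real code the compiler merely cannot prove that
IOContextForStrategy() has none.

```c
typedef enum ToyIOContext
{
	TOY_IOCONTEXT_NORMAL,
	TOY_IOCONTEXT_BULKREAD
} ToyIOContext;

static int	context_lookups = 0;

/* Stand-in for a non-inline function such as IOContextForStrategy(). */
static ToyIOContext
toy_context_for_strategy(int strategy)
{
	context_lookups++;
	return strategy ? TOY_IOCONTEXT_BULKREAD : TOY_IOCONTEXT_NORMAL;
}

/* Old shape: the per-block helper derives the context itself. */
static void
toy_pin_computing(int strategy)
{
	(void) toy_context_for_strategy(strategy);
}

/* New shape: the per-block helper receives the context from its caller. */
static void
toy_pin_passed(ToyIOContext io_context)
{
	(void) io_context;
}

/* Number of lookups when the helper computes the context per block. */
static int
lookups_before(int nblocks, int strategy)
{
	context_lookups = 0;
	for (int i = 0; i < nblocks; i++)
		toy_pin_computing(strategy);
	return context_lookups;
}

/* Number of lookups when the caller computes the context once. */
static int
lookups_after(int nblocks, int strategy)
{
	ToyIOContext io_context;

	context_lookups = 0;
	io_context = toy_context_for_strategy(strategy);
	for (int i = 0; i < nblocks; i++)
		toy_pin_passed(io_context);
	return context_lookups;
}
```

For a 16-block loop the first variant performs 16 lookups and the second
performs one, which is the effect the patch is after in
StartReadBuffersImpl().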
 src/backend/storage/buffer/bufmgr.c | 45 ++++++++++++++++++++---------
 1 file changed, 31 insertions(+), 14 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 10afae1990b..ab9c2a4b904 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1223,11 +1223,11 @@ PinBufferForBlock(Relation rel,
 				  ForkNumber forkNum,
 				  BlockNumber blockNum,
 				  BufferAccessStrategy strategy,
+				  IOObject io_object,
+				  IOContext io_context,
 				  bool *foundPtr)
 {
 	BufferDesc *bufHdr;
-	IOContext	io_context;
-	IOObject	io_object;
 
 	Assert(blockNum != P_NEW);
 
@@ -1236,17 +1236,6 @@ PinBufferForBlock(Relation rel,
 			persistence == RELPERSISTENCE_PERMANENT ||
 			persistence == RELPERSISTENCE_UNLOGGED));
 
-	if (persistence == RELPERSISTENCE_TEMP)
-	{
-		io_context = IOCONTEXT_NORMAL;
-		io_object = IOOBJECT_TEMP_RELATION;
-	}
-	else
-	{
-		io_context = IOContextForStrategy(strategy);
-		io_object = IOOBJECT_RELATION;
-	}
-
 	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
 									   smgr->smgr_rlocator.locator.spcOid,
 									   smgr->smgr_rlocator.locator.dbOid,
@@ -1339,9 +1328,23 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 				 mode == RBM_ZERO_AND_LOCK))
 	{
 		bool		found;
+		IOContext	io_context;
+		IOObject	io_object;
+
+		if (persistence == RELPERSISTENCE_TEMP)
+		{
+			io_context = IOCONTEXT_NORMAL;
+			io_object = IOOBJECT_TEMP_RELATION;
+		}
+		else
+		{
+			io_context = IOContextForStrategy(strategy);
+			io_object = IOOBJECT_RELATION;
+		}
 
 		buffer = PinBufferForBlock(rel, smgr, persistence,
-								   forkNum, blockNum, strategy, &found);
+								   forkNum, blockNum, strategy,
+								   io_object, io_context, &found);
 		ZeroAndLockBuffer(buffer, mode, found);
 		return buffer;
 	}
@@ -1379,11 +1382,24 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	int			actual_nblocks = *nblocks;
 	int			maxcombine = 0;
 	bool		did_start_io;
+	IOContext	io_context;
+	IOObject	io_object;
 
 	Assert(*nblocks == 1 || allow_forwarding);
 	Assert(*nblocks > 0);
 	Assert(*nblocks <= MAX_IO_COMBINE_LIMIT);
 
+	if (operation->persistence == RELPERSISTENCE_TEMP)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(operation->strategy);
+		io_object = IOOBJECT_RELATION;
+	}
+
 	for (int i = 0; i < actual_nblocks; ++i)
 	{
 		bool		found;
@@ -1432,6 +1448,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 										   operation->forknum,
 										   blockNum + i,
 										   operation->strategy,
+										   io_object, io_context,
 										   &found);
 		}
 
-- 
2.53.0.1.gb2826b52eb

>From afc829207d8f57a6f89da3bd9639ba98a74c42a3 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Fri, 23 Jan 2026 13:54:02 -0500
Subject: [PATCH v7a 5/8] Make buffer hit helper

Two places already count buffer hits, each requiring quite a few lines of
code since we do accounting in so many places. Future commits will add
more locations, so refactor into a helper.

Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv
---
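Note, not part of the commit: a rough standalone sketch of what such a helper
consolidates. ToyCounters and toy_track_buffer_hit() are hypothetical
stand-ins; the real helper updates pgBufferUsage, pgstat and the vacuum cost
balance, and also fires the trace point, which is omitted here.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for the counters touched on a buffer hit. */
typedef struct ToyCounters
{
	long		local_blks_hit;
	long		shared_blks_hit;
	long		io_hits;
	long		vacuum_cost_balance;
	long		rel_hits;
} ToyCounters;

/* One place that performs all hit accounting, shared by every caller. */
static void
toy_track_buffer_hit(ToyCounters *c, bool is_temp, bool vacuum_cost_active,
					 int cost_page_hit, bool have_rel)
{
	if (is_temp)
		c->local_blks_hit += 1;
	else
		c->shared_blks_hit += 1;

	c->io_hits += 1;

	if (vacuum_cost_active)
		c->vacuum_cost_balance += cost_page_hit;

	/* Per-relation stats only exist when the caller has a relation. */
	if (have_rel)
		c->rel_hits += 1;
}
```

Each call site then shrinks to a single call, and a future call site cannot
forget one of the counters.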
 src/backend/storage/buffer/bufmgr.c | 84 ++++++++++++++---------------
 1 file changed, 42 insertions(+), 42 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index ab9c2a4b904..fa85570a791 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -648,6 +648,11 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 									  bool *foundPtr, IOContext io_context);
 static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress);
 static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete);
+
+static pg_attribute_always_inline void TrackBufferHit(IOObject io_object,
+													  IOContext io_context,
+													  Relation rel, char persistence, SMgrRelation smgr,
+													  ForkNumber forknum, BlockNumber blocknum);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 								IOObject io_object, IOContext io_context);
@@ -1243,18 +1248,14 @@ PinBufferForBlock(Relation rel,
 									   smgr->smgr_rlocator.backend);
 
 	if (persistence == RELPERSISTENCE_TEMP)
-	{
 		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr);
-		if (*foundPtr)
-			pgBufferUsage.local_blks_hit++;
-	}
 	else
-	{
 		bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum,
 							 strategy, foundPtr, io_context);
-		if (*foundPtr)
-			pgBufferUsage.shared_blks_hit++;
-	}
+
+	if (*foundPtr)
+		TrackBufferHit(io_object, io_context, rel, persistence, smgr, forkNum, blockNum);
+
 	if (rel)
 	{
 		/*
@@ -1263,21 +1264,6 @@ PinBufferForBlock(Relation rel,
 		 * zeroed instead), the per-relation stats always count them.
 		 */
 		pgstat_count_buffer_read(rel);
-		if (*foundPtr)
-			pgstat_count_buffer_hit(rel);
-	}
-	if (*foundPtr)
-	{
-		pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageHit;
-
-		TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-										  smgr->smgr_rlocator.locator.spcOid,
-										  smgr->smgr_rlocator.locator.dbOid,
-										  smgr->smgr_rlocator.locator.relNumber,
-										  smgr->smgr_rlocator.backend,
-										  true);
 	}
 
 	return BufferDescriptorGetBuffer(bufHdr);
@@ -1712,6 +1698,37 @@ ReadBuffersCanStartIO(Buffer buffer, bool nowait)
 	return ReadBuffersCanStartIOOnce(buffer, nowait);
 }
 
+/*
+ * We track various stats related to buffer hits. Because this is done in a
+ * few separate places, this helper exists for convenience.
+ */
+static pg_attribute_always_inline void
+TrackBufferHit(IOObject io_object, IOContext io_context,
+			   Relation rel, char persistence, SMgrRelation smgr,
+			   ForkNumber forknum, BlockNumber blocknum)
+{
+	TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum,
+									  blocknum,
+									  smgr->smgr_rlocator.locator.spcOid,
+									  smgr->smgr_rlocator.locator.dbOid,
+									  smgr->smgr_rlocator.locator.relNumber,
+									  smgr->smgr_rlocator.backend,
+									  true);
+
+	if (persistence == RELPERSISTENCE_TEMP)
+		pgBufferUsage.local_blks_hit += 1;
+	else
+		pgBufferUsage.shared_blks_hit += 1;
+
+	pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
+
+	if (VacuumCostActive)
+		VacuumCostBalance += VacuumCostPageHit;
+
+	if (rel)
+		pgstat_count_buffer_hit(rel);
+}
+
 /*
  * Helper for WaitReadBuffers() that processes the results of a readv
  * operation, raising an error if necessary.
@@ -2007,25 +2024,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		 * must have started out as a miss in PinBufferForBlock(). The other
 		 * backend will track this as a 'read'.
 		 */
-		TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum,
-										  operation->smgr->smgr_rlocator.locator.spcOid,
-										  operation->smgr->smgr_rlocator.locator.dbOid,
-										  operation->smgr->smgr_rlocator.locator.relNumber,
-										  operation->smgr->smgr_rlocator.backend,
-										  true);
-
-		if (persistence == RELPERSISTENCE_TEMP)
-			pgBufferUsage.local_blks_hit += 1;
-		else
-			pgBufferUsage.shared_blks_hit += 1;
-
-		if (operation->rel)
-			pgstat_count_buffer_hit(operation->rel);
-
-		pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
-
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageHit;
+		TrackBufferHit(io_object, io_context, operation->rel, persistence,
+					   operation->smgr, forknum, blocknum);
 	}
 	else
 	{
-- 
2.53.0.1.gb2826b52eb

>From cac33659de4600920b82fbe0e0cbfaa02c3d641f Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 18 Mar 2026 11:09:58 -0400
Subject: [PATCH v7a 6/8] Restructure AsyncReadBuffers()

Restructure AsyncReadBuffers() to use early return when the head buffer
is already valid, instead of using a did_start_io flag and if/else
branches. Also move around a bit of the code to be located closer to
where it is used. This is a refactor only.
---
 src/backend/storage/buffer/bufmgr.c | 171 ++++++++++++++--------------
 1 file changed, 88 insertions(+), 83 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index fa85570a791..e212f6110f2 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1920,7 +1920,18 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	void	   *io_pages[MAX_IO_COMBINE_LIMIT];
 	IOContext	io_context;
 	IOObject	io_object;
-	bool		did_start_io;
+	instr_time	io_start;
+
+	if (persistence == RELPERSISTENCE_TEMP)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(operation->strategy);
+		io_object = IOOBJECT_RELATION;
+	}
 
 	/*
 	 * When this IO is executed synchronously, either because the caller will
@@ -1931,16 +1942,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		ioh_flags |= PGAIO_HF_SYNCHRONOUS;
 
 	if (persistence == RELPERSISTENCE_TEMP)
-	{
-		io_context = IOCONTEXT_NORMAL;
-		io_object = IOOBJECT_TEMP_RELATION;
 		ioh_flags |= PGAIO_HF_REFERENCES_LOCAL;
-	}
-	else
-	{
-		io_context = IOContextForStrategy(operation->strategy);
-		io_object = IOOBJECT_RELATION;
-	}
 
 	/*
 	 * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR
@@ -1992,7 +1994,6 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	if (unlikely(!ioh))
 	{
 		pgaio_submit_staged();
-
 		ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
 	}
 
@@ -2017,91 +2018,95 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 
 		pgaio_io_release(ioh);
 		pgaio_wref_clear(&operation->io_wref);
-		did_start_io = false;
 
 		/*
 		 * Report and track this as a 'hit' for this backend, even though it
 		 * must have started out as a miss in PinBufferForBlock(). The other
 		 * backend will track this as a 'read'.
 		 */
-		TrackBufferHit(io_object, io_context, operation->rel, persistence,
-					   operation->smgr, forknum, blocknum);
+		TrackBufferHit(io_object, io_context,
+					   operation->rel, operation->persistence,
+					   operation->smgr, operation->forknum,
+					   blocknum);
+		return false;
 	}
-	else
+
+	Assert(io_buffers[0] == buffers[nblocks_done]);
+	io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
+	io_buffers_len = 1;
+
+	/*
+	 * NB: As little code as possible should be added between the
+	 * ReadBuffersCanStartIO() above, the further ReadBuffersCanStartIO()s
+	 * below and the smgrstartreadv(), as some of the buffers are now marked
+	 * as IO_IN_PROGRESS and will thus cause other backends to wait.
+	 */
+
+	/*
+	 * How many neighboring-on-disk blocks can we scatter-read into other
+	 * buffers at the same time?  In this case we don't wait if we see an I/O
+	 * already in progress (see comment above).
+	 */
+	for (int i = nblocks_done + 1; i < operation->nblocks; i++)
 	{
-		instr_time	io_start;
+		/* Must be consecutive block numbers. */
+		Assert(BufferGetBlockNumber(buffers[i - 1]) ==
+			   BufferGetBlockNumber(buffers[i]) - 1);
 
-		/* We found a buffer that we need to read in. */
-		Assert(io_buffers[0] == buffers[nblocks_done]);
-		io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
-		io_buffers_len = 1;
+		if (!ReadBuffersCanStartIO(buffers[i], true))
+			break;
 
-		/*
-		 * How many neighboring-on-disk blocks can we scatter-read into other
-		 * buffers at the same time?  In this case we don't wait if we see an
-		 * I/O already in progress.  We already set BM_IO_IN_PROGRESS for the
-		 * head block, so we should get on with that I/O as soon as possible.
-		 */
-		for (int i = nblocks_done + 1; i < operation->nblocks; i++)
-		{
-			if (!ReadBuffersCanStartIO(buffers[i], true))
-				break;
-			/* Must be consecutive block numbers. */
-			Assert(BufferGetBlockNumber(buffers[i - 1]) ==
-				   BufferGetBlockNumber(buffers[i]) - 1);
-			Assert(io_buffers[io_buffers_len] == buffers[i]);
+		Assert(io_buffers[io_buffers_len] == buffers[i]);
 
-			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
-		}
-
-		/* get a reference to wait for in WaitReadBuffers() */
-		pgaio_io_get_wref(ioh, &operation->io_wref);
-
-		/* provide the list of buffers to the completion callbacks */
-		pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
-
-		pgaio_io_register_callbacks(ioh,
-									persistence == RELPERSISTENCE_TEMP ?
-									PGAIO_HCB_LOCAL_BUFFER_READV :
-									PGAIO_HCB_SHARED_BUFFER_READV,
-									flags);
-
-		pgaio_io_set_flag(ioh, ioh_flags);
-
-		/* ---
-		 * Even though we're trying to issue IO asynchronously, track the time
-		 * in smgrstartreadv():
-		 * - if io_method == IOMETHOD_SYNC, we will always perform the IO
-		 *   immediately
-		 * - the io method might not support the IO (e.g. worker IO for a temp
-		 *   table)
-		 * ---
-		 */
-		io_start = pgstat_prepare_io_time(track_io_timing);
-		smgrstartreadv(ioh, operation->smgr, forknum,
-					   blocknum,
-					   io_pages, io_buffers_len);
-		pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
-								io_start, 1, io_buffers_len * BLCKSZ);
-
-		if (persistence == RELPERSISTENCE_TEMP)
-			pgBufferUsage.local_blks_read += io_buffers_len;
-		else
-			pgBufferUsage.shared_blks_read += io_buffers_len;
-
-		/*
-		 * Track vacuum cost when issuing IO, not after waiting for it.
-		 * Otherwise we could end up issuing a lot of IO in a short timespan,
-		 * despite a low cost limit.
-		 */
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
-
-		*nblocks_progress = io_buffers_len;
-		did_start_io = true;
+		io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
 	}
 
-	return did_start_io;
+	/* get a reference to wait for in WaitReadBuffers() */
+	pgaio_io_get_wref(ioh, &operation->io_wref);
+
+	/* provide the list of buffers to the completion callbacks */
+	pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
+
+	pgaio_io_register_callbacks(ioh,
+								persistence == RELPERSISTENCE_TEMP ?
+								PGAIO_HCB_LOCAL_BUFFER_READV :
+								PGAIO_HCB_SHARED_BUFFER_READV,
+								flags);
+
+	pgaio_io_set_flag(ioh, ioh_flags);
+
+	/* ---
+	 * Even though we're trying to issue IO asynchronously, track the time
+	 * in smgrstartreadv():
+	 * - if io_method == IOMETHOD_SYNC, we will always perform the IO
+	 *   immediately
+	 * - the io method might not support the IO (e.g. worker IO for a temp
+	 *   table)
+	 * ---
+	 */
+	io_start = pgstat_prepare_io_time(track_io_timing);
+	smgrstartreadv(ioh, operation->smgr, forknum,
+				   blocknum,
+				   io_pages, io_buffers_len);
+	pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+							io_start, 1, io_buffers_len * BLCKSZ);
+
+	if (persistence == RELPERSISTENCE_TEMP)
+		pgBufferUsage.local_blks_read += io_buffers_len;
+	else
+		pgBufferUsage.shared_blks_read += io_buffers_len;
+
+	/*
+	 * Track vacuum cost when issuing IO, not after waiting for it. Otherwise
+	 * we could end up issuing a lot of IO in a short timespan, despite a low
+	 * cost limit.
+	 */
+	if (VacuumCostActive)
+		VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+
+	*nblocks_progress = io_buffers_len;
+
+	return true;
 }
 
 /*
-- 
2.53.0.1.gb2826b52eb

>From bd50c31bc242ec7afcae66e0953a2cb874ab02e7 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Thu, 19 Mar 2026 16:30:38 -0400
Subject: [PATCH v7a 7/8] bufmgr: Improve StartBufferIO interface

Until now StartBufferIO() had a few weaknesses:

- As it did not submit staged IOs, it was not safe to call StartBufferIO()
  where there was a potential for unsubmitted IO, which required
  AsyncReadBuffers() to use a wrapper (ReadBuffersCanStartIO()) around
  StartBufferIO().

- With nowait = true, the boolean return value did not allow distinguishing
  between no IO being necessary and having to wait, which would lead
  ReadBuffersCanStartIO() to unnecessarily submit staged IO.

- Several callers needed to handle both local and shared buffers, requiring
  the caller to differentiate between StartBufferIO() and StartLocalBufferIO().

- In a future commit some callers of StartBufferIO() want the BufferDesc's
  io_wref to be returned, to asynchronously wait for in-progress IO.

- Indicating whether to wait with the nowait parameter was somewhat confusing
  compared to a wait parameter.

Address these issues as follows:

- StartBufferIO() is renamed to StartSharedBufferIO()

- A new StartBufferIO() is introduced that supports both shared and local
  buffers

- The boolean return value has been replaced with an enum, indicating whether
  the IO is already done, already in progress or that the buffer has been
  readied for IO

- A new PgAioWaitRef * argument allows the caller to get the wait reference if
  desired.  All current callers pass NULL; a user of this will be introduced
  subsequently.

- Instead of the nowait argument there is now a wait argument

  This probably would not have been worthwhile on its own, but since all these
  lines needed to be touched anyway...

Author: Andres Freund <[email protected]>
Author: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/
---
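Note, not part of the commit: a toy standalone model of the three-way return
value, to show why a boolean was not enough. ToyBuffer, ToyStartIOResult and
toy_start_buffer_io() are hypothetical. Waiting is simulated by pretending
the other backend's read finished successfully, which the real code cannot
assume.

```c
#include <stdbool.h>

typedef enum ToyStartIOResult
{
	TOY_IO_ALREADY_DONE,		/* buffer is valid, nothing to do */
	TOY_IO_IN_PROGRESS,			/* someone else is doing the IO */
	TOY_IO_READY_FOR_IO			/* caller now owns the IO */
} ToyStartIOResult;

typedef struct ToyBuffer
{
	bool		valid;
	bool		io_in_progress;
} ToyBuffer;

static ToyStartIOResult
toy_start_buffer_io(ToyBuffer *buf, bool wait)
{
	if (buf->io_in_progress)
	{
		if (!wait)
			return TOY_IO_IN_PROGRESS;

		/* Stand-in for waiting: assume the foreign read completed fine. */
		buf->io_in_progress = false;
		buf->valid = true;
	}

	if (buf->valid)
		return TOY_IO_ALREADY_DONE;

	buf->io_in_progress = true;
	return TOY_IO_READY_FOR_IO;
}

/*
 * With a boolean, "already done" and "in progress" both looked like false,
 * so the caller submitted staged IO in either case. With the enum, only
 * the in-progress case needs that.
 */
static bool
toy_should_submit_staged(ToyStartIOResult res)
{
	return res == TOY_IO_IN_PROGRESS;
}
```

A caller that first tries wait = false can now skip submitting staged IO when
the result is TOY_IO_ALREADY_DONE, and only submit before retrying with
wait = true when the result is TOY_IO_IN_PROGRESS.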
 src/include/storage/buf_internals.h         |  24 +-
 src/backend/storage/buffer/bufmgr.c         | 286 ++++++++++++--------
 src/backend/storage/buffer/localbuf.c       |  43 ++-
 src/test/modules/test_aio/t/001_aio.pl      |  20 +-
 src/test/modules/test_aio/test_aio--1.0.sql |   2 +-
 src/test/modules/test_aio/test_aio.c        |  17 +-
 src/tools/pgindent/typedefs.list            |   1 +
 7 files changed, 246 insertions(+), 147 deletions(-)

diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 8d1e16b5d51..ad1b7b2216a 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -554,8 +554,25 @@ extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
 
 extern void TrackNewBufferPin(Buffer buf);
 
-/* solely to make it easier to write tests */
-extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
+/*
+ * Return value for StartBufferIO / StartSharedBufferIO / StartLocalBufferIO.
+ *
+ * When preparing a buffer for I/O and setting BM_IO_IN_PROGRESS, the buffer
+ * may already have I/O in progress or the I/O may have been done by another
+ * backend.  See the documentation of StartSharedBufferIO for more details.
+ */
+typedef enum StartBufferIOResult
+{
+	BUFFER_IO_ALREADY_DONE,
+	BUFFER_IO_IN_PROGRESS,
+	BUFFER_IO_READY_FOR_IO,
+} StartBufferIOResult;
+
+/* the following are exposed to make it easier to write tests */
+extern StartBufferIOResult StartBufferIO(Buffer buffer, bool forInput, bool wait,
+										 PgAioWaitRef *io_wref);
+extern StartBufferIOResult StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait,
+											   PgAioWaitRef *io_wref);
 extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits,
 							  bool forget_owner, bool release_aio);
 
@@ -600,7 +617,8 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
 extern void MarkLocalBufferDirty(Buffer buffer);
 extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
 								   uint64 set_flag_bits, bool release_aio);
-extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait);
+extern StartBufferIOResult StartLocalBufferIO(BufferDesc *bufHdr, bool forInput,
+											  bool wait, PgAioWaitRef *io_wref);
 extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
 extern void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced);
 extern void DropRelationLocalBuffers(RelFileLocator rlocator,
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index e212f6110f2..5fc1ceccd0b 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1148,6 +1148,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 	BufferDesc *bufHdr;
 	bool		need_to_zero;
 	bool		isLocalBuf = BufferIsLocal(buffer);
+	StartBufferIOResult sbres;
 
 	Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
 
@@ -1159,24 +1160,30 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 		 */
 		need_to_zero = false;
 	}
-	else if (isLocalBuf)
-	{
-		/* Simple case for non-shared buffers. */
-		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
-		need_to_zero = StartLocalBufferIO(bufHdr, true, false);
-	}
 	else
 	{
-		/*
-		 * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
-		 * concurrently.  Even though we aren't doing I/O, that ensures that
-		 * we don't zero a page that someone else has pinned.  An exclusive
-		 * content lock wouldn't be enough, because readers are allowed to
-		 * drop the content lock after determining that a tuple is visible
-		 * (see buffer access rules in README).
-		 */
-		bufHdr = GetBufferDescriptor(buffer - 1);
-		need_to_zero = StartBufferIO(bufHdr, true, false);
+		if (isLocalBuf)
+		{
+			/* Simple case for non-shared buffers. */
+			bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+			sbres = StartLocalBufferIO(bufHdr, true, true, NULL);
+		}
+		else
+		{
+			/*
+			 * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
+			 * concurrently.  Even though we aren't doing I/O, that ensures
+			 * that we don't zero a page that someone else has pinned.  An
+			 * exclusive content lock wouldn't be enough, because readers are
+			 * allowed to drop the content lock after determining that a tuple
+			 * is visible (see buffer access rules in README).
+			 */
+			bufHdr = GetBufferDescriptor(buffer - 1);
+			sbres = StartSharedBufferIO(bufHdr, true, true, NULL);
+		}
+
+		Assert(sbres != BUFFER_IO_IN_PROGRESS);
+		need_to_zero = sbres == BUFFER_IO_READY_FOR_IO;
 	}
 
 	if (need_to_zero)
@@ -1659,45 +1666,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
 #endif
 }
 
-/* helper for ReadBuffersCanStartIO(), to avoid repetition */
-static inline bool
-ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
-{
-	if (BufferIsLocal(buffer))
-		return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
-								  true, nowait);
-	else
-		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
-}
-
-/*
- * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
- */
-static inline bool
-ReadBuffersCanStartIO(Buffer buffer, bool nowait)
-{
-	/*
-	 * If this backend currently has staged IO, we need to submit the pending
-	 * IO before waiting for the right to issue IO, to avoid the potential for
-	 * deadlocks (and, more commonly, unnecessary delays for other backends).
-	 */
-	if (!nowait && pgaio_have_staged())
-	{
-		if (ReadBuffersCanStartIOOnce(buffer, true))
-			return true;
-
-		/*
-		 * Unfortunately StartBufferIO() returning false doesn't allow to
-		 * distinguish between the buffer already being valid and IO already
-		 * being in progress. Since IO already being in progress is quite
-		 * rare, this approach seems fine.
-		 */
-		pgaio_submit_staged();
-	}
-
-	return ReadBuffersCanStartIOOnce(buffer, nowait);
-}
-
 /*
  * We track various stats related to buffer hits. Because this is done in a
  * few separate places, this helper exists for convenience.
@@ -1921,6 +1889,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	IOContext	io_context;
 	IOObject	io_object;
 	instr_time	io_start;
+	StartBufferIOResult status;
 
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
@@ -1974,8 +1943,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
 
 	/*
-	 * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
-	 * might block, which we don't want after setting IO_IN_PROGRESS.
+	 * Get IO handle before StartBufferIO(), as pgaio_io_acquire() might
+	 * block, which we don't want after setting IO_IN_PROGRESS.
 	 *
 	 * If we need to wait for IO before we can get a handle, submit
 	 * already-staged IO first, so that other backends don't need to wait.
@@ -2004,31 +1973,41 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	 * for the outcome: either done, or something went wrong and we will
 	 * retry.
 	 */
-	if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
+	status = StartBufferIO(buffers[nblocks_done], true, true, NULL);
+	if (status != BUFFER_IO_READY_FOR_IO)
 	{
-		/*
-		 * Someone else has already completed this block, we're done.
-		 *
-		 * When IO is necessary, ->nblocks_done is updated in
-		 * ProcessReadBuffersResult(), but that is not called if no IO is
-		 * necessary. Thus update here.
-		 */
-		operation->nblocks_done += 1;
-		*nblocks_progress = 1;
-
 		pgaio_io_release(ioh);
-		pgaio_wref_clear(&operation->io_wref);
-
-		/*
-		 * Report and track this as a 'hit' for this backend, even though it
-		 * must have started out as a miss in PinBufferForBlock(). The other
-		 * backend will track this as a 'read'.
-		 */
-		TrackBufferHit(io_object, io_context,
-					   operation->rel, operation->persistence,
-					   operation->smgr, operation->forknum,
-					   blocknum);
-		return false;
+		*nblocks_progress = 1;
+		if (status == BUFFER_IO_ALREADY_DONE)
+		{
+			/*
+			 * Someone has already completed this block, we're done.
+			 *
+			 * When IO is necessary, ->nblocks_done is updated in
+			 * ProcessReadBuffersResult(), but that is not called if no IO is
+			 * necessary. Thus update here.
+			 */
+			operation->nblocks_done += 1;
+			Assert(operation->nblocks_done <= operation->nblocks);
+
+			Assert(!pgaio_wref_valid(&operation->io_wref));
+
+			/*
+			 * Report and track this as a 'hit' for this backend, even though
+			 * it must have started out as a miss in PinBufferForBlock(). The
+			 * other backend will track this as a 'read'.
+			 */
+			TrackBufferHit(io_object, io_context,
+						   operation->rel, operation->persistence,
+						   operation->smgr, operation->forknum,
+						   blocknum);
+			return false;
+		}
+
+		/* The IO is already in-progress */
+		Assert(status == BUFFER_IO_IN_PROGRESS);
+
+		return true;
 	}
 
 	Assert(io_buffers[0] == buffers[nblocks_done]);
@@ -2037,9 +2016,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 
 	/*
 	 * NB: As little code as possible should be added between the
-	 * ReadBuffersCanStartIO() above, the further ReadBuffersCanStartIO()s
-	 * below and the smgrstartreadv(), as some of the buffers are now marked
-	 * as IO_IN_PROGRESS and will thus cause other backends to wait.
+	 * StartBufferIO() above, the further StartBufferIO()s below and the
+	 * smgrstartreadv(), as some of the buffers are now marked as
+	 * IO_IN_PROGRESS and will thus cause other backends to wait.
 	 */
 
 	/*
@@ -2053,7 +2032,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		Assert(BufferGetBlockNumber(buffers[i - 1]) ==
 			   BufferGetBlockNumber(buffers[i]) - 1);
 
-		if (!ReadBuffersCanStartIO(buffers[i], true))
+		status = StartBufferIO(buffers[i], true, false, NULL);
+		if (status != BUFFER_IO_READY_FOR_IO)
 			break;
 
 		Assert(io_buffers[io_buffers_len] == buffers[i]);
@@ -2893,16 +2873,23 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 			 * We *must* do smgr[zero]extend before succeeding, else the page
 			 * will not be reserved by the kernel, and the next P_NEW call
 			 * will decide to return the same page.  Clear the BM_VALID bit,
-			 * do StartBufferIO() and proceed.
+			 * do StartSharedBufferIO() and proceed.
 			 *
 			 * Loop to handle the very small possibility that someone re-sets
 			 * BM_VALID between our clearing it and StartBufferIO inspecting
 			 * it.
 			 */
-			do
+			while (true)
 			{
+				StartBufferIOResult sbres;
+
 				pg_atomic_fetch_and_u64(&existing_hdr->state, ~BM_VALID);
-			} while (!StartBufferIO(existing_hdr, true, false));
+
+				sbres = StartSharedBufferIO(existing_hdr, true, true, NULL);
+
+				if (sbres != BUFFER_IO_ALREADY_DONE)
+					break;
+			}
 		}
 		else
 		{
@@ -2928,7 +2915,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 			LWLockRelease(partition_lock);
 
 			/* XXX: could combine the locked operations in it with the above */
-			StartBufferIO(victim_buf_hdr, true, false);
+			StartSharedBufferIO(victim_buf_hdr, true, true, NULL);
 		}
 	}
 
@@ -4450,7 +4437,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * someone else flushed the buffer before we could, so we need not do
 	 * anything.
 	 */
-	if (!StartBufferIO(buf, false, false))
+	if (StartSharedBufferIO(buf, false, true, NULL) == BUFFER_IO_ALREADY_DONE)
 		return;
 
 	/* Setup error traceback support for ereport() */
@@ -7029,6 +7016,13 @@ WaitIO(BufferDesc *buf)
 {
 	ConditionVariable *cv = BufferDescriptorGetIOCV(buf);
 
+	/*
+	 * Should never end up here with unsubmitted IO, as no AIO-unaware code
+	 * may be used while in batch mode and AIO-aware code needs to have
+	 * submitted all staged IO to avoid deadlocks & slowness.
+	 */
+	Assert(!pgaio_have_staged());
+
 	ConditionVariablePrepareToSleep(cv);
 	for (;;)
 	{
@@ -7081,30 +7075,37 @@ WaitIO(BufferDesc *buf)
 }
 
 /*
- * StartBufferIO: begin I/O on this buffer
+ * StartSharedBufferIO: begin I/O on this buffer
  *	(Assumptions)
- *	My process is executing no IO on this buffer
  *	The buffer is Pinned
  *
- * In some scenarios multiple backends could attempt the same I/O operation
- * concurrently.  If someone else has already started I/O on this buffer then
- * we will wait for completion of the IO using WaitIO().
+ * In several scenarios the buffer may already be undergoing I/O in this or
+ * another backend. This is handled as follows:
  *
- * Input operations are only attempted on buffers that are not BM_VALID,
- * and output operations only on buffers that are BM_VALID and BM_DIRTY,
- * so we can always tell if the work is already done.
+ * - If the caller passes a non-NULL io_wref and the buffer has an I/O wait
+ *   reference, the *io_wref is set to the buffer's io_wref and
+ *   BUFFER_IO_IN_PROGRESS is returned.
  *
- * Returns true if we successfully marked the buffer as I/O busy,
- * false if someone else already did the work.
+ * - If the caller passes a NULL io_wref (i.e. the caller does not want to
+ *   asynchronously wait for the completion of the IO), wait = false and the
+ *   buffer is undergoing IO, BUFFER_IO_IN_PROGRESS is returned.
  *
- * If nowait is true, then we don't wait for an I/O to be finished by another
- * backend.  In that case, false indicates either that the I/O was already
- * finished, or is still in progress.  This is useful for callers that want to
- * find out if they can perform the I/O as part of a larger operation, without
- * waiting for the answer or distinguishing the reasons why not.
+ * - If wait = true and either the buffer does not have a wait reference,
+ *   or the caller passes io_wref = NULL, WaitIO() is used to wait for the IO
+ *   to complete.  To avoid the potential of deadlocks and unnecessary delays,
+ *   all staged I/O is submitted before waiting.
+ *
+ *
+ * Input operations are only attempted on buffers that are not BM_VALID, and
+ * output operations only on buffers that are BM_VALID and BM_DIRTY, so we can
+ * always tell if the work is already done.  If no I/O is necessary,
+ * BUFFER_IO_ALREADY_DONE is returned.
+ *
+ * If we successfully marked the buffer as BM_IO_IN_PROGRESS,
+ * BUFFER_IO_READY_FOR_IO is returned.
  */
-bool
-StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
+StartBufferIOResult
+StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait, PgAioWaitRef *io_wref)
 {
 	uint64		buf_state;
 
@@ -7116,10 +7117,42 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
-		UnlockBufHdr(buf);
-		if (nowait)
-			return false;
-		WaitIO(buf);
+
+		/* Join the existing IO */
+		if (io_wref != NULL && pgaio_wref_valid(&buf->io_wref))
+		{
+			*io_wref = buf->io_wref;
+			UnlockBufHdr(buf);
+
+			return BUFFER_IO_IN_PROGRESS;
+		}
+		else if (!wait)
+		{
+			UnlockBufHdr(buf);
+			return BUFFER_IO_IN_PROGRESS;
+		}
+		else
+		{
+			/*
+			 * With wait = true, we always have to wait if the caller has
+			 * passed io_wref = NULL.
+			 *
+			 * Even with io_wref != NULL, we have to wait if the buffer's wait
+			 * ref is not valid but the IO is in progress, i.e. someone else
+			 * started IO but hasn't set the wait ref yet. We have no choice
+			 * but to wait until the IO completes.
+			 */
+			UnlockBufHdr(buf);
+
+			/*
+			 * If this backend currently has staged IO, submit it before
+			 * waiting for in-progress IO, to avoid potential deadlocks and
+			 * unnecessary delays.
+			 */
+			pgaio_submit_staged();
+
+			WaitIO(buf);
+		}
 	}
 
 	/* Once we get here, there is definitely no I/O active on this buffer */
@@ -7128,9 +7161,14 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
 	{
 		UnlockBufHdr(buf);
-		return false;
+		return BUFFER_IO_ALREADY_DONE;
 	}
 
+	/*
+	 * No IO in progress and not already done; we will start IO. It's possible
+	 * that the IO was in progress but we're not done, because the IO errored
+	 * out. We'll do the IO ourselves.
+	 */
 	UnlockBufHdrExt(buf, buf_state,
 					BM_IO_IN_PROGRESS, 0,
 					0);
@@ -7138,7 +7176,31 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 	ResourceOwnerRememberBufferIO(CurrentResourceOwner,
 								  BufferDescriptorGetBuffer(buf));
 
-	return true;
+	return BUFFER_IO_READY_FOR_IO;
+}
+
+/*
+ * Wrapper around StartSharedBufferIO / StartLocalBufferIO. Only to be used
+ * when the caller doesn't otherwise need to care about local vs shared. See
+ * StartSharedBufferIO() for details.
+ */
+StartBufferIOResult
+StartBufferIO(Buffer buffer, bool forInput, bool wait, PgAioWaitRef *io_wref)
+{
+	BufferDesc *buf_hdr;
+
+	if (BufferIsLocal(buffer))
+	{
+		buf_hdr = GetLocalBufferDescriptor(-buffer - 1);
+
+		return StartLocalBufferIO(buf_hdr, forInput, wait, io_wref);
+	}
+	else
+	{
+		buf_hdr = GetBufferDescriptor(buffer - 1);
+
+		return StartSharedBufferIO(buf_hdr, forInput, wait, io_wref);
+	}
 }
 
 /*
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 404c6bccbdd..64da741e101 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -191,7 +191,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
 	 * Try to start an I/O operation.  There currently are no reasons for
 	 * StartLocalBufferIO to return false, so we raise an error in that case.
 	 */
-	if (!StartLocalBufferIO(bufHdr, false, false))
+	if (StartLocalBufferIO(bufHdr, false, true, NULL) != BUFFER_IO_READY_FOR_IO)
 		elog(ERROR, "failed to start write IO on local buffer");
 
 	/* Find smgr relation for buffer */
@@ -435,7 +435,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 			pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state);
 
 			/* no need to loop for local buffers */
-			StartLocalBufferIO(existing_hdr, true, false);
+			StartLocalBufferIO(existing_hdr, true, true, NULL);
 		}
 		else
 		{
@@ -451,7 +451,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 
 			hresult->id = victim_buf_id;
 
-			StartLocalBufferIO(victim_buf_hdr, true, false);
+			StartLocalBufferIO(victim_buf_hdr, true, true, NULL);
 		}
 	}
 
@@ -517,26 +517,41 @@ MarkLocalBufferDirty(Buffer buffer)
 }
 
 /*
- * Like StartBufferIO, but for local buffers
+ * Like StartSharedBufferIO, but for local buffers
  */
-bool
-StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
+StartBufferIOResult
+StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool wait, PgAioWaitRef *io_wref)
 {
 	uint64		buf_state;
 
 	/*
 	 * With AIO the buffer could have IO in progress, e.g. when there are two
-	 * scans of the same relation. Either wait for the other IO or return
-	 * false.
+	 * scans of the same relation.  Either wait for the other IO (if wait =
+	 * true and io_wref == NULL) or return BUFFER_IO_IN_PROGRESS.
 	 */
 	if (pgaio_wref_valid(&bufHdr->io_wref))
 	{
-		PgAioWaitRef iow = bufHdr->io_wref;
+		PgAioWaitRef buf_wref = bufHdr->io_wref;
 
-		if (nowait)
-			return false;
+		if (io_wref != NULL)
+		{
+			/* We've already asynchronously started this IO, so join it */
+			*io_wref = buf_wref;
+			return BUFFER_IO_IN_PROGRESS;
+		}
 
-		pgaio_wref_wait(&iow);
+		/*
+		 * For temp buffers we should never need to wait in
+		 * StartLocalBufferIO() when called with io_wref == NULL while there
+		 * are staged IOs, as it's not allowed to call code that is not aware
+		 * of AIO while in batch mode.
+		 */
+		Assert(!pgaio_have_staged());
+
+		if (!wait)
+			return BUFFER_IO_IN_PROGRESS;
+
+		pgaio_wref_wait(&buf_wref);
 	}
 
 	/* Once we get here, there is definitely no I/O active on this buffer */
@@ -545,14 +560,14 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
 	buf_state = pg_atomic_read_u64(&bufHdr->state);
 	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
 	{
-		return false;
+		return BUFFER_IO_ALREADY_DONE;
 	}
 
 	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */
 
 	/* local buffers don't track IO using resowners */
 
-	return true;
+	return BUFFER_IO_READY_FOR_IO;
 }
 
 /*
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index e18b2a2b8ae..43e0f255302 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -383,7 +383,7 @@ sub test_startwait_io
 		$io_method,
 		$psql_a,
 		"first StartBufferIO",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -392,14 +392,14 @@ sub test_startwait_io
 		$io_method,
 		$psql_a,
 		"second StartBufferIO fails, same session",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^f$/,
 		qr/^$/);
 	psql_like(
 		$io_method,
 		$psql_b,
 		"second StartBufferIO fails, other session",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^f$/,
 		qr/^$/);
 
@@ -409,7 +409,7 @@ sub test_startwait_io
 		$node,
 		$psql_b,
 		"blocking start buffer io",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		"BufferIo");
 
 	# Terminate the IO, without marking it as success, this should trigger the
@@ -438,7 +438,7 @@ sub test_startwait_io
 		$io_method,
 		$psql_a,
 		"blocking buffer io w/ success: first start buffer io",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -448,7 +448,7 @@ sub test_startwait_io
 		$node,
 		$psql_b,
 		"blocking start buffer io",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		"BufferIo");
 
 	# Terminate the IO, marking it as success
@@ -486,7 +486,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
 		$io_method,
 		$psql_a,
 		"first StartLocalBufferIO",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -497,7 +497,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
 		$io_method,
 		$psql_a,
 		"second StartLocalBufferIO succeeds, same session",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -509,7 +509,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
 		$io_method,
 		$psql_a,
 		"StartLocalBufferIO after not marking valid succeeds, same session",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -524,7 +524,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
 		$io_method,
 		$psql_a,
 		"StartLocalBufferIO after marking valid fails",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		qr/^f$/,
 		qr/^$/);
 
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 1cc4734a746..603eae9f83f 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -45,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
 RETURNS pg_catalog.int4 STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
+CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, wait bool)
 RETURNS pg_catalog.bool STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 061f0c9f92a..9fdf4168f4d 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -434,13 +434,13 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	if (RelationUsesLocalBuffers(rel))
 	{
 		for (int i = 0; i < nblocks; i++)
-			StartLocalBufferIO(buf_hdrs[i], true, false);
+			StartLocalBufferIO(buf_hdrs[i], true, true, NULL);
 		pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL);
 	}
 	else
 	{
 		for (int i = 0; i < nblocks; i++)
-			StartBufferIO(buf_hdrs[i], true, false);
+			StartSharedBufferIO(buf_hdrs[i], true, true, NULL);
 	}
 
 	pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks);
@@ -586,15 +586,18 @@ buffer_call_start_io(PG_FUNCTION_ARGS)
 {
 	Buffer		buf = PG_GETARG_INT32(0);
 	bool		for_input = PG_GETARG_BOOL(1);
-	bool		nowait = PG_GETARG_BOOL(2);
+	bool		wait = PG_GETARG_BOOL(2);
+	StartBufferIOResult result;
 	bool		can_start;
 
 	if (BufferIsLocal(buf))
-		can_start = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
-									   for_input, nowait);
+		result = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
+									for_input, wait, NULL);
 	else
-		can_start = StartBufferIO(GetBufferDescriptor(buf - 1),
-								  for_input, nowait);
+		result = StartSharedBufferIO(GetBufferDescriptor(buf - 1),
+									 for_input, wait, NULL);
+
+	can_start = result == BUFFER_IO_READY_FOR_IO;
 
 	/*
 	 * For tests we don't want the resowner release preventing us from
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 4d2e370804c..8b569039f2c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2934,6 +2934,7 @@ SplitTextOutputData
 SplitVar
 StackElem
 StakindFlags
+StartBufferIOResult
 StartDataPtrType
 StartLOPtrType
 StartLOsPtrType
-- 
2.53.0.1.gb2826b52eb
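
For illustration only (not part of the patch): the three-way result of the reworked StartBufferIO() can be sketched as a small standalone model of the read case. Everything here is a hypothetical stand-in: ToyBuffer, ToyStartIOResult, toy_wait_io() and toy_start_buffer_io() do not exist in PostgreSQL, there is no header locking or resowner tracking, and "waiting" is modeled by assuming the other IO completes successfully.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy mirror of the enum introduced by the patch (hypothetical names). */
typedef enum ToyStartIOResult
{
	TOY_IO_ALREADY_DONE,
	TOY_IO_IN_PROGRESS,
	TOY_IO_READY_FOR_IO
} ToyStartIOResult;

/* Toy buffer header: only the state needed for the decision. */
typedef struct ToyBuffer
{
	bool		valid;			/* models BM_VALID */
	bool		io_in_progress; /* models BM_IO_IN_PROGRESS */
	int			io_wref;		/* models the wait reference; 0 = not set */
} ToyBuffer;

/*
 * Models WaitIO(). Assumption: in this single-threaded toy the other IO
 * always finishes successfully while we "wait".
 */
static void
toy_wait_io(ToyBuffer *buf)
{
	buf->io_in_progress = false;
	buf->io_wref = 0;
	buf->valid = true;
}

/* Read-only (forInput = true) model of the decision logic. */
static ToyStartIOResult
toy_start_buffer_io(ToyBuffer *buf, bool wait, int *io_wref)
{
	if (buf->io_in_progress)
	{
		/* Caller can wait asynchronously and a wait reference exists. */
		if (io_wref != NULL && buf->io_wref != 0)
		{
			*io_wref = buf->io_wref;
			return TOY_IO_IN_PROGRESS;
		}

		/* Caller does not want to block at all. */
		if (!wait)
			return TOY_IO_IN_PROGRESS;

		/* No usable wait reference: block until the IO is finished. */
		toy_wait_io(buf);
	}

	/* For reads, a valid buffer means there is nothing left to do. */
	if (buf->valid)
		return TOY_IO_ALREADY_DONE;

	buf->io_in_progress = true;
	return TOY_IO_READY_FOR_IO;
}
```

In this model a caller passing wait = true and a non-NULL io_wref only blocks when the in-progress IO has not published its wait reference yet, which corresponds to the brief window described in the comments of the patch.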

From 5ea4a59c0b565456c25d3a2e0534505f28ab08cc Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Thu, 19 Mar 2026 17:00:00 -0400
Subject: [PATCH v7a 8/8] AIO: Don't wait for already in-progress IO

When a backend attempts to start a read IO and finds the first buffer
already has I/O in progress, previously it waited for that I/O to
complete before initiating reads for any of the subsequent buffers.

Although the backend must wait for the I/O to finish when acquiring the
buffer, there's no reason for it to wait when setting up the read
operation. Waiting at this point prevents the backend from starting I/O
on subsequent buffers and can significantly reduce concurrency.

This matters in two workloads: when multiple backends scan the same
relation concurrently, and when a single backend requests the same block
multiple times within the readahead distance.

If backends wait each time they encounter an in-progress read,
the access pattern effectively degenerates into synchronous I/O.

To fix this, when encountering an already in-progress IO for the head
buffer, a backend now records the buffer's wait reference and defers
waiting until WaitReadBuffers(), when it actually needs to acquire the
buffer.

In rare cases, a backend may still need to wait synchronously at IO
start time: if another backend has set BM_IO_IN_PROGRESS on the buffer
but has not yet set the wait reference. Such windows should be brief and
uncommon.

Author: Melanie Plageman <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv
---
 src/include/storage/bufmgr.h        |  4 +-
 src/backend/storage/buffer/bufmgr.c | 89 ++++++++++++++++++++++++-----
 2 files changed, 78 insertions(+), 15 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 4017896f951..dd41b92f944 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -144,9 +144,11 @@ struct ReadBuffersOperation
 	 */
 	Buffer	   *buffers;
 	BlockNumber blocknum;
-	int			flags;
+	uint16		flags;
 	int16		nblocks;
 	int16		nblocks_done;
+	/* true if waiting on another backend's IO */
+	bool		foreign_io;
 	PgAioWaitRef io_wref;
 	PgAioReturn io_return;
 };
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 5fc1ceccd0b..db04a3c211d 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1800,8 +1800,11 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 			 * b) reports some time as waiting, even if we never waited
 			 *
 			 * we first check if we already know the IO is complete.
+			 *
+			 * Note that operation->io_return is uninitialized for foreign IO,
+			 * so we cannot use the cheaper PGAIO_RS_UNKNOWN pre-check.
 			 */
-			if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
+			if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) &&
 				!pgaio_wref_check_done(&operation->io_wref))
 			{
 				instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
@@ -1820,11 +1823,45 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 				Assert(pgaio_wref_check_done(&operation->io_wref));
 			}
 
-			/*
-			 * We now are sure the IO completed. Check the results. This
-			 * includes reporting on errors if there were any.
-			 */
-			ProcessReadBuffersResult(operation);
+			if (unlikely(operation->foreign_io))
+			{
+				Buffer		buffer = operation->buffers[operation->nblocks_done];
+				BufferDesc *desc = BufferIsLocal(buffer) ?
+					GetLocalBufferDescriptor(-buffer - 1) :
+					GetBufferDescriptor(buffer - 1);
+				uint64		buf_state = pg_atomic_read_u64(&desc->state);
+
+				if (buf_state & BM_VALID)
+				{
+					BlockNumber blocknum = operation->blocknum + operation->nblocks_done;
+
+					operation->nblocks_done += 1;
+					Assert(operation->nblocks_done <= operation->nblocks);
+
+					/*
+					 * Track this as a 'hit' for this backend. The backend
+					 * performing the IO will track it as a 'read'.
+					 */
+					TrackBufferHit(io_object, io_context,
+								   operation->rel, operation->persistence,
+								   operation->smgr, operation->forknum,
+								   blocknum);
+				}
+
+				/*
+				 * If the foreign IO failed and left the buffer invalid,
+				 * nblocks_done is not incremented. The retry loop below will
+				 * call AsyncReadBuffers() which will attempt the IO itself.
+				 */
+			}
+			else
+			{
+				/*
+				 * We now are sure the IO completed. Check the results. This
+				 * includes reporting on errors if there were any.
+				 */
+				ProcessReadBuffersResult(operation);
+			}
 		}
 
 		/*
@@ -1870,7 +1907,8 @@ WaitReadBuffers(ReadBuffersOperation *operation)
  * affected by the call. If the first buffer is valid, *nblocks_progress is
  * set to 1 and operation->nblocks_done is incremented.
  *
- * Returns true if IO was initiated, false if no IO was necessary.
+ * Returns true if IO was initiated or is already in progress (foreign IO),
+ * false if the buffer was already valid.
  */
 static bool
 AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
@@ -1943,8 +1981,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
 
 	/*
-	 * Get IO handle before StartBufferIO(), as pgaio_io_acquire() might
-	 * block, which we don't want after setting IO_IN_PROGRESS.
+	 * We must get an IO handle before StartBufferIO(), as pgaio_io_acquire()
+	 * might block, which we don't want after setting IO_IN_PROGRESS. If we
+	 * don't need to do the IO, we'll release the handle.
 	 *
 	 * If we need to wait for IO before we can get a handle, submit
 	 * already-staged IO first, so that other backends don't need to wait.
@@ -1966,14 +2005,34 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
 	}
 
+	operation->foreign_io = false;
+	pgaio_wref_clear(&operation->io_wref);
+
 	/*
-	 * Check if we can start IO on the first to-be-read buffer.
+	 * Try to start IO on the first buffer in a new run of blocks. If AIO is
+	 * in progress, be it in this backend or another backend, we just
+	 * associate the wait reference with the operation and wait in
+	 * WaitReadBuffers(). This turns out to be important for performance in
+	 * two workloads:
 	 *
-	 * If an I/O is already in progress in another backend, we want to wait
-	 * for the outcome: either done, or something went wrong and we will
-	 * retry.
+	 * 1) A read stream that has to read the same block multiple times within
+	 * the readahead distance. This can happen e.g. for the table accesses of
+	 * an index scan.
+	 *
+	 * 2) Concurrent scans by multiple backends on the same relation.
+	 *
+	 * If we were to synchronously wait for the in-progress IO, we'd not be
+	 * able to keep enough I/O in flight.
+	 *
+	 * If we do find there is ongoing I/O for the buffer, we set up a 1-block
+	 * ReadBuffersOperation that WaitReadBuffers then can wait on.
+	 *
+	 * It's possible that another backend has started IO on the buffer but not
+	 * yet set its wait reference. In this case, we have no choice but to wait
+	 * for either the wait reference to be valid or the IO to be done.
 	 */
-	status = StartBufferIO(buffers[nblocks_done], true, true, NULL);
+	status = StartBufferIO(buffers[nblocks_done], true, true,
+						   &operation->io_wref);
 	if (status != BUFFER_IO_READY_FOR_IO)
 	{
 		pgaio_io_release(ioh);
@@ -2006,6 +2065,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 
 		/* The IO is already in-progress */
 		Assert(status == BUFFER_IO_IN_PROGRESS);
+		Assert(pgaio_wref_valid(&operation->io_wref));
+		operation->foreign_io = true;
 
 		return true;
 	}
-- 
2.53.0.1.gb2826b52eb
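
For illustration only (not part of the patch): the deferred-wait flow for a single block can be sketched as a standalone toy. It assumes a single-threaded model in which the pending IO completes at the moment the operation waits for it. ToyBuffer, ToyReadOp, toy_start_read(), toy_complete_io() and toy_wait_read() are hypothetical names, and the wait reference is reduced to an integer where 0 means "none".

```c
#include <assert.h>
#include <stdbool.h>

typedef struct ToyBuffer
{
	bool		valid;			/* models BM_VALID */
	int			io_wref;		/* in-flight IO id; 0 = no IO in flight */
} ToyBuffer;

typedef struct ToyReadOp
{
	ToyBuffer  *buf;
	bool		foreign_io;		/* waiting on somebody else's IO? */
	int			io_wref;		/* reference we will wait on later */
	int			nblocks_done;
	int			own_reads;		/* IOs this operation started itself */
} ToyReadOp;

/* Returns true if there is IO to wait for, false if the block was valid. */
static bool
toy_start_read(ToyReadOp *op)
{
	op->foreign_io = false;
	op->io_wref = 0;

	if (op->buf->valid)
	{
		op->nblocks_done++;
		return false;
	}

	if (op->buf->io_wref != 0)
	{
		/* Join the existing IO instead of waiting for it right now. */
		op->io_wref = op->buf->io_wref;
		op->foreign_io = true;
		return true;
	}

	/* Nobody is reading this block yet: start our own IO. */
	op->buf->io_wref = 100;		/* arbitrary non-zero id */
	op->io_wref = op->buf->io_wref;
	op->own_reads++;
	return true;
}

/* Models completion of whatever IO is in flight on the buffer. */
static void
toy_complete_io(ToyBuffer *buf, bool success)
{
	buf->io_wref = 0;
	buf->valid = success;
}

/* Wait for the block; retry with our own IO if the foreign IO failed. */
static void
toy_wait_read(ToyReadOp *op, bool foreign_io_succeeds)
{
	while (op->nblocks_done < 1)
	{
		if (op->foreign_io)
		{
			toy_complete_io(op->buf, foreign_io_succeeds);
			if (op->buf->valid)
				op->nblocks_done++; /* counts as a hit for us */
			else
				toy_start_read(op); /* buffer still invalid: do it ourselves */
		}
		else
		{
			/* Assumption: our own IO always succeeds in this toy. */
			toy_complete_io(op->buf, true);
			op->nblocks_done++;
		}
	}
}
```

The point of the sketch is that toy_start_read() never blocks: it either records the existing reference or starts its own IO, and only toy_wait_read() deals with the outcome, including the retry when the foreign IO left the buffer invalid.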
