Thanks for the continued review! Attached v5 adds some comments to the tests, fixes a few nits in the actual code, and adds a commit to fix what I think is an existing off-by-one error in TRACE_POSTGRESQL_BUFFER_READ_DONE.
On Fri, Mar 6, 2026 at 8:18 AM Nazir Bilal Yavuz <[email protected]> wrote:
>
> > For test_repeated_blocks, the first test:
> >
> > # test miss of the same block twice in a row
> > $psql->query_safe(
> > qq/
> > SELECT evict_rel('largeish');
> > /);
> > $psql->query_safe(
> > qq/
> > SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]);
> > /);
> > ok(1, "$io_method: stream missing the same block repeatedly");
> >
> > It says that it will miss the same block repeatedly, is that because
> > we won't start a read for any of the blocks until after
> > read_stream_get_block has returned all of them? If so, could be
> > clearer in the comment. Not everyone understands all the read stream
> > internals.
>
> I think we start a read of blocks because we hit stream->distance but
> it doesn't affect any consecutive same block numbers. What I
> understood is:
>
> Since io_combine_limit is 1, there won't be any IO combining.
>
> 0th block (0), miss, distance is 1; StartReadBuffersImpl() and
> WaitReadBuffers() are called for 0th block.
> 1th block (2), miss, distance is 2, StartReadBuffersImpl() is called.
> 2th block (2), miss, distance is 2, StartReadBuffersImpl() and
> WaitReadBuffers() are called 1th block.
> 3th block (4), miss, distance is 4, StartReadBuffersImpl() is called.
> 4th block (4), miss, distance is 4, StartReadBuffersImpl() and
> WaitReadBuffers() are called 2, 3 and 4th blocks.

Makes sense. I've tried to add a comment to this effect.

> > I know a lot of other tests do this, but I find it so hard to read the
> > test with the SQL is totally left-aligned like that -- especially with
> > comments interspersed. You can easily flow the queries on multiple
> > lines and indent it more.
>
> I agree with you.

I did reflow the SQL. It does mean there will be a bunch of extra
whitespace sent to the server. Other tests do this, though. I wonder
how much it affects performance...

> > For test_repeated_blocks, the second test:
> >
> > # test hit of the same block twice in a row
> > $psql->query_safe(
> > qq/
> > SELECT evict_rel('largeish');
> > /);
> > $psql->query_safe(
> > qq/
> > SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 1, 2, 3, 4,
> > 5, 6, 5, 4, 3, 2, 1, 0]);
> > /);
> > ok(1, "$io_method: stream accessing same block");
> >
> > I assume that the second access of 2 is a hit because we actually did
> > IO for the first one (unlike in the earlier case)?
>
> I think so but to clarify, all second access of [2, 1, 0] blocks are hit;
> right?

Yes. I tried expanding the comment to elaborate, but it just came out
awkward, so I left it the way it is.

> > For test_inject_foreign, the 3rd test:
> >
> > # Test read stream encountering two buffers that are undergoing the same
> > # IO, started by another backend
> >
> > I see that psql_b is requesting 3 blocks which can be combined into 1
> > IO, which makes it different than the 1st foreign IO test case:
> >
> > ###
> > # Test read stream encountering buffers undergoing IO in another
> > backend,
> > # with the other backend's reads succeeding.
> > ###
> >
> > where psql_b only requests 1 but I don't really see how these are
> > covering different code. Maybe if the read stream one (psql_a) is
> > doing a combined IO it might exercise slightly different code, but
> > otherwise I don't get it.
>
> I think the main difference is that:
>
> > ###
> > # Test read stream encountering buffers undergoing IO in another
> > backend,
> > # with the other backend's reads succeeding.
> > ###
>
> SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish',
> ARRAY[0, 2, 5, 7]);
>
> We need to join waiting block number 5 and then start another IO for
> block number 7.
>
> > # Test read stream encountering two buffers that are undergoing the same
> > # IO, started by another backend
>
> SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish',
> ARRAY[0, 2, 4]);
>
> We need to join waiting block number 2 but after waiting for an IO, IO
> for block number 4 should be already completed too. We don't need to
> start IO like the other case.

Ah, makes sense. Thanks!

- Melanie

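For anyone following along without the read stream internals in their head, the difference between the two foreign-IO tests discussed above can be sketched as a toy model. This is only an illustration, not PostgreSQL code: the names StreamAction, ForeignIo, classify_stream_blocks() and own_ios_after_first_join() are made up for this sketch, and it assumes each block the stream reads itself is a separate IO (true for these tests, since the requested blocks are never adjacent).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy model only; none of these names exist in PostgreSQL. */

typedef enum StreamAction
{
	START_OWN_IO,		/* block is not under IO elsewhere, stream reads it */
	JOIN_FOREIGN_IO		/* block is covered by the other backend's IO */
} StreamAction;

/* One in-flight IO started by the other backend (psql_b in the tests). */
typedef struct ForeignIo
{
	uint32_t	first_block;
	uint32_t	nblocks;
} ForeignIo;

/* Does the foreign IO cover this block number? */
static bool
foreign_io_covers(const ForeignIo *io, uint32_t blockno)
{
	return blockno >= io->first_block &&
		blockno - io->first_block < io->nblocks;
}

/*
 * Decide, for every block the stream asks for, whether the stream would
 * join the foreign IO or start its own. Returns the number of IOs the
 * stream starts itself.
 */
static size_t
classify_stream_blocks(const ForeignIo *io, const uint32_t *blocks,
					   size_t nblocks, StreamAction *actions)
{
	size_t		own_ios = 0;

	assert(io != NULL && blocks != NULL && actions != NULL);

	for (size_t i = 0; i < nblocks; i++)
	{
		if (foreign_io_covers(io, blocks[i]))
			actions[i] = JOIN_FOREIGN_IO;
		else
		{
			actions[i] = START_OWN_IO;
			own_ios++;
		}
	}

	return own_ios;
}

/*
 * Count the IOs the stream still has to start after it first joined the
 * foreign IO. This is the number that differs between the two tests.
 */
static size_t
own_ios_after_first_join(const StreamAction *actions, size_t nblocks)
{
	bool		joined = false;
	size_t		count = 0;

	for (size_t i = 0; i < nblocks; i++)
	{
		if (actions[i] == JOIN_FOREIGN_IO)
			joined = true;
		else if (joined)
			count++;
	}

	return count;
}
```

With the first test's inputs (foreign IO on block 5, stream over 0, 2, 5, 7) the model reports one own IO after the join, for block 7. With the third test's inputs (foreign IO on blocks 2..4, stream over 0, 2, 4) it reports none, because block 4 is finished by the same IO the stream already waited on for block 2.
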
From fd1cb5a7d0e04ed70f387ed2c66670e3eff4f049 Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Tue, 9 Sep 2025 10:14:34 -0400 Subject: [PATCH v5 1/5] aio: Refactor tests in preparation for more tests In a future commit more AIO related tests are due to be introduced. However 001_aio.pl already is fairly large. This commit introduces a new TestAio package with helpers for writing AIO related tests. Then it uses the new helpers to simplify the existing 001_aio.pl by iterating over all supported io_methods. This will be particularly helpful because additional methods already have been submitted. Additionally this commit splits out testing of initdb using a non-default method into its own test. While that test is somewhat important, it's fairly slow and doesn't break that often. For development velocity it's helpful for 001_aio.pl to be faster. While particularly the latter could benefit from being its own commit, it seems to introduce more back-and-forth than it's worth. 
Author: Andres Freund <[email protected]> Reviewed-by: Nazir Bilal Yavuz <[email protected]> Reviewed-by: Melanie Plageman <[email protected]> Discussion: https://postgr.es/m/ --- src/test/modules/test_aio/meson.build | 1 + src/test/modules/test_aio/t/001_aio.pl | 141 +++++++--------------- src/test/modules/test_aio/t/003_initdb.pl | 71 +++++++++++ src/test/modules/test_aio/t/TestAio.pm | 90 ++++++++++++++ 4 files changed, 204 insertions(+), 99 deletions(-) create mode 100644 src/test/modules/test_aio/t/003_initdb.pl create mode 100644 src/test/modules/test_aio/t/TestAio.pm diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build index fefa25bc5ab..18a797f3a3b 100644 --- a/src/test/modules/test_aio/meson.build +++ b/src/test/modules/test_aio/meson.build @@ -32,6 +32,7 @@ tests += { 'tests': [ 't/001_aio.pl', 't/002_io_workers.pl', + 't/003_initdb.pl', ], }, } diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl index 5c634ec3ca9..e18b2a2b8ae 100644 --- a/src/test/modules/test_aio/t/001_aio.pl +++ b/src/test/modules/test_aio/t/001_aio.pl @@ -7,126 +7,56 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +use FindBin; +use lib $FindBin::RealBin; -### -# Test io_method=worker -### -my $node_worker = create_node('worker'); -$node_worker->start(); - -test_generic('worker', $node_worker); -SKIP: -{ - skip 'Injection points not supported by this build', 1 - unless $ENV{enable_injection_points} eq 'yes'; - test_inject_worker('worker', $node_worker); -} +use TestAio; -$node_worker->stop(); +my @methods = TestAio::supported_io_methods(); +my %nodes; ### -# Test io_method=io_uring +# Create and configure one instance for each io_method ### -if (have_io_uring()) +foreach my $method (@methods) { - my $node_uring = create_node('io_uring'); - $node_uring->start(); - test_generic('io_uring', $node_uring); - $node_uring->stop(); -} - - -### -# Test io_method=sync -### - -my 
$node_sync = create_node('sync'); + my $node = PostgreSQL::Test::Cluster->new($method); -# just to have one test not use the default auto-tuning + $nodes{$method} = $node; + $node->init(); + $node->append_conf('postgresql.conf', "io_method=$method"); + TestAio::configure($node); +} -$node_sync->append_conf( +# Just to have one test not use the default auto-tuning +$nodes{'sync'}->append_conf( 'postgresql.conf', qq( -io_max_concurrency=4 + io_max_concurrency=4 )); -$node_sync->start(); -test_generic('sync', $node_sync); -$node_sync->stop(); - -done_testing(); - ### -# Test Helpers +# Execute the tests for each io_method ### -sub create_node +foreach my $method (@methods) { - local $Test::Builder::Level = $Test::Builder::Level + 1; - - my $io_method = shift; + my $node = $nodes{$method}; - my $node = PostgreSQL::Test::Cluster->new($io_method); - - # Want to test initdb for each IO method, otherwise we could just reuse - # the cluster. - # - # Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the - # options specified by ->extra, if somebody puts -c io_method=xyz in - # PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we - # detect it. - local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS}; - if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS} - && $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/) - { - $ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method"; - } - - $node->init(extra => [ '-c', "io_method=$io_method" ]); - - $node->append_conf( - 'postgresql.conf', qq( -shared_preload_libraries=test_aio -log_min_messages = 'DEBUG3' -log_statement=all -log_error_verbosity=default -restart_after_crash=false -temp_buffers=100 -)); + $node->start(); + test_io_method($method, $node); + $node->stop(); +} - # Even though we used -c io_method=... above, if TEMP_CONFIG sets - # io_method, it'd override the setting persisted at initdb time. 
While - # using (and later verifying) the setting from initdb provides some - # verification of having used the io_method during initdb, it's probably - # not worth the complication of only appending if the variable is set in - # in TEMP_CONFIG. - $node->append_conf( - 'postgresql.conf', qq( -io_method=$io_method -)); +done_testing(); - ok(1, "$io_method: initdb"); - return $node; -} +### +# Test Helpers +### -sub have_io_uring -{ - # To detect if io_uring is supported, we look at the error message for - # assigning an invalid value to an enum GUC, which lists all the valid - # options. We need to use -C to deal with running as administrator on - # windows, the superuser check is omitted if -C is used. - my ($stdout, $stderr) = - run_command [qw(postgres -C invalid -c io_method=invalid)]; - die "can't determine supported io_method values" - unless $stderr =~ m/Available values: ([^\.]+)\./; - my $methods = $1; - note "supported io_method values are: $methods"; - - return ($methods =~ m/io_uring/) ? 
1 : 0; -} sub psql_like { @@ -1490,8 +1420,8 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);), } -# Run all tests that are supported for all io_methods -sub test_generic +# Run all tests for the specified node / io_method +sub test_io_method { my $io_method = shift; my $node = shift; @@ -1526,10 +1456,23 @@ CHECKPOINT; test_ignore_checksum($io_method, $node); test_checksum_createdb($io_method, $node); + # generic injection tests SKIP: { skip 'Injection points not supported by this build', 1 unless $ENV{enable_injection_points} eq 'yes'; test_inject($io_method, $node); } + + # worker specific injection tests + if ($io_method eq 'worker') + { + SKIP: + { + skip 'Injection points not supported by this build', 1 + unless $ENV{enable_injection_points} eq 'yes'; + + test_inject_worker($io_method, $node); + } + } } diff --git a/src/test/modules/test_aio/t/003_initdb.pl b/src/test/modules/test_aio/t/003_initdb.pl new file mode 100644 index 00000000000..c03ae58d00a --- /dev/null +++ b/src/test/modules/test_aio/t/003_initdb.pl @@ -0,0 +1,71 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group +# +# Test initdb for each IO method. This is done separately from 001_aio.pl, as +# it isn't fast. This way the more commonly failing / hacked-on 001_aio.pl can +# be iterated on more quickly. + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use FindBin; +use lib $FindBin::RealBin; + +use TestAio; + + +foreach my $method (TestAio::supported_io_methods()) +{ + test_create_node($method); +} + +done_testing(); + + +sub test_create_node +{ + local $Test::Builder::Level = $Test::Builder::Level + 1; + + my $io_method = shift; + + my $node = PostgreSQL::Test::Cluster->new($io_method); + + # Want to test initdb for each IO method, otherwise we could just reuse + # the cluster. 
+ # + # Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the + # options specified by ->extra, if somebody puts -c io_method=xyz in + # PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we + # detect it. + local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS}; + if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS} + && $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/) + { + $ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method"; + } + + $node->init(extra => [ '-c', "io_method=$io_method" ]); + + TestAio::configure($node); + + # Even though we used -c io_method=... above, if TEMP_CONFIG sets + # io_method, it'd override the setting persisted at initdb time. While + # using (and later verifying) the setting from initdb provides some + # verification of having used the io_method during initdb, it's probably + # not worth the complication of only appending if the variable is set + # in TEMP_CONFIG. + $node->append_conf( + 'postgresql.conf', qq( +io_method=$io_method +)); + + ok(1, "$io_method: initdb"); + + $node->start(); + $node->stop(); + ok(1, "$io_method: start & stop"); + + return $node; +} diff --git a/src/test/modules/test_aio/t/TestAio.pm b/src/test/modules/test_aio/t/TestAio.pm new file mode 100644 index 00000000000..5bc80a9b130 --- /dev/null +++ b/src/test/modules/test_aio/t/TestAio.pm @@ -0,0 +1,90 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group + +=pod + +=head1 NAME + +TestAio - helpers for writing AIO related tests + +=cut + +package TestAio; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + + +=pod + +=head1 METHODS + +=over + +=item TestAio::supported_io_methods() + +Return an array of all the supported values for the io_method GUC + +=cut + +sub supported_io_methods() +{ + my @io_methods = ('worker'); + + push(@io_methods, "io_uring") if have_io_uring(); + + # Return sync last, as it will least commonly fail + push(@io_methods, "sync"); + + return 
@io_methods; +} + + +=item TestAio::configure() + +Prepare a cluster for AIO test + +=cut + +sub configure +{ + my $node = shift; + + $node->append_conf( + 'postgresql.conf', qq( +shared_preload_libraries=test_aio +log_min_messages = 'DEBUG3' +log_statement=all +log_error_verbosity=default +restart_after_crash=false +temp_buffers=100 +)); + +} + + +=pod + +=item TestAio::have_io_uring() + +Return if io_uring is supported + +=cut + +sub have_io_uring +{ + # To detect if io_uring is supported, we look at the error message for + # assigning an invalid value to an enum GUC, which lists all the valid + # options. We need to use -C to deal with running as administrator on + # windows, the superuser check is omitted if -C is used. + my ($stdout, $stderr) = + run_command [qw(postgres -C invalid -c io_method=invalid)]; + die "can't determine supported io_method values" + unless $stderr =~ m/Available values: ([^\.]+)\./; + my $methods = $1; + note "supported io_method values are: $methods"; + + return ($methods =~ m/io_uring/) ? 1 : 0; +} + +1; -- 2.43.0
From 43e0c31a5c262686243ee2ed5617954b5361a3ba Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Wed, 10 Sep 2025 14:00:02 -0400 Subject: [PATCH v5 2/5] test_aio: Add read_stream test infrastructure & tests Author: Andres Freund <[email protected]> Reviewed-by: Nazir Bilal Yavuz <[email protected]> Reviewed-by: Melanie Plageman <[email protected]> Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv --- src/test/modules/test_aio/meson.build | 1 + .../modules/test_aio/t/004_read_stream.pl | 261 +++++++++++++ src/test/modules/test_aio/test_aio--1.0.sql | 24 +- src/test/modules/test_aio/test_aio.c | 346 +++++++++++++++--- src/tools/pgindent/typedefs.list | 1 + 5 files changed, 582 insertions(+), 51 deletions(-) create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build index 18a797f3a3b..909f81d96c1 100644 --- a/src/test/modules/test_aio/meson.build +++ b/src/test/modules/test_aio/meson.build @@ -33,6 +33,7 @@ tests += { 't/001_aio.pl', 't/002_io_workers.pl', 't/003_initdb.pl', + 't/004_read_stream.pl', ], }, } diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl new file mode 100644 index 00000000000..755d6dfc030 --- /dev/null +++ b/src/test/modules/test_aio/t/004_read_stream.pl @@ -0,0 +1,261 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use FindBin; +use lib $FindBin::RealBin; + +use TestAio; + + +my $node = PostgreSQL::Test::Cluster->new('test'); +$node->init(); + +$node->append_conf( + 'postgresql.conf', qq( +shared_preload_libraries=test_aio +log_min_messages = 'DEBUG3' +log_statement=all +log_error_verbosity=default +restart_after_crash=false +temp_buffers=100 +max_connections=8 +io_method=worker 
+)); + +$node->start(); +test_setup($node); +$node->stop(); + + +foreach my $method (TestAio::supported_io_methods()) +{ + $node->adjust_conf('postgresql.conf', 'io_method', $method); + $node->start(); + test_io_method($method, $node); + $node->stop(); +} + +done_testing(); + + +sub test_setup +{ + my $node = shift; + + $node->safe_psql( + 'postgres', qq( +CREATE EXTENSION test_aio; + +CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10); +INSERT INTO largeish(k) SELECT generate_series(1, 10000); +)); + ok(1, "setup"); +} + + +sub test_repeated_blocks +{ + my $io_method = shift; + my $node = shift; + + my $psql = $node->background_psql('postgres', on_error_stop => 0); + + # Preventing larger reads makes testing easier + $psql->query_safe( + qq/ SET io_combine_limit = 1; /); + + # test miss of the same block twice in a row + $psql->query_safe( + qq/ SELECT evict_rel('largeish'); /); + + # block 0 grows the distance enough that the stream will look ahead and try + # to start a pending read for block 2 (and later block 4) twice before + # returning any buffers. 
+ $psql->query_safe( + qq/ SELECT * FROM read_stream_for_blocks('largeish', + ARRAY[0, 2, 2, 4, 4]); /); + + ok(1, "$io_method: stream missing the same block repeatedly"); + + $psql->query_safe( + qq/ SELECT * FROM read_stream_for_blocks('largeish', + ARRAY[0, 2, 2, 4, 4]); /); + ok(1, "$io_method: stream hitting the same block repeatedly"); + + # test hit of the same block twice in a row + $psql->query_safe( + qq/ SELECT evict_rel('largeish'); /); + $psql->query_safe( + qq/ SELECT * FROM read_stream_for_blocks('largeish', + ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]); /); + ok(1, "$io_method: stream accessing same block"); + + $psql->quit(); +} + + +sub test_inject_foreign +{ + my $io_method = shift; + my $node = shift; + + my $psql_a = $node->background_psql('postgres', on_error_stop => 0); + my $psql_b = $node->background_psql('postgres', on_error_stop => 0); + + my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/); + + + ### + # Test read stream encountering buffers undergoing IO in another backend, + # with the other backend's reads succeeding. + ### + $psql_a->query_safe( + qq/ SELECT evict_rel('largeish'); /); + + $psql_b->query_safe( + qq/ SELECT inj_io_completion_wait(pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish')); /); + + $psql_b->{stdin} .= qq/ SELECT read_rel_block_ll('largeish', + blockno=>5, nblocks=>1);\n/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', qq/ SELECT wait_event FROM pg_stat_activity + WHERE wait_event = 'completion_wait'; /, + 'completion_wait'); + + # Block 5 is undergoing IO in session b, so session a will move on to start + # a new IO for block 7. 
+ $psql_a->{stdin} .= qq/ SELECT array_agg(blocknum) FROM + read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,5,7\}/); + + ok(1, qq/$io_method: read stream encounters succeeding IO by another backend/); + + ### + # Test read stream encountering buffers undergoing IO in another backend, + # with the other backend's reads failing. + ### + $psql_a->query_safe( + qq/ SELECT evict_rel('largeish'); /); + + $psql_b->query_safe( + qq/ SELECT inj_io_completion_wait(pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish')); /); + + $psql_b->query_safe( + qq/ SELECT inj_io_short_read_attach(-errno_from_string('EIO'), + pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish')); /); + + $psql_b->{stdin} .= qq/ SELECT read_rel_block_ll('largeish', + blockno=>5, nblocks=>1);\n/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq/ SELECT wait_event FROM pg_stat_activity + WHERE wait_event = 'completion_wait'; /, + 'completion_wait'); + + $psql_a->{stdin} .= qq/ SELECT array_agg(blocknum) FROM + read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,5,7\}/); + + pump_until( + $psql_b->{run}, $psql_b->{timeout}, + \$psql_b->{stderr}, qr/ERROR.*could not read blocks 5\.\.5/); + ok(1, "$io_method: injected error occurred"); + $psql_b->{stderr} = ''; + $psql_b->query_safe(qq/SELECT 
inj_io_short_read_detach();/); + + ok(1, + qq/$io_method: read stream encounters failing IO by another backend/); + + + ### + # Test read stream encountering two buffers that are undergoing the same + # IO, started by another backend. + ### + $psql_a->query_safe( + qq/ SELECT evict_rel('largeish'); /); + + $psql_b->query_safe( + qq/ SELECT inj_io_completion_wait(pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish')); /); + + $psql_b->{stdin} .= qq/ SELECT read_rel_block_ll('largeish', + blockno=>2, nblocks=>3);\n/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq/ SELECT wait_event FROM pg_stat_activity + WHERE wait_event = 'completion_wait'; /, + 'completion_wait'); + + # Blocks 2 and 4 are undergoing IO initiated by session b + $psql_a->{stdin} .= qq/ SELECT array_agg(blocknum) FROM + read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);\n/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', + qq/ SELECT inj_io_completion_continue() /); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,4\}/); + + ok(1, qq/$io_method: read stream encounters two buffers read in one IO/); + + $psql_a->quit(); + $psql_b->quit(); +} + + +sub test_io_method +{ + my $io_method = shift; + my $node = shift; + + is($node->safe_psql('postgres', 'SHOW io_method'), + $io_method, "$io_method: io_method set correctly"); + + test_repeated_blocks($io_method, $node); + + SKIP: + { + skip 'Injection points not supported by this build', 1 + unless $ENV{enable_injection_points} eq 'yes'; + test_inject_foreign($io_method, $node); + } +} diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index e495481c41e..1cc4734a746 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -33,6 +33,10 @@ 
CREATE FUNCTION read_rel_block_ll( RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +CREATE FUNCTION evict_rel(rel regclass) +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; @@ -50,6 +54,14 @@ RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +/* + * Read stream related functions + */ +CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4) +RETURNS SETOF record STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + + /* * Handle related functions @@ -91,8 +103,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C; /* * Injection point related functions */ -CREATE FUNCTION inj_io_short_read_attach(result int) -RETURNS pg_catalog.void STRICT +CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0) +RETURNS pg_catalog.void +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_io_completion_continue() +RETURNS pg_catalog.void +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0) +RETURNS pg_catalog.void AS 'MODULE_PATHNAME' LANGUAGE C; CREATE FUNCTION inj_io_short_read_detach() diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index b1aa8af9ec0..061f0c9f92a 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -19,17 +19,26 @@ #include "postgres.h" #include "access/relation.h" +#include "catalog/pg_type.h" #include "fmgr.h" +#include "funcapi.h" #include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/checksum.h" +#include "storage/condition_variable.h" #include "storage/ipc.h" #include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/procnumber.h" +#include 
"storage/read_stream.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/rel.h" +#include "utils/tuplestore.h" +#include "utils/wait_event.h" PG_MODULE_MAGIC; @@ -37,13 +46,30 @@ PG_MODULE_MAGIC; typedef struct InjIoErrorState { + ConditionVariable cv; + bool enabled_short_read; bool enabled_reopen; + bool enabled_completion_wait; + Oid completion_wait_relfilenode; + pid_t completion_wait_pid; + uint32 completion_wait_event; + bool short_read_result_set; + Oid short_read_relfilenode; + pid_t short_read_pid; int short_read_result; } InjIoErrorState; +typedef struct BlocksReadStreamData +{ + int nblocks; + int curblock; + uint32 *blocks; +} BlocksReadStreamData; + + static InjIoErrorState *inj_io_error_state; /* Shared memory init callbacks */ @@ -85,10 +111,13 @@ test_aio_shmem_startup(void) inj_io_error_state->enabled_short_read = false; inj_io_error_state->enabled_reopen = false; + ConditionVariableInit(&inj_io_error_state->cv); + inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait"); + #ifdef USE_INJECTION_POINTS InjectionPointAttach("aio-process-completion-before-shared", "test_aio", - "inj_io_short_read", + "inj_io_completion_hook", NULL, 0); InjectionPointLoad("aio-process-completion-before-shared"); @@ -384,7 +413,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS) if (nblocks <= 0 || nblocks > PG_IOV_MAX) elog(ERROR, "nblocks is out of range"); - rel = relation_open(relid, AccessExclusiveLock); + rel = relation_open(relid, AccessShareLock); for (int i = 0; i < nblocks; i++) { @@ -458,6 +487,27 @@ read_rel_block_ll(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +PG_FUNCTION_INFO_V1(evict_rel); +Datum +evict_rel(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + Relation rel; + int32 buffers_evicted, + buffers_flushed, + buffers_skipped; + + rel = relation_open(relid, AccessExclusiveLock); + + EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed, + &buffers_skipped); + + 
relation_close(rel, AccessExclusiveLock); + + + PG_RETURN_VOID(); +} + PG_FUNCTION_INFO_V1(invalidate_rel_block); Datum invalidate_rel_block(PG_FUNCTION_ARGS) @@ -610,6 +660,86 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +static BlockNumber +read_stream_for_blocks_cb(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + BlocksReadStreamData *stream_data = callback_private_data; + + if (stream_data->curblock >= stream_data->nblocks) + return InvalidBlockNumber; + return stream_data->blocks[stream_data->curblock++]; +} + +PG_FUNCTION_INFO_V1(read_stream_for_blocks); +Datum +read_stream_for_blocks(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + ArrayType *blocksarray = PG_GETARG_ARRAYTYPE_P(1); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + Relation rel; + BlocksReadStreamData stream_data; + ReadStream *stream; + + InitMaterializedSRF(fcinfo, 0); + + /* + * We expect the input to be an N-element int4 array; verify that. We + * don't need to use deconstruct_array() since the array data is just + * going to look like a C array of N int4 values. 
+ */ + if (ARR_NDIM(blocksarray) != 1 || + ARR_HASNULL(blocksarray) || + ARR_ELEMTYPE(blocksarray) != INT4OID) + elog(ERROR, "expected 1 dimensional int4 array"); + + stream_data.curblock = 0; + stream_data.nblocks = ARR_DIMS(blocksarray)[0]; + stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray); + + rel = relation_open(relid, AccessShareLock); + + stream = read_stream_begin_relation(READ_STREAM_FULL, + NULL, + rel, + MAIN_FORKNUM, + read_stream_for_blocks_cb, + &stream_data, + 0); + + for (int i = 0; i < stream_data.nblocks; i++) + { + Buffer buf = read_stream_next_buffer(stream, NULL); + Datum values[3] = {0}; + bool nulls[3] = {0}; + + if (!BufferIsValid(buf)) + elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i); + + values[0] = Int32GetDatum(i); + values[1] = UInt32GetDatum(stream_data.blocks[i]); + values[2] = UInt32GetDatum(buf); + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + + ReleaseBuffer(buf); + } + + if (read_stream_next_buffer(stream, NULL) != InvalidBuffer) + elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid", + stream_data.nblocks + 1); + + read_stream_end(stream); + + relation_close(rel, NoLock); + + return (Datum) 0; +} + + PG_FUNCTION_INFO_V1(handle_get); Datum handle_get(PG_FUNCTION_ARGS) @@ -680,15 +810,98 @@ batch_end(PG_FUNCTION_ARGS) } #ifdef USE_INJECTION_POINTS -extern PGDLLEXPORT void inj_io_short_read(const char *name, - const void *private_data, - void *arg); +extern PGDLLEXPORT void inj_io_completion_hook(const char *name, + const void *private_data, + void *arg); extern PGDLLEXPORT void inj_io_reopen(const char *name, const void *private_data, void *arg); -void -inj_io_short_read(const char *name, const void *private_data, void *arg) +static bool +inj_io_short_read_matches(PgAioHandle *ioh) +{ + PGPROC *owner_proc; + int32 owner_pid; + PgAioTargetData *td; + + if (!inj_io_error_state->enabled_short_read) + return false; + + if 
(!inj_io_error_state->short_read_result_set) + return false; + + owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh)); + owner_pid = owner_proc->pid; + + if (inj_io_error_state->short_read_pid != 0 && + inj_io_error_state->short_read_pid != owner_pid) + return false; + + td = pgaio_io_get_target_data(ioh); + + if (inj_io_error_state->short_read_relfilenode != InvalidOid && + td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode) + return false; + + /* + * Only shorten reads that are actually longer than the target size, + * otherwise we can trigger over-reads. + */ + if (inj_io_error_state->short_read_result >= ioh->result) + return false; + + return true; +} + +static bool +inj_io_completion_wait_matches(PgAioHandle *ioh) +{ + PGPROC *owner_proc; + int32 owner_pid; + PgAioTargetData *td; + + if (!inj_io_error_state->enabled_completion_wait) + return false; + + owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh)); + owner_pid = owner_proc->pid; + + if (inj_io_error_state->completion_wait_pid != owner_pid) + return false; + + td = pgaio_io_get_target_data(ioh); + + if (inj_io_error_state->completion_wait_relfilenode != InvalidOid && + td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode) + return false; + + return true; +} + +static void +inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg) +{ + PgAioHandle *ioh = (PgAioHandle *) arg; + + if (!inj_io_completion_wait_matches(ioh)) + return; + + ConditionVariablePrepareToSleep(&inj_io_error_state->cv); + + while (true) + { + if (!inj_io_completion_wait_matches(ioh)) + break; + + ConditionVariableSleep(&inj_io_error_state->cv, + inj_io_error_state->completion_wait_event); + } + + ConditionVariableCancelSleep(); +} + +static void +inj_io_short_read_hook(const char *name, const void *private_data, void *arg) { PgAioHandle *ioh = (PgAioHandle *) arg; @@ -697,58 +910,56 @@ inj_io_short_read(const char *name, const void *private_data, void 
*arg) inj_io_error_state->enabled_reopen), errhidestmt(true), errhidecontext(true)); - if (inj_io_error_state->enabled_short_read) + if (inj_io_short_read_matches(ioh)) { + struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; + int32 old_result = ioh->result; + int32 new_result = inj_io_error_state->short_read_result; + int32 processed = 0; + + ereport(LOG, + errmsg("short read inject point, changing result from %d to %d", + old_result, new_result), + errhidestmt(true), errhidecontext(true)); + /* - * Only shorten reads that are actually longer than the target size, - * otherwise we can trigger over-reads. + * The underlying IO actually completed OK, and thus the "invalid" + * portion of the IOV actually contains valid data. That can hide a + * lot of problems, e.g. if we were to wrongly mark a buffer, that + * wasn't read according to the shortened-read, IO as valid, the + * contents would look valid and we might miss a bug. + * + * To avoid that, iterate through the IOV and zero out the "failed" + * portion of the IO. */ - if (inj_io_error_state->short_read_result_set - && ioh->op == PGAIO_OP_READV - && inj_io_error_state->short_read_result <= ioh->result) + for (int i = 0; i < ioh->op_data.read.iov_length; i++) { - struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; - int32 old_result = ioh->result; - int32 new_result = inj_io_error_state->short_read_result; - int32 processed = 0; - - ereport(LOG, - errmsg("short read inject point, changing result from %d to %d", - old_result, new_result), - errhidestmt(true), errhidecontext(true)); - - /* - * The underlying IO actually completed OK, and thus the "invalid" - * portion of the IOV actually contains valid data. That can hide - * a lot of problems, e.g. if we were to wrongly mark a buffer, - * that wasn't read according to the shortened-read, IO as valid, - * the contents would look valid and we might miss a bug. - * - * To avoid that, iterate through the IOV and zero out the - * "failed" portion of the IO. 
- */ - for (int i = 0; i < ioh->op_data.read.iov_length; i++) + if (processed + iov[i].iov_len <= new_result) + processed += iov[i].iov_len; + else if (processed <= new_result) { - if (processed + iov[i].iov_len <= new_result) - processed += iov[i].iov_len; - else if (processed <= new_result) - { - uint32 ok_part = new_result - processed; - - memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); - processed += iov[i].iov_len; - } - else - { - memset((char *) iov[i].iov_base, 0, iov[i].iov_len); - } - } + uint32 ok_part = new_result - processed; - ioh->result = new_result; + memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); + processed += iov[i].iov_len; + } + else + { + memset((char *) iov[i].iov_base, 0, iov[i].iov_len); + } } + + ioh->result = new_result; } } +void +inj_io_completion_hook(const char *name, const void *private_data, void *arg) +{ + inj_io_completion_wait_hook(name, private_data, arg); + inj_io_short_read_hook(name, private_data, arg); +} + void inj_io_reopen(const char *name, const void *private_data, void *arg) { @@ -762,6 +973,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg) } #endif +PG_FUNCTION_INFO_V1(inj_io_completion_wait); +Datum +inj_io_completion_wait(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = true; + inj_io_error_state->completion_wait_pid = + PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0); + inj_io_error_state->completion_wait_relfilenode = + PG_ARGISNULL(1) ? 
InvalidOid : PG_GETARG_OID(1); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(inj_io_completion_continue); +Datum +inj_io_completion_continue(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = false; + inj_io_error_state->completion_wait_pid = 0; + inj_io_error_state->completion_wait_relfilenode = InvalidOid; + ConditionVariableBroadcast(&inj_io_error_state->cv); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + PG_FUNCTION_INFO_V1(inj_io_short_read_attach); Datum inj_io_short_read_attach(PG_FUNCTION_ARGS) @@ -771,6 +1015,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS) inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0); if (inj_io_error_state->short_read_result_set) inj_io_error_state->short_read_result = PG_GETARG_INT32(0); + inj_io_error_state->short_read_pid = + PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1); + inj_io_error_state->short_read_relfilenode = + PG_ARGISNULL(2) ? InvalidOid : PG_GETARG_OID(2); #else elog(ERROR, "injection points not supported"); #endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 52f8603a7be..9036fef129b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -305,6 +305,7 @@ BlockSampler BlockSamplerData BlockedProcData BlockedProcsData +BlocksReadStreamData BlocktableEntry BloomBuildState BloomFilter -- 2.43.0
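For readers who want to see the zeroing step of the short-read injection hook in isolation, here is a minimal standalone sketch. It assumes a plain POSIX struct iovec array rather than the AIO handle, and zero_unread_tail() is a hypothetical name that does not appear in the patch. It is meant to mirror the loop's behaviour (keep the first new_result bytes, zero the rest), not to replace it.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/uio.h>

/*
 * Hypothetical helper, not part of the patch: make a vectored read look as
 * if only new_result bytes had been read, by zeroing every byte past that
 * offset across the iovec array.
 */
static void
zero_unread_tail(struct iovec *iov, int iovcnt, size_t new_result)
{
	size_t		processed = 0;	/* bytes covered by the entries seen so far */

	assert(iov != NULL || iovcnt == 0);

	for (int i = 0; i < iovcnt; i++)
	{
		if (processed + iov[i].iov_len <= new_result)
		{
			/* entirely inside the shortened read: leave the data alone */
		}
		else if (processed < new_result)
		{
			/* straddles the cut: keep the leading part, zero the rest */
			size_t		ok_part = new_result - processed;

			memset((char *) iov[i].iov_base + ok_part, 0,
				   iov[i].iov_len - ok_part);
		}
		else
		{
			/* entirely past the cut: zero the whole entry */
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}

		processed += iov[i].iov_len;
	}
}
```

Unlike the hook, the sketch advances processed unconditionally. That only simplifies the bookkeeping: once an entry straddles the cut, every later entry lies past it either way.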
From 17e85c3deaf8b88145cf4a09763ae17f4f9bd274 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Mon, 16 Mar 2026 16:50:56 -0400 Subject: [PATCH v5 3/5] Fix off-by-one error in read IO tracing --- src/backend/storage/buffer/bufmgr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 00bc609529a..0723d4f3dd8 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1990,7 +1990,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) * must have started out as a miss in PinBufferForBlock(). The other * backend will track this as a 'read'. */ - TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done, + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done - 1, operation->smgr->smgr_rlocator.locator.spcOid, operation->smgr->smgr_rlocator.locator.dbOid, operation->smgr->smgr_rlocator.locator.relNumber, -- 2.43.0
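To spell out why the "- 1" is needed: in this code path nblocks_done is incremented before the tracepoint fires, so blocknum + nblocks_done already points one past the block that was just completed. The toy model below illustrates the arithmetic. ToyReadOp and toy_complete_one_block() are made-up names that stand in for the relevant fields of ReadBuffersOperation.

```c
#include <assert.h>
#include <stdint.h>

/* Simplified stand-in for ReadBuffersOperation (hypothetical type). */
typedef struct ToyReadOp
{
	uint32_t	blocknum;		/* first block covered by the operation */
	int			nblocks;		/* total blocks in the operation */
	int			nblocks_done;	/* blocks completed so far */
} ToyReadOp;

/*
 * Mark one more block as done and return the block number a READ_DONE-style
 * tracepoint should report for it.
 */
static uint32_t
toy_complete_one_block(ToyReadOp *op)
{
	op->nblocks_done += 1;
	assert(op->nblocks_done <= op->nblocks);

	/* nblocks_done now counts the finished block, hence the "- 1" */
	return op->blocknum + op->nblocks_done - 1;
}
```

For an operation starting at block 10, the first completed block is reported as 10. Without the subtraction it would be reported as 11.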
From 875a678a953865e6596c779f468c6649d6006d59 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Fri, 23 Jan 2026 13:54:02 -0500 Subject: [PATCH v5 4/5] Make buffer hit helper Already two places count buffer hits, requiring quite a few lines of code since we do accounting in so many places. Future commits will add more locations, so refactor into a helper. Reviewed-by: Nazir Bilal Yavuz <[email protected]> Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv --- src/backend/storage/buffer/bufmgr.c | 111 ++++++++++++++-------------- 1 file changed, 56 insertions(+), 55 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 0723d4f3dd8..399004c2e44 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -648,6 +648,10 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr, bool *foundPtr, IOContext io_context); static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress); static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete); + +static pg_attribute_always_inline void CountBufferHit(BufferAccessStrategy strategy, + Relation rel, char persistence, SMgrRelation smgr, + ForkNumber forknum, BlockNumber blocknum); static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context); static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); @@ -1226,8 +1230,6 @@ PinBufferForBlock(Relation rel, bool *foundPtr) { BufferDesc *bufHdr; - IOContext io_context; - IOObject io_object; Assert(blockNum != P_NEW); @@ -1236,17 +1238,6 @@ PinBufferForBlock(Relation rel, persistence == RELPERSISTENCE_PERMANENT || persistence == RELPERSISTENCE_UNLOGGED)); - if (persistence == RELPERSISTENCE_TEMP) - { - io_context = IOCONTEXT_NORMAL; - io_object = IOOBJECT_TEMP_RELATION; - } - else - { - io_context = 
IOContextForStrategy(strategy); - io_object = IOOBJECT_RELATION; - } - TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, smgr->smgr_rlocator.locator.spcOid, smgr->smgr_rlocator.locator.dbOid, @@ -1254,18 +1245,11 @@ PinBufferForBlock(Relation rel, smgr->smgr_rlocator.backend); if (persistence == RELPERSISTENCE_TEMP) - { bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr); - if (*foundPtr) - pgBufferUsage.local_blks_hit++; - } else - { bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum, - strategy, foundPtr, io_context); - if (*foundPtr) - pgBufferUsage.shared_blks_hit++; - } + strategy, foundPtr, IOContextForStrategy(strategy)); + if (rel) { /* @@ -1274,22 +1258,10 @@ PinBufferForBlock(Relation rel, * zeroed instead), the per-relation stats always count them. */ pgstat_count_buffer_read(rel); - if (*foundPtr) - pgstat_count_buffer_hit(rel); } - if (*foundPtr) - { - pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageHit; - TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - true); - } + if (*foundPtr) + CountBufferHit(strategy, rel, persistence, smgr, forkNum, blockNum); return BufferDescriptorGetBuffer(bufHdr); } @@ -1695,6 +1667,51 @@ ReadBuffersCanStartIO(Buffer buffer, bool nowait) return ReadBuffersCanStartIOOnce(buffer, nowait); } +/* + * We track various stats related to buffer hits. Because this is done in a + * few separate places, this helper exists for convenience. 
+ */ +static pg_attribute_always_inline void +CountBufferHit(BufferAccessStrategy strategy, + Relation rel, char persistence, SMgrRelation smgr, + ForkNumber forknum, BlockNumber blocknum) +{ + IOContext io_context; + IOObject io_object; + + if (persistence == RELPERSISTENCE_TEMP) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(strategy); + io_object = IOOBJECT_RELATION; + } + + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, + blocknum, + smgr->smgr_rlocator.locator.spcOid, + smgr->smgr_rlocator.locator.dbOid, + smgr->smgr_rlocator.locator.relNumber, + smgr->smgr_rlocator.backend, + true); + + if (persistence == RELPERSISTENCE_TEMP) + pgBufferUsage.local_blks_hit += 1; + else + pgBufferUsage.shared_blks_hit += 1; + + if (rel) + pgstat_count_buffer_hit(rel); + + pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); + + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageHit; +} + /* * Helper for WaitReadBuffers() that processes the results of a readv * operation, raising an error if necessary. @@ -1990,25 +2007,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) * must have started out as a miss in PinBufferForBlock(). The other * backend will track this as a 'read'. 
*/ - TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done - 1, - operation->smgr->smgr_rlocator.locator.spcOid, - operation->smgr->smgr_rlocator.locator.dbOid, - operation->smgr->smgr_rlocator.locator.relNumber, - operation->smgr->smgr_rlocator.backend, - true); - - if (persistence == RELPERSISTENCE_TEMP) - pgBufferUsage.local_blks_hit += 1; - else - pgBufferUsage.shared_blks_hit += 1; - - if (operation->rel) - pgstat_count_buffer_hit(operation->rel); - - pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); - - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageHit; + CountBufferHit(operation->strategy, operation->rel, persistence, + operation->smgr, forknum, + blocknum + operation->nblocks_done - 1); } else { -- 2.43.0
From 4d737fa14f333abc4ee6ade8cb0340530695e887 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Fri, 23 Jan 2026 14:00:31 -0500 Subject: [PATCH v5 5/5] Don't wait for already in-progress IO MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a backend attempts to start a read on a buffer and finds that I/O is already in progress, it previously waited for that I/O to complete before initiating reads for any other buffers. Although the backend must still wait for the I/O to finish when later acquiring the buffer, it should not need to wait at read start time. Other buffers may be available for I/O, and in some workloads this waiting significantly reduces concurrency. For example, index scans may repeatedly request the same heap block. If the backend waits each time it encounters an in-progress read, the access pattern effectively degenerates into synchronous I/O. By introducing the concept of foreign I/O operations, a backend can record the buffer’s wait reference and defer waiting until WaitReadBuffers() when it actually acquires the buffer. In rare cases, a backend may still need to wait when starting a read if it encounters a buffer after another backend has set BM_IO_IN_PROGRESS but before the buffer descriptor’s wait reference has been set. Such windows should be brief and uncommon. 
Author: Melanie Plageman <[email protected]> Reviewed-by: Andres Freund <[email protected]> Reviewed-by: Nazir Bilal Yavuz <[email protected]> Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv --- src/backend/storage/buffer/bufmgr.c | 491 ++++++++++++++++++---------- src/include/storage/bufmgr.h | 2 + src/tools/pgindent/typedefs.list | 1 + 3 files changed, 330 insertions(+), 164 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 399004c2e44..20c36ccead0 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -185,6 +185,21 @@ typedef struct SMgrSortArray SMgrRelation srel; } SMgrSortArray; + +/* + * In AsyncReadBuffers(), when preparing a buffer for reading and setting + * BM_IO_IN_PROGRESS, the buffer may already have I/O in progress or may + * already contain the desired block. AsyncReadBuffers() must distinguish + * between these cases (and the case where it should initiate I/O) so it can + * mark an in-progress buffer as foreign I/O rather than waiting on it. + */ +typedef enum PrepareReadBuffer_Status +{ + READ_BUFFER_ALREADY_DONE, + READ_BUFFER_IN_PROGRESS, + READ_BUFFER_READY_FOR_IO, +} PrepareReadBuffer_Status; + /* GUC variables */ bool zero_damaged_pages = false; int bgwriter_lru_maxpages = 100; @@ -1628,45 +1643,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete) #endif } -/* helper for ReadBuffersCanStartIO(), to avoid repetition */ -static inline bool -ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait) -{ - if (BufferIsLocal(buffer)) - return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), - true, nowait); - else - return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); -} - -/* - * Helper for AsyncReadBuffers that tries to get the buffer ready for IO. 
- */ -static inline bool -ReadBuffersCanStartIO(Buffer buffer, bool nowait) -{ - /* - * If this backend currently has staged IO, we need to submit the pending - * IO before waiting for the right to issue IO, to avoid the potential for - * deadlocks (and, more commonly, unnecessary delays for other backends). - */ - if (!nowait && pgaio_have_staged()) - { - if (ReadBuffersCanStartIOOnce(buffer, true)) - return true; - - /* - * Unfortunately StartBufferIO() returning false doesn't allow to - * distinguish between the buffer already being valid and IO already - * being in progress. Since IO already being in progress is quite - * rare, this approach seems fine. - */ - pgaio_submit_staged(); - } - - return ReadBuffersCanStartIOOnce(buffer, nowait); -} - /* * We track various stats related to buffer hits. Because this is done in a * few separate places, this helper exists for convenience. @@ -1815,8 +1791,11 @@ WaitReadBuffers(ReadBuffersOperation *operation) * b) reports some time as waiting, even if we never waited * * we first check if we already know the IO is complete. + * + * Note that operation->io_return is uninitialized for foreign IO, + * so we cannot count that wait time. */ - if (aio_ret->result.status == PGAIO_RS_UNKNOWN && + if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) && !pgaio_wref_check_done(&operation->io_wref)) { instr_time io_start = pgstat_prepare_io_time(track_io_timing); @@ -1835,11 +1814,33 @@ WaitReadBuffers(ReadBuffersOperation *operation) Assert(pgaio_wref_check_done(&operation->io_wref)); } - /* - * We now are sure the IO completed. Check the results. This - * includes reporting on errors if there were any. - */ - ProcessReadBuffersResult(operation); + if (unlikely(operation->foreign_io)) + { + Buffer buffer = operation->buffers[operation->nblocks_done]; + BufferDesc *desc = BufferIsLocal(buffer) ? 
+ GetLocalBufferDescriptor(-buffer - 1) : + GetBufferDescriptor(buffer - 1); + uint64 buf_state = pg_atomic_read_u64(&desc->state); + + if (buf_state & BM_VALID) + { + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); + + CountBufferHit(operation->strategy, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + operation->blocknum + operation->nblocks_done - 1); + } + } + else + { + /* + * We now are sure the IO completed. Check the results. This + * includes reporting on errors if there were any. + */ + ProcessReadBuffersResult(operation); + } } /* @@ -1870,6 +1871,163 @@ WaitReadBuffers(ReadBuffersOperation *operation) /* NB: READ_DONE tracepoint was already executed in completion callback */ } +/* + * Local version of PrepareNewReadBufferIO(). Here instead of localbuf.c to + * avoid an external function call. + */ +static PrepareReadBuffer_Status +PrepareNewLocalReadBufferIO(ReadBuffersOperation *operation, + Buffer buffer) +{ + BufferDesc *desc = GetLocalBufferDescriptor(-buffer - 1); + uint64 buf_state = pg_atomic_read_u64(&desc->state); + + /* Already valid, no work to do */ + if (buf_state & BM_VALID) + { + pgaio_wref_clear(&operation->io_wref); + return READ_BUFFER_ALREADY_DONE; + } + + pgaio_submit_staged(); + + if (pgaio_wref_valid(&desc->io_wref)) + { + operation->io_wref = desc->io_wref; + operation->foreign_io = true; + return READ_BUFFER_IN_PROGRESS; + } + + /* + * While it is possible for a buffer to have been prepared for IO but not + * yet had its wait reference set, there's no way for us to know that for + * temporary buffers. Thus, we'll prepare for our own IO on this buffer. + */ + return READ_BUFFER_READY_FOR_IO; +} + +/* + * Try to start IO on the first buffer in a new run of blocks. If AIO is in + * progress, be it in this backend or another backend, we just associate the + * wait reference with the operation and wait in WaitReadBuffers().
This turns + * out to be important for performance in two workloads: + * + * 1) A read stream that has to read the same block multiple times within the + * readahead distance. This can happen e.g. for the table accesses of an + * index scan. + * + * 2) Concurrent scans by multiple backends on the same relation. + * + * If we were to synchronously wait for the in-progress IO, we'd not be able + * to keep enough I/O in flight. + * + * If we do find there is ongoing I/O for the buffer, we set up a 1-block + * ReadBuffersOperation that WaitReadBuffers then can wait on. + * + * It's possible that another backend has started IO on the buffer but not yet + * set its wait reference. In this case, we have no choice but to wait for + * either the wait reference to be valid or the IO to be done. + */ +static PrepareReadBuffer_Status +PrepareNewReadBufferIO(ReadBuffersOperation *operation, + Buffer buffer) +{ + uint64 buf_state; + BufferDesc *desc; + + if (BufferIsLocal(buffer)) + return PrepareNewLocalReadBufferIO(operation, buffer); + + ResourceOwnerEnlarge(CurrentResourceOwner); + desc = GetBufferDescriptor(buffer - 1); + + for (;;) + { + buf_state = LockBufHdr(desc); + + /* Already valid, no work to do */ + if (buf_state & BM_VALID) + { + UnlockBufHdr(desc); + pgaio_wref_clear(&operation->io_wref); + return READ_BUFFER_ALREADY_DONE; + } + + if (buf_state & BM_IO_IN_PROGRESS) + { + /* Join existing read */ + if (pgaio_wref_valid(&desc->io_wref)) + { + operation->io_wref = desc->io_wref; + operation->foreign_io = true; + UnlockBufHdr(desc); + return READ_BUFFER_IN_PROGRESS; + } + + /* + * If the wait ref is not valid but the IO is in progress, someone + * else started IO but hasn't set the wait ref yet. We have no + * choice but to wait until the IO completes. + */ + UnlockBufHdr(desc); + pgaio_submit_staged(); + WaitIO(desc); + continue; + } + + /* + * No IO in progress and not already valid; We will start IO. 
It's + * possible that the IO was in progress and never became valid because + * the IO errored out. We'll do the IO ourselves. + */ + UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0); + ResourceOwnerRememberBufferIO(CurrentResourceOwner, + BufferDescriptorGetBuffer(desc)); + + return READ_BUFFER_READY_FOR_IO; + } +} + + +/* + * When building a new IO from multiple buffers, we won't include buffers + * that are already valid or already in progress. This function should only be + * used for additional adjacent buffers following the head buffer in a new IO. + * + * Returns true if the buffer was successfully prepared for IO and false if it + * is rejected and the read IO should not include this buffer. + */ +static bool +PrepareAdditionalReadBuffer(Buffer buffer) +{ + uint64 buf_state; + BufferDesc *desc; + + if (BufferIsLocal(buffer)) + { + desc = GetLocalBufferDescriptor(-buffer - 1); + buf_state = pg_atomic_read_u64(&desc->state); + /* Local buffers don't use BM_IO_IN_PROGRESS */ + if (buf_state & BM_VALID || pgaio_wref_valid(&desc->io_wref)) + return false; + } + else + { + ResourceOwnerEnlarge(CurrentResourceOwner); + desc = GetBufferDescriptor(buffer - 1); + buf_state = LockBufHdr(desc); + if (buf_state & (BM_VALID | BM_IO_IN_PROGRESS)) + { + UnlockBufHdr(desc); + return false; + } + UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0); + ResourceOwnerRememberBufferIO(CurrentResourceOwner, buffer); + } + + return true; +} + /* * Initiate IO for the ReadBuffersOperation * @@ -1885,7 +2043,8 @@ WaitReadBuffers(ReadBuffersOperation *operation) * affected by the call. If the first buffer is valid, *nblocks_progress is * set to 1 and operation->nblocks_done is incremented. * - * Returns true if IO was initiated, false if no IO was necessary. + * Returns true if IO was initiated or is already in progress (foreign IO), + * false if the buffer was already valid. 
*/ static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) @@ -1903,7 +2062,75 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) void *io_pages[MAX_IO_COMBINE_LIMIT]; IOContext io_context; IOObject io_object; - bool did_start_io; + instr_time io_start; + PrepareReadBuffer_Status status; + + /* + * We must get an IO handle before PrepareNewReadBufferIO(), as + * pgaio_io_acquire() might block, which we don't want after setting + * IO_IN_PROGRESS. If we don't need to do the IO, we'll release the + * handle. + * + * If we need to wait for IO before we can get a handle, submit + * already-staged IO first, so that other backends don't need to wait. + * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to + * wait for already submitted IO, which doesn't require additional locks, + * but it could still cause undesirable waits. + * + * A secondary benefit is that this would allow us to measure the time in + * pgaio_io_acquire() without causing undue timer overhead in the common, + * non-blocking, case. However, currently the pgstats infrastructure + * doesn't really allow that, as it a) asserts that an operation can't + * have time without operations b) doesn't have an API to report + * "accumulated" time. + */ + ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return); + if (unlikely(!ioh)) + { + pgaio_submit_staged(); + ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return); + } + + operation->foreign_io = false; + + /* Check if we can start IO on the first to-be-read buffer */ + if ((status = PrepareNewReadBufferIO(operation, buffers[nblocks_done])) < + READ_BUFFER_READY_FOR_IO) + { + pgaio_io_release(ioh); + *nblocks_progress = 1; + if (status == READ_BUFFER_ALREADY_DONE) + { + /* + * Someone else has already completed this block, we're done. 
+ * + * When IO is necessary, ->nblocks_done is updated in + * ProcessReadBuffersResult(), but that is not called if no IO is + * necessary. Thus update here. + */ + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); + + /* + * Report and track this as a 'hit' for this backend, even though + * it must have started out as a miss in PinBufferForBlock(). The + * other backend will track this as a 'read'. + */ + CountBufferHit(operation->strategy, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + operation->blocknum + operation->nblocks_done - 1); + return false; + } + + /* The IO is already in-progress */ + Assert(status == READ_BUFFER_IN_PROGRESS); + CheckReadBuffersOperation(operation, false); + return true; + } + + /* We can read in at least the head buffer */ + Assert(status == READ_BUFFER_READY_FOR_IO); /* * When this IO is executed synchronously, either because the caller will @@ -1954,138 +2181,74 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) */ pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid); - /* - * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire() - * might block, which we don't want after setting IO_IN_PROGRESS. - * - * If we need to wait for IO before we can get a handle, submit - * already-staged IO first, so that other backends don't need to wait. - * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to - * wait for already submitted IO, which doesn't require additional locks, - * but it could still cause undesirable waits. - * - * A secondary benefit is that this would allow us to measure the time in - * pgaio_io_acquire() without causing undue timer overhead in the common, - * non-blocking, case. 
However, currently the pgstats infrastructure - * doesn't really allow that, as it a) asserts that an operation can't - * have time without operations b) doesn't have an API to report - * "accumulated" time. - */ - ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return); - if (unlikely(!ioh)) - { - pgaio_submit_staged(); - - ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return); - } + Assert(io_buffers[0] == buffers[nblocks_done]); + io_pages[0] = BufferGetBlock(buffers[nblocks_done]); + io_buffers_len = 1; /* - * Check if we can start IO on the first to-be-read buffer. - * - * If an I/O is already in progress in another backend, we want to wait - * for the outcome: either done, or something went wrong and we will - * retry. + * How many neighboring-on-disk blocks can we scatter-read into other + * buffers at the same time? In this case we don't wait if we see an I/O + * already in progress. We already set BM_IO_IN_PROGRESS for the head + * block, so we should get on with that I/O as soon as possible. */ - if (!ReadBuffersCanStartIO(buffers[nblocks_done], false)) + for (int i = nblocks_done + 1; i < operation->nblocks; i++) { - /* - * Someone else has already completed this block, we're done. - * - * When IO is necessary, ->nblocks_done is updated in - * ProcessReadBuffersResult(), but that is not called if no IO is - * necessary. Thus update here. - */ - operation->nblocks_done += 1; - *nblocks_progress = 1; - - pgaio_io_release(ioh); - pgaio_wref_clear(&operation->io_wref); - did_start_io = false; + if (!PrepareAdditionalReadBuffer(buffers[i])) + break; + /* Must be consecutive block numbers. */ + Assert(BufferGetBlockNumber(buffers[i - 1]) == + BufferGetBlockNumber(buffers[i]) - 1); + Assert(io_buffers[io_buffers_len] == buffers[i]); - /* - * Report and track this as a 'hit' for this backend, even though it - * must have started out as a miss in PinBufferForBlock(). The other - * backend will track this as a 'read'. 
- */ - CountBufferHit(operation->strategy, operation->rel, persistence, - operation->smgr, forknum, - blocknum + operation->nblocks_done - 1); + io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); } - else - { - instr_time io_start; - - /* We found a buffer that we need to read in. */ - Assert(io_buffers[0] == buffers[nblocks_done]); - io_pages[0] = BufferGetBlock(buffers[nblocks_done]); - io_buffers_len = 1; - - /* - * How many neighboring-on-disk blocks can we scatter-read into other - * buffers at the same time? In this case we don't wait if we see an - * I/O already in progress. We already set BM_IO_IN_PROGRESS for the - * head block, so we should get on with that I/O as soon as possible. - */ - for (int i = nblocks_done + 1; i < operation->nblocks; i++) - { - if (!ReadBuffersCanStartIO(buffers[i], true)) - break; - /* Must be consecutive block numbers. */ - Assert(BufferGetBlockNumber(buffers[i - 1]) == - BufferGetBlockNumber(buffers[i]) - 1); - Assert(io_buffers[io_buffers_len] == buffers[i]); - io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); - } + /* get a reference to wait for in WaitReadBuffers() */ + pgaio_io_get_wref(ioh, &operation->io_wref); - /* get a reference to wait for in WaitReadBuffers() */ - pgaio_io_get_wref(ioh, &operation->io_wref); + /* provide the list of buffers to the completion callbacks */ + pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len); - /* provide the list of buffers to the completion callbacks */ - pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len); + pgaio_io_register_callbacks(ioh, + persistence == RELPERSISTENCE_TEMP ? + PGAIO_HCB_LOCAL_BUFFER_READV : + PGAIO_HCB_SHARED_BUFFER_READV, + flags); - pgaio_io_register_callbacks(ioh, - persistence == RELPERSISTENCE_TEMP ? 
- PGAIO_HCB_LOCAL_BUFFER_READV : - PGAIO_HCB_SHARED_BUFFER_READV, - flags); + pgaio_io_set_flag(ioh, ioh_flags); - pgaio_io_set_flag(ioh, ioh_flags); + /* --- + * Even though we're trying to issue IO asynchronously, track the time + * in smgrstartreadv(): + * - if io_method == IOMETHOD_SYNC, we will always perform the IO + * immediately + * - the io method might not support the IO (e.g. worker IO for a temp + * table) + * --- + */ + io_start = pgstat_prepare_io_time(track_io_timing); + smgrstartreadv(ioh, operation->smgr, forknum, + blocknum + nblocks_done, + io_pages, io_buffers_len); + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, + io_start, 1, io_buffers_len * BLCKSZ); - /* --- - * Even though we're trying to issue IO asynchronously, track the time - * in smgrstartreadv(): - * - if io_method == IOMETHOD_SYNC, we will always perform the IO - * immediately - * - the io method might not support the IO (e.g. worker IO for a temp - * table) - * --- - */ - io_start = pgstat_prepare_io_time(track_io_timing); - smgrstartreadv(ioh, operation->smgr, forknum, - blocknum + nblocks_done, - io_pages, io_buffers_len); - pgstat_count_io_op_time(io_object, io_context, IOOP_READ, - io_start, 1, io_buffers_len * BLCKSZ); - - if (persistence == RELPERSISTENCE_TEMP) - pgBufferUsage.local_blks_read += io_buffers_len; - else - pgBufferUsage.shared_blks_read += io_buffers_len; + if (persistence == RELPERSISTENCE_TEMP) + pgBufferUsage.local_blks_read += io_buffers_len; + else + pgBufferUsage.shared_blks_read += io_buffers_len; - /* - * Track vacuum cost when issuing IO, not after waiting for it. - * Otherwise we could end up issuing a lot of IO in a short timespan, - * despite a low cost limit. - */ - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; + /* + * Track vacuum cost when issuing IO, not after waiting for it. Otherwise + * we could end up issuing a lot of IO in a short timespan, despite a low + * cost limit. 
+ */ + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; - *nblocks_progress = io_buffers_len; - did_start_io = true; - } + *nblocks_progress = io_buffers_len; - return did_start_io; + return true; } /* diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 4017896f951..f85a9acc6ac 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -147,6 +147,8 @@ struct ReadBuffersOperation int flags; int16 nblocks; int16 nblocks_done; + /* true if waiting on another backend's IO */ + bool foreign_io; PgAioWaitRef io_wref; PgAioReturn io_return; }; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9036fef129b..92230994633 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2363,6 +2363,7 @@ PredicateLockData PredicateLockTargetType PrefetchBufferResult PrepParallelRestorePtrType +PrepareReadBuffer_Status PrepareStmt PreparedStatement PresortedKeyData -- 2.43.0
