This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit a516685f59f380598974d2487d38e0da5bfdbe76
Merge: c038984d16 b9d1809996
Author: Keith Turner <[email protected]>
AuthorDate: Fri May 16 14:55:39 2025 +0000

    Merge commit 'b9d1809996bb81a7457b6ea84dc9d729ea9d1023'

 .../org/apache/accumulo/tserver/NativeMap.java     |  22 +--
 .../java/org/apache/accumulo/test/LargeReadIT.java | 174 +++++++++++++++++++++
 .../accumulo/test/functional/NativeMapIT.java      |  13 ++
 3 files changed, 198 insertions(+), 11 deletions(-)

diff --cc test/src/main/java/org/apache/accumulo/test/LargeReadIT.java
index 0000000000,bcb7ef0a9b..d97e70575b
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/LargeReadIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/LargeReadIT.java
@@@ -1,0 -1,174 +1,174 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements. See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership. The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License. You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.test;
+ 
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ 
+ import java.time.Duration;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.Executors;
+ import java.util.function.Consumer;
+ import java.util.stream.Collectors;
+ import java.util.stream.Stream;
+ 
+ import org.apache.accumulo.core.client.Accumulo;
+ import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.Scanner;
+ import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+ import org.apache.accumulo.core.clientImpl.ClientContext;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.data.Range;
+ import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+ import org.apache.accumulo.harness.AccumuloClusterHarness;
+ import org.apache.accumulo.minicluster.MemoryUnit;
+ import org.apache.accumulo.minicluster.ServerType;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.io.Text;
+ import org.junit.jupiter.api.Test;
+ 
+ /**
+  * Accumulo had a bug where the native map code would always read the first key in a tablet even
+  * if the scan did not need it. This test is structured so that, if the native map does this, the
+  * tablet server will fail with an out-of-memory exception.
+  */
+ public class LargeReadIT extends AccumuloClusterHarness {
+ 
+   @Override
+   protected Duration defaultTimeout() {
+     return Duration.ofMinutes(6);
+   }
+ 
+   @Override
+   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+     cfg.setProperty(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "64");
+     cfg.setProperty(Property.TSERV_MINTHREADS, "64");
+     cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "true");
+     cfg.setProperty(Property.TSERV_MAXMEM, "100M");
+     cfg.setMemory(ServerType.TABLET_SERVER, 128, MemoryUnit.MEGABYTE);
 -    cfg.setNumTservers(1);
++    cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
+   }
+ 
+   @Test
+   public void testLargeMemoryLG() throws Exception {
+     // if locality groups are not set then this test will fail because all the scans will read
+     // the big key value into memory and then filter it
+     Map<String,Set<Text>> groups =
+         Map.of("big", Set.of(new Text("big")), "small", Set.of(new Text("small")));
+     NewTableConfiguration config = new NewTableConfiguration().setLocalityGroups(groups);
+ 
+     Consumer<Scanner> scanConfigurer = scanner -> {
+       // setting this column family plus the locality group settings should exclude the large
+       // value at row=000 family=big
+       scanner.fetchColumnFamily("small");
+     };
+ 
+     testLargeMemory(config, scanConfigurer);
+   }
+ 
+   @Test
+   public void testLargeMemoryRange() throws Exception {
+     NewTableConfiguration config = new NewTableConfiguration();
+ 
+     Consumer<Scanner> scanConfigurer = scanner -> {
+       // This range should exclude the large value at row=000 family=big from ever being read
+       scanner.setRange(new Range(new Key("000", "small", ""), false, null, true));
+     };
+ 
+     testLargeMemory(config, scanConfigurer);
+   }
+ 
+   private void testLargeMemory(NewTableConfiguration config, Consumer<Scanner> scannerConfigurer)
+       throws Exception {
+     try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+       String tableName = getUniqueNames(1)[0];
+ 
+       client.tableOperations().create(tableName, config);
+ 
+       try (var writer = client.createBatchWriter(tableName)) {
+         var bigValue = new Mutation("000");
+         StringBuilder sb = new StringBuilder();
+         for (int i = 0; i < 10_000_000; i++) {
+           sb.append((i % 10));
+         }
+         // This is the first key in the tablet; it sorts before everything else.
+         bigValue.put("big", "number", sb.toString());
+         writer.addMutation(bigValue);
+ 
+         for (int i = 0; i < 100; i++) {
+           String row = String.format("%03d", i);
+           var smallValue = new Mutation(row);
+           smallValue.put("small", "number", i + "");
+           writer.addMutation(smallValue);
+         }
+       }
+ 
+       final int numThreads = 64;
+       var executor = Executors.newFixedThreadPool(numThreads);
+       Callable<Long> scanTask = () -> {
+         try (var scanner = client.createScanner(tableName)) {
+           scannerConfigurer.accept(scanner);
+           return scanner.stream().count();
+         }
+       };
+ 
+       // Run lots of concurrent tasks that should only read the small data. If they read the big
+       // column family, it will exceed the tablet server memory, causing it to die and the test
+       // to time out.
+       var tasks =
+           Stream.iterate(scanTask, t -> t).limit(numThreads * 5).collect(Collectors.toList());
+       assertEquals(numThreads * 5, tasks.size());
+       for (var future : executor.invokeAll(tasks)) {
+         assertEquals(100, future.get());
+       }
+ 
+       // the data is expected to still be in memory (not yet flushed to files) for this part of
+       // the test, so verify that
+       var ctx = ((ClientContext) client);
+       try (var tablets = ctx.getAmple().readTablets().forTable(ctx.getTableId(tableName))
+           .fetch(TabletMetadata.ColumnType.FILES).build()) {
+         assertEquals(0, tablets.stream().count());
+       }
+ 
+       // flush data and verify
+       client.tableOperations().flush(tableName, null, null, true);
+       try (var tablets = ctx.getAmple().readTablets().forTable(ctx.getTableId(tableName))
+           .fetch(TabletMetadata.ColumnType.FILES).build()) {
+         assertEquals(1, tablets.stream().count());
+       }
+ 
+       // Run the scans again, reading from files instead of the in-memory map... verify the large
+       // data is not brought into memory from the file, which would kill the tablet server. The
+       // test was created because of a bug in the native map code, but it can also catch a
+       // similar problem in the rfile code.
+       for (var future : executor.invokeAll(tasks)) {
+         assertEquals(100, future.get());
+       }
+ 
+       // this test assumes the first key in the tablet is the big one, verify this assumption
+       try (var scanner = client.createScanner(tableName)) {
+         assertEquals(10_000_000, scanner.iterator().next().getValue().getSize());
+       }
+     }
+   }
+ }
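
Context on the range-based exclusion used by testLargeMemoryRange: Accumulo keys sort
by row, then column family, so the big key (row=000, family=big) orders before the
range's exclusive start key (row=000, family=small, empty qualifier) and is never
covered by the range. Below is a minimal standalone sketch (not part of this commit;
the class name is illustrative) using only the public Key and Range API to show this:

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;

    public class RangeExclusionSketch {
      public static void main(String[] args) {
        // the large key written by the test, and the range's exclusive start key
        Key bigKey = new Key("000", "big", "number");
        Key start = new Key("000", "small", "");

        // same range as testLargeMemoryRange: exclusive start, open inclusive end
        Range range = new Range(start, false, null, true);

        // family "big" sorts before "small", so the big key precedes the start key
        System.out.println(bigKey.compareTo(start) < 0); // true
        // and therefore falls outside the range entirely
        System.out.println(range.contains(bigKey)); // false
      }
    }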
