This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 0dc5f771c0 Fix MemoryStarvedMajCIT (#4041) 0dc5f771c0 is described below commit 0dc5f771c0df1ad35f996679e19f900830e0edca Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Dec 8 15:23:48 2023 -0500 Fix MemoryStarvedMajCIT (#4041) Modify IT to use methods on a modified Compactor class to consume and free memory instead of using a Scanner with an iterator. The scanner approach no longer works because majc's no longer occur in the tablet server. Fixes #4037 --- .../org/apache/accumulo/compactor/Compactor.java | 7 +- .../test/functional/MemoryConsumingCompactor.java | 96 ++++++++++++++++++++++ .../test/functional/MemoryConsumingIterator.java | 12 ++- .../test/functional/MemoryStarvedMajCIT.java | 72 ++++++++++------ 4 files changed, 158 insertions(+), 29 deletions(-) diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 9e3c81504c..11a91e09bc 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -304,6 +304,10 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac } } + protected CompactorService.Iface getCompactorThriftHandlerInterface() { + return this; + } + /** * Start this Compactors thrift service to handle incoming client requests * @@ -313,7 +317,8 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac protected ServerAddress startCompactorClientService() throws UnknownHostException { ClientServiceHandler clientHandler = new ClientServiceHandler(getContext()); - var processor = ThriftProcessorTypes.getCompactorTProcessor(clientHandler, this, getContext()); + var processor = ThriftProcessorTypes.getCompactorTProcessor(clientHandler, + getCompactorThriftHandlerInterface(), getContext()); Property maxMessageSizeProperty = (getConfiguration().get(Property.COMPACTOR_MAX_MESSAGE_SIZE) != null ? Property.COMPACTOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java new file mode 100644 index 0000000000..d725730245 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import java.io.IOException; +import java.util.Map; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.core.cli.ConfigOpts; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.compaction.thrift.CompactorService; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MemoryConsumingCompactor extends Compactor implements CompactorService.Iface { + + private static final Logger LOG = LoggerFactory.getLogger(MemoryConsumingCompactor.class); + + MemoryConsumingCompactor(ConfigOpts opts, String[] args) { + super(opts, args); + } + + @Override + protected CompactorService.Iface getCompactorThriftHandlerInterface() { + return this; + } + + @Override + public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId) + throws TException { + // Use the cancel Thrift RPC to free the consumed memory + LOG.warn("cancel called, freeing memory"); + MemoryConsumingIterator.freeBuffers(); + } + + @Override + public TExternalCompactionJob getRunningCompaction(TInfo tinfo, TCredentials credentials) + throws ThriftSecurityException, TException { + // Use the getRunningCompaction Thrift RPC to consume the memory + LOG.warn("getRunningCompaction called, consuming memory"); + try { + MemoryConsumingIterator iter = new MemoryConsumingIterator(); + iter.init((SortedKeyValueIterator<Key,Value>) null, Map.of(), + new SystemIteratorEnvironment() { + + @Override + public ServerContext getServerContext() { + return getContext(); + } + + @Override + public SortedKeyValueIterator<Key,Value> + getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) { + return null; + } + + }); + iter.consume(); + } catch (IOException e) { + throw new TException("Error consuming memory", e); + } + return new TExternalCompactionJob(); + } + + public static void main(String[] args) throws Exception { + try (var compactor = new MemoryConsumingCompactor(new ConfigOpts(), args)) { + compactor.runServer(); + } + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java index 2b23fb1cff..fd3f911250 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java @@ -61,10 +61,7 @@ public class MemoryConsumingIterator extends WrappingIterator { return (int) amountToConsume; } - @Override - public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) - throws IOException { - LOG.info("seek called"); + public void consume() throws IOException { while (!this.isRunningLowOnMemory()) { int amountToConsume = getAmountToConsume(); if (amountToConsume > 0) { @@ -83,6 +80,13 @@ public class MemoryConsumingIterator extends WrappingIterator { } } LOG.info("Running low on memory == true"); + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) + throws IOException { + LOG.info("seek called"); + consume(); super.seek(range, columnFamilies, inclusive); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java index 5c9e2cd96f..9ec1094ea7 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java @@ -30,36 +30,41 @@ import java.util.concurrent.atomic.DoubleAdder; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; -import org.apache.accumulo.minicluster.MemoryUnit; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; import org.apache.accumulo.test.metrics.TestStatsDSink; import org.apache.accumulo.test.metrics.TestStatsDSink.Metric; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.net.HostAndPort; -@Disabled // ELASTICITY_TODO public class MemoryStarvedMajCIT extends SharedMiniClusterBase { + private static final Logger LOG = LoggerFactory.getLogger(MemoryStarvedMajCIT.class); + public static class MemoryStarvedITConfiguration implements MiniClusterConfigurationCallback { @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1); - cfg.setMemory(ServerType.TABLET_SERVER, 256, MemoryUnit.MEGABYTE); - // Configure the LowMemoryDetector in the TabletServer + cfg.getClusterServerConfiguration().setNumDefaultScanServers(0); + cfg.getClusterServerConfiguration().setNumDefaultCompactors(1); cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL, "5s"); cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD, Double.toString(MemoryStarvedScanIT.FREE_MEMORY_THRESHOLD)); @@ -120,16 +125,31 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase { @Test public void testMajCPauses() throws Exception { + String table = getUniqueNames(1)[0]; try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + ClientContext ctx = (ClientContext) client; + + // Stop the normal compactors and start the version that will consume + // and free memory when we need it to + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + + getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, + MemoryConsumingCompactor.class); + Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx).size() == 4, 60_000); + Wait.waitFor( + () -> ExternalCompactionUtil.getCompactorAddrs(ctx).get("user_small").size() == 1, + 60_000); + + Map<String,List<HostAndPort>> groupedCompactors = + ExternalCompactionUtil.getCompactorAddrs(ctx); + List<HostAndPort> compactorAddresses = groupedCompactors.get("user_small"); + HostAndPort compactorAddr = compactorAddresses.get(0); + TableOperations to = client.tableOperations(); to.create(table); - // Add a small amount of data so that the MemoryConsumingIterator - // returns true when trying to consume all of the memory. - ReadWriteIT.ingest(client, 1, 1, 1, 0, table); - AtomicReference<Throwable> error = new AtomicReference<>(); Thread compactionThread = new Thread(() -> { try { @@ -139,25 +159,29 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase { } }); - try (Scanner scanner = client.createScanner(table)) { + int paused = MAJC_PAUSED.intValue(); + assertEquals(0, paused); - MemoryStarvedScanIT.consumeServerMemory(scanner); + // Calling getRunningCompaction on the MemoryConsumingCompactor + // will consume the free memory + LOG.info("Calling getRunningCompaction on {}", compactorAddr); + ExternalCompactionUtil.getRunningCompaction(compactorAddr, ctx); - int paused = MAJC_PAUSED.intValue(); - assertEquals(0, paused); + ReadWriteIT.ingest(client, 100, 100, 100, 0, table); + compactionThread.start(); - ReadWriteIT.ingest(client, 100, 100, 100, 0, table); - compactionThread.start(); + waitFor(() -> MAJC_PAUSED.intValue() > 0); - waitFor(() -> MAJC_PAUSED.intValue() > 0); + // Calling cancel on the MemoryConsumingCompactor will free + // the consumed memory + LOG.info("Calling cancel on {}", compactorAddr); + ExternalCompactionUtil.cancelCompaction(ctx, compactorAddr, "fakeECID"); - MemoryStarvedScanIT.freeServerMemory(client); - compactionThread.interrupt(); - compactionThread.join(); - assertNull(error.get()); - assertTrue(client.instanceOperations().getActiveCompactions().stream() - .anyMatch(ac -> ac.getPausedCount() > 0)); - } + compactionThread.interrupt(); + compactionThread.join(); + assertNull(error.get()); + assertTrue(client.instanceOperations().getActiveCompactions().stream() + .anyMatch(ac -> ac.getPausedCount() > 0)); } }