This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new f7e6183271 speeds up tablet mgmt iterator (#4568) f7e6183271 is described below commit f7e61832716a196b6b2e9d61666561ef66e27929 Author: Keith Turner <ktur...@apache.org> AuthorDate: Sat May 18 13:20:22 2024 -0400 speeds up tablet mgmt iterator (#4568) Based on profiling tablet servers while running SplitMillionIT, made the following changes related to TabletManagementIterator * Made TabletManagementIterator extend WholeRowIterator instead of stacking on top of it. This avoids encoding->decoding->encoding rows on the iterator stack. * Avoided looking up and parsing multiple table props for each tablet in order to determine if it needs to split. Moved this to be done once per table. * Changed the stream that computes the file size sum into a loop. --- .../core/manager/state/TabletManagement.java | 14 +- .../core/metadata/schema/TabletMetadata.java | 11 +- .../manager/state/TabletManagementIterator.java | 164 +++++++++++---------- .../server/manager/state/ZooTabletStateStore.java | 5 +- .../apache/accumulo/server/split/SplitUtils.java | 8 - .../state/TabletManagementIteratorTest.java | 6 +- .../server/manager/state/TabletManagementTest.java | 7 +- .../manager/ManagerClientServiceHandler.java | 5 +- 8 files changed, 118 insertions(+), 102 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java index 0649791623..7b13e76b7f 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java @@ -23,6 +23,7 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.Set; import java.util.SortedMap; +import java.util.function.BiConsumer; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -58,17 +59,18 @@ public class TabletManagement { 
NEEDS_VOLUME_REPLACEMENT; } - public static void addActions(final SortedMap<Key,Value> decodedRow, + public static void addActions(final BiConsumer<Key,Value> bic, final Text row, final Set<ManagementAction> actions) { - final Key reasonsKey = new Key(decodedRow.firstKey().getRow(), REASONS_COLUMN_NAME, EMPTY); + final Key reasonsKey = new Key(row, REASONS_COLUMN_NAME, EMPTY); final Value reasonsValue = new Value(Joiner.on(',').join(actions)); - decodedRow.put(reasonsKey, reasonsValue); + bic.accept(reasonsKey, reasonsValue); } - public static void addError(final SortedMap<Key,Value> decodedRow, final Exception error) { - final Key errorKey = new Key(decodedRow.firstKey().getRow(), ERROR_COLUMN_NAME, EMPTY); + public static void addError(final BiConsumer<Key,Value> bic, final Text row, + final Exception error) { + final Key errorKey = new Key(row, ERROR_COLUMN_NAME, EMPTY); final Value errorValue = new Value(error.getMessage()); - decodedRow.put(errorKey, errorValue); + bic.accept(errorKey, errorValue); } private final Set<ManagementAction> actions; diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index 6f2ffd6237..ef1fd5ea9c 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -156,8 +156,15 @@ public class TabletMetadata { this.compacted = tmBuilder.compacted.build(); this.userCompactionsRequested = tmBuilder.userCompactionsRequested.build(); this.unSplittableMetadata = tmBuilder.unSplittableMetadata; - this.fileSize = - Suppliers.memoize(() -> files.values().stream().mapToLong(DataFileValue::getSize).sum()); + this.fileSize = Suppliers.memoize(() -> { + // This code was using a java stream. While profiling SplitMillionIT, the stream was showing + // up as hot when scanning 1 million tablets. 
Converted to a for loop to improve performance. + long sum = 0; + for (var dfv : files.values()) { + sum += dfv.getSize(); + } + return sum; + }); this.extent = Suppliers.memoize(() -> new KeyExtent(getTableId(), getEndRow(), getPrevEndRow())); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index b9c64c7f84..e5c7bd32b4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -19,15 +19,16 @@ package org.apache.accumulo.server.manager.state; import java.io.IOException; -import java.util.ArrayList; +import java.util.AbstractMap; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.SortedMap; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.PluginEnvironment.Configuration; @@ -39,9 +40,9 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SkippingIterator; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.manager.state.TabletManagement; @@ -58,6 +59,7 @@ import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; 
import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; import org.apache.accumulo.server.split.SplitUtils; +import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,35 +68,55 @@ import org.slf4j.LoggerFactory; * TabletManagement objects for each Tablet that needs some type of action performed on it by the * Manager. */ -public class TabletManagementIterator extends SkippingIterator { +public class TabletManagementIterator extends WholeRowIterator { private static final Logger LOG = LoggerFactory.getLogger(TabletManagementIterator.class); public static final String TABLET_GOAL_STATE_PARAMS_OPTION = "tgsParams"; private CompactionJobGenerator compactionGenerator; private TabletBalancer balancer; + private final SplitConfig splitConfig = new SplitConfig(); + + private static class SplitConfig { + TableId tableId; + long splitThreshold; + long maxEndRowSize; + int maxFilesToOpen; + + void update(TableId tableId, Configuration tableConfig) { + if (!tableId.equals(this.tableId)) { + this.tableId = tableId; + splitThreshold = ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_SPLIT_THRESHOLD.getKey())); + maxEndRowSize = ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_MAX_END_ROW_SIZE.getKey())); + maxFilesToOpen = (int) ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConfig.get(Property.SPLIT_MAXOPEN.getKey())); + } + } + } private static boolean shouldReturnDueToSplit(final TabletMetadata tm, - final Configuration tableConfig) { + final Configuration tableConfig, SplitConfig splitConfig) { - final long splitThreshold = ConfigurationTypeHelper - .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_SPLIT_THRESHOLD.getKey())); - final long maxEndRowSize = ConfigurationTypeHelper - .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_MAX_END_ROW_SIZE.getKey())); - final int maxFilesToOpen = (int) ConfigurationTypeHelper - 
.getFixedMemoryAsBytes(tableConfig.get(Property.SPLIT_MAXOPEN.getKey())); + // Should see the same table many times in a row, so this should only read config the first time + // seen. Reading the config for each tablet was showing up as performance problem when profiling + // SplitMillionIT that reads one million tablets. It is also nice to have snapshot of config + // that is used for all tablet in a table. + splitConfig.update(tm.getTableId(), tableConfig); // If the current computed metadata matches the current marker then we can't split, // so we return false. If the marker is set but doesn't match then return true // which gives a chance to clean up the marker and recheck. var unsplittable = tm.getUnSplittable(); if (unsplittable != null) { - return !unsplittable.equals(UnSplittableMetadata.toUnSplittable(tm.getExtent(), - splitThreshold, maxEndRowSize, maxFilesToOpen, tm.getFiles())); + return !unsplittable + .equals(UnSplittableMetadata.toUnSplittable(tm.getExtent(), splitConfig.splitThreshold, + splitConfig.maxEndRowSize, splitConfig.maxFilesToOpen, tm.getFiles())); } // If unsplittable is not set at all then check if over split threshold - final boolean shouldSplit = SplitUtils.needsSplit(tableConfig, tm); + final boolean shouldSplit = SplitUtils.needsSplit(splitConfig.splitThreshold, tm); LOG.trace("{} should split? 
sum: {}, threshold: {}, result: {}", tm.getExtent(), - tm.getFileSize(), splitThreshold, shouldSplit); + tm.getFileSize(), splitConfig.splitThreshold, shouldSplit); return shouldSplit; } @@ -137,7 +159,6 @@ public class TabletManagementIterator extends SkippingIterator { final TabletManagementParameters tabletMgmtParams) { // Note : if the scanner is ever made to fetch columns, then TabletManagement.CONFIGURED_COLUMNS // must be updated - scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class)); IteratorSetting tabletChange = new IteratorSetting(1001, "ManagerTabletInfoIterator", TabletManagementIterator.class); tabletChange.addOption(TABLET_GOAL_STATE_PARAMS_OPTION, tabletMgmtParams.serialize()); @@ -149,8 +170,7 @@ public class TabletManagementIterator extends SkippingIterator { } private IteratorEnvironment env; - private Key topKey = null; - private Value topValue = null; + private TabletManagementParameters tabletMgmtParams = null; @Override @@ -188,69 +208,64 @@ public class TabletManagementIterator extends SkippingIterator { } @Override - public Key getTopKey() { - return topKey; - } - - @Override - public Value getTopValue() { - return topValue; - } - - @Override - public boolean hasTop() { - return topKey != null && topValue != null; - } + protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) { - @Override - protected void consume() throws IOException { - topKey = null; - topValue = null; + var keyIter = keys.listIterator(); + var kvIter = new Iterator<Map.Entry<Key,Value>>() { + @Override + public boolean hasNext() { + return keyIter.hasNext(); + } - final Set<ManagementAction> actions = new HashSet<>(); - while (getSource().hasTop()) { - final Key k = getSource().getTopKey(); - final Value v = getSource().getTopValue(); - final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v); - final TabletMetadata tm = TabletMetadata.convertRow(decodedRow.entrySet().iterator(), - 
TabletManagement.CONFIGURED_COLUMNS, false, true); - - actions.clear(); - Exception error = null; - try { - LOG.trace("Evaluating extent: {}", tm); - computeTabletManagementActions(tm, actions); - } catch (Exception e) { - LOG.error("Error computing tablet management actions for extent: {}", tm.getExtent(), e); - error = e; + @Override + public Entry<Key,Value> next() { + var valueIdx = keyIter.nextIndex(); + var key = keyIter.next(); + return new AbstractMap.SimpleImmutableEntry<>(key, values.get(valueIdx)); } + }; - if (!actions.isEmpty() || error != null) { - if (error != null) { - // Insert the error into K,V pair representing - // the tablet metadata. - TabletManagement.addError(decodedRow, error); - } else if (!actions.isEmpty()) { - // If we simply returned here, then the client would get the encoded K,V - // from the WholeRowIterator. However, it would not know the reason(s) why - // it was returned. Insert a K,V pair to represent the reasons. The client - // can pull this K,V pair from the results by looking at the colf. - TabletManagement.addActions(decodedRow, actions); - } + final Set<ManagementAction> actions = new HashSet<>(); + final TabletMetadata tm = + TabletMetadata.convertRow(kvIter, TabletManagement.CONFIGURED_COLUMNS, false, true); + + Exception error = null; + try { + LOG.trace("Evaluating extent: {}", tm); + computeTabletManagementActions(tm, actions); + } catch (Exception e) { + LOG.error("Error computing tablet management actions for extent: {}", tm.getExtent(), e); + error = e; + } - // This key is being created exactly the same way as the whole row iterator creates keys. - // This is important for ensuring that seek works as expected in the continue case. See - // WholeRowIterator seek function for details, it looks for keys w/o columns. 
- topKey = new Key(decodedRow.firstKey().getRow()); - topValue = WholeRowIterator.encodeRow(new ArrayList<>(decodedRow.keySet()), - new ArrayList<>(decodedRow.values())); - LOG.trace("Returning extent {} with reasons: {}", tm.getExtent(), actions); - return; + if (!actions.isEmpty() || error != null) { + if (error != null) { + // Insert the error into K,V pair representing + // the tablet metadata. + TabletManagement.addError((k, v) -> { + keys.add(k); + values.add(v); + }, currentRow, error); + } else if (!actions.isEmpty()) { + // If we simply returned here, then the client would get the encoded K,V + // from the WholeRowIterator. However, it would not know the reason(s) why + // it was returned. Insert a K,V pair to represent the reasons. The client + // can pull this K,V pair from the results by looking at the colf. + TabletManagement.addActions((k, v) -> { + keys.add(k); + values.add(v); + }, currentRow, actions); } - LOG.trace("No reason to return extent {}, continuing", tm.getExtent()); - getSource().next(); + // This key is being created exactly the same way as the whole row iterator creates keys. + // This is important for ensuring that seek works as expected in the continue case. See + // WholeRowIterator seek function for details, it looks for keys w/o columns. 
+ LOG.trace("Returning extent {} with reasons: {}", tm.getExtent(), actions); + return true; } + + LOG.trace("No reason to return extent {}, continuing", tm.getExtent()); + return false; } private static final Set<ManagementAction> REASONS_NOT_TO_SPLIT_OR_COMPACT = @@ -285,7 +300,8 @@ public class TabletManagementIterator extends SkippingIterator { if (tm.getOperationId() == null && tabletMgmtParams.isTableOnline(tm.getTableId()) && Collections.disjoint(REASONS_NOT_TO_SPLIT_OR_COMPACT, reasonsToReturnThisTablet)) { try { - if (shouldReturnDueToSplit(tm, this.env.getPluginEnv().getConfiguration(tm.getTableId()))) { + if (shouldReturnDueToSplit(tm, this.env.getPluginEnv().getConfiguration(tm.getTableId()), + splitConfig)) { reasonsToReturnThisTablet.add(ManagementAction.NEEDS_SPLITTING); } // important to call this since reasonsToReturnThisTablet is passed to it diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java index cbb5c280ac..504d9e5589 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java @@ -34,7 +34,6 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.iteratorsImpl.system.SortedMapIterator; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.AccumuloTable; @@ -79,7 +78,6 @@ class ZooTabletStateStore extends AbstractTabletStateStore implements TabletStat final String zpath = ctx.getZooKeeperRoot() + RootTable.ZROOT_TABLET; final TabletIteratorEnvironment env = new 
TabletIteratorEnvironment(ctx, IteratorScope.scan, ctx.getTableConfiguration(AccumuloTable.ROOT.tableId()), AccumuloTable.ROOT.tableId()); - final WholeRowIterator wri = new WholeRowIterator(); final TabletManagementIterator tmi = new TabletManagementIterator(); final AtomicBoolean closed = new AtomicBoolean(false); @@ -88,8 +86,7 @@ class ZooTabletStateStore extends AbstractTabletStateStore implements TabletStat ctx.getZooReaderWriter().getZooKeeper().getData(zpath, false, null); final RootTabletMetadata rtm = new RootTabletMetadata(new String(rootTabletMetadata, UTF_8)); final SortedMapIterator iter = new SortedMapIterator(rtm.toKeyValues()); - wri.init(iter, Map.of(), env); - tmi.init(wri, + tmi.init(iter, Map.of(TabletManagementIterator.TABLET_GOAL_STATE_PARAMS_OPTION, parameters.serialize()), env); tmi.seek(new Range(), null, true); diff --git a/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java b/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java index 5cf5c9edc6..e844deb779 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java @@ -29,8 +29,6 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.function.Predicate; -import org.apache.accumulo.core.client.PluginEnvironment.Configuration; -import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; @@ -294,12 +292,6 @@ public class SplitUtils { return needsSplit(splitThreshold, tabletMetadata); } - public static boolean needsSplit(final Configuration tableConf, TabletMetadata tabletMetadata) { - var splitThreshold = ConfigurationTypeHelper - .getFixedMemoryAsBytes(tableConf.get(Property.TABLE_SPLIT_THRESHOLD.getKey())); - return needsSplit(splitThreshold, tabletMetadata); - } - public static 
boolean needsSplit(long splitThreshold, TabletMetadata tabletMetadata) { return tabletMetadata.getFileSize() > splitThreshold; } diff --git a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementIteratorTest.java b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementIteratorTest.java index a673777721..599a44c1c1 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementIteratorTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementIteratorTest.java @@ -42,10 +42,10 @@ public class TabletManagementIteratorTest { Key badStartKey = new Key("row", "colf", "colq", 1234L); Key badEndKey = new Key("rowEnd", "colf", "colq", 1234L); - assertThrows(IllegalStateException.class, () -> iter.seek(null, Set.of(), false)); - assertThrows(IllegalStateException.class, + assertThrows(NullPointerException.class, () -> iter.seek(null, Set.of(), false)); + assertThrows(NullPointerException.class, () -> iter.seek(new Range((Key) null, (Key) null), Set.of(), false)); - assertThrows(IllegalStateException.class, + assertThrows(NullPointerException.class, () -> iter.seek(new Range(goodStartKey, goodEndKey), Set.of(), false)); assertTrue(assertThrows(IllegalArgumentException.class, () -> iter.seek(new Range(goodStartKey, badEndKey), Set.of(), false)).getMessage() diff --git a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java index 9bc99cd0c4..207c1d0a01 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java @@ -129,7 +129,7 @@ public class TabletManagementTest { final SortedMap<Key,Value> entries = createMetadataEntryKV(extent); - TabletManagement.addActions(entries, 
actions); + TabletManagement.addActions(entries::put, entries.firstKey().getRow(), actions); Key key = entries.firstKey(); Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()), new ArrayList<>(entries.values())); @@ -149,7 +149,8 @@ public class TabletManagementTest { final SortedMap<Key,Value> entries = createMetadataEntryKV(extent); - TabletManagement.addError(entries, new UnsupportedOperationException("Not supported.")); + TabletManagement.addError(entries::put, entries.firstKey().getRow(), + new UnsupportedOperationException("Not supported.")); Key key = entries.firstKey(); Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()), new ArrayList<>(entries.values())); @@ -174,7 +175,7 @@ public class TabletManagementTest { final SortedMap<Key,Value> entries = createMetadataEntryKV(extent); - TabletManagement.addActions(entries, actions); + TabletManagement.addActions(entries::put, entries.firstKey().getRow(), actions); Key key = entries.firstKey(); Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()), new ArrayList<>(entries.values())); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index dd14024db6..e70e151ace 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -648,8 +648,9 @@ public class ManagerClientServiceHandler implements ManagerClientService.Iface { inProgress.forEach(hostingRequestInProgress::remove); } - manager.getEventCoordinator().event(success, "Tablet hosting requested for %d tablets in %s", - success.size(), tableId); + manager.getEventCoordinator().event(success, + "Tablet hosting requested for %d of %d tablets in %s", success.size(), extents.size(), + tableId); } protected TableId 
getTableId(ClientContext context, String tableName)