This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 4e06b2f054 Avoids full table scans in TabletGroupWatcher in some situations. (#3705) 4e06b2f054 is described below commit 4e06b2f05423469bab2c6d7b4ed27f147c5ead3c Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Oct 6 16:17:03 2023 -0400 Avoids full table scans in TabletGroupWatcher in some situations. (#3705) The manager has an internal class called EventCoordinator that is used to signal that something happened in the manager. Previously, whenever anything signaled that an event had happened, the tablet group watcher would scan the entire metadata table. This commit makes a few changes to improve this. First, it adds the ability for EventCoordinator to signal with more specific information, such as that something happened with a table or a tablet. Second, the TabletGroupWatcher was adapted to react to this more specific signaling with narrower scans of the metadata table. If a signal is made that something changed with a table, then only that table's metadata will be scanned. The TabletGroupWatcher will still do full scans periodically (based on configuration) or when something signals that a full scan is needed. A new IT was added that creates 1 million splits in a table and then reads and writes to around 100 random tablets. The tablets are ondemand, so they must be brought online for the read and write. If bringing them online takes too long (because a full table scan is done), then the test might time out and fail. 
--- .../apache/accumulo/core/logging/TabletLogger.java | 3 +- ...TabletStateStore.java => ClosableIterable.java} | 23 +- .../manager/state/LoggingTabletStateStore.java | 10 +- .../server/manager/state/MetaDataStateStore.java | 16 +- .../server/manager/state/RootTabletStateStore.java | 9 +- .../manager/state/TabletManagementIterator.java | 7 +- .../manager/state/TabletManagementScanner.java | 25 +- .../server/manager/state/TabletStateStore.java | 21 +- .../server/manager/state/ZooTabletStateStore.java | 10 +- .../apache/accumulo/manager/EventCoordinator.java | 123 +++- .../java/org/apache/accumulo/manager/Manager.java | 17 +- .../manager/ManagerClientServiceHandler.java | 32 +- .../accumulo/manager/TabletGroupWatcher.java | 657 +++++++++++++-------- .../coordinator/CompactionCoordinator.java | 9 +- .../manager/tableOps/ChangeTableState.java | 2 +- .../manager/tableOps/clone/FinishCloneTable.java | 4 +- .../manager/tableOps/compact/CompactionDriver.java | 19 +- .../manager/tableOps/create/FinishCreateTable.java | 3 +- .../manager/tableOps/delete/DeleteTable.java | 2 +- .../manager/tableOps/split/DeleteOperationIds.java | 4 + .../tableOps/tableImport/FinishImportTable.java | 2 +- .../accumulo/test/functional/SplitMillionIT.java | 132 +++++ .../test/functional/TabletManagementScannerIT.java | 189 ------ 23 files changed, 740 insertions(+), 579 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java index e737805761..8295b2c917 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java @@ -93,7 +93,8 @@ public class TabletLogger { } public static void split(KeyExtent parent, SortedSet<Text> splits) { - locLog.debug("Split {} into {}", parent, splits); + locLog.debug("Split {} into {} tablets", parent, splits.size() + 1); + locLog.trace("Split {} into {}", 
parent, splits); } /** diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ClosableIterable.java similarity index 52% copy from server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java copy to server/base/src/main/java/org/apache/accumulo/server/manager/state/ClosableIterable.java index e34048512a..fcf5f8de41 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ClosableIterable.java @@ -18,26 +18,7 @@ */ package org.apache.accumulo.server.manager.state; -import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.manager.state.TabletManagement; -import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; - -class RootTabletStateStore extends MetaDataStateStore { - - RootTabletStateStore(DataLevel level, ClientContext context, CurrentState state) { - super(level, context, state, RootTable.NAME); - } - - @Override - public ClosableIterator<TabletManagement> iterator() { - return new TabletManagementScanner(context, TabletsSection.getRange(), state, RootTable.NAME, - knownStateChanges); - } - +public interface ClosableIterable<T> extends Iterable<T> { @Override - public String name() { - return "Metadata Tablets"; - } + ClosableIterator<T> iterator(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java index 1fdd21b86a..ce2b3ede02 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java +++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.logging.TabletLogger; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.TServerInstance; @@ -54,13 +55,8 @@ class LoggingTabletStateStore implements TabletStateStore { } @Override - public ClosableIterator<TabletManagement> iterator() { - return wrapped.iterator(); - } - - @Override - public boolean addTabletStateChange(TabletManagement tablet) { - return this.wrapped.addTabletStateChange(tablet); + public ClosableIterator<TabletManagement> iterator(List<Range> ranges) { + return wrapped.iterator(ranges); } @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java index 7cf77c311a..d69c114f5b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java @@ -19,15 +19,15 @@ package org.apache.accumulo.server.manager.state; import java.util.Collection; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.List; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import 
org.apache.accumulo.core.metadata.schema.TabletMetadata; class MetaDataStateStore extends AbstractTabletStateStore implements TabletStateStore { @@ -37,8 +37,6 @@ class MetaDataStateStore extends AbstractTabletStateStore implements TabletState private final String targetTableName; private final Ample ample; private final DataLevel level; - protected final ArrayBlockingQueue<TabletManagement> knownStateChanges = - new ArrayBlockingQueue<>(1_000); protected MetaDataStateStore(DataLevel level, ClientContext context, CurrentState state, String targetTableName) { @@ -60,14 +58,8 @@ class MetaDataStateStore extends AbstractTabletStateStore implements TabletState } @Override - public ClosableIterator<TabletManagement> iterator() { - return new TabletManagementScanner(context, TabletsSection.getRange(), state, targetTableName, - knownStateChanges); - } - - @Override - public boolean addTabletStateChange(TabletManagement tablet) { - return this.knownStateChanges.offer(tablet); + public ClosableIterator<TabletManagement> iterator(List<Range> ranges) { + return new TabletManagementScanner(context, ranges, state, targetTableName); } @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java index e34048512a..4b574ade13 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java @@ -18,11 +18,13 @@ */ package org.apache.accumulo.server.manager.state; +import java.util.List; + import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; class RootTabletStateStore extends MetaDataStateStore { @@ -31,9 +33,8 @@ class RootTabletStateStore extends MetaDataStateStore { } @Override - public ClosableIterator<TabletManagement> iterator() { - return new TabletManagementScanner(context, TabletsSection.getRange(), state, RootTable.NAME, - knownStateChanges); + public ClosableIterator<TabletManagement> iterator(List<Range> ranges) { + return new TabletManagementScanner(context, ranges, state, RootTable.NAME); } @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index ea50b01662..e274d275a7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -275,7 +275,7 @@ public class TabletManagementIterator extends SkippingIterator { final long sumOfFileSizes = tm.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum(); final boolean shouldSplit = sumOfFileSizes > splitThreshold; - LOG.debug("{} should split? sum: {}, threshold: {}, result: {}", tm.getExtent(), sumOfFileSizes, + LOG.trace("{} should split? sum: {}, threshold: {}, result: {}", tm.getExtent(), sumOfFileSizes, splitThreshold, shouldSplit); return shouldSplit; } @@ -292,8 +292,8 @@ public class TabletManagementIterator extends SkippingIterator { onlineTables.contains(tm.getTableId()) && tm.getOperationId() == null; TabletState state = TabletState.compute(tm, current, balancer, tserverResourceGroups); - if (LOG.isDebugEnabled()) { - LOG.debug("{} is {}. Table is {}line. Tablet hosting goal is {}, hostingRequested: {}", + if (LOG.isTraceEnabled()) { + LOG.trace("{} is {}. Table is {}line. 
Tablet hosting goal is {}, hostingRequested: {}", tm.getExtent(), state, (shouldBeOnline ? "on" : "off"), tm.getHostingGoal(), tm.getHostingRequested()); } @@ -402,7 +402,6 @@ public class TabletManagementIterator extends SkippingIterator { balancer = Property.createInstanceFromPropertyName(conf, Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new SimpleLoadBalancer()); balancer.init(benv); - LOG.debug("Balancer is set to {}", balancer.getClass().getSimpleName()); } @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java index 2ad43397d9..253d6e4f45 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java @@ -20,15 +20,12 @@ package org.apache.accumulo.server.manager.state; import java.io.IOException; import java.lang.ref.Cleaner.Cleanable; -import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.Queue; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.client.AccumuloException; @@ -60,11 +57,10 @@ public class TabletManagementScanner implements ClosableIterator<TabletManagemen private final BatchScanner mdScanner; private final Iterator<Entry<Key,Value>> iter; private final AtomicBoolean closed = new AtomicBoolean(false); - private final Queue<TabletManagement> knownTabletModifications; // This constructor is called from TabletStateStore implementations - public TabletManagementScanner(ClientContext context, Range range, CurrentState state, - String tableName, Queue<TabletManagement> knownTabletModifications) { + 
public TabletManagementScanner(ClientContext context, List<Range> ranges, CurrentState state, + String tableName) { // scan over metadata table, looking for tablets in the wrong state based on the live servers // and online tables try { @@ -95,14 +91,13 @@ public class TabletManagementScanner implements ClosableIterator<TabletManagemen } cleanable = CleanerUtil.unclosed(this, TabletManagementScanner.class, closed, log, mdScanner); TabletManagementIterator.configureScanner(mdScanner, state); - mdScanner.setRanges(Collections.singletonList(range)); + mdScanner.setRanges(ranges); iter = mdScanner.iterator(); - this.knownTabletModifications = knownTabletModifications; } // This constructor is called from utilities and tests public TabletManagementScanner(ClientContext context, Range range, String tableName) { - this(context, range, null, tableName, new ArrayBlockingQueue<>(1_000)); + this(context, List.of(range), null, tableName); } @Override @@ -120,9 +115,7 @@ public class TabletManagementScanner implements ClosableIterator<TabletManagemen if (closed.get()) { return false; } - if (!knownTabletModifications.isEmpty()) { - return true; - } + boolean result = iter.hasNext(); if (!result) { close(); @@ -135,13 +128,7 @@ public class TabletManagementScanner implements ClosableIterator<TabletManagemen if (closed.get()) { throw new NoSuchElementException(this.getClass().getSimpleName() + " is closed"); } - if (!knownTabletModifications.isEmpty()) { - TabletManagement tm = knownTabletModifications.poll(); - log.trace("Returning known tablet modification, extent: {}, hostingGoal: {}, actions: {}", - tm.getTabletMetadata().getExtent(), tm.getTabletMetadata().getHostingGoal(), - tm.getActions()); - return tm; - } + Entry<Key,Value> e = iter.next(); try { TabletManagement tm = TabletManagementIterator.decode(e); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java index c44f01bd2d..19cc5b5832 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java @@ -23,10 +23,12 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.server.ServerContext; import org.apache.hadoop.fs.Path; @@ -34,7 +36,7 @@ import org.apache.hadoop.fs.Path; /** * Interface for storing information about tablet assignments. There are three implementations: */ -public interface TabletStateStore extends Iterable<TabletManagement> { +public interface TabletStateStore extends ClosableIterable<TabletManagement> { /** * Get the level for this state store @@ -47,19 +49,18 @@ public interface TabletStateStore extends Iterable<TabletManagement> { String name(); /** - * Scan the information about the tablets covered by this store + * Scan the information about the tablets covered by this store that have end row in the specified + * ranges. */ - @Override - ClosableIterator<TabletManagement> iterator(); + ClosableIterator<TabletManagement> iterator(List<Range> ranges); /** - * Notify this TabletStateStore that a Tablet needs to be returned by the iterator() method so - * that the TabletGroupWatcher can perform the associated action on this Tablet. - * - * @param tablet TabletMetadata and Action that needs to be performed on it. 
- * @return true if tablet modification accepted for processing, false otherwise. + * Scan the information about all tablets covered by this store.. */ - boolean addTabletStateChange(TabletManagement tablet); + @Override + default ClosableIterator<TabletManagement> iterator() { + return iterator(List.of(MetadataSchema.TabletsSection.getRange())); + } /** * Store the assigned locations in the data store. diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java index cc6aff0519..7fa5158da5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java @@ -23,6 +23,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; import org.apache.accumulo.core.metadata.RootTable; @@ -59,7 +60,7 @@ class ZooTabletStateStore extends AbstractTabletStateStore implements TabletStat } @Override - public ClosableIterator<TabletManagement> iterator() { + public ClosableIterator<TabletManagement> iterator(List<Range> ranges) { return new ClosableIterator<>() { boolean finished = false; @@ -97,13 +98,6 @@ class ZooTabletStateStore extends AbstractTabletStateStore implements TabletStat }; } - @Override - public boolean addTabletStateChange(TabletManagement tablet) { - // This method does nothing, this TabletStateStore always returns - // the TabletManagement object for the Root Tablet. 
- return true; - } - private static void validateAssignments(Collection<Assignment> assignments) { if (assignments.size() != 1) { throw new IllegalArgumentException("There is only one root tablet"); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/EventCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/EventCoordinator.java index 4145f58189..649f259fa1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/EventCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/EventCoordinator.java @@ -18,13 +18,23 @@ */ package org.apache.accumulo.manager; +import java.util.Collection; +import java.util.EnumMap; +import java.util.Map; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.Ample; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + public class EventCoordinator { private static final Logger log = LoggerFactory.getLogger(EventCoordinator.class); - long eventCounter = 0; + + private long eventCounter = 0; synchronized long waitForEvents(long millis, long lastEvent) { // Did something happen since the last time we waited? @@ -42,20 +52,121 @@ public class EventCoordinator { return eventCounter; } - public synchronized void event(String msg, Object... 
args) { + private final Map<Ample.DataLevel,Listener> listeners = new EnumMap<>(Ample.DataLevel.class); + + public enum EventScope { + ALL, DATA_LEVEL, TABLE, TABLE_RANGE + } + + public static class Event { + + private final EventScope scope; + private final Ample.DataLevel level; + private final KeyExtent extent; + + Event(EventScope scope, KeyExtent extent) { + this.scope = scope; + this.level = Ample.DataLevel.of(extent.tableId()); + this.extent = extent; + } + + Event(EventScope scope, TableId tableId) { + this.scope = scope; + this.level = Ample.DataLevel.of(tableId); + this.extent = new KeyExtent(tableId, null, null); + } + + Event(EventScope scope, Ample.DataLevel level) { + this.scope = scope; + this.level = level; + this.extent = null; + } + + Event() { + this.scope = EventScope.ALL; + this.level = null; + this.extent = null; + } + + public EventScope getScope() { + return scope; + } + + public Ample.DataLevel getLevel() { + Preconditions.checkState(scope != EventScope.ALL); + return level; + } + + public TableId getTableId() { + Preconditions.checkState(scope == EventScope.TABLE || scope == EventScope.TABLE_RANGE); + return extent.tableId(); + } + + public KeyExtent getExtent() { + Preconditions.checkState(scope == EventScope.TABLE || scope == EventScope.TABLE_RANGE); + return extent; + } + } + + public void event(String msg, Object... args) { + log.info(String.format(msg, args)); + publish(new Event()); + } + + public void event(Ample.DataLevel level, String msg, Object... args) { log.info(String.format(msg, args)); + publish(new Event(EventScope.DATA_LEVEL, level)); + } + + public void event(TableId tableId, String msg, Object... args) { + log.info(String.format(msg, args)); + publish(new Event(EventScope.TABLE, tableId)); + } + + public void event(KeyExtent extent, String msg, Object... 
args) { + log.debug(String.format(msg, args)); + publish(new Event(EventScope.TABLE_RANGE, extent)); + } + + public void event(Collection<KeyExtent> extents, String msg, Object... args) { + if (!extents.isEmpty()) { + log.debug(String.format(msg, args)); + extents.forEach(extent -> publish(new Event(EventScope.TABLE_RANGE, extent))); + } + } + + private synchronized void publish(Event event) { + if (event.getScope() == EventScope.ALL) { + listeners.values().forEach(listener -> listener.process(event)); + } else { + listeners.getOrDefault(event.getLevel(), e -> {}).process(event); + } + eventCounter++; notifyAll(); } - public Listener getListener() { - return new Listener(); + public interface Listener { + void process(Event event); + } + + public synchronized void addListener(Ample.DataLevel level, Listener listener) { + // Currently only expecting one listener for each level, so keeping the code simple and + // detecting deviations. Can adept if needed. + Preconditions.checkState(listeners.put(level, listener) == null); + } + + public Tracker getTracker() { + return new Tracker(); } - public class Listener { + /** + * Tracks the event counter and helps detect changes in it. 
+ */ + public class Tracker { long lastEvent; - Listener() { + Tracker() { lastEvent = eventCounter; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 0f539e199f..4fbe6bc407 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -543,7 +543,8 @@ public class Manager extends AbstractServer } mergeLock.notifyAll(); } - nextEvent.event("Merge state of %s set to %s", info.getExtent(), state); + nextEvent.event(info.getExtent().tableId(), "Merge state of %s set to %s", info.getExtent(), + state); } public void clearMergeState(TableId tableId) throws KeeperException, InterruptedException { @@ -552,7 +553,7 @@ public class Manager extends AbstractServer getContext().getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP); mergeLock.notifyAll(); } - nextEvent.event("Merge state of %s cleared", tableId); + nextEvent.event(tableId, "Merge state of %s cleared", tableId); } void setManagerGoalState(ManagerGoalState state) { @@ -614,7 +615,7 @@ public class Manager extends AbstractServer return txids; }); - nextEvent.event("Unassignment requested %s", tablet); + nextEvent.event(tablet, "Unassignment requested %s", tablet); } public void cancelUnassignmentRequest(KeyExtent tablet, long fateTxid) { @@ -627,7 +628,7 @@ public class Manager extends AbstractServer return v; }); - nextEvent.event("Unassignment request canceled %s", tablet); + nextEvent.event(tablet, "Unassignment request canceled %s", tablet); } public boolean isUnassignmentRequested(KeyExtent extent) { @@ -856,7 +857,7 @@ public class Manager extends AbstractServer @Override public void run() { - EventCoordinator.Listener eventListener = nextEvent.getListener(); + EventCoordinator.Tracker eventTracker = nextEvent.getTracker(); while (stillManager()) { long wait = DEFAULT_WAIT_FOR_WATCHER; 
try { @@ -935,7 +936,7 @@ public class Manager extends AbstractServer Span span = TraceUtil.startSpan(this.getClass(), "run::updateStatus"); try (Scope scope = span.makeCurrent()) { wait = updateStatus(); - eventListener.waitForEvents(wait); + eventTracker.waitForEvents(wait); } catch (Exception t) { TraceUtil.setException(span, t, false); log.error("Error balancing tablets, will wait for {} (seconds) and then retry ", @@ -1172,7 +1173,7 @@ public class Manager extends AbstractServer fateServiceHandler = new FateServiceHandler(this); managerClientHandler = new ManagerClientServiceHandler(this); compactionCoordinator = - new CompactionCoordinator(context, tserverSet, security, compactionJobQueues); + new CompactionCoordinator(context, tserverSet, security, compactionJobQueues, nextEvent); // Start the Manager's Client service // Ensure that calls before the manager gets the lock fail ManagerClientService.Iface haProxy = @@ -1697,7 +1698,7 @@ public class Manager extends AbstractServer @Override public void stateChanged(TableId tableId, TableState state) { - nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state); + nextEvent.event(tableId, "Table state in zookeeper changed for %s to %s", tableId, state); if (state == TableState.OFFLINE) { clearMigrations(tableId); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index 89e73c038a..37b6bc9456 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -32,7 +32,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -59,8 +58,6 @@ import 
org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.manager.state.TabletManagement; -import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; import org.apache.accumulo.core.manager.thrift.ManagerClientService; import org.apache.accumulo.core.manager.thrift.ManagerGoalState; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; @@ -72,10 +69,8 @@ import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.TabletDeletedException; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.securityImpl.thrift.TDelegationToken; @@ -306,9 +301,9 @@ public class ManagerClientServiceHandler implements ManagerClientService.Iface { } if (stopTabletServers) { manager.setManagerGoalState(ManagerGoalState.CLEAN_STOP); - EventCoordinator.Listener eventListener = manager.nextEvent.getListener(); + EventCoordinator.Tracker eventTracker = manager.nextEvent.getTracker(); do { - eventListener.waitForEvents(Manager.ONE_SECOND); + eventTracker.waitForEvents(Manager.ONE_SECOND); } while (manager.tserverSet.size() > 0); } manager.setManagerState(ManagerState.STOP); @@ -362,10 +357,10 @@ public class ManagerClientServiceHandler implements ManagerClientService.Iface { Manager.log.error("{} reports assignment failed for tablet {}", 
serverName, tablet); break; case LOADED: - manager.nextEvent.event("tablet %s was loaded on %s", tablet, serverName); + manager.nextEvent.event(tablet, "tablet %s was loaded on %s", tablet, serverName); break; case UNLOADED: - manager.nextEvent.event("tablet %s was unloaded from %s", tablet, serverName); + manager.nextEvent.event(tablet, "tablet %s was unloaded from %s", tablet, serverName); break; case UNLOAD_ERROR: Manager.log.error("{} reports unload failed for tablet {}", serverName, tablet); @@ -656,23 +651,8 @@ public class ManagerClientServiceHandler implements ManagerClientService.Iface { }); } - // Register the successful extent updates with the appropriate TabletStateStore. - // This will bump these tablets to the head of the line and the TabletGroupWatcher - // will process these updates before the ones returned from scanning the underlying - // table. - if (!success.isEmpty()) { - final Set<ManagementAction> actions = Set.of(ManagementAction.NEEDS_LOCATION_UPDATE); - ample.readTablets().forTablets(success, Optional.empty()) - .fetch(TabletManagement.CONFIGURED_COLUMNS.toArray(new ColumnType[] {})).build() - .forEach(tm -> { - manager.getTabletStateStore(DataLevel.of(tm.getTableId())) - .addTabletStateChange(new TabletManagement(actions, tm)); - }); - } - - // this will kick the tablet group watcher into action - manager.getEventCoordinator().event("Tablet hosting requested tableId:%s extents:%d", tableId, - extents.size()); + manager.getEventCoordinator().event(success, "Tablet hosting requested for %d tablets in %s", + success.size(), tableId); } protected TableId getTableId(ClientContext context, String tableName) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index df7fee1e21..627e2ab427 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -38,7 +38,11 @@ import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; import java.util.stream.Stream; @@ -65,6 +69,7 @@ import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.manager.thrift.ManagerGoalState; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.MetadataTable; @@ -92,6 +97,7 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.TextUtil; +import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; import org.apache.accumulo.manager.Manager.TabletGoalState; import org.apache.accumulo.manager.split.SplitTask; @@ -152,12 +158,18 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { private final TabletGroupWatcher dependentWatcher; final TableStats stats = new TableStats(); private SortedSet<TServerInstance> lastScanServers = Collections.emptySortedSet(); + private final EventHandler eventHandler; + + private WalStateManager walStateManager; TabletGroupWatcher(Manager manager, TabletStateStore store, TabletGroupWatcher dependentWatcher) { super("Watching " + 
store.name()); this.manager = manager; this.store = store; this.dependentWatcher = dependentWatcher; + this.walStateManager = new WalStateManager(manager.getContext()); + this.eventHandler = new EventHandler(); + manager.getEventCoordinator().addListener(store.getLevel(), eventHandler); } /** Should this {@code TabletGroupWatcher} suspend tablets? */ @@ -210,273 +222,397 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } } - @Override - public void run() { - int[] oldCounts = new int[TabletState.values().length]; - EventCoordinator.Listener eventListener = this.manager.nextEvent.getListener(); + class EventHandler implements EventCoordinator.Listener { - WalStateManager wals = new WalStateManager(manager.getContext()); + // Setting this to true to start with because it's not known what happened before this object was + // created, so just start off with full scan. + private boolean needsFullScan = true; - while (manager.stillManager()) { - // slow things down a little, otherwise we spam the logs when there are many wake-up events - sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + private final BlockingQueue<Range> rangesToProcess; - final long waitTimeBetweenScans = manager.getConfiguration() - .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); + class RangeProccessor implements Runnable { + @Override + public void run() { + try { + while (manager.stillManager()) { + var range = rangesToProcess.poll(100, TimeUnit.MILLISECONDS); + if (range == null) { + // check to see if still the manager + continue; + } - int totalUnloaded = 0; - int unloaded = 0; - ClosableIterator<TabletManagement> iter = null; - try { - Map<TableId,MergeStats> mergeStatsCache = new HashMap<>(); - Map<TableId,MergeStats> currentMerges = new HashMap<>(); - for (MergeInfo merge : manager.merges()) { - if (merge.getExtent() != null) { - currentMerges.put(merge.getExtent().tableId(), new MergeStats(merge)); + ArrayList<Range> ranges = new ArrayList<>(); +
ranges.add(range); + + rangesToProcess.drainTo(ranges); + + if (manager.getManagerGoalState() == ManagerGoalState.CLEAN_STOP) { + // only do full scans when trying to shutdown + setNeedsFullScan(); + continue; + } + + var currentTservers = getCurrentTservers(); + if (currentTservers.isEmpty()) { + setNeedsFullScan(); + continue; + } + + try (var iter = store.iterator(ranges)) { + long t1 = System.currentTimeMillis(); + manageTablets(iter, currentTservers, false); + long t2 = System.currentTimeMillis(); + Manager.log.debug(String.format("[%s]: partial scan time %.2f seconds for %,d ranges", + store.name(), (t2 - t1) / 1000., ranges.size())); + } catch (Exception e) { + Manager.log.error("Error processing {} ranges for store {} ", ranges.size(), + store.name(), e); + } } + } catch (InterruptedException e) { + throw new RuntimeException(e); } + } + } - // Get the current status for the current list of tservers - final SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<>(); - for (TServerInstance entry : manager.tserverSet.getCurrentServers()) { - currentTServers.put(entry, manager.tserverStatus.get(entry)); - } + EventHandler() { + rangesToProcess = new ArrayBlockingQueue<>(3000); - if (currentTServers.isEmpty()) { - eventListener.waitForEvents(waitTimeBetweenScans); - synchronized (this) { - lastScanServers = Collections.emptySortedSet(); + Threads + .createThread("TGW [" + store.name() + "] event range processor", new RangeProccessor()) + .start(); + } + + private synchronized void setNeedsFullScan() { + needsFullScan = true; + notifyAll(); + } + + public synchronized void clearNeedsFullScan() { + needsFullScan = false; + } + + @Override + public void process(EventCoordinator.Event event) { + + switch (event.getScope()) { + case ALL: + case DATA_LEVEL: + setNeedsFullScan(); + break; + case TABLE: + case TABLE_RANGE: + if (!rangesToProcess.offer(event.getExtent().toMetaRange())) { + Manager.log.debug("[{}] unable to process event range {} 
because queue is full", + store.name(), event.getExtent()); + setNeedsFullScan(); } - continue; + break; + default: + throw new IllegalArgumentException("Unhandled scope " + event.getScope()); + } + } + + synchronized void waitForFullScan(long millis) { + if (!needsFullScan) { + try { + wait(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + } + } + } - final Map<String,Set<TServerInstance>> currentTServerGrouping = - manager.tserverSet.getCurrentServersGroups(); + private static class TableMgmtStats { + int[] counts = new int[TabletState.values().length]; + private int totalUnloaded; + } - TabletLists tLists = new TabletLists(manager, currentTServers, currentTServerGrouping); + private TableMgmtStats manageTablets(Iterator<TabletManagement> iter, + SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean isFullScan) + throws BadLocationStateException, TException, DistributedStoreException, WalMarkerException, + IOException { - ManagerState managerState = manager.getManagerState(); - int[] counts = new int[TabletState.values().length]; - stats.begin(); + TableMgmtStats tableMgmtStats = new TableMgmtStats(); + int unloaded = 0; - CompactionJobGenerator compactionGenerator = new CompactionJobGenerator( - new ServiceEnvironmentImpl(manager.getContext()), manager.getCompactionHints()); + Map<TableId,MergeStats> mergeStatsCache = new HashMap<>(); + Map<TableId,MergeStats> currentMerges = new HashMap<>(); + for (MergeInfo merge : manager.merges()) { + if (merge.getExtent() != null) { + currentMerges.put(merge.getExtent().tableId(), new MergeStats(merge)); + } + } - final Map<TabletServerId,String> resourceGroups = new HashMap<>(); - manager.tServerResourceGroups().forEach((group, tservers) -> { - tservers.stream().map(TabletServerIdImpl::new) - .forEach(tabletServerId -> resourceGroups.put(tabletServerId, group)); - }); + final Map<String,Set<TServerInstance>> currentTServerGrouping = + 
manager.tserverSet.getCurrentServersGroups(); - // Walk through the tablets in our store, and work tablets - // towards their goal - iter = store.iterator(); - while (iter.hasNext()) { - final TabletManagement mti = iter.next(); - if (mti == null) { - throw new IllegalStateException("State store returned a null ManagerTabletInfo object"); - } + TabletLists tLists = new TabletLists(manager, currentTServers, currentTServerGrouping); - final Set<ManagementAction> actions = mti.getActions(); - final TabletMetadata tm = mti.getTabletMetadata(); + CompactionJobGenerator compactionGenerator = new CompactionJobGenerator( + new ServiceEnvironmentImpl(manager.getContext()), manager.getCompactionHints()); - if (tm.isFutureAndCurrentLocationSet()) { - throw new BadLocationStateException( - tm.getExtent() + " is both assigned and hosted, which should never happen: " + this, - tm.getExtent().toMetaRow()); - } - if (tm.isOperationIdAndCurrentLocationSet()) { - throw new BadLocationStateException(tm.getExtent() + final Map<TabletServerId,String> resourceGroups = new HashMap<>(); + manager.tServerResourceGroups().forEach((group, tservers) -> { + tservers.stream().map(TabletServerIdImpl::new) + .forEach(tabletServerId -> resourceGroups.put(tabletServerId, group)); + }); + + while (iter.hasNext()) { + final TabletManagement mti = iter.next(); + if (mti == null) { + throw new IllegalStateException("State store returned a null ManagerTabletInfo object"); + } + + final Set<ManagementAction> actions = mti.getActions(); + final TabletMetadata tm = mti.getTabletMetadata(); + + if (tm.isFutureAndCurrentLocationSet()) { + throw new BadLocationStateException( + tm.getExtent() + " is both assigned and hosted, which should never happen: " + this, + tm.getExtent().toMetaRow()); + } + if (tm.isOperationIdAndCurrentLocationSet()) { + throw new BadLocationStateException( + tm.getExtent() + " has both operation id and current location, which should never happen: " + this, - 
tm.getExtent().toMetaRow()); - } + tm.getExtent().toMetaRow()); + } - final TableId tableId = tm.getTableId(); - // ignore entries for tables that do not exist in zookeeper - if (manager.getTableManager().getTableState(tableId) == null) { - continue; - } + final TableId tableId = tm.getTableId(); + // ignore entries for tables that do not exist in zookeeper + if (manager.getTableManager().getTableState(tableId) == null) { + continue; + } - // Don't overwhelm the tablet servers with work - if (tLists.unassigned.size() + unloaded - > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { - flushChanges(tLists, wals); - tLists.reset(); - unloaded = 0; - eventListener.waitForEvents(waitTimeBetweenScans); - } - final TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId); - - final MergeStats mergeStats = mergeStatsCache.computeIfAbsent(tableId, k -> { - var mStats = currentMerges.get(k); - return mStats != null ? mStats : new MergeStats(new MergeInfo()); - }); - TabletGoalState goal = manager.getGoalState(tm, mergeStats.getMergeInfo()); - TabletState state = TabletState.compute(tm, currentTServers.keySet(), - manager.tabletBalancer, resourceGroups); - - final Location location = tm.getLocation(); - Location current = null; - Location future = null; - if (tm.hasCurrent()) { - current = tm.getLocation(); - } else { - future = tm.getLocation(); - } - TabletLogger.missassigned(tm.getExtent(), goal.toString(), state.toString(), - future != null ? future.getServerInstance() : null, - current != null ? 
current.getServerInstance() : null, tm.getLogs().size()); + // Don't overwhelm the tablet servers with work + if (tLists.unassigned.size() + unloaded + > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { + flushChanges(tLists); + tLists.reset(); + unloaded = 0; + } - stats.update(tableId, state); - mergeStats.update(tm.getExtent(), state); + final TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId); + + final MergeStats mergeStats = mergeStatsCache.computeIfAbsent(tableId, k -> { + var mStats = currentMerges.get(k); + return mStats != null ? mStats : new MergeStats(new MergeInfo()); + }); + TabletGoalState goal = manager.getGoalState(tm, mergeStats.getMergeInfo()); + TabletState state = + TabletState.compute(tm, currentTServers.keySet(), manager.tabletBalancer, resourceGroups); + + final Location location = tm.getLocation(); + Location current = null; + Location future = null; + if (tm.hasCurrent()) { + current = tm.getLocation(); + } else { + future = tm.getLocation(); + } + TabletLogger.missassigned(tm.getExtent(), goal.toString(), state.toString(), + future != null ? future.getServerInstance() : null, + current != null ? current.getServerInstance() : null, tm.getLogs().size()); - // Always follow through with assignments - if (state == TabletState.ASSIGNED) { + if (isFullScan) { + stats.update(tableId, state); + } + mergeStats.update(tm.getExtent(), state); + + // Always follow through with assignments + if (state == TabletState.ASSIGNED) { + goal = TabletGoalState.HOSTED; + } else if (state == TabletState.NEEDS_REASSIGNMENT) { + goal = TabletGoalState.UNASSIGNED; + } + + if (Manager.log.isTraceEnabled()) { + Manager.log.trace( + "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {} actions:{}", + store.name(), manager.serversToShutdown.equals(currentTServers.keySet()), + dependentWatcher == null ? 
"null" : dependentWatcher.assignedOrHosted(), tm.getExtent(), + state, goal, actions); + } + + // if we are shutting down all the tabletservers, we have to do it in order + if (isFullScan && (goal == TabletGoalState.SUSPENDED && state == TabletState.HOSTED) + && manager.serversToShutdown.equals(currentTServers.keySet())) { + if (dependentWatcher != null) { + // If the dependentWatcher is for the user tables, check to see + // that user tables exist. + DataLevel dependentLevel = dependentWatcher.store.getLevel(); + boolean userTablesExist = true; + switch (dependentLevel) { + case USER: + Set<TableId> onlineTables = manager.onlineTables(); + onlineTables.remove(RootTable.ID); + onlineTables.remove(MetadataTable.ID); + userTablesExist = !onlineTables.isEmpty(); + break; + case METADATA: + case ROOT: + default: + break; + } + // If the stats object in the dependentWatcher is empty, then it + // currently does not have data about what is hosted or not. In + // that case host these tablets until the dependent watcher can + // gather some data. + final Map<TableId,TableCounts> stats = dependentWatcher.getStats(); + if (dependentLevel == DataLevel.USER) { + if (userTablesExist + && (stats == null || stats.isEmpty() || assignedOrHosted(stats) > 0)) { + goal = TabletGoalState.HOSTED; + } + } else if (stats == null || stats.isEmpty() || assignedOrHosted(stats) > 0) { goal = TabletGoalState.HOSTED; - } else if (state == TabletState.NEEDS_REASSIGNMENT) { - goal = TabletGoalState.UNASSIGNED; } - if (Manager.log.isTraceEnabled()) { - Manager.log.trace( - "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {} actions:{}", - store.name(), manager.serversToShutdown.equals(currentTServers.keySet()), - dependentWatcher == null ? 
"null" : dependentWatcher.assignedOrHosted(), - tm.getExtent(), state, goal, actions); + } + } + + if (actions.contains(ManagementAction.NEEDS_SPLITTING)) { + LOG.debug("{} may need splitting.", tm.getExtent()); + if (manager.getSplitter().isSplittable(tm)) { + if (manager.getSplitter().addSplitStarting(tm.getExtent())) { + LOG.debug("submitting tablet {} for split", tm.getExtent()); + manager.getSplitter().executeSplit(new SplitTask(manager.getContext(), tm, manager)); } + } else { + LOG.debug("{} is not splittable.", tm.getExtent()); + } + // ELASITICITY_TODO: See #3605. Merge is non-functional. Left this commented out code to + // show where merge used to make a call to split a tablet. + // sendSplitRequest(mergeStats.getMergeInfo(), state, tm); + } - // if we are shutting down all the tabletservers, we have to do it in order - if ((goal == TabletGoalState.SUSPENDED && state == TabletState.HOSTED) - && manager.serversToShutdown.equals(currentTServers.keySet())) { - if (dependentWatcher != null) { - // If the dependentWatcher is for the user tables, check to see - // that user tables exist. - DataLevel dependentLevel = dependentWatcher.store.getLevel(); - boolean userTablesExist = true; - switch (dependentLevel) { - case USER: - Set<TableId> onlineTables = manager.onlineTables(); - onlineTables.remove(RootTable.ID); - onlineTables.remove(MetadataTable.ID); - userTablesExist = !onlineTables.isEmpty(); - break; - case METADATA: - case ROOT: - default: - break; - } - // If the stats object in the dependentWatcher is empty, then it - // currently does not have data about what is hosted or not. In - // that case host these tablets until the dependent watcher can - // gather some data. 
- final Map<TableId,TableCounts> stats = dependentWatcher.getStats(); - if (dependentLevel == DataLevel.USER) { - if (userTablesExist - && (stats == null || stats.isEmpty() || assignedOrHosted(stats) > 0)) { - goal = TabletGoalState.HOSTED; - } - } else if (stats == null || stats.isEmpty() || assignedOrHosted(stats) > 0) { - goal = TabletGoalState.HOSTED; + if (actions.contains(ManagementAction.NEEDS_COMPACTING)) { + var jobs = compactionGenerator.generateJobs(tm, + TabletManagementIterator.determineCompactionKinds(actions)); + LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), jobs.size()); + manager.getCompactionQueues().add(tm, jobs); + } + + // ELASITICITY_TODO the case where a planner generates compactions at time T1 for tablet + // and later at time T2 generates nothing for the same tablet is not being handled. At + // time T1 something could have been queued. However at time T2 we will not clear those + // entries from the queue because we see nothing here for that case. After a full + // metadata scan could remove any tablets that were not updated during the scan. + + if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE) + || actions.contains(ManagementAction.IS_MERGING)) { + if (goal == TabletGoalState.HOSTED) { + if ((state != TabletState.HOSTED && !tm.getLogs().isEmpty()) + && manager.recoveryManager.recoverLogs(tm.getExtent(), tm.getLogs())) { + continue; + } + switch (state) { + case HOSTED: + if (location.getServerInstance().equals(manager.migrations.get(tm.getExtent()))) { + manager.migrations.remove(tm.getExtent()); } - } + break; + case ASSIGNED_TO_DEAD_SERVER: + hostDeadTablet(tLists, tm, location); + break; + case SUSPENDED: + hostSuspendedTablet(tLists, tm, location, tableConf); + break; + case UNASSIGNED: + hostUnassignedTablet(tLists, tm.getExtent(), + new UnassignedTablet(location, tm.getLast())); + break; + case ASSIGNED: + // Send another reminder + tLists.assigned.add(new Assignment(tm.getExtent(), + future != null ? 
future.getServerInstance() : null, tm.getLast())); + break; + default: + break; } - - if (actions.contains(ManagementAction.NEEDS_SPLITTING)) { - LOG.debug("{} may need splitting.", tm.getExtent()); - if (manager.getSplitter().isSplittable(tm)) { - if (manager.getSplitter().addSplitStarting(tm.getExtent())) { - LOG.debug("submitting tablet {} for split", tm.getExtent()); - manager.getSplitter() - .executeSplit(new SplitTask(manager.getContext(), tm, manager)); + } else { + switch (state) { + case SUSPENDED: + // Request a move to UNASSIGNED, so as to allow balancing to continue. + tLists.suspendedToGoneServers.add(tm); + cancelOfflineTableMigrations(tm.getExtent()); + break; + case UNASSIGNED: + cancelOfflineTableMigrations(tm.getExtent()); + break; + case ASSIGNED_TO_DEAD_SERVER: + unassignDeadTablet(tLists, tm); + break; + case NEEDS_REASSIGNMENT: + case HOSTED: + TServerConnection client = + manager.tserverSet.getConnection(location.getServerInstance()); + if (client != null) { + client.unloadTablet(manager.managerLock, tm.getExtent(), goal.howUnload(), + manager.getSteadyTime()); + tableMgmtStats.totalUnloaded++; + unloaded++; + } else { + Manager.log.warn("Could not connect to server {}", location); } - } else { - LOG.debug("{} is not splittable.", tm.getExtent()); - } - // ELASITICITY_TODO: See #3605. Merge is non-functional. Left this commented out code to - // show where merge used to make a call to split a tablet. 
- // sendSplitRequest(mergeStats.getMergeInfo(), state, tm); + break; + case ASSIGNED: + break; } + } + tableMgmtStats.counts[state.ordinal()]++; + } + } - if (actions.contains(ManagementAction.NEEDS_COMPACTING)) { - var jobs = compactionGenerator.generateJobs(tm, - TabletManagementIterator.determineCompactionKinds(actions)); - LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), jobs.size()); - manager.getCompactionQueues().add(tm, jobs); - } + flushChanges(tLists); + return tableMgmtStats; + } - // ELASITICITY_TODO the case where a planner generates compactions at time T1 for tablet - // and later at time T2 generates nothing for the same tablet is not being handled. At - // time T1 something could have been queued. However at time T2 we will not clear those - // entries from the queue because we see nothing here for that case. After a full - // metadata scan could remove any tablets that were not updated during the scan. - - if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE) - || actions.contains(ManagementAction.IS_MERGING)) { - if (goal == TabletGoalState.HOSTED) { - if ((state != TabletState.HOSTED && !tm.getLogs().isEmpty()) - && manager.recoveryManager.recoverLogs(tm.getExtent(), tm.getLogs())) { - continue; - } - switch (state) { - case HOSTED: - if (location.getServerInstance().equals(manager.migrations.get(tm.getExtent()))) { - manager.migrations.remove(tm.getExtent()); - } - break; - case ASSIGNED_TO_DEAD_SERVER: - hostDeadTablet(tLists, tm, location, wals); - break; - case SUSPENDED: - hostSuspendedTablet(tLists, tm, location, tableConf); - break; - case UNASSIGNED: - hostUnassignedTablet(tLists, tm.getExtent(), - new UnassignedTablet(location, tm.getLast())); - break; - case ASSIGNED: - // Send another reminder - tLists.assigned.add(new Assignment(tm.getExtent(), - future != null ? 
future.getServerInstance() : null, tm.getLast())); - break; - default: - break; - } - } else { - switch (state) { - case SUSPENDED: - // Request a move to UNASSIGNED, so as to allow balancing to continue. - tLists.suspendedToGoneServers.add(tm); - cancelOfflineTableMigrations(tm.getExtent()); - break; - case UNASSIGNED: - cancelOfflineTableMigrations(tm.getExtent()); - break; - case ASSIGNED_TO_DEAD_SERVER: - unassignDeadTablet(tLists, tm, wals); - break; - case NEEDS_REASSIGNMENT: - case HOSTED: - TServerConnection client = - manager.tserverSet.getConnection(location.getServerInstance()); - if (client != null) { - client.unloadTablet(manager.managerLock, tm.getExtent(), goal.howUnload(), - manager.getSteadyTime()); - unloaded++; - totalUnloaded++; - } else { - Manager.log.warn("Could not connect to server {}", location); - } - break; - case ASSIGNED: - break; - } - } - counts[state.ordinal()]++; + private SortedMap<TServerInstance,TabletServerStatus> getCurrentTservers() { + // Get the current status for the current list of tservers + final SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<>(); + for (TServerInstance entry : manager.tserverSet.getCurrentServers()) { + currentTServers.put(entry, manager.tserverStatus.get(entry)); + } + return currentTServers; + } + + @Override + public void run() { + int[] oldCounts = new int[TabletState.values().length]; + + while (manager.stillManager()) { + // slow things down a little, otherwise we spam the logs when there are many wake-up events + sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + // ELASTICITY_TODO above sleep in the case when not doing a full scan to make manager more + // responsive + + final long waitTimeBetweenScans = manager.getConfiguration() + .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); + + var currentTServers = getCurrentTservers(); + + ClosableIterator<TabletManagement> iter = null; + try { + if (currentTServers.isEmpty()) { + 
eventHandler.waitForFullScan(waitTimeBetweenScans); + synchronized (this) { + lastScanServers = Collections.emptySortedSet(); } + continue; } - // ELASTICITY_TODO: Add handling for other actions + stats.begin(); + + ManagerState managerState = manager.getManagerState(); + + // Clear the need for a full scan before starting a full scan in order to detect events that + // happen during the full scan. + eventHandler.clearNeedsFullScan(); - flushChanges(tLists, wals); + iter = store.iterator(); + var tabletMgmtStats = manageTablets(iter, currentTServers, true); // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(managerState); @@ -485,19 +621,21 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { // Report changes for (TabletState state : TabletState.values()) { int i = state.ordinal(); - if (counts[i] > 0 && counts[i] != oldCounts[i]) { - manager.nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], - state.name()); + if (tabletMgmtStats.counts[i] > 0 && tabletMgmtStats.counts[i] != oldCounts[i]) { + manager.nextEvent.event(store.getLevel(), "[%s]: %d tablets are %s", store.name(), + tabletMgmtStats.counts[i], state.name()); } } - Manager.log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), + Manager.log.debug(String.format("[%s]: full scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.)); - oldCounts = counts; - if (totalUnloaded > 0) { - manager.nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded); + oldCounts = tabletMgmtStats.counts; + if (tabletMgmtStats.totalUnloaded > 0) { + manager.nextEvent.event(store.getLevel(), "[%s]: %d tablets unloaded", store.name(), + tabletMgmtStats.totalUnloaded); } - updateMergeState(mergeStatsCache); + // TODO + // updateMergeState(mergeStatsCache); synchronized (this) { lastScanServers = ImmutableSortedSet.copyOf(currentTServers.keySet()); @@ -505,13 +643,15 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread { if (manager.tserverSet.getCurrentServers().equals(currentTServers.keySet())) { Manager.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), waitTimeBetweenScans / 1000.)); - eventListener.waitForEvents(waitTimeBetweenScans); + eventHandler.waitForFullScan(waitTimeBetweenScans); } else { - Manager.log.info("Detected change in current tserver set, re-running state machine."); + // Create an event at the store level, this will force the next scan to be a full scan + manager.nextEvent.event(store.getLevel(), "Set of tablet servers changed"); } } catch (Exception ex) { Manager.log.error("Error processing table state for store " + store.name(), ex); if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) { + // ELASTICITY_TODO review this function repairMetadata(((BadLocationStateException) ex.getCause()).getEncodedEndRow()); } else { sleepUninterruptibly(Manager.WAIT_BETWEEN_ERRORS, TimeUnit.MILLISECONDS); @@ -528,12 +668,11 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } } - private void unassignDeadTablet(TabletLists tLists, TabletMetadata tm, WalStateManager wals) - throws WalMarkerException { + private void unassignDeadTablet(TabletLists tLists, TabletMetadata tm) throws WalMarkerException { tLists.assignedToDeadServers.add(tm); if (!tLists.logsForDeadServers.containsKey(tm.getLocation().getServerInstance())) { tLists.logsForDeadServers.put(tm.getLocation().getServerInstance(), - wals.getWalsInUse(tm.getLocation().getServerInstance())); + walStateManager.getWalsInUse(tm.getLocation().getServerInstance())); } } @@ -582,15 +721,15 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } } - private void hostDeadTablet(TabletLists tLists, TabletMetadata tm, Location location, - WalStateManager wals) throws WalMarkerException { + private void hostDeadTablet(TabletLists tLists, TabletMetadata tm, Location location) + throws WalMarkerException { 
tLists.assignedToDeadServers.add(tm); if (location.getServerInstance().equals(manager.migrations.get(tm.getExtent()))) { manager.migrations.remove(tm.getExtent()); } TServerInstance tserver = tm.getLocation().getServerInstance(); if (!tLists.logsForDeadServers.containsKey(tserver)) { - tLists.logsForDeadServers.put(tserver, wals.getWalsInUse(tserver)); + tLists.logsForDeadServers.put(tserver, walStateManager.getWalsInUse(tserver)); } } @@ -1372,7 +1511,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } } - private void handleDeadTablets(TabletLists tLists, WalStateManager wals) + private void handleDeadTablets(TabletLists tLists) throws WalMarkerException, DistributedStoreException { var deadTablets = tLists.assignedToDeadServers; var deadLogs = tLists.logsForDeadServers; @@ -1387,8 +1526,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } else { store.unassign(deadTablets, deadLogs); } - markDeadServerLogsAsClosed(wals, deadLogs); - manager.nextEvent.event( + markDeadServerLogsAsClosed(walStateManager, deadLogs); + manager.nextEvent.event(store.getLevel(), "Marked %d tablets as suspended because they don't have current servers", deadTablets.size()); } @@ -1433,13 +1572,25 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } } - private void flushChanges(TabletLists tLists, WalStateManager wals) + private final Lock flushLock = new ReentrantLock(); + + private void flushChanges(TabletLists tLists) throws DistributedStoreException, TException, WalMarkerException { var unassigned = Collections.unmodifiableMap(tLists.unassigned); - handleDeadTablets(tLists, wals); - - getAssignmentsFromBalancer(tLists, unassigned); + flushLock.lock(); + try { + // This method was originally only ever called by one thread. The code was modified so that + // two threads could possibly call this flush method concurrently. It is not clear the + // following methods are thread safe so a lock is acquired out of caution. 
Balancer plugins + // may not expect multiple threads to call them concurrently, Accumulo has not done this in + // the past. The log recovery code needs to be evaluated for thread safety. + handleDeadTablets(tLists); + + getAssignmentsFromBalancer(tLists, unassigned); + } finally { + flushLock.unlock(); + } if (!tLists.assignments.isEmpty()) { Manager.log.info(String.format("Assigning %d tablets", tLists.assignments.size())); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 7f0be24740..eb2cf605fd 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -106,6 +106,7 @@ import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.compaction.RunningCompaction; import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.manager.EventCoordinator; import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher; import org.apache.accumulo.server.ServerContext; @@ -161,6 +162,7 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface private final LiveTServerSet tserverSet; private final SecurityOperation security; private final CompactionJobQueues jobQueues; + private final EventCoordinator eventCoordinator; // Exposed for tests protected volatile Boolean shutdown = false; @@ -171,12 +173,14 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface private final Cache<Path,Integer> checked_tablet_dir_cache; public 
CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers, - SecurityOperation security, CompactionJobQueues jobQueues) { + SecurityOperation security, CompactionJobQueues jobQueues, + EventCoordinator eventCoordinator) { this.ctx = ctx; this.tserverSet = tservers; this.schedExecutor = this.ctx.getScheduledExecutor(); this.security = security; this.jobQueues = jobQueues; + this.eventCoordinator = eventCoordinator; var refreshLatches = new EnumMap<Ample.DataLevel,CountDownLatch>(Ample.DataLevel.class); refreshLatches.put(Ample.DataLevel.ROOT, new CountDownLatch(1)); @@ -788,6 +792,9 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface // of a coordinator restart when the Coordinator can't find the TServer for the // corresponding external compaction. recordCompletion(ecid); + + // This will cause the tablet to be reexamined to see if it needs any more compactions. + eventCoordinator.event(extent, "Compaction completed %s", extent); } private Optional<ReferencedTabletFile> renameOrDeleteFile(TCompactionStats stats, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java index 38025c916e..562c909842 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java @@ -62,7 +62,7 @@ public class ChangeTableState extends ManagerRepo { Utils.unreserveNamespace(env, namespaceId, tid, false); Utils.unreserveTable(env, tableId, tid, true); LoggerFactory.getLogger(ChangeTableState.class).debug("Changed table state {} {}", tableId, ts); - env.getEventCoordinator().event("Set table state of %s to %s", tableId, ts); + env.getEventCoordinator().event(tableId, "Set table state of %s to %s", tableId, ts); return null; } diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java index da4a6d6975..3733271049 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java @@ -62,8 +62,8 @@ class FinishCloneTable extends ManagerRepo { Utils.unreserveTable(environment, cloneInfo.srcTableId, tid, false); Utils.unreserveTable(environment, cloneInfo.tableId, tid, true); - environment.getEventCoordinator().event("Cloned table %s from %s", cloneInfo.tableName, - cloneInfo.srcTableId); + environment.getEventCoordinator().event(cloneInfo.tableId, "Cloned table %s from %s", + cloneInfo.tableName, cloneInfo.srcTableId); LoggerFactory.getLogger(FinishCloneTable.class).debug("Cloned table " + cloneInfo.srcTableId + " " + cloneInfo.tableId + " " + cloneInfo.tableName); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 0f8491fc5e..7868aee84c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; @@ -146,6 +147,9 @@ class CompactionDriver extends ManagerRepo 
{ int selected = 0; + KeyExtent minSelected = null; + KeyExtent maxSelected = null; + CompactionConfig config = CompactionConfigStorage.getConfig(manager.getContext(), tid); for (TabletMetadata tablet : tablets) { @@ -211,6 +215,14 @@ class CompactionDriver extends ManagerRepo { && tabletMetadata.getSelectedFiles().getMetadataValue() .equals(selectedFiles.getMetadataValue())); + if (minSelected == null || tablet.getExtent().compareTo(minSelected) < 0) { + minSelected = tablet.getExtent(); + } + + if (maxSelected == null || tablet.getExtent().compareTo(maxSelected) > 0) { + maxSelected = tablet.getExtent(); + } + selected++; } @@ -239,10 +251,9 @@ class CompactionDriver extends ManagerRepo { result.getExtent())); if (selected > 0) { - // selected files for some tablets, send a notification to get the tablet group watcher to - // scan for tablets to compact - manager.getEventCoordinator().event("%s selected files for compaction for %d tablets", - FateTxId.formatTid(tid), selected); + manager.getEventCoordinator().event( + new KeyExtent(tableId, maxSelected.endRow(), minSelected.prevEndRow()), + "%s selected files for compaction for %d tablets", FateTxId.formatTid(tid), selected); } return total - complete; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java index cdd6ad7291..58abc8d844 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java @@ -62,7 +62,8 @@ class FinishCreateTable extends ManagerRepo { Utils.unreserveNamespace(env, tableInfo.getNamespaceId(), tid, false); Utils.unreserveTable(env, tableInfo.getTableId(), tid, true); - env.getEventCoordinator().event("Created table %s ", tableInfo.getTableName()); + 
env.getEventCoordinator().event(tableInfo.getTableId(), "Created table %s ", + tableInfo.getTableName()); if (tableInfo.getInitialSplitSize() > 0) { cleanupSplitFiles(env); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java index c578f64b89..e8eb5c8761 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java @@ -48,7 +48,7 @@ public class DeleteTable extends ManagerRepo { @Override public Repo<Manager> call(long tid, Manager env) { env.getTableManager().transitionTableState(tableId, TableState.DELETING); - env.getEventCoordinator().event("deleting table %s ", tableId); + env.getEventCoordinator().event(tableId, "deleting table %s ", tableId); return new CleanUp(tableId, namespaceId); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java index a01539d339..d5ec7f9a88 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java @@ -67,6 +67,10 @@ public class DeleteOperationIds extends ManagerRepo { .stream().map(Ample.ConditionalResult::getStatus).collect(Collectors.toSet())); } + // Get the tablets hosted ASAP if necessary. 
+ manager.getEventCoordinator().event(splitInfo.getOriginal(), "Added %d splits to %s", + splitInfo.getSplits().size(), splitInfo.getOriginal()); + TabletLogger.split(splitInfo.getOriginal(), splitInfo.getSplits()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/FinishImportTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/FinishImportTable.java index 0df0771ecf..8f55aa3313 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/FinishImportTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/FinishImportTable.java @@ -63,7 +63,7 @@ class FinishImportTable extends ManagerRepo { Utils.unreserveHdfsDirectory(env, new Path(dm.exportDir).toString(), tid); } - env.getEventCoordinator().event("Imported table %s ", tableInfo.tableName); + env.getEventCoordinator().event(tableInfo.tableId, "Imported table %s ", tableInfo.tableName); LoggerFactory.getLogger(FinishImportTable.class) .debug("Imported table " + tableInfo.tableId + " " + tableInfo.tableName); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java new file mode 100644 index 0000000000..84ab0d0482 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Map; +import java.util.Random; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +public class SplitMillionIT extends AccumuloClusterHarness { + + @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"}, + justification = "predictable random is ok for testing") + @Test + public void testOneMillionTablets() throws Exception { + Logger log = LoggerFactory.getLogger(SplitIT.class); + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + SortedSet<Text> splits = new TreeSet<>(); + + for (int i = 100; i < 100_000_000; i += 100) { + String split = String.format("%010d", i); + + splits.add(new Text(split)); + + if (splits.size() >= 10000) { + addSplits(c, tableName, splits, log); + } + } + + if (!splits.isEmpty()) { + addSplits(c, tableName, splits, log); + } + 
+ var rows = IntStream + .concat(new Random().ints(98, 0, 100_000_000).flatMap(i -> IntStream.of(i, i + 1)), + IntStream.of(0, 1, 99_999_998, 99_999_999)) + .toArray(); + + // read and write to a few of the 1 million tablets. The following should touch the first, + // last, and a few middle tablets. + for (var rowInt : rows) { + + var row = String.format("%010d", rowInt); + + long t1 = System.currentTimeMillis(); + try (var scanner = c.createScanner(tableName)) { + scanner.setRange(new Range(row)); + assertEquals(0, scanner.stream().count()); + } + + long t2 = System.currentTimeMillis(); + + try (var writer = c.createBatchWriter(tableName)) { + Mutation m = new Mutation(row); + m.put("c", "x", "200"); + m.put("c", "y", "900"); + m.put("c", "z", "300"); + writer.addMutation(m); + } + + long t3 = System.currentTimeMillis(); + + try (var scanner = c.createScanner(tableName)) { + scanner.setRange(new Range(row)); + Map<String,String> coords = scanner.stream().collect(Collectors.toMap( + e -> e.getKey().getColumnQualifier().toString(), e -> e.getValue().toString())); + assertEquals(Map.of("x", "200", "y", "900", "z", "300"), coords); + } + + long t4 = System.currentTimeMillis(); + log.info("Row: {} scan1: {}ms write: {}ms scan2: {}ms", row, t2 - t1, t3 - t2, t4 - t3); + } + + long t1 = System.currentTimeMillis(); + long count = c.tableOperations().getTabletInformation(tableName, new Range()).count(); + long t2 = System.currentTimeMillis(); + assertEquals(1_000_000, count); + log.info("Time to scan all tablets : {}ms", t2 - t1); + + t1 = System.currentTimeMillis(); + c.tableOperations().delete(tableName); + t2 = System.currentTimeMillis(); + log.info("Time to delete table : {}ms", t2 - t1); + + } + } + + private static void addSplits(AccumuloClient c, String tableName, SortedSet<Text> splits, + Logger log) throws Exception { + long t1 = System.currentTimeMillis(); + c.tableOperations().addSplits(tableName, splits); + long t2 = System.currentTimeMillis(); + 
log.info("Added {} splits in {}ms", splits.size(), t2 - t1); + splits.clear(); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementScannerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementScannerIT.java deleted file mode 100644 index 11b4e58ac9..0000000000 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementScannerIT.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.accumulo.test.functional; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.time.Duration; -import java.util.Collections; -import java.util.EnumSet; -import java.util.Iterator; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.stream.IntStream; - -import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.admin.Locations; -import org.apache.accumulo.core.client.admin.NewTableConfiguration; -import org.apache.accumulo.core.client.admin.TabletHostingGoal; -import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.manager.state.TabletManagement; -import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; -import org.apache.accumulo.core.spi.ondemand.DefaultOnDemandTabletUnloader; -import org.apache.accumulo.harness.MiniClusterConfigurationCallback; -import org.apache.accumulo.harness.SharedMiniClusterBase; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import 
org.apache.accumulo.server.manager.state.TabletManagementScanner; -import org.apache.accumulo.test.util.Wait; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -import com.google.common.collect.Iterators; - -public class TabletManagementScannerIT extends SharedMiniClusterBase { - - public static class TMSIT_Config implements MiniClusterConfigurationCallback { - - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { - cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1); - cfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "10"); - cfg.setProperty(Property.GENERAL_THREADPOOL_SIZE, "10"); - cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "180"); - cfg.setProperty(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL, "10s"); - cfg.setProperty(DefaultOnDemandTabletUnloader.INACTIVITY_THRESHOLD, "15"); - } - } - - @Override - protected Duration defaultTimeout() { - return Duration.ofMinutes(2); - } - - @BeforeAll - public static void beforeAll() throws Exception { - SharedMiniClusterBase.startMiniClusterWithConfig(new TMSIT_Config()); - } - - @AfterAll - public static void afterAll() throws Exception { - SharedMiniClusterBase.stopMiniCluster(); - } - - @Test - public void testKnownTabletModificationsTakePriority() throws Exception { - - final ConcurrentLinkedQueue<TabletManagement> mods = new ConcurrentLinkedQueue<>(); - - try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - - // Confirm that the root and metadata tables are hosted - Locations rootLocations = - c.tableOperations().locate(RootTable.NAME, Collections.singletonList(new Range())); - rootLocations.groupByTablet().keySet() - .forEach(tid -> assertNotNull(rootLocations.getTabletLocation(tid))); - - Locations metadataLocations = - 
c.tableOperations().locate(MetadataTable.NAME, Collections.singletonList(new Range())); - metadataLocations.groupByTablet().keySet() - .forEach(tid -> assertNotNull(metadataLocations.getTabletLocation(tid))); - - String tableName = super.getUniqueNames(1)[0]; - NewTableConfiguration ntc = new NewTableConfiguration(); - SortedSet<Text> splits = new TreeSet<>(); - IntStream.range(97, 122).forEach((i) -> { - splits.add(new Text(String.valueOf((char) i))); - }); - ntc.withSplits(splits); - c.tableOperations().create(tableName, ntc); - - TableId tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName)); - - // wait for the tablets to exist in the metadata table. The tablets - // will not be hosted so the current location will be empty. Passing null - // to the TabletManagementScanner should cause all tablets to be returned - // by the TabletManagementIterator. - try (TabletManagementScanner s = new TabletManagementScanner((ClientContext) c, - new Range(TabletsSection.getRange(tableId)), null, MetadataTable.NAME, mods)) { - Wait.waitFor(() -> Iterators.size(s) == 26, 10_000, 250); - } - - // set the tablet hosting goal on the first half of the tablets - // so they will need a location update action - c.tableOperations().setTabletHostingGoal(tableName, new Range(null, "m"), - TabletHostingGoal.ALWAYS); - - // let's add a known modification for the last tablet - TabletsMetadata tms = getCluster().getServerContext().getAmple().readTablets() - .forTablet(new KeyExtent(tableId, null, new Text("y"))) - .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION, ColumnType.SUSPEND, ColumnType.LOGS, - ColumnType.HOSTING_GOAL, ColumnType.HOSTING_REQUESTED, ColumnType.FILES, - ColumnType.LAST, ColumnType.OPID) - .build(); - assertEquals(1, Iterators.size(tms.iterator())); - TabletMetadata tm = tms.iterator().next(); - mods.add(new TabletManagement(EnumSet.of(ManagementAction.NEEDS_SPLITTING), tm)); - - // check the contents of the TabletManagementScanner - try 
(TabletManagementScanner s = new TabletManagementScanner((ClientContext) c, - new Range(TabletsSection.getRange(tableId)), null, MetadataTable.NAME, mods)) { - - @SuppressWarnings("resource") - Iterator<TabletManagement> iter = s; - // Confirm that the first object is the one we added to the known mods set. - assertFalse(mods.isEmpty()); - assertTrue(iter.hasNext()); - TabletManagement tman = iter.next(); - assertEquals(1, tman.getActions().size()); - assertTrue(tman.getActions().contains(ManagementAction.NEEDS_SPLITTING)); - assertEquals(tm, tman.getTabletMetadata()); - - assertTrue(mods.isEmpty()); - - // The TabletManagementScanner is going to return all of the tablets because - // we passed null for the state. Confirm that the hosting goal is ALWAYS for - // for the first 13 and ondemand for the latter half. When running in the Manager - // the latter half of the results would not be returned. - for (int i = 0; i < 26; i++) { - assertTrue(iter.hasNext()); - tman = iter.next(); - assertEquals(1, tman.getActions().size()); - assertTrue(tman.getActions().contains(ManagementAction.NEEDS_LOCATION_UPDATE)); - if (i < 13) { - assertEquals(TabletHostingGoal.ALWAYS, tman.getTabletMetadata().getHostingGoal()); - } else { - assertEquals(TabletHostingGoal.ONDEMAND, tman.getTabletMetadata().getHostingGoal()); - } - } - - assertFalse(iter.hasNext()); - } - - } - } - -}