This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit aecd11f81ccda36726b133b601ca5adad73eca58 Merge: 23a921726d 9e079dbbeb Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Fri Aug 23 11:25:49 2024 -0400 Merge branch 'main' into elasticity Redo/fix merge of #4821 .../core/clientImpl/ClientTabletCache.java | 11 +- .../core/clientImpl/ClientTabletCacheImpl.java | 33 +++-- .../java/org/apache/accumulo/core/fate/Fate.java | 10 +- .../org/apache/accumulo/core/lock/ServiceLock.java | 9 +- .../java/org/apache/accumulo/core/util/Timer.java | 25 ---- .../util/compaction/ExternalCompactionUtil.java | 7 +- .../apache/accumulo/core/util/time/NanoTime.java | 104 ------------- .../org/apache/accumulo/core/util/TimerTest.java | 34 ----- .../accumulo/core/util/time/NanoTimeTest.java | 162 --------------------- pom.xml | 2 +- .../accumulo/server/compaction/FileCompactor.java | 14 +- .../org/apache/accumulo/compactor/Compactor.java | 6 +- .../apache/accumulo/gc/SimpleGarbageCollector.java | 9 +- .../java/org/apache/accumulo/manager/Manager.java | 6 +- .../availability/SetTabletAvailability.java | 8 +- .../manager/tableOps/merge/ReserveTablets.java | 7 +- .../org/apache/accumulo/tserver/ScanServer.java | 6 +- .../accumulo/tserver/UnloadTabletHandler.java | 6 +- 18 files changed, 73 insertions(+), 386 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java index a31ca2418b,0000000000..5a23cad2d4 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java @@@ -1,426 -1,0 +1,429 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.clientImpl; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.BiConsumer; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.InvalidTabletHostingRequestException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.AccumuloTable; +import org.apache.accumulo.core.metadata.MetadataCachedTabletObtainer; +import org.apache.accumulo.core.singletons.SingletonManager; +import org.apache.accumulo.core.singletons.SingletonService; +import org.apache.accumulo.core.util.Interner; ++import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.UtilWaitThread; - import org.apache.accumulo.core.util.time.NanoTime; +import 
org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; + +/** + * Client side cache of information about Tablets. Currently, a tablet prev end row is cached and + * locations are cached if they exist. + */ +public abstract class ClientTabletCache { + + /** + * Flipped false on call to {@link #clearInstances}. Checked by client classes that locally cache + * Locators. + */ + private volatile boolean isValid = true; + + boolean isValid() { + return isValid; + } + + /** + * Used to indicate if a user of this interface needs a tablet with a location. This simple enum + * was created instead of using a boolean for code clarity. + */ + public enum LocationNeed { + REQUIRED, NOT_REQUIRED + } + + /** + * This method allows linear scans to host tablet ahead of time that they may read in the future. + * The goal of this method is to allow tablets to request hosting of tablet for a scan before the + * scan actually needs it. Below is an example of how this method could work with a scan when + * {@code minimumHostAhead=4} is passed and avoid the scan having to wait on tablet hosting. + * + * <ol> + * <li>4*2 tablets are initially hosted (the scan has to wait on this)</li> + * <li>The 1st,2nd,3rd, and 4th tablets are read by the scan</li> + * <li>The request to read the 5th tablets causes a request to host 4 more tablets (this would be + * the 9th,10th,11th, and 12th tablets)</li> + * <li>The 5th,6th,7th, and 8th tablet are read by the scan</li> + * <li>While the scan does the read above, the 9th,10th,11th, and 12th tablets are actually + * hosted. This happens concurrently with the scan above.</li> + * <li>When the scan goes to read the 9th tablet, hopefully its already hosted. Also attempting to + * read the 9th tablet will cause a request to host the 13th,14th,15th, and 16th tablets.</li> + * </ol> + * + * In the situation above, the goal is that while we are reading 4 hosted tablets the 4 following + * tablets are in the process of being hosted. 
+ * + * @param minimumHostAhead Attempts to keep between minimumHostAhead and 2*minimumHostAhead + * tablets following the found tablet hosted. + * @param hostAheadRange Only host following tablets that are within this range. + */ + public abstract CachedTablet findTablet(ClientContext context, Text row, boolean skipRow, + LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException; + + /** + * Finds the tablet that contains the given row. + * + * @param locationNeed When {@link LocationNeed#REQUIRED} is passed will only return a tablet if + * it has location. When {@link LocationNeed#NOT_REQUIRED} is passed will return the tablet + * that overlaps the row with or without a location. + * + * @return overlapping tablet. If no overlapping tablet exists, returns null. If location is + * required and the tablet currently has no location, returns null. + */ + public CachedTablet findTablet(ClientContext context, Text row, boolean skipRow, + LocationNeed locationNeed) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException, InvalidTabletHostingRequestException { + return findTablet(context, row, skipRow, locationNeed, 0, null); + } + + public CachedTablet findTabletWithRetry(ClientContext context, Text row, boolean skipRow, + LocationNeed locationNeed) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException, InvalidTabletHostingRequestException { + var tl = findTablet(context, row, skipRow, locationNeed); + while (tl == null && locationNeed == LocationNeed.REQUIRED) { + UtilWaitThread.sleep(100); + tl = findTablet(context, row, skipRow, locationNeed); + } + return tl; + } + + public abstract <T extends Mutation> void binMutations(ClientContext context, List<T> mutations, + Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures) + throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException, + InvalidTabletHostingRequestException; + + /** + * <p> + * This method finds what tablets overlap a given set of ranges, passing each range and its + * associated tablet to the range consumer. If a range overlaps multiple tablets then it can be + * passed to the range consumer multiple times. + * </p> + * + * @param locationNeed When {@link LocationNeed#REQUIRED} is passed only tablets that have a + * location are provided to the rangeConsumer, any range that overlaps a tablet without a + * location will be returned as a failure. When {@link LocationNeed#NOT_REQUIRED} is + * passed, ranges that overlap tablets with and without a location are provided to the + * range consumer. + * @param ranges For each range will try to find overlapping contiguous tablets that optionally + * have a location. + * @param rangeConsumer If all of the tablets that a range overlaps are found, then the range and + * tablets will be passed to this consumer one at a time. A range will either be passed to + * this consumer one or more times OR returned as a failure, but never both. + * + * @return The failed ranges that did not have a location (if a location is required) or where + * contiguous tablets could not be found. + */ + public abstract List<Range> findTablets(ClientContext context, List<Range> ranges, + BiConsumer<CachedTablet,Range> rangeConsumer, LocationNeed locationNeed) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException; + + /** + * The behavior of this method is similar to + * {@link #findTablets(ClientContext, List, BiConsumer, LocationNeed)}, except it bins ranges to + * the passed in binnedRanges map instead of passing them to a consumer. This method only bins to + * hosted tablets with a location. 
+ */ + public List<Range> binRanges(ClientContext context, List<Range> ranges, + Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException, + AccumuloSecurityException, TableNotFoundException, InvalidTabletHostingRequestException { + return findTablets(context, ranges, ((cachedTablet, range) -> ClientTabletCacheImpl + .addRange(binnedRanges, cachedTablet, range)), LocationNeed.REQUIRED); + } + + public abstract void invalidateCache(KeyExtent failedExtent); + + public abstract void invalidateCache(Collection<KeyExtent> keySet); + + /** + * Invalidate entire cache + */ + public abstract void invalidateCache(); + + /** + * Invalidate all metadata entries that point to server + */ + public abstract void invalidateCache(ClientContext context, String server); + + private static class InstanceKey { + InstanceId instanceId; + TableId tableId; + + InstanceKey(InstanceId instanceId, TableId table) { + this.instanceId = instanceId; + this.tableId = table; + } + + @Override + public int hashCode() { + return instanceId.hashCode() + tableId.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof InstanceKey) { + return equals((InstanceKey) o); + } + return false; + } + + public boolean equals(InstanceKey lk) { + return instanceId.equals(lk.instanceId) && tableId.equals(lk.tableId); + } + + } + + private static final HashMap<InstanceKey,ClientTabletCache> instances = new HashMap<>(); + private static boolean enabled = true; + + public static synchronized void clearInstances() { + for (ClientTabletCache locator : instances.values()) { + locator.isValid = false; + } + instances.clear(); + } + + static synchronized boolean isEnabled() { + return enabled; + } + + static synchronized void disable() { + clearInstances(); + enabled = false; + } + + static synchronized void enable() { + enabled = true; + } + + public long getTabletHostingRequestCount() { + return 0L; + } + + public static synchronized ClientTabletCache 
getInstance(ClientContext context, TableId tableId) { + Preconditions.checkState(enabled, "The Accumulo singleton that that tracks tablet locations is " + + "disabled. This is likely caused by all AccumuloClients being closed or garbage collected"); + InstanceKey key = new InstanceKey(context.getInstanceID(), tableId); + ClientTabletCache tl = instances.get(key); + if (tl == null) { + MetadataCachedTabletObtainer mlo = new MetadataCachedTabletObtainer(); + + if (AccumuloTable.ROOT.tableId().equals(tableId)) { + tl = new RootClientTabletCache(new ZookeeperLockChecker(context)); + } else if (AccumuloTable.METADATA.tableId().equals(tableId)) { + tl = new ClientTabletCacheImpl(AccumuloTable.METADATA.tableId(), + getInstance(context, AccumuloTable.ROOT.tableId()), mlo, + new ZookeeperLockChecker(context)); + } else { + tl = new ClientTabletCacheImpl(tableId, + getInstance(context, AccumuloTable.METADATA.tableId()), mlo, + new ZookeeperLockChecker(context)); + } + instances.put(key, tl); + } + + return tl; + } + + static { + SingletonManager.register(new SingletonService() { + + @Override + public boolean isEnabled() { + return ClientTabletCache.isEnabled(); + } + + @Override + public void enable() { + ClientTabletCache.enable(); + } + + @Override + public void disable() { + ClientTabletCache.disable(); + } + }); + } + + public static class CachedTablets { + + private final List<CachedTablet> cachedTablets; + + public CachedTablets(List<CachedTablet> cachedTablets) { + this.cachedTablets = cachedTablets; + } + + public List<CachedTablet> getCachedTablets() { + return cachedTablets; + } + } + + public static class CachedTablet { + private static final Interner<String> interner = new Interner<>(); + + private final KeyExtent tablet_extent; + private final String tserverLocation; + private final String tserverSession; + private final TabletAvailability availability; + private final boolean hostingRequested; + - private final NanoTime creationTime = NanoTime.now(); ++ 
private final Timer creationTimer = Timer.startNew(); + + public CachedTablet(KeyExtent tablet_extent, String tablet_location, String session, + TabletAvailability availability, boolean hostingRequested) { + checkArgument(tablet_extent != null, "tablet_extent is null"); + checkArgument(tablet_location != null, "tablet_location is null"); + checkArgument(session != null, "session is null"); + this.tablet_extent = tablet_extent; + this.tserverLocation = interner.intern(tablet_location); + this.tserverSession = interner.intern(session); + this.availability = Objects.requireNonNull(availability); + this.hostingRequested = hostingRequested; + } + + public CachedTablet(KeyExtent tablet_extent, Optional<String> tablet_location, + Optional<String> session, TabletAvailability availability, boolean hostingRequested) { + checkArgument(tablet_extent != null, "tablet_extent is null"); + this.tablet_extent = tablet_extent; + this.tserverLocation = tablet_location.map(interner::intern).orElse(null); + this.tserverSession = session.map(interner::intern).orElse(null); + this.availability = Objects.requireNonNull(availability); + this.hostingRequested = hostingRequested; + } + + public CachedTablet(KeyExtent tablet_extent, TabletAvailability availability, + boolean hostingRequested) { + checkArgument(tablet_extent != null, "tablet_extent is null"); + this.tablet_extent = tablet_extent; + this.tserverLocation = null; + this.tserverSession = null; + this.availability = Objects.requireNonNull(availability); + this.hostingRequested = hostingRequested; + } + + @Override + public boolean equals(Object o) { + if (o instanceof CachedTablet) { + CachedTablet otl = (CachedTablet) o; + return getExtent().equals(otl.getExtent()) + && getTserverLocation().equals(otl.getTserverLocation()) + && getTserverSession().equals(otl.getTserverSession()) + && getAvailability() == otl.getAvailability() + && hostingRequested == otl.hostingRequested; + } + return false; + } + + @Override + public int 
hashCode() { + return Objects.hash(getExtent(), tserverLocation, tserverSession, availability, + hostingRequested); + } + + @Override + public String toString() { + return "(" + getExtent() + "," + getTserverLocation() + "," + getTserverSession() + "," + + getAvailability() + ")"; + } + + public KeyExtent getExtent() { + return tablet_extent; + } + + public Optional<String> getTserverLocation() { + return Optional.ofNullable(tserverLocation); + } + + public Optional<String> getTserverSession() { + return Optional.ofNullable(tserverSession); + } + + /** + * The ClientTabletCache will remove and replace a CachedTablet when the location is no longer + * valid. However, it will not do the same when the availability is no longer valid. The + * availability returned by this method may be out of date. If this information is needed to be + * fresh, then you may want to consider clearing the cache first. + */ + public TabletAvailability getAvailability() { + return this.availability; + } + - public NanoTime getCreationTime() { - return creationTime; ++ /** ++ * @return a timer that was started when this object was created ++ */ ++ public Timer getCreationTimer() { ++ return creationTimer; + } + + public boolean wasHostingRequested() { + return hostingRequested; + } + } + + public static class TabletServerMutations<T extends Mutation> { + private Map<KeyExtent,List<T>> mutations; + private String tserverSession; + + public TabletServerMutations(String tserverSession) { + this.tserverSession = tserverSession; + this.mutations = new HashMap<>(); + } + + public void addMutation(KeyExtent ke, T m) { + List<T> mutList = mutations.computeIfAbsent(ke, k -> new ArrayList<>()); + mutList.add(m); + } + + public Map<KeyExtent,List<T>> getMutations() { + return mutations; + } + + final String getSession() { + return tserverSession; + } + } +} diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java index 7f3de5819c,0000000000..7aa10260cb mode 
100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java @@@ -1,984 -1,0 +1,985 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.clientImpl; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; ++import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.InvalidTabletHostingRequestException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.AccumuloTable; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.accumulo.core.util.Timer; - import org.apache.accumulo.core.util.time.NanoTime; +import org.apache.hadoop.io.Text; +import 
org.apache.hadoop.io.WritableComparator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +public class ClientTabletCacheImpl extends ClientTabletCache { + + private static final Logger log = LoggerFactory.getLogger(ClientTabletCacheImpl.class); + private static final AtomicBoolean HOSTING_ENABLED = new AtomicBoolean(true); + + // MAX_TEXT represents a TEXT object that is greater than all others. Attempted to use null for + // this purpose, but there seems to be a bug in TreeMap.tailMap with null. Therefore instead of + // using null, created MAX_TEXT. + static final Text MAX_TEXT = new Text(); + + static final Comparator<Text> END_ROW_COMPARATOR = (o1, o2) -> { + if (o1 == o2) { + return 0; + } + if (o1 == MAX_TEXT) { + return 1; + } + if (o2 == MAX_TEXT) { + return -1; + } + return o1.compareTo(o2); + }; + + protected TableId tableId; + protected ClientTabletCache parent; + protected TreeMap<Text,CachedTablet> metaCache = new TreeMap<>(END_ROW_COMPARATOR); + protected CachedTabletObtainer tabletObtainer; + private final TabletServerLockChecker lockChecker; + protected Text lastTabletRow; + + private final TreeSet<KeyExtent> badExtents = new TreeSet<>(); + private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock rLock = rwLock.readLock(); + private final Lock wLock = rwLock.writeLock(); + private final AtomicLong tabletHostingRequestCount = new AtomicLong(0); + + public interface CachedTabletObtainer { + /** + * @return null when unable to read information successfully + */ + CachedTablets lookupTablet(ClientContext context, CachedTablet src, Text row, Text stopRow, + ClientTabletCache parent) throws AccumuloSecurityException, AccumuloException; + + List<CachedTablet> lookupTablets(ClientContext context, String tserver, + Map<KeyExtent,List<Range>> 
map, ClientTabletCache parent) + throws AccumuloSecurityException, AccumuloException; + } + + public interface TabletServerLockChecker { + boolean isLockHeld(String tserver, String session); + + void invalidateCache(String server); + } + + private class LockCheckerSession { + + private final HashSet<Pair<String,String>> okLocks = new HashSet<>(); + private final HashSet<Pair<String,String>> invalidLocks = new HashSet<>(); + + private CachedTablet checkLock(CachedTablet tl) { + // the goal of this class is to minimize calls out to lockChecker under that + // assumption that + // it is a resource synchronized among many threads... want to + // avoid fine-grained synchronization when binning lots of mutations or ranges... remember + // decisions from the lockChecker in thread local unsynchronized + // memory + + if (tl == null) { + return null; + } + + if (tl.getTserverLocation().isEmpty()) { + return tl; + } + + Pair<String,String> lock = + new Pair<>(tl.getTserverLocation().orElseThrow(), tl.getTserverSession().orElseThrow()); + + if (okLocks.contains(lock)) { + return tl; + } + + if (invalidLocks.contains(lock)) { + return null; + } + + if (lockChecker.isLockHeld(tl.getTserverLocation().orElseThrow(), + tl.getTserverSession().orElseThrow())) { + okLocks.add(lock); + return tl; + } + + if (log.isTraceEnabled()) { + log.trace("Tablet server {} {} no longer holds its lock", tl.getTserverLocation(), + tl.getTserverSession()); + } + + invalidLocks.add(lock); + + return null; + } + } + + public ClientTabletCacheImpl(TableId tableId, ClientTabletCache parent, CachedTabletObtainer tlo, + TabletServerLockChecker tslc) { + this.tableId = tableId; + this.parent = parent; + this.tabletObtainer = tlo; + this.lockChecker = tslc; + + this.lastTabletRow = new Text(tableId.canonical()); + lastTabletRow.append(new byte[] {'<'}, 0, 1); + } + + @Override + public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations, + Map<String,TabletServerMutations<T>> 
binnedMutations, List<T> failures) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException { + + Timer timer = null; + + if (log.isTraceEnabled()) { + log.trace("tid={} Binning {} mutations for table {}", Thread.currentThread().getId(), + mutations.size(), tableId); + timer = Timer.startNew(); + } + + ArrayList<T> notInCache = new ArrayList<>(); + Text row = new Text(); + + LockCheckerSession lcSession = new LockCheckerSession(); + + rLock.lock(); + try { + processInvalidated(context, lcSession); + + // for this to be efficient rows need to be in sorted order, but always sorting is slow... + // therefore only sort the + // stuff not in the cache.... it is most efficient to pass _locateTablet rows in sorted order + + // For this to be efficient, need to avoid fine grained synchronization and fine grained + // logging. + // Therefore methods called by this are not synchronized and should not log. + + for (T mutation : mutations) { + row.set(mutation.getRow()); + CachedTablet tl = findTabletInCache(row); + if (!addMutation(binnedMutations, mutation, tl, lcSession)) { + notInCache.add(mutation); + } + } + } finally { + rLock.unlock(); + } + + HashSet<CachedTablet> locationLess = new HashSet<>(); + + if (!notInCache.isEmpty()) { + notInCache.sort((o1, o2) -> WritableComparator.compareBytes(o1.getRow(), 0, + o1.getRow().length, o2.getRow(), 0, o2.getRow().length)); + + wLock.lock(); + try { + // Want to ignore any entries in the cache w/o a location that were created before the + // following time. Entries created after the following time may have been populated by the + // following loop, and we want to use those. 
- var cacheCutoff = NanoTime.now(); ++ Timer cacheCutoffTimer = Timer.startNew(); + + for (T mutation : notInCache) { + + row.set(mutation.getRow()); + + CachedTablet tl = _findTablet(context, row, false, false, false, lcSession, - LocationNeed.REQUIRED, cacheCutoff); ++ LocationNeed.REQUIRED, cacheCutoffTimer); + + if (!addMutation(binnedMutations, mutation, tl, lcSession)) { + failures.add(mutation); + if (tl != null && tl.getTserverLocation().isEmpty()) { + locationLess.add(tl); + } + } + } + } finally { + wLock.unlock(); + } + } + + requestTabletHosting(context, locationLess); + + if (timer != null) { + log.trace("tid={} Binned {} mutations for table {} to {} tservers in {}", + Thread.currentThread().getId(), mutations.size(), tableId, binnedMutations.size(), + String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0)); + } + + } + + private <T extends Mutation> boolean addMutation( + Map<String,TabletServerMutations<T>> binnedMutations, T mutation, CachedTablet tl, + LockCheckerSession lcSession) { + + if (tl == null || tl.getTserverLocation().isEmpty()) { + return false; + } + + TabletServerMutations<T> tsm = binnedMutations.get(tl.getTserverLocation().orElseThrow()); + + if (tsm == null) { + // do lock check once per tserver here to make binning faster + boolean lockHeld = lcSession.checkLock(tl) != null; + if (lockHeld) { + tsm = new TabletServerMutations<>(tl.getTserverSession().orElseThrow()); + binnedMutations.put(tl.getTserverLocation().orElseThrow(), tsm); + } else { + return false; + } + } + + // its possible the same tserver could be listed with different sessions + if (tsm.getSession().equals(tl.getTserverSession().orElseThrow())) { + tsm.addMutation(tl.getExtent(), mutation); + return true; + } + + return false; + } + + static boolean isContiguous(List<CachedTablet> cachedTablets) { + + Iterator<CachedTablet> iter = cachedTablets.iterator(); + KeyExtent prevExtent = iter.next().getExtent(); + + while (iter.hasNext()) { + KeyExtent currExtent 
= iter.next().getExtent(); + + if (!currExtent.isPreviousExtent(prevExtent)) { + return false; + } + + prevExtent = currExtent; + } + + return true; + } + + private List<Range> findTablets(ClientContext context, List<Range> ranges, + BiConsumer<CachedTablet,Range> rangeConsumer, boolean useCache, LockCheckerSession lcSession, + LocationNeed locationNeed, Consumer<CachedTablet> locationlessConsumer) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException { + List<Range> failures = new ArrayList<>(); + List<CachedTablet> cachedTablets = new ArrayList<>(); + + // Use anything in the cache w/o a location populated after this point in time. Cache entries + // w/o a location created before the following time should be ignored and the metadata table + // consulted. - var cacheCutoff = NanoTime.now(); ++ Timer cacheCutoffTimer = Timer.startNew(); + + l1: for (Range range : ranges) { + + cachedTablets.clear(); + + Text startRow; + + if (range.getStartKey() != null) { + startRow = range.getStartKey().getRow(); + } else { + startRow = new Text(); + } + + CachedTablet tl = null; + + if (useCache) { + tl = lcSession.checkLock(findTabletInCache(startRow)); + } else { + tl = _findTablet(context, startRow, false, false, false, lcSession, locationNeed, - cacheCutoff); ++ cacheCutoffTimer); + } + + if (tl == null) { + failures.add(range); + continue; + } + + cachedTablets.add(tl); + + // a range may extend over multiple tablets, look for additional tablet that overlap the range + while (tl.getExtent().endRow() != null + && !range.afterEndKey(new Key(tl.getExtent().endRow()).followingKey(PartialKey.ROW))) { + if (useCache) { + Text row = new Text(tl.getExtent().endRow()); + row.append(new byte[] {0}, 0, 1); + tl = lcSession.checkLock(findTabletInCache(row)); + } else { + tl = _findTablet(context, tl.getExtent().endRow(), true, false, false, lcSession, - locationNeed, cacheCutoff); ++ locationNeed, cacheCutoffTimer); + } + 
+ if (tl == null) { + failures.add(range); + continue l1; + } + cachedTablets.add(tl); + } + + // pass all tablets without a location before failing range + cachedTablets.stream().filter(tloc -> tloc.getTserverLocation().isEmpty()) + .forEach(locationlessConsumer); + + if (locationNeed == LocationNeed.REQUIRED + && !cachedTablets.stream().allMatch(tloc -> tloc.getTserverLocation().isPresent())) { + failures.add(range); + continue; + } + + // Ensure the extents found are non overlapping and have no holes. When reading some extents + // from the cache and other from the metadata table in the loop above we may end up with + // non-contiguous extents. This can happen when a subset of exents are placed in the cache and + // then after that merges and splits happen. + if (isContiguous(cachedTablets)) { + for (CachedTablet tl2 : cachedTablets) { + rangeConsumer.accept(tl2, range); + } + } else { + failures.add(range); + } + + } + + return failures; + } + + @Override + public List<Range> findTablets(ClientContext context, List<Range> ranges, + BiConsumer<CachedTablet,Range> rangeConsumer, LocationNeed locationNeed) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException { + + /* + * For this to be efficient, need to avoid fine grained synchronization and fine grained + * logging. Therefore methods called by this are not synchronized and should not log. + */ + + Timer timer = null; + + if (log.isTraceEnabled()) { + log.trace("tid={} Binning {} ranges for table {}", Thread.currentThread().getId(), + ranges.size(), tableId); + timer = Timer.startNew(); + } + + LockCheckerSession lcSession = new LockCheckerSession(); + + List<Range> failures; + rLock.lock(); + try { + processInvalidated(context, lcSession); + + // for this to be optimal, need to look ranges up in sorted order when + // ranges are not present in cache... however do not want to always + // sort ranges... 
therefore try binning ranges using only the cache + // and sort whatever fails and retry + + failures = findTablets(context, ranges, rangeConsumer, true, lcSession, locationNeed, + keyExtent -> {}); + } finally { + rLock.unlock(); + } + + if (!failures.isEmpty()) { + // sort failures by range start key + Collections.sort(failures); + + // use a hashset because some ranges may overlap the same extent, so want to avoid duplicate + // extents + HashSet<CachedTablet> locationLess = new HashSet<>(); + Consumer<CachedTablet> locationLessConsumer; + if (locationNeed == LocationNeed.REQUIRED) { + locationLessConsumer = locationLess::add; + } else { + locationLessConsumer = keyExtent -> {}; + } + + // try lookups again + wLock.lock(); + try { + + failures = findTablets(context, failures, rangeConsumer, false, lcSession, locationNeed, + locationLessConsumer); + } finally { + wLock.unlock(); + } + + requestTabletHosting(context, locationLess); + + } + + if (timer != null) { + log.trace("tid={} Binned {} ranges for table {} in {}", Thread.currentThread().getId(), + ranges.size(), tableId, String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0)); + } + + return failures; + } + + @Override + public void invalidateCache(KeyExtent failedExtent) { + wLock.lock(); + try { + badExtents.add(failedExtent); + } finally { + wLock.unlock(); + } + if (log.isTraceEnabled()) { + log.trace("Invalidated extent={}", failedExtent); + } + } + + @Override + public void invalidateCache(Collection<KeyExtent> keySet) { + wLock.lock(); + try { + badExtents.addAll(keySet); + } finally { + wLock.unlock(); + } + if (log.isTraceEnabled()) { + log.trace("Invalidated {} cache entries for table {}", keySet.size(), tableId); + } + } + + @Override + public void invalidateCache(ClientContext context, String server) { + int invalidatedCount = 0; + + wLock.lock(); + try { + for (CachedTablet cacheEntry : metaCache.values()) { + var loc = cacheEntry.getTserverLocation(); + if (loc.isPresent() && 
loc.orElseThrow().equals(server)) { + badExtents.add(cacheEntry.getExtent()); + invalidatedCount++; + } + } + } finally { + wLock.unlock(); + } + + lockChecker.invalidateCache(server); + + if (log.isTraceEnabled()) { + log.trace("invalidated {} cache entries table={} server={}", invalidatedCount, tableId, + server); + } + + } + + @Override + public void invalidateCache() { + int invalidatedCount; + wLock.lock(); + try { + invalidatedCount = metaCache.size(); + metaCache.clear(); + } finally { + wLock.unlock(); + } + this.tabletHostingRequestCount.set(0); + if (log.isTraceEnabled()) { + log.trace("invalidated all {} cache entries for table={}", invalidatedCount, tableId); + } + } + + @Override + public CachedTablet findTablet(ClientContext context, Text row, boolean skipRow, + LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException { + + Timer timer = null; + + if (log.isTraceEnabled()) { + log.trace("tid={} Locating tablet table={} row={} skipRow={}", + Thread.currentThread().getId(), tableId, TextUtil.truncate(row), skipRow); + timer = Timer.startNew(); + } + + LockCheckerSession lcSession = new LockCheckerSession(); + CachedTablet tl = - _findTablet(context, row, skipRow, false, true, lcSession, locationNeed, NanoTime.now()); ++ _findTablet(context, row, skipRow, false, true, lcSession, locationNeed, Timer.startNew()); + + if (timer != null) { + log.trace("tid={} Located tablet {} at {} in {}", Thread.currentThread().getId(), + (tl == null ? "null" : tl.getExtent()), (tl == null ? "null" : tl.getTserverLocation()), + String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0)); + } + + if (tl != null && locationNeed == LocationNeed.REQUIRED) { + // Look at the next (minimumHostAhead * 2) tablets and return which ones need hosting. See the + // javadoc in the superclass of this method for more details. 
+ Map<KeyExtent,CachedTablet> extentsToHost = findExtentsToHost(context, minimumHostAhead * 2, + hostAheadRange, lcSession, tl, locationNeed); + + if (!extentsToHost.isEmpty()) { + if (extentsToHost.containsKey(tl.getExtent()) || extentsToHost.size() >= minimumHostAhead) { + requestTabletHosting(context, extentsToHost.values()); + } + } + + if (tl.getTserverLocation().isEmpty()) { + return null; + } + } + + return tl; + + } + + private Map<KeyExtent,CachedTablet> findExtentsToHost(ClientContext context, int hostAheadCount, + Range hostAheadRange, LockCheckerSession lcSession, CachedTablet firstTablet, + LocationNeed locationNeed) throws AccumuloException, TableNotFoundException, + InvalidTabletHostingRequestException, AccumuloSecurityException { + + // its only expected that this method is called when location need is required + Preconditions.checkArgument(locationNeed == LocationNeed.REQUIRED); + + Map<KeyExtent,CachedTablet> extentsToHost; + + if (hostAheadCount > 0) { + extentsToHost = new HashMap<>(); + if (firstTablet.getTserverLocation().isEmpty()) { + extentsToHost.put(firstTablet.getExtent(), firstTablet); + } + + KeyExtent extent = firstTablet.getExtent(); + + var currTablet = extent; + + // Use anything in the cache w/o a location populated after this point in time. Cache entries + // w/o a location created before the following time should be ignored and the metadata table + // consulted. 
- var cacheCutoff = NanoTime.now(); ++ Timer cacheCutoffTimer = Timer.startNew(); + + for (int i = 0; i < hostAheadCount; i++) { + if (currTablet.endRow() == null || hostAheadRange + .afterEndKey(new Key(currTablet.endRow()).followingKey(PartialKey.ROW))) { + break; + } + + CachedTablet followingTablet = _findTablet(context, currTablet.endRow(), true, false, true, - lcSession, locationNeed, cacheCutoff); ++ lcSession, locationNeed, cacheCutoffTimer); + + if (followingTablet == null) { + break; + } + + currTablet = followingTablet.getExtent(); + + if (followingTablet.getTserverLocation().isEmpty() + && !followingTablet.wasHostingRequested()) { + extentsToHost.put(followingTablet.getExtent(), followingTablet); + } + } + } else if (firstTablet.getTserverLocation().isEmpty()) { + extentsToHost = Map.of(firstTablet.getExtent(), firstTablet); + } else { + extentsToHost = Map.of(); + } + return extentsToHost; + } + + @Override + public long getTabletHostingRequestCount() { + return tabletHostingRequestCount.get(); + } + + @VisibleForTesting + public void resetTabletHostingRequestCount() { + tabletHostingRequestCount.set(0); + } + + @VisibleForTesting + public void enableTabletHostingRequests(boolean enabled) { + HOSTING_ENABLED.set(enabled); + } + + private static final Duration STALE_DURATION = Duration.ofMinutes(2); + + private void requestTabletHosting(ClientContext context, + Collection<CachedTablet> tabletsWithNoLocation) throws AccumuloException, + AccumuloSecurityException, TableNotFoundException, InvalidTabletHostingRequestException { + + if (!HOSTING_ENABLED.get()) { + return; + } + + // System tables should always be hosted + if (AccumuloTable.ROOT.tableId().equals(tableId) || AccumuloTable.METADATA.tableId().equals(tableId)) { + return; + } + + if (tabletsWithNoLocation.isEmpty()) { + return; + } + + if (context.getTableState(tableId) != TableState.ONLINE) { + log.trace("requestTabletHosting: table {} is not online", tableId); + return; + } + + List<TKeyExtent>
extentsToBringOnline = new ArrayList<>(); + for (var cachedTablet : tabletsWithNoLocation) { - if (cachedTablet.getCreationTime().elapsed().compareTo(STALE_DURATION) < 0) { ++ if (cachedTablet.getCreationTimer().elapsed().compareTo(STALE_DURATION) < 0) { + if (cachedTablet.getAvailability() == TabletAvailability.ONDEMAND) { + if (!cachedTablet.wasHostingRequested()) { + extentsToBringOnline.add(cachedTablet.getExtent().toThrift()); + log.trace("requesting ondemand tablet to be hosted {}", cachedTablet.getExtent()); + } else { + log.trace("ignoring ondemand tablet that already has a hosting request in place {} {}", - cachedTablet.getExtent(), cachedTablet.getCreationTime().elapsed()); ++ cachedTablet.getExtent(), cachedTablet.getCreationTimer().elapsed()); + } + } else if (cachedTablet.getAvailability() == TabletAvailability.UNHOSTED) { + throw new InvalidTabletHostingRequestException("Extent " + cachedTablet.getExtent() + + " has a tablet availability " + TabletAvailability.UNHOSTED); + } + } else { + // When a tablet does not have a location it is reread from the metadata table before this + // method is called. Therefore, it's expected that entries in the cache are recent. If the + // entries are not recent it could have two causes. One is a bug in the Accumulo code. + // Another is externalities like process swapping or slow metadata table reads. Logging a + // warning in case there is a bug. If the warning ends up being too spammy and is caused by + // externalities then this code/warning will need to be improved. 
+ log.warn("Unexpected stale tablet seen in cache {}", cachedTablet.getExtent()); + invalidateCache(cachedTablet.getExtent()); + } + } + + if (!extentsToBringOnline.isEmpty()) { + log.debug("Requesting hosting for {} ondemand tablets for table id {}.", + extentsToBringOnline.size(), tableId); + ThriftClientTypes.MANAGER.executeVoid(context, + client -> client.requestTabletHosting(TraceUtil.traceInfo(), context.rpcCreds(), + tableId.canonical(), extentsToBringOnline)); + tabletHostingRequestCount.addAndGet(extentsToBringOnline.size()); + } + } + + private void lookupTablet(ClientContext context, Text row, boolean retry, + LockCheckerSession lcSession) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException, InvalidTabletHostingRequestException { + Text metadataRow = new Text(tableId.canonical()); + metadataRow.append(new byte[] {';'}, 0, 1); + metadataRow.append(row.getBytes(), 0, row.getLength()); + CachedTablet ptl = parent.findTablet(context, metadataRow, false, LocationNeed.REQUIRED); + + if (ptl != null) { + CachedTablets cachedTablets = + tabletObtainer.lookupTablet(context, ptl, metadataRow, lastTabletRow, parent); + while (cachedTablets != null && cachedTablets.getCachedTablets().isEmpty()) { + // try the next tablet, the current tablet does not have any tablets that overlap the row + Text er = ptl.getExtent().endRow(); + if (er != null && er.compareTo(lastTabletRow) < 0) { + // System.out.println("er "+er+" ltr "+lastTabletRow); + ptl = parent.findTablet(context, er, true, LocationNeed.REQUIRED); + if (ptl != null) { + cachedTablets = + tabletObtainer.lookupTablet(context, ptl, metadataRow, lastTabletRow, parent); + } else { + break; + } + } else { + break; + } + } + + if (cachedTablets == null) { + return; + } + + // cannot assume the list contains contiguous key extents... 
so it is probably + // best to deal with each extent individually + + Text lastEndRow = null; + for (CachedTablet cachedTablet : cachedTablets.getCachedTablets()) { + + KeyExtent ke = cachedTablet.getExtent(); + CachedTablet locToCache; + + // create new location if current prevEndRow == endRow + if ((lastEndRow != null) && (ke.prevEndRow() != null) + && ke.prevEndRow().equals(lastEndRow)) { + locToCache = new CachedTablet(new KeyExtent(ke.tableId(), ke.endRow(), lastEndRow), + cachedTablet.getTserverLocation(), cachedTablet.getTserverSession(), + cachedTablet.getAvailability(), cachedTablet.wasHostingRequested()); + } else { + locToCache = cachedTablet; + } + + // save endRow for next iteration + lastEndRow = locToCache.getExtent().endRow(); + + updateCache(locToCache, lcSession); + } + } + + } + + private void updateCache(CachedTablet cachedTablet, LockCheckerSession lcSession) { + if (!cachedTablet.getExtent().tableId().equals(tableId)) { + // sanity check + throw new IllegalStateException( + "Unexpected extent returned " + tableId + " " + cachedTablet.getExtent()); + } + + // clear out any overlapping extents in cache + removeOverlapping(metaCache, cachedTablet.getExtent()); + + // do not add to cache unless lock is held + if (lcSession.checkLock(cachedTablet) == null) { + return; + } + + // add it to cache + Text er = cachedTablet.getExtent().endRow(); + if (er == null) { + er = MAX_TEXT; + } + metaCache.put(er, cachedTablet); + + if (!badExtents.isEmpty()) { + removeOverlapping(badExtents, cachedTablet.getExtent()); + } + } + + static void removeOverlapping(TreeMap<Text,CachedTablet> metaCache, KeyExtent nke) { + Iterator<Entry<Text,CachedTablet>> iter; + + if (nke.prevEndRow() == null) { + iter = metaCache.entrySet().iterator(); + } else { + Text row = rowAfterPrevRow(nke); + SortedMap<Text,CachedTablet> tailMap = metaCache.tailMap(row); + iter = tailMap.entrySet().iterator(); + } + + while (iter.hasNext()) { + Entry<Text,CachedTablet> entry = iter.next(); + 
+ KeyExtent ke = entry.getValue().getExtent(); + + if (stopRemoving(nke, ke)) { + break; + } + + iter.remove(); + } + } + + private static boolean stopRemoving(KeyExtent nke, KeyExtent ke) { + return ke.prevEndRow() != null && nke.endRow() != null + && ke.prevEndRow().compareTo(nke.endRow()) >= 0; + } + + private static Text rowAfterPrevRow(KeyExtent nke) { + Text row = new Text(nke.prevEndRow()); + row.append(new byte[] {0}, 0, 1); + return row; + } + + static void removeOverlapping(TreeSet<KeyExtent> extents, KeyExtent nke) { + for (KeyExtent overlapping : KeyExtent.findOverlapping(nke, extents)) { + extents.remove(overlapping); + } + } + + private CachedTablet findTabletInCache(Text row) { + + Entry<Text,CachedTablet> entry = metaCache.ceilingEntry(row); + + if (entry != null) { + KeyExtent ke = entry.getValue().getExtent(); + if (ke.prevEndRow() == null || ke.prevEndRow().compareTo(row) < 0) { + return entry.getValue(); + } + } + return null; + } + + /** - * @param cacheCutoff Tablets w/o locations are cached. When LocationNeed is REQUIRED, this cut - * off is used to determine if cached entries w/o a location should be used or of we should - * instead ignore them and reread the tablet information from the metadata table. ++ * @param cacheCutoffTimer Tablets w/o locations are cached. When LocationNeed is REQUIRED, this ++ * Timer value is used to determine if cached entries w/o a location should be used or if ++ * we should instead ignore them and reread the tablet information from the metadata table.
+ */ + protected CachedTablet _findTablet(ClientContext context, Text row, boolean skipRow, + boolean retry, boolean lock, LockCheckerSession lcSession, LocationNeed locationNeed, - NanoTime cacheCutoff) throws AccumuloException, AccumuloSecurityException, ++ Timer cacheCutoffTimer) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException, InvalidTabletHostingRequestException { + + if (skipRow) { + row = new Text(row); + row.append(new byte[] {0}, 0, 1); + } + + CachedTablet tl; + + if (lock) { + rLock.lock(); + try { + tl = processInvalidatedAndCheckLock(context, lcSession, row); + } finally { + rLock.unlock(); + } + } else { + tl = processInvalidatedAndCheckLock(context, lcSession, row); + } + + if (tl == null || (locationNeed == LocationNeed.REQUIRED && tl.getTserverLocation().isEmpty() - && tl.getCreationTime().compareTo(cacheCutoff) < 0)) { ++ && tl.getCreationTimer().elapsed(NANOSECONDS) > cacheCutoffTimer.elapsed(NANOSECONDS))) { ++ + // not in cache OR the cached entry was created before the cut off time, so obtain info from + // metadata table + if (lock) { + wLock.lock(); + try { + tl = lookupTabletLocationAndCheckLock(context, row, retry, lcSession); + } finally { + wLock.unlock(); + } + } else { + tl = lookupTabletLocationAndCheckLock(context, row, retry, lcSession); + } + } + + return tl; + } + + private CachedTablet lookupTabletLocationAndCheckLock(ClientContext context, Text row, + boolean retry, LockCheckerSession lcSession) throws AccumuloException, + AccumuloSecurityException, TableNotFoundException, InvalidTabletHostingRequestException { + lookupTablet(context, row, retry, lcSession); + return lcSession.checkLock(findTabletInCache(row)); + } + + private CachedTablet processInvalidatedAndCheckLock(ClientContext context, + LockCheckerSession lcSession, Text row) throws AccumuloSecurityException, AccumuloException, + TableNotFoundException, InvalidTabletHostingRequestException { + processInvalidated(context, lcSession); + return 
lcSession.checkLock(findTabletInCache(row)); + } + + @SuppressFBWarnings(value = {"UL_UNRELEASED_LOCK", "UL_UNRELEASED_LOCK_EXCEPTION_PATH"}, + justification = "locking is confusing, but probably correct") + private void processInvalidated(ClientContext context, LockCheckerSession lcSession) + throws AccumuloSecurityException, AccumuloException, TableNotFoundException, + InvalidTabletHostingRequestException { + + if (badExtents.isEmpty()) { + return; + } + + final boolean writeLockHeld = rwLock.isWriteLockedByCurrentThread(); + try { + if (!writeLockHeld) { + rLock.unlock(); + wLock.lock(); + if (badExtents.isEmpty()) { + return; + } + } + + List<Range> lookups = new ArrayList<>(badExtents.size()); + + for (KeyExtent be : badExtents) { + lookups.add(be.toMetaRange()); + removeOverlapping(metaCache, be); + } + + lookups = Range.mergeOverlapping(lookups); + + Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); + + parent.findTablets(context, lookups, + (cachedTablet, range) -> addRange(binnedRanges, cachedTablet, range), + LocationNeed.REQUIRED); + + // randomize server order + ArrayList<String> tabletServers = new ArrayList<>(binnedRanges.keySet()); + Collections.shuffle(tabletServers); + + for (String tserver : tabletServers) { + List<CachedTablet> locations = + tabletObtainer.lookupTablets(context, tserver, binnedRanges.get(tserver), parent); + + for (CachedTablet cachedTablet : locations) { + updateCache(cachedTablet, lcSession); + } + } + } finally { + if (!writeLockHeld) { + rLock.lock(); + wLock.unlock(); + } + } + } + + static void addRange(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, CachedTablet ct, + Range range) { + binnedRanges.computeIfAbsent(ct.getTserverLocation().orElseThrow(), k -> new HashMap<>()) + .computeIfAbsent(ct.getExtent(), k -> new ArrayList<>()).add(range); + } +} diff --cc core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 329e432b9b,83793eced6..add5b7cf11 --- 
a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@@ -48,14 -42,11 +48,14 @@@ import java.util.stream.Stream import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.logging.FateLogger; import org.apache.accumulo.core.util.ShutdownUtil; ++import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; - import org.apache.accumulo.core.util.time.NanoTime; import org.apache.thrift.TApplicationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -296,31 -209,21 +296,31 @@@ public class Fate<T> } } - private void undo(long tid, Repo<T> op) { + private void undo(FateId fateId, Repo<T> op) { try { - op.undo(tid, environment); + op.undo(fateId, environment); } catch (Exception e) { - log.warn("Failed to undo Repo, " + FateTxId.formatTid(tid), e); + log.warn("Failed to undo Repo, " + fateId, e); } } + } - protected long executeIsReady(Long tid, Repo<T> op) throws Exception { - return op.isReady(tid, environment); + protected long executeIsReady(FateId fateId, Repo<T> op) throws Exception { - var startTime = NanoTime.now(); ++ var startTime = Timer.startNew(); + var deferTime = op.isReady(fateId, environment); + log.debug("Running {}.isReady() {} took {} ms and returned {}", op.getName(), fateId, - startTime.elapsed().toMillis(), deferTime); ++ startTime.elapsed(MILLISECONDS), deferTime); + return deferTime; } - protected Repo<T> executeCall(Long tid, Repo<T> op) throws Exception { - 
return op.call(tid, environment); + protected Repo<T> executeCall(FateId fateId, Repo<T> op) throws Exception { - var startTime = NanoTime.now(); ++ var startTime = Timer.startNew(); + var next = op.call(fateId, environment); + log.debug("Running {}.call() {} took {} ms and returned {}", op.getName(), fateId, - startTime.elapsed().toMillis(), next == null ? "null" : next.getName()); ++ startTime.elapsed(MILLISECONDS), next == null ? "null" : next.getName()); + + return next; } /** diff --cc core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java index 91a1232954,43388052a3..d04d82714a --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java @@@ -20,6 -20,6 +20,7 @@@ package org.apache.accumulo.core.lock import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; ++import static java.util.concurrent.TimeUnit.SECONDS; import java.util.ArrayList; import java.util.List; @@@ -32,8 -32,6 +33,8 @@@ import org.apache.accumulo.core.fate.zo import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; ++import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.UuidUtil; - import org.apache.accumulo.core.util.time.NanoTime; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@@ -558,17 -558,6 +559,17 @@@ public class ServiceLock implements Wat LOG.debug("[{}] Deleting all at path {} due to unlock", vmLockPrefix, pathToDelete); ZooUtil.recursiveDelete(zooKeeper, pathToDelete, NodeMissingPolicy.SKIP); + // Wait for the delete to happen on the server before exiting method - NanoTime start = NanoTime.now(); ++ Timer start = Timer.startNew(); + while (zooKeeper.exists(pathToDelete, null) != null) { + 
Thread.onSpinWait(); - if (NanoTime.now().subtract(start).toSeconds() > 10) { - start = NanoTime.now(); ++ if (start.hasElapsed(10, SECONDS)) { ++ start.restart(); + LOG.debug("[{}] Still waiting for zookeeper to delete all at {}", vmLockPrefix, + pathToDelete); + } + } + localLw.lostLock(LockLossReason.LOCK_DELETED); } diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 5aa91afaa1,c993cf4cb1..057818b0ac --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@@ -18,6 -18,6 +18,7 @@@ */ package org.apache.accumulo.core.util.compaction; ++import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTIONS_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTION_IDS_POOL; @@@ -47,8 -48,7 +48,8 @@@ import org.apache.accumulo.core.rpc.cli import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; ++import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; - import org.apache.accumulo.core.util.time.NanoTime; import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; @@@ -278,10 -277,10 +279,10 @@@ public class ExternalCompactionUtil return runningIds; } - public static int countCompactors(String queueName, ClientContext context) { - long start = System.nanoTime(); - String queueRoot = context.getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + queueName; - List<String> children = context.getZooCache().getChildren(queueRoot); + public static int countCompactors(String groupName, ClientContext 
context) { - var start = NanoTime.now(); ++ var start = Timer.startNew(); + String groupRoot = context.getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + groupName; + List<String> children = context.getZooCache().getChildren(groupRoot); if (children == null) { return 0; } @@@ -295,11 -294,11 +296,11 @@@ } } - long elapsed = start.elapsed().toMillis(); - long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); ++ long elapsed = start.elapsed(MILLISECONDS); if (elapsed > 100) { - LOG.debug("Took {} ms to count {} compactors for {}", elapsed, count, queueName); + LOG.debug("Took {} ms to count {} compactors for {}", elapsed, count, groupName); } else { - LOG.trace("Took {} ms to count {} compactors for {}", elapsed, count, queueName); + LOG.trace("Took {} ms to count {} compactors for {}", elapsed, count, groupName); } return count; diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index 2fd823e447,335f583fd6..02917807cd --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@@ -67,7 -69,8 +69,7 @@@ import org.apache.accumulo.core.tablets import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; - import org.apache.accumulo.core.util.time.NanoTime; + import org.apache.accumulo.core.util.Timer; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index bf54437d73,b7b0470d58..ea8a90bfe4 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -99,11 -99,8 +100,10 @@@ import org.apache.accumulo.core.util.Ut import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; - import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.client.ClientServiceHandler; +import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.apache.accumulo.server.compaction.CompactionInfo; import org.apache.accumulo.server.compaction.CompactionWatcher; import org.apache.accumulo.server.compaction.FileCompactor; diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index b32751e904,7537ebd44e..a4c9b5f574 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@@ -19,6 -19,6 +19,7 @@@ package org.apache.accumulo.gc; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; ++import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.io.FileNotFoundException; import java.io.IOException; @@@ -50,12 -44,9 +51,12 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.spi.balancer.TableLoadBalancer; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; ++import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; - import 
org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.gc.metrics.GcCycleMetrics; import org.apache.accumulo.gc.metrics.GcMetrics; import org.apache.accumulo.server.AbstractServer; @@@ -89,11 -78,8 +90,11 @@@ public class SimpleGarbageCollector ext private final GcCycleMetrics gcCycleMetrics = new GcCycleMetrics(); + private ServiceLock gcLock; - private NanoTime lastCompactorCheck = NanoTime.now(); ++ private Timer lastCompactorCheck = Timer.startNew(); + SimpleGarbageCollector(ConfigOpts opts, String[] args) { - super("gc", opts, args); + super("gc", opts, ServerContext::new, args); final AccumuloConfiguration conf = getConfiguration(); @@@ -305,25 -291,6 +306,25 @@@ gcCycleMetrics.incrementRunCycleCount(); long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); + - if (NanoTime.now().subtract(lastCompactorCheck).toMillis() > gcDelay * 3) { ++ if (lastCompactorCheck.hasElapsed(gcDelay * 3, MILLISECONDS)) { + Map<String,Set<TableId>> resourceMapping = new HashMap<>(); + for (TableId tid : AccumuloTable.allTableIds()) { + TableConfiguration tconf = getContext().getTableConfiguration(tid); + String resourceGroup = tconf.get(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY); + resourceGroup = + resourceGroup == null ? 
Constants.DEFAULT_RESOURCE_GROUP_NAME : resourceGroup; + resourceMapping.computeIfAbsent(resourceGroup, k -> new HashSet<>()).add(tid); + } + for (Entry<String,Set<TableId>> e : resourceMapping.entrySet()) { + if (ExternalCompactionUtil.countCompactors(e.getKey(), getContext()) == 0) { + log.warn("No Compactors exist in resource group {} for system table {}", e.getKey(), + e.getValue()); + } + } - lastCompactorCheck = NanoTime.now(); ++ lastCompactorCheck.restart(); + } + log.debug("Sleeping for {} milliseconds", gcDelay); Thread.sleep(gcDelay); } catch (InterruptedException e) { diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 72c79121e5,96f312d570..e3b99b1615 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -115,11 -107,11 +115,11 @@@ import org.apache.accumulo.core.spi.bal import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Retry; + import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; - import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.core.util.time.SteadyTime; +import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; import org.apache.accumulo.manager.metrics.BalancerMetrics; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; @@@ -1066,10 -1179,10 +1066,10 @@@ public class Manager extends AbstractSe // wait at least 10 seconds final Duration timeToWait = Comparators.max(Duration.ofSeconds(10), Duration.ofMillis(rpcTimeout / 3)); - final NanoTime startTime = NanoTime.now(); + final Timer startTime = Timer.startNew(); // Wait for all tasks to complete while (!tasks.isEmpty()) { - boolean cancel =
(startTime.elapsed().compareTo(timeToWait) > 0); - boolean cancel = (startTime.hasElapsed(timeToWait)); ++ boolean cancel = startTime.hasElapsed(timeToWait); Iterator<Future<?>> iter = tasks.iterator(); while (iter.hasNext()) { Future<?> f = iter.next(); diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java index 59c010887f,0000000000..bd94afd15b mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java @@@ -1,154 -1,0 +1,156 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.tableOps.availability; + ++import static java.util.concurrent.TimeUnit.MILLISECONDS; ++ +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.clientImpl.thrift.TableOperation; +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TRange; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; - import org.apache.accumulo.core.util.time.NanoTime; ++import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.Utils; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SetTabletAvailability extends ManagerRepo { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(SetTabletAvailability.class); + + private final TableId tableId; + private final NamespaceId namespaceId; + private final TRange tRange; + private final 
TabletAvailability tabletAvailability; + + public SetTabletAvailability(TableId tableId, NamespaceId namespaceId, TRange range, + TabletAvailability tabletAvailability) { + this.tableId = tableId; + this.namespaceId = namespaceId; + this.tRange = range; + this.tabletAvailability = tabletAvailability; + } + + @Override + public long isReady(FateId fateId, Manager manager) throws Exception { + + if (manager.getContext().getTableState(tableId) != TableState.ONLINE) { + throw new AcceptableThriftTableOperationException(tableId.canonical(), null, + TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, "The table is not online."); + } + + final Range range = new Range(tRange); + LOG.debug("Finding tablets in Range: {} for table:{}", range, tableId); + + // For all practical purposes the start row is always inclusive, even if the key in the + // range is exclusive. For example the exclusive key row="a",family="b",qualifier="c" may + // exclude the column b:c, but it still falls somewhere in the row "a". The only case where + // this + // would not be true is if the start key in a range is the last possible key in a row. The last + // possible key in a row would contain 2GB column fields of all 0xff, which is why we assume the + // row is always inclusive. + final Text scanRangeStart = (range.getStartKey() == null) ?
null : range.getStartKey().getRow(); + + AtomicLong notAccepted = new AtomicLong(); + + Consumer<Ample.ConditionalResult> resultsConsumer = result -> { + if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { + notAccepted.incrementAndGet(); + LOG.debug("{} failed to set tablet availability for {} '{}'", fateId, result.getExtent(), + Optional.ofNullable(result.readMetadata()).map(TabletMetadata::getOperationId) + .orElse(null)); + } + }; + - var start = NanoTime.now(); ++ var start = Timer.startNew(); + try ( + TabletsMetadata m = manager.getContext().getAmple().readTablets().forTable(tableId) + .overlapping(scanRangeStart, true, null).build(); + Ample.AsyncConditionalTabletsMutator mutator = + manager.getContext().getAmple().conditionallyMutateTablets(resultsConsumer)) { + for (TabletMetadata tm : m) { + final KeyExtent tabletExtent = tm.getExtent(); + LOG.trace("Evaluating tablet {} against range {}", tabletExtent, range); + if (scanRangeStart != null && tm.getEndRow() != null + && tm.getEndRow().compareTo(scanRangeStart) < 0) { + // the end row of this tablet is before the start row, skip it + LOG.trace("tablet {} is before scan start range: {}", tabletExtent, scanRangeStart); + throw new RuntimeException("Bug in ample or this code."); + } + + // Obtaining the end row from a range and knowing if the obtained row is inclusive or + // exclusive is really tricky depending on how the Range was created (using row or key + // constructors). So avoid trying to obtain an end row from the range and instead use + // range.afterEndKey below.
+ if (tm.getPrevEndRow() != null + && range.afterEndKey(new Key(tm.getPrevEndRow()).followingKey(PartialKey.ROW))) { + // the start row of this tablet is after the scan range, skip it + LOG.trace("tablet {} is after scan end range: {}", tabletExtent, range); + break; + } + + if (tm.getTabletAvailability() == tabletAvailability) { + LOG.trace("Skipping tablet: {}, tablet availability is already in required state", + tabletExtent); + continue; + } + + LOG.debug("Setting tablet availability to {} requested for: {} ", tabletAvailability, + tabletExtent); + mutator.mutateTablet(tabletExtent).requireAbsentOperation() + .putTabletAvailability(tabletAvailability) + .submit(tabletMeta -> tabletMeta.getTabletAvailability() == tabletAvailability); + } + } + + if (notAccepted.get() > 0) { - return Math.min(30000, Math.max(start.elapsed().toMillis(), 1)); ++ return Math.min(30000, Math.max(start.elapsed(MILLISECONDS), 1)); + } else { + return 0; + } + } + + @Override + public Repo<Manager> call(FateId fateId, Manager manager) throws Exception { + Utils.unreserveNamespace(manager, namespaceId, fateId, LockType.READ); + Utils.unreserveTable(manager, tableId, fateId, LockType.WRITE); + return null; + } +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java index 7380aafe81,0000000000..ffaa6adcd2 mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java @@@ -1,132 -1,0 +1,133 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.tableOps.merge; + ++import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; - import org.apache.accumulo.core.util.time.NanoTime; ++import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class ReserveTablets extends ManagerRepo { + + private static final Logger log = LoggerFactory.getLogger(ReserveTablets.class); + + private static final long serialVersionUID = 1L; + + private final MergeInfo data; + + public ReserveTablets(MergeInfo data) { + this.data = data; + } + + @Override + public long 
isReady(FateId fateId, Manager env) throws Exception { + var range = data.getReserveExtent(); + log.debug("{} reserving tablets in range {}", fateId, range); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); + + AtomicLong opsAccepted = new AtomicLong(0); + Consumer<Ample.ConditionalResult> resultConsumer = result -> { + if (result.getStatus() == Status.ACCEPTED) { + opsAccepted.incrementAndGet(); + } + }; + + int count = 0; + int otherOps = 0; + int opsSet = 0; + int locations = 0; + int wals = 0; + - var startTime = NanoTime.now(); ++ var startTime = Timer.startNew(); + try ( + var tablets = env.getContext().getAmple().readTablets().forTable(data.tableId) + .overlapping(range.prevEndRow(), range.endRow()).fetch(PREV_ROW, LOCATION, LOGS, OPID) + .checkConsistency().build(); + var tabletsMutator = + env.getContext().getAmple().conditionallyMutateTablets(resultConsumer)) { + + for (var tabletMeta : tablets) { + if (tabletMeta.getOperationId() == null) { + tabletsMutator.mutateTablet(tabletMeta.getExtent()).requireAbsentOperation() + .putOperation(opid).submit(tm -> opid.equals(tm.getOperationId())); + opsSet++; + } else if (!tabletMeta.getOperationId().equals(opid)) { + otherOps++; + } + + if (tabletMeta.getLocation() != null) { + locations++; + } + + wals += tabletMeta.getLogs().size(); + + count++; + } + } - var maxSleepTime = Math.min(60000, startTime.elapsed().toMillis()); ++ var maxSleepTime = Math.min(60000, startTime.elapsed(MILLISECONDS)); + + log.debug( + "{} reserve tablets op:{} count:{} other opids:{} opids set:{} locations:{} accepted:{} wals:{}", + fateId, data.op, count, otherOps, opsSet, locations, opsAccepted, wals); + + // while there is a table lock a tablet cannot be concurrently deleted, so should always see + // tablets + Preconditions.checkState(count > 0); + + if (locations > 0 && opsAccepted.get() > 0) { + // operation ids were set and tablets have locations, so let's send a signal to get them + // unassigned +
env.getEventCoordinator().event(range, "Tablets %d were reserved for merge %s", + opsAccepted.get(), fateId); + } + + long sleepTime = Math.min(Math.max(1000, count), maxSleepTime); + if (locations > 0 || otherOps > 0 || wals > 0 || opsSet != opsAccepted.get()) { + // need to wait on these tablets, must return non-zero to indicate not ready so need to handle + // case of sleepTime being zero + return Math.max(1, sleepTime); + } + + // operations ids were set on all tablets and no tablets have locations, so ready + return 0; + } + + @Override + public Repo<Manager> call(FateId fateId, Manager environment) throws Exception { + return new CountFiles(data); + } +} diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index c8516d103d,bd5a935b26..7827358ca8 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@@ -91,10 -90,9 +91,10 @@@ import org.apache.accumulo.core.tablets import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; import org.apache.accumulo.core.util.Halt; + import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.threads.ThreadPools; - import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.client.ClientServiceHandler; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java index 752260ee8e,fec30031d0..cf2ac7fe1c --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java @@@ -21,9 -21,12 +21,9 @@@ package 
org.apache.accumulo.tserver import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.thrift.TabletLoadState; -import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.TabletLocationState; -import org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; - import org.apache.accumulo.core.util.time.NanoTime; + import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.server.manager.state.DistributedStoreException; import org.apache.accumulo.server.manager.state.TabletStateStore;