This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 0f4d1bbb8b Track lastAccessTime in Tablet, not in TabletServer collection (#3855) 0f4d1bbb8b is described below commit 0f4d1bbb8bd7587fbf436515461ffc7492fc27fd Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Oct 18 15:32:15 2023 -0400 Track lastAccessTime in Tablet, not in TabletServer collection (#3855) Fixes #3263 --- .../core/tabletserver/UnloaderParamsImpl.java | 5 +- .../apache/accumulo/tserver/AssignmentHandler.java | 5 +- .../org/apache/accumulo/tserver/TabletServer.java | 75 +++++----------------- .../accumulo/tserver/UnloadTabletHandler.java | 3 - .../org/apache/accumulo/tserver/tablet/Tablet.java | 11 ++++ .../tserver/DefaultOnDemandTabletUnloaderTest.java | 7 +- 6 files changed, 32 insertions(+), 74 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/UnloaderParamsImpl.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/UnloaderParamsImpl.java index 7874b0cdb2..b8f751ea36 100644 --- a/core/src/main/java/org/apache/accumulo/core/tabletserver/UnloaderParamsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/UnloaderParamsImpl.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; @@ -38,12 +37,12 @@ public class UnloaderParamsImpl implements UnloaderParams { private final SortedMap<TabletId,Long> online; private final Set<KeyExtent> unloads; - public UnloaderParamsImpl(TableId tid, ServiceEnvironment env, Map<KeyExtent,AtomicLong> online, + public UnloaderParamsImpl(TableId tid, ServiceEnvironment env, Map<KeyExtent,Long> online, Set<KeyExtent> unload) { this.tid = tid; this.env = env; this.online = new TreeMap<>(); - online.forEach((k, v) -> this.online.put(new TabletIdImpl(k), v.get())); + online.forEach((k, v) -> this.online.put(new TabletIdImpl(k), v)); this.unloads = unload; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java index 3bbbd4c1eb..0b4b2f21ff 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java @@ -27,7 +27,6 @@ import java.util.TreeSet; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.admin.TabletHostingGoal; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.thrift.TabletLoadState; @@ -195,6 +194,7 @@ class AssignmentHandler implements Runnable { server.recentlyUnloadedCache.remove(tablet.getExtent()); } } + tablet = null; // release this reference successful = true; } catch (Exception e) { @@ -213,9 +213,6 @@ class AssignmentHandler implements Runnable { if (successful) { server.enqueueManagerMessage(new TabletStatusMessage(TabletLoadState.LOADED, extent)); - if (tabletMetadata.getHostingGoal() == TabletHostingGoal.ONDEMAND) { - server.insertOnDemandAccessTime(extent); - } } else { synchronized (server.unopenedTablets) { synchronized (server.openingTablets) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 0555aa72cf..8a68b2a7cc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -156,8 +156,6 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.net.HostAndPort; -import io.opentelemetry.context.Scope; - public class TabletServer extends AbstractServer implements TabletHostingServer { private static final Logger log = LoggerFactory.getLogger(TabletServer.class); @@ -195,8 +193,6 @@ public class TabletServer extends AbstractServer implements TabletHostingServer private final AtomicLong syncCounter = new AtomicLong(0); final OnlineTablets onlineTablets = new OnlineTablets(); - private final Map<KeyExtent,AtomicLong> onDemandTabletAccessTimes = - Collections.synchronizedMap(new HashMap<>()); final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<>()); final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<>()); final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap<>(1000)); @@ -1000,8 +996,8 @@ public class TabletServer extends AbstractServer implements TabletHostingServer @Override public Tablet getOnlineTablet(KeyExtent extent) { Tablet t = onlineTablets.snapshot().get(extent); - if (t != null && t.isOnDemand()) { - updateOnDemandAccessTime(extent); + if (t != null) { + t.setLastAccessTime(); } return t; } @@ -1142,28 +1138,6 @@ public class TabletServer extends AbstractServer implements TabletHostingServer return onDemandUnloadedLowMemory.get(); } - // called from AssignmentHandler - public void insertOnDemandAccessTime(KeyExtent extent) { - if (extent.isMeta()) { - return; - } - onDemandTabletAccessTimes.putIfAbsent(extent, new AtomicLong(System.nanoTime())); - } - - // called from getOnlineExtent - private void updateOnDemandAccessTime(KeyExtent extent) { - final long currentTime = System.nanoTime(); - AtomicLong l = onDemandTabletAccessTimes.get(extent); - if (l != null) { - l.set(currentTime); - } - } - - // called from UnloadTabletHandler - public void removeOnDemandAccessTime(KeyExtent extent) { - onDemandTabletAccessTimes.remove(extent); - } - private boolean isTabletInUse(KeyExtent extent) { // Don't call getOnlineTablet as that will update the last access time final Tablet t = onlineTablets.snapshot().get(extent); @@ -1182,42 +1156,29 @@ public class TabletServer extends AbstractServer implements TabletHostingServer final SortedMap<KeyExtent,Tablet> online = getOnlineTablets(); - // Find and remove access time entries for KeyExtents - // that are no longer in the onlineTablets collection - Set<KeyExtent> missing = onDemandTabletAccessTimes.keySet().stream() - .filter(k -> !online.containsKey(k)).collect(Collectors.toSet()); - if (!missing.isEmpty()) { - log.debug("Removing onDemandAccessTimes for tablets as tablets no longer online: {}", - missing); - missing.forEach(onDemandTabletAccessTimes::remove); - if (onDemandTabletAccessTimes.isEmpty()) { - return; - } - } - - // It's possible, from a tablet split or merge for example, - // that there is an on-demand tablet that is hosted for which - // we have no access time. Add any missing online on-demand - // tablets - online.forEach((k, v) -> { - if (v.isOnDemand() && !onDemandTabletAccessTimes.containsKey(k)) { - insertOnDemandAccessTime(k); + // Sort the extents so that we can process them by table. + final SortedMap<KeyExtent,Long> sortedOnDemandExtents = new TreeMap<>(); + // We only want to operate on OnDemand Tablets + online.entrySet().forEach((e) -> { + if (e.getValue().isOnDemand()) { + sortedOnDemandExtents.put(e.getKey(), e.getValue().getLastAccessTime()); } }); - log.debug("Evaluating online on-demand tablets: {}", onDemandTabletAccessTimes); - - if (onDemandTabletAccessTimes.isEmpty()) { + if (sortedOnDemandExtents.isEmpty()) { return; } + log.debug("Evaluating online on-demand tablets: {}", sortedOnDemandExtents); + // If the TabletServer is running low on memory, don't call the SPI // plugin to evaluate which on-demand tablets to unload, just get the // on-demand tablet with the oldest access time and unload it. if (getContext().getLowMemoryDetector().isRunningLowOnMemory()) { final SortedMap<Long,KeyExtent> timeSortedOnDemandExtents = new TreeMap<>(); - onDemandTabletAccessTimes.forEach((k, v) -> timeSortedOnDemandExtents.put(v.get(), k)); - Long oldestAccessTime = timeSortedOnDemandExtents.firstKey(); + long currTime = System.nanoTime(); + sortedOnDemandExtents.forEach((k, v) -> timeSortedOnDemandExtents.put(v - currTime, k)); + Long oldestAccessTime = timeSortedOnDemandExtents.lastKey(); KeyExtent oldestKeyExtent = timeSortedOnDemandExtents.get(oldestAccessTime); log.warn("Unloading on-demand tablet: {} for table: {} due to low memory", oldestKeyExtent, oldestKeyExtent.tableId()); @@ -1226,12 +1187,6 @@ public class TabletServer extends AbstractServer implements TabletHostingServer return; } - // onDemandTabletAccessTimes is a HashMap. Sort the extents - // so that we can process them by table. - final SortedMap<KeyExtent,AtomicLong> sortedOnDemandExtents = - new TreeMap<KeyExtent,AtomicLong>(); - sortedOnDemandExtents.putAll(onDemandTabletAccessTimes); - // The access times are updated when getOnlineTablet is called by other methods, // but may not necessarily capture whether or not the Tablet is currently being used. // For example, getOnlineTablet is called from startScan but not from continueScan. @@ -1276,7 +1231,7 @@ public class TabletServer extends AbstractServer implements TabletHostingServer }); tableIds.forEach(tid -> { Map<KeyExtent, - AtomicLong> subset = sortedOnDemandExtents.entrySet().stream() + Long> subset = sortedOnDemandExtents.entrySet().stream() .filter((e) -> e.getKey().tableId().equals(tid)) .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); Set<KeyExtent> onDemandTabletsToUnload = new HashSet<>(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java index 4a6081cd07..fc7bbcca39 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java @@ -107,9 +107,6 @@ class UnloadTabletHandler implements Runnable { // exceptions server.recentlyUnloadedCache.put(extent, System.currentTimeMillis()); server.onlineTablets.remove(extent); - if (t.isOnDemand()) { - server.removeOnDemandAccessTime(extent); - } try { TServerInstance instance = server.getTabletSession(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 608603cea5..38853da19d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -178,6 +178,8 @@ public class Tablet extends TabletBase { private final int logId; + private volatile long lastAccessTime = System.nanoTime(); + public int getLogId() { return logId; } @@ -1502,4 +1504,13 @@ public class Tablet extends TabletBase { scanfileManager.removeFilesAfterScan(getMetadata().getScans()); } } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public void setLastAccessTime() { + this.lastAccessTime = System.nanoTime(); + } + } diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/DefaultOnDemandTabletUnloaderTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/DefaultOnDemandTabletUnloaderTest.java index eeb27d75b8..3db9c0e3e8 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/DefaultOnDemandTabletUnloaderTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/DefaultOnDemandTabletUnloaderTest.java @@ -31,7 +31,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -63,13 +62,13 @@ public class DefaultOnDemandTabletUnloaderTest { expect(tconf.get(DefaultOnDemandTabletUnloader.INACTIVITY_THRESHOLD)) .andReturn(inactivityTimeSeconds); expect(tconf.newDeriver(anyObject())).andReturn(Map::of).anyTimes(); - Map<KeyExtent,AtomicLong> online = new HashMap<>(); + Map<KeyExtent,Long> online = new HashMap<>(); // add an extent whose last access time is less than the currentTime - inactivityTime final KeyExtent activeExtent = new KeyExtent(tid, new Text("m"), new Text("a")); - online.put(activeExtent, new AtomicLong(currentTime - inactivityTime - 10)); + online.put(activeExtent, currentTime - inactivityTime - 10); // add an extent whose last access time is greater than the currentTime - inactivityTime final KeyExtent inactiveExtent = new KeyExtent(tid, new Text("z"), new Text("m")); - online.put(inactiveExtent, new AtomicLong(currentTime - inactivityTime + 10)); + online.put(inactiveExtent, currentTime - inactivityTime + 10); Set<KeyExtent> onDemandTabletsToUnload = new HashSet<>(); replay(context, tconf);