This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 83d2dc24e87641171b2aef3fa3e4e35dc94cf279 Merge: 6622274056 e4127a5014 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Sep 14 19:45:56 2023 -0400 Merge branch 'main' into elasticity .../accumulo/core/conf/AccumuloConfiguration.java | 4 + .../org/apache/accumulo/core/conf/Property.java | 5 + .../thrift/TabletIngestClientService.java | 1631 -------------------- core/src/main/thrift/tabletingest.thrift | 13 - .../apache/accumulo/server/rpc/TServerUtils.java | 63 +- .../AccumuloConfigurationIsPropertySetTest.java | 302 ++++ .../apache/accumulo/gc/SimpleGarbageCollector.java | 3 +- .../accumulo/tserver/TabletClientHandler.java | 93 -- .../org/apache/accumulo/tserver/tablet/Tablet.java | 36 +- .../org/apache/accumulo/test/BatchWriterIT.java | 102 -- .../test/compaction/CompactionConfigChangeIT.java | 103 ++ .../accumulo/test/functional/ZombieTServer.java | 10 +- .../accumulo/test/performance/NullTserver.java | 9 +- 13 files changed, 486 insertions(+), 1888 deletions(-) diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index d75514b7b6,0273e3951e..cdede7a06a --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@@ -18,7 -18,7 +18,6 @@@ */ package org.apache.accumulo.tserver; - import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.toList; import java.io.IOException; @@@ -83,11 -87,14 +82,9 @@@ import org.apache.accumulo.core.summary import org.apache.accumulo.core.summary.SummaryCollection; import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; import org.apache.accumulo.core.tablet.thrift.TabletManagementClientService; - import org.apache.accumulo.core.tabletingest.thrift.ConstraintViolationException; -import org.apache.accumulo.core.tabletingest.thrift.DataFileInfo; import org.apache.accumulo.core.tabletingest.thrift.TDurability; import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService; -import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; --import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; -import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; -import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.trace.TraceUtil; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index c3e424c11f,ec949e5e49..60ffaaeb4b --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@@ -1353,144 -2184,28 +1352,155 @@@ public class Tablet extends TabletBase return timer.getTabletStats(); } - private static String createTabletDirectoryName(ServerContext context, Text endRow) { - if (endRow == null) { - return ServerColumnFamily.DEFAULT_TABLET_DIR_NAME; - } else { - UniqueNameAllocator namer = context.getUniqueNameAllocator(); - return Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName(); - } + public boolean isOnDemand() { + // TODO a change in the hosting goal could refresh online tablets + return getMetadata().getHostingGoal() == TabletHostingGoal.ONDEMAND; } - public Set<Long> getBulkIngestedTxIds() { - return bulkImported.keySet(); - } + // The purpose of this lock is to prevent race conditions between concurrent refresh RPC calls and + // between minor compactions and refresh calls. + private final Lock refreshLock = new ReentrantLock(); + + void bringMinorCompactionOnline(ReferencedTabletFile tmpDatafile, + ReferencedTabletFile newDatafile, DataFileValue dfv, CommitSession commitSession, + long flushId) { + Optional<StoredTabletFile> newFile; + // rename before putting in metadata table, so files in metadata table should + // always exist + boolean attemptedRename = false; + VolumeManager vm = getTabletServer().getContext().getVolumeManager(); + do { + try { + if (dfv.getNumEntries() == 0) { + log.debug("No data entries so delete temporary file {}", tmpDatafile); + vm.deleteRecursively(tmpDatafile.getPath()); + } else { + if (!attemptedRename && vm.exists(newDatafile.getPath())) { + log.warn("Target data file already exist {}", newDatafile); + throw new RuntimeException("File unexpectedly exists " + newDatafile.getPath()); + } + // the following checks for spurious rename failures that succeeded but gave an IoE + if (attemptedRename && vm.exists(newDatafile.getPath()) + && !vm.exists(tmpDatafile.getPath())) { + // seems like previous rename succeeded, so break + break; + } + attemptedRename = true; + ScanfileManager.rename(vm, tmpDatafile.getPath(), newDatafile.getPath()); + } + break; + } catch (IOException ioe) { + log.warn("Tablet " + getExtent() + " failed to rename " + newDatafile + + " after MinC, will retry in 60 secs...", ioe); + sleepUninterruptibly(1, TimeUnit.MINUTES); + } + } while (true); + + // The refresh lock must be held for the metadata write that adds the new file to the tablet. + // This prevents a concurrent refresh operation from pulling in the new tablet file before the + // in memory map reference related to the file is deactivated. Scans should use one of the in + // memory map or the new file, never both. ++ Preconditions.checkState(!getLogLock().isHeldByCurrentThread()); + refreshLock.lock(); + try { - Set<String> unusedWalLogs = beginClearingUnusedLogs(); ++ // Can not hold tablet lock while acquiring the log lock. The following check is there to ++ // prevent deadlock. ++ Preconditions.checkState(!Thread.holdsLock(this)); ++ getLogLock().lock(); ++ // do not place any code here between lock and try + try { ++ // The following call pairs with tablet.finishClearingUnusedLogs() later in this block. If ++ // moving where the following method is called, examine it and finishClearingUnusedLogs() ++ // before moving. ++ Set<String> unusedWalLogs = beginClearingUnusedLogs(); + // the order of writing to metadata and walog is important in the face of machine/process + // failures need to write to metadata before writing to walog, when things are done in the + // reverse order data could be lost... the minor compaction start event should be written + // before the following metadata write is made + + newFile = updateTabletDataFile(commitSession.getMaxCommittedTime(), newDatafile, dfv, + unusedWalLogs, flushId); - } finally { ++ + finishClearingUnusedLogs(); ++ } finally { ++ getLogLock().unlock(); + } + + // Without the refresh lock, if a refresh happened here it could make the new file written to + // the metadata table above available for scans while the in memory map from which the file + // was produced is still available for scans - public void cleanupBulkLoadedFiles(Set<Long> tids) { - bulkImported.keySet().removeAll(tids); + do { + try { + // the purpose of making this update use the new commit session, instead of the old one + // passed in, is because the new one will reference the logs used by current memory... + getTabletServer().minorCompactionFinished(getTabletMemory().getCommitSession(), + commitSession.getWALogSeq() + 2); + break; + } catch (IOException e) { + log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e); + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + } while (true); + + refreshMetadata(RefreshPurpose.MINC_COMPLETION); + } finally { + refreshLock.unlock(); + } + TabletLogger.flushed(getExtent(), newFile); + + long splitSize = getTableConfiguration().getAsBytes(Property.TABLE_SPLIT_THRESHOLD); + if (dfv.getSize() > splitSize) { + log.debug(String.format("Minor Compaction wrote out file larger than split threshold." + + " split threshold = %,d file size = %,d", splitSize, dfv.getSize())); + } } - public String getDirName() { - return dirName; + public enum RefreshPurpose { + MINC_COMPLETION, REFRESH_RPC, FLUSH_ID_UPDATE } - public Compactable asCompactable() { - return compactable; + public void refreshMetadata(RefreshPurpose refreshPurpose) { + refreshLock.lock(); + try { + + // do not want to hold tablet lock while doing metadata read as this could negatively impact + // scans + TabletMetadata tabletMetadata = getContext().getAmple().readTablet(getExtent()); + + synchronized (this) { + latestMetadata = tabletMetadata; + + if (refreshPurpose == RefreshPurpose.MINC_COMPLETION) { + // Atomically replace the in memory map with the new file. Before this synch block a scan + // starting would see the in memory map. After this synch block it should see the file in + // the tabletMetadata. Scans sync on the tablet also, so they can not be in this code + // block at the same time. + + lastLocation = null; + tabletMemory.finishedMinC(); + + // the files and in memory map changed, incrementing this will cause scans to switch data + // sources + dataSourceDeletions.incrementAndGet(); + + // important to call this after updating latestMetadata and tabletMemory + computeNumEntries(); + } else if (!latestMetadata.getFilesMap().equals(tabletMetadata.getFilesMap())) { + + // the files changed, incrementing this will cause scans to switch data sources + dataSourceDeletions.incrementAndGet(); + + // important to call this after updating latestMetadata + computeNumEntries(); + } + } + } finally { + refreshLock.unlock(); + } + + if (refreshPurpose == RefreshPurpose.REFRESH_RPC) { + scanfileManager.removeFilesAfterScan(getMetadata().getScans()); + } } }