This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

The following commit(s) were added to refs/heads/2.1 by this push:
     new 3826ebd538 Log iteration to result count in BulkImport code (#5312)
3826ebd538 is described below

commit 3826ebd5380e95cc0dc4139ebaac2f39b7983500
Author: Dom G. <domgargu...@apache.org>
AuthorDate: Tue Feb 25 17:35:56 2025 -0500

    Log iteration to result count in BulkImport code (#5312)

    * Log iteration to result count in BulkImport code
---
 .../manager/tableOps/bulkVer2/LoadFiles.java       | 45 ++++++++++++++++++++--
 1 file changed, 41 insertions(+), 4 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index d2c6c9ce80..4d77f76d5a 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -23,6 +23,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -63,6 +64,7 @@ import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.MapCounter;
 import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -92,6 +94,7 @@ class LoadFiles extends ManagerRepo {
 
   @Override
   public long isReady(long tid, Manager manager) throws Exception {
+    log.info("Starting bulk import for {} (tid = {})", bulkInfo.sourceDir, FateTxId.formatTid(tid));
     if (manager.onlineTabletServers().isEmpty()) {
       log.warn("There are no tablet server to process bulkDir import, waiting (tid = "
           + FateTxId.formatTid(tid) + ")");
@@ -312,6 +315,16 @@ class LoadFiles extends ManagerRepo {
     }
   }
 
+  /**
+   * Stats for the loadFiles method. Helps track wasted time and iterations.
+   */
+  private static class ImportTimingStats {
+    Duration totalWastedTime = Duration.ZERO;
+    long wastedIterations = 0;
+    long tabletCount = 0;
+    long callCount = 0;
+  }
+
   /**
    * Make asynchronous load calls to each overlapping Tablet in the bulk mapping. Return a sleep
    * time to isReady based on a factor of the TabletServer with the most Tablets. This method will
@@ -341,20 +354,33 @@ class LoadFiles extends ManagerRepo {
 
     loader.start(bulkDir, manager, tid, bulkInfo.setTime);
 
-    long t1 = System.currentTimeMillis();
+    ImportTimingStats importTimingStats = new ImportTimingStats();
+
+    Timer timer = Timer.startNew();
     while (lmi.hasNext()) {
       loadMapEntry = lmi.next();
       List<TabletMetadata> tablets =
-          findOverlappingTablets(fmtTid, loadMapEntry.getKey(), tabletIter);
+          findOverlappingTablets(fmtTid, loadMapEntry.getKey(), tabletIter, importTimingStats);
       loader.load(tablets, loadMapEntry.getValue());
     }
+    Duration totalProcessingTime = timer.elapsed();
 
     log.trace("{}: Completed Finding Overlapping Tablets", fmtTid);
 
+    if (importTimingStats.callCount > 0) {
+      log.debug(
+          "Bulk import stats for {} (tid = {}): processed {} tablets in {} calls which took {}ms ({} nanos). Skipped {} iterations which took {}ms ({} nanos) or {}% of the processing time.",
+          bulkInfo.sourceDir, FateTxId.formatTid(tid), importTimingStats.tabletCount,
+          importTimingStats.callCount, totalProcessingTime.toMillis(),
+          totalProcessingTime.toNanos(), importTimingStats.wastedIterations,
+          importTimingStats.totalWastedTime.toMillis(), importTimingStats.totalWastedTime.toNanos(),
+          (importTimingStats.totalWastedTime.toNanos() * 100) / totalProcessingTime.toNanos());
+    }
+
     long sleepTime = loader.finish();
     if (sleepTime > 0) {
       log.trace("{}: Tablet Max Sleep is {}", fmtTid, sleepTime);
-      long scanTime = Math.min(System.currentTimeMillis() - t1, 30_000);
+      long scanTime = Math.min(totalProcessingTime.toMillis(), 30_000);
       log.trace("{}: Scan time is {}", fmtTid, scanTime);
       sleepTime = Math.max(sleepTime, scanTime * 2);
     }
@@ -369,7 +395,7 @@ class LoadFiles extends ManagerRepo {
    * Find all the tablets within the provided bulk load mapping range.
    */
   private List<TabletMetadata> findOverlappingTablets(String fmtTid, KeyExtent loadRange,
-      Iterator<TabletMetadata> tabletIter) {
+      Iterator<TabletMetadata> tabletIter, ImportTimingStats importTimingStats) {
 
     TabletMetadata currTablet = null;
 
@@ -381,12 +407,18 @@ class LoadFiles extends ManagerRepo {
 
       int cmp;
 
+      long wastedIterations = 0;
+      Timer timer = Timer.startNew();
+
       // skip tablets until we find the prevEndRow of loadRange
       while ((cmp = PREV_COMP.compare(currTablet.getPrevEndRow(), loadRange.prevEndRow())) < 0) {
+        wastedIterations++;
         log.trace("{}: Skipping tablet: {}", fmtTid, currTablet.getExtent());
         currTablet = tabletIter.next();
       }
 
+      Duration wastedTime = timer.elapsed();
+
       if (cmp != 0) {
         throw new IllegalStateException(
             "Unexpected prev end row " + currTablet.getExtent() + " " + loadRange);
@@ -407,6 +439,11 @@ class LoadFiles extends ManagerRepo {
         throw new IllegalStateException("Unexpected end row " + currTablet + " " + loadRange);
       }
 
+      importTimingStats.wastedIterations += wastedIterations;
+      importTimingStats.totalWastedTime = importTimingStats.totalWastedTime.plus(wastedTime);
+      importTimingStats.tabletCount += tablets.size();
+      importTimingStats.callCount++;
+
       return tablets;
     } catch (NoSuchElementException e) {
       NoSuchElementException ne2 = new NoSuchElementException(