This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 3826ebd538 Log iteration to result count in BulkImport code (#5312)
3826ebd538 is described below
commit 3826ebd5380e95cc0dc4139ebaac2f39b7983500
Author: Dom G. <[email protected]>
AuthorDate: Tue Feb 25 17:35:56 2025 -0500
Log iteration to result count in BulkImport code (#5312)
* Log iteration to result count in BulkImport code
---
.../manager/tableOps/bulkVer2/LoadFiles.java | 45 ++++++++++++++++++++--
1 file changed, 41 insertions(+), 4 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index d2c6c9ce80..4d77f76d5a 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -23,6 +23,7 @@ import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
@@ -63,6 +64,7 @@ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.server.fs.VolumeManager;
@@ -92,6 +94,7 @@ class LoadFiles extends ManagerRepo {
@Override
public long isReady(long tid, Manager manager) throws Exception {
+ log.info("Starting bulk import for {} (tid = {})", bulkInfo.sourceDir,
FateTxId.formatTid(tid));
if (manager.onlineTabletServers().isEmpty()) {
log.warn("There are no tablet server to process bulkDir import, waiting
(tid = "
+ FateTxId.formatTid(tid) + ")");
@@ -312,6 +315,16 @@ class LoadFiles extends ManagerRepo {
}
}
+ /**
+ * Stats for one loadFiles pass, filled in by findOverlappingTablets ("wasted" = skipping tablets).
+ */
+ private static class ImportTimingStats {
+ Duration totalWastedTime = Duration.ZERO; // time spent skipping tablets before each load range
+ long wastedIterations = 0; // number of tablets skipped before reaching each load range
+ long tabletCount = 0; // overlapping tablets returned, summed over all calls
+ long callCount = 0; // findOverlappingTablets calls that returned without throwing
+ }
+
/**
* Make asynchronous load calls to each overlapping Tablet in the bulk
mapping. Return a sleep
* time to isReady based on a factor of the TabletServer with the most
Tablets. This method will
@@ -341,20 +354,33 @@ class LoadFiles extends ManagerRepo {
loader.start(bulkDir, manager, tid, bulkInfo.setTime);
- long t1 = System.currentTimeMillis();
+ ImportTimingStats importTimingStats = new ImportTimingStats();
+
+ Timer timer = Timer.startNew();
while (lmi.hasNext()) {
loadMapEntry = lmi.next();
List<TabletMetadata> tablets =
- findOverlappingTablets(fmtTid, loadMapEntry.getKey(), tabletIter);
+ findOverlappingTablets(fmtTid, loadMapEntry.getKey(), tabletIter,
importTimingStats);
loader.load(tablets, loadMapEntry.getValue());
}
+ Duration totalProcessingTime = timer.elapsed();
log.trace("{}: Completed Finding Overlapping Tablets", fmtTid);
+ if (importTimingStats.callCount > 0) {
+ log.debug(
+ "Bulk import stats for {} (tid = {}): processed {} tablets in {}
calls which took {}ms ({} nanos). Skipped {} iterations which took {}ms ({}
nanos) or {}% of the processing time.",
+ bulkInfo.sourceDir, FateTxId.formatTid(tid),
importTimingStats.tabletCount,
+ importTimingStats.callCount, totalProcessingTime.toMillis(),
+ totalProcessingTime.toNanos(), importTimingStats.wastedIterations,
+ importTimingStats.totalWastedTime.toMillis(),
importTimingStats.totalWastedTime.toNanos(),
+ (importTimingStats.totalWastedTime.toNanos() * 100) /
totalProcessingTime.toNanos());
+ }
+
long sleepTime = loader.finish();
if (sleepTime > 0) {
log.trace("{}: Tablet Max Sleep is {}", fmtTid, sleepTime);
- long scanTime = Math.min(System.currentTimeMillis() - t1, 30_000);
+ long scanTime = Math.min(totalProcessingTime.toMillis(), 30_000);
log.trace("{}: Scan time is {}", fmtTid, scanTime);
sleepTime = Math.max(sleepTime, scanTime * 2);
}
@@ -369,7 +395,7 @@ class LoadFiles extends ManagerRepo {
* Find all the tablets within the provided bulk load mapping range.
*/
private List<TabletMetadata> findOverlappingTablets(String fmtTid, KeyExtent
loadRange,
- Iterator<TabletMetadata> tabletIter) {
+ Iterator<TabletMetadata> tabletIter, ImportTimingStats
importTimingStats) {
TabletMetadata currTablet = null;
@@ -381,12 +407,18 @@ class LoadFiles extends ManagerRepo {
int cmp;
+ long wastedIterations = 0;
+ Timer timer = Timer.startNew();
+
// skip tablets until we find the prevEndRow of loadRange
while ((cmp = PREV_COMP.compare(currTablet.getPrevEndRow(),
loadRange.prevEndRow())) < 0) {
+ wastedIterations++;
log.trace("{}: Skipping tablet: {}", fmtTid, currTablet.getExtent());
currTablet = tabletIter.next();
}
+ Duration wastedTime = timer.elapsed();
+
if (cmp != 0) {
throw new IllegalStateException(
"Unexpected prev end row " + currTablet.getExtent() + " " +
loadRange);
@@ -407,6 +439,11 @@ class LoadFiles extends ManagerRepo {
throw new IllegalStateException("Unexpected end row " + currTablet + "
" + loadRange);
}
+ importTimingStats.wastedIterations += wastedIterations;
+ importTimingStats.totalWastedTime =
importTimingStats.totalWastedTime.plus(wastedTime);
+ importTimingStats.tabletCount += tablets.size();
+ importTimingStats.callCount++;
+
return tablets;
} catch (NoSuchElementException e) {
NoSuchElementException ne2 = new NoSuchElementException(