This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new f844dc0724 adds per scan tracing statistics (#6010)
f844dc0724 is described below

commit f844dc0724300fa90618a0de9366b911a8ffa4c8
Author: Keith Turner <[email protected]>
AuthorDate: Tue Dec 16 11:53:40 2025 -0800

    adds per scan tracing statistics (#6010)
    
    Added per-scan tracing statistics for the following:
    
     * The count of compressed bytes read from DFS (when all data is read
       from cache, this will be zero).
     * The count of uncompressed bytes read from DFS or cache.
     * The count of bytes returned (after iterators filter).
     * The count of key/values read before any filtering.
     * The count of key/values returned after filtering.
     * The count of seeks done.
     * Statistics for cache hits, misses, and bypasses.  A bypass
       is when an rfile block is read without using the cache.
    
    The statistics are included in a tracing span that wraps reading each
    batch of key/values in a tablet server or scan server.  So if a scan
    reads 10 batches of key/values, then 10 spans will be emitted for
    tracing data.  Each span will include the statistics for that batch.
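    
    For illustration only (not part of this commit): a minimal sketch of
    how a consumer of the exported spans could read these statistics.  The
    span name and attribute names come from this commit; SpanData and
    AttributeKey are from the OpenTelemetry SDK; the class and method
    names here are hypothetical.
    
        import io.opentelemetry.api.common.AttributeKey;
        import io.opentelemetry.sdk.trace.data.SpanData;
    
        class ScanBatchStats {
          // Log the per-batch scan statistics carried on a "scan-batch"
          // span; attribute names match TraceAttributes in this commit.
          static void print(SpanData span) {
            if (!"scan-batch".equals(span.getName())) {
              return;
            }
            var attrs = span.getAttributes();
            Long read = attrs.get(
                AttributeKey.longKey("accumulo.scan.entries.read"));
            Long returned = attrs.get(
                AttributeKey.longKey("accumulo.scan.entries.returned"));
            Long fileBytes = attrs.get(
                AttributeKey.longKey("accumulo.scan.bytes.read.file"));
            Long seeks = attrs.get(
                AttributeKey.longKey("accumulo.scan.seeks"));
            String extent = attrs.get(
                AttributeKey.stringKey("accumulo.extent"));
            System.out.printf(
                "extent=%s read=%s returned=%s fileBytes=%s seeks=%s%n",
                extent, read, returned, fileBytes, seeks);
          }
        }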
    
    
    Co-authored-by: Dave Marion <[email protected]>
---
 core/pom.xml                                       |   1 +
 .../core/file/blockfile/cache/BlockCacheUtil.java  |  38 +++
 .../blockfile/cache/InstrumentedBlockCache.java    | 107 ++++++
 .../file/blockfile/impl/CachableBlockFile.java     |  45 ++-
 .../file/blockfile/impl/ScanCacheProvider.java     |   6 +-
 .../org/apache/accumulo/core/file/rfile/RFile.java |  10 +-
 .../accumulo/core/file/rfile/bcfile/BCFile.java    |  25 +-
 .../core/iteratorsImpl/system/StatsIterator.java   |  57 +++-
 .../accumulo/core/summary/SummaryReader.java       |   6 +-
 .../accumulo/core/trace/ScanInstrumentation.java   | 145 ++++++++
 .../core/trace/ScanInstrumentationImpl.java        |  96 ++++++
 .../accumulo/core/trace/TraceAttributes.java       |  53 +++
 .../accumulo/core/util/CountingInputStream.java    | 102 ++++++
 pom.xml                                            |  14 +
 .../accumulo/tserver/TabletHostingServer.java      |   3 +
 .../org/apache/accumulo/tserver/TabletServer.java  |   5 +-
 .../accumulo/tserver/tablet/ScanDataSource.java    |  46 ++-
 .../apache/accumulo/tserver/tablet/Scanner.java    |  27 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   4 +-
 .../apache/accumulo/tserver/tablet/TabletBase.java |  50 ++-
 test/pom.xml                                       |  21 ++
 .../java/org/apache/accumulo/test/TestIngest.java  |  12 +-
 .../accumulo/test/tracing/ScanTraceClient.java     | 152 +++++++++
 .../accumulo/test/tracing/ScanTracingIT.java       | 369 +++++++++++++++++++++
 .../org/apache/accumulo/test/tracing/SpanData.java |  43 +++
 .../accumulo/test/tracing/TraceCollector.java      | 106 ++++++
 26 files changed, 1494 insertions(+), 49 deletions(-)
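
The statistics are collected through a thread-local ScanInstrumentation that
the batch-reading code enables for the duration of one batch read; code deep
in the read path (the block cache wrappers, BCFile) looks it up with
ScanInstrumentation.get() rather than having a stats object threaded through
every call.  A condensed sketch of that pattern, with simplified, hypothetical
names (the real class in the diff below also returns a no-op instance when
tracing is disabled, so callers never see null):

    final class ScanStats {
      private static final ThreadLocal<ScanStats> CURRENT = new ThreadLocal<>();

      private long uncompressedBytes;

      // enabled once per batch read (see Scanner.read and TabletBase below)
      static AutoCloseable enable() {
        ScanStats stats = new ScanStats();
        CURRENT.set(stats);
        return CURRENT::remove; // scope ends when the batch read finishes
      }

      // looked up from deep in the read path without passing a stats
      // object down the call chain
      static ScanStats get() {
        return CURRENT.get();
      }

      void addUncompressedBytes(long amount) {
        uncompressedBytes += amount;
      }
    }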

diff --git a/core/pom.xml b/core/pom.xml
index 9710b0afc0..78945cc5dd 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -207,6 +207,7 @@
                 <inlineHeader>${accumulo.build.license.header}</inlineHeader>
                 <excludes>
                   
<exclude>src/main/java/org/apache/accumulo/core/bloomfilter/*.java</exclude>
+                  
<exclude>src/main/java/org/apache/accumulo/core/util/CountingInputStream.java</exclude>
                   
<exclude>src/main/java/org/apache/accumulo/core/util/HostAndPort.java</exclude>
                   <exclude>src/test/resources/*.jceks</exclude>
                   
<exclude>src/test/resources/org/apache/accumulo/core/file/rfile/*.rf</exclude>
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java
new file mode 100644
index 0000000000..53c2ce9e7a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import org.apache.accumulo.core.logging.LoggingBlockCache;
+import org.apache.accumulo.core.spi.cache.BlockCache;
+import org.apache.accumulo.core.spi.cache.CacheType;
+
+public class BlockCacheUtil {
+  public static BlockCache instrument(CacheType type, BlockCache cache) {
+    if (cache == null) {
+      return null;
+    }
+
+    if (cache instanceof InstrumentedBlockCache || cache instanceof 
LoggingBlockCache) {
+      // it's already instrumented
+      return cache;
+    }
+
+    return LoggingBlockCache.wrap(type, InstrumentedBlockCache.wrap(type, 
cache));
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java
new file mode 100644
index 0000000000..402136c208
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.spi.cache.BlockCache;
+import org.apache.accumulo.core.spi.cache.CacheEntry;
+import org.apache.accumulo.core.spi.cache.CacheType;
+import org.apache.accumulo.core.trace.ScanInstrumentation;
+
+public class InstrumentedBlockCache implements BlockCache {
+
+  private final BlockCache blockCache;
+  private final ScanInstrumentation scanInstrumentation;
+  private final CacheType cacheType;
+
+  public InstrumentedBlockCache(CacheType cacheType, BlockCache blockCache,
+      ScanInstrumentation scanInstrumentation) {
+    this.blockCache = blockCache;
+    this.scanInstrumentation = scanInstrumentation;
+    this.cacheType = cacheType;
+  }
+
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte[] buf) {
+    return blockCache.cacheBlock(blockName, buf);
+  }
+
+  @Override
+  public CacheEntry getBlock(String blockName) {
+    return blockCache.getBlock(blockName);
+  }
+
+  private final class CountingLoader implements Loader {
+
+    private final Loader loader;
+    int loadCount = 0;
+
+    private CountingLoader(Loader loader) {
+      this.loader = loader;
+    }
+
+    @Override
+    public Map<String,Loader> getDependencies() {
+      return loader.getDependencies();
+    }
+
+    @Override
+    public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
+      loadCount++;
+      return loader.load(maxSize, dependencies);
+    }
+  }
+
+  @Override
+  public CacheEntry getBlock(String blockName, Loader loader) {
+    var cl = new CountingLoader(loader);
+    var ce = blockCache.getBlock(blockName, cl);
+    if (cl.loadCount == 0 && ce != null) {
+      scanInstrumentation.incrementCacheHit(cacheType);
+    } else {
+      scanInstrumentation.incrementCacheMiss(cacheType);
+    }
+    return ce;
+  }
+
+  @Override
+  public long getMaxHeapSize() {
+    return blockCache.getMaxHeapSize();
+  }
+
+  @Override
+  public long getMaxSize() {
+    return blockCache.getMaxSize();
+  }
+
+  @Override
+  public Stats getStats() {
+    return blockCache.getStats();
+  }
+
+  public static BlockCache wrap(CacheType cacheType, BlockCache cache) {
+    var si = ScanInstrumentation.get();
+    if (cache != null && si.enabled()) {
+      return new InstrumentedBlockCache(cacheType, cache, si);
+    } else {
+      return cache;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 854c74154b..f694740d96 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -38,7 +38,10 @@ import 
org.apache.accumulo.core.file.streams.RateLimitedInputStream;
 import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.spi.cache.BlockCache.Loader;
 import org.apache.accumulo.core.spi.cache.CacheEntry;
+import org.apache.accumulo.core.spi.cache.CacheType;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.trace.ScanInstrumentation;
+import org.apache.accumulo.core.util.CountingInputStream;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -405,6 +408,7 @@ public class CachableBlockFile {
       }
 
       BlockReader _currBlock = getBCFile(null).getMetaBlock(blockName);
+      incrementCacheBypass(CacheType.INDEX);
       return new CachedBlockRead(_currBlock);
     }
 
@@ -421,6 +425,7 @@ public class CachableBlockFile {
       }
 
       BlockReader _currBlock = getBCFile(null).getDataBlock(offset, 
compressedSize, rawSize);
+      incrementCacheBypass(CacheType.INDEX);
       return new CachedBlockRead(_currBlock);
     }
 
@@ -443,6 +448,7 @@ public class CachableBlockFile {
       }
 
       BlockReader _currBlock = getBCFile().getDataBlock(blockIndex);
+      incrementCacheBypass(CacheType.DATA);
       return new CachedBlockRead(_currBlock);
     }
 
@@ -459,9 +465,14 @@ public class CachableBlockFile {
       }
 
       BlockReader _currBlock = getBCFile().getDataBlock(offset, 
compressedSize, rawSize);
+      incrementCacheBypass(CacheType.DATA);
       return new CachedBlockRead(_currBlock);
     }
 
+    private void incrementCacheBypass(CacheType cacheType) {
+      ScanInstrumentation.get().incrementCacheBypass(cacheType);
+    }
+
     @Override
     public synchronized void close() throws IOException {
       if (closed) {
@@ -491,12 +502,22 @@ public class CachableBlockFile {
   }
 
   public static class CachedBlockRead extends DataInputStream {
+
+    private static InputStream wrapForTrace(InputStream inputStream) {
+      var scanInstrumentation = ScanInstrumentation.get();
+      if (scanInstrumentation.enabled()) {
+        return new CountingInputStream(inputStream);
+      } else {
+        return inputStream;
+      }
+    }
+
     private final SeekableByteArrayInputStream seekableInput;
     private final CacheEntry cb;
     boolean indexable;
 
     public CachedBlockRead(InputStream in) {
-      super(in);
+      super(wrapForTrace(in));
       cb = null;
       seekableInput = null;
       indexable = false;
@@ -507,7 +528,7 @@ public class CachableBlockFile {
     }
 
     private CachedBlockRead(SeekableByteArrayInputStream seekableInput, 
CacheEntry cb) {
-      super(seekableInput);
+      super(wrapForTrace(seekableInput));
       this.seekableInput = seekableInput;
       this.cb = cb;
       indexable = true;
@@ -536,5 +557,25 @@ public class CachableBlockFile {
     public void indexWeightChanged() {
       cb.indexWeightChanged();
     }
+
+    public void flushStats() {
+      if (in instanceof CountingInputStream) {
+        var cin = ((CountingInputStream) in);
+        
ScanInstrumentation.get().incrementUncompressedBytesRead(cin.getCount());
+        cin.resetCount();
+        var src = cin.getWrappedStream();
+        if (src instanceof BlockReader) {
+          var br = (BlockReader) src;
+          br.flushStats();
+        }
+
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      flushStats();
+      super.close();
+    }
   }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java
index 78b8600b37..155b874326 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java
@@ -20,7 +20,7 @@ package org.apache.accumulo.core.file.blockfile.impl;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.logging.LoggingBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheUtil;
 import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.spi.cache.CacheType;
 import org.apache.accumulo.core.spi.scan.ScanDispatch;
@@ -33,8 +33,8 @@ public class ScanCacheProvider implements CacheProvider {
   public ScanCacheProvider(AccumuloConfiguration tableConfig, ScanDispatch 
dispatch,
       BlockCache indexCache, BlockCache dataCache) {
 
-    var loggingIndexCache = LoggingBlockCache.wrap(CacheType.INDEX, 
indexCache);
-    var loggingDataCache = LoggingBlockCache.wrap(CacheType.DATA, dataCache);
+    var loggingIndexCache = BlockCacheUtil.instrument(CacheType.INDEX, 
indexCache);
+    var loggingDataCache = BlockCacheUtil.instrument(CacheType.DATA, 
dataCache);
 
     switch (dispatch.getIndexCacheUsage()) {
       case ENABLED:
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 68e2be016d..9e2d4633c2 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -838,7 +838,6 @@ public class RFile {
     }
 
     private void _next() throws IOException {
-
       if (!hasTop) {
         throw new IllegalStateException();
       }
@@ -1164,6 +1163,12 @@ public class RFile {
     public long estimateOverlappingEntries(KeyExtent extent) throws 
IOException {
       throw new UnsupportedOperationException();
     }
+
+    public void flushStats() {
+      if (currBlock != null) {
+        currBlock.flushStats();
+      }
+    }
   }
 
   public static class Reader extends HeapIterator implements FileSKVIterator {
@@ -1304,6 +1309,9 @@ public class RFile {
 
     @Override
     public void closeDeepCopies() throws IOException {
+      for (LocalityGroupReader lgr : currentReaders) {
+        lgr.flushStats();
+      }
       closeDeepCopies(false);
     }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index aca53e495c..31f32d64ae 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -47,6 +47,8 @@ import org.apache.accumulo.core.spi.crypto.FileDecrypter;
 import org.apache.accumulo.core.spi.crypto.FileEncrypter;
 import org.apache.accumulo.core.spi.crypto.NoFileDecrypter;
 import org.apache.accumulo.core.spi.crypto.NoFileEncrypter;
+import org.apache.accumulo.core.trace.ScanInstrumentation;
+import org.apache.accumulo.core.util.CountingInputStream;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -468,6 +470,7 @@ public final class BCFile {
       private final CompressionAlgorithm compressAlgo;
       private Decompressor decompressor;
       private final BlockRegion region;
+      private final InputStream rawInputStream;
       private final InputStream in;
       private volatile boolean closed;
 
@@ -481,9 +484,14 @@ public final class BCFile {
         BoundedRangeFileInputStream boundedRangeFileInputStream = new 
BoundedRangeFileInputStream(
             fsin, this.region.getOffset(), this.region.getCompressedSize());
 
+        if (ScanInstrumentation.get().enabled()) {
+          rawInputStream = new 
CountingInputStream(boundedRangeFileInputStream);
+        } else {
+          rawInputStream = boundedRangeFileInputStream;
+        }
+
         try {
-          InputStream inputStreamToBeCompressed =
-              decrypter.decryptStream(boundedRangeFileInputStream);
+          InputStream inputStreamToBeCompressed = 
decrypter.decryptStream(rawInputStream);
           this.in = 
compressAlgo.createDecompressionStream(inputStreamToBeCompressed, decompressor,
               getFSInputBufferSize(conf));
         } catch (IOException e) {
@@ -506,11 +514,20 @@ public final class BCFile {
         return region;
       }
 
+      public void flushStats() {
+        if (rawInputStream instanceof CountingInputStream) {
+          var ci = (CountingInputStream) rawInputStream;
+          ScanInstrumentation.get().incrementFileBytesRead(ci.getCount());
+          ci.resetCount();
+        }
+      }
+
       public void finish() throws IOException {
         synchronized (in) {
           if (!closed) {
             try {
               in.close();
+              flushStats();
             } finally {
               closed = true;
               if (decompressor != null) {
@@ -538,6 +555,10 @@ public final class BCFile {
         rBlkState = rbs;
       }
 
+      public void flushStats() {
+        rBlkState.flushStats();
+      }
+
       /**
        * Finishing reading the block. Release all resources.
        */
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java
index 47a0881973..a2f8da5975 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java
@@ -19,7 +19,10 @@
 package org.apache.accumulo.core.iteratorsImpl.system;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 
@@ -34,15 +37,21 @@ import 
org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 public class StatsIterator extends ServerWrappingIterator {
 
   private int numRead = 0;
-  private final AtomicLong seekCounter;
+  private final AtomicLong scanSeekCounter;
+  private final LongAdder serverSeekCounter;
   private final AtomicLong scanCounter;
+  private final LongAdder tabletScanCounter;
   private final LongAdder serverScanCounter;
+  private final List<StatsIterator> deepCopies = 
Collections.synchronizedList(new ArrayList<>());
 
-  public StatsIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong 
seekCounter,
-      AtomicLong tabletScanCounter, LongAdder serverScanCounter) {
+  public StatsIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong 
scanSeekCounter,
+      LongAdder serverSeekCounter, AtomicLong scanCounter, LongAdder 
tabletScanCounter,
+      LongAdder serverScanCounter) {
     super(source);
-    this.seekCounter = seekCounter;
-    this.scanCounter = tabletScanCounter;
+    this.scanSeekCounter = scanSeekCounter;
+    this.serverSeekCounter = serverSeekCounter;
+    this.scanCounter = scanCounter;
+    this.tabletScanCounter = tabletScanCounter;
     this.serverScanCounter = serverScanCounter;
   }
 
@@ -51,31 +60,43 @@ public class StatsIterator extends ServerWrappingIterator {
     source.next();
     numRead++;
 
-    if (numRead % 23 == 0) {
-      scanCounter.addAndGet(numRead);
-      serverScanCounter.add(numRead);
-      numRead = 0;
+    if (numRead % 1009 == 0) {
+      // only report on self, do not force deep copies to report
+      report(false);
     }
   }
 
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
-    return new StatsIterator(source.deepCopy(env), seekCounter, scanCounter, 
serverScanCounter);
+    var deepCopy = new StatsIterator(source.deepCopy(env), scanSeekCounter, 
serverSeekCounter,
+        scanCounter, tabletScanCounter, serverScanCounter);
+    deepCopies.add(deepCopy);
+    return deepCopy;
   }
 
   @Override
   public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
       throws IOException {
     source.seek(range, columnFamilies, inclusive);
-    seekCounter.incrementAndGet();
-    scanCounter.addAndGet(numRead);
-    serverScanCounter.add(numRead);
-    numRead = 0;
+    serverSeekCounter.increment();
+    scanSeekCounter.incrementAndGet();
+    // only report on self, do not force deep copies to report
+    report(false);
   }
 
-  public void report() {
-    scanCounter.addAndGet(numRead);
-    serverScanCounter.add(numRead);
-    numRead = 0;
+  public void report(boolean reportDeepCopies) {
+    if (numRead > 0) {
+      scanCounter.addAndGet(numRead);
+      tabletScanCounter.add(numRead);
+      serverScanCounter.add(numRead);
+      numRead = 0;
+    }
+
+    if (reportDeepCopies) {
+      // recurse down the tree of deep copies, forcing them to report
+      for (var deepCopy : deepCopies) {
+        deepCopy.report(true);
+      }
+    }
   }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java 
b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index cce1044e65..83c0633974 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -32,12 +32,12 @@ import java.util.function.Predicate;
 import org.apache.accumulo.core.client.rfile.RFileSource;
 import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.file.NoSuchMetaStoreException;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheUtil;
 import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import 
org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
-import org.apache.accumulo.core.logging.LoggingBlockCache;
 import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.spi.cache.CacheEntry;
 import org.apache.accumulo.core.spi.cache.CacheType;
@@ -194,8 +194,8 @@ public class SummaryReader {
       // the reason BCFile is used instead of RFile is to avoid reading in the 
RFile meta block when
       // only summary data is wanted.
       CompositeCache compositeCache =
-          new CompositeCache(LoggingBlockCache.wrap(CacheType.SUMMARY, 
summaryCache),
-              LoggingBlockCache.wrap(CacheType.INDEX, indexCache));
+          new CompositeCache(BlockCacheUtil.instrument(CacheType.SUMMARY, 
summaryCache),
+              BlockCacheUtil.instrument(CacheType.INDEX, indexCache));
       CachableBuilder cb = new CachableBuilder().fsPath(fs, 
file).conf(conf).fileLen(fileLenCache)
           .cacheProvider(new BasicCacheProvider(compositeCache, 
null)).cryptoService(cryptoService);
       bcReader = new CachableBlockFile.Reader(cb);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java 
b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java
new file mode 100644
index 0000000000..8ed962c53a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.trace;
+
+import org.apache.accumulo.core.spi.cache.CacheType;
+
+import com.google.common.base.Preconditions;
+
+import io.opentelemetry.api.trace.Span;
+
+/**
+ * This class helps collect per-scan information for the purposes of tracing.
+ */
+public abstract class ScanInstrumentation {
+  private static final ThreadLocal<ScanInstrumentation> INSTRUMENTED_SCANS = 
new ThreadLocal<>();
+
+  private static final ScanInstrumentation NOOP_SI = new ScanInstrumentation() 
{
+    @Override
+    public boolean enabled() {
+      return false;
+    }
+
+    @Override
+    public void incrementFileBytesRead(long amount) {}
+
+    @Override
+    public void incrementUncompressedBytesRead(long amount) {}
+
+    @Override
+    public void incrementCacheMiss(CacheType cacheType) {}
+
+    @Override
+    public void incrementCacheHit(CacheType cacheType) {}
+
+    @Override
+    public void incrementCacheBypass(CacheType cacheType) {}
+
+    @Override
+    public long getFileBytesRead() {
+      return 0;
+    }
+
+    @Override
+    public long getUncompressedBytesRead() {
+      return 0;
+    }
+
+    @Override
+    public int getCacheHits(CacheType cacheType) {
+      return 0;
+    }
+
+    @Override
+    public int getCacheMisses(CacheType cacheType) {
+      return 0;
+    }
+
+    @Override
+    public int getCacheBypasses(CacheType cacheType) {
+      return 0;
+    }
+  };
+
+  public interface ScanScope extends AutoCloseable {
+    @Override
+    void close();
+  }
+
+  public static ScanScope enable(Span span) {
+    if (span.isRecording()) {
+      INSTRUMENTED_SCANS.set(new ScanInstrumentationImpl());
+      var id = Thread.currentThread().getId();
+      return () -> {
+        Preconditions.checkState(id == Thread.currentThread().getId());
+        INSTRUMENTED_SCANS.remove();
+      };
+    } else {
+      return () -> {};
+    }
+  }
+
+  public static ScanInstrumentation get() {
+    var si = INSTRUMENTED_SCANS.get();
+    if (si == null) {
+      return NOOP_SI;
+    }
+    return si;
+  }
+
+  public abstract boolean enabled();
+
+  /**
+   * Increments the raw bytes read directly from DFS by a scan.
+   *
+   * @param amount the number of bytes read
+   */
+  public abstract void incrementFileBytesRead(long amount);
+
+  /**
+   * Increments the uncompressed and decrypted bytes read by a scan. This will
+   * include all uncompressed data read by a scan regardless of whether the
+   * underlying data came from cache or DFS.
+   */
+  public abstract void incrementUncompressedBytesRead(long amount);
+
+  /**
+   * Increments the count of rfile blocks that were not already in the cache.
+   */
+  public abstract void incrementCacheMiss(CacheType cacheType);
+
+  /**
+   * Increments the count of rfile blocks that were already in the cache.
+   */
+  public abstract void incrementCacheHit(CacheType cacheType);
+
+  /**
+   * Increments the count of rfile blocks that were directly read from DFS 
bypassing the cache.
+   */
+  public abstract void incrementCacheBypass(CacheType cacheType);
+
+  public abstract long getFileBytesRead();
+
+  public abstract long getUncompressedBytesRead();
+
+  public abstract int getCacheHits(CacheType cacheType);
+
+  public abstract int getCacheMisses(CacheType cacheType);
+
+  public abstract int getCacheBypasses(CacheType cacheType);
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java
new file mode 100644
index 0000000000..67754ee738
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.trace;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.spi.cache.CacheType;
+
+class ScanInstrumentationImpl extends ScanInstrumentation {
+  private final AtomicLong fileBytesRead = new AtomicLong();
+  private final AtomicLong uncompressedBytesRead = new AtomicLong();
+  private final AtomicInteger[] cacheHits = new 
AtomicInteger[CacheType.values().length];
+  private final AtomicInteger[] cacheMisses = new 
AtomicInteger[CacheType.values().length];
+  private final AtomicInteger[] cacheBypasses = new 
AtomicInteger[CacheType.values().length];
+
+  ScanInstrumentationImpl() {
+    for (int i = 0; i < CacheType.values().length; i++) {
+      cacheHits[i] = new AtomicInteger();
+      cacheMisses[i] = new AtomicInteger();
+      cacheBypasses[i] = new AtomicInteger();
+    }
+  }
+
+  @Override
+  public boolean enabled() {
+    return true;
+  }
+
+  @Override
+  public void incrementFileBytesRead(long amount) {
+    fileBytesRead.addAndGet(amount);
+  }
+
+  @Override
+  public void incrementUncompressedBytesRead(long amount) {
+    uncompressedBytesRead.addAndGet(amount);
+  }
+
+  @Override
+  public void incrementCacheMiss(CacheType cacheType) {
+    cacheMisses[cacheType.ordinal()].incrementAndGet();
+  }
+
+  @Override
+  public void incrementCacheHit(CacheType cacheType) {
+    cacheHits[cacheType.ordinal()].incrementAndGet();
+  }
+
+  @Override
+  public void incrementCacheBypass(CacheType cacheType) {
+    cacheBypasses[cacheType.ordinal()].incrementAndGet();
+  }
+
+  @Override
+  public long getFileBytesRead() {
+    return fileBytesRead.get();
+  }
+
+  @Override
+  public long getUncompressedBytesRead() {
+    return uncompressedBytesRead.get();
+  }
+
+  @Override
+  public int getCacheHits(CacheType cacheType) {
+    return cacheHits[cacheType.ordinal()].get();
+  }
+
+  @Override
+  public int getCacheMisses(CacheType cacheType) {
+    return cacheMisses[cacheType.ordinal()].get();
+  }
+
+  @Override
+  public int getCacheBypasses(CacheType cacheType) {
+    return cacheBypasses[cacheType.ordinal()].get();
+  }
+
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java 
b/core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java
new file mode 100644
index 0000000000..e2c5b078b7
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.trace;
+
+import io.opentelemetry.api.common.AttributeKey;
+
+public class TraceAttributes {
+  public static final AttributeKey<Long> ENTRIES_RETURNED_KEY =
+      AttributeKey.longKey("accumulo.scan.entries.returned");
+  public static final AttributeKey<Long> BYTES_RETURNED_KEY =
+      AttributeKey.longKey("accumulo.scan.bytes.returned");
+  public static final AttributeKey<Long> BYTES_READ_KEY =
+      AttributeKey.longKey("accumulo.scan.bytes.read");
+  public static final AttributeKey<Long> BYTES_READ_FILE_KEY =
+      AttributeKey.longKey("accumulo.scan.bytes.read.file");
+  public static final AttributeKey<String> EXECUTOR_KEY =
+      AttributeKey.stringKey("accumulo.scan.executor");
+  public static final AttributeKey<String> TABLE_ID_KEY =
+      AttributeKey.stringKey("accumulo.table.id");
+  public static final AttributeKey<String> EXTENT_KEY = 
AttributeKey.stringKey("accumulo.extent");
+  public static final AttributeKey<Long> INDEX_HITS_KEY =
+      AttributeKey.longKey("accumulo.scan.cache.index.hits");
+  public static final AttributeKey<Long> INDEX_MISSES_KEY =
+      AttributeKey.longKey("accumulo.scan.cache.index.misses");
+  public static final AttributeKey<Long> INDEX_BYPASSES_KEY =
+      AttributeKey.longKey("accumulo.scan.cache.index.bypasses");
+  public static final AttributeKey<Long> DATA_HITS_KEY =
+      AttributeKey.longKey("accumulo.scan.cache.data.hits");
+  public static final AttributeKey<Long> DATA_MISSES_KEY =
+      AttributeKey.longKey("accumulo.scan.cache.data.misses");
+  public static final AttributeKey<Long> DATA_BYPASSES_KEY =
+      AttributeKey.longKey("accumulo.scan.cache.data.bypasses");
+  public static final AttributeKey<String> SERVER_KEY = 
AttributeKey.stringKey("accumulo.server");
+  public static final AttributeKey<Long> ENTRIES_READ_KEY =
+      AttributeKey.longKey("accumulo.scan.entries.read");
+  public static final AttributeKey<Long> SEEKS_KEY = 
AttributeKey.longKey("accumulo.scan.seeks");
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java 
b/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java
new file mode 100644
index 0000000000..15d41b18d7
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not 
use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+
+/**
+ * This class was copied from Guava and modified. If this class were not final
+ * in Guava, it could have been extended. Guava has issue 590 open about this.
+ *
+ * An {@link InputStream} that counts the number of bytes read.
+ *
+ * @author Chris Nokleberg
+ */
+public final class CountingInputStream extends FilterInputStream {
+
+  private long count;
+  private long mark = -1;
+
+  /**
+   * Wraps another input stream, counting the number of bytes read.
+   *
+   * @param in the input stream to be wrapped
+   */
+  public CountingInputStream(InputStream in) {
+    super(Objects.requireNonNull(in));
+  }
+
+  /** Returns the number of bytes read. */
+  public long getCount() {
+    return count;
+  }
+
+  /** Resets the count of bytes read to zero */
+  public void resetCount() {
+    count = 0;
+  }
+
+  /** Returns the input stream this is wrapping */
+  public InputStream getWrappedStream() {
+    return in;
+  }
+
+  @Override
+  public int read() throws IOException {
+    int result = in.read();
+    if (result != -1) {
+      count++;
+    }
+    return result;
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    int result = in.read(b, off, len);
+    if (result != -1) {
+      count += result;
+    }
+    return result;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    long result = in.skip(n);
+    count += result;
+    return result;
+  }
+
+  @Override
+  public synchronized void mark(int readlimit) {
+    in.mark(readlimit);
+    mark = count;
+    // it's okay to mark even if mark isn't supported, as reset won't work
+  }
+
+  @Override
+  public synchronized void reset() throws IOException {
+    if (!in.markSupported()) {
+      throw new IOException("Mark not supported");
+    }
+    if (mark == -1) {
+      throw new IOException("Mark not set");
+    }
+
+    in.reset();
+    count = mark;
+  }
+}
diff --git a/pom.xml b/pom.xml
index e7ed0e963f..6e07454736 100644
--- a/pom.xml
+++ b/pom.xml
@@ -345,6 +345,18 @@ under the License.
         <artifactId>commons-validator</artifactId>
         <version>1.10.0</version>
       </dependency>
+      <dependency>
+        <groupId>io.opentelemetry.javaagent</groupId>
+        <artifactId>opentelemetry-javaagent</artifactId>
+        <!-- This version was selected because it aligns with the version of 
OpenTelemetry that Accumulo 2.1 is using. -->
+        <version>2.14.0</version>
+      </dependency>
+      <dependency>
+        <groupId>io.opentelemetry.proto</groupId>
+        <artifactId>opentelemetry-proto</artifactId>
+        <!-- This version was selected because it aligns with the version of 
protocol buffers that Accumulo 2.1 is using. -->
+        <version>1.3.2-alpha</version>
+      </dependency>
       <dependency>
         <!-- legacy junit version specified here for dependency convergence -->
         <groupId>junit</groupId>
@@ -1020,6 +1032,8 @@ under the License.
                 <unused>org.junit.vintage:junit-vintage-engine:jar:*</unused>
                 <unused>org.junit.jupiter:junit-jupiter-engine:jar:*</unused>
                 <unused>org.lz4:lz4-java:jar:*</unused>
+                <!-- This is only used at runtime by tests, not at compile time -->
+                
<unused>io.opentelemetry.javaagent:opentelemetry-javaagent:jar:*</unused>
               </ignoredUnusedDeclaredDependencies>
             </configuration>
           </execution>
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
index 3715eac0c2..8226f20dd3 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
@@ -24,6 +24,7 @@ import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.spi.cache.BlockCacheManager;
 import org.apache.accumulo.core.spi.scan.ScanServerInfo;
+import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.conf.TableConfiguration;
@@ -61,4 +62,6 @@ public interface TabletHostingServer {
   GarbageCollectionLogger getGcLogger();
 
   BlockCacheManager.Configuration 
getBlockCacheConfiguration(AccumuloConfiguration acuConf);
+
+  HostAndPort getAdvertiseAddress();
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 2573d6b451..25ea527672 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -60,6 +60,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -227,7 +228,7 @@ public class TabletServer extends AbstractServer
   private String lockID;
   private volatile long lockSessionId = -1;
 
-  public static final AtomicLong seekCount = new AtomicLong(0);
+  public static final LongAdder seekCount = new LongAdder();
 
   private final AtomicLong totalMinorCompactions = new AtomicLong(0);
 
@@ -1201,7 +1202,7 @@ public class TabletServer extends AbstractServer
     result.osLoad = 
ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
     result.name = getClientAddressString();
     result.holdTime = resourceManager.holdTime();
-    result.lookups = seekCount.get();
+    result.lookups = seekCount.sum();
     result.indexCacheHits = 
resourceManager.getIndexCache().getStats().hitCount();
     result.indexCacheRequest = 
resourceManager.getIndexCache().getStats().requestCount();
     result.dataCacheHits = 
resourceManager.getDataCache().getStats().hitCount();
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index 6bcfbc4b9e..1e6c408ee5 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -44,6 +44,7 @@ import 
org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
 import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.trace.TraceAttributes;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.conf.TableConfiguration.ParsedIteratorConfig;
 import org.apache.accumulo.server.fs.FileManager.ScanFileManager;
@@ -57,6 +58,10 @@ import org.apache.commons.lang3.builder.ToStringStyle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
+import io.opentelemetry.api.trace.Span;
+
 class ScanDataSource implements DataSource {
 
   private static final Logger log = 
LoggerFactory.getLogger(ScanDataSource.class);
@@ -67,7 +72,7 @@ class ScanDataSource implements DataSource {
   private SortedKeyValueIterator<Key,Value> iter;
   private long expectedDeletionCount;
   private List<MemoryIterator> memIters = null;
-  private long fileReservationId;
+  private long fileReservationId = -1;
   private final AtomicBoolean interruptFlag;
   private StatsIterator statsIterator;
 
@@ -76,6 +81,9 @@ class ScanDataSource implements DataSource {
   private final byte[] defaultLabels;
   private final long scanDataSourceId;
 
+  private final AtomicLong scanSeekCounter;
+  private final AtomicLong scanCounter;
+
   ScanDataSource(TabletBase tablet, ScanParameters scanParams, boolean 
loadIters,
       AtomicBoolean interruptFlag) {
     this.tablet = tablet;
@@ -85,6 +93,8 @@ class ScanDataSource implements DataSource {
     this.loadIters = loadIters;
     this.defaultLabels = tablet.getDefaultSecurityLabels();
     this.scanDataSourceId = nextSourceId.incrementAndGet();
+    this.scanSeekCounter = new AtomicLong();
+    this.scanCounter = new AtomicLong();
     log.trace("new scan data source, scanId {}, tablet: {}, params: {}, 
loadIterators: {}",
         this.scanDataSourceId, this.tablet, this.scanParams, this.loadIters);
   }
@@ -111,6 +121,9 @@ class ScanDataSource implements DataSource {
         } finally {
           expectedDeletionCount = tablet.getDataSourceDeletions();
           iter = null;
+          if (statsIterator != null) {
+            statsIterator.report(true);
+          }
         }
       }
     }
@@ -174,6 +187,7 @@ class ScanDataSource implements DataSource {
       memIters = tablet.getMemIterators(samplerConfig);
       Pair<Long,Map<TabletFile,DataFileValue>> reservation = 
tablet.reserveFilesForScan();
       fileReservationId = reservation.getFirst();
+      Preconditions.checkState(fileReservationId >= 0);
       files = reservation.getSecond();
     }
 
@@ -200,8 +214,8 @@ class ScanDataSource implements DataSource {
     }
     SystemIteratorEnvironment iterEnv = (SystemIteratorEnvironment) 
builder.build();
 
-    statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, 
tablet.getScannedCounter(),
-        tablet.getScanMetrics().getScannedCounter());
+    statsIterator = new StatsIterator(multiIter, scanSeekCounter, 
TabletServer.seekCount,
+        scanCounter, tablet.getScannedCounter(), 
tablet.getScanMetrics().getScannedCounter());
 
     SortedKeyValueIterator<Key,Value> visFilter =
         SystemIteratorUtil.setupSystemScanIterators(statsIterator, 
scanParams.getColumnSet(),
@@ -257,9 +271,11 @@ class ScanDataSource implements DataSource {
       tablet.returnMemIterators(memIters);
       memIters = null;
       try {
-        log.trace("Returning file iterators for {}, scanId:{}, fid:{}", 
tablet.getExtent(),
-            scanDataSourceId, fileReservationId);
-        tablet.returnFilesForScan(fileReservationId);
+        if (fileReservationId >= 0) {
+          log.trace("Returning file iterators for {}, scanId:{}, fid:{}", 
tablet.getExtent(),
+              scanDataSourceId, fileReservationId);
+          tablet.returnFilesForScan(fileReservationId);
+        }
       } catch (Exception e) {
         log.warn("Error Returning file iterators for scan: {}, :{}", 
scanDataSourceId, e);
         // Continue bubbling the exception up for handling.
@@ -270,8 +286,16 @@ class ScanDataSource implements DataSource {
     }
   }
 
+  private boolean closed = false;
+
   @Override
   public void close(boolean sawErrors) {
+    if (closed) {
+      return;
+    }
+
+    closed = true;
+
     try {
       returnIterators();
     } finally {
@@ -288,12 +312,20 @@ class ScanDataSource implements DataSource {
       } finally {
         fileManager = null;
         if (statsIterator != null) {
-          statsIterator.report();
+          statsIterator.report(true);
         }
       }
     }
   }
 
+  public void setAttributes(Span span) {
+    if (statsIterator != null && span.isRecording()) {
+      statsIterator.report(true);
+      span.setAttribute(TraceAttributes.ENTRIES_READ_KEY, scanCounter.get());
+      span.setAttribute(TraceAttributes.SEEKS_KEY, scanSeekCounter.get());
+    }
+  }
+
   public void interrupt() {
     interruptFlag.set(true);
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
index 2b89a2005b..9e2a7c7af0 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
@@ -30,7 +30,11 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import 
org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
 import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator;
+import org.apache.accumulo.core.trace.ScanInstrumentation;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.ShutdownUtil;
+import org.apache.accumulo.tserver.scan.NextBatchTask;
 import org.apache.accumulo.tserver.scan.ScanParameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,6 +72,23 @@ public class Scanner {
   }
 
   public ScanBatch read() throws IOException, TabletClosedException {
+    var span = TraceUtil.startSpan(NextBatchTask.class, "scan-batch");
+    try (var scope = span.makeCurrent(); var scanScope = 
ScanInstrumentation.enable(span)) {
+      var batchAndSource = readInternal();
+      // This needs to be called after the ScanDataSource was closed in order
+      // to make sure all statistics related to file reads are seen.
+      tablet.recordScanTrace(span, batchAndSource.getFirst().getResults(), 
scanParams,
+          batchAndSource.getSecond());
+      return batchAndSource.getFirst();
+    } catch (IOException | RuntimeException e) {
+      span.recordException(e);
+      throw e;
+    } finally {
+      span.end();
+    }
+  }
+
+  private Pair<ScanBatch,ScanDataSource> readInternal() throws IOException, 
TabletClosedException {
 
     ScanDataSource dataSource = null;
 
@@ -121,13 +142,13 @@ public class Scanner {
 
       if (results.getResults() == null) {
         range = null;
-        return new ScanBatch(new ArrayList<>(), false);
+        return new Pair<>(new ScanBatch(new ArrayList<>(), false), dataSource);
       } else if (results.getContinueKey() == null) {
-        return new ScanBatch(results.getResults(), false);
+        return new Pair<>(new ScanBatch(results.getResults(), false), 
dataSource);
       } else {
         range = new Range(results.getContinueKey(), 
!results.isSkipContinueKey(), range.getEndKey(),
             range.isEndKeyInclusive());
-        return new ScanBatch(results.getResults(), true);
+        return new Pair<>(new ScanBatch(results.getResults(), true), 
dataSource);
       }
 
     } catch (IterationInterruptedException iie) {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 90a53c734a..d7ff6a7c8d 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1793,7 +1793,7 @@ public class Tablet extends TabletBase {
   }
 
   public long totalScannedCount() {
-    return this.scannedCount.get();
+    return this.scannedCount.sum();
   }
 
   public long totalLookupCount() {
@@ -1806,7 +1806,7 @@ public class Tablet extends TabletBase {
     queryByteRate.update(now, this.queryResultBytes.get());
     ingestRate.update(now, ingestCount);
     ingestByteRate.update(now, ingestBytes);
-    scannedRate.update(now, this.scannedCount.get());
+    scannedRate.update(now, this.scannedCount.sum());
   }
 
   public long getSplitCreationTime() {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
index dcc54e9da7..0ac8f6bd14 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
@@ -31,6 +31,7 @@ import java.util.SortedMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -48,6 +49,10 @@ import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.spi.cache.CacheType;
+import org.apache.accumulo.core.trace.ScanInstrumentation;
+import org.apache.accumulo.core.trace.TraceAttributes;
+import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.ShutdownUtil;
@@ -62,6 +67,8 @@ import org.apache.accumulo.tserver.scan.ScanParameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.opentelemetry.api.trace.Span;
+
 /**
  * This class exists to share code for scanning a tablet between {@link 
Tablet} and
  * {@link SnapshotTablet}
@@ -79,7 +86,7 @@ public abstract class TabletBase {
   protected AtomicLong lookupCount = new AtomicLong(0);
   protected AtomicLong queryResultCount = new AtomicLong(0);
   protected AtomicLong queryResultBytes = new AtomicLong(0);
-  protected final AtomicLong scannedCount = new AtomicLong(0);
+  protected final LongAdder scannedCount = new LongAdder();
 
   protected final Set<ScanDataSource> activeScans = new HashSet<>();
 
@@ -154,7 +161,7 @@ public abstract class TabletBase {
     return new Scanner(this, range, scanParams, interruptFlag);
   }
 
-  public AtomicLong getScannedCounter() {
+  public LongAdder getScannedCounter() {
     return this.scannedCount;
   }
 
@@ -205,25 +212,30 @@ public abstract class TabletBase {
       tabletRange.clip(range);
     }
 
-    SourceSwitchingIterator.DataSource dataSource =
-        createDataSource(scanParams, true, interruptFlag);
+    ScanDataSource dataSource = createDataSource(scanParams, true, 
interruptFlag);
 
     Tablet.LookupResult result = null;
 
     boolean sawException = false;
-    try {
+    var span = TraceUtil.startSpan(TabletBase.class, "multiscan-batch");
+    try (var scope = span.makeCurrent(); var scanScope = 
ScanInstrumentation.enable(span)) {
       SortedKeyValueIterator<Key,Value> iter = new 
SourceSwitchingIterator(dataSource);
       this.lookupCount.incrementAndGet();
       this.server.getScanMetrics().incrementLookupCount(1);
       result = lookup(iter, ranges, results, scanParams, maxResultSize);
+      // must close data source before recording scan trace in order to flush 
all file read stats
+      dataSource.close(false);
+      recordScanTrace(span, results, scanParams, dataSource);
       return result;
     } catch (IOException | RuntimeException e) {
       sawException = true;
+      span.recordException(e);
       throw e;
     } finally {
       // code in finally block because always want
       // to return mapfiles, even when exception is thrown
       dataSource.close(sawException);
+      span.end();
 
       synchronized (this) {
         queryResultCount.addAndGet(results.size());
@@ -236,6 +248,34 @@ public abstract class TabletBase {
     }
   }
 
+  void recordScanTrace(Span span, List<KVEntry> batch, ScanParameters scanParameters,
+      ScanDataSource dataSource) {
+    if (span.isRecording()) {
+      span.setAttribute(TraceAttributes.ENTRIES_RETURNED_KEY, batch.size());
+      long bytesReturned = 0;
+      for (var e : batch) {
+        bytesReturned += e.getKey().getLength() + e.getValue().get().length;
+      }
+      span.setAttribute(TraceAttributes.BYTES_RETURNED_KEY, bytesReturned);
+      span.setAttribute(TraceAttributes.EXECUTOR_KEY,
+          scanParameters.getScanDispatch().getExecutorName());
+      span.setAttribute(TraceAttributes.TABLE_ID_KEY, getExtent().tableId().canonical());
+      span.setAttribute(TraceAttributes.EXTENT_KEY, getExtent().toString());
+      var si = ScanInstrumentation.get();
+      span.setAttribute(TraceAttributes.BYTES_READ_FILE_KEY, si.getFileBytesRead());
+      span.setAttribute(TraceAttributes.BYTES_READ_KEY, si.getUncompressedBytesRead());
+      span.setAttribute(TraceAttributes.INDEX_HITS_KEY, si.getCacheHits(CacheType.INDEX));
+      span.setAttribute(TraceAttributes.INDEX_MISSES_KEY, si.getCacheMisses(CacheType.INDEX));
+      span.setAttribute(TraceAttributes.INDEX_BYPASSES_KEY, si.getCacheBypasses(CacheType.INDEX));
+      span.setAttribute(TraceAttributes.DATA_HITS_KEY, si.getCacheHits(CacheType.DATA));
+      span.setAttribute(TraceAttributes.DATA_MISSES_KEY, si.getCacheMisses(CacheType.DATA));
+      span.setAttribute(TraceAttributes.DATA_BYPASSES_KEY, si.getCacheBypasses(CacheType.DATA));
+      span.setAttribute(TraceAttributes.SERVER_KEY, server.getAdvertiseAddress().toString());
+
+      dataSource.setAttributes(span);
+    }
+  }
+
   Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParameters scanParams)
       throws IOException {
 
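
For illustration, a minimal consumer-side sketch (not part of this patch) of how a trace backend
or test might derive a data cache hit rate from the attributes recorded above. It assumes the
SpanData holder added later in this patch; the helper method itself is hypothetical:

    // Hedged sketch: derive a data cache hit rate from one scan-batch span.
    // SpanData and TraceAttributes come from this patch; nothing else is implied.
    static double dataCacheHitRate(SpanData span) {
      long hits = span.integerAttributes.getOrDefault(TraceAttributes.DATA_HITS_KEY.getKey(), 0L);
      long misses =
          span.integerAttributes.getOrDefault(TraceAttributes.DATA_MISSES_KEY.getKey(), 0L);
      long total = hits + misses;
      return total == 0 ? 0.0 : (double) hits / total;
    }
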
diff --git a/test/pom.xml b/test/pom.xml
index c58e377869..4430f6d929 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -62,6 +62,10 @@
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
@@ -82,6 +86,14 @@
       <groupId>io.opentelemetry</groupId>
       <artifactId>opentelemetry-context</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.opentelemetry.proto</groupId>
+      <artifactId>opentelemetry-proto</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>jakarta.servlet</groupId>
+      <artifactId>jakarta.servlet-api</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-compaction-coordinator</artifactId>
@@ -194,6 +206,10 @@
       <groupId>org.easymock</groupId>
       <artifactId>easymock</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.jline</groupId>
       <artifactId>jline</artifactId>
@@ -214,6 +230,11 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.opentelemetry.javaagent</groupId>
+      <artifactId>opentelemetry-javaagent</artifactId>
+      <scope>runtime</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client-runtime</artifactId>
diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index f20cf31afe..a6f85f9aaf 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -246,6 +247,10 @@ public class TestIngest {
     }
   }
 
+  public static IteratorSetting.Column generateColumn(IngestParams params, int column) {
+    return new IteratorSetting.Column(new Text(params.columnFamily), generateQualifier(column));
+  }
+
   public static void main(String[] args) throws Exception {
 
     Opts opts = new Opts();
@@ -299,7 +304,7 @@ public class TestIngest {
       Mutation m = new Mutation(row);
       for (int j = 0; j < params.cols; j++) {
         Text colf = new Text(params.columnFamily);
-        Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX));
+        Text colq = generateQualifier(j);
 
         if (writer != null) {
           Key key = new Key(row, colf, colq, labBA);
@@ -405,6 +410,11 @@ public class TestIngest {
         elapsed);
   }
 
+  private static Text generateQualifier(int j) {
+    Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX));
+    return colq;
+  }
+
   public static void ingest(AccumuloClient c, IngestParams params)
       throws MutationsRejectedException, IOException, AccumuloException, AccumuloSecurityException,
       TableNotFoundException, TableExistsException {
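
The generateColumn helper above centralizes qualifier formatting so tests can fetch exactly the
ingested columns. A hedged usage sketch (the column index is arbitrary and scanner stands in for
any ScannerBase; fetchColumn and IteratorSetting.Column are existing Accumulo APIs):

    // Fetch a single ingested column; ScanTracingIT below does the equivalent
    // when its column option is set.
    var col = TestIngest.generateColumn(ingestParams, 2); // index 2 is illustrative
    scanner.fetchColumn(col.getColumnFamily(), col.getColumnQualifier());
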
diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java
new file mode 100644
index 0000000000..2196302fea
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.tracing;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+import java.util.List;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.data.Range;
+
+import com.google.gson.FormattingStyle;
+import com.google.gson.GsonBuilder;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+
+public class ScanTraceClient {
+
+  public static class Options {
+    String clientPropsPath;
+    String table;
+    String startRow;
+    String endRow;
+    String family;
+    String qualifier;
+
+    Options() {}
+
+    Options(String table) {
+      this.table = table;
+    }
+
+    void configureScanner(Scanner scanner) {
+      if (startRow != null || endRow != null) {
+        scanner.setRange(new Range(startRow, true, endRow, false));
+      }
+      setColumn(scanner);
+    }
+
+    void configureScanner(BatchScanner scanner) {
+      if (startRow != null || endRow != null) {
+        scanner.setRanges(List.of(new Range(startRow, true, endRow, false)));
+      } else {
+        scanner.setRanges(List.of(new Range()));
+      }
+      setColumn(scanner);
+    }
+
+    void setColumn(ScannerBase scanner) {
+      System.out.println(scanner.getClass().getName() + " fam " + family);
+      if (family != null) {
+        scanner.fetchColumn(family, qualifier);
+      }
+    }
+
+  }
+
+  public static class Results {
+    // trace id for the batch scan
+    String traceId1;
+    // trace id for the normal scan
+    String traceId2;
+    // The number of entries returned by both scans
+    long scanCount;
+    // The number of bytes returned by both scans
+    long scanSize;
+
+    @Override
+    public String toString() {
+      return "Results{scanCount=" + scanCount + ", traceId1='" + traceId1 + 
"', traceId2='"
+          + traceId2 + "', scanSize=" + scanSize + '}';
+    }
+
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    Options opts = new GsonBuilder().create().fromJson(args[0], Options.class);
+
+    String clientPropsPath = opts.clientPropsPath;
+    String table = opts.table;
+
+    Tracer tracer = GlobalOpenTelemetry.get().getTracer(ScanTraceClient.class.getName());
+    try (var client = Accumulo.newClient().from(clientPropsPath).build()) {
+      long scanCount = 0;
+      long scanSize = 0;
+      long batchScanCount = 0;
+      long batchScanSize = 0;
+
+      Span span = tracer.spanBuilder("batch-scan").startSpan();
+      try (var scanner = client.createBatchScanner(table); var scope = span.makeCurrent()) {
+        opts.configureScanner(scanner);
+        for (var entry : scanner) {
+          batchScanCount++;
+          batchScanSize += entry.getKey().getSize() + entry.getValue().getSize();
+        }
+      } finally {
+        span.end();
+      }
+      var traceId1 = span.getSpanContext().getTraceId();
+
+      // start a second trace
+      span = tracer.spanBuilder("seq-scan").startSpan();
+      try (var scanner = client.createScanner(table); var scope = span.makeCurrent()) {
+        opts.configureScanner(scanner);
+        scanner.setBatchSize(10_000);
+        for (var entry : scanner) {
+          scanCount++;
+          scanSize += entry.getKey().getSize() + entry.getValue().getSize();
+        }
+      } finally {
+        span.end();
+      }
+      var traceId2 = span.getSpanContext().getTraceId();
+
+      assertEquals(scanCount, batchScanCount);
+      assertEquals(scanSize, batchScanSize);
+      assertNotEquals(traceId1, traceId2);
+
+      Results results = new Results();
+      results.traceId1 = traceId1;
+      results.traceId2 = traceId2;
+      results.scanCount = scanCount;
+      results.scanSize = scanSize;
+
+      var gson = new GsonBuilder().setFormattingStyle(FormattingStyle.COMPACT).create();
+      System.out.println("RESULT:" + gson.toJson(results));
+    }
+  }
+}
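
As a usage sketch, ScanTraceClient takes a single argument: the Options serialized as JSON. The
field names come from the Options class above; the properties path and table name below are
illustrative:

    // Hypothetical standalone invocation; ScanTracingIT instead launches this
    // class in a separate JVM with the OpenTelemetry java agent attached.
    String optsJson = "{\"clientPropsPath\":\"/path/to/accumulo-client.properties\","
        + "\"table\":\"mytable\",\"family\":\"colf\"}";
    ScanTraceClient.main(new String[] {optsJson});
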
diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java
new file mode 100644
index 0000000000..8d0fe70bba
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.tracing;
+
+import static org.apache.accumulo.core.trace.TraceAttributes.EXECUTOR_KEY;
+import static org.apache.accumulo.core.trace.TraceAttributes.EXTENT_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.trace.TraceAttributes;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.util.PortUtils;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+
+class ScanTracingIT extends ConfigurableMacBase {
+
+  private static final int OTLP_PORT = PortUtils.getRandomFreePort();
+
+  private static List<String> getJvmArgs() {
+    String javaAgent = null;
+    for (var cpi : System.getProperty("java.class.path").split(":")) {
+      if (cpi.contains("opentelemetry-javaagent")) {
+        javaAgent = cpi;
+      }
+    }
+
+    Objects.requireNonNull(javaAgent);
+
+    return List.of("-Dotel.traces.exporter=otlp", 
"-Dotel.exporter.otlp.protocol=http/protobuf",
+        "-Dotel.exporter.otlp.endpoint=http://localhost:"; + OTLP_PORT,
+        "-Dotel.metrics.exporter=none", "-Dotel.logs.exporter=none", 
"-javaagent:" + javaAgent);
+  }
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    getJvmArgs().forEach(cfg::addJvmOption);
+    // sized such that full table scans will not fit in the cache
+    cfg.setProperty(Property.TSERV_DATACACHE_SIZE.getKey(), "8M");
+    cfg.setProperty(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "pool1.threads", "8");
+  }
+
+  private TraceCollector collector;
+
+  @BeforeEach
+  public void startCollector() throws Exception {
+    collector = new TraceCollector("localhost", OTLP_PORT);
+  }
+
+  @AfterEach
+  public void stopCollector() throws Exception {
+    collector.stop();
+  }
+
+  @Test
+  public void test() throws Exception {
+    var names = getUniqueNames(7);
+    runTest(names[0], 0, false, false, -1, -1, -1);
+    runTest(names[1], 10, false, false, -1, -1, -1);
+    runTest(names[2], 0, true, false, -1, -1, -1);
+    runTest(names[3], 0, false, false, -1, -1, 2);
+    runTest(names[4], 0, false, false, 32, 256, -1);
+    runTest(names[5], 0, true, true, 32, 256, -1);
+    runTest(names[6], 0, true, false, -1, -1, 2);
+  }
+
+  private void runTest(String tableName, int numSplits, boolean cacheData,
+      boolean secondScanFitsInCache, int startRow, int endRow, int column) throws Exception {
+
+    var ingestParams = new TestIngest.IngestParams(getClientProperties(), tableName);
+    ingestParams.createTable = false;
+    ingestParams.rows = 1000;
+    ingestParams.cols = 10;
+
+    try (var client = Accumulo.newClient().from(getClientProperties()).build()) {
+      var ntc = new NewTableConfiguration();
+      if (numSplits > 0) {
+        var splits = TestIngest.getSplitPoints(0, 1000, numSplits);
+        ntc.withSplits(splits);
+      }
+
+      var props = new HashMap<String,String>();
+
+      if (cacheData) {
+        props.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true");
+      }
+
+      // use a different executor for batch scans
+      props.put("table.scan.dispatcher.opts.multi_executor", "pool1");
+
+      ntc.setProperties(props);
+
+      client.tableOperations().create(tableName, ntc);
+
+      TestIngest.ingest(client, ingestParams);
+      client.tableOperations().flush(tableName, null, null, true);
+    }
+
+    long expectedRows = ingestParams.rows;
+
+    var options = new ScanTraceClient.Options(tableName);
+    if (startRow != -1 && endRow != -1) {
+      options.startRow = TestIngest.generateRow(startRow, 0).toString();
+      options.endRow = TestIngest.generateRow(endRow, 0).toString();
+      expectedRows = IntStream.range(startRow, endRow).count();
+    }
+
+    int expectedColumns = ingestParams.cols;
+
+    if (column != -1) {
+      var col = TestIngest.generateColumn(ingestParams, column);
+      options.family = col.getColumnFamily().toString();
+      options.qualifier = col.getColumnQualifier().toString();
+      expectedColumns = 1;
+    }
+
+    var results = run(options);
+    System.out.println(results);
+
+    var tableId = getServerContext().getTableId(tableName).canonical();
+
+    var expectedServers = getServerContext().getAmple().readTablets().forTable(TableId.of(tableId))
+        .fetch(TabletMetadata.ColumnType.LOCATION).build().stream()
+        .map(tm -> tm.getLocation().getHostAndPort().toString()).collect(Collectors.toSet());
+
+    ScanTraceStats scanStats = new ScanTraceStats(false);
+    ScanTraceStats batchScanStats = new ScanTraceStats(true);
+    Set<String> extents1 = new TreeSet<>();
+    Set<String> extents2 = new TreeSet<>();
+
+    while (scanStats.getEntriesReturned() < expectedRows * expectedColumns
+        || batchScanStats.getEntriesReturned() < expectedRows * expectedColumns) {
+      var span = collector.take();
+      var stats = ScanTraceStats.create(span);
+      if (stats != null
+          && 
span.stringAttributes.get(TraceAttributes.TABLE_ID_KEY.getKey()).equals(tableId)
+          && (results.traceId1.equals(span.traceId) || 
results.traceId2.equals(span.traceId))) {
+        assertTrue(
+            expectedServers
+                .contains(span.stringAttributes.get(TraceAttributes.SERVER_KEY.getKey())),
+            () -> expectedServers + " " + span);
+        if (stats.isBatchScan()) {
+          assertEquals("pool1", 
span.stringAttributes.get(EXECUTOR_KEY.getKey()));
+        } else {
+          assertEquals("default", 
span.stringAttributes.get(EXECUTOR_KEY.getKey()));
+        }
+        if (numSplits == 0) {
+          assertEquals(tableId + "<<", 
span.stringAttributes.get(EXTENT_KEY.getKey()));
+        } else {
+          var extent = span.stringAttributes.get(EXTENT_KEY.getKey());
+          assertTrue(extent.startsWith(tableId + ";") || extent.startsWith(tableId + "<"));
+        }
+        assertEquals(1, stats.getSeeks());
+        if (stats.isBatchScan()) {
+          assertEquals(results.traceId1, span.traceId);
+          extents1.add(span.stringAttributes.get(EXTENT_KEY.getKey()));
+        } else {
+          assertEquals(results.traceId2, span.traceId);
+          extents2.add(span.stringAttributes.get(EXTENT_KEY.getKey()));
+        }
+      } else {
+        continue;
+      }
+
+      if (stats.isBatchScan()) {
+        batchScanStats.merge(stats);
+      } else {
+        scanStats.merge(stats);
+      }
+    }
+
+    if (numSplits > 0) {
+      assertEquals(numSplits, extents1.size());
+      assertEquals(numSplits, extents2.size());
+    }
+
+    System.out.println(scanStats);
+    System.out.println(batchScanStats);
+
+    assertEquals(expectedRows * expectedColumns, results.scanCount, results::toString);
+
+    var statsList = List.of(batchScanStats, scanStats);
+    for (int i = 0; i < statsList.size(); i++) {
+      var stats = statsList.get(i);
+      assertEquals(expectedRows * 10, stats.getEntriesRead(), stats::toString);
+      assertEquals(results.scanCount, stats.getEntriesReturned(), stats::toString);
+      // When filtering on columns, the scan reads more data than it returns
+      double colMultiplier = 10.0 / expectedColumns;
+      assertClose((long) (results.scanSize * colMultiplier), stats.getBytesRead(), .05);
+      assertClose(results.scanSize, stats.getBytesReturned(), .05);
+      if (secondScanFitsInCache && i == 1) {
+        assertEquals(0, stats.getFileBytesRead(), stats::toString);
+      } else {
+        assertClose((long) (stats.getBytesRead() * .005), stats.getFileBytesRead(), .2);
+      }
+      if (cacheData) {
+        assertEquals(0, stats.getDataCacheBypasses(), stats::toString);
+        assertTrue(stats.getDataCacheHits() + stats.getDataCacheMisses() > 0, stats::toString);
+        if (stats.getFileBytesRead() == 0) {
+          assertEquals(0L, stats.getDataCacheMisses(), stats::toString);
+        }
+        // When caching data, the scan does not seem to hit the index cache much
+        var cacheSum = stats.getIndexCacheHits() + stats.getIndexCacheMisses();
+        assertTrue(cacheSum == 0 || cacheSum == 1, stats::toString);
+      } else {
+        assertEquals(0, stats.getDataCacheHits(), stats::toString);
+        assertEquals(0, stats.getDataCacheMisses(), stats::toString);
+        assertTrue(stats.getDataCacheBypasses() > stats.getSeeks(), stats::toString);
+        assertClose(stats.getDataCacheBypasses(), stats.getIndexCacheHits(), .05);
+      }
+      assertEquals(0, stats.getIndexCacheBypasses(), stats::toString);
+    }
+
+  }
+
+  public void assertClose(long expected, long value, double e) {
+    assertTrue(Math.abs(1 - (double) expected / (double) value) < e,
+        () -> expected + " " + value + " " + e);
+  }
+
+  /**
+   * Runs ScanTraceClient in an external process so it can be instrumented with the OpenTelemetry
+   * java agent. JSON is used to pass data to and from the external process.
+   */
+  public ScanTraceClient.Results run(ScanTraceClient.Options opts)
+      throws IOException, InterruptedException {
+    opts.clientPropsPath = getCluster().getClientPropsPath();
+    var proc = getCluster().exec(ScanTraceClient.class, getJvmArgs(), new Gson().toJson(opts));
+    assertEquals(0, proc.getProcess().waitFor());
+    var out = proc.readStdOut();
+    var result = Arrays.stream(out.split("\\n")).filter(line -> line.startsWith("RESULT:"))
+        .findFirst().orElse("RESULT:{}");
+    result = result.substring("RESULT:".length());
+    return new Gson().fromJson(result, ScanTraceClient.Results.class);
+  }
+
+  /**
+   * Helper class that encapsulates data from a scan trace, making it easier to access and
+   * centralizing the code for reading data from a span.
+   */
+  static class ScanTraceStats {
+    final Map<String,Long> scanStats;
+    final boolean isBatchScan;
+
+    ScanTraceStats(SpanData spanData) {
+      this.scanStats = spanData.integerAttributes;
+      this.isBatchScan = spanData.name.contains("multiscan-batch");
+    }
+
+    ScanTraceStats(boolean isBatchScan) {
+      scanStats = new TreeMap<>();
+      this.isBatchScan = isBatchScan;
+    }
+
+    void merge(ScanTraceStats other) {
+      Preconditions.checkArgument(isBatchScan == other.isBatchScan);
+      other.scanStats.forEach((k, v) -> {
+        scanStats.merge(k, v, Long::sum);
+      });
+    }
+
+    /**
+     * @return a ScanTraceStats if the span is from a scan batch, otherwise null
+     */
+    static ScanTraceStats create(SpanData data) {
+      if (data.name.contains("scan-batch")) {
+        return new ScanTraceStats(data);
+      }
+      return null;
+    }
+
+    boolean isBatchScan() {
+      return isBatchScan;
+    }
+
+    long getEntriesRead() {
+      return scanStats.getOrDefault(TraceAttributes.ENTRIES_READ_KEY.getKey(), 0L);
+    }
+
+    long getEntriesReturned() {
+      return scanStats.getOrDefault(TraceAttributes.ENTRIES_RETURNED_KEY.getKey(), 0L);
+    }
+
+    long getFileBytesRead() {
+      return scanStats.getOrDefault(TraceAttributes.BYTES_READ_FILE_KEY.getKey(), 0L);
+    }
+
+    long getBytesRead() {
+      return scanStats.getOrDefault(TraceAttributes.BYTES_READ_KEY.getKey(), 0L);
+    }
+
+    long getBytesReturned() {
+      return scanStats.getOrDefault(TraceAttributes.BYTES_RETURNED_KEY.getKey(), 0L);
+    }
+
+    long getDataCacheHits() {
+      return scanStats.getOrDefault(TraceAttributes.DATA_HITS_KEY.getKey(), 0L);
+    }
+
+    long getDataCacheMisses() {
+      return scanStats.getOrDefault(TraceAttributes.DATA_MISSES_KEY.getKey(), 0L);
+    }
+
+    long getDataCacheBypasses() {
+      return scanStats.getOrDefault(TraceAttributes.DATA_BYPASSES_KEY.getKey(), 0L);
+    }
+
+    long getIndexCacheHits() {
+      return scanStats.getOrDefault(TraceAttributes.INDEX_HITS_KEY.getKey(), 0L);
+    }
+
+    long getIndexCacheMisses() {
+      return scanStats.getOrDefault(TraceAttributes.INDEX_MISSES_KEY.getKey(), 0L);
+    }
+
+    long getIndexCacheBypasses() {
+      return scanStats.getOrDefault(TraceAttributes.INDEX_BYPASSES_KEY.getKey(), 0L);
+    }
+
+    long getSeeks() {
+      return scanStats.getOrDefault(TraceAttributes.SEEKS_KEY.getKey(), 0L);
+    }
+
+    @Override
+    public String toString() {
+      return "ScanTraceStats{isBatchScan=" + isBatchScan + ", scanStats=" + 
scanStats + '}';
+    }
+  }
+}
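
For reference, a short sketch of how the ScanTraceStats helper above is driven by the test loop;
spanData stands in for a SpanData taken from the collector:

    // Aggregate per-batch spans into one running total for the batch scan.
    var total = new ScanTraceStats(true);
    var stats = ScanTraceStats.create(spanData); // null for non scan-batch spans
    if (stats != null && stats.isBatchScan()) {
      total.merge(stats);
    }
    long entriesSoFar = total.getEntriesReturned();
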
diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java b/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java
new file mode 100644
index 0000000000..c84845275a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.tracing;
+
+import java.util.Map;
+
+public class SpanData {
+
+  public final String traceId;
+  public final String name;
+  public final Map<String,String> stringAttributes;
+  public final Map<String,Long> integerAttributes;
+
+  public SpanData(String traceId, String name, Map<String,String> stringAttributes,
+      Map<String,Long> integerAttributes) {
+    this.traceId = traceId;
+    this.name = name;
+    this.stringAttributes = stringAttributes;
+    this.integerAttributes = integerAttributes;
+  }
+
+  @Override
+  public String toString() {
+    return "SpanData{traceId='" + traceId + "', name='" + name + "', 
stringAttributes="
+        + stringAttributes + ", integerAttributes=" + integerAttributes + '}';
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java b/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java
new file mode 100644
index 0000000000..9a3b6c9701
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.tracing;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+
+import org.apache.commons.codec.binary.Hex;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+/**
+ * OpenTelemetry tracing data sink for testing. Processes can send http/protobuf trace data to
+ * this sink over HTTP, and it will add the spans to an in-memory queue that tests can read from.
+ */
+public class TraceCollector {
+  private final Server server;
+
+  private final LinkedBlockingQueue<SpanData> spanQueue = new 
LinkedBlockingQueue<>();
+
+  private class TraceHandler extends AbstractHandler {
+    @Override
+    public void handle(String target, Request baseRequest, HttpServletRequest request,
+        HttpServletResponse response) throws IOException {
+
+      if (!target.equals("/v1/traces")) {
+        System.err.println("unexpected target : " + target);
+        response.setStatus(404);
+        response.getOutputStream().close();
+        return;
+      }
+
+      var body = request.getInputStream().readAllBytes();
+      try {
+        var etsr =
+            io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest.parseFrom(body);
+        var spans =
+            etsr.getResourceSpansList().stream().flatMap(r -> r.getScopeSpansList().stream())
+                .flatMap(r -> r.getSpansList().stream()).collect(Collectors.toList());
+
+        spans.forEach(s -> {
+          var traceId = Hex.encodeHexString(s.getTraceId().toByteArray(), true);
+
+          Map<String,String> stringAttrs = new HashMap<>();
+          Map<String,Long> intAttrs = new HashMap<>();
+
+          s.getAttributesList().forEach(kv -> {
+            if (kv.getValue().hasIntValue()) {
+              intAttrs.put(kv.getKey(), kv.getValue().getIntValue());
+            } else if (kv.getValue().hasStringValue()) {
+              stringAttrs.put(kv.getKey(), kv.getValue().getStringValue());
+            }
+          });
+
+          spanQueue.add(
+              new SpanData(traceId, s.getName(), Map.copyOf(stringAttrs), Map.copyOf(intAttrs)));
+        });
+
+      } catch (Throwable e) {
+        e.printStackTrace();
+        throw e;
+      }
+
+      response.setStatus(200);
+      response.getOutputStream().close();
+    }
+  }
+
+  TraceCollector(String host, int port) throws Exception {
+    server = new Server(new InetSocketAddress(host, port));
+    server.setHandler(new TraceHandler());
+    server.start();
+  }
+
+  SpanData take() throws InterruptedException {
+    return spanQueue.take();
+  }
+
+  void stop() throws Exception {
+    server.stop();
+  }
+}
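
Finally, a minimal usage sketch for TraceCollector; the port below is an assumption and must match
the -Dotel.exporter.otlp.endpoint passed to the traced JVM (see getJvmArgs above):

    // Start the sink, run a traced process pointed at it, then drain its spans.
    var collector = new TraceCollector("localhost", 4318); // port is illustrative
    SpanData span = collector.take(); // blocks until a span is exported
    System.out.println(span.name + " " + span.integerAttributes);
    collector.stop();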
