This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new f844dc0724 adds per scan tracing statistics (#6010)
f844dc0724 is described below
commit f844dc0724300fa90618a0de9366b911a8ffa4c8
Author: Keith Turner <[email protected]>
AuthorDate: Tue Dec 16 11:53:40 2025 -0800
adds per scan tracing statistics (#6010)
Added per scan tracing statistics for the following:
* The count of compressed bytes read from DFS (when all data is read
from cache this will be zero).
* The count of uncompressed bytes read from DFS or cache.
* The count of bytes returned (after iterator filtering).
* The count of key/values read before any filtering.
* The count of key/values returned after filtering.
* The count of seeks done.
* Statistics for cache hits, misses, and bypasses. A bypass
is when an rfile block is read w/o using the cache.
The statistics are included in a tracing span that wraps reading each
batch of key values in a tablet or scan server. So if a scan reads
10 batches of key values, then 10 spans will be emitted for tracing
data. Each span will include the statistics for that batch.
Co-authored-by: Dave Marion <[email protected]>
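For anyone consuming these spans, here is a minimal sketch (not part of this
commit, and assuming spans are exported via the OpenTelemetry SDK so that
SpanData is available to the consumer) of reading the per-batch attributes back:

    import io.opentelemetry.sdk.trace.data.SpanData;
    import org.apache.accumulo.core.trace.TraceAttributes;

    class ScanSpanStats {
      // hypothetical consumer: reads the per-batch statistics off an exported span
      static void print(SpanData batchSpan) {
        Long entries = batchSpan.getAttributes().get(TraceAttributes.ENTRIES_READ_KEY);
        Long seeks = batchSpan.getAttributes().get(TraceAttributes.SEEKS_KEY);
        Long fileBytes = batchSpan.getAttributes().get(TraceAttributes.BYTES_READ_FILE_KEY);
        System.out.printf("entries=%s seeks=%s fileBytes=%s%n", entries, seeks, fileBytes);
      }
    }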
---
core/pom.xml | 1 +
.../core/file/blockfile/cache/BlockCacheUtil.java | 38 +++
.../blockfile/cache/InstrumentedBlockCache.java | 107 ++++++
.../file/blockfile/impl/CachableBlockFile.java | 45 ++-
.../file/blockfile/impl/ScanCacheProvider.java | 6 +-
.../org/apache/accumulo/core/file/rfile/RFile.java | 10 +-
.../accumulo/core/file/rfile/bcfile/BCFile.java | 25 +-
.../core/iteratorsImpl/system/StatsIterator.java | 57 +++-
.../accumulo/core/summary/SummaryReader.java | 6 +-
.../accumulo/core/trace/ScanInstrumentation.java | 145 ++++++++
.../core/trace/ScanInstrumentationImpl.java | 96 ++++++
.../accumulo/core/trace/TraceAttributes.java | 53 +++
.../accumulo/core/util/CountingInputStream.java | 102 ++++++
pom.xml | 14 +
.../accumulo/tserver/TabletHostingServer.java | 3 +
.../org/apache/accumulo/tserver/TabletServer.java | 5 +-
.../accumulo/tserver/tablet/ScanDataSource.java | 46 ++-
.../apache/accumulo/tserver/tablet/Scanner.java | 27 +-
.../org/apache/accumulo/tserver/tablet/Tablet.java | 4 +-
.../apache/accumulo/tserver/tablet/TabletBase.java | 50 ++-
test/pom.xml | 21 ++
.../java/org/apache/accumulo/test/TestIngest.java | 12 +-
.../accumulo/test/tracing/ScanTraceClient.java | 152 +++++++++
.../accumulo/test/tracing/ScanTracingIT.java | 369 +++++++++++++++++++++
.../org/apache/accumulo/test/tracing/SpanData.java | 43 +++
.../accumulo/test/tracing/TraceCollector.java | 106 ++++++
26 files changed, 1494 insertions(+), 49 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 9710b0afc0..78945cc5dd 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -207,6 +207,7 @@
<inlineHeader>${accumulo.build.license.header}</inlineHeader>
<excludes>
<exclude>src/main/java/org/apache/accumulo/core/bloomfilter/*.java</exclude>
+ <exclude>src/main/java/org/apache/accumulo/core/util/CountingInputStream.java</exclude>
<exclude>src/main/java/org/apache/accumulo/core/util/HostAndPort.java</exclude>
<exclude>src/test/resources/*.jceks</exclude>
<exclude>src/test/resources/org/apache/accumulo/core/file/rfile/*.rf</exclude>
diff --git
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java
new file mode 100644
index 0000000000..53c2ce9e7a
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import org.apache.accumulo.core.logging.LoggingBlockCache;
+import org.apache.accumulo.core.spi.cache.BlockCache;
+import org.apache.accumulo.core.spi.cache.CacheType;
+
+public class BlockCacheUtil {
+ public static BlockCache instrument(CacheType type, BlockCache cache) {
+ if (cache == null) {
+ return null;
+ }
+
+ if (cache instanceof InstrumentedBlockCache || cache instanceof LoggingBlockCache) {
+ // it's already instrumented
+ return cache;
+ }
+
+ return LoggingBlockCache.wrap(type, InstrumentedBlockCache.wrap(type, cache));
+ }
+}
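A usage sketch (illustrative only; dataCache is an assumed, pre-existing
BlockCache instance):

    // wrap a cache so scans record hit/miss/bypass statistics against it;
    // re-instrumenting an already wrapped cache returns it unchanged
    BlockCache instrumented = BlockCacheUtil.instrument(CacheType.DATA, dataCache);
    BlockCache same = BlockCacheUtil.instrument(CacheType.DATA, instrumented);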
diff --git
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java
new file mode 100644
index 0000000000..402136c208
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.spi.cache.BlockCache;
+import org.apache.accumulo.core.spi.cache.CacheEntry;
+import org.apache.accumulo.core.spi.cache.CacheType;
+import org.apache.accumulo.core.trace.ScanInstrumentation;
+
+public class InstrumentedBlockCache implements BlockCache {
+
+ private final BlockCache blockCache;
+ private final ScanInstrumentation scanInstrumentation;
+ private final CacheType cacheType;
+
+ public InstrumentedBlockCache(CacheType cacheType, BlockCache blockCache,
+ ScanInstrumentation scanInstrumentation) {
+ this.blockCache = blockCache;
+ this.scanInstrumentation = scanInstrumentation;
+ this.cacheType = cacheType;
+ }
+
+ @Override
+ public CacheEntry cacheBlock(String blockName, byte[] buf) {
+ return blockCache.cacheBlock(blockName, buf);
+ }
+
+ @Override
+ public CacheEntry getBlock(String blockName) {
+ return blockCache.getBlock(blockName);
+ }
+
+ private final class CountingLoader implements Loader {
+
+ private final Loader loader;
+ int loadCount = 0;
+
+ private CountingLoader(Loader loader) {
+ this.loader = loader;
+ }
+
+ @Override
+ public Map<String,Loader> getDependencies() {
+ return loader.getDependencies();
+ }
+
+ @Override
+ public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
+ loadCount++;
+ return loader.load(maxSize, dependencies);
+ }
+ }
+
+ @Override
+ public CacheEntry getBlock(String blockName, Loader loader) {
+ var cl = new CountingLoader(loader);
+ var ce = blockCache.getBlock(blockName, cl);
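+ // the wrapped cache only invokes the loader on a miss, so a zero load
+ // count combined with a non-null entry means the block was already cached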
+ if (cl.loadCount == 0 && ce != null) {
+ scanInstrumentation.incrementCacheHit(cacheType);
+ } else {
+ scanInstrumentation.incrementCacheMiss(cacheType);
+ }
+ return ce;
+ }
+
+ @Override
+ public long getMaxHeapSize() {
+ return blockCache.getMaxHeapSize();
+ }
+
+ @Override
+ public long getMaxSize() {
+ return blockCache.getMaxSize();
+ }
+
+ @Override
+ public Stats getStats() {
+ return blockCache.getStats();
+ }
+
+ public static BlockCache wrap(CacheType cacheType, BlockCache cache) {
+ var si = ScanInstrumentation.get();
+ if (cache != null && si.enabled()) {
+ return new InstrumentedBlockCache(cacheType, cache, si);
+ } else {
+ return cache;
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 854c74154b..f694740d96 100644
---
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -38,7 +38,10 @@ import
org.apache.accumulo.core.file.streams.RateLimitedInputStream;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.BlockCache.Loader;
import org.apache.accumulo.core.spi.cache.CacheEntry;
+import org.apache.accumulo.core.spi.cache.CacheType;
import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.trace.ScanInstrumentation;
+import org.apache.accumulo.core.util.CountingInputStream;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -405,6 +408,7 @@ public class CachableBlockFile {
}
BlockReader _currBlock = getBCFile(null).getMetaBlock(blockName);
+ incrementCacheBypass(CacheType.INDEX);
return new CachedBlockRead(_currBlock);
}
@@ -421,6 +425,7 @@ public class CachableBlockFile {
}
BlockReader _currBlock = getBCFile(null).getDataBlock(offset,
compressedSize, rawSize);
+ incrementCacheBypass(CacheType.INDEX);
return new CachedBlockRead(_currBlock);
}
@@ -443,6 +448,7 @@ public class CachableBlockFile {
}
BlockReader _currBlock = getBCFile().getDataBlock(blockIndex);
+ incrementCacheBypass(CacheType.DATA);
return new CachedBlockRead(_currBlock);
}
@@ -459,9 +465,14 @@ public class CachableBlockFile {
}
BlockReader _currBlock = getBCFile().getDataBlock(offset,
compressedSize, rawSize);
+ incrementCacheBypass(CacheType.DATA);
return new CachedBlockRead(_currBlock);
}
+ private void incrementCacheBypass(CacheType cacheType) {
+ ScanInstrumentation.get().incrementCacheBypass(cacheType);
+ }
+
@Override
public synchronized void close() throws IOException {
if (closed) {
@@ -491,12 +502,22 @@ public class CachableBlockFile {
}
public static class CachedBlockRead extends DataInputStream {
+
+ private static InputStream wrapForTrace(InputStream inputStream) {
+ var scanInstrumentation = ScanInstrumentation.get();
+ if (scanInstrumentation.enabled()) {
+ return new CountingInputStream(inputStream);
+ } else {
+ return inputStream;
+ }
+ }
+
private final SeekableByteArrayInputStream seekableInput;
private final CacheEntry cb;
boolean indexable;
public CachedBlockRead(InputStream in) {
- super(in);
+ super(wrapForTrace(in));
cb = null;
seekableInput = null;
indexable = false;
@@ -507,7 +528,7 @@ public class CachableBlockFile {
}
private CachedBlockRead(SeekableByteArrayInputStream seekableInput,
CacheEntry cb) {
- super(seekableInput);
+ super(wrapForTrace(seekableInput));
this.seekableInput = seekableInput;
this.cb = cb;
indexable = true;
@@ -536,5 +557,25 @@ public class CachableBlockFile {
public void indexWeightChanged() {
cb.indexWeightChanged();
}
+
+ public void flushStats() {
+ if (in instanceof CountingInputStream) {
+ var cin = ((CountingInputStream) in);
+ ScanInstrumentation.get().incrementUncompressedBytesRead(cin.getCount());
+ cin.resetCount();
+ var src = cin.getWrappedStream();
+ if (src instanceof BlockReader) {
+ var br = (BlockReader) src;
+ br.flushStats();
+ }
+
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ flushStats();
+ super.close();
+ }
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java
index 78b8600b37..155b874326 100644
---
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java
+++
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java
@@ -20,7 +20,7 @@ package org.apache.accumulo.core.file.blockfile.impl;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.logging.LoggingBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheUtil;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.CacheType;
import org.apache.accumulo.core.spi.scan.ScanDispatch;
@@ -33,8 +33,8 @@ public class ScanCacheProvider implements CacheProvider {
public ScanCacheProvider(AccumuloConfiguration tableConfig, ScanDispatch
dispatch,
BlockCache indexCache, BlockCache dataCache) {
- var loggingIndexCache = LoggingBlockCache.wrap(CacheType.INDEX,
indexCache);
- var loggingDataCache = LoggingBlockCache.wrap(CacheType.DATA, dataCache);
+ var loggingIndexCache = BlockCacheUtil.instrument(CacheType.INDEX,
indexCache);
+ var loggingDataCache = BlockCacheUtil.instrument(CacheType.DATA,
dataCache);
switch (dispatch.getIndexCacheUsage()) {
case ENABLED:
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 68e2be016d..9e2d4633c2 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -838,7 +838,6 @@ public class RFile {
}
private void _next() throws IOException {
-
if (!hasTop) {
throw new IllegalStateException();
}
@@ -1164,6 +1163,12 @@ public class RFile {
public long estimateOverlappingEntries(KeyExtent extent) throws
IOException {
throw new UnsupportedOperationException();
}
+
+ public void flushStats() {
+ if (currBlock != null) {
+ currBlock.flushStats();
+ }
+ }
}
public static class Reader extends HeapIterator implements FileSKVIterator {
@@ -1304,6 +1309,9 @@ public class RFile {
@Override
public void closeDeepCopies() throws IOException {
+ for (LocalityGroupReader lgr : currentReaders) {
+ lgr.flushStats();
+ }
closeDeepCopies(false);
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index aca53e495c..31f32d64ae 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -47,6 +47,8 @@ import org.apache.accumulo.core.spi.crypto.FileDecrypter;
import org.apache.accumulo.core.spi.crypto.FileEncrypter;
import org.apache.accumulo.core.spi.crypto.NoFileDecrypter;
import org.apache.accumulo.core.spi.crypto.NoFileEncrypter;
+import org.apache.accumulo.core.trace.ScanInstrumentation;
+import org.apache.accumulo.core.util.CountingInputStream;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -468,6 +470,7 @@ public final class BCFile {
private final CompressionAlgorithm compressAlgo;
private Decompressor decompressor;
private final BlockRegion region;
+ private final InputStream rawInputStream;
private final InputStream in;
private volatile boolean closed;
@@ -481,9 +484,14 @@ public final class BCFile {
BoundedRangeFileInputStream boundedRangeFileInputStream = new
BoundedRangeFileInputStream(
fsin, this.region.getOffset(), this.region.getCompressedSize());
+ if (ScanInstrumentation.get().enabled()) {
+ rawInputStream = new
CountingInputStream(boundedRangeFileInputStream);
+ } else {
+ rawInputStream = boundedRangeFileInputStream;
+ }
+
try {
- InputStream inputStreamToBeCompressed =
- decrypter.decryptStream(boundedRangeFileInputStream);
+ InputStream inputStreamToBeCompressed =
decrypter.decryptStream(rawInputStream);
this.in =
compressAlgo.createDecompressionStream(inputStreamToBeCompressed, decompressor,
getFSInputBufferSize(conf));
} catch (IOException e) {
@@ -506,11 +514,20 @@ public final class BCFile {
return region;
}
+ public void flushStats() {
+ if (rawInputStream instanceof CountingInputStream) {
+ var ci = (CountingInputStream) rawInputStream;
+ ScanInstrumentation.get().incrementFileBytesRead(ci.getCount());
+ ci.resetCount();
+ }
+ }
+
public void finish() throws IOException {
synchronized (in) {
if (!closed) {
try {
in.close();
+ flushStats();
} finally {
closed = true;
if (decompressor != null) {
@@ -538,6 +555,10 @@ public final class BCFile {
rBlkState = rbs;
}
+ public void flushStats() {
+ rBlkState.flushStats();
+ }
+
/**
* Finishing reading the block. Release all resources.
*/
diff --git
a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java
b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java
index 47a0881973..a2f8da5975 100644
---
a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java
@@ -19,7 +19,10 @@
package org.apache.accumulo.core.iteratorsImpl.system;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
@@ -34,15 +37,21 @@ import
org.apache.accumulo.core.iterators.SortedKeyValueIterator;
public class StatsIterator extends ServerWrappingIterator {
private int numRead = 0;
- private final AtomicLong seekCounter;
+ private final AtomicLong scanSeekCounter;
+ private final LongAdder serverSeekCounter;
private final AtomicLong scanCounter;
+ private final LongAdder tabletScanCounter;
private final LongAdder serverScanCounter;
+ private final List<StatsIterator> deepCopies =
Collections.synchronizedList(new ArrayList<>());
- public StatsIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong
seekCounter,
- AtomicLong tabletScanCounter, LongAdder serverScanCounter) {
+ public StatsIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong
scanSeekCounter,
+ LongAdder serverSeekCounter, AtomicLong scanCounter, LongAdder
tabletScanCounter,
+ LongAdder serverScanCounter) {
super(source);
- this.seekCounter = seekCounter;
- this.scanCounter = tabletScanCounter;
+ this.scanSeekCounter = scanSeekCounter;
+ this.serverSeekCounter = serverSeekCounter;
+ this.scanCounter = scanCounter;
+ this.tabletScanCounter = tabletScanCounter;
this.serverScanCounter = serverScanCounter;
}
@@ -51,31 +60,43 @@ public class StatsIterator extends ServerWrappingIterator {
source.next();
numRead++;
- if (numRead % 23 == 0) {
- scanCounter.addAndGet(numRead);
- serverScanCounter.add(numRead);
- numRead = 0;
+ if (numRead % 1009 == 0) {
+ // only report on self, do not force deep copies to report
+ report(false);
}
}
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- return new StatsIterator(source.deepCopy(env), seekCounter, scanCounter,
serverScanCounter);
+ var deepCopy = new StatsIterator(source.deepCopy(env), scanSeekCounter,
serverSeekCounter,
+ scanCounter, tabletScanCounter, serverScanCounter);
+ deepCopies.add(deepCopy);
+ return deepCopy;
}
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive)
throws IOException {
source.seek(range, columnFamilies, inclusive);
- seekCounter.incrementAndGet();
- scanCounter.addAndGet(numRead);
- serverScanCounter.add(numRead);
- numRead = 0;
+ serverSeekCounter.increment();
+ scanSeekCounter.incrementAndGet();
+ // only report on self, do not force deep copies to report
+ report(false);
}
- public void report() {
- scanCounter.addAndGet(numRead);
- serverScanCounter.add(numRead);
- numRead = 0;
+ public void report(boolean reportDeepCopies) {
+ if (numRead > 0) {
+ scanCounter.addAndGet(numRead);
+ tabletScanCounter.add(numRead);
+ serverScanCounter.add(numRead);
+ numRead = 0;
+ }
+
+ if (reportDeepCopies) {
+ // recurse down the fat tree of deep copies forcing them to report
+ for (var deepCopy : deepCopies) {
+ deepCopy.report(true);
+ }
+ }
}
}
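A minimal usage sketch of the new constructor (illustrative only; source is an
assumed SortedKeyValueIterator<Key,Value>) showing that the parent iterator and
all of its deep copies share the same counters:

    AtomicLong scanSeeks = new AtomicLong();
    LongAdder serverSeeks = new LongAdder();
    AtomicLong scanEntries = new AtomicLong();
    LongAdder tabletEntries = new LongAdder();
    LongAdder serverEntries = new LongAdder();
    StatsIterator stats =
        new StatsIterator(source, scanSeeks, serverSeeks, scanEntries, tabletEntries, serverEntries);
    // after a batch completes, flush this iterator and, recursively, all of its deep copies
    stats.report(true);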
diff --git
a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index cce1044e65..83c0633974 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -32,12 +32,12 @@ import java.util.function.Predicate;
import org.apache.accumulo.core.client.rfile.RFileSource;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.file.NoSuchMetaStoreException;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheUtil;
import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import
org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
-import org.apache.accumulo.core.logging.LoggingBlockCache;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.CacheEntry;
import org.apache.accumulo.core.spi.cache.CacheType;
@@ -194,8 +194,8 @@ public class SummaryReader {
// the reason BCFile is used instead of RFile is to avoid reading in the
RFile meta block when
// only summary data is wanted.
CompositeCache compositeCache =
- new CompositeCache(LoggingBlockCache.wrap(CacheType.SUMMARY,
summaryCache),
- LoggingBlockCache.wrap(CacheType.INDEX, indexCache));
+ new CompositeCache(BlockCacheUtil.instrument(CacheType.SUMMARY,
summaryCache),
+ BlockCacheUtil.instrument(CacheType.INDEX, indexCache));
CachableBuilder cb = new CachableBuilder().fsPath(fs,
file).conf(conf).fileLen(fileLenCache)
.cacheProvider(new BasicCacheProvider(compositeCache,
null)).cryptoService(cryptoService);
bcReader = new CachableBlockFile.Reader(cb);
diff --git
a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java
b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java
new file mode 100644
index 0000000000..8ed962c53a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.trace;
+
+import org.apache.accumulo.core.spi.cache.CacheType;
+
+import com.google.common.base.Preconditions;
+
+import io.opentelemetry.api.trace.Span;
+
+/**
+ * This class helps collect per scan information for the purposes of tracing.
+ */
+public abstract class ScanInstrumentation {
+ private static final ThreadLocal<ScanInstrumentation> INSTRUMENTED_SCANS =
new ThreadLocal<>();
+
+ private static final ScanInstrumentation NOOP_SI = new ScanInstrumentation()
{
+ @Override
+ public boolean enabled() {
+ return false;
+ }
+
+ @Override
+ public void incrementFileBytesRead(long amount) {}
+
+ @Override
+ public void incrementUncompressedBytesRead(long amount) {}
+
+ @Override
+ public void incrementCacheMiss(CacheType cacheType) {}
+
+ @Override
+ public void incrementCacheHit(CacheType cacheType) {}
+
+ @Override
+ public void incrementCacheBypass(CacheType cacheType) {}
+
+ @Override
+ public long getFileBytesRead() {
+ return 0;
+ }
+
+ @Override
+ public long getUncompressedBytesRead() {
+ return 0;
+ }
+
+ @Override
+ public int getCacheHits(CacheType cacheType) {
+ return 0;
+ }
+
+ @Override
+ public int getCacheMisses(CacheType cacheType) {
+ return 0;
+ }
+
+ @Override
+ public int getCacheBypasses(CacheType cacheType) {
+ return 0;
+ }
+ };
+
+ public interface ScanScope extends AutoCloseable {
+ @Override
+ void close();
+ }
+
+ public static ScanScope enable(Span span) {
+ if (span.isRecording()) {
+ INSTRUMENTED_SCANS.set(new ScanInstrumentationImpl());
+ var id = Thread.currentThread().getId();
+ return () -> {
+ Preconditions.checkState(id == Thread.currentThread().getId());
+ INSTRUMENTED_SCANS.remove();
+ };
+ } else {
+ return () -> {};
+ }
+ }
+
+ public static ScanInstrumentation get() {
+ var si = INSTRUMENTED_SCANS.get();
+ if (si == null) {
+ return NOOP_SI;
+ }
+ return si;
+ }
+
+ public abstract boolean enabled();
+
+ /**
+ * Increments the raw bytes read directly from DFS by a scan.
+ *
+ * @param amount the amount of bytes read
+ */
+ public abstract void incrementFileBytesRead(long amount);
+
+ /**
+ * Increments the uncompressed and decrypted bytes read by a scan. This will include all
+ * uncompressed data read by a scan, regardless of whether the underlying data came from cache
+ * or DFS.
+ */
+ public abstract void incrementUncompressedBytesRead(long amount);
+
+ /**
+ * Increments the count of rfile blocks that were not already in the cache.
+ */
+ public abstract void incrementCacheMiss(CacheType cacheType);
+
+ /**
+ * Increments the count of rfile blocks that were already in the cache.
+ */
+ public abstract void incrementCacheHit(CacheType cacheType);
+
+ /**
+ * Increments the count of rfile blocks that were directly read from DFS
bypassing the cache.
+ */
+ public abstract void incrementCacheBypass(CacheType cacheType);
+
+ public abstract long getFileBytesRead();
+
+ public abstract long getUncompressedBytesRead();
+
+ public abstract int getCacheHits(CacheType cacheType);
+
+ public abstract int getCacheMisses(CacheType cacheType);
+
+ public abstract int getCacheBypasses(CacheType cacheType);
+}
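The intended usage pattern, sketched (tracer is an assumed OpenTelemetry
Tracer; enable() only installs a collecting instance when the span is actually
recording, and the returned scope must be closed on the same thread):

    Span span = tracer.spanBuilder("scan-batch").startSpan();
    try (var scope = span.makeCurrent(); var scanScope = ScanInstrumentation.enable(span)) {
      // code running on this thread now records into the same per-scan instance
      ScanInstrumentation.get().incrementCacheHit(CacheType.DATA);
    } finally {
      span.end();
    }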
diff --git
a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java
b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java
new file mode 100644
index 0000000000..67754ee738
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.trace;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.spi.cache.CacheType;
+
+class ScanInstrumentationImpl extends ScanInstrumentation {
+ private final AtomicLong fileBytesRead = new AtomicLong();
+ private final AtomicLong uncompressedBytesRead = new AtomicLong();
+ private final AtomicInteger[] cacheHits = new
AtomicInteger[CacheType.values().length];
+ private final AtomicInteger[] cacheMisses = new
AtomicInteger[CacheType.values().length];
+ private final AtomicInteger[] cacheBypasses = new
AtomicInteger[CacheType.values().length];
+
+ ScanInstrumentationImpl() {
+ for (int i = 0; i < CacheType.values().length; i++) {
+ cacheHits[i] = new AtomicInteger();
+ cacheMisses[i] = new AtomicInteger();
+ cacheBypasses[i] = new AtomicInteger();
+ }
+ }
+
+ @Override
+ public boolean enabled() {
+ return true;
+ }
+
+ @Override
+ public void incrementFileBytesRead(long amount) {
+ fileBytesRead.addAndGet(amount);
+ }
+
+ @Override
+ public void incrementUncompressedBytesRead(long amount) {
+ uncompressedBytesRead.addAndGet(amount);
+ }
+
+ @Override
+ public void incrementCacheMiss(CacheType cacheType) {
+ cacheMisses[cacheType.ordinal()].incrementAndGet();
+ }
+
+ @Override
+ public void incrementCacheHit(CacheType cacheType) {
+ cacheHits[cacheType.ordinal()].incrementAndGet();
+ }
+
+ @Override
+ public void incrementCacheBypass(CacheType cacheType) {
+ cacheBypasses[cacheType.ordinal()].incrementAndGet();
+ }
+
+ @Override
+ public long getFileBytesRead() {
+ return fileBytesRead.get();
+ }
+
+ @Override
+ public long getUncompressedBytesRead() {
+ return uncompressedBytesRead.get();
+ }
+
+ @Override
+ public int getCacheHits(CacheType cacheType) {
+ return cacheHits[cacheType.ordinal()].get();
+ }
+
+ @Override
+ public int getCacheMisses(CacheType cacheType) {
+ return cacheMisses[cacheType.ordinal()].get();
+ }
+
+ @Override
+ public int getCacheBypasses(CacheType cacheType) {
+ return cacheBypasses[cacheType.ordinal()].get();
+ }
+
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java
b/core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java
new file mode 100644
index 0000000000..e2c5b078b7
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.trace;
+
+import io.opentelemetry.api.common.AttributeKey;
+
+public class TraceAttributes {
+ public static final AttributeKey<Long> ENTRIES_RETURNED_KEY =
+ AttributeKey.longKey("accumulo.scan.entries.returned");
+ public static final AttributeKey<Long> BYTES_RETURNED_KEY =
+ AttributeKey.longKey("accumulo.scan.bytes.returned");
+ public static final AttributeKey<Long> BYTES_READ_KEY =
+ AttributeKey.longKey("accumulo.scan.bytes.read");
+ public static final AttributeKey<Long> BYTES_READ_FILE_KEY =
+ AttributeKey.longKey("accumulo.scan.bytes.read.file");
+ public static final AttributeKey<String> EXECUTOR_KEY =
+ AttributeKey.stringKey("accumulo.scan.executor");
+ public static final AttributeKey<String> TABLE_ID_KEY =
+ AttributeKey.stringKey("accumulo.table.id");
+ public static final AttributeKey<String> EXTENT_KEY =
AttributeKey.stringKey("accumulo.extent");
+ public static final AttributeKey<Long> INDEX_HITS_KEY =
+ AttributeKey.longKey("accumulo.scan.cache.index.hits");
+ public static final AttributeKey<Long> INDEX_MISSES_KEY =
+ AttributeKey.longKey("accumulo.scan.cache.index.misses");
+ public static final AttributeKey<Long> INDEX_BYPASSES_KEY =
+ AttributeKey.longKey("accumulo.scan.cache.index.bypasses");
+ public static final AttributeKey<Long> DATA_HITS_KEY =
+ AttributeKey.longKey("accumulo.scan.cache.data.hits");
+ public static final AttributeKey<Long> DATA_MISSES_KEY =
+ AttributeKey.longKey("accumulo.scan.cache.data.misses");
+ public static final AttributeKey<Long> DATA_BYPASSES_KEY =
+ AttributeKey.longKey("accumulo.scan.cache.data.bypasses");
+ public static final AttributeKey<String> SERVER_KEY =
AttributeKey.stringKey("accumulo.server");
+ public static final AttributeKey<Long> ENTRIES_READ_KEY =
+ AttributeKey.longKey("accumulo.scan.entries.read");
+ public static final AttributeKey<Long> SEEKS_KEY =
AttributeKey.longKey("accumulo.scan.seeks");
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java
b/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java
new file mode 100644
index 0000000000..15d41b18d7
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+
+/**
+ * This class was copied from Guava and modified. If this class was not final
in Guava it could have
+ * been extended. Guava has issue 590 open about this.
+ *
+ * An {@link InputStream} that counts the number of bytes read.
+ *
+ * @author Chris Nokleberg
+ */
+public final class CountingInputStream extends FilterInputStream {
+
+ private long count;
+ private long mark = -1;
+
+ /**
+ * Wraps another input stream, counting the number of bytes read.
+ *
+ * @param in the input stream to be wrapped
+ */
+ public CountingInputStream(InputStream in) {
+ super(Objects.requireNonNull(in));
+ }
+
+ /** Returns the number of bytes read. */
+ public long getCount() {
+ return count;
+ }
+
+ /** Resets the count of bytes read to zero */
+ public void resetCount() {
+ count = 0;
+ }
+
+ /** Returns the input stream this is wrapping */
+ public InputStream getWrappedStream() {
+ return in;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int result = in.read();
+ if (result != -1) {
+ count++;
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int result = in.read(b, off, len);
+ if (result != -1) {
+ count += result;
+ }
+ return result;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ long result = in.skip(n);
+ count += result;
+ return result;
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ in.mark(readlimit);
+ mark = count;
+ // it's okay to mark even if mark isn't supported, as reset won't work
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ if (!in.markSupported()) {
+ throw new IOException("Mark not supported");
+ }
+ if (mark == -1) {
+ throw new IOException("Mark not set");
+ }
+
+ in.reset();
+ count = mark;
+ }
+}
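A quick usage sketch (illustrative only; IOException handling omitted):

    byte[] data = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    try (CountingInputStream cin = new CountingInputStream(new java.io.ByteArrayInputStream(data))) {
      cin.readAllBytes();
      long n = cin.getCount(); // 5, the bytes consumed so far
      cin.resetCount();        // getCount() now returns 0 for the next batch
    }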
diff --git a/pom.xml b/pom.xml
index e7ed0e963f..6e07454736 100644
--- a/pom.xml
+++ b/pom.xml
@@ -345,6 +345,18 @@ under the License.
<artifactId>commons-validator</artifactId>
<version>1.10.0</version>
</dependency>
+ <dependency>
+ <groupId>io.opentelemetry.javaagent</groupId>
+ <artifactId>opentelemetry-javaagent</artifactId>
+ <!-- This version was selected because it aligns with the version of OpenTelemetry that Accumulo 2.1 is using. -->
+ <version>2.14.0</version>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry.proto</groupId>
+ <artifactId>opentelemetry-proto</artifactId>
+ <!-- This version was selected because it aligns with the version of
protocol buffers that Accumulo 2.1 is using. -->
+ <version>1.3.2-alpha</version>
+ </dependency>
<dependency>
<!-- legacy junit version specified here for dependency convergence -->
<groupId>junit</groupId>
@@ -1020,6 +1032,8 @@ under the License.
<unused>org.junit.vintage:junit-vintage-engine:jar:*</unused>
<unused>org.junit.jupiter:junit-jupiter-engine:jar:*</unused>
<unused>org.lz4:lz4-java:jar:*</unused>
+ <!-- This is used only at runtime by tests, not at compile time -->
+ <unused>io.opentelemetry.javaagent:opentelemetry-javaagent:jar:*</unused>
</ignoredUnusedDeclaredDependencies>
</configuration>
</execution>
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
index 3715eac0c2..8226f20dd3 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
@@ -24,6 +24,7 @@ import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.spi.cache.BlockCacheManager;
import org.apache.accumulo.core.spi.scan.ScanServerInfo;
+import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.server.GarbageCollectionLogger;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
@@ -61,4 +62,6 @@ public interface TabletHostingServer {
GarbageCollectionLogger getGcLogger();
BlockCacheManager.Configuration
getBlockCacheConfiguration(AccumuloConfiguration acuConf);
+
+ HostAndPort getAdvertiseAddress();
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 2573d6b451..25ea527672 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -60,6 +60,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -227,7 +228,7 @@ public class TabletServer extends AbstractServer
private String lockID;
private volatile long lockSessionId = -1;
- public static final AtomicLong seekCount = new AtomicLong(0);
+ public static final LongAdder seekCount = new LongAdder();
private final AtomicLong totalMinorCompactions = new AtomicLong(0);
@@ -1201,7 +1202,7 @@ public class TabletServer extends AbstractServer
result.osLoad =
ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
result.name = getClientAddressString();
result.holdTime = resourceManager.holdTime();
- result.lookups = seekCount.get();
+ result.lookups = seekCount.sum();
result.indexCacheHits =
resourceManager.getIndexCache().getStats().hitCount();
result.indexCacheRequest =
resourceManager.getIndexCache().getStats().requestCount();
result.dataCacheHits =
resourceManager.getDataCache().getStats().hitCount();
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index 6bcfbc4b9e..1e6c408ee5 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -44,6 +44,7 @@ import
org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.trace.TraceAttributes;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.conf.TableConfiguration.ParsedIteratorConfig;
import org.apache.accumulo.server.fs.FileManager.ScanFileManager;
@@ -57,6 +58,10 @@ import org.apache.commons.lang3.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
+import io.opentelemetry.api.trace.Span;
+
class ScanDataSource implements DataSource {
private static final Logger log =
LoggerFactory.getLogger(ScanDataSource.class);
@@ -67,7 +72,7 @@ class ScanDataSource implements DataSource {
private SortedKeyValueIterator<Key,Value> iter;
private long expectedDeletionCount;
private List<MemoryIterator> memIters = null;
- private long fileReservationId;
+ private long fileReservationId = -1;
private final AtomicBoolean interruptFlag;
private StatsIterator statsIterator;
@@ -76,6 +81,9 @@ class ScanDataSource implements DataSource {
private final byte[] defaultLabels;
private final long scanDataSourceId;
+ private final AtomicLong scanSeekCounter;
+ private final AtomicLong scanCounter;
+
ScanDataSource(TabletBase tablet, ScanParameters scanParams, boolean
loadIters,
AtomicBoolean interruptFlag) {
this.tablet = tablet;
@@ -85,6 +93,8 @@ class ScanDataSource implements DataSource {
this.loadIters = loadIters;
this.defaultLabels = tablet.getDefaultSecurityLabels();
this.scanDataSourceId = nextSourceId.incrementAndGet();
+ this.scanSeekCounter = new AtomicLong();
+ this.scanCounter = new AtomicLong();
log.trace("new scan data source, scanId {}, tablet: {}, params: {},
loadIterators: {}",
this.scanDataSourceId, this.tablet, this.scanParams, this.loadIters);
}
@@ -111,6 +121,9 @@ class ScanDataSource implements DataSource {
} finally {
expectedDeletionCount = tablet.getDataSourceDeletions();
iter = null;
+ if (statsIterator != null) {
+ statsIterator.report(true);
+ }
}
}
}
@@ -174,6 +187,7 @@ class ScanDataSource implements DataSource {
memIters = tablet.getMemIterators(samplerConfig);
Pair<Long,Map<TabletFile,DataFileValue>> reservation =
tablet.reserveFilesForScan();
fileReservationId = reservation.getFirst();
+ Preconditions.checkState(fileReservationId >= 0);
files = reservation.getSecond();
}
@@ -200,8 +214,8 @@ class ScanDataSource implements DataSource {
}
SystemIteratorEnvironment iterEnv = (SystemIteratorEnvironment)
builder.build();
- statsIterator = new StatsIterator(multiIter, TabletServer.seekCount,
tablet.getScannedCounter(),
- tablet.getScanMetrics().getScannedCounter());
+ statsIterator = new StatsIterator(multiIter, scanSeekCounter,
TabletServer.seekCount,
+ scanCounter, tablet.getScannedCounter(),
tablet.getScanMetrics().getScannedCounter());
SortedKeyValueIterator<Key,Value> visFilter =
SystemIteratorUtil.setupSystemScanIterators(statsIterator,
scanParams.getColumnSet(),
@@ -257,9 +271,11 @@ class ScanDataSource implements DataSource {
tablet.returnMemIterators(memIters);
memIters = null;
try {
- log.trace("Returning file iterators for {}, scanId:{}, fid:{}",
tablet.getExtent(),
- scanDataSourceId, fileReservationId);
- tablet.returnFilesForScan(fileReservationId);
+ if (fileReservationId >= 0) {
+ log.trace("Returning file iterators for {}, scanId:{}, fid:{}",
tablet.getExtent(),
+ scanDataSourceId, fileReservationId);
+ tablet.returnFilesForScan(fileReservationId);
+ }
} catch (Exception e) {
log.warn("Error Returning file iterators for scan: {}, :{}",
scanDataSourceId, e);
// Continue bubbling the exception up for handling.
@@ -270,8 +286,16 @@ class ScanDataSource implements DataSource {
}
}
+ private boolean closed = false;
+
@Override
public void close(boolean sawErrors) {
+ if (closed) {
+ return;
+ }
+
+ closed = true;
+
try {
returnIterators();
} finally {
@@ -288,12 +312,20 @@ class ScanDataSource implements DataSource {
} finally {
fileManager = null;
if (statsIterator != null) {
- statsIterator.report();
+ statsIterator.report(true);
}
}
}
}
+ public void setAttributes(Span span) {
+ if (statsIterator != null && span.isRecording()) {
+ statsIterator.report(true);
+ span.setAttribute(TraceAttributes.ENTRIES_READ_KEY, scanCounter.get());
+ span.setAttribute(TraceAttributes.SEEKS_KEY, scanSeekCounter.get());
+ }
+ }
+
public void interrupt() {
interruptFlag.set(true);
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
index 2b89a2005b..9e2a7c7af0 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
@@ -30,7 +30,11 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import
org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator;
+import org.apache.accumulo.core.trace.ScanInstrumentation;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ShutdownUtil;
+import org.apache.accumulo.tserver.scan.NextBatchTask;
import org.apache.accumulo.tserver.scan.ScanParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +72,23 @@ public class Scanner {
}
public ScanBatch read() throws IOException, TabletClosedException {
+ var span = TraceUtil.startSpan(NextBatchTask.class, "scan-batch");
+ try (var scope = span.makeCurrent(); var scanScope =
ScanInstrumentation.enable(span)) {
+ var batchAndSource = readInternal();
+ // This needs to be called after the ScanDataSource was closed in order to make sure all
+ // statistics related to file reads are seen.
+ tablet.recordScanTrace(span, batchAndSource.getFirst().getResults(),
scanParams,
+ batchAndSource.getSecond());
+ return batchAndSource.getFirst();
+ } catch (IOException | RuntimeException e) {
+ span.recordException(e);
+ throw e;
+ } finally {
+ span.end();
+ }
+ }
+
+ private Pair<ScanBatch,ScanDataSource> readInternal() throws IOException,
TabletClosedException {
ScanDataSource dataSource = null;
@@ -121,13 +142,13 @@ public class Scanner {
if (results.getResults() == null) {
range = null;
- return new ScanBatch(new ArrayList<>(), false);
+ return new Pair<>(new ScanBatch(new ArrayList<>(), false), dataSource);
} else if (results.getContinueKey() == null) {
- return new ScanBatch(results.getResults(), false);
+ return new Pair<>(new ScanBatch(results.getResults(), false),
dataSource);
} else {
range = new Range(results.getContinueKey(),
!results.isSkipContinueKey(), range.getEndKey(),
range.isEndKeyInclusive());
- return new ScanBatch(results.getResults(), true);
+ return new Pair<>(new ScanBatch(results.getResults(), true),
dataSource);
}
} catch (IterationInterruptedException iie) {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 90a53c734a..d7ff6a7c8d 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1793,7 +1793,7 @@ public class Tablet extends TabletBase {
}
public long totalScannedCount() {
- return this.scannedCount.get();
+ return this.scannedCount.sum();
}
public long totalLookupCount() {
@@ -1806,7 +1806,7 @@ public class Tablet extends TabletBase {
queryByteRate.update(now, this.queryResultBytes.get());
ingestRate.update(now, ingestCount);
ingestByteRate.update(now, ingestBytes);
- scannedRate.update(now, this.scannedCount.get());
+ scannedRate.update(now, this.scannedCount.sum());
}
public long getSplitCreationTime() {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
index dcc54e9da7..0ac8f6bd14 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
@@ -31,6 +31,7 @@ import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -48,6 +49,10 @@ import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.spi.cache.CacheType;
+import org.apache.accumulo.core.trace.ScanInstrumentation;
+import org.apache.accumulo.core.trace.TraceAttributes;
+import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ShutdownUtil;
@@ -62,6 +67,8 @@ import org.apache.accumulo.tserver.scan.ScanParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.opentelemetry.api.trace.Span;
+
/**
* This class exists to share code for scanning a tablet between {@link
Tablet} and
* {@link SnapshotTablet}
@@ -79,7 +86,7 @@ public abstract class TabletBase {
protected AtomicLong lookupCount = new AtomicLong(0);
protected AtomicLong queryResultCount = new AtomicLong(0);
protected AtomicLong queryResultBytes = new AtomicLong(0);
- protected final AtomicLong scannedCount = new AtomicLong(0);
+ protected final LongAdder scannedCount = new LongAdder();
protected final Set<ScanDataSource> activeScans = new HashSet<>();
@@ -154,7 +161,7 @@ public abstract class TabletBase {
return new Scanner(this, range, scanParams, interruptFlag);
}
- public AtomicLong getScannedCounter() {
+ public LongAdder getScannedCounter() {
return this.scannedCount;
}
@@ -205,25 +212,30 @@ public abstract class TabletBase {
tabletRange.clip(range);
}
- SourceSwitchingIterator.DataSource dataSource =
- createDataSource(scanParams, true, interruptFlag);
+ ScanDataSource dataSource = createDataSource(scanParams, true,
interruptFlag);
Tablet.LookupResult result = null;
boolean sawException = false;
- try {
+ var span = TraceUtil.startSpan(TabletBase.class, "multiscan-batch");
+ try (var scope = span.makeCurrent(); var scanScope =
ScanInstrumentation.enable(span)) {
SortedKeyValueIterator<Key,Value> iter = new
SourceSwitchingIterator(dataSource);
this.lookupCount.incrementAndGet();
this.server.getScanMetrics().incrementLookupCount(1);
result = lookup(iter, ranges, results, scanParams, maxResultSize);
+ // must close data source before recording scan trace in order to flush
all file read stats
+ dataSource.close(false);
+ recordScanTrace(span, results, scanParams, dataSource);
return result;
} catch (IOException | RuntimeException e) {
sawException = true;
+ span.recordException(e);
throw e;
} finally {
// code in finally block because always want
// to return mapfiles, even when exception is thrown
dataSource.close(sawException);
+ span.end();
synchronized (this) {
queryResultCount.addAndGet(results.size());
@@ -236,6 +248,34 @@ public abstract class TabletBase {
}
}
+ void recordScanTrace(Span span, List<KVEntry> batch, ScanParameters
scanParameters,
+ ScanDataSource dataSource) {
+ if (span.isRecording()) {
+ span.setAttribute(TraceAttributes.ENTRIES_RETURNED_KEY, batch.size());
+ long bytesReturned = 0;
+ for (var e : batch) {
+ bytesReturned += e.getKey().getLength() + e.getValue().get().length;
+ }
+ span.setAttribute(TraceAttributes.BYTES_RETURNED_KEY, bytesReturned);
+ span.setAttribute(TraceAttributes.EXECUTOR_KEY,
+ scanParameters.getScanDispatch().getExecutorName());
+ span.setAttribute(TraceAttributes.TABLE_ID_KEY,
getExtent().tableId().canonical());
+ span.setAttribute(TraceAttributes.EXTENT_KEY, getExtent().toString());
+ var si = ScanInstrumentation.get();
+ span.setAttribute(TraceAttributes.BYTES_READ_FILE_KEY,
si.getFileBytesRead());
+ span.setAttribute(TraceAttributes.BYTES_READ_KEY,
si.getUncompressedBytesRead());
+ span.setAttribute(TraceAttributes.INDEX_HITS_KEY,
si.getCacheHits(CacheType.INDEX));
+ span.setAttribute(TraceAttributes.INDEX_MISSES_KEY,
si.getCacheMisses(CacheType.INDEX));
+ span.setAttribute(TraceAttributes.INDEX_BYPASSES_KEY,
si.getCacheBypasses(CacheType.INDEX));
+ span.setAttribute(TraceAttributes.DATA_HITS_KEY,
si.getCacheHits(CacheType.DATA));
+ span.setAttribute(TraceAttributes.DATA_MISSES_KEY,
si.getCacheMisses(CacheType.DATA));
+ span.setAttribute(TraceAttributes.DATA_BYPASSES_KEY,
si.getCacheBypasses(CacheType.DATA));
+ span.setAttribute(TraceAttributes.SERVER_KEY,
server.getAdvertiseAddress().toString());
+
+ dataSource.setAttributes(span);
+ }
+ }
+
Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range,
ScanParameters scanParams)
throws IOException {
diff --git a/test/pom.xml b/test/pom.xml
index c58e377869..4430f6d929 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -62,6 +62,10 @@
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
@@ -82,6 +86,14 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.opentelemetry.proto</groupId>
+ <artifactId>opentelemetry-proto</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.servlet</groupId>
+ <artifactId>jakarta.servlet-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-compaction-coordinator</artifactId>
@@ -194,6 +206,10 @@
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
<dependency>
<groupId>org.jline</groupId>
<artifactId>jline</artifactId>
@@ -214,6 +230,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.opentelemetry.javaagent</groupId>
+ <artifactId>opentelemetry-javaagent</artifactId>
+ <scope>runtime</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index f20cf31afe..a6f85f9aaf 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -246,6 +247,10 @@ public class TestIngest {
}
}
+ public static IteratorSetting.Column generateColumn(IngestParams params, int
column) {
+ return new IteratorSetting.Column(new Text(params.columnFamily),
generateQualifier(column));
+ }
+
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
@@ -299,7 +304,7 @@ public class TestIngest {
Mutation m = new Mutation(row);
for (int j = 0; j < params.cols; j++) {
Text colf = new Text(params.columnFamily);
- Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10,
COL_PREFIX));
+ Text colq = generateQualifier(j);
if (writer != null) {
Key key = new Key(row, colf, colq, labBA);
@@ -405,6 +410,11 @@ public class TestIngest {
elapsed);
}
+ private static Text generateQualifier(int j) {
+ return new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX));
+ }
+
public static void ingest(AccumuloClient c, IngestParams params)
throws MutationsRejectedException, IOException, AccumuloException,
AccumuloSecurityException,
TableNotFoundException, TableExistsException {
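
A hedged usage sketch of the new generateColumn helper, restricting a scanner
to a single ingested column; the table name and column index are illustrative:

    // Assumes an AccumuloClient "client" and TestIngest.IngestParams "params".
    void scanOneColumn(AccumuloClient client, TestIngest.IngestParams params) throws Exception {
      try (var scanner = client.createScanner("mytable")) { // illustrative table name
        var col = TestIngest.generateColumn(params, 3); // fetch only column 3
        scanner.fetchColumn(col.getColumnFamily(), col.getColumnQualifier());
        scanner.forEach((k, v) -> System.out.println(k + " -> " + v));
      }
    }
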
diff --git
a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java
b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java
new file mode 100644
index 0000000000..2196302fea
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.tracing;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+import java.util.List;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.data.Range;
+
+import com.google.gson.FormattingStyle;
+import com.google.gson.GsonBuilder;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+
+public class ScanTraceClient {
+
+ public static class Options {
+ String clientPropsPath;
+ String table;
+ String startRow;
+ String endRow;
+ String family;
+ String qualifier;
+
+ Options() {}
+
+ Options(String table) {
+ this.table = table;
+ }
+
+ void configureScanner(Scanner scanner) {
+ if (startRow != null || endRow != null) {
+ scanner.setRange(new Range(startRow, true, endRow, false));
+ }
+ setColumn(scanner);
+ }
+
+ void configureScanner(BatchScanner scanner) {
+ if (startRow != null || endRow != null) {
+ scanner.setRanges(List.of(new Range(startRow, true, endRow, false)));
+ } else {
+ scanner.setRanges(List.of(new Range()));
+ }
+ setColumn(scanner);
+ }
+
+ void setColumn(ScannerBase scanner) {
+ System.out.println(scanner.getClass().getName() + " fam " + family);
+ if (family != null) {
+ scanner.fetchColumn(family, qualifier);
+ }
+ }
+
+ }
+
+ public static class Results {
+ // trace id for the batch scan
+ String traceId1;
+ // trace id for the normal scan
+ String traceId2;
+ // The number of entries returned by both scans
+ long scanCount;
+ // The number of bytes returned by both scans
+ long scanSize;
+
+ @Override
+ public String toString() {
+ return "Results{scanCount=" + scanCount + ", traceId1='" + traceId1 +
"', traceId2='"
+ + traceId2 + "', scanSize=" + scanSize + '}';
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ Options opts = new GsonBuilder().create().fromJson(args[0], Options.class);
+
+ String clientPropsPath = opts.clientPropsPath;
+ String table = opts.table;
+
+ Tracer tracer =
GlobalOpenTelemetry.get().getTracer(ScanTraceClient.class.getName());
+ try (var client = Accumulo.newClient().from(clientPropsPath).build()) {
+ long scanCount = 0;
+ long scanSize = 0;
+ long batchScanCount = 0;
+ long batchScanSize = 0;
+
+ Span span = tracer.spanBuilder("batch-scan").startSpan();
+ try (var scanner = client.createBatchScanner(table); var scope =
span.makeCurrent()) {
+ opts.configureScanner(scanner);
+ for (var entry : scanner) {
+ batchScanCount++;
+ batchScanSize += entry.getKey().getSize() +
entry.getValue().getSize();
+ }
+ } finally {
+ span.end();
+ }
+ var traceId1 = span.getSpanContext().getTraceId();
+
+ // start a second trace
+ span = tracer.spanBuilder("seq-scan").startSpan();
+ try (var scanner = client.createScanner(table); var scope =
span.makeCurrent()) {
+ opts.configureScanner(scanner);
+ scanner.setBatchSize(10_000);
+ for (var entry : scanner) {
+ scanCount++;
+ scanSize += entry.getKey().getSize() + entry.getValue().getSize();
+ }
+ } finally {
+ span.end();
+ }
+ var traceId2 = span.getSpanContext().getTraceId();
+
+ assertEquals(scanCount, batchScanCount);
+ assertEquals(scanSize, batchScanSize);
+ assertNotEquals(traceId1, traceId2);
+
+ Results results = new Results();
+ results.traceId1 = traceId1;
+ results.traceId2 = traceId2;
+ results.scanCount = scanCount;
+ results.scanSize = scanSize;
+
+ var gson = new
GsonBuilder().setFormattingStyle(FormattingStyle.COMPACT).create();
+ System.out.println("RESULT:" + gson.toJson(results));
+ }
+ }
+}
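
ScanTraceClient's whole interface is that JSON handshake: Options in through
args[0], and a "RESULT:" line out on stdout. A minimal sketch of both
directions, assuming the caller lives in the same package (the field names
match the Options and Results classes above; the values are illustrative):

    import com.google.gson.Gson;

    Gson gson = new Gson();

    // Serialize options for args[0] of the external process.
    ScanTraceClient.Options opts = new ScanTraceClient.Options("mytable");
    opts.startRow = "row_0000000032"; // illustrative row format
    String arg0 = gson.toJson(opts);

    // Parse the RESULT: line the client prints on stdout.
    String line = "RESULT:{\"scanCount\":10000,\"scanSize\":420000}"; // illustrative
    ScanTraceClient.Results results =
        gson.fromJson(line.substring("RESULT:".length()), ScanTraceClient.Results.class);
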
diff --git
a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java
b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java
new file mode 100644
index 0000000000..8d0fe70bba
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.tracing;
+
+import static org.apache.accumulo.core.trace.TraceAttributes.EXECUTOR_KEY;
+import static org.apache.accumulo.core.trace.TraceAttributes.EXTENT_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.trace.TraceAttributes;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.util.PortUtils;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+
+class ScanTracingIT extends ConfigurableMacBase {
+
+ private static final int OTLP_PORT = PortUtils.getRandomFreePort();
+
+ private static List<String> getJvmArgs() {
+ String javaAgent = null;
+ for (var cpi : System.getProperty("java.class.path").split(":")) {
+ if (cpi.contains("opentelemetry-javaagent")) {
+ javaAgent = cpi;
+ }
+ }
+
+ Objects.requireNonNull(javaAgent);
+
+ return List.of("-Dotel.traces.exporter=otlp",
"-Dotel.exporter.otlp.protocol=http/protobuf",
+ "-Dotel.exporter.otlp.endpoint=http://localhost:" + OTLP_PORT,
+ "-Dotel.metrics.exporter=none", "-Dotel.logs.exporter=none",
"-javaagent:" + javaAgent);
+ }
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ getJvmArgs().forEach(cfg::addJvmOption);
+ // sized such that full table scans will not fit in the cache
+ cfg.setProperty(Property.TSERV_DATACACHE_SIZE.getKey(), "8M");
+ cfg.setProperty(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() +
"pool1.threads", "8");
+ }
+
+ private TraceCollector collector;
+
+ @BeforeEach
+ public void startCollector() throws Exception {
+ collector = new TraceCollector("localhost", OTLP_PORT);
+ }
+
+ @AfterEach
+ public void stopCollector() throws Exception {
+ collector.stop();
+ }
+
+ @Test
+ public void test() throws Exception {
+ var names = getUniqueNames(7);
+ runTest(names[0], 0, false, false, -1, -1, -1);
+ runTest(names[1], 10, false, false, -1, -1, -1);
+ runTest(names[2], 0, true, false, -1, -1, -1);
+ runTest(names[3], 0, false, false, -1, -1, 2);
+ runTest(names[4], 0, false, false, 32, 256, -1);
+ runTest(names[5], 0, true, true, 32, 256, -1);
+ runTest(names[6], 0, true, false, -1, -1, 2);
+ }
+
+ private void runTest(String tableName, int numSplits, boolean cacheData,
+ boolean secondScanFitsInCache, int startRow, int endRow, int column)
throws Exception {
+
+ var ingestParams = new TestIngest.IngestParams(getClientProperties(),
tableName);
+ ingestParams.createTable = false;
+ ingestParams.rows = 1000;
+ ingestParams.cols = 10;
+
+ try (var client =
Accumulo.newClient().from(getClientProperties()).build()) {
+ var ntc = new NewTableConfiguration();
+ if (numSplits > 0) {
+ var splits = TestIngest.getSplitPoints(0, 1000, numSplits);
+ ntc.withSplits(splits);
+ }
+
+ var props = new HashMap<String,String>();
+
+ if (cacheData) {
+ props.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true");
+ }
+
+ // use a different executor for batch scans
+ props.put("table.scan.dispatcher.opts.multi_executor", "pool1");
+
+ ntc.setProperties(props);
+
+ client.tableOperations().create(tableName, ntc);
+
+ TestIngest.ingest(client, ingestParams);
+ client.tableOperations().flush(tableName, null, null, true);
+ }
+
+ long expectedRows = ingestParams.rows;
+
+ var options = new ScanTraceClient.Options(tableName);
+ if (startRow != -1 && endRow != -1) {
+ options.startRow = TestIngest.generateRow(startRow, 0).toString();
+ options.endRow = TestIngest.generateRow(endRow, 0).toString();
+ expectedRows = IntStream.range(startRow, endRow).count();
+ }
+
+ int expectedColumns = ingestParams.cols;
+
+ if (column != -1) {
+ var col = TestIngest.generateColumn(ingestParams, column);
+ options.family = col.getColumnFamily().toString();
+ options.qualifier = col.getColumnQualifier().toString();
+ expectedColumns = 1;
+ }
+
+ var results = run(options);
+ System.out.println(results);
+
+ var tableId = getServerContext().getTableId(tableName).canonical();
+
+ var expectedServers =
getServerContext().getAmple().readTablets().forTable(TableId.of(tableId))
+ .fetch(TabletMetadata.ColumnType.LOCATION).build().stream()
+ .map(tm ->
tm.getLocation().getHostAndPort().toString()).collect(Collectors.toSet());
+
+ ScanTraceStats scanStats = new ScanTraceStats(false);
+ ScanTraceStats batchScanStats = new ScanTraceStats(true);
+ Set<String> extents1 = new TreeSet<>();
+ Set<String> extents2 = new TreeSet<>();
+
+ while (scanStats.getEntriesReturned() < expectedRows * expectedColumns
+ || batchScanStats.getEntriesReturned() < expectedRows *
expectedColumns) {
+ var span = collector.take();
+ var stats = ScanTraceStats.create(span);
+ if (stats != null
+ &&
span.stringAttributes.get(TraceAttributes.TABLE_ID_KEY.getKey()).equals(tableId)
+ && (results.traceId1.equals(span.traceId) ||
results.traceId2.equals(span.traceId))) {
+ assertTrue(
+ expectedServers
+
.contains(span.stringAttributes.get(TraceAttributes.SERVER_KEY.getKey())),
+ () -> expectedServers + " " + span);
+ if (stats.isBatchScan()) {
+ assertEquals("pool1",
span.stringAttributes.get(EXECUTOR_KEY.getKey()));
+ } else {
+ assertEquals("default",
span.stringAttributes.get(EXECUTOR_KEY.getKey()));
+ }
+ if (numSplits == 0) {
+ assertEquals(tableId + "<<",
span.stringAttributes.get(EXTENT_KEY.getKey()));
+ } else {
+ var extent = span.stringAttributes.get(EXTENT_KEY.getKey());
+ assertTrue(extent.startsWith(tableId + ";") ||
extent.startsWith(tableId + "<"));
+ }
+ assertEquals(1, stats.getSeeks());
+ if (stats.isBatchScan()) {
+ assertEquals(results.traceId1, span.traceId);
+ extents1.add(span.stringAttributes.get(EXTENT_KEY.getKey()));
+ } else {
+ assertEquals(results.traceId2, span.traceId);
+ extents2.add(span.stringAttributes.get(EXTENT_KEY.getKey()));
+ }
+ } else {
+ continue;
+ }
+
+ if (stats.isBatchScan()) {
+ batchScanStats.merge(stats);
+ } else {
+ scanStats.merge(stats);
+ }
+ }
+
+ if (numSplits > 0) {
+ assertEquals(numSplits, extents1.size());
+ assertEquals(numSplits, extents2.size());
+ }
+
+ System.out.println(scanStats);
+ System.out.println(batchScanStats);
+
+ assertEquals(expectedRows * expectedColumns, results.scanCount,
results::toString);
+
+ var statsList = List.of(batchScanStats, scanStats);
+ for (int i = 0; i < statsList.size(); i++) {
+ var stats = statsList.get(i);
+ assertEquals(expectedRows * 10, stats.getEntriesRead(), stats::toString);
+ assertEquals(results.scanCount, stats.getEntriesReturned(),
stats::toString);
+ // When filtering on columns, the scan will read more data than it returns
+ double colMultiplier = 10.0 / expectedColumns;
+ assertClose((long) (results.scanSize * colMultiplier),
stats.getBytesRead(), .05);
+ assertClose(results.scanSize, stats.getBytesReturned(), .05);
+ if (secondScanFitsInCache && i == 1) {
+ assertEquals(0, stats.getFileBytesRead(), stats::toString);
+ } else {
+ assertClose((long) (stats.getBytesRead() * .005),
stats.getFileBytesRead(), .2);
+ }
+ if (cacheData) {
+ assertEquals(0, stats.getDataCacheBypasses(), stats::toString);
+ assertTrue(stats.getDataCacheHits() + stats.getDataCacheMisses() > 0,
stats::toString);
+ if (stats.getFileBytesRead() == 0) {
+ assertEquals(0L, stats.getDataCacheMisses(), stats::toString);
+ }
+ // When caching data, the scan does not seem to hit the index cache much
+ var cacheSum = stats.getIndexCacheHits() + stats.getIndexCacheMisses();
+ assertTrue(cacheSum == 0 || cacheSum == 1, stats::toString);
+ } else {
+ assertEquals(0, stats.getDataCacheHits(), stats::toString);
+ assertEquals(0, stats.getDataCacheMisses(), stats::toString);
+ assertTrue(stats.getDataCacheBypasses() > stats.getSeeks(),
stats::toString);
+ assertClose(stats.getDataCacheBypasses(), stats.getIndexCacheHits(),
.05);
+ }
+ assertEquals(0, stats.getIndexCacheBypasses(), stats::toString);
+ }
+
+ }
+
+ public void assertClose(long expected, long value, double e) {
+ assertTrue(Math.abs(1 - (double) expected / (double) value) < e,
+ () -> expected + " " + value + " " + e);
+ }
+
+ /**
+ * Runs ScanTraceClient in an external process so it can be instrumented with the OpenTelemetry
+ * java agent. Uses JSON to pass data to and from the external process.
+ */
+ public ScanTraceClient.Results run(ScanTraceClient.Options opts)
+ throws IOException, InterruptedException {
+ opts.clientPropsPath = getCluster().getClientPropsPath();
+ var proc = getCluster().exec(ScanTraceClient.class, getJvmArgs(), new
Gson().toJson(opts));
+ assertEquals(0, proc.getProcess().waitFor());
+ var out = proc.readStdOut();
+ var result = Arrays.stream(out.split("\\n")).filter(line ->
line.startsWith("RESULT:"))
+ .findFirst().orElse("RESULT:{}");
+ result = result.substring("RESULT:".length());
+ return new Gson().fromJson(result, ScanTraceClient.Results.class);
+ }
+
+ /**
+ * Helper class that encapsulates the data from a scan trace, making it easier to access and
+ * centralizing the code for reading scan statistics from a span.
+ */
+ static class ScanTraceStats {
+ final Map<String,Long> scanStats;
+ final boolean isBatchScan;
+
+ ScanTraceStats(SpanData spanData) {
+ this.scanStats = spanData.integerAttributes;
+ this.isBatchScan = spanData.name.contains("multiscan-batch");
+ }
+
+ ScanTraceStats(boolean isBatchScan) {
+ scanStats = new TreeMap<>();
+ this.isBatchScan = isBatchScan;
+ }
+
+ void merge(ScanTraceStats other) {
+ Preconditions.checkArgument(isBatchScan == other.isBatchScan);
+ other.scanStats.forEach((k, v) -> {
+ scanStats.merge(k, v, Long::sum);
+ });
+ }
+
+ /**
+ * @return a ScanTraceStats if the span is from a scan batch, otherwise null
+ */
+ static ScanTraceStats create(SpanData data) {
+ if (data.name.contains("scan-batch")) {
+ return new ScanTraceStats(data);
+ }
+ return null;
+ }
+
+ boolean isBatchScan() {
+ return isBatchScan;
+ }
+
+ long getEntriesRead() {
+ return scanStats.getOrDefault(TraceAttributes.ENTRIES_READ_KEY.getKey(),
0L);
+ }
+
+ long getEntriesReturned() {
+ return
scanStats.getOrDefault(TraceAttributes.ENTRIES_RETURNED_KEY.getKey(), 0L);
+ }
+
+ long getFileBytesRead() {
+ return
scanStats.getOrDefault(TraceAttributes.BYTES_READ_FILE_KEY.getKey(), 0L);
+ }
+
+ long getBytesRead() {
+ return scanStats.getOrDefault(TraceAttributes.BYTES_READ_KEY.getKey(),
0L);
+ }
+
+ long getBytesReturned() {
+ return
scanStats.getOrDefault(TraceAttributes.BYTES_RETURNED_KEY.getKey(), 0L);
+ }
+
+ long getDataCacheHits() {
+ return scanStats.getOrDefault(TraceAttributes.DATA_HITS_KEY.getKey(),
0L);
+ }
+
+ long getDataCacheMisses() {
+ return scanStats.getOrDefault(TraceAttributes.DATA_MISSES_KEY.getKey(),
0L);
+ }
+
+ long getDataCacheBypasses() {
+ return
scanStats.getOrDefault(TraceAttributes.DATA_BYPASSES_KEY.getKey(), 0L);
+ }
+
+ long getIndexCacheHits() {
+ return scanStats.getOrDefault(TraceAttributes.INDEX_HITS_KEY.getKey(),
0L);
+ }
+
+ long getIndexCacheMisses() {
+ return scanStats.getOrDefault(TraceAttributes.INDEX_MISSES_KEY.getKey(),
0L);
+ }
+
+ long getIndexCacheBypasses() {
+ return
scanStats.getOrDefault(TraceAttributes.INDEX_BYPASSES_KEY.getKey(), 0L);
+ }
+
+ long getSeeks() {
+ return scanStats.getOrDefault(TraceAttributes.SEEKS_KEY.getKey(), 0L);
+ }
+
+ @Override
+ public String toString() {
+ return "ScanTraceStats{isBatchScan=" + isBatchScan + ", scanStats=" +
scanStats + '}';
+ }
+ }
+}
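
The assertClose helper above checks relative rather than absolute error: it
passes when |1 - expected/value| < e. A small sketch of the same check with
worked inputs (note it assumes value is nonzero):

    static boolean close(long expected, long value, double e) {
      // Relative-error check mirroring assertClose above.
      return Math.abs(1 - (double) expected / (double) value) < e;
    }

    // close(950, 1000, 0.06) -> true  (relative error 0.05)
    // close(900, 1000, 0.05) -> false (relative error 0.10)
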
diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java
b/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java
new file mode 100644
index 0000000000..c84845275a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.tracing;
+
+import java.util.Map;
+
+public class SpanData {
+
+ public final String traceId;
+ public final String name;
+ public final Map<String,String> stringAttributes;
+ public final Map<String,Long> integerAttributes;
+
+ public SpanData(String traceId, String name, Map<String,String>
stringAttributes,
+ Map<String,Long> integerAttributes) {
+ this.traceId = traceId;
+ this.name = name;
+ this.stringAttributes = stringAttributes;
+ this.integerAttributes = integerAttributes;
+ }
+
+ @Override
+ public String toString() {
+ return "SpanData{traceId='" + traceId + "', name='" + name + "',
stringAttributes="
+ + stringAttributes + ", integerAttributes=" + integerAttributes + '}';
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java
b/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java
new file mode 100644
index 0000000000..9a3b6c9701
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.tracing;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+
+import org.apache.commons.codec.binary.Hex;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+/**
+ * OpenTelemetry tracing data sink for testing. Processes can send http/protobuf trace data to
+ * this sink over HTTP, and it will add the decoded spans to an in-memory queue that tests can
+ * read from.
+ */
+public class TraceCollector {
+ private final Server server;
+
+ private final LinkedBlockingQueue<SpanData> spanQueue = new
LinkedBlockingQueue<>();
+
+ private class TraceHandler extends AbstractHandler {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest
request,
+ HttpServletResponse response) throws IOException {
+
+ if (!target.equals("/v1/traces")) {
+ System.err.println("unexpected target : " + target);
+ response.setStatus(404);
+ response.getOutputStream().close();
+ return;
+ }
+
+ var body = request.getInputStream().readAllBytes();
+ try {
+ var etsr =
+
io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest.parseFrom(body);
+ var spans =
+ etsr.getResourceSpansList().stream().flatMap(r ->
r.getScopeSpansList().stream())
+ .flatMap(r ->
r.getSpansList().stream()).collect(Collectors.toList());
+
+ spans.forEach(s -> {
+ var traceId = Hex.encodeHexString(s.getTraceId().toByteArray(),
true);
+
+ Map<String,String> stringAttrs = new HashMap<>();
+ Map<String,Long> intAttrs = new HashMap<>();
+
+ s.getAttributesList().forEach(kv -> {
+ if (kv.getValue().hasIntValue()) {
+ intAttrs.put(kv.getKey(), kv.getValue().getIntValue());
+ } else if (kv.getValue().hasStringValue()) {
+ stringAttrs.put(kv.getKey(), kv.getValue().getStringValue());
+ }
+ });
+
+ spanQueue.add(
+ new SpanData(traceId, s.getName(), Map.copyOf(stringAttrs),
Map.copyOf(intAttrs)));
+ });
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ }
+
+ response.setStatus(200);
+ response.getOutputStream().close();
+ }
+ }
+
+ TraceCollector(String host, int port) throws Exception {
+ server = new Server(new InetSocketAddress(host, port));
+ server.setHandler(new TraceHandler());
+ server.start();
+ }
+
+ SpanData take() throws InterruptedException {
+ return spanQueue.take();
+ }
+
+ void stop() throws Exception {
+ server.stop();
+ }
+}
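
A hedged sketch of driving TraceCollector by hand from code in the same
package; the port is illustrative, and the exporter flags mirror the ones
getJvmArgs builds in ScanTracingIT:

    int port = 4318; // illustrative; the IT picks a random free port
    TraceCollector collector = new TraceCollector("localhost", port);

    // Point any OTLP http/protobuf exporter at the sink, e.g. a JVM started with:
    //   -Dotel.traces.exporter=otlp
    //   -Dotel.exporter.otlp.protocol=http/protobuf
    //   -Dotel.exporter.otlp.endpoint=http://localhost:4318

    SpanData span = collector.take(); // blocks until a span is posted to /v1/traces
    System.out.println(span.name + " " + span.integerAttributes);
    collector.stop();
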