billoley opened a new issue, #5245:
URL: https://github.com/apache/accumulo/issues/5245

   Another variation of https://github.com/apache/accumulo/issues/3617, which was fixed in 2.1.2.
   
   I hit this while testing a new Datawave feature with Accumulo 2.1.3 and Hadoop 3.3.5. In RFile$LocalityGroupReader._seek or _next, when `currBlock = getDataBlock(indexEntry);` is called (line 855 or 1044), the thread can be interrupted inside the HDFS client at DFSInputStream.blockSeekTo(DFSInputStream.java:645). The resulting InterruptedIOException leaves DFSInputStream.blockReader null, which causes a NullPointerException if the stream is used again.
   
   I think what happened is that while the Datawave code was yielding in an Ivarator (**separate thread**), I cancelled the Future the Ivarator Runnable was running in with interrupt=true. The InterruptedIOException did propagate through RFile$LocalityGroupReader, but it did not get propagated back to the LookupTask.
   
   If propagating that exception is enough to keep subsequent calls from using the RFile, then I think the Accumulo code is OK, but I have not verified this. If the clean-up code in RFile.LocalityGroupReader.reset is expected to handle the exception happening in a separate thread, then there may be an issue, which is probably the same issue that was previously worked.
   
   I think that if the RFile gets re-used, then even though CachableBlockFile.Reader.bcfr is set to null when RFile.LocalityGroupReader.reset() calls CachableBlockFile.Reader.close on an exception, a subsequent call to CachableBlockFile.Reader.getBCFile(byte[] serializedMetadata) will re-populate bcfr using the InputStream from CachableBlockFile.Reader.inputSupplier. I have not traced this yet, but that is my best explanation for how I am hitting the same condition again even though the improved cleanup code, which nulls bcfr and closes the reader, is in place.
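   A minimal sketch of that hypothesis (illustrative names only, not the actual Accumulo code): a reader that nulls its cached delegate on close but lazily re-creates it from a supplier will silently resurrect the delegate on the next call, against the same, possibly broken, input stream.

   ```java
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.function.Supplier;

   // Hypothetical stand-in for CachableBlockFile.Reader: close() drops the
   // cached delegate (bcfr), but the lazy getter re-creates it from the
   // supplier, so closing after an exception does not make the reader unusable.
   class LazyReader {
       private final Supplier<String> inputSupplier;
       private final AtomicReference<String> bcfr = new AtomicReference<>();

       LazyReader(Supplier<String> inputSupplier) {
           this.inputSupplier = inputSupplier;
       }

       // mirrors getBCFile(): re-populates bcfr whenever it is null
       String getBCFile() {
           return bcfr.updateAndGet(r -> r != null ? r : inputSupplier.get());
       }

       // mirrors close(): nulls bcfr, but sets no flag the getter checks
       void close() {
           bcfr.set(null);
       }
   }

   public class ReuseAfterCloseDemo {
       public static void main(String[] args) {
           int[] creations = {0};
           LazyReader reader = new LazyReader(() -> "bcfile-" + (++creations[0]));

           reader.getBCFile(); // first use creates the delegate
           reader.close();     // cleanup after the InterruptedIOException
           reader.getBCFile(); // re-use silently re-creates it

           System.out.println("delegates created: " + creations[0]);
       }
   }
   ```

   If this matches the real code path, a closed flag checked in the lazy getter (or propagating the original exception so callers stop using the reader) would prevent the resurrected delegate from reusing the broken stream.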
   
   
   The cleanup code in RFile.LocalityGroupReader.reset, which the _seek/_next catch block calls:
    ```java
       private void reset(boolean exceptionThrown) {
         rk = null;
         hasTop = false;
         if (currBlock != null) {
           try {
             try {
               currBlock.close();
               if (exceptionThrown) {
                 reader.close();
               }
             } catch (IOException e) {
               log.warn("Failed to close block reader", e);
             }
           } finally {
             currBlock = null;
           }
         }
       }
    ```
   which calls reader.close:
    ```java
   public synchronized void close() throws IOException {
         if (closed) {
           return;
         }
   
         closed = true;
   
         BCFile.Reader reader = bcfr.getAndSet(null);
         if (reader != null) {
           reader.close();
         }
   
         if (fin != null) {
        // synchronize on the FSDataInputStream to ensure thread safety with the
        // BoundedRangeFileInputStream
           synchronized (fin) {
             fin.close();
           }
         }
       }
    ```
   
   
   
   java.io.InterruptedIOException
    ```
   2025-01-09T17:22:23,974 [DATAWAVE Ivarator (4fb08bee)-6658 -> 
DatawaveFieldIndexRegexIteratorJexl in 
c259f7d7-e4d8-4aa4-82c0-a646fc7e3c6f_168b80f_[20170716_0 
fi%00;REVISION_TEXT_TOKEN:Q%85;?%ea; [] 9223372036854775807 false,20170716_0 
fi%00;REVISION_TEXT_TOKEN:%a3;%0a;%7f;%d4; [] 9223372036854775807 false)] 
[impl.BlockReaderFactory] WARN : I/O error constructing remote block reader.
   java.io.InterruptedIOException: Interrupted while waiting for IO on channel 
java.nio.channels.SocketChannel[connection-pending remote=/10.117.191.71:9866]. 
Total timeout mills is 60000, 60000 millis timeout left.
     at 
org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:350)
 ~[hadoop-client-api-3.3.5.jar:?]
     at 
org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:202) 
~[hadoop-client-api-3.3.5.jar:?]
     at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:600) 
~[hadoop-client-api-3.3.5.jar:?]
     at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3033) 
~[hadoop-client-api-3.3.5.jar:?]
     at 
org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:829)
 ~[hadoop-client-api-3.3.5.jar:?]
     at 
org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:754)
 ~[hadoop-client-api-3.3.5.jar:?]
     at 
org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:381)
 ~[hadoop-client-api-3.3.5.jar:?]
     at 
org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:715) 
~[hadoop-client-api-3.3.5.jar:?]
     at 
org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:645) 
~[hadoop-client-api-3.3.5.jar:?]
     at 
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:845) 
~[hadoop-client-api-3.3.5.jar:?]
     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:918) 
~[hadoop-client-api-3.3.5.jar:?]
     at java.base/java.io.DataInputStream.read(DataInputStream.java:149) ~[?:?]
     at 
org.apache.accumulo.core.file.streams.RateLimitedInputStream.read(RateLimitedInputStream.java:52)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at java.base/java.io.DataInputStream.read(DataInputStream.java:149) ~[?:?]
     at 
org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream.read(BoundedRangeFileInputStream.java:98)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:179)
 ~[hadoop-client-api-3.3.5.jar:?]
     at 
org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:163)
 ~[hadoop-client-api-3.3.5.jar:?]
     at 
org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
 ~[hadoop-client-api-3.3.5.jar:?]
     at 
java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290) ~[?:?]
     at 
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351) ~[?:?]
     at java.base/java.io.DataInputStream.readFully(DataInputStream.java:200) 
~[?:?]
     at java.base/java.io.DataInputStream.readFully(DataInputStream.java:170) 
~[?:?]
     at 
org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile$Reader$BaseBlockLoader.load(CachableBlockFile.java:362)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.file.blockfile.cache.lru.SynchronousLoadingBlockCache.getBlock(SynchronousLoadingBlockCache.java:126)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile$Reader.getDataBlock(CachableBlockFile.java:460)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.file.rfile.RFile$LocalityGroupReader.getDataBlock(RFile.java:893)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.file.rfile.RFile$LocalityGroupReader._next(RFile.java:855)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.file.rfile.RFile$LocalityGroupReader.next(RFile.java:833)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iteratorsImpl.system.HeapIterator.next(HeapIterator.java:75)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.server.problems.ProblemReportingIterator.next(ProblemReportingIterator.java:86)
 ~[accumulo-server-base-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iteratorsImpl.system.HeapIterator.next(HeapIterator.java:75)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iteratorsImpl.system.StatsIterator.next(StatsIterator.java:51)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator.next(DeletingIterator.java:65)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iterators.ServerSkippingIterator.next(ServerSkippingIterator.java:45)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iterators.SynchronizedServerFilter.next(SynchronizedServerFilter.java:51)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
datawave.iterators.PropogatingIterator.next(PropogatingIterator.java:251) ~[?:?]
     at 
org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:100)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iterators.user.VersioningIterator.skipRowColumn(VersioningIterator.java:103)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iterators.user.VersioningIterator.next(VersioningIterator.java:60)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at datawave.core.iterators.IvaratorRunnable.run(IvaratorRunnable.java:225) 
~[?:?]
     at 
datawave.core.iterators.IteratorThreadPoolManager.lambda$execute$4(IteratorThreadPoolManager.java:194)
 ~[?:?]
     at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 ~[?:?]
     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
~[?:?]
     at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 ~[?:?]
     at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 ~[?:?]
     at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
    ```
   
   Subsequent call producing an NPE
   
    ```
   2025-01-09T17:22:19,646 [DATAWAVE Ivarator (4fb08bee)-6667 -> 
DatawaveFieldIndexRegexIteratorJexl in 
c259f7d7-e4d8-4aa4-82c0-a646fc7e3c6f_168b80f_[20170716_0 
fi%00;REVISION_TEXT_TOKEN:%a3;%0a;%7f;%d4; [] 9223372036854775807 
false,20170716_0 fi%00;REVISION_TEXT_TOKEN:%f4;%8f;%bf;%bf; [] 
9223372036854775807 false]] [iterators.IvaratorRunnable] ERROR: Failed to 
complete fillSet([20170716_0 fi%00;REVISION_TEXT_TOKEN:%a3;%0a;%7f;%d4; [] 
9223372036854775807 false,20170716_0 fi%00;REVISION_TEXT_TOKEN:%f4;%8f;%bf;%bf; 
[] 9223372036854775807 false])
   java.lang.NullPointerException: null
     at org.apache.hadoop.hdfs.DFSInputStream.seek(DFSInputStream.java:1575) 
~[hadoop-client-api-3.3.5.jar:?]
     at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:73) 
~[hadoop-client-api-3.3.5.jar:?]
     at 
org.apache.accumulo.core.file.streams.RateLimitedInputStream.seek(RateLimitedInputStream.java:61)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.file.streams.SeekableDataInputStream.seek(SeekableDataInputStream.java:38)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream.read(BoundedRangeFileInputStream.java:97)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:179)
 ~[hadoop-client-api-3.3.5.jar:?]
     at 
org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:163)
 ~[hadoop-client-api-3.3.5.jar:?]
     at 
org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
 ~[hadoop-client-api-3.3.5.jar:?]
     at 
java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290) ~[?:?]
     at 
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351) ~[?:?]
     at java.base/java.io.DataInputStream.readFully(DataInputStream.java:200) 
~[?:?]
     at java.base/java.io.DataInputStream.readFully(DataInputStream.java:170) 
~[?:?]
     at 
org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile$Reader$BaseBlockLoader.load(CachableBlockFile.java:362)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.file.blockfile.cache.lru.SynchronousLoadingBlockCache.getBlock(SynchronousLoadingBlockCache.java:126)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile$Reader.getDataBlock(CachableBlockFile.java:460)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.file.rfile.RFile$LocalityGroupReader.getDataBlock(RFile.java:893)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.file.rfile.RFile$LocalityGroupReader._next(RFile.java:855)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.file.rfile.RFile$LocalityGroupReader.next(RFile.java:833)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iteratorsImpl.system.HeapIterator.next(HeapIterator.java:75)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.server.problems.ProblemReportingIterator.next(ProblemReportingIterator.java:86)
 ~[accumulo-server-base-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iteratorsImpl.system.HeapIterator.next(HeapIterator.java:75)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iteratorsImpl.system.StatsIterator.next(StatsIterator.java:51)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator.next(DeletingIterator.java:65)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iterators.ServerSkippingIterator.next(ServerSkippingIterator.java:45)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iterators.SynchronizedServerFilter.next(SynchronizedServerFilter.java:51)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
datawave.iterators.PropogatingIterator.next(PropogatingIterator.java:251) ~[?:?]
     at 
org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:100)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iterators.user.VersioningIterator.skipRowColumn(VersioningIterator.java:103)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at 
org.apache.accumulo.core.iterators.user.VersioningIterator.next(VersioningIterator.java:60)
 ~[accumulo-core-2.1.3.jar:2.1.3]
     at datawave.core.iterators.IvaratorRunnable.run(IvaratorRunnable.java:225) 
~[?:?]
     at 
datawave.core.iterators.IteratorThreadPoolManager.lambda$execute$4(IteratorThreadPoolManager.java:194)
 ~[?:?]
     at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 ~[?:?]
     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
~[?:?]
     at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 ~[?:?]
     at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 ~[?:?]
     at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
   
    ```
   
   
   
   

