bharath-techie opened a new issue, #15152:
URL: https://github.com/apache/lucene/issues/15152

   ### Description
   
   ### Context 
   
   `ByteBlockPool` is used to store terms in `FreqProxTermsWriter` across fields until a flush is triggered. Since each buffer is 32 KB and the pool's offset tracker is an `int`, allocating more than 65,536 buffers (2 GB in total) overflows the offset.
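
   To make the limit concrete, here is a quick standalone check of the arithmetic (`BYTE_BLOCK_SIZE` is `1 << 15` in `ByteBlockPool`):

   ```
   public class BufferLimitMath {
     public static void main(String[] args) {
       final int BYTE_BLOCK_SIZE = 1 << 15; // 32 KB per buffer
       final long totalBytes = (long) BYTE_BLOCK_SIZE * 65_536;
       System.out.println(totalBytes);        // 2147483648 (2^31)
       System.out.println(Integer.MAX_VALUE); // 2147483647 (2^31 - 1)
       // An int offset cannot address 2^31 bytes, so the pool's offset
       // tracker overflows once 65,536 buffers have been allocated.
     }
   }
   ```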
   
   This has come up before in https://github.com/apache/lucene/issues/9660, which https://github.com/apache/lucene/pull/12392 addressed by throwing an exception on overflow.
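
   As the stack trace below shows, that guard surfaces in `ByteBlockPool.nextBuffer()` via `Math.addExact`. A simplified sketch of the idea, not the exact Lucene code:

   ```
   class PoolOffsetGuardSketch {
     static final int BYTE_BLOCK_SIZE = 1 << 15; // 32 KB per buffer
     int byteOffset; // running offset, advanced once per allocated buffer

     void nextBuffer() {
       // ... allocate and install the next 32 KB buffer ...
       // Math.addExact turns a silent int wrap-around into an
       // ArithmeticException("integer overflow") instead of corrupting offsets.
       byteOffset = Math.addExact(byteOffset, BYTE_BLOCK_SIZE);
     }
   }
   ```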
   
   However, the issue still occurs: during indexing, if a field produces a very large number of tokens, the pool overflows and the exception is thrown as below:
   
   ```
 message [shard failure, reason [index id[3458764570588151359] origin[LOCAL_TRANSLOG_RECOVERY] seq#[53664468]]], failure [NotSerializableExceptionWrapper[arithmetic_exception: integer overflow]], markAsStale [true]]
NotSerializableExceptionWrapper[arithmetic_exception: integer overflow]
    at java.lang.Math.addExact(Math.java:883)
    at org.apache.lucene.util.ByteBlockPool.nextBuffer(ByteBlockPool.java:199)
    at org.apache.lucene.index.ByteSlicePool.allocKnownSizeSlice(ByteSlicePool.java:118)
    at org.apache.lucene.index.ByteSlicePool.allocSlice(ByteSlicePool.java:98)
    at org.apache.lucene.index.TermsHashPerField.writeByte(TermsHashPerField.java:226)
    at org.apache.lucene.index.TermsHashPerField.writeVInt(TermsHashPerField.java:266)
    at org.apache.lucene.index.FreqProxTermsWriterPerField.writeProx(FreqProxTermsWriterPerField.java:86)
    at org.apache.lucene.index.FreqProxTermsWriterPerField.addTerm(FreqProxTermsWriterPerField.java:197)
    at org.apache.lucene.index.TermsHashPerField.positionStreamSlice(TermsHashPerField.java:214)
    at org.apache.lucene.index.TermsHashPerField.add(TermsHashPerField.java:202)
    at org.apache.lucene.index.IndexingChain$PerField.invertTokenStream(IndexingChain.java:1287)
    at org.apache.lucene.index.IndexingChain$PerField.invert(IndexingChain.java:1183)
    at org.apache.lucene.index.IndexingChain.processField(IndexingChain.java:731)
    at org.apache.lucene.index.IndexingChain.processDocument(IndexingChain.java:609)
    at org.apache.lucene.index.DocumentsWriterPerThread.updateDocuments(DocumentsWriterPerThread.java:263)
    at org.apache.lucene.index.DocumentsWriter.updateDocuments(DocumentsWriter.java:425)
    at org.apache.lucene.index.IndexWriter.updateDocuments(IndexWriter.java:1558)
    at org.apache.lucene.index.IndexWriter.addDocuments(IndexWriter.java:1516)
    at org.opensearch.index.engine.InternalEngine.addStaleDocs(InternalEngine.java:1291)
    at org.opensearch.index.engine.InternalEngine.indexIntoLucene(InternalEngine.java:1210)
    at org.opensearch.index.engine.InternalEngine.index(InternalEngine.java:1011)
    at org.opensearch.index.shard.IndexShard.index(IndexShard.java:1226)
    at org.opensearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:1171)
    at org.opensearch.index.shard.IndexShard.applyTranslogOperation(IndexShard.java:2418)
    at org.opensearch.index.shard.IndexShard.runTranslogRecovery(IndexShard.java:2474)
    at org.opensearch.index.shard.IndexShard.lambda$recoverLocallyUpToGlobalCheckpoint$14(IndexShard.java:2291)
    at org.opensearch.index.engine.InternalEngine.recoverFromTranslogInternal(InternalEngine.java:571)
    at org.opensearch.index.engine.InternalEngine.recoverFromTranslog(InternalEngine.java:546)
    at org.opensearch.index.engine.InternalEngine.recoverFromTranslog(InternalEngine.java:146)
    at org.opensearch.index.shard.IndexShard.recoverLocallyUpToGlobalCheckpoint(IndexShard.java:2301)
    at org.opensearch.index.shard.IndexShard.recoverLocallyAndFetchStartSeqNo(IndexShard.java:2331)
    at org.opensearch.indices.recovery.PeerRecoveryTargetService.doRecovery(PeerRecoveryTargetService.java:267)
    at org.opensearch.indices.recovery.PeerRecoveryTargetService$RecoveryRunner.doRun(PeerRecoveryTargetService.java:618)
    at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:922)
    at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.lang.Thread.run(Thread.java:840)
   ```
   
   ### Proposal
   Can we flush the DWPT when its ByteBlockPool is approaching the buffer limit?
   
   In `DocumentsWriterFlushControl#doAfterDocument`, similar to how we mark the DWPT as `flushPending` when `perThread.ramBytesUsed() > hardMaxBytesPerDWPT`, we can propagate an "approaching the buffer limit" signal from `ByteBlockPool` -> `IndexingChain` -> DWPT -> `DocumentsWriterFlushControl` and mark the DWPT `flushPending` there. A sketch of the pool-side helper follows the POC snippet below.
   
   POC code : 
https://github.com/apache/lucene/commit/23fb34f7485102144c9ab522d684a5556351cb3a
   
   ```
   DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread) {
     // ...
         activeBytes += delta;
         assert updatePeaks(delta);
         flushPolicy.onChange(this, perThread);
         if (!perThread.isFlushPending()
             && perThread.ramBytesUsed() > hardMaxBytesPerDWPT) {
           // Safety check to prevent a single DWPT exceeding its RAM limit.
           // This is super important since we can not address more than
           // 2048 MB per DWPT.
           setFlushPending(perThread);
         }

         // New: mark the DWPT flush-pending when its ByteBlockPool is
         // approaching the buffer count limit.
         if (!perThread.isFlushPending() && perThread.isApproachingBufferLimit()) {
           if (infoStream.isEnabled("DWFC")) {
             infoStream.message(
                 "DWFC",
                 "force flush due to buffer count limit approaching in DWPT "
                     + perThread.getSegmentInfo().name
                     + ", limit: " + 65000);
           }
           setFlushPending(perThread);
         }
       }
       return checkout(perThread, false);
     } finally {
       boolean stalled = updateStallState();
       assert assertNumDocsSinceStalled(stalled) && assertMemory();
     }
   }
   ```
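
   The `isApproachingBufferLimit()` helper could boil down to counting allocated buffers in the pool against a soft limit just below 65,536, with the DWPT delegating to its pool. A hypothetical sketch (class and field names are placeholders; the actual wiring is in the POC commit above, and 65,000 matches the limit in the POC log message):

   ```
   class ByteBlockPoolWithCounter {
     // Soft limit just below the 65,536-buffer hard cap (2 GB / 32 KB).
     private static final int BUFFER_SOFT_LIMIT = 65_000;
     private int allocatedBuffers;

     void nextBuffer() {
       allocatedBuffers++;
       // ... existing allocation logic ...
     }

     boolean isApproachingBufferLimit() {
       return allocatedBuffers >= BUFFER_SOFT_LIMIT;
     }
   }

   class DocumentsWriterPerThreadSketch {
     private final ByteBlockPoolWithCounter pool = new ByteBlockPoolWithCounter();

     // Surfaced to DocumentsWriterFlushControl via doAfterDocument above.
     boolean isApproachingBufferLimit() {
       return pool.isApproachingBufferLimit();
     }
   }
   ```

   This keeps the decision local to the DWPT, so the pool never reaches the hard cap and the `ArithmeticException` seen above during translog recovery is avoided.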

