ddcprg opened a new issue, #9091:
URL: https://github.com/apache/pinot/issues/9091

   This issue seems to have been introduced with the work on #7927 to deal with tombstones.
   
   We have seen cases where a record is dropped during ingestion for no apparent reason. We have a set of tables with different flush settings for the row count and time thresholds.
   
   This is what we think could be happening:
   
   1) `consumeLoop()` while loop 
https://github.com/apache/pinot/blob/release-0.10.0/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java#L394
   
   2) `processStreamEvents()` function 
https://github.com/apache/pinot/blob/release-0.10.0/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java#L420
   
   3) `processStreamEvents()` for loop 
https://github.com/apache/pinot/blob/release-0.10.0/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java#L475
   
   4) advance condition 
https://github.com/apache/pinot/blob/release-0.10.0/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java#L422
   
   5) discard unfiltered messages 
https://github.com/apache/pinot/blob/release-0.10.0/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java#L432
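   
   To make the interaction between 4) and 5) easier to follow, here is a simplified, compilable paraphrase of the branch structure as we read it (this is not the actual Pinot code, just a sketch with stand-in types):
   
   ```
   // Simplified paraphrase of the consumeLoop() / processStreamEvents() interaction
   // referenced in 1)-5) above. This is NOT the Pinot source, only a sketch of the
   // branch structure we believe causes the skip; Offset and Batch are stand-ins.
   final class ConsumeLoopSketch {

     interface Offset extends Comparable<Offset> {}

     interface Batch {
       int getUnfilteredMessageCount();
       Offset getOffsetOfNextBatch();
     }

     private Offset _currentOffset;        // advanced only while rows are indexed
     private boolean _endCriteriaReached;  // e.g. segment time threshold elapsed

     void consumeLoop(Iterable<Batch> batches) {
       Offset lastUpdatedOffset = _currentOffset;
       for (Batch batch : batches) {                               // 1) while loop
         processStreamEvents(batch);                               // 2)

         if (_currentOffset.compareTo(lastUpdatedOffset) != 0) {   // 4) advance condition
           // Some rows were indexed in this batch: remember how far we got.
           lastUpdatedOffset = _currentOffset;
         } else if (batch.getUnfilteredMessageCount() > 0) {       // 5) discard branch
           // Nothing was indexed, so the offsets did not move. The loop assumes the
           // whole batch was filtered out and jumps past it. If the rows were left
           // unprocessed only because 3) hit the end criteria, they are skipped.
           _currentOffset = batch.getOffsetOfNextBatch();
           lastUpdatedOffset = _currentOffset;
         }
       }
     }

     void processStreamEvents(Batch batch) {
       // 3) for loop: stops indexing (and stops advancing _currentOffset)
       //    as soon as the end criteria is reached.
       if (_endCriteriaReached) {
         return;
       }
       // ... index rows and advance _currentOffset ...
     }
   }
   ```
   
   The key point is that the branch at 5) cannot tell "every message was filtered out" apart from "nothing was processed because the end criteria was hit inside `processStreamEvents()`".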
   
   The table needs a segment time threshold that can be reached before the max row count threshold, e.g. a low-traffic topic with a large row count threshold.
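   
   For illustration only (these are the standard flush properties, but the values here are made up, not taken from our production config), such a table's `streamConfigs` could look roughly like this:
   
   ```
   "streamConfigs": {
     "streamType": "kafka",
     "realtime.segment.flush.threshold.rows": "5000000",
     "realtime.segment.flush.threshold.time": "6h"
   }
   ```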
   
   Example:
   `LLRealtimeSegmentDataManager` enters loop 1) because the end criteria is not met, reads the first batch and calls `processStreamEvents()` at 2). Inside this function the end criteria check at 3) is still not met and the records are indexed, which means `_currentOffset` has been updated.
   
   At this point the condition at 4) is met and `lastUpdatedOffset` is updated to `_currentOffset`.
   
   The data manager enters loop 1) again because the end criteria is still not met, reads the next batch and calls `processStreamEvents()` at 2) again. This time the end criteria check at 3) is met because the segment time threshold has elapsed, so no records are processed inside this function, which means `_currentOffset` has not been updated.
   
   At this point the condition at 4) is not met because `lastUpdatedOffset` is equal to `_currentOffset`. Instead the condition at 5) is met and `_currentOffset` is advanced to the offset of the next batch, skipping all the records in the unprocessed batch.
   
   A graphic example, in case it helps:
   ```
   offset   current offset   lastupdated offset    batch # (start,last)
   0        0                0                      1 (0,1)
   1        1                0                      1 (0,1)   -> _currentOffset.compareTo(lastUpdatedOffset) != 0
   2        2                2                      2 (2,2)
   === end criteria reached inside processStreamEvents()   -> messageBatch.getUnfilteredMessageCount() > 0 => lastUpdatedOffset = _currentOffset = nextOffset = 2+1
   3
   4
   ```
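   
   To sanity-check the reasoning we mocked the loop with plain longs. This is only a toy reproduction of the trace above, not a Pinot test case: with one batch of offsets (0,1) followed by a batch of (2,2) and the end criteria flipping to true between them, offset 2 is never indexed and the current offset jumps to 3.
   
   ```
   // Toy reproduction of the trace above using plain long offsets.
   // It only mocks the loop structure; it is not a Pinot test case.
   import java.util.List;

   public class SkippedOffsetDemo {

     record Batch(long start, long last) {
       long nextOffset() { return last + 1; }
       int unfilteredMessageCount() { return (int) (last - start + 1); }
     }

     public static void main(String[] args) {
       List<Batch> batches = List.of(new Batch(0, 1), new Batch(2, 2));

       long currentOffset = 0;
       long lastUpdatedOffset = 0;
       boolean endCriteriaReached = false;  // flips to true before the second batch

       for (Batch batch : batches) {
         // processStreamEvents(): rows are indexed only while the end criteria is not met
         if (!endCriteriaReached) {
           for (long offset = batch.start(); offset <= batch.last(); offset++) {
             System.out.println("indexed offset " + offset);
             currentOffset = offset + 1;
           }
         }

         if (currentOffset != lastUpdatedOffset) {
           // 4) advance condition: something was indexed in this batch
           lastUpdatedOffset = currentOffset;
         } else if (batch.unfilteredMessageCount() > 0) {
           // 5) discard branch: nothing was indexed, jump past the whole batch
           System.out.println("skipping offsets " + batch.start() + ".." + batch.last());
           currentOffset = batch.nextOffset();
           lastUpdatedOffset = currentOffset;
         }

         // simulate the segment time threshold elapsing between the two batches
         endCriteriaReached = true;
       }

       System.out.println("final currentOffset = " + currentOffset
           + " (offset 2 was never indexed)");
     }
   }
   ```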

