ddcprg opened a new issue, #9091: URL: https://github.com/apache/pinot/issues/9091
This issue seems to have been introduced with the work on #7927 to deal with tombstones. We have seen cases where a record is dropped during ingestion for no apparent reason. We have a set of tables with different flush settings for row count and time thresholds. This is what we think could be happening. Relevant code locations:

1) `consumeLoop()` while loop: https://github.com/apache/pinot/blob/release-0.10.0/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java#L394
2) `processStreamEvents()` function: https://github.com/apache/pinot/blob/release-0.10.0/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java#L420
3) `processStreamEvents()` for loop: https://github.com/apache/pinot/blob/release-0.10.0/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java#L475
4) advance condition: https://github.com/apache/pinot/blob/release-0.10.0/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java#L422
5) discard unfiltered messages: https://github.com/apache/pinot/blob/release-0.10.0/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java#L432

For this to happen, the table needs a segment time threshold that can be reached before the max row count threshold, e.g. a low-traffic topic with a large row count threshold.

Example: `LLRealtimeSegmentDataManager` enters loop 1) because the end criteria is not met and processes the first batch, calling `processStreamEvents()` at 2). Inside this function the end criteria check at 3) is still not met, so records are indexed, which means `_currentOffset` has been updated. At this point the condition at 4) is met and `lastUpdatedOffset` is updated to `_currentOffset`.

The data manager enters loop 1) again, the end criteria is still not met, so it reads the next batch and calls `processStreamEvents()` at 2) again.
This time the end criteria check at 3) is met because time has elapsed beyond the segment time threshold, so no records are processed inside this function, which means `_currentOffset` has not been updated. At this point the condition at 4) is not met because `lastUpdatedOffset` is equal to `_currentOffset`. Instead the condition at 5) is met and `_currentOffset` is updated to the last offset in the batch, skipping all the records in the unprocessed batch.

Graphic example, if this makes sense:

```
offset | _currentOffset | lastUpdatedOffset | batch # (start,last)
  0    |       0        |        0          | 1 (0,1)
  1    |       1        |        0          | 1 (0,1) -> _currentOffset.compareTo(lastUpdatedOffset) != 0
  2    |       2        |        2          | 2 (2,2) -> end criteria reached inside processStreamEvents()
       |                |                   |          -> messageBatch.getUnfilteredMessageCount() > 0
       |                |                   |          => lastUpdatedOffset = _currentOffset = nextOffset = 2+1
  3    |                |                   |    <- batch 2's record at offset 2 was never indexed
  4    |                |                   |
```
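To make the scenario concrete, here is a minimal, self-contained simulation of the offset-advance logic described above. It is a hypothetical sketch, not the actual Pinot code: the variable names `currentOffset`, `lastUpdatedOffset`, and `unfilteredMessageCount` mirror the fields and calls referenced in steps 4) and 5), but the batching is hard-coded to the two-batch example from the table.

```java
// Hypothetical simulation (assumption: simplified from the logic described in
// the issue, not copied from LLRealtimeSegmentDataManager) showing how a
// non-empty batch can be skipped when the end criteria fires before indexing.
public class OffsetSkipSimulation {
    public static void main(String[] args) {
        long currentOffset = 0;
        long lastUpdatedOffset = 0;

        // Batch 1: offsets 0..1. End criteria not met, so both rows are
        // indexed and _currentOffset advances past the batch.
        currentOffset = 2; // indexed offsets 0 and 1
        if (currentOffset != lastUpdatedOffset) {
            // step 4): normal advance, _currentOffset != lastUpdatedOffset
            lastUpdatedOffset = currentOffset;
        }

        // Batch 2: offsets 2..2. The segment time threshold has now elapsed,
        // so the end criteria check at step 3) fires before any row is
        // indexed and _currentOffset is NOT updated.
        long nextOffsetAfterBatch = 3; // offset following batch 2
        int unfilteredMessageCount = 1; // batch 2 is non-empty
        if (currentOffset != lastUpdatedOffset) {
            lastUpdatedOffset = currentOffset; // not taken: offsets are equal
        } else if (unfilteredMessageCount > 0) {
            // step 5): the batch is treated as fully filtered and skipped
            currentOffset = nextOffsetAfterBatch;
            lastUpdatedOffset = currentOffset;
        }

        // The record at offset 2 was never indexed, yet consumption resumes
        // from offset 3: one row silently dropped.
        System.out.println("resume from offset " + currentOffset);
    }
}
```

Running this prints `resume from offset 3` even though the row at offset 2 was never indexed, which matches the drop we observed on low-traffic tables.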