[ https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853815#comment-17853815 ]
Matthias J. Sax commented on KAFKA-16925:
-----------------------------------------
Thanks for filing this ticket. It's a known problem, and it is not limited to the
stream-table join.
I don't think that initializing with `context.currentStreamTime()` is the right
fix, though. In general, I would prefer to fix this issue across the board, to
avoid building operator-specific island solutions.
> stream-table join does not immediately forward expired records on restart
> -------------------------------------------------------------------------
>
> Key: KAFKA-16925
> URL: https://issues.apache.org/jira/browse/KAFKA-16925
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Ayoub Omari
> Assignee: Ayoub Omari
> Priority: Major
>
> [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
> introduced a grace period for KStreamKTableJoin. This allows joining a stream
> to a KTable backed by a versioned state store.
> When a record is received, it is put into a buffer until the grace period has elapsed.
> When the grace period elapses, the record is joined with its most recent
> match from the versioned state store.
> +Late records+ are +not+ put in the buffer and are immediately joined.
>
> {code:java}
> If the grace period is non zero, the record will enter a stream buffer and
> will dequeue when the record timestamp is less than or equal to stream time
> minus the grace period. Late records, out of the grace period, will be
> executed right as they come in. (KIP-923){code}
>
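To make the buffering rule quoted from KIP-923 concrete, here is a minimal Java sketch. It assumes a single buffer ordered by timestamp and omits the versioned-table lookup entirely; the names `GraceBufferSketch`, `StreamRecord`, and `process` are hypothetical, and this is not how `KStreamKTableJoin` or `RocksDBTimeOrderedKeyValueBuffer` is actually implemented. A record whose timestamp is at or below `observedStreamTime - gracePeriod` is returned for joining right away; newer records wait until stream time advances far enough.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the KIP-923 buffering rule; the table lookup is omitted
// and this is not the Kafka Streams implementation.
public class GraceBufferSketch {

    // Hypothetical stream-side record: key, value, and event timestamp in ms.
    record StreamRecord(String key, String value, long timestamp) {}

    private final long gracePeriodMs;
    // Highest timestamp seen by this buffer instance ("observed stream time").
    private long observedStreamTime = Long.MIN_VALUE;
    // Buffered records, oldest timestamp first.
    private final PriorityQueue<StreamRecord> buffer =
            new PriorityQueue<>(Comparator.comparingLong(StreamRecord::timestamp));

    GraceBufferSketch(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    // Returns the records that may now be joined with the latest table value.
    List<StreamRecord> process(StreamRecord rec) {
        observedStreamTime = Math.max(observedStreamTime, rec.timestamp());
        long cutoff = observedStreamTime - gracePeriodMs;
        List<StreamRecord> ready = new ArrayList<>();
        if (rec.timestamp() <= cutoff) {
            ready.add(rec); // late record: bypasses the buffer, joined immediately
        } else {
            buffer.add(rec); // within the grace period: wait in the buffer
        }
        // Dequeue buffered records whose timestamp is <= stream time - grace period.
        while (!buffer.isEmpty() && buffer.peek().timestamp() <= cutoff) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        long t = 1_000_000L; // "T" from the ticket's example, in ms (arbitrary value)
        GraceBufferSketch buf = new GraceBufferSketch(60_000L); // grace period = 60s
        System.out.println(buf.process(new StreamRecord("key", "value1", t)));           // []
        System.out.println(buf.process(new StreamRecord("key", "value2", t - 60_000L))); // value2 (late)
        System.out.println(buf.process(new StreamRecord("key", "value3", t + 60_000L))); // value1 released
    }
}
```

With a 60s grace period, `value1` at T is buffered, `value2` at T - 60s is returned immediately, and a later record at T + 60s releases `value1`.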
> However, this does not hold today after a rebalance or restart, because
> observedStreamTime is taken from the underlying state store, which loses
> this information on rebalance/restart:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java#L164]
>
> If the task restarts and then receives an expired record, the buffer treats
> that record's timestamp as the maximum stream time observed so far, and
> therefore buffers the record instead of joining it immediately.
>
> {*}Example{*}:
> * Grace period = 60s
> * KTable contains (key, rightValue)
>
> +Normal scenario+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> streamInput2 (key, value2) <--- time = T - 60s : immediately joined // streamTime = T{code}
>
> +Scenario with rebalance+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> // --- rebalance ---
> streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = T - 60s{code}
>
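The two scenarios in the ticket can be replayed with a tiny simulation, assuming (as the ticket describes) that a restart simply resets the tracked maximum timestamp to "nothing seen yet". `RestartReplay`, `Tracker`, and `decide` are hypothetical names; the decision rule mirrors the KIP-923 text rather than the actual Kafka Streams code.

```java
// Hypothetical replay of the ticket's two scenarios; not Kafka Streams code.
public class RestartReplay {

    static final long GRACE_MS = 60_000L; // grace period = 60s

    // Tracks the max timestamp seen by one buffer instance; a restart loses it.
    static final class Tracker {
        long observedStreamTime = Long.MIN_VALUE;

        String decide(long recordTs) {
            observedStreamTime = Math.max(observedStreamTime, recordTs);
            // Join right away only if the record is already outside the grace period.
            return recordTs <= observedStreamTime - GRACE_MS ? "JOIN" : "BUFFER";
        }
    }

    public static void main(String[] args) {
        long t = 1_000_000L; // "T" from the example, in ms (arbitrary value)

        // Normal scenario: one tracker sees both records.
        Tracker normal = new Tracker();
        System.out.println("normal  value1: " + normal.decide(t));           // BUFFER
        System.out.println("normal  value2: " + normal.decide(t - 60_000L)); // JOIN

        // Rebalance scenario: a fresh tracker replaces the old one.
        Tracker beforeRestart = new Tracker();
        System.out.println("restart value1: " + beforeRestart.decide(t));          // BUFFER
        Tracker afterRestart = new Tracker(); // observed stream time is lost
        System.out.println("restart value2: " + afterRestart.decide(t - 60_000L)); // BUFFER
    }
}
```

After the simulated restart, stream time restarts at T - 60s, so the expired record falls inside the grace period and is buffered instead of joined.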
> The processor should use currentStreamTime from the context instead, since
> it is recovered on restart.
>
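The reporter's proposal, taking stream time from a source that survives restarts rather than from the buffer, can be sketched as follows. The `LongSupplier` is a hypothetical stand-in for `context.currentStreamTime()`, and `RecoveredStreamTimeSketch` and `joinImmediately` are illustrative names, not actual Kafka Streams wiring. Note that the comment above argues for a cross-operator fix instead of this operator-specific one.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the ticket's proposed fix; not Kafka Streams code.
public class RecoveredStreamTimeSketch {

    private final long gracePeriodMs;
    // Stand-in for context.currentStreamTime(): a value that survives restarts.
    private final LongSupplier recoveredStreamTime;

    RecoveredStreamTimeSketch(long gracePeriodMs, LongSupplier recoveredStreamTime) {
        this.gracePeriodMs = gracePeriodMs;
        this.recoveredStreamTime = recoveredStreamTime;
    }

    // True if the record is already outside the grace period and should be joined now.
    boolean joinImmediately(long recordTs) {
        // max() covers a source that does not yet reflect the current record.
        long streamTime = Math.max(recoveredStreamTime.getAsLong(), recordTs);
        return recordTs <= streamTime - gracePeriodMs;
    }

    public static void main(String[] args) {
        long t = 1_000_000L; // "T" from the example, in ms (arbitrary value)
        // After the restart, the recovered stream time is still T.
        RecoveredStreamTimeSketch sketch = new RecoveredStreamTimeSketch(60_000L, () -> t);
        System.out.println(sketch.joinImmediately(t - 60_000L)); // true
    }
}
```

Because the stream time no longer starts over after a restart, the record at T - 60s is joined immediately, matching the normal scenario.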
--
This message was sent by Atlassian Jira
(v8.20.10#820010)