Ayoub Omari created KAFKA-16925:
-----------------------------------
Summary: stream-table join does not immediately forward expired records on restart
Key: KAFKA-16925
URL: https://issues.apache.org/jira/browse/KAFKA-16925
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Ayoub Omari
Assignee: Ayoub Omari
[KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
introduced a grace period for KStreamKTableJoin. This allows joining a stream to
a KTable backed by a versioned state store.
When a stream record is received, it is put in a buffer until the grace period
has elapsed. Once the grace period elapses, the record is joined with its most
recent match, as of the record's timestamp, from the versioned state store.
+Late records+ are +not+ put in the buffer and are immediately joined.
{code:java}
If the grace period is non zero, the record will enter a stream buffer and will
dequeue when the record timestamp is less than or equal to stream time minus
the grace period. Late records, out of the grace period, will be executed
right as they come in. (KIP-923){code}
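The buffering rule quoted above can be illustrated with a minimal, self-contained sketch. It assumes a single partition and a simplified processor; the names GraceBufferSketch and StreamRecord are hypothetical (not Kafka APIs), the table lookup is omitted, and stream time is tracked in an instance field, as the current processor does. A record is joined immediately when observedStreamTime - timestamp >= grace; otherwise it is buffered, and buffered records are dequeued once their timestamp is <= stream time minus the grace period.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stream record: key, value and event timestamp in milliseconds.
final class StreamRecord {
    final String key;
    final String value;
    final long timestamp;

    StreamRecord(String key, String value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical, simplified stream-table join with a grace period (not Kafka code).
final class GraceBufferSketch {
    private final long graceMs;
    // Stream time kept in memory; lost if the instance is recreated.
    private long observedStreamTime = Long.MIN_VALUE;
    // Buffered records, ordered by timestamp so the oldest is dequeued first.
    private final PriorityQueue<StreamRecord> buffer =
            new PriorityQueue<>(Comparator.comparingLong((StreamRecord r) -> r.timestamp));
    // Values emitted by the join, in emission order (table lookup omitted).
    final List<String> joined = new ArrayList<>();

    GraceBufferSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    void process(StreamRecord record) {
        observedStreamTime = Math.max(observedStreamTime, record.timestamp);
        if (graceMs == 0 || observedStreamTime - record.timestamp >= graceMs) {
            // Late record (outside the grace period): joined right away.
            joined.add(record.value);
        } else {
            buffer.add(record);
        }
        // Dequeue buffered records whose timestamp <= stream time - grace period.
        while (!buffer.isEmpty() && buffer.peek().timestamp <= observedStreamTime - graceMs) {
            joined.add(buffer.poll().value);
        }
    }
}
{code}

With grace = 60s, a record at T is buffered, a record at T - 60s is joined immediately, and the first record is only emitted once a record at T + 60s advances stream time far enough.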
However, this does not hold today after a rebalance or restart, because
observedStreamTime is kept in an in-memory field of the processor, which is
lost on rebalance/restart:
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java#L54]
If the task restarts and then receives an expired record, the processor treats
that record's timestamp as the maximum stream time observed so far, and puts the
record in the buffer instead of joining it immediately.
{*}Example{*}:
* Grace period = 60s
* KTable contains (key, rightValue)
+Normal scenario+
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer
streamInput2 (key, value2) <--- time = T - 60s : immediately joined // streamTime = T{code}
+Scenario with rebalance+
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer
// --- rebalance ---
streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = T - 60s{code}
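Both scenarios can be reproduced with a hypothetical, stripped-down processor (InMemoryStreamTimeJoin is illustrative, not Kafka code). The rebalance/restart is modeled by discarding the instance and creating a new one, which resets the in-memory stream time. Eviction from the buffer is left out because it does not affect the outcome here, and the initial value Long.MIN_VALUE is an assumption standing in for "no record observed yet".

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down processor (not Kafka code): stream time is an instance
// field, so it starts over whenever a new instance is created after a rebalance/restart.
final class InMemoryStreamTimeJoin {
    private final long graceMs;
    // Assumed initial value meaning "no record observed yet".
    private long observedStreamTime = Long.MIN_VALUE;
    final List<String> buffered = new ArrayList<>();
    final List<String> joined = new ArrayList<>();

    InMemoryStreamTimeJoin(long graceMs) {
        this.graceMs = graceMs;
    }

    void process(String value, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (observedStreamTime - timestamp >= graceMs) {
            joined.add(value);   // outside the grace period: joined immediately
        } else {
            buffered.add(value); // within the grace period: buffered (eviction omitted)
        }
    }
}
{code}

Feeding both records to one instance joins value2 immediately. Feeding value2 to a fresh instance (the rebalance case) buffers it, since that instance's stream time is only T - 60s.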
The processor should instead use the stream time from the processor context
(currentStreamTimeMs()), which is recovered on restart.
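The proposed fix can be sketched under the same simplifications: stream time is read from an object that outlives the processor instance, standing in for the processor context. RestoredStreamTime and ContextStreamTimeJoin are hypothetical names, not Kafka APIs. Restoring stream time on restart is modeled by copying the previous value into a new object; how Kafka Streams actually persists and restores it is not shown. For simplicity the sketch advances stream time inside process(), whereas in Kafka Streams the framework maintains it.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for task-level stream time held outside the processor
// (in Kafka Streams, read via the processor context). Not a Kafka API.
final class RestoredStreamTime {
    private long streamTime;

    RestoredStreamTime(long initialStreamTime) {
        this.streamTime = initialStreamTime;
    }

    void advance(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
    }

    long current() {
        return streamTime;
    }
}

// Hypothetical processor that reads stream time from the shared object instead of its own field.
final class ContextStreamTimeJoin {
    private final long graceMs;
    private final RestoredStreamTime streamTime;
    final List<String> buffered = new ArrayList<>();
    final List<String> joined = new ArrayList<>();

    ContextStreamTimeJoin(long graceMs, RestoredStreamTime streamTime) {
        this.graceMs = graceMs;
        this.streamTime = streamTime;
    }

    void process(String value, long timestamp) {
        streamTime.advance(timestamp); // simplification: the framework would do this
        if (streamTime.current() - timestamp >= graceMs) {
            joined.add(value);   // late record: joined immediately
        } else {
            buffered.add(value); // within the grace period: buffered (eviction omitted)
        }
    }
}
{code}

Because the restored stream time is still T after the restart, the expired record at T - 60s is joined immediately, matching the normal scenario.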
--
This message was sent by Atlassian Jira
(v8.20.10#820010)