tuna b created KAFKA-18713:
------------------------------
Summary: Kafka Streams Left-Join not always emitting the last value
Key: KAFKA-18713
URL: https://issues.apache.org/jira/browse/KAFKA-18713
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.8.0
Reporter: tuna b
There seems to be an issue with the foreign-key left join: the latest value of the join is not always emitted.
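{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.state.Stores;

// Person, Department and CustomJsonSerde are the reporter's own application classes.
var builder = new StreamsBuilder();

// Right-hand table: departments keyed by department id.
KTable<String, Department> departments = builder
    .table("departments",
        Materialized.<String, Department>as(Stores.persistentKeyValueStore("departments"))
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomJsonSerde.json(Department.class)));

// Left-hand table: persons keyed by person id; each person carries a department id (the FK).
KTable<String, Person> persons = builder
    .table("persons",
        Materialized.<String, Person>as(Stores.persistentKeyValueStore("persons"))
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomJsonSerde.json(Person.class)));

// Foreign-key left join: a person is emitted even when no matching department exists.
KTable<String, Person> joined = persons
    .leftJoin(departments,
        Person::getDepartmentId,
        (person, department) -> person.toBuilder()
            .department(department)
            .build(),
        TableJoined.as("my-joiner"),
        Materialized.<String, Person>as(Stores.persistentKeyValueStore("joined-results"))
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomJsonSerde.json(Person.class)));

joined
    .toStream()
    .to("joined-results", Produced.with(Serdes.String(), CustomJsonSerde.json(Person.class)));
{code}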
How to reproduce:
Create two topics, {{persons}} and {{departments}}, each with 10 partitions.
Pre-populate the {{departments}} topic with 2 departments.
Observation:
* When I initially produce a Person {{p-1}} with FK {{dep-1}}, the join works.
** Output is an EnrichedResult with person {{p-1}} and department {{dep-1}}.
* When I change the FK to {{dep-2}}, the join updates.
** Output is an EnrichedResult with person {{p-1}} and department {{dep-2}}.
* When I change the FK back to {{dep-1}}, the join fails.
** Output is an EnrichedResult with person {{p-1}} *but no department*.
* However, if I send the same event again ({{p-1}} with {{dep-1}}), the join works again.
** Output is an EnrichedResult with person {{p-1}} and department {{dep-1}}.
The left join can also misbehave when the FK is not set back to a previous value. Changing an FK results in a delete (for the old FK) and an insert (for the new FK), but sometimes the result of the delete is emitted after the result of the insert.
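The delete/insert ordering problem can be illustrated with a minimal, hypothetical model in plain Java. It is not Kafka Streams code: it only assumes, as stated in the report, that one FK change yields a delete result (person without a department) and an insert result (person with the new department), and that a consumer of {{joined-results}} keeps the latest value per key. {{FkChangeOrdering}}, {{JoinResult}}, {{fkChange}} and {{finalDepartment}} are illustrative names only.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the reported ordering problem; not Kafka Streams internals.
// Department ids (e.g. "dep-2") stand in for Department objects.
public class FkChangeOrdering {

    /** One join result for a person; department == null means "emitted without a department". */
    public static final class JoinResult {
        public final String personId;
        public final String department;

        public JoinResult(String personId, String department) {
            this.personId = personId;
            this.department = department;
        }
    }

    /**
     * Models a single FK change: the delete (old FK) yields a result without a department,
     * the insert (new FK) yields a result with the new department. The returned list holds
     * the two results in the order they are emitted downstream.
     */
    public static List<JoinResult> fkChange(String personId, String newFk, boolean deleteEmittedLast) {
        JoinResult deleteResult = new JoinResult(personId, null);
        JoinResult insertResult = new JoinResult(personId, newFk);
        List<JoinResult> emitted = new ArrayList<>();
        if (deleteEmittedLast) {
            emitted.add(insertResult);
            emitted.add(deleteResult);
        } else {
            emitted.add(deleteResult);
            emitted.add(insertResult);
        }
        return emitted;
    }

    /** Latest value per key wins, so the final result is simply the last one emitted. */
    public static String finalDepartment(List<JoinResult> emitted) {
        return emitted.get(emitted.size() - 1).department;
    }

    public static void main(String[] args) {
        // Delete result emitted first, then the insert result.
        System.out.println(finalDepartment(fkChange("p-1", "dep-2", false))); // prints dep-2
        // Reported problem: the delete result is emitted after the insert result.
        System.out.println(finalDepartment(fkChange("p-1", "dep-2", true)));  // prints null
    }
}
{code}

Which of the two orders actually occurs depends on how Kafka Streams routes and processes the two results, which this sketch does not model; it only shows why a late delete result leaves the person without a department.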
How to reproduce:
Create a {{departments}} topic and pre-populate it with 5 departments ({{dep-1}} to {{dep-5}}). Create a {{persons}} topic and produce person {{p-1}} with FK {{dep-1}}. Then send updates for {{p-1}} to the {{persons}} topic, changing the FK to {{dep-2}}, then {{dep-3}}, and so on up to {{dep-5}}. The latest emitted value for {{p-1}} then does not contain a department.
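A minimal, hypothetical plain-Java replay of this sequence (not Kafka Streams code) shows why only the last FK change matters for the final value. It assumes each FK change emits a delete result (no department) and an insert result (new department), and, for simplicity, that both results of one change are emitted before those of the next change, so only the order within each change varies. {{FkChangeReplay}}, {{replay}} and {{last}} are illustrative names.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the second reproduction (dep-1 -> dep-2 -> ... -> dep-5).
// Assumption: both results of one FK change are emitted before the next change's results.
public class FkChangeReplay {

    /**
     * @param fks               FK values sent for the person, in order
     * @param deleteLastPerStep deleteLastPerStep[i] is true if, for the change to fks[i + 1],
     *                          the delete result is emitted after the insert result
     * @return emitted department ids in order (null = person emitted without a department)
     */
    public static List<String> replay(List<String> fks, boolean[] deleteLastPerStep) {
        if (deleteLastPerStep.length != fks.size() - 1) {
            throw new IllegalArgumentException("need one ordering flag per FK change");
        }
        List<String> emitted = new ArrayList<>();
        emitted.add(fks.get(0)); // initial insert joins with the first department
        for (int i = 1; i < fks.size(); i++) {
            String insertResult = fks.get(i);
            String deleteResult = null; // the delete for the old FK carries no department
            if (deleteLastPerStep[i - 1]) {
                emitted.add(insertResult);
                emitted.add(deleteResult);
            } else {
                emitted.add(deleteResult);
                emitted.add(insertResult);
            }
        }
        return emitted;
    }

    /** The latest emitted value is what a reader of joined-results ends up with. */
    public static String last(List<String> emitted) {
        return emitted.get(emitted.size() - 1);
    }

    public static void main(String[] args) {
        List<String> fks = List.of("dep-1", "dep-2", "dep-3", "dep-4", "dep-5");
        // A late delete on an earlier change is overwritten by the next insert.
        System.out.println(last(replay(fks, new boolean[] {true, false, false, false}))); // prints dep-5
        // A late delete on the final change leaves p-1 without a department.
        System.out.println(last(replay(fks, new boolean[] {false, false, false, true}))); // prints null
    }
}
{code}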