Suriya Vijayaraghavan created KAFKA-13180:
---------------------------------------------
Summary: Data Distribution among partitions not working as Expected
Key: KAFKA-13180
URL: https://issues.apache.org/jira/browse/KAFKA-13180
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.8.0
Reporter: Suriya Vijayaraghavan
Hi team, we are facing a strange issue. We are not sure whether anyone else has
run into it, but we were able to trace the flow that causes it.
Issue
When the RoundRobin partitioner is used with an even number of partitions n,
records are always produced to only n/2 of the partitions.
Is Reproducible: yes
Scenario: A Kafka topic has 6 partitions (0,1,2,3,4,5). We are trying to
produce to this topic with the RoundRobin partitioner.
The RoundRobin partitioner works from the index into an ArrayList of partition
info. For our case, let's assume the partitions are populated in the array list
in the order below.
{1,2,3,4,5,0}
Expected flow: Even distribution to 6 partitions
How it worked: Data was produced only to partitions 2, 4, and 0.
Why:
On debugging the producer flow further, we noticed the highlighted method call
below in the doSend method of KafkaProducer.
{quote}int partition = *partition*(record, serializedKey, serializedValue,
cluster);
tp = new TopicPartition(record.topic(), partition);
.....
RecordAccumulator.RecordAppendResult result = accumulator.append(tp,
timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, *true*, nowMs);
if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = *partition*(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
.....
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, *false*, nowMs);
{quote}
Here, the first call to accumulator.append passes true for abortOnNewBatch. The
Deque looked up in RecordAccumulator.append is always empty for the first
message, so append needs a new batch and returns with abortForNewBatch set.
For the new batch, partition is called a second time and a new TopicPartition
object is created, which will have partition 2 (partition 1, returned by the
first call, is skipped). In this second call, abortOnNewBatch is passed as
false, so the record is added to the Deque for this TopicPartition.
However, records do get distributed properly if the total number of partitions
is odd, because a record is only added successfully when abortOnNewBatch is
passed as false (let's call this the second invocation).
The order of the invocations for an odd and an even number of partitions is as
follows. In each pair, the first number is the partition returned by the first
invocation and the second is the partition the record is actually appended to.
ODD: {1,2,3,4,0}
Iteration set until all partitions get populated:
1 - 2
3 - 4
0 - 1
2 - 3
4 - 0
Deque populated partitions = {2,4,1,3,0}
EVEN: {1,2,3,4,5,0}
Iteration set until all partitions get populated:
1 - 2
3 - 4
5 - 0
1 - 2
3 - 4
5 - 0
1 - 2
3 - 4
5 - 0.........
Deque populated partitions = {2,4,0}
This repeats continuously, so the remaining partitions (1, 3, and 5) are never
initialized.
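
The pattern above can be reproduced with a small standalone model. This is only a sketch and not the Kafka client code: the class RoundRobinSkipDemo, the Counter helper, and partitionsUsed are hypothetical names, and the model assumes (as in the traces above) that every record triggers two partition calls, with only the second one receiving the record.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class RoundRobinSkipDemo {

    // Hypothetical stand-in for a round-robin partitioner: every call advances
    // a counter and returns the partition at that index of the list.
    static final class Counter {
        private int next = 0;

        int partition(List<Integer> partitions) {
            return partitions.get(next++ % partitions.size());
        }
    }

    // Assumption: each record calls partition() twice. The first result is
    // dropped (append aborted for a new batch), the second gets the record.
    static Set<Integer> partitionsUsed(List<Integer> order, int records) {
        Counter counter = new Counter();
        Set<Integer> used = new TreeSet<>();
        for (int i = 0; i < records; i++) {
            counter.partition(order);            // first invocation, skipped
            used.add(counter.partition(order));  // second invocation, appended
        }
        return used;
    }

    public static void main(String[] args) {
        // Same partition orders as the ODD and EVEN cases in the report.
        System.out.println("odd:  " + partitionsUsed(List.of(1, 2, 3, 4, 0), 10));
        System.out.println("even: " + partitionsUsed(List.of(1, 2, 3, 4, 5, 0), 12));
    }
}
```

Under these assumptions the odd case reaches all five partitions, while the even case only ever reaches partitions 0, 2, and 4, because advancing the index by two per record never lands on the odd indexes when the list size is even.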
--
This message was sent by Atlassian Jira
(v8.3.4#803005)