[
https://issues.apache.org/jira/browse/KAFKA-10903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
huangyiming updated KAFKA-10903:
--------------------------------
Description:
When the producer needs to keep ProducerBatches ordered by sequence, the
current code compares the new batch with the first batch in the deque. If the
first batch in the deque has a lower base sequence than the new batch, it loops
over the deque from the front to insert the new batch at the right position,
like this:
{code:java}
// code placeholder
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() &&
firstBatchInQueue.baseSequence() < batch.baseSequence()) {
List<ProducerBatch> orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() &&
deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());
log.debug("Reordered incoming batch with sequence {} for partition {}. It
was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition,
orderedBatches.size());
// Either we have reached a point where there are batches without a
sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the
queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming
batch.
deque.addFirst(batch);
// Now we have to re insert the previously queued batches in the right
order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}
// At this point, the incoming batch has been queued in the correct place
according to its sequence.
}
{code}
Most of the time, the ProducerBatches in the deque already have a sequence, so
I think we can first compare the new batch with the last ProducerBatch in the
deque: if the new batch's base sequence is greater than or equal to that of the
last ProducerBatch, we can simply append the new batch to the end of the deque,
like this:
{code:java}
// code placeholder
ProducerBatch lastBatchInQueue = deque.peekLast();
if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() &&
lastBatchInQueue.baseSequence() <= batch.baseSequence()){
deque.addLast(batch);
} else {
List<ProducerBatch> orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() &&
deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());
log.debug("Reordered incoming batch with sequence {} for partition {}. It
was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition,
orderedBatches.size());
// Either we have reached a point where there are batches without a
sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the
queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming
batch.
deque.addFirst(batch);
// Now we have to re insert the previously queued batches in the right
order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}
// At this point, the incoming batch has been queued in the correct place
according to its sequence.
}
{code}
If some ProducerBatches in the deque do not have a sequence, checking only the
last batch's sequence is still correct, so this case causes no problem.
was:
if we need sort the producerBatch by sequence,now we use the new batch compare
with the first batch in deque, and if the first batch in deque is less than the
new batch,we will loop the deque and let the new batch insert the right
position.
like this :
{code:java}
// code placeholder
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() &&
firstBatchInQueue.baseSequence() < batch.baseSequence()) {
List<ProducerBatch> orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() &&
deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());
log.debug("Reordered incoming batch with sequence {} for partition {}. It
was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition,
orderedBatches.size());
// Either we have reached a point where there are batches without a
sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the
queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming
batch.
deque.addFirst(batch);
// Now we have to re insert the previously queued batches in the right
order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}
// At this point, the incoming batch has been queued in the correct place
according to its sequence.
}
{code}
and i think if we can compare the last producerBatch in the deque,if the new
batch more than the last producerBatch,just add the new Batch to the last in
the deque,
like this:
{code:java}
// code placeholder
ProducerBatch lastBatchInQueue = deque.peekLast();
if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() &&
lastBatchInQueue.baseSequence() <= batch.baseSequence()){
deque.addLast(batch);
} else {
List<ProducerBatch> orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() &&
deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());
log.debug("Reordered incoming batch with sequence {} for partition {}. It
was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition,
orderedBatches.size());
// Either we have reached a point where there are batches without a
sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the
queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming
batch.
deque.addFirst(batch);
// Now we have to re insert the previously queued batches in the right
order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}
// At this point, the incoming batch has been queued in the correct place
according to its sequence.
}
{code}
> Optimize producerBatch order performance
> ----------------------------------------
>
> Key: KAFKA-10903
> URL: https://issues.apache.org/jira/browse/KAFKA-10903
> Project: Kafka
> Issue Type: Improvement
> Reporter: huangyiming
> Priority: Minor
>
> When the producer needs to keep ProducerBatches ordered by sequence, the
> current code compares the new batch with the first batch in the deque. If the
> first batch in the deque has a lower base sequence than the new batch, it
> loops over the deque from the front to insert the new batch at the right
> position, like this:
> {code:java}
> // code placeholder
> if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() &&
> firstBatchInQueue.baseSequence() < batch.baseSequence()) {
> List<ProducerBatch> orderedBatches = new ArrayList<>();
> while (deque.peekFirst() != null && deque.peekFirst().hasSequence() &&
> deque.peekFirst().baseSequence() < batch.baseSequence())
> orderedBatches.add(deque.pollFirst());
> log.debug("Reordered incoming batch with sequence {} for partition {}. It
> was placed in the queue at " +
> "position {}", batch.baseSequence(), batch.topicPartition,
> orderedBatches.size());
> // Either we have reached a point where there are batches without a
> sequence (ie. never been drained
> // and are hence in order by default), or the batch at the front of the
> queue has a sequence greater
> // than the incoming batch. This is the right place to add the incoming
> batch.
> deque.addFirst(batch);
> // Now we have to re insert the previously queued batches in the right
> order.
> for (int i = orderedBatches.size() - 1; i >= 0; --i) {
> deque.addFirst(orderedBatches.get(i));
> }
> // At this point, the incoming batch has been queued in the correct place
> according to its sequence.
> }
> {code}
> Most of the time, the ProducerBatches in the deque already have a sequence,
> so I think we can first compare the new batch with the last ProducerBatch in
> the deque: if the new batch's base sequence is greater than or equal to that
> of the last ProducerBatch, we can simply append the new batch to the end of
> the deque, like this:
> {code:java}
> // code placeholder
> ProducerBatch lastBatchInQueue = deque.peekLast();
> if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() &&
> lastBatchInQueue.baseSequence() <= batch.baseSequence()){
> deque.addLast(batch);
> } else {
> List<ProducerBatch> orderedBatches = new ArrayList<>();
> while (deque.peekFirst() != null && deque.peekFirst().hasSequence() &&
> deque.peekFirst().baseSequence() < batch.baseSequence())
> orderedBatches.add(deque.pollFirst());
> log.debug("Reordered incoming batch with sequence {} for partition {}. It
> was placed in the queue at " +
> "position {}", batch.baseSequence(), batch.topicPartition,
> orderedBatches.size());
> // Either we have reached a point where there are batches without a
> sequence (ie. never been drained
> // and are hence in order by default), or the batch at the front of the
> queue has a sequence greater
> // than the incoming batch. This is the right place to add the incoming
> batch.
> deque.addFirst(batch);
> // Now we have to re insert the previously queued batches in the right
> order.
> for (int i = orderedBatches.size() - 1; i >= 0; --i) {
> deque.addFirst(orderedBatches.get(i));
> }
> // At this point, the incoming batch has been queued in the correct place
> according to its sequence.
> }
> {code}
> If some ProducerBatches in the deque do not have a sequence, checking only
> the last batch's sequence is still correct, so this case causes no problem.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)