AndrewJSchofield commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1756707238
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,195 @@ private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches, newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)), newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same first and last offset
- * as the batches in 2nd collection. It then creates a final list of batches which contains the
- * former result and all the batches in the 2nd collection. In set notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1.
- * @param currentBatch - collection containing current soft state of batches
- * @param newBatch - collection containing batches in incoming request
+ * This method checks any overlap between current state batches and new state batches.
+ * Based on various conditions it creates new non-overlapping records preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ // will take care of overlapping batches
+ Queue<PersisterStateBatch> batchQueue = new LinkedList<>(
+ mergeBatches(
+ pruneBatches(
+ getSortedList(batchesSoFar),
+ startOffset
+ )
+ ));
+
+ // will take care of overlapping batches
+ List<PersisterStateBatch> modifiedNewBatches = new ArrayList<>(
+ mergeBatches(
+ pruneBatches(
+ getSortedList(newBatches),
+ startOffset
+ )
+ ));
+
+ for (PersisterStateBatch batch : modifiedNewBatches) {
+ for (int i = 0; i < batchQueue.size(); i++) {
Review Comment:
We will re-evaluate `batchQueue.size()` on each loop iteration, and it could
in theory mutate quite significantly. I don't believe this is a safe loop
condition.
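A minimal standalone sketch of the hazard the reviewer is pointing at (illustrative only, not the PR's code): when the bound `queue.size()` is re-evaluated on every iteration, elements the loop body re-adds are visited again within the same pass; with a body that always grows the queue, the loop never terminates. Snapshotting the size before the pass bounds it to the original elements.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class LoopBoundDemo {
    public static void main(String[] args) {
        Queue<Integer> queue = new LinkedList<>(List.of(1, 2, 3));

        // Snapshot the size first: each original element is polled exactly
        // once, even though the body re-adds elements (much as the
        // batch-splitting branches may add replacement intervals per
        // polled batch).
        int originalSize = queue.size();
        for (int i = 0; i < originalSize; i++) {
            Integer cur = queue.poll();
            queue.add(cur);        // keep the element
            queue.add(cur + 10);   // plus a "split" element
        }
        // Had the condition been i < queue.size(), the size would grow by
        // one per iteration, exactly matching the growth of i, and the
        // loop would never terminate.
        System.out.println(queue.size()); // 6
    }
}
```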
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,195 @@ private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches, newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)), newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same first and last offset
- * as the batches in 2nd collection. It then creates a final list of batches which contains the
- * former result and all the batches in the 2nd collection. In set notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1.
- * @param currentBatch - collection containing current soft state of batches
- * @param newBatch - collection containing batches in incoming request
+ * This method checks any overlap between current state batches and new state batches.
+ * Based on various conditions it creates new non-overlapping records preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ // will take care of overlapping batches
+ Queue<PersisterStateBatch> batchQueue = new LinkedList<>(
+ mergeBatches(
+ pruneBatches(
+ getSortedList(batchesSoFar),
+ startOffset
+ )
+ ));
+
+ // will take care of overlapping batches
+ List<PersisterStateBatch> modifiedNewBatches = new ArrayList<>(
+ mergeBatches(
+ pruneBatches(
+ getSortedList(newBatches),
+ startOffset
+ )
+ ));
+
+ for (PersisterStateBatch batch : modifiedNewBatches) {
+ for (int i = 0; i < batchQueue.size(); i++) {
+ PersisterStateBatch cur = batchQueue.poll();
+ // cur batch under inspection has no overlap with the new one
+ // we will need to add to our result
+ if (batch.lastOffset() < cur.firstOffset() || batch.firstOffset() > cur.lastOffset()) {
+ batchQueue.add(cur);
+ continue;
+ }
+
+ // Covers cases where we need to create a new interval
+ // from the current one such that they do not
+ // overlap with the new one.
+ // Following cases will not produce any new records so need not be handled.
+ // cur: ____ ______ ______ _____
+ // new: ________ ______ _________ _________
+
+
+ // covers
+ // cur: ______ _____ _____ ______
+ // batch: ___ _____ ___ ___
+ if (batch.firstOffset() >= cur.firstOffset() && batch.lastOffset() <= cur.lastOffset()) {
+ // extra batch needs to be created
+ if (batch.firstOffset() > cur.firstOffset()) {
+ batchQueue.add(
+ new PersisterStateBatch(
+ cur.firstOffset(),
+ batch.firstOffset() - 1,
+ cur.deliveryState(),
+ cur.deliveryCount()
+ )
+ );
+ }
+
+ // extra batch needs to be created
+ if (batch.lastOffset() < cur.lastOffset()) {
+ batchQueue.add(
+ new PersisterStateBatch(
+ batch.lastOffset() + 1,
+ cur.lastOffset(),
+ cur.deliveryState(),
+ cur.deliveryCount()
+ )
+ );
+ }
+ } else if (batch.firstOffset() < cur.firstOffset() && batch.lastOffset() < cur.lastOffset()) {
+ // covers
+ // ______
+ //____
+ batchQueue.add(new PersisterStateBatch(
+ batch.lastOffset() + 1,
+ cur.lastOffset(),
+ cur.deliveryState(),
+ cur.deliveryCount()
+ ));
+ } else if (batch.firstOffset() > cur.firstOffset() && batch.lastOffset() > cur.lastOffset()) {
+ // covers
+ // ______
+ // _______
+ batchQueue.add(new PersisterStateBatch(
+ cur.firstOffset(),
+ batch.firstOffset() - 1,
+ cur.deliveryState(),
+ cur.deliveryCount()
+ ));
+ }
+ }
+ batchQueue.add(batch);
+ }
+
+ // sort, prune and merge intervals
+ return mergeBatches(
+ pruneBatches(
+ getSortedList(batchQueue),
+ startOffset
+ )
+ );
+ }
+
+ private static List<PersisterStateBatch> getSortedList(Collection<PersisterStateBatch> collection) {
+ List<PersisterStateBatch> finalList = new ArrayList<>(collection);
+ finalList.sort(PersisterStateBatch::compareTo);
+ return finalList;
+ }
+
+ // assumes sorted input
+ private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) {
+ if (batches.size() < 2) {
+ return batches;
+ }
+ Stack<PersisterStateBatch> stack = new Stack<>();
+ stack.add(batches.get(0));
Review Comment:
Can't this be `batches.remove(0)`? Otherwise, the first element will be
added to the stack and also the initial candidate in the loop.
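A sketch of the pattern the reviewer is suggesting (illustrative, not the PR's code): seed the stack with element 0 and start the loop at index 1, equivalently `remove(0)`, so the seed is never compared against itself. Note that `remove(0)` on an `ArrayList` shifts every remaining element, so iterating from index 1 avoids both the double visit and the O(n) removal. The sketch is simplified to plain `[firstOffset, lastOffset]` pairs; the real batches also carry deliveryState/deliveryCount, and only batches with identical state should merge.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class MergeSketch {
    // Merges a sorted list of [firstOffset, lastOffset] intervals,
    // combining intervals that overlap or are contiguous.
    static List<long[]> mergeIntervals(List<long[]> sorted) {
        List<long[]> out = new ArrayList<>();
        if (sorted.isEmpty()) {
            return out;
        }
        Deque<long[]> stack = new ArrayDeque<>();
        stack.addLast(sorted.get(0).clone());      // seed with element 0 ...
        for (int i = 1; i < sorted.size(); i++) {  // ... and start the loop at 1
            long[] cur = sorted.get(i);
            long[] top = stack.peekLast();
            if (cur[0] <= top[1] + 1) {            // overlapping or contiguous
                top[1] = Math.max(top[1], cur[1]); // extend the top interval
            } else {
                stack.addLast(cur.clone());        // disjoint: new interval
            }
        }
        out.addAll(stack);                         // insertion order preserved
        return out;
    }

    public static void main(String[] args) {
        List<long[]> merged = mergeIntervals(List.of(
            new long[]{0, 5}, new long[]{3, 7}, new long[]{10, 12}));
        for (long[] b : merged) {
            System.out.println(b[0] + "-" + b[1]); // 0-7, then 10-12
        }
    }
}
```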
--
This is an automated message from the Apache Git Service.