AndrewJSchofield commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1763696711


##########
server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateBatch.java:
##########
@@ -95,4 +95,22 @@ public String toString() {
             "deliveryCount=" + deliveryCount +

Review Comment:
   firstOffset, lastOffset, deliveryCount and deliveryState is the order, so 
probably ought to have them in that order in the toString too.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,331 @@ private static ShareGroupOffset merge(ShareGroupOffset 
soFar, ShareUpdateValue n
             .setStartOffset(newStartOffset)
             .setLeaderEpoch(newLeaderEpoch)
             .setStateBatches(combineStateBatches(currentBatches, 
newData.stateBatches().stream()
-                .map(PersisterOffsetsStateBatch::from)
-                .collect(Collectors.toCollection(LinkedHashSet::new)), 
newStartOffset))
+                .map(ShareCoordinatorShard::toPersisterStateBatch)
+                .collect(Collectors.toList()), newStartOffset))
             .build();
     }
 
     /**
-     * Util method which takes in 2 collections containing {@link 
PersisterOffsetsStateBatch}
+     * Util method which takes in 2 collections containing {@link 
PersisterStateBatch}
      * and the startOffset.
-     * It removes all batches from the 1st collection which have the same 
first and last offset
-     * as the batches in 2nd collection. It then creates a final list of 
batches which contains the
-     * former result and all the batches in the 2nd collection. In set 
notation (A - B) U B (we prefer batches in B
-     * which have same first and last offset in A).
-     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
-     * @param currentBatch - collection containing current soft state of 
batches
-     * @param newBatch - collection containing batches in incoming request
+     * This method checks any overlap between current state batches and new 
state batches.
+     * Based on various conditions it creates new non-overlapping records 
preferring new batches.
+     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1 and
+     * merges any contiguous intervals with same state.
+     * @param batchesSoFar - collection containing current soft state of 
batches
+     * @param newBatches - collection containing batches in incoming request
      * @param startOffset - startOffset to consider when removing old batches.
      * @return List containing combined batches
      */
-    private static List<PersisterOffsetsStateBatch> combineStateBatches(
-        Collection<PersisterOffsetsStateBatch> currentBatch,
-        Collection<PersisterOffsetsStateBatch> newBatch,
+    // visibility for testing
+    static List<PersisterStateBatch> combineStateBatches(
+        List<PersisterStateBatch> batchesSoFar,
+        List<PersisterStateBatch> newBatches,
         long startOffset
     ) {
-        currentBatch.removeAll(newBatch);
-        List<PersisterOffsetsStateBatch> batchesToAdd = new 
LinkedList<>(currentBatch);
-        batchesToAdd.addAll(newBatch);
-        // Any batches where the last offset is < the current start offset
-        // are now expired. We should remove them from the persister.
+        List<PersisterStateBatch> combinedList = new 
ArrayList<>(batchesSoFar.size() + newBatches.size());
+        combinedList.addAll(batchesSoFar);
+        combinedList.addAll(newBatches);
+
+        // sort keeping delivery state in mind
+        combinedList.sort(PersisterStateBatch::compareTo);
+
+        return mergeBatches(
+            pruneBatches(
+                combinedList,
+                startOffset
+            )
+        );
+    }
+
+    private static class BatchOverlapState {
+        private final PersisterStateBatch last;
+        private final PersisterStateBatch candidate;
+        private final List<PersisterStateBatch> nonOverlapping;
+        public static final BatchOverlapState SENTINEL = new 
BatchOverlapState(null, null, Collections.emptyList());
+
+        public BatchOverlapState(
+            PersisterStateBatch last,
+            PersisterStateBatch candidate,
+            List<PersisterStateBatch> nonOverlapping
+        ) {
+            this.last = last;
+            this.candidate = candidate;
+            this.nonOverlapping = nonOverlapping;
+        }
+
+        public PersisterStateBatch last() {
+            return last;
+        }
+
+        public PersisterStateBatch candidate() {
+            return candidate;
+        }
+
+        public List<PersisterStateBatch> nonOverlapping() {
+            return nonOverlapping;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof BatchOverlapState)) return false;
+            BatchOverlapState that = (BatchOverlapState) o;
+            return Objects.equals(last, that.last) && 
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping, 
that.nonOverlapping);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(last, candidate, nonOverlapping);
+        }
+    }
+
+    static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> 
batches) {
+        if (batches.size() < 2) {
+            return batches;
+        }
+        TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+        List<PersisterStateBatch> finalBatches = new 
ArrayList<>(batches.size() * 2); // heuristic size
+
+        BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+        while (overlapState != BatchOverlapState.SENTINEL) {
+            PersisterStateBatch last = overlapState.last();
+            PersisterStateBatch candidate = overlapState.candidate();
+            
+            // remove non overlapping prefix from sortedBatches,
+            // will make getting next overlapping pair efficient
+            // as a prefix batch which is non overlapping will only
+            // be checked once.
+            if (overlapState.nonOverlapping() != null) {
+                overlapState.nonOverlapping().forEach(sortedBatches::remove);
+                finalBatches.addAll(overlapState.nonOverlapping());
+            }
+
+            if (candidate == null) {
+                overlapState = BatchOverlapState.SENTINEL;
+                continue;
+            }
+
+            // overlap and same state (last.firstOffset <= 
candidate.firstOffset due to sort
+            // covers:
+            // case:        1        2          3            4          5      
     6          7 (contiguous)
+            // last:        ______   _______    _______      _______   _______ 
  ________    _______
+            // candidate:   ______   ____       __________     ___        ____ 
      _______        _______
+            if (compareBatchState(candidate, last) == 0) {
+                sortedBatches.remove(last);  // remove older smaller interval
+                sortedBatches.remove(candidate);
+
+                last = new PersisterStateBatch(
+                    last.firstOffset(),
+                    // cover cases
+                    // last:      ______   ________       _________
+                    // candidate:   ___       __________           _____
+                    Math.max(candidate.lastOffset(), last.lastOffset()),
+                    last.deliveryState(),
+                    last.deliveryCount()
+                );
+
+                sortedBatches.add(last);
+            } else if (candidate.firstOffset() <= last.lastOffset()) { // non 
contiguous overlap
+                // overlap and different state
+                // covers:
+                // case:        1        2          3            4          5  
         6
+                // last:        ______   _______    _______      _______    
_______     ________
+                // candidate:   ______   ____       _________      ____        
____          _______
+                // max batches: 1           2       2                3         
 2            2
+                // min batches: 1           1       1                1         
 1            2
+
+                sortedBatches.remove(last);
+                if (candidate.firstOffset() == last.firstOffset()) {
+                    // No special handling for case 1
+                    // here candidate can never have lower priority
+                    // since the treeset order takes that into account
+                    // and candidate is already in the treeset.
+
+                    if (candidate.lastOffset() < last.lastOffset()) { // case 2
+                        if (compareBatchState(candidate, last) < 0) {
+                            sortedBatches.add(last);

Review Comment:
   Doesn't `candidate` need to be removed in this case? `last` "won".



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,331 @@ private static ShareGroupOffset merge(ShareGroupOffset 
soFar, ShareUpdateValue n
             .setStartOffset(newStartOffset)
             .setLeaderEpoch(newLeaderEpoch)
             .setStateBatches(combineStateBatches(currentBatches, 
newData.stateBatches().stream()
-                .map(PersisterOffsetsStateBatch::from)
-                .collect(Collectors.toCollection(LinkedHashSet::new)), 
newStartOffset))
+                .map(ShareCoordinatorShard::toPersisterStateBatch)
+                .collect(Collectors.toList()), newStartOffset))
             .build();
     }
 
     /**
-     * Util method which takes in 2 collections containing {@link 
PersisterOffsetsStateBatch}
+     * Util method which takes in 2 collections containing {@link 
PersisterStateBatch}
      * and the startOffset.
-     * It removes all batches from the 1st collection which have the same 
first and last offset
-     * as the batches in 2nd collection. It then creates a final list of 
batches which contains the
-     * former result and all the batches in the 2nd collection. In set 
notation (A - B) U B (we prefer batches in B
-     * which have same first and last offset in A).
-     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
-     * @param currentBatch - collection containing current soft state of 
batches
-     * @param newBatch - collection containing batches in incoming request
+     * This method checks any overlap between current state batches and new 
state batches.
+     * Based on various conditions it creates new non-overlapping records 
preferring new batches.
+     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1 and
+     * merges any contiguous intervals with same state.
+     * @param batchesSoFar - collection containing current soft state of 
batches
+     * @param newBatches - collection containing batches in incoming request
      * @param startOffset - startOffset to consider when removing old batches.
      * @return List containing combined batches
      */
-    private static List<PersisterOffsetsStateBatch> combineStateBatches(
-        Collection<PersisterOffsetsStateBatch> currentBatch,
-        Collection<PersisterOffsetsStateBatch> newBatch,
+    // visibility for testing
+    static List<PersisterStateBatch> combineStateBatches(
+        List<PersisterStateBatch> batchesSoFar,
+        List<PersisterStateBatch> newBatches,
         long startOffset
     ) {
-        currentBatch.removeAll(newBatch);
-        List<PersisterOffsetsStateBatch> batchesToAdd = new 
LinkedList<>(currentBatch);
-        batchesToAdd.addAll(newBatch);
-        // Any batches where the last offset is < the current start offset
-        // are now expired. We should remove them from the persister.
+        List<PersisterStateBatch> combinedList = new 
ArrayList<>(batchesSoFar.size() + newBatches.size());
+        combinedList.addAll(batchesSoFar);
+        combinedList.addAll(newBatches);
+
+        // sort keeping delivery state in mind
+        combinedList.sort(PersisterStateBatch::compareTo);
+
+        return mergeBatches(
+            pruneBatches(
+                combinedList,
+                startOffset
+            )
+        );
+    }
+
+    private static class BatchOverlapState {
+        private final PersisterStateBatch last;
+        private final PersisterStateBatch candidate;
+        private final List<PersisterStateBatch> nonOverlapping;
+        public static final BatchOverlapState SENTINEL = new 
BatchOverlapState(null, null, Collections.emptyList());
+
+        public BatchOverlapState(
+            PersisterStateBatch last,
+            PersisterStateBatch candidate,
+            List<PersisterStateBatch> nonOverlapping
+        ) {
+            this.last = last;
+            this.candidate = candidate;
+            this.nonOverlapping = nonOverlapping;
+        }
+
+        public PersisterStateBatch last() {
+            return last;
+        }
+
+        public PersisterStateBatch candidate() {
+            return candidate;
+        }
+
+        public List<PersisterStateBatch> nonOverlapping() {
+            return nonOverlapping;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof BatchOverlapState)) return false;
+            BatchOverlapState that = (BatchOverlapState) o;
+            return Objects.equals(last, that.last) && 
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping, 
that.nonOverlapping);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(last, candidate, nonOverlapping);
+        }
+    }
+
+    static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> 
batches) {
+        if (batches.size() < 2) {
+            return batches;
+        }
+        TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+        List<PersisterStateBatch> finalBatches = new 
ArrayList<>(batches.size() * 2); // heuristic size
+
+        BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+        while (overlapState != BatchOverlapState.SENTINEL) {
+            PersisterStateBatch last = overlapState.last();
+            PersisterStateBatch candidate = overlapState.candidate();
+            
+            // remove non overlapping prefix from sortedBatches,
+            // will make getting next overlapping pair efficient
+            // as a prefix batch which is non overlapping will only
+            // be checked once.
+            if (overlapState.nonOverlapping() != null) {
+                overlapState.nonOverlapping().forEach(sortedBatches::remove);
+                finalBatches.addAll(overlapState.nonOverlapping());
+            }
+
+            if (candidate == null) {
+                overlapState = BatchOverlapState.SENTINEL;
+                continue;
+            }
+
+            // overlap and same state (last.firstOffset <= 
candidate.firstOffset due to sort
+            // covers:
+            // case:        1        2          3            4          5      
     6          7 (contiguous)
+            // last:        ______   _______    _______      _______   _______ 
  ________    _______
+            // candidate:   ______   ____       __________     ___        ____ 
      _______        _______
+            if (compareBatchState(candidate, last) == 0) {
+                sortedBatches.remove(last);  // remove older smaller interval
+                sortedBatches.remove(candidate);
+
+                last = new PersisterStateBatch(
+                    last.firstOffset(),
+                    // cover cases
+                    // last:      ______   ________       _________
+                    // candidate:   ___       __________           _____
+                    Math.max(candidate.lastOffset(), last.lastOffset()),
+                    last.deliveryState(),
+                    last.deliveryCount()
+                );
+
+                sortedBatches.add(last);
+            } else if (candidate.firstOffset() <= last.lastOffset()) { // non 
contiguous overlap
+                // overlap and different state
+                // covers:
+                // case:        1        2          3            4          5  
         6
+                // last:        ______   _______    _______      _______    
_______     ________
+                // candidate:   ______   ____       _________      ____        
____          _______
+                // max batches: 1           2       2                3         
 2            2
+                // min batches: 1           1       1                1         
 1            2
+
+                sortedBatches.remove(last);
+                if (candidate.firstOffset() == last.firstOffset()) {
+                    // No special handling for case 1

Review Comment:
   OK, but I suppose case 1 ends up going down the branch at line 708 then. 
Personally, I would prefer an if test to eliminate case 1 up front.



##########
server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateBatch.java:
##########
@@ -95,4 +95,22 @@ public String toString() {
             "deliveryCount=" + deliveryCount +
             ")";
     }
+
+    @Override
+    public int compareTo(Object o) {
+        PersisterStateBatch that = (PersisterStateBatch) o;
+        int deltaFirst = Long.compare(this.firstOffset(), that.firstOffset());
+        if (deltaFirst == 0) {
+            int deltaLast = Long.compare(this.lastOffset(), that.lastOffset());
+            if (deltaLast == 0) {
+                int countDelta = this.deliveryCount() - that.deliveryCount();

Review Comment:
   Surely `deltaCount` given the pattern you established for all of the other 
comparisons.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,331 @@ private static ShareGroupOffset merge(ShareGroupOffset 
soFar, ShareUpdateValue n
             .setStartOffset(newStartOffset)
             .setLeaderEpoch(newLeaderEpoch)
             .setStateBatches(combineStateBatches(currentBatches, 
newData.stateBatches().stream()
-                .map(PersisterOffsetsStateBatch::from)
-                .collect(Collectors.toCollection(LinkedHashSet::new)), 
newStartOffset))
+                .map(ShareCoordinatorShard::toPersisterStateBatch)
+                .collect(Collectors.toList()), newStartOffset))
             .build();
     }
 
     /**
-     * Util method which takes in 2 collections containing {@link 
PersisterOffsetsStateBatch}
+     * Util method which takes in 2 collections containing {@link 
PersisterStateBatch}
      * and the startOffset.
-     * It removes all batches from the 1st collection which have the same 
first and last offset
-     * as the batches in 2nd collection. It then creates a final list of 
batches which contains the
-     * former result and all the batches in the 2nd collection. In set 
notation (A - B) U B (we prefer batches in B
-     * which have same first and last offset in A).
-     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
-     * @param currentBatch - collection containing current soft state of 
batches
-     * @param newBatch - collection containing batches in incoming request
+     * This method checks any overlap between current state batches and new 
state batches.
+     * Based on various conditions it creates new non-overlapping records 
preferring new batches.
+     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1 and
+     * merges any contiguous intervals with same state.
+     * @param batchesSoFar - collection containing current soft state of 
batches
+     * @param newBatches - collection containing batches in incoming request
      * @param startOffset - startOffset to consider when removing old batches.
      * @return List containing combined batches
      */
-    private static List<PersisterOffsetsStateBatch> combineStateBatches(
-        Collection<PersisterOffsetsStateBatch> currentBatch,
-        Collection<PersisterOffsetsStateBatch> newBatch,
+    // visibility for testing
+    static List<PersisterStateBatch> combineStateBatches(
+        List<PersisterStateBatch> batchesSoFar,
+        List<PersisterStateBatch> newBatches,
         long startOffset
     ) {
-        currentBatch.removeAll(newBatch);
-        List<PersisterOffsetsStateBatch> batchesToAdd = new 
LinkedList<>(currentBatch);
-        batchesToAdd.addAll(newBatch);
-        // Any batches where the last offset is < the current start offset
-        // are now expired. We should remove them from the persister.
+        List<PersisterStateBatch> combinedList = new 
ArrayList<>(batchesSoFar.size() + newBatches.size());
+        combinedList.addAll(batchesSoFar);
+        combinedList.addAll(newBatches);
+
+        // sort keeping delivery state in mind
+        combinedList.sort(PersisterStateBatch::compareTo);
+
+        return mergeBatches(
+            pruneBatches(
+                combinedList,
+                startOffset
+            )
+        );
+    }
+
+    private static class BatchOverlapState {
+        private final PersisterStateBatch last;
+        private final PersisterStateBatch candidate;
+        private final List<PersisterStateBatch> nonOverlapping;
+        public static final BatchOverlapState SENTINEL = new 
BatchOverlapState(null, null, Collections.emptyList());
+
+        public BatchOverlapState(
+            PersisterStateBatch last,
+            PersisterStateBatch candidate,
+            List<PersisterStateBatch> nonOverlapping
+        ) {
+            this.last = last;
+            this.candidate = candidate;
+            this.nonOverlapping = nonOverlapping;
+        }
+
+        public PersisterStateBatch last() {
+            return last;
+        }
+
+        public PersisterStateBatch candidate() {
+            return candidate;
+        }
+
+        public List<PersisterStateBatch> nonOverlapping() {
+            return nonOverlapping;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof BatchOverlapState)) return false;
+            BatchOverlapState that = (BatchOverlapState) o;
+            return Objects.equals(last, that.last) && 
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping, 
that.nonOverlapping);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(last, candidate, nonOverlapping);
+        }
+    }
+
+    static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> 
batches) {
+        if (batches.size() < 2) {
+            return batches;
+        }
+        TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+        List<PersisterStateBatch> finalBatches = new 
ArrayList<>(batches.size() * 2); // heuristic size
+
+        BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+        while (overlapState != BatchOverlapState.SENTINEL) {
+            PersisterStateBatch last = overlapState.last();
+            PersisterStateBatch candidate = overlapState.candidate();
+            
+            // remove non overlapping prefix from sortedBatches,
+            // will make getting next overlapping pair efficient
+            // as a prefix batch which is non overlapping will only
+            // be checked once.
+            if (overlapState.nonOverlapping() != null) {
+                overlapState.nonOverlapping().forEach(sortedBatches::remove);
+                finalBatches.addAll(overlapState.nonOverlapping());
+            }
+
+            if (candidate == null) {
+                overlapState = BatchOverlapState.SENTINEL;
+                continue;
+            }
+
+            // overlap and same state (last.firstOffset <= 
candidate.firstOffset due to sort
+            // covers:
+            // case:        1        2          3            4          5      
     6          7 (contiguous)
+            // last:        ______   _______    _______      _______   _______ 
  ________    _______
+            // candidate:   ______   ____       __________     ___        ____ 
      _______        _______
+            if (compareBatchState(candidate, last) == 0) {
+                sortedBatches.remove(last);  // remove older smaller interval
+                sortedBatches.remove(candidate);
+
+                last = new PersisterStateBatch(
+                    last.firstOffset(),
+                    // cover cases
+                    // last:      ______   ________       _________
+                    // candidate:   ___       __________           _____
+                    Math.max(candidate.lastOffset(), last.lastOffset()),
+                    last.deliveryState(),
+                    last.deliveryCount()
+                );
+
+                sortedBatches.add(last);
+            } else if (candidate.firstOffset() <= last.lastOffset()) { // non 
contiguous overlap
+                // overlap and different state
+                // covers:
+                // case:        1        2          3            4          5  
         6
+                // last:        ______   _______    _______      _______    
_______     ________
+                // candidate:   ______   ____       _________      ____        
____          _______
+                // max batches: 1           2       2                3         
 2            2
+                // min batches: 1           1       1                1         
 1            2
+
+                sortedBatches.remove(last);
+                if (candidate.firstOffset() == last.firstOffset()) {
+                    // No special handling for case 1
+                    // here candidate can never have lower priority
+                    // since the treeset order takes that into account
+                    // and candidate is already in the treeset.
+
+                    if (candidate.lastOffset() < last.lastOffset()) { // case 2
+                        if (compareBatchState(candidate, last) < 0) {
+                            sortedBatches.add(last);
+                        } else {    // last has lower priority
+                            sortedBatches.add(candidate);
+                            sortedBatches.add(new PersisterStateBatch(
+                                candidate.lastOffset() + 1,
+                                last.firstOffset(),
+                                last.deliveryState(),
+                                last.deliveryCount()
+                            ));
+                        }
+                    } else {    // case 3
+                        if (compareBatchState(candidate, last) < 0) {
+                            sortedBatches.add(last);
+                            sortedBatches.remove(candidate);
+                            sortedBatches.add(new PersisterStateBatch(
+                                last.lastOffset() + 1,
+                                candidate.lastOffset(),
+                                candidate.deliveryState(),
+                                candidate.deliveryCount()
+                            ));
+                        } else {    // last has lower priority
+                            sortedBatches.add(candidate);
+                        }
+                    }
+                } else {    // candidate.firstOffset() > last.lastOffset()

Review Comment:
   Shouldn't this `candidate.firstOffset() > last.firstOffset()` in the comment?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,331 @@ private static ShareGroupOffset merge(ShareGroupOffset 
soFar, ShareUpdateValue n
             .setStartOffset(newStartOffset)
             .setLeaderEpoch(newLeaderEpoch)
             .setStateBatches(combineStateBatches(currentBatches, 
newData.stateBatches().stream()
-                .map(PersisterOffsetsStateBatch::from)
-                .collect(Collectors.toCollection(LinkedHashSet::new)), 
newStartOffset))
+                .map(ShareCoordinatorShard::toPersisterStateBatch)
+                .collect(Collectors.toList()), newStartOffset))
             .build();
     }
 
     /**
-     * Util method which takes in 2 collections containing {@link 
PersisterOffsetsStateBatch}
+     * Util method which takes in 2 collections containing {@link 
PersisterStateBatch}
      * and the startOffset.
-     * It removes all batches from the 1st collection which have the same 
first and last offset
-     * as the batches in 2nd collection. It then creates a final list of 
batches which contains the
-     * former result and all the batches in the 2nd collection. In set 
notation (A - B) U B (we prefer batches in B
-     * which have same first and last offset in A).
-     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
-     * @param currentBatch - collection containing current soft state of 
batches
-     * @param newBatch - collection containing batches in incoming request
+     * This method checks any overlap between current state batches and new 
state batches.
+     * Based on various conditions it creates new non-overlapping records 
preferring new batches.
+     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1 and
+     * merges any contiguous intervals with same state.
+     * @param batchesSoFar - collection containing current soft state of 
batches
+     * @param newBatches - collection containing batches in incoming request
      * @param startOffset - startOffset to consider when removing old batches.
      * @return List containing combined batches
      */
-    private static List<PersisterOffsetsStateBatch> combineStateBatches(
-        Collection<PersisterOffsetsStateBatch> currentBatch,
-        Collection<PersisterOffsetsStateBatch> newBatch,
+    // visibility for testing
+    static List<PersisterStateBatch> combineStateBatches(
+        List<PersisterStateBatch> batchesSoFar,
+        List<PersisterStateBatch> newBatches,
         long startOffset
     ) {
-        currentBatch.removeAll(newBatch);
-        List<PersisterOffsetsStateBatch> batchesToAdd = new 
LinkedList<>(currentBatch);
-        batchesToAdd.addAll(newBatch);
-        // Any batches where the last offset is < the current start offset
-        // are now expired. We should remove them from the persister.
+        List<PersisterStateBatch> combinedList = new 
ArrayList<>(batchesSoFar.size() + newBatches.size());
+        combinedList.addAll(batchesSoFar);
+        combinedList.addAll(newBatches);
+
+        // sort keeping delivery state in mind
+        combinedList.sort(PersisterStateBatch::compareTo);
+
+        return mergeBatches(
+            pruneBatches(
+                combinedList,
+                startOffset
+            )
+        );
+    }
+
+    private static class BatchOverlapState {
+        private final PersisterStateBatch last;
+        private final PersisterStateBatch candidate;
+        private final List<PersisterStateBatch> nonOverlapping;
+        public static final BatchOverlapState SENTINEL = new 
BatchOverlapState(null, null, Collections.emptyList());
+
+        public BatchOverlapState(
+            PersisterStateBatch last,
+            PersisterStateBatch candidate,
+            List<PersisterStateBatch> nonOverlapping
+        ) {
+            this.last = last;
+            this.candidate = candidate;
+            this.nonOverlapping = nonOverlapping;
+        }
+
+        public PersisterStateBatch last() {
+            return last;
+        }
+
+        public PersisterStateBatch candidate() {
+            return candidate;
+        }
+
+        public List<PersisterStateBatch> nonOverlapping() {
+            return nonOverlapping;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof BatchOverlapState)) return false;
+            BatchOverlapState that = (BatchOverlapState) o;
+            return Objects.equals(last, that.last) && 
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping, 
that.nonOverlapping);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(last, candidate, nonOverlapping);
+        }
+    }
+
+    static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> 
batches) {
+        if (batches.size() < 2) {
+            return batches;
+        }
+        TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+        List<PersisterStateBatch> finalBatches = new 
ArrayList<>(batches.size() * 2); // heuristic size
+
+        BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+        while (overlapState != BatchOverlapState.SENTINEL) {
+            PersisterStateBatch last = overlapState.last();
+            PersisterStateBatch candidate = overlapState.candidate();
+            
+            // remove non overlapping prefix from sortedBatches,
+            // will make getting next overlapping pair efficient
+            // as a prefix batch which is non overlapping will only
+            // be checked once.
+            if (overlapState.nonOverlapping() != null) {
+                overlapState.nonOverlapping().forEach(sortedBatches::remove);
+                finalBatches.addAll(overlapState.nonOverlapping());
+            }
+
+            if (candidate == null) {
+                overlapState = BatchOverlapState.SENTINEL;
+                continue;
+            }
+
+            // overlap and same state (last.firstOffset <= 
candidate.firstOffset due to sort

Review Comment:
   nit: Missing ')' in your comment ;) 



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,331 @@ private static ShareGroupOffset merge(ShareGroupOffset 
soFar, ShareUpdateValue n
             .setStartOffset(newStartOffset)
             .setLeaderEpoch(newLeaderEpoch)
             .setStateBatches(combineStateBatches(currentBatches, 
newData.stateBatches().stream()
-                .map(PersisterOffsetsStateBatch::from)
-                .collect(Collectors.toCollection(LinkedHashSet::new)), 
newStartOffset))
+                .map(ShareCoordinatorShard::toPersisterStateBatch)
+                .collect(Collectors.toList()), newStartOffset))
             .build();
     }
 
     /**
-     * Util method which takes in 2 collections containing {@link 
PersisterOffsetsStateBatch}
+     * Util method which takes in 2 collections containing {@link 
PersisterStateBatch}
      * and the startOffset.
-     * It removes all batches from the 1st collection which have the same 
first and last offset
-     * as the batches in 2nd collection. It then creates a final list of 
batches which contains the
-     * former result and all the batches in the 2nd collection. In set 
notation (A - B) U B (we prefer batches in B
-     * which have same first and last offset in A).
-     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
-     * @param currentBatch - collection containing current soft state of 
batches
-     * @param newBatch - collection containing batches in incoming request
+     * This method checks any overlap between current state batches and new 
state batches.
+     * Based on various conditions it creates new non-overlapping records 
preferring new batches.
+     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1 and
+     * merges any contiguous intervals with same state.
+     * @param batchesSoFar - collection containing current soft state of 
batches
+     * @param newBatches - collection containing batches in incoming request
      * @param startOffset - startOffset to consider when removing old batches.
      * @return List containing combined batches
      */
-    private static List<PersisterOffsetsStateBatch> combineStateBatches(
-        Collection<PersisterOffsetsStateBatch> currentBatch,
-        Collection<PersisterOffsetsStateBatch> newBatch,
+    // visibility for testing
+    static List<PersisterStateBatch> combineStateBatches(
+        List<PersisterStateBatch> batchesSoFar,
+        List<PersisterStateBatch> newBatches,
         long startOffset
     ) {
-        currentBatch.removeAll(newBatch);
-        List<PersisterOffsetsStateBatch> batchesToAdd = new 
LinkedList<>(currentBatch);
-        batchesToAdd.addAll(newBatch);
-        // Any batches where the last offset is < the current start offset
-        // are now expired. We should remove them from the persister.
+        List<PersisterStateBatch> combinedList = new 
ArrayList<>(batchesSoFar.size() + newBatches.size());
+        combinedList.addAll(batchesSoFar);
+        combinedList.addAll(newBatches);
+
+        // sort keeping delivery state in mind
+        combinedList.sort(PersisterStateBatch::compareTo);
+
+        return mergeBatches(
+            pruneBatches(
+                combinedList,
+                startOffset
+            )
+        );
+    }
+
+    private static class BatchOverlapState {
+        private final PersisterStateBatch last;
+        private final PersisterStateBatch candidate;
+        private final List<PersisterStateBatch> nonOverlapping;
+        public static final BatchOverlapState SENTINEL = new 
BatchOverlapState(null, null, Collections.emptyList());
+
+        public BatchOverlapState(
+            PersisterStateBatch last,
+            PersisterStateBatch candidate,
+            List<PersisterStateBatch> nonOverlapping
+        ) {
+            this.last = last;
+            this.candidate = candidate;
+            this.nonOverlapping = nonOverlapping;
+        }
+
+        public PersisterStateBatch last() {
+            return last;
+        }
+
+        public PersisterStateBatch candidate() {
+            return candidate;
+        }
+
+        public List<PersisterStateBatch> nonOverlapping() {
+            return nonOverlapping;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof BatchOverlapState)) return false;
+            BatchOverlapState that = (BatchOverlapState) o;
+            return Objects.equals(last, that.last) && 
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping, 
that.nonOverlapping);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(last, candidate, nonOverlapping);
+        }
+    }
+
+    static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> 
batches) {
+        if (batches.size() < 2) {
+            return batches;
+        }
+        TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+        List<PersisterStateBatch> finalBatches = new 
ArrayList<>(batches.size() * 2); // heuristic size
+
+        BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+        while (overlapState != BatchOverlapState.SENTINEL) {
+            PersisterStateBatch last = overlapState.last();
+            PersisterStateBatch candidate = overlapState.candidate();
+            
+            // remove non overlapping prefix from sortedBatches,
+            // will make getting next overlapping pair efficient
+            // as a prefix batch which is non overlapping will only
+            // be checked once.
+            if (overlapState.nonOverlapping() != null) {
+                overlapState.nonOverlapping().forEach(sortedBatches::remove);
+                finalBatches.addAll(overlapState.nonOverlapping());
+            }
+
+            if (candidate == null) {
+                overlapState = BatchOverlapState.SENTINEL;
+                continue;
+            }
+
+            // overlap and same state (last.firstOffset <= 
candidate.firstOffset due to sort
+            // covers:
+            // case:        1        2          3            4          5      
     6          7 (contiguous)
+            // last:        ______   _______    _______      _______   _______ 
  ________    _______
+            // candidate:   ______   ____       __________     ___        ____ 
      _______        _______
+            if (compareBatchState(candidate, last) == 0) {
+                sortedBatches.remove(last);  // remove older smaller interval
+                sortedBatches.remove(candidate);
+
+                last = new PersisterStateBatch(
+                    last.firstOffset(),
+                    // cover cases
+                    // last:      ______   ________       _________
+                    // candidate:   ___       __________           _____
+                    Math.max(candidate.lastOffset(), last.lastOffset()),
+                    last.deliveryState(),
+                    last.deliveryCount()
+                );
+
+                sortedBatches.add(last);
+            } else if (candidate.firstOffset() <= last.lastOffset()) { // non 
contiguous overlap
+                // overlap and different state
+                // covers:
+                // case:        1        2          3            4          5  
         6
+                // last:        ______   _______    _______      _______    
_______     ________
+                // candidate:   ______   ____       _________      ____        
____          _______
+                // max batches: 1           2       2                3         
 2            2
+                // min batches: 1           1       1                1         
 1            2
+
+                sortedBatches.remove(last);
+                if (candidate.firstOffset() == last.firstOffset()) {
+                    // No special handling for case 1
+                    // here candidate can never have lower priority
+                    // since the treeset order takes that into account
+                    // and candidate is already in the treeset.
+
+                    if (candidate.lastOffset() < last.lastOffset()) { // case 2
+                        if (compareBatchState(candidate, last) < 0) {
+                            sortedBatches.add(last);
+                        } else {    // last has lower priority
+                            sortedBatches.add(candidate);
+                            sortedBatches.add(new PersisterStateBatch(
+                                candidate.lastOffset() + 1,
+                                last.firstOffset(),
+                                last.deliveryState(),
+                                last.deliveryCount()
+                            ));
+                        }
+                    } else {    // case 3
+                        if (compareBatchState(candidate, last) < 0) {
+                            sortedBatches.add(last);
+                            sortedBatches.remove(candidate);
+                            sortedBatches.add(new PersisterStateBatch(
+                                last.lastOffset() + 1,
+                                candidate.lastOffset(),
+                                candidate.deliveryState(),
+                                candidate.deliveryCount()
+                            ));
+                        } else {    // last has lower priority
+                            sortedBatches.add(candidate);

Review Comment:
   This line is redundant. Before line 678, both `last` and `candidate` are in 
the set. You remove `last` at 678, and then `candidate` "wins". It's already in 
the set. It doesn't need adding. I know that TreeSet will handle it. It's still 
a waste of effort and logically unnecessary.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,331 @@ private static ShareGroupOffset merge(ShareGroupOffset 
soFar, ShareUpdateValue n
             .setStartOffset(newStartOffset)
             .setLeaderEpoch(newLeaderEpoch)
             .setStateBatches(combineStateBatches(currentBatches, 
newData.stateBatches().stream()
-                .map(PersisterOffsetsStateBatch::from)
-                .collect(Collectors.toCollection(LinkedHashSet::new)), 
newStartOffset))
+                .map(ShareCoordinatorShard::toPersisterStateBatch)
+                .collect(Collectors.toList()), newStartOffset))
             .build();
     }
 
     /**
-     * Util method which takes in 2 collections containing {@link 
PersisterOffsetsStateBatch}
+     * Util method which takes in 2 collections containing {@link 
PersisterStateBatch}
      * and the startOffset.
-     * It removes all batches from the 1st collection which have the same 
first and last offset
-     * as the batches in 2nd collection. It then creates a final list of 
batches which contains the
-     * former result and all the batches in the 2nd collection. In set 
notation (A - B) U B (we prefer batches in B
-     * which have same first and last offset in A).
-     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
-     * @param currentBatch - collection containing current soft state of 
batches
-     * @param newBatch - collection containing batches in incoming request
+     * This method checks any overlap between current state batches and new 
state batches.
+     * Based on various conditions it creates new non-overlapping records 
preferring new batches.
+     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1 and
+     * merges any contiguous intervals with same state.
+     * @param batchesSoFar - collection containing current soft state of 
batches
+     * @param newBatches - collection containing batches in incoming request
      * @param startOffset - startOffset to consider when removing old batches.
      * @return List containing combined batches
      */
-    private static List<PersisterOffsetsStateBatch> combineStateBatches(
-        Collection<PersisterOffsetsStateBatch> currentBatch,
-        Collection<PersisterOffsetsStateBatch> newBatch,
+    // visibility for testing
+    static List<PersisterStateBatch> combineStateBatches(
+        List<PersisterStateBatch> batchesSoFar,
+        List<PersisterStateBatch> newBatches,
         long startOffset
     ) {
-        currentBatch.removeAll(newBatch);
-        List<PersisterOffsetsStateBatch> batchesToAdd = new 
LinkedList<>(currentBatch);
-        batchesToAdd.addAll(newBatch);
-        // Any batches where the last offset is < the current start offset
-        // are now expired. We should remove them from the persister.
+        List<PersisterStateBatch> combinedList = new 
ArrayList<>(batchesSoFar.size() + newBatches.size());
+        combinedList.addAll(batchesSoFar);
+        combinedList.addAll(newBatches);
+
+        // sort keeping delivery state in mind
+        combinedList.sort(PersisterStateBatch::compareTo);
+
+        return mergeBatches(
+            pruneBatches(
+                combinedList,
+                startOffset
+            )
+        );
+    }
+
+    private static class BatchOverlapState {
+        private final PersisterStateBatch last;
+        private final PersisterStateBatch candidate;
+        private final List<PersisterStateBatch> nonOverlapping;
+        public static final BatchOverlapState SENTINEL = new 
BatchOverlapState(null, null, Collections.emptyList());
+
+        public BatchOverlapState(
+            PersisterStateBatch last,
+            PersisterStateBatch candidate,
+            List<PersisterStateBatch> nonOverlapping
+        ) {
+            this.last = last;
+            this.candidate = candidate;
+            this.nonOverlapping = nonOverlapping;
+        }
+
+        public PersisterStateBatch last() {
+            return last;
+        }
+
+        public PersisterStateBatch candidate() {
+            return candidate;
+        }
+
+        public List<PersisterStateBatch> nonOverlapping() {
+            return nonOverlapping;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof BatchOverlapState)) return false;
+            BatchOverlapState that = (BatchOverlapState) o;
+            return Objects.equals(last, that.last) && 
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping, 
that.nonOverlapping);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(last, candidate, nonOverlapping);
+        }
+    }
+
+    static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> 
batches) {
+        if (batches.size() < 2) {
+            return batches;
+        }
+        TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+        List<PersisterStateBatch> finalBatches = new 
ArrayList<>(batches.size() * 2); // heuristic size
+
+        BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+        while (overlapState != BatchOverlapState.SENTINEL) {
+            PersisterStateBatch last = overlapState.last();
+            PersisterStateBatch candidate = overlapState.candidate();
+            
+            // remove non overlapping prefix from sortedBatches,
+            // will make getting next overlapping pair efficient
+            // as a prefix batch which is non overlapping will only
+            // be checked once.
+            if (overlapState.nonOverlapping() != null) {
+                overlapState.nonOverlapping().forEach(sortedBatches::remove);
+                finalBatches.addAll(overlapState.nonOverlapping());
+            }
+
+            if (candidate == null) {
+                overlapState = BatchOverlapState.SENTINEL;
+                continue;
+            }
+
+            // overlap and same state (last.firstOffset <= 
candidate.firstOffset due to sort
+            // covers:
+            // case:        1        2          3            4          5      
     6          7 (contiguous)
+            // last:        ______   _______    _______      _______   _______ 
  ________    _______
+            // candidate:   ______   ____       __________     ___        ____ 
      _______        _______
+            if (compareBatchState(candidate, last) == 0) {
+                sortedBatches.remove(last);  // remove older smaller interval
+                sortedBatches.remove(candidate);
+
+                last = new PersisterStateBatch(
+                    last.firstOffset(),
+                    // cover cases
+                    // last:      ______   ________       _________
+                    // candidate:   ___       __________           _____
+                    Math.max(candidate.lastOffset(), last.lastOffset()),
+                    last.deliveryState(),
+                    last.deliveryCount()
+                );
+
+                sortedBatches.add(last);
+            } else if (candidate.firstOffset() <= last.lastOffset()) { // non 
contiguous overlap
+                // overlap and different state
+                // covers:
+                // case:        1        2          3            4          5  
         6
+                // last:        ______   _______    _______      _______    
_______     ________
+                // candidate:   ______   ____       _________      ____        
____          _______
+                // max batches: 1           2       2                3         
 2            2
+                // min batches: 1           1       1                1         
 1            2
+
+                sortedBatches.remove(last);
+                if (candidate.firstOffset() == last.firstOffset()) {
+                    // No special handling for case 1
+                    // here candidate can never have lower priority
+                    // since the treeset order takes that into account
+                    // and candidate is already in the treeset.
+
+                    if (candidate.lastOffset() < last.lastOffset()) { // case 2
+                        if (compareBatchState(candidate, last) < 0) {
+                            sortedBatches.add(last);
+                        } else {    // last has lower priority
+                            sortedBatches.add(candidate);
+                            sortedBatches.add(new PersisterStateBatch(
+                                candidate.lastOffset() + 1,
+                                last.firstOffset(),

Review Comment:
   Should be `last.lastOffset()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to