apoorvmittal10 commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1756503735


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,195 @@ private static ShareGroupOffset merge(ShareGroupOffset 
soFar, ShareUpdateValue n
             .setStartOffset(newStartOffset)
             .setLeaderEpoch(newLeaderEpoch)
             .setStateBatches(combineStateBatches(currentBatches, 
newData.stateBatches().stream()
-                .map(PersisterOffsetsStateBatch::from)
-                .collect(Collectors.toCollection(LinkedHashSet::new)), 
newStartOffset))
-            .build();
+                .map(ShareCoordinatorShard::toPersisterStateBatch)
+                .collect(Collectors.toList()), newStartOffset))
+            .build();   
     }
 
     /**
-     * Util method which takes in 2 collections containing {@link 
PersisterOffsetsStateBatch}
+     * Util method which takes in 2 collections containing {@link 
PersisterStateBatch}
      * and the startOffset.
-     * It removes all batches from the 1st collection which have the same 
first and last offset
-     * as the batches in 2nd collection. It then creates a final list of 
batches which contains the
-     * former result and all the batches in the 2nd collection. In set 
notation (A - B) U B (we prefer batches in B
-     * which have same first and last offset in A).
-     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
-     * @param currentBatch - collection containing current soft state of 
batches
-     * @param newBatch - collection containing batches in incoming request
+     * This method checks any overlap between current state batches and new 
state batches.
+     * Based on various conditions it creates new non-overlapping records 
preferring new batches.
+     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1 and
+     * merges any contiguous intervals with same state.
+     * @param batchesSoFar - collection containing current soft state of 
batches
+     * @param newBatches - collection containing batches in incoming request
      * @param startOffset - startOffset to consider when removing old batches.
      * @return List containing combined batches
      */
-    private static List<PersisterOffsetsStateBatch> combineStateBatches(
-        Collection<PersisterOffsetsStateBatch> currentBatch,
-        Collection<PersisterOffsetsStateBatch> newBatch,
+    // visibility for testing
+    static List<PersisterStateBatch> combineStateBatches(
+        List<PersisterStateBatch> batchesSoFar,
+        List<PersisterStateBatch> newBatches,
         long startOffset
     ) {
-        currentBatch.removeAll(newBatch);
-        List<PersisterOffsetsStateBatch> batchesToAdd = new 
LinkedList<>(currentBatch);
-        batchesToAdd.addAll(newBatch);
-        // Any batches where the last offset is < the current start offset
-        // are now expired. We should remove them from the persister.
+        // will take care of overlapping batches
+        Queue<PersisterStateBatch> batchQueue = new LinkedList<>(

Review Comment:
   For my understanding: why can't this simply be a `List` of type `LinkedList`? 
   Which queue operations do we need here?
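
   For context, a minimal sketch of the distinction being asked about. `LinkedList` implements both `List` and `Queue`, so the declared type only controls which operations are visible. The class `QueueVsListSketch` and the helper `drain` are hypothetical and not taken from the PR; they just show a loop where head removal plus re-enqueueing makes a `Queue` declaration the natural fit.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class QueueVsListSketch {
    // Hypothetical helper (not from the PR): processes items head-first and
    // re-enqueues a remainder for any value above the limit, loosely standing
    // in for "split an overlapping batch and process the rest later".
    static List<Integer> drain(List<Integer> input, int limit) {
        Queue<Integer> work = new LinkedList<>(input);
        List<Integer> out = new LinkedList<>();
        while (!work.isEmpty()) {
            int head = work.poll();         // Queue operation: remove and return the head
            if (head > limit) {
                out.add(limit);
                work.add(head - limit);     // Queue operation: append the remainder at the tail
            } else {
                out.add(head);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(List.of(3, 12, 4), 5));
    }
}
```

   If the loop in the PR only iterates and never polls or re-enqueues, a plain `List` declaration would be enough.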



##########
server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateBatch.java:
##########
@@ -95,4 +95,18 @@ public String toString() {
             "deliveryCount=" + deliveryCount +
             ")";
     }
+
+    @Override
+    public int compareTo(Object o) {
+        PersisterStateBatch that = (PersisterStateBatch) o;
+        int deltaFirst = Long.compare(this.firstOffset(), that.firstOffset());
+        if (deltaFirst == 0) {
+            int deltaLast = Long.compare(this.lastOffset(), that.lastOffset());
+            if (deltaLast == 0) {
+                return this.deliveryCount() - that.deliveryCount();
+            }
+            return deltaLast;
+        }
+        return deltaFirst;
+    }

Review Comment:
   What about `deliveryState`? Is it not important to compare for this class? 
   If not, can we please add a comment explaining why it's skipped?
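
   A rough sketch of one way the comparison could include `deliveryState`, assuming it should take part in the ordering at all. `BatchSketch` is a hypothetical stand-in for `PersisterStateBatch`; its field types are simplified to `int` and are assumptions, not the real class definition. It also implements the generic `Comparable<BatchSketch>`, which removes the cast from `Object`.

```java
import java.util.Comparator;

// Hypothetical stand-in for PersisterStateBatch; field types are assumptions.
final class BatchSketch implements Comparable<BatchSketch> {
    // Order by firstOffset, then lastOffset, then deliveryCount, then deliveryState.
    private static final Comparator<BatchSketch> ORDER =
        Comparator.comparingLong(BatchSketch::firstOffset)
            .thenComparingLong(BatchSketch::lastOffset)
            .thenComparingInt(BatchSketch::deliveryCount)
            .thenComparingInt(BatchSketch::deliveryState);

    private final long firstOffset;
    private final long lastOffset;
    private final int deliveryState;
    private final int deliveryCount;

    BatchSketch(long firstOffset, long lastOffset, int deliveryState, int deliveryCount) {
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
        this.deliveryState = deliveryState;
        this.deliveryCount = deliveryCount;
    }

    long firstOffset() { return firstOffset; }
    long lastOffset() { return lastOffset; }
    int deliveryState() { return deliveryState; }
    int deliveryCount() { return deliveryCount; }

    @Override
    public int compareTo(BatchSketch that) {
        // Delegating to the comparator chain avoids subtraction-based comparison.
        return ORDER.compare(this, that);
    }
}
```

   If `deliveryState` is skipped on purpose, a one-line comment in `compareTo` saying so would serve the same goal.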



##########
checkstyle/suppressions.xml:
##########
@@ -340,6 +340,14 @@
     <suppress checks="NPathComplexity"
               files="CoordinatorRuntime.java"/>
 
+    <!-- share coordinator -->
+    <suppress checks="MethodLength"
+              files="ShareCoordinatorShardTest.java"/>
+    <suppress checks="NPathComplexity"
+              files="ShareCoordinatorShard.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="ShareCoordinatorShard.java"/>

Review Comment:
   nit: While it's fine to add suppressions, avoid them if there is a better way 
   to handle the situation in code. For `MethodLength`, you might want a 
   method that generates `new TestAttributes` or something similar to avoid the 
   method-length issue.



##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java:
##########
@@ -791,6 +793,304 @@ public void testNonSequentialBatchUpdates() {
         verify(shard.getMetricsShard(), 
times(3)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
     }
 
+    @Test
+    public void testStateBatchCombine() {

Review Comment:
   It might be better to parameterize the test with the test inputs you created.
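
   A dependency-free sketch of the idea: keep the inputs in a table of named cases and run one assertion loop over them. Everything here is hypothetical (`CombineCasesSketch`, `Case`, and the simplified `prune` function, which only models the Javadoc rule of dropping batches whose lastOffset < startOffset when startOffset > -1). In a JUnit 5 test, a static method like `cases()` could instead feed `@ParameterizedTest` via `@MethodSource`.

```java
import java.util.List;
import java.util.stream.Collectors;

class CombineCasesSketch {
    // One row of test data: a readable name, the inputs, and the expected output.
    static final class Case {
        final String name;
        final List<List<Long>> batches;   // each batch is [firstOffset, lastOffset]
        final long startOffset;
        final List<List<Long>> expected;

        Case(String name, List<List<Long>> batches, long startOffset, List<List<Long>> expected) {
            this.name = name;
            this.batches = batches;
            this.startOffset = startOffset;
            this.expected = expected;
        }
    }

    // Hypothetical function under test: drops batches that end before startOffset,
    // unless startOffset is -1 (meaning "not set").
    static List<List<Long>> prune(List<List<Long>> batches, long startOffset) {
        if (startOffset <= -1) {
            return batches;
        }
        return batches.stream()
            .filter(b -> b.get(1) >= startOffset)
            .collect(Collectors.toList());
    }

    // The table of cases; adding a scenario is one more row, not more test code.
    static List<Case> cases() {
        List<List<Long>> two = List.of(List.of(0L, 4L), List.of(5L, 9L));
        return List.of(
            new Case("unset start offset keeps everything", two, -1L, two),
            new Case("expired batch is dropped", two, 5L, List.of(List.of(5L, 9L))),
            new Case("batch ending at start offset is kept",
                List.of(List.of(0L, 5L)), 5L, List.of(List.of(0L, 5L)))
        );
    }

    public static void main(String[] args) {
        for (Case c : cases()) {
            List<List<Long>> actual = prune(c.batches, c.startOffset);
            if (!actual.equals(c.expected)) {
                throw new AssertionError(c.name + ": got " + actual);
            }
        }
        System.out.println("all cases passed");
    }
}
```

   Moving the inputs into a case table like this also shortens the test method itself, which may help with the `MethodLength` check.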



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
