AndrewJSchofield commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1759079182
##########
server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateBatch.java:
##########
@@ -25,7 +25,7 @@
/**
* This class contains the information for a single batch of state information
for use by the {@link Persister}.
*/
-public class PersisterStateBatch {
+public class PersisterStateBatch implements Comparable {
private final long firstOffset;
private final long lastOffset;
private final byte deliveryState;
Review Comment:
In sorting terms, this is <firstOffset, lastOffset, deliveryCount,
deliveryState>. I suggest putting the member variables in the same order.
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,308 @@ private static ShareGroupOffset merge(ShareGroupOffset
soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches,
newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)),
newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link
PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link
PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same
first and last offset
- * as the batches in 2nd collection. It then creates a final list of
batches which contains the
- * former result and all the batches in the 2nd collection. In set
notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1.
- * @param currentBatch - collection containing current soft state of
batches
- * @param newBatch - collection containing batches in incoming request
+ * This method checks any overlap between current state batches and new
state batches.
+ * Based on various conditions it creates new non-overlapping records
preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of
batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new
LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ List<PersisterStateBatch> combinedList = new
ArrayList<>(batchesSoFar.size() + newBatches.size());
+ combinedList.addAll(batchesSoFar);
+ combinedList.addAll(newBatches);
+
+ // sort keeping delivery state in mind
+ combinedList.sort(PersisterStateBatch::compareTo);
+
+ return mergeBatches(
+ pruneBatches(
+ combinedList,
+ startOffset
+ )
+ );
+ }
+
+ private static class BatchOverlapState {
+ private final PersisterStateBatch last;
+ private final PersisterStateBatch candidate;
+ private final List<PersisterStateBatch> nonOverlapping;
+ public static final BatchOverlapState SENTINEL = new
BatchOverlapState(null, null, Collections.emptyList());
+
+ public BatchOverlapState(
+ PersisterStateBatch last,
+ PersisterStateBatch candidate,
+ List<PersisterStateBatch> nonOverlapping
+ ) {
+ this.last = last;
+ this.candidate = candidate;
+ this.nonOverlapping = nonOverlapping;
+ }
+
+ public PersisterStateBatch last() {
+ return last;
+ }
+
+ public PersisterStateBatch candidate() {
+ return candidate;
+ }
+
+ public List<PersisterStateBatch> nonOverlapping() {
+ return nonOverlapping;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof BatchOverlapState)) return false;
+ BatchOverlapState that = (BatchOverlapState) o;
+ return Objects.equals(last, that.last) &&
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping,
that.nonOverlapping);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(last, candidate, nonOverlapping);
+ }
+ }
+
+ static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch>
batches) {
+ if (batches.size() < 2) {
+ return batches;
+ }
+ TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+ List<PersisterStateBatch> finalBatches = new
ArrayList<>(batches.size() * 2); // heuristic size
+
+ BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+ while (overlapState != BatchOverlapState.SENTINEL) {
+ PersisterStateBatch last = overlapState.last();
+ PersisterStateBatch candidate = overlapState.candidate();
+
+ // remove non overlapping prefix from sortedBatches,
+ // will make getting next overlapping pair efficient
+ // as a prefix batch which is non overlapping will only
+ // be checked once.
+ if (overlapState.nonOverlapping() != null) {
+ overlapState.nonOverlapping().forEach(sortedBatches::remove);
+ finalBatches.addAll(overlapState.nonOverlapping());
+ }
+
+ if (candidate == null) {
+ break;
+ }
+
+ // overlap and same state (last.firstOffset <=
candidate.firstOffset due to sort
+ // covers:
+ // case: 1 2 3 4 5
6 7 (contiguous)
+ // last: ______ _______ _______ _______ _______
________ _______
+ // candidate: ______ ____ __________ ___ ____
_______ _______
+ if (compareBatchState(candidate, last) == 0) {
+ sortedBatches.remove(last); // remove older smaller interval
+ sortedBatches.remove(candidate);
+
+ last = new PersisterStateBatch(
+ last.firstOffset(),
+ // cover cases
+ // last: ______ ________ _________
+ // candidate: ___ __________ _____
+ Math.max(candidate.lastOffset(), last.lastOffset()),
+ last.deliveryState(),
+ last.deliveryCount()
+ );
+
+ sortedBatches.add(last);
+ } else if (candidate.firstOffset() <= last.lastOffset()) { // non
contiguous overlap
+ // overlap and different state
+ // covers:
+ // case: 1 2 3 4 5
6
+ // last: ______ _______ _______ _______
_______ ________
+ // candidate: ______ ____ _________ ____
____ _______
+ // max batches: 1 2 2 3
2 2
+ // min batches: 1 1 1 1
1 2
+
+ sortedBatches.remove(last);
+ if (candidate.firstOffset() == last.firstOffset()) {
+ if (candidate.lastOffset() == last.lastOffset()) { //
case 1
+ if (compareBatchState(candidate, last) < 0) { //
candidate is lower priority
+ sortedBatches.add(last);
+ } else { // last is lower priority
+ sortedBatches.add(candidate);
Review Comment:
And doesn't this duplicate `candidate`?
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,308 @@ private static ShareGroupOffset merge(ShareGroupOffset
soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches,
newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)),
newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link
PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link
PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same
first and last offset
- * as the batches in 2nd collection. It then creates a final list of
batches which contains the
- * former result and all the batches in the 2nd collection. In set
notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1.
- * @param currentBatch - collection containing current soft state of
batches
- * @param newBatch - collection containing batches in incoming request
+ * This method checks any overlap between current state batches and new
state batches.
+ * Based on various conditions it creates new non-overlapping records
preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of
batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new
LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ List<PersisterStateBatch> combinedList = new
ArrayList<>(batchesSoFar.size() + newBatches.size());
+ combinedList.addAll(batchesSoFar);
+ combinedList.addAll(newBatches);
+
+ // sort keeping delivery state in mind
+ combinedList.sort(PersisterStateBatch::compareTo);
+
+ return mergeBatches(
+ pruneBatches(
+ combinedList,
+ startOffset
+ )
+ );
+ }
+
+ private static class BatchOverlapState {
+ private final PersisterStateBatch last;
+ private final PersisterStateBatch candidate;
+ private final List<PersisterStateBatch> nonOverlapping;
+ public static final BatchOverlapState SENTINEL = new
BatchOverlapState(null, null, Collections.emptyList());
+
+ public BatchOverlapState(
+ PersisterStateBatch last,
+ PersisterStateBatch candidate,
+ List<PersisterStateBatch> nonOverlapping
+ ) {
+ this.last = last;
+ this.candidate = candidate;
+ this.nonOverlapping = nonOverlapping;
+ }
+
+ public PersisterStateBatch last() {
+ return last;
+ }
+
+ public PersisterStateBatch candidate() {
+ return candidate;
+ }
+
+ public List<PersisterStateBatch> nonOverlapping() {
+ return nonOverlapping;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof BatchOverlapState)) return false;
+ BatchOverlapState that = (BatchOverlapState) o;
+ return Objects.equals(last, that.last) &&
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping,
that.nonOverlapping);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(last, candidate, nonOverlapping);
+ }
+ }
+
+ static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch>
batches) {
+ if (batches.size() < 2) {
+ return batches;
+ }
+ TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+ List<PersisterStateBatch> finalBatches = new
ArrayList<>(batches.size() * 2); // heuristic size
+
+ BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+ while (overlapState != BatchOverlapState.SENTINEL) {
+ PersisterStateBatch last = overlapState.last();
+ PersisterStateBatch candidate = overlapState.candidate();
+
+ // remove non overlapping prefix from sortedBatches,
+ // will make getting next overlapping pair efficient
+ // as a prefix batch which is non overlapping will only
+ // be checked once.
+ if (overlapState.nonOverlapping() != null) {
+ overlapState.nonOverlapping().forEach(sortedBatches::remove);
+ finalBatches.addAll(overlapState.nonOverlapping());
+ }
+
+ if (candidate == null) {
+ break;
+ }
+
+ // overlap and same state (last.firstOffset <=
candidate.firstOffset due to sort
+ // covers:
+ // case: 1 2 3 4 5
6 7 (contiguous)
+ // last: ______ _______ _______ _______ _______
________ _______
+ // candidate: ______ ____ __________ ___ ____
_______ _______
+ if (compareBatchState(candidate, last) == 0) {
+ sortedBatches.remove(last); // remove older smaller interval
+ sortedBatches.remove(candidate);
+
+ last = new PersisterStateBatch(
+ last.firstOffset(),
+ // cover cases
+ // last: ______ ________ _________
+ // candidate: ___ __________ _____
+ Math.max(candidate.lastOffset(), last.lastOffset()),
+ last.deliveryState(),
+ last.deliveryCount()
+ );
+
+ sortedBatches.add(last);
+ } else if (candidate.firstOffset() <= last.lastOffset()) { // non
contiguous overlap
+ // overlap and different state
+ // covers:
+ // case: 1 2 3 4 5
6
+ // last: ______ _______ _______ _______
_______ ________
+ // candidate: ______ ____ _________ ____
____ _______
+ // max batches: 1 2 2 3
2 2
+ // min batches: 1 1 1 1
1 2
+
Review Comment:
If I understand correctly, at line 676 both `last` and `candidate` are
members of `sortedBatches`.
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,308 @@ private static ShareGroupOffset merge(ShareGroupOffset
soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches,
newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)),
newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link
PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link
PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same
first and last offset
- * as the batches in 2nd collection. It then creates a final list of
batches which contains the
- * former result and all the batches in the 2nd collection. In set
notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1.
- * @param currentBatch - collection containing current soft state of
batches
- * @param newBatch - collection containing batches in incoming request
+ * This method checks any overlap between current state batches and new
state batches.
+ * Based on various conditions it creates new non-overlapping records
preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of
batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new
LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ List<PersisterStateBatch> combinedList = new
ArrayList<>(batchesSoFar.size() + newBatches.size());
+ combinedList.addAll(batchesSoFar);
+ combinedList.addAll(newBatches);
+
+ // sort keeping delivery state in mind
+ combinedList.sort(PersisterStateBatch::compareTo);
+
+ return mergeBatches(
+ pruneBatches(
+ combinedList,
+ startOffset
+ )
+ );
+ }
+
+ private static class BatchOverlapState {
+ private final PersisterStateBatch last;
+ private final PersisterStateBatch candidate;
+ private final List<PersisterStateBatch> nonOverlapping;
+ public static final BatchOverlapState SENTINEL = new
BatchOverlapState(null, null, Collections.emptyList());
+
+ public BatchOverlapState(
+ PersisterStateBatch last,
+ PersisterStateBatch candidate,
+ List<PersisterStateBatch> nonOverlapping
+ ) {
+ this.last = last;
+ this.candidate = candidate;
+ this.nonOverlapping = nonOverlapping;
+ }
+
+ public PersisterStateBatch last() {
+ return last;
+ }
+
+ public PersisterStateBatch candidate() {
+ return candidate;
+ }
+
+ public List<PersisterStateBatch> nonOverlapping() {
+ return nonOverlapping;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof BatchOverlapState)) return false;
+ BatchOverlapState that = (BatchOverlapState) o;
+ return Objects.equals(last, that.last) &&
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping,
that.nonOverlapping);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(last, candidate, nonOverlapping);
+ }
+ }
+
+ static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch>
batches) {
+ if (batches.size() < 2) {
+ return batches;
+ }
+ TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+ List<PersisterStateBatch> finalBatches = new
ArrayList<>(batches.size() * 2); // heuristic size
+
+ BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+ while (overlapState != BatchOverlapState.SENTINEL) {
+ PersisterStateBatch last = overlapState.last();
+ PersisterStateBatch candidate = overlapState.candidate();
+
+ // remove non overlapping prefix from sortedBatches,
+ // will make getting next overlapping pair efficient
+ // as a prefix batch which is non overlapping will only
+ // be checked once.
+ if (overlapState.nonOverlapping() != null) {
+ overlapState.nonOverlapping().forEach(sortedBatches::remove);
+ finalBatches.addAll(overlapState.nonOverlapping());
+ }
+
+ if (candidate == null) {
+ break;
+ }
+
+ // overlap and same state (last.firstOffset <=
candidate.firstOffset due to sort
+ // covers:
+ // case: 1 2 3 4 5
6 7 (contiguous)
+ // last: ______ _______ _______ _______ _______
________ _______
+ // candidate: ______ ____ __________ ___ ____
_______ _______
+ if (compareBatchState(candidate, last) == 0) {
+ sortedBatches.remove(last); // remove older smaller interval
+ sortedBatches.remove(candidate);
+
+ last = new PersisterStateBatch(
+ last.firstOffset(),
+ // cover cases
+ // last: ______ ________ _________
+ // candidate: ___ __________ _____
+ Math.max(candidate.lastOffset(), last.lastOffset()),
+ last.deliveryState(),
+ last.deliveryCount()
+ );
+
+ sortedBatches.add(last);
+ } else if (candidate.firstOffset() <= last.lastOffset()) { // non
contiguous overlap
+ // overlap and different state
+ // covers:
+ // case: 1 2 3 4 5
6
+ // last: ______ _______ _______ _______
_______ ________
+ // candidate: ______ ____ _________ ____
____ _______
+ // max batches: 1 2 2 3
2 2
+ // min batches: 1 1 1 1
1 2
+
+ sortedBatches.remove(last);
+ if (candidate.firstOffset() == last.firstOffset()) {
+ if (candidate.lastOffset() == last.lastOffset()) { //
case 1
+ if (compareBatchState(candidate, last) < 0) { //
candidate is lower priority
+ sortedBatches.add(last);
+ } else { // last is lower priority
+ sortedBatches.add(candidate);
+ }
+ } else if (candidate.lastOffset() < last.lastOffset()) {
// case 2
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ } else { // last has lower priority
+ sortedBatches.add(candidate);
+ sortedBatches.add(new PersisterStateBatch(
+ candidate.lastOffset() + 1,
+ last.firstOffset(),
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+ }
+ } else { // case 3
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ sortedBatches.remove(candidate);
+ sortedBatches.add(new PersisterStateBatch(
+ last.lastOffset() + 1,
+ candidate.lastOffset(),
+ candidate.deliveryState(),
+ candidate.deliveryCount()
+ ));
+ } else { // last has lower priority
+ sortedBatches.add(candidate);
+ }
+ }
+ } else { // candidate.firstOffset() > last.lastOffset()
+ if (candidate.lastOffset() < last.lastOffset()) { //
case 4
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ sortedBatches.remove(candidate);
+ } else {
+ sortedBatches.add(new PersisterStateBatch(
+ last.firstOffset(),
+ candidate.firstOffset() - 1,
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+
+ sortedBatches.add(candidate);
+
+ sortedBatches.add(new PersisterStateBatch(
+ candidate.lastOffset() + 1,
+ last.lastOffset(),
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+ }
+ } else if (candidate.lastOffset() == last.lastOffset()) {
// case 5
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ sortedBatches.remove(candidate);
+ } else {
+ sortedBatches.add(new PersisterStateBatch(
+ last.firstOffset(),
+ candidate.firstOffset() - 1,
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+
+ sortedBatches.add(candidate);
+ }
+ } else { // case 6
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ sortedBatches.remove(candidate);
+
+ sortedBatches.add(new PersisterStateBatch(
+ last.lastOffset() + 1,
+ candidate.lastOffset(),
+ candidate.deliveryState(),
+ candidate.deliveryCount()
+ ));
+ } else {
+ sortedBatches.add(new PersisterStateBatch(
+ last.firstOffset(),
+ candidate.firstOffset() - 1,
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+
+ sortedBatches.add(candidate);
+ }
+ }
+ }
+ }
+ overlapState = getOverlappingState(sortedBatches);
+ }
+ finalBatches.addAll(sortedBatches); // some non overlapping batches
might have remained
+ return finalBatches;
+ }
+
+ private static BatchOverlapState
getOverlappingState(TreeSet<PersisterStateBatch> batchSet) {
+ Iterator<PersisterStateBatch> iter = batchSet.iterator();
+ PersisterStateBatch last = iter.next();
+ List<PersisterStateBatch> nonOverlapping = new
ArrayList<>(batchSet.size());
+ while (iter.hasNext()) {
+ PersisterStateBatch candidate = iter.next();
+ if (candidate.firstOffset() <= last.lastOffset() || // overlap
+ last.lastOffset() + 1 == candidate.firstOffset() &&
compareBatchState(last, candidate) == 0) { // contiguous
+ return new BatchOverlapState(
+ last,
+ candidate,
+ nonOverlapping
+ );
+ }
+ nonOverlapping.add(last);
+ last = candidate;
+ }
+ return new BatchOverlapState(null, null, nonOverlapping);
+ }
+
+ private static int compareBatchState(PersisterStateBatch b1,
PersisterStateBatch b2) {
Review Comment:
Could do with a comment. This approximately follows the contract of methods
like `Short.compare(short x, short y)`: if x > y the result is positive, if
x < y it is negative, and if x == y it is 0.
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,308 @@ private static ShareGroupOffset merge(ShareGroupOffset
soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches,
newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)),
newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link
PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link
PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same
first and last offset
- * as the batches in 2nd collection. It then creates a final list of
batches which contains the
- * former result and all the batches in the 2nd collection. In set
notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1.
- * @param currentBatch - collection containing current soft state of
batches
- * @param newBatch - collection containing batches in incoming request
+ * This method checks any overlap between current state batches and new
state batches.
+ * Based on various conditions it creates new non-overlapping records
preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of
batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new
LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ List<PersisterStateBatch> combinedList = new
ArrayList<>(batchesSoFar.size() + newBatches.size());
+ combinedList.addAll(batchesSoFar);
+ combinedList.addAll(newBatches);
+
+ // sort keeping delivery state in mind
+ combinedList.sort(PersisterStateBatch::compareTo);
+
+ return mergeBatches(
+ pruneBatches(
+ combinedList,
+ startOffset
+ )
+ );
+ }
+
+ private static class BatchOverlapState {
+ private final PersisterStateBatch last;
+ private final PersisterStateBatch candidate;
+ private final List<PersisterStateBatch> nonOverlapping;
+ public static final BatchOverlapState SENTINEL = new
BatchOverlapState(null, null, Collections.emptyList());
+
+ public BatchOverlapState(
+ PersisterStateBatch last,
+ PersisterStateBatch candidate,
+ List<PersisterStateBatch> nonOverlapping
+ ) {
+ this.last = last;
+ this.candidate = candidate;
+ this.nonOverlapping = nonOverlapping;
+ }
+
+ public PersisterStateBatch last() {
+ return last;
+ }
+
+ public PersisterStateBatch candidate() {
+ return candidate;
+ }
+
+ public List<PersisterStateBatch> nonOverlapping() {
+ return nonOverlapping;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof BatchOverlapState)) return false;
+ BatchOverlapState that = (BatchOverlapState) o;
+ return Objects.equals(last, that.last) &&
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping,
that.nonOverlapping);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(last, candidate, nonOverlapping);
+ }
+ }
+
+ static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch>
batches) {
+ if (batches.size() < 2) {
+ return batches;
+ }
+ TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+ List<PersisterStateBatch> finalBatches = new
ArrayList<>(batches.size() * 2); // heuristic size
+
+ BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+ while (overlapState != BatchOverlapState.SENTINEL) {
+ PersisterStateBatch last = overlapState.last();
+ PersisterStateBatch candidate = overlapState.candidate();
+
+ // remove non overlapping prefix from sortedBatches,
+ // will make getting next overlapping pair efficient
+ // as a prefix batch which is non overlapping will only
+ // be checked once.
+ if (overlapState.nonOverlapping() != null) {
+ overlapState.nonOverlapping().forEach(sortedBatches::remove);
+ finalBatches.addAll(overlapState.nonOverlapping());
+ }
+
+ if (candidate == null) {
+ break;
+ }
+
+ // overlap and same state (last.firstOffset <=
candidate.firstOffset due to sort
+ // covers:
+ // case: 1 2 3 4 5
6 7 (contiguous)
+ // last: ______ _______ _______ _______ _______
________ _______
+ // candidate: ______ ____ __________ ___ ____
_______ _______
+ if (compareBatchState(candidate, last) == 0) {
+ sortedBatches.remove(last); // remove older smaller interval
+ sortedBatches.remove(candidate);
+
+ last = new PersisterStateBatch(
+ last.firstOffset(),
+ // cover cases
+ // last: ______ ________ _________
+ // candidate: ___ __________ _____
+ Math.max(candidate.lastOffset(), last.lastOffset()),
+ last.deliveryState(),
+ last.deliveryCount()
+ );
+
+ sortedBatches.add(last);
+ } else if (candidate.firstOffset() <= last.lastOffset()) { // non
contiguous overlap
+ // overlap and different state
+ // covers:
+ // case: 1 2 3 4 5
6
+ // last: ______ _______ _______ _______
_______ ________
+ // candidate: ______ ____ _________ ____
____ _______
+ // max batches: 1 2 2 3
2 2
+ // min batches: 1 1 1 1
1 2
+
+ sortedBatches.remove(last);
+ if (candidate.firstOffset() == last.firstOffset()) {
+ if (candidate.lastOffset() == last.lastOffset()) { //
case 1
+ if (compareBatchState(candidate, last) < 0) { //
candidate is lower priority
+ sortedBatches.add(last);
+ } else { // last is lower priority
+ sortedBatches.add(candidate);
+ }
+ } else if (candidate.lastOffset() < last.lastOffset()) {
// case 2
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ } else { // last has lower priority
+ sortedBatches.add(candidate);
+ sortedBatches.add(new PersisterStateBatch(
+ candidate.lastOffset() + 1,
+ last.firstOffset(),
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+ }
+ } else { // case 3
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ sortedBatches.remove(candidate);
+ sortedBatches.add(new PersisterStateBatch(
+ last.lastOffset() + 1,
+ candidate.lastOffset(),
+ candidate.deliveryState(),
+ candidate.deliveryCount()
+ ));
+ } else { // last has lower priority
+ sortedBatches.add(candidate);
+ }
+ }
+ } else { // candidate.firstOffset() > last.lastOffset()
+ if (candidate.lastOffset() < last.lastOffset()) { //
case 4
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ sortedBatches.remove(candidate);
+ } else {
+ sortedBatches.add(new PersisterStateBatch(
+ last.firstOffset(),
+ candidate.firstOffset() - 1,
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+
+ sortedBatches.add(candidate);
+
+ sortedBatches.add(new PersisterStateBatch(
+ candidate.lastOffset() + 1,
+ last.lastOffset(),
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+ }
+ } else if (candidate.lastOffset() == last.lastOffset()) {
// case 5
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ sortedBatches.remove(candidate);
+ } else {
+ sortedBatches.add(new PersisterStateBatch(
+ last.firstOffset(),
+ candidate.firstOffset() - 1,
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+
+ sortedBatches.add(candidate);
+ }
+ } else { // case 6
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ sortedBatches.remove(candidate);
+
+ sortedBatches.add(new PersisterStateBatch(
+ last.lastOffset() + 1,
+ candidate.lastOffset(),
+ candidate.deliveryState(),
+ candidate.deliveryCount()
+ ));
+ } else {
+ sortedBatches.add(new PersisterStateBatch(
+ last.firstOffset(),
+ candidate.firstOffset() - 1,
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+
+ sortedBatches.add(candidate);
+ }
+ }
+ }
+ }
+ overlapState = getOverlappingState(sortedBatches);
+ }
+ finalBatches.addAll(sortedBatches); // some non overlapping batches
might have remained
+ return finalBatches;
+ }
+
+ private static BatchOverlapState
getOverlappingState(TreeSet<PersisterStateBatch> batchSet) {
Review Comment:
This definitely needs a comment. For example, if the batch set is empty, it
will throw an exception, so don't call it with an empty set.
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,308 @@ private static ShareGroupOffset merge(ShareGroupOffset
soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches,
newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)),
newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link
PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link
PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same
first and last offset
- * as the batches in 2nd collection. It then creates a final list of
batches which contains the
- * former result and all the batches in the 2nd collection. In set
notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1.
- * @param currentBatch - collection containing current soft state of
batches
- * @param newBatch - collection containing batches in incoming request
+ * This method checks any overlap between current state batches and new
state batches.
+ * Based on various conditions it creates new non-overlapping records
preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of
batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new
LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ List<PersisterStateBatch> combinedList = new
ArrayList<>(batchesSoFar.size() + newBatches.size());
+ combinedList.addAll(batchesSoFar);
+ combinedList.addAll(newBatches);
+
+ // sort keeping delivery state in mind
+ combinedList.sort(PersisterStateBatch::compareTo);
+
+ return mergeBatches(
+ pruneBatches(
+ combinedList,
+ startOffset
+ )
+ );
+ }
+
+ private static class BatchOverlapState {
+ private final PersisterStateBatch last;
+ private final PersisterStateBatch candidate;
+ private final List<PersisterStateBatch> nonOverlapping;
+ public static final BatchOverlapState SENTINEL = new
BatchOverlapState(null, null, Collections.emptyList());
+
+ public BatchOverlapState(
+ PersisterStateBatch last,
+ PersisterStateBatch candidate,
+ List<PersisterStateBatch> nonOverlapping
+ ) {
+ this.last = last;
+ this.candidate = candidate;
+ this.nonOverlapping = nonOverlapping;
+ }
+
+ public PersisterStateBatch last() {
+ return last;
+ }
+
+ public PersisterStateBatch candidate() {
+ return candidate;
+ }
+
+ public List<PersisterStateBatch> nonOverlapping() {
+ return nonOverlapping;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof BatchOverlapState)) return false;
+ BatchOverlapState that = (BatchOverlapState) o;
+ return Objects.equals(last, that.last) &&
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping,
that.nonOverlapping);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(last, candidate, nonOverlapping);
+ }
+ }
+
+ static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch>
batches) {
+ if (batches.size() < 2) {
+ return batches;
+ }
+ TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+ List<PersisterStateBatch> finalBatches = new
ArrayList<>(batches.size() * 2); // heuristic size
+
+ BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+ while (overlapState != BatchOverlapState.SENTINEL) {
Review Comment:
I don't believe that overlapState will ever be SENTINEL.
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,308 @@ private static ShareGroupOffset merge(ShareGroupOffset
soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches,
newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)),
newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
Review Comment:
tiny nit: trailing spaces
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,308 @@ private static ShareGroupOffset merge(ShareGroupOffset
soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches,
newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)),
newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link
PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link
PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same
first and last offset
- * as the batches in 2nd collection. It then creates a final list of
batches which contains the
- * former result and all the batches in the 2nd collection. In set
notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1.
- * @param currentBatch - collection containing current soft state of
batches
- * @param newBatch - collection containing batches in incoming request
+ * This method checks any overlap between current state batches and new
state batches.
+ * Based on various conditions it creates new non-overlapping records
preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of
batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new
LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ List<PersisterStateBatch> combinedList = new
ArrayList<>(batchesSoFar.size() + newBatches.size());
+ combinedList.addAll(batchesSoFar);
+ combinedList.addAll(newBatches);
+
+ // sort keeping delivery state in mind
+ combinedList.sort(PersisterStateBatch::compareTo);
+
+ return mergeBatches(
+ pruneBatches(
+ combinedList,
+ startOffset
+ )
+ );
+ }
+
+ private static class BatchOverlapState {
+ private final PersisterStateBatch last;
+ private final PersisterStateBatch candidate;
+ private final List<PersisterStateBatch> nonOverlapping;
+ public static final BatchOverlapState SENTINEL = new
BatchOverlapState(null, null, Collections.emptyList());
+
+ public BatchOverlapState(
+ PersisterStateBatch last,
+ PersisterStateBatch candidate,
+ List<PersisterStateBatch> nonOverlapping
+ ) {
+ this.last = last;
+ this.candidate = candidate;
+ this.nonOverlapping = nonOverlapping;
+ }
+
+ public PersisterStateBatch last() {
+ return last;
+ }
+
+ public PersisterStateBatch candidate() {
+ return candidate;
+ }
+
+ public List<PersisterStateBatch> nonOverlapping() {
+ return nonOverlapping;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof BatchOverlapState)) return false;
+ BatchOverlapState that = (BatchOverlapState) o;
+ return Objects.equals(last, that.last) &&
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping,
that.nonOverlapping);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(last, candidate, nonOverlapping);
+ }
+ }
+
+ static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch>
batches) {
+ if (batches.size() < 2) {
+ return batches;
+ }
+ TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+ List<PersisterStateBatch> finalBatches = new
ArrayList<>(batches.size() * 2); // heuristic size
+
+ BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+ while (overlapState != BatchOverlapState.SENTINEL) {
+ PersisterStateBatch last = overlapState.last();
+ PersisterStateBatch candidate = overlapState.candidate();
+
+ // remove non overlapping prefix from sortedBatches,
+ // will make getting next overlapping pair efficient
+ // as a prefix batch which is non overlapping will only
+ // be checked once.
+ if (overlapState.nonOverlapping() != null) {
+ overlapState.nonOverlapping().forEach(sortedBatches::remove);
+ finalBatches.addAll(overlapState.nonOverlapping());
+ }
+
+ if (candidate == null) {
+ break;
+ }
+
+ // overlap and same state (last.firstOffset <=
candidate.firstOffset due to sort
+ // covers:
+ // case: 1 2 3 4 5
6 7 (contiguous)
+ // last: ______ _______ _______ _______ _______
________ _______
+ // candidate: ______ ____ __________ ___ ____
_______ _______
+ if (compareBatchState(candidate, last) == 0) {
+ sortedBatches.remove(last); // remove older smaller interval
+ sortedBatches.remove(candidate);
+
+ last = new PersisterStateBatch(
+ last.firstOffset(),
+ // cover cases
+ // last: ______ ________ _________
+ // candidate: ___ __________ _____
+ Math.max(candidate.lastOffset(), last.lastOffset()),
+ last.deliveryState(),
+ last.deliveryCount()
+ );
+
+ sortedBatches.add(last);
+ } else if (candidate.firstOffset() <= last.lastOffset()) { // non
contiguous overlap
+ // overlap and different state
+ // covers:
+ // case: 1 2 3 4 5
6
+ // last: ______ _______ _______ _______
_______ ________
+ // candidate: ______ ____ _________ ____
____ _______
+ // max batches: 1 2 2 3
2 2
+ // min batches: 1 1 1 1
1 2
+
+ sortedBatches.remove(last);
+ if (candidate.firstOffset() == last.firstOffset()) {
+ if (candidate.lastOffset() == last.lastOffset()) { //
case 1
+ if (compareBatchState(candidate, last) < 0) { //
candidate is lower priority
+ sortedBatches.add(last);
+ } else { // last is lower priority
+ sortedBatches.add(candidate);
+ }
+ } else if (candidate.lastOffset() < last.lastOffset()) {
// case 2
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ } else { // last has lower priority
+ sortedBatches.add(candidate);
+ sortedBatches.add(new PersisterStateBatch(
+ candidate.lastOffset() + 1,
+ last.firstOffset(),
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+ }
+ } else { // case 3
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ sortedBatches.remove(candidate);
+ sortedBatches.add(new PersisterStateBatch(
+ last.lastOffset() + 1,
+ candidate.lastOffset(),
+ candidate.deliveryState(),
+ candidate.deliveryCount()
+ ));
+ } else { // last has lower priority
+ sortedBatches.add(candidate);
+ }
+ }
+ } else { // candidate.firstOffset() > last.lastOffset()
+ if (candidate.lastOffset() < last.lastOffset()) { //
case 4
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ sortedBatches.remove(candidate);
+ } else {
+ sortedBatches.add(new PersisterStateBatch(
+ last.firstOffset(),
+ candidate.firstOffset() - 1,
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+
+ sortedBatches.add(candidate);
+
+ sortedBatches.add(new PersisterStateBatch(
+ candidate.lastOffset() + 1,
+ last.lastOffset(),
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+ }
+ } else if (candidate.lastOffset() == last.lastOffset()) {
// case 5
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ sortedBatches.remove(candidate);
+ } else {
+ sortedBatches.add(new PersisterStateBatch(
+ last.firstOffset(),
+ candidate.firstOffset() - 1,
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+
+ sortedBatches.add(candidate);
+ }
+ } else { // case 6
+ if (compareBatchState(candidate, last) < 0) {
+ sortedBatches.add(last);
+ sortedBatches.remove(candidate);
+
+ sortedBatches.add(new PersisterStateBatch(
+ last.lastOffset() + 1,
+ candidate.lastOffset(),
+ candidate.deliveryState(),
+ candidate.deliveryCount()
+ ));
+ } else {
+ sortedBatches.add(new PersisterStateBatch(
+ last.firstOffset(),
+ candidate.firstOffset() - 1,
+ last.deliveryState(),
+ last.deliveryCount()
+ ));
+
+ sortedBatches.add(candidate);
+ }
+ }
+ }
+ }
+ overlapState = getOverlappingState(sortedBatches);
+ }
+ finalBatches.addAll(sortedBatches); // some non overlapping batches
might have remained
+ return finalBatches;
+ }
+
+ private static BatchOverlapState
getOverlappingState(TreeSet<PersisterStateBatch> batchSet) {
Review Comment:
I would tend to call this argument `sortedBatches` for consistency with the
caller.
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,308 @@ private static ShareGroupOffset merge(ShareGroupOffset
soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches,
newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)),
newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link
PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link
PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same
first and last offset
- * as the batches in 2nd collection. It then creates a final list of
batches which contains the
- * former result and all the batches in the 2nd collection. In set
notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1.
- * @param currentBatch - collection containing current soft state of
batches
- * @param newBatch - collection containing batches in incoming request
+ * This method checks any overlap between current state batches and new
state batches.
+ * Based on various conditions it creates new non-overlapping records
preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if
the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of
batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new
LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ List<PersisterStateBatch> combinedList = new
ArrayList<>(batchesSoFar.size() + newBatches.size());
+ combinedList.addAll(batchesSoFar);
+ combinedList.addAll(newBatches);
+
+ // sort keeping delivery state in mind
+ combinedList.sort(PersisterStateBatch::compareTo);
+
+ return mergeBatches(
+ pruneBatches(
+ combinedList,
+ startOffset
+ )
+ );
+ }
+
+ private static class BatchOverlapState {
+ private final PersisterStateBatch last;
+ private final PersisterStateBatch candidate;
+ private final List<PersisterStateBatch> nonOverlapping;
+ public static final BatchOverlapState SENTINEL = new
BatchOverlapState(null, null, Collections.emptyList());
+
+ public BatchOverlapState(
+ PersisterStateBatch last,
+ PersisterStateBatch candidate,
+ List<PersisterStateBatch> nonOverlapping
+ ) {
+ this.last = last;
+ this.candidate = candidate;
+ this.nonOverlapping = nonOverlapping;
+ }
+
+ public PersisterStateBatch last() {
+ return last;
+ }
+
+ public PersisterStateBatch candidate() {
+ return candidate;
+ }
+
+ public List<PersisterStateBatch> nonOverlapping() {
+ return nonOverlapping;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof BatchOverlapState)) return false;
+ BatchOverlapState that = (BatchOverlapState) o;
+ return Objects.equals(last, that.last) &&
Objects.equals(candidate, that.candidate) && Objects.equals(nonOverlapping,
that.nonOverlapping);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(last, candidate, nonOverlapping);
+ }
+ }
+
+ static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch>
batches) {
+ if (batches.size() < 2) {
+ return batches;
+ }
+ TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+ List<PersisterStateBatch> finalBatches = new
ArrayList<>(batches.size() * 2); // heuristic size
+
+ BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+ while (overlapState != BatchOverlapState.SENTINEL) {
+ PersisterStateBatch last = overlapState.last();
+ PersisterStateBatch candidate = overlapState.candidate();
+
+ // remove non overlapping prefix from sortedBatches,
+ // will make getting next overlapping pair efficient
+ // as a prefix batch which is non overlapping will only
+ // be checked once.
+ if (overlapState.nonOverlapping() != null) {
+ overlapState.nonOverlapping().forEach(sortedBatches::remove);
+ finalBatches.addAll(overlapState.nonOverlapping());
+ }
+
+ if (candidate == null) {
+ break;
+ }
+
+ // overlap and same state (last.firstOffset <=
candidate.firstOffset due to sort
+ // covers:
+ // case: 1 2 3 4 5
6 7 (contiguous)
+ // last: ______ _______ _______ _______ _______
________ _______
+ // candidate: ______ ____ __________ ___ ____
_______ _______
+ if (compareBatchState(candidate, last) == 0) {
+ sortedBatches.remove(last); // remove older smaller interval
+ sortedBatches.remove(candidate);
+
+ last = new PersisterStateBatch(
+ last.firstOffset(),
+ // cover cases
+ // last: ______ ________ _________
+ // candidate: ___ __________ _____
+ Math.max(candidate.lastOffset(), last.lastOffset()),
+ last.deliveryState(),
+ last.deliveryCount()
+ );
+
+ sortedBatches.add(last);
+ } else if (candidate.firstOffset() <= last.lastOffset()) { // non
contiguous overlap
+ // overlap and different state
+ // covers:
+ // case: 1 2 3 4 5
6
+ // last: ______ _______ _______ _______
_______ ________
+ // candidate: ______ ____ _________ ____
____ _______
+ // max batches: 1 2 2 3
2 2
+ // min batches: 1 1 1 1
1 2
+
+ sortedBatches.remove(last);
+ if (candidate.firstOffset() == last.firstOffset()) {
+ if (candidate.lastOffset() == last.lastOffset()) { //
case 1
+ if (compareBatchState(candidate, last) < 0) { //
candidate is lower priority
+ sortedBatches.add(last);
Review Comment:
`candidate` has not been removed, but shouldn't it be?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]