AndrewJSchofield commented on code in PR #19861:
URL: https://github.com/apache/kafka/pull/19861#discussion_r2121064720
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -644,8 +632,7 @@ public CoordinatorResult<Void, CoordinatorRecord> maybeCleanupShareState(Set<Uui
/**
* Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions.
* <p>
- * If no snapshot has been created for the key => create a new ShareSnapshot record
- * else if number of ShareUpdate records for key >= max allowed per snapshot per key => create a new ShareSnapshot record
+ * Ff number of ShareUpdate records for key >= max allowed per snapshot per key or stateEpoch is highest seen => create a new ShareSnapshot record
Review Comment:
nit: What's "Ff"? It looks like a typo for "If".
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -658,28 +645,14 @@ private CoordinatorRecord generateShareStateRecord(
) {
long timestamp = time.milliseconds();
int updatesPerSnapshotLimit = config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot();
- if (!shareStateMap.containsKey(key)) {
- // Since this is the first time we are getting a write request for key, we should be creating a share snapshot record.
- // The incoming partition data could have overlapping state batches, we must merge them
- return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
- key.groupId(), key.topicId(), partitionData.partition(),
- new ShareGroupOffset.Builder()
- .setSnapshotEpoch(0)
- .setStartOffset(partitionData.startOffset())
- .setLeaderEpoch(partitionData.leaderEpoch())
- .setStateEpoch(partitionData.stateEpoch())
- .setStateBatches(mergeBatches(List.of(), partitionData))
- .setCreateTimestamp(timestamp)
- .setWriteTimestamp(timestamp)
- .build());
- } else if (snapshotUpdateCount.getOrDefault(key, 0) >= updatesPerSnapshotLimit || partitionData.stateEpoch() > shareStateMap.get(key).stateEpoch()) {
+ if (snapshotUpdateCount.getOrDefault(key, 0) >= updatesPerSnapshotLimit || partitionData.stateEpoch() > shareStateMap.get(key).stateEpoch()) {
Review Comment:
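I believe it's possible for `shareStateMap` not to contain the key. With the `containsKey` branch removed, `shareStateMap.get(key)` would return null in that case, and the `.stateEpoch()` call in the new condition would throw a `NullPointerException`.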
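
One possible guard is sketched below. It is only an illustration, not the PR's code: `StoredState`, `shouldWriteSnapshot`, and the `String` keys are hypothetical stand-ins for the real coordinator types, and treating a missing key as "write a snapshot" is an assumption that mirrors the removed branch.

```java
import java.util.HashMap;
import java.util.Map;

final class SnapshotDecisionSketch {
    // Hypothetical stand-in for the value stored in shareStateMap; only stateEpoch matters here.
    static final class StoredState {
        private final int stateEpoch;

        StoredState(int stateEpoch) {
            this.stateEpoch = stateEpoch;
        }

        int stateEpoch() {
            return stateEpoch;
        }
    }

    // Returns true when a snapshot record should be written instead of an update record.
    static boolean shouldWriteSnapshot(
        Map<String, StoredState> shareStateMap,
        Map<String, Integer> snapshotUpdateCount,
        String key,
        int incomingStateEpoch,
        int updatesPerSnapshotLimit
    ) {
        StoredState current = shareStateMap.get(key);
        if (current == null) {
            // Assumption: no stored state means this is the first write for the key,
            // so a snapshot is written, as the removed branch did.
            return true;
        }
        return snapshotUpdateCount.getOrDefault(key, 0) >= updatesPerSnapshotLimit
            || incomingStateEpoch > current.stateEpoch();
    }

    public static void main(String[] args) {
        Map<String, StoredState> state = new HashMap<>();
        Map<String, Integer> counts = new HashMap<>();

        // Key absent: no NullPointerException, a snapshot is chosen.
        System.out.println(shouldWriteSnapshot(state, counts, "group:topic:0", 0, 10));

        // Key present, same epoch, below the limit: an update is chosen.
        state.put("group:topic:0", new StoredState(3));
        System.out.println(shouldWriteSnapshot(state, counts, "group:topic:0", 3, 10));
    }
}
```

Reading the map once into a local variable also avoids a second lookup between the null check and the epoch comparison.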