mumrah commented on code in PR #16921:
URL: https://github.com/apache/kafka/pull/16921#discussion_r1727482508


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.group.share.PersisterStateBatch;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Container class to represent data encapsulated in {@link 
ShareSnapshotValue} and {@link ShareUpdateValue}
+ * This class is effectively immutable (state batches is not modified out of 
context).
+ */
+public class ShareGroupOffset {
+    private final int snapshotEpoch;
+    private final int stateEpoch;
+    private final int leaderEpoch;
+    private final long startOffset;
+    private final List<? extends PersisterStateBatch> stateBatches;
+
+    private ShareGroupOffset(int snapshotEpoch,
+                            int stateEpoch,
+                            int leaderEpoch,
+                            long startOffset,
+                            List<? extends PersisterStateBatch> stateBatches) {
+        this.snapshotEpoch = snapshotEpoch;
+        this.stateEpoch = stateEpoch;
+        this.leaderEpoch = leaderEpoch;
+        this.startOffset = startOffset;
+        this.stateBatches = stateBatches;
+    }
+
+    public int snapshotEpoch() {
+        return snapshotEpoch;
+    }
+
+    public int stateEpoch() {
+        return stateEpoch;
+    }
+
+    public int leaderEpoch() {
+        return leaderEpoch;
+    }
+
+    public long startOffset() {
+        return startOffset;
+    }
+
+    public List<? extends PersisterStateBatch> stateBatches() {
+        return Collections.unmodifiableList(stateBatches);
+    }
+
+    private static PersisterStateBatch 
toPersisterStateBatch(ShareSnapshotValue.StateBatch stateBatch) {
+        return new PersisterStateBatch(stateBatch.firstOffset(), 
stateBatch.lastOffset(), stateBatch.deliveryState(), 
stateBatch.deliveryCount());
+    }
+
+    private static PersisterStateBatch 
toPersisterStateBatch(ShareUpdateValue.StateBatch stateBatch) {
+        return new PersisterStateBatch(stateBatch.firstOffset(), 
stateBatch.lastOffset(), stateBatch.deliveryState(), 
stateBatch.deliveryCount());
+    }
+
+    public static ShareGroupOffset fromRecord(ShareSnapshotValue record) {
+        return new ShareGroupOffset(record.snapshotEpoch(), 
record.stateEpoch(), record.leaderEpoch(), record.startOffset(), 
record.stateBatches().stream()
+            
.map(ShareGroupOffset::toPersisterStateBatch).collect(Collectors.toList()));
+    }
+
+    public static ShareGroupOffset fromRecord(ShareUpdateValue record) {
+        return new ShareGroupOffset(record.snapshotEpoch(), -1, 
record.leaderEpoch(), record.startOffset(), record.stateBatches().stream()
+            
.map(ShareGroupOffset::toPersisterStateBatch).collect(Collectors.toList()));
+    }
+
+    public static ShareGroupOffset 
fromRequest(WriteShareGroupStateRequestData.PartitionData data) {
+        return fromRequest(data, 0);
+    }
+
+    public static ShareGroupOffset 
fromRequest(WriteShareGroupStateRequestData.PartitionData data, int 
snapshotEpoch) {
+        return new ShareGroupOffset(snapshotEpoch,
+            data.stateEpoch(),
+            data.leaderEpoch(),
+            data.startOffset(),
+            data.stateBatches().stream()
+                .map(PersisterStateBatch::from)
+                .collect(Collectors.toList()));
+    }
+
+    public Set<PersisterOffsetsStateBatch> stateBatchAsSet() {
+        return this.stateBatches().stream()
+            .map(PersisterOffsetsStateBatch::of)
+            .collect(Collectors.toCollection(LinkedHashSet::new));
+    }
+
+    public static class Builder {
+        private int snapshotEpoch;
+        private int stateEpoch;
+        private int leaderEpoch;
+        private long startOffset;
+        private List<? extends PersisterStateBatch> stateBatches;
+
+        public Builder setSnapshotEpoch(int snapshotEpoch) {
+            this.snapshotEpoch = snapshotEpoch;
+            return this;
+        }
+
+        public Builder setStateEpoch(int stateEpoch) {
+            this.stateEpoch = stateEpoch;
+            return this;
+        }
+
+        public Builder setLeaderEpoch(int leaderEpoch) {
+            this.leaderEpoch = leaderEpoch;
+            return this;
+        }
+
+        public Builder setStartOffset(long startOffset) {
+            this.startOffset = startOffset;
+            return this;
+        }
+
+        public Builder setStateBatches(List<? extends PersisterStateBatch> 
stateBatches) {
+            this.stateBatches = stateBatches;
+            return this;
+        }
+
+        public ShareGroupOffset build() {
+            return new ShareGroupOffset(snapshotEpoch, stateEpoch, 
leaderEpoch, startOffset, stateBatches);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ShareGroupOffset that = (ShareGroupOffset) o;
+        return snapshotEpoch == that.snapshotEpoch && stateEpoch == 
that.stateEpoch && leaderEpoch == that.leaderEpoch && startOffset == 
that.startOffset && Objects.equals(stateBatches, that.stateBatches);

Review Comment:
   nit: break this long line



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.group.share.PersisterStateBatch;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Container class to represent data encapsulated in {@link 
ShareSnapshotValue} and {@link ShareUpdateValue}
+ * This class is effectively immutable (state batches is not modified out of 
context).
+ */
+public class ShareGroupOffset {
+    private final int snapshotEpoch;
+    private final int stateEpoch;
+    private final int leaderEpoch;
+    private final long startOffset;
+    private final List<? extends PersisterStateBatch> stateBatches;
+
+    private ShareGroupOffset(int snapshotEpoch,
+                            int stateEpoch,
+                            int leaderEpoch,
+                            long startOffset,
+                            List<? extends PersisterStateBatch> stateBatches) {
+        this.snapshotEpoch = snapshotEpoch;
+        this.stateEpoch = stateEpoch;
+        this.leaderEpoch = leaderEpoch;
+        this.startOffset = startOffset;
+        this.stateBatches = stateBatches;
+    }
+
+    public int snapshotEpoch() {
+        return snapshotEpoch;
+    }
+
+    public int stateEpoch() {
+        return stateEpoch;
+    }
+
+    public int leaderEpoch() {
+        return leaderEpoch;
+    }
+
+    public long startOffset() {
+        return startOffset;
+    }
+
+    public List<? extends PersisterStateBatch> stateBatches() {
+        return Collections.unmodifiableList(stateBatches);
+    }
+
+    private static PersisterStateBatch 
toPersisterStateBatch(ShareSnapshotValue.StateBatch stateBatch) {
+        return new PersisterStateBatch(stateBatch.firstOffset(), 
stateBatch.lastOffset(), stateBatch.deliveryState(), 
stateBatch.deliveryCount());
+    }
+
+    private static PersisterStateBatch 
toPersisterStateBatch(ShareUpdateValue.StateBatch stateBatch) {
+        return new PersisterStateBatch(stateBatch.firstOffset(), 
stateBatch.lastOffset(), stateBatch.deliveryState(), 
stateBatch.deliveryCount());
+    }
+
+    public static ShareGroupOffset fromRecord(ShareSnapshotValue record) {
+        return new ShareGroupOffset(record.snapshotEpoch(), 
record.stateEpoch(), record.leaderEpoch(), record.startOffset(), 
record.stateBatches().stream()
+            
.map(ShareGroupOffset::toPersisterStateBatch).collect(Collectors.toList()));
+    }
+
+    public static ShareGroupOffset fromRecord(ShareUpdateValue record) {
+        return new ShareGroupOffset(record.snapshotEpoch(), -1, 
record.leaderEpoch(), record.startOffset(), record.stateBatches().stream()
+            
.map(ShareGroupOffset::toPersisterStateBatch).collect(Collectors.toList()));
+    }
+
+    public static ShareGroupOffset 
fromRequest(WriteShareGroupStateRequestData.PartitionData data) {
+        return fromRequest(data, 0);
+    }
+
+    public static ShareGroupOffset 
fromRequest(WriteShareGroupStateRequestData.PartitionData data, int 
snapshotEpoch) {
+        return new ShareGroupOffset(snapshotEpoch,
+            data.stateEpoch(),
+            data.leaderEpoch(),
+            data.startOffset(),
+            data.stateBatches().stream()
+                .map(PersisterStateBatch::from)
+                .collect(Collectors.toList()));
+    }
+
+    public Set<PersisterOffsetsStateBatch> stateBatchAsSet() {
+        return this.stateBatches().stream()
+            .map(PersisterOffsetsStateBatch::of)
+            .collect(Collectors.toCollection(LinkedHashSet::new));
+    }
+
+    public static class Builder {
+        private int snapshotEpoch;
+        private int stateEpoch;
+        private int leaderEpoch;
+        private long startOffset;
+        private List<? extends PersisterStateBatch> stateBatches;
+
+        public Builder setSnapshotEpoch(int snapshotEpoch) {
+            this.snapshotEpoch = snapshotEpoch;
+            return this;
+        }
+
+        public Builder setStateEpoch(int stateEpoch) {
+            this.stateEpoch = stateEpoch;
+            return this;
+        }
+
+        public Builder setLeaderEpoch(int leaderEpoch) {
+            this.leaderEpoch = leaderEpoch;
+            return this;
+        }
+
+        public Builder setStartOffset(long startOffset) {
+            this.startOffset = startOffset;
+            return this;
+        }
+
+        public Builder setStateBatches(List<? extends PersisterStateBatch> 
stateBatches) {
+            this.stateBatches = stateBatches;
+            return this;
+        }
+
+        public ShareGroupOffset build() {
+            return new ShareGroupOffset(snapshotEpoch, stateEpoch, 
leaderEpoch, startOffset, stateBatches);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ShareGroupOffset that = (ShareGroupOffset) o;
+        return snapshotEpoch == that.snapshotEpoch && stateEpoch == 
that.stateEpoch && leaderEpoch == that.leaderEpoch && startOffset == 
that.startOffset && Objects.equals(stateBatches, that.stateBatches);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(snapshotEpoch, stateEpoch, leaderEpoch, 
startOffset, stateBatches);
+    }

Review Comment:
   Can you add a `toString()`? These objects could end up in log messages.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.group.share.PersisterStateBatch;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Container class to represent data encapsulated in {@link 
ShareSnapshotValue} and {@link ShareUpdateValue}
+ * This class is effectively immutable (state batches is not modified out of 
context).
+ */
+public class ShareGroupOffset {
+    private final int snapshotEpoch;
+    private final int stateEpoch;
+    private final int leaderEpoch;
+    private final long startOffset;
+    private final List<? extends PersisterStateBatch> stateBatches;
+
+    private ShareGroupOffset(int snapshotEpoch,
+                            int stateEpoch,
+                            int leaderEpoch,
+                            long startOffset,
+                            List<? extends PersisterStateBatch> stateBatches) {
+        this.snapshotEpoch = snapshotEpoch;
+        this.stateEpoch = stateEpoch;
+        this.leaderEpoch = leaderEpoch;
+        this.startOffset = startOffset;
+        this.stateBatches = stateBatches;
+    }
+
+    public int snapshotEpoch() {
+        return snapshotEpoch;
+    }
+
+    public int stateEpoch() {
+        return stateEpoch;
+    }
+
+    public int leaderEpoch() {
+        return leaderEpoch;
+    }
+
+    public long startOffset() {
+        return startOffset;
+    }
+
+    public List<? extends PersisterStateBatch> stateBatches() {
+        return Collections.unmodifiableList(stateBatches);
+    }
+
+    private static PersisterStateBatch 
toPersisterStateBatch(ShareSnapshotValue.StateBatch stateBatch) {
+        return new PersisterStateBatch(stateBatch.firstOffset(), 
stateBatch.lastOffset(), stateBatch.deliveryState(), 
stateBatch.deliveryCount());
+    }
+
+    private static PersisterStateBatch 
toPersisterStateBatch(ShareUpdateValue.StateBatch stateBatch) {
+        return new PersisterStateBatch(stateBatch.firstOffset(), 
stateBatch.lastOffset(), stateBatch.deliveryState(), 
stateBatch.deliveryCount());
+    }
+
+    public static ShareGroupOffset fromRecord(ShareSnapshotValue record) {
+        return new ShareGroupOffset(record.snapshotEpoch(), 
record.stateEpoch(), record.leaderEpoch(), record.startOffset(), 
record.stateBatches().stream()
+            
.map(ShareGroupOffset::toPersisterStateBatch).collect(Collectors.toList()));
+    }
+
+    public static ShareGroupOffset fromRecord(ShareUpdateValue record) {
+        return new ShareGroupOffset(record.snapshotEpoch(), -1, 
record.leaderEpoch(), record.startOffset(), record.stateBatches().stream()
+            
.map(ShareGroupOffset::toPersisterStateBatch).collect(Collectors.toList()));
+    }
+
+    public static ShareGroupOffset 
fromRequest(WriteShareGroupStateRequestData.PartitionData data) {
+        return fromRequest(data, 0);
+    }
+
+    public static ShareGroupOffset 
fromRequest(WriteShareGroupStateRequestData.PartitionData data, int 
snapshotEpoch) {
+        return new ShareGroupOffset(snapshotEpoch,
+            data.stateEpoch(),
+            data.leaderEpoch(),
+            data.startOffset(),
+            data.stateBatches().stream()
+                .map(PersisterStateBatch::from)
+                .collect(Collectors.toList()));
+    }
+
+    public Set<PersisterOffsetsStateBatch> stateBatchAsSet() {
+        return this.stateBatches().stream()
+            .map(PersisterOffsetsStateBatch::of)
+            .collect(Collectors.toCollection(LinkedHashSet::new));
+    }
+
+    public static class Builder {
+        private int snapshotEpoch;
+        private int stateEpoch;
+        private int leaderEpoch;
+        private long startOffset;

Review Comment:
   These will default to 0. Is that okay? 
   
   Another approach is to use the boxed types and add a null check in `build()`.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/PersisterOffsetsStateBatch.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.group.share.PersisterStateBatch;
+
+import java.util.Objects;
+
+/**
+ * This is a helper class which overrides the equals and hashcode
+ * methods to only focus on the first and last offset fields of the
+ * state batch. This is useful when combining batches.
+ */
+public class PersisterOffsetsStateBatch extends PersisterStateBatch {

Review Comment:
   Using the type hierarchy for this is a bit unintuitive and likely to cause 
surprises down the road. Imagine you have a `PersisterStateBatch` reference that 
actually points to a `PersisterOffsetsStateBatch` object, so its `equals` and 
`hashCode` behavior is different from what you expect. 
   
   A better approach is to create a wrapper class that has the custom `equals` 
and `hashCode`. In other words, let this class hold a `PersisterStateBatch` rather 
than extending that type. 



##########
server-common/src/main/java/org/apache/kafka/server/group/share/SharePartitionKey.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.group.share;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+
+import java.util.Objects;
+
+/**
+ * Common immutable share partition key class. This class is
+ * placed in server-common so that it can be freely used across
+ * various modules.
+ */
+public class SharePartitionKey {
+    private final String groupId;
+    private final Uuid topicId;
+    private final int partition;

Review Comment:
   We need accessors for these values. I think a `TopicIdPartition 
topicIdPartition()` accessor would be useful too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to