mumrah commented on code in PR #16921: URL: https://github.com/apache/kafka/pull/16921#discussion_r1727201011
########## server-common/src/main/java/org/apache/kafka/server/group/share/SharePartitionKey.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.group.share; + +import org.apache.kafka.common.Uuid; + +import java.util.Objects; + +/** + * Common immutable share partition key class. This class is + * placed in server-common so that it can be freely used across + * various modules. + */ +public class SharePartitionKey { + private final String groupId; + private final Uuid topicId; + private final int partition; + + public SharePartitionKey(String groupId, Uuid topicId, int partition) { + this.groupId = groupId; + this.topicId = topicId; + this.partition = partition; + } + + public static class Builder { + private String groupId; + private Uuid topicId; + private int partition; + + public Builder setGroupId(String groupId) { + this.groupId = groupId; + return this; + } + + public Builder setTopicId(Uuid topicId) { + this.topicId = topicId; + return this; + } + + public Builder setPartition(int partition) { + this.partition = partition; + return this; + } + + public SharePartitionKey build() { + return new SharePartitionKey(groupId, topicId, partition); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof SharePartitionKey)) return false; + SharePartitionKey that = (SharePartitionKey) o; + return partition == that.partition && Objects.equals(groupId, that.groupId) && Objects.equals(topicId, that.topicId); + } + + @Override + public int hashCode() { + return Objects.hash(groupId, topicId, partition); + } + + @Override + public String toString() { + return String.format("%s:%s:%d", groupId, topicId, partition); Review Comment: Is there a reason we don't want the usual "toString" format here? ########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; +import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue; +import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; +import org.apache.kafka.server.group.share.PersisterStateBatch; + +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Container class to represent data encapsulated in @link ShareSnapshotValue and @link ShareUpdateValue + * This class is effectively immutable (state batches is not modified out of context). + */ +public class ShareGroupOffset { + private final int snapshotEpoch; + private final int stateEpoch; + private final int leaderEpoch; + private final long startOffset; + private final List<? extends PersisterStateBatch> stateBatches; + + public ShareGroupOffset(int snapshotEpoch, + int stateEpoch, + int leaderEpoch, + long startOffset, Review Comment: Methods/constructors with many sequential arguments of the same type are prone to swapped argument types of bugs. I would suggest making this constructor private and forcing callers to use the builder. ########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/PersisterOffsetsStateBatch.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; +import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue; +import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; +import org.apache.kafka.server.group.share.PersisterStateBatch; + +import java.util.Objects; + +/** + * This is a helper class which overrides the equals and hashcode + * methods to only focus on the first and last offset fields of the + * state batch. This is useful when combining batches. + */ +public class PersisterOffsetsStateBatch extends PersisterStateBatch { + public PersisterOffsetsStateBatch(long firstOffset, long lastOffset, byte deliveryState, short deliveryCount) { + super(firstOffset, lastOffset, deliveryState, deliveryCount); + } + + public static PersisterOffsetsStateBatch of(ShareSnapshotValue.StateBatch batch) { + return new PersisterOffsetsStateBatch(batch.firstOffset(), batch.lastOffset(), batch.deliveryState(), batch.deliveryCount()); + } + + public static PersisterOffsetsStateBatch of(ShareUpdateValue.StateBatch batch) { + return new PersisterOffsetsStateBatch(batch.firstOffset(), batch.lastOffset(), batch.deliveryState(), batch.deliveryCount()); + } + + public static PersisterOffsetsStateBatch of(PersisterStateBatch batch) { + return new PersisterOffsetsStateBatch(batch.firstOffset(), batch.lastOffset(), batch.deliveryState(), batch.deliveryCount()); + } + + public static PersisterOffsetsStateBatch of(WriteShareGroupStateRequestData.StateBatch batch) { + return new PersisterOffsetsStateBatch(batch.firstOffset(), batch.lastOffset(), batch.deliveryState(), batch.deliveryCount()); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof PersisterOffsetsStateBatch)) { + return false; + } + PersisterOffsetsStateBatch that = (PersisterOffsetsStateBatch) o; + return this.firstOffset() == that.firstOffset() && this.lastOffset() == that.lastOffset(); + } + + @Override + public int hashCode() { + return Objects.hash(firstOffset(), lastOffset()); + } +} Review Comment: nit: missing newline ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java: ########## @@ -53,90 +49,11 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; -import org.apache.kafka.server.common.ApiMessageAndVersion; -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; - -/** - * Serializer/Deserializer for {@link CoordinatorRecord}. The format is defined below: - * <pre> - * record_key = [record_type key_message] - * record_value = [value_version value_message] - * - * record_type : The record type is currently define as the version of the key - * {@link ApiMessageAndVersion} object. - * key_message : The serialized message of the key {@link ApiMessageAndVersion} object. - * value_version : The value version is currently define as the version of the value - * {@link ApiMessageAndVersion} object. - * value_message : The serialized message of the value {@link ApiMessageAndVersion} object. - * </pre> - */ @SuppressWarnings({ "ClassDataAbstractionCoupling", "CyclomaticComplexity" }) Review Comment: There is a better way to suppress checkstyle warnings. Look at "checkstyle/suppressions.xml" ########## server-common/src/main/java/org/apache/kafka/server/group/share/SharePartitionKey.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.group.share; + +import org.apache.kafka.common.Uuid; + +import java.util.Objects; + +/** + * Common immutable share partition key class. This class is + * placed in server-common so that it can be freely used across + * various modules. + */ +public class SharePartitionKey { + private final String groupId; + private final Uuid topicId; + private final int partition; + + public SharePartitionKey(String groupId, Uuid topicId, int partition) { + this.groupId = groupId; + this.topicId = topicId; + this.partition = partition; + } + + public static class Builder { Review Comment: Why do we need a builder for this class? Three arguments seems fine to deal with, and I doubt it will evolve over time. How about a since private constructor, and two public static factories: one (String, Uuid, int) and one (String, TopicIdPartition) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
