apoorvmittal10 commented on code in PR #16921:
URL: https://github.com/apache/kafka/pull/16921#discussion_r1723912831
##########
checkstyle/import-control-share-coordinator.xml:
##########
@@ -30,11 +30,18 @@
<!-- anyone can use public classes -->
<subpackage name="coordinator">
<subpackage name="share">
+ <allow pkg="org.apache.kafka.common" />
+ <allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.message" />
+ <allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.coordinator.common" />
+ <allow pkg="org.apache.kafka.coordinator.share.generated" />
<allow pkg="org.apache.kafka.image" />
+ <allow pkg="org.apache.kafka.server.common" />
+ <allow pkg="org.apache.kafka.server.group.share" />
+ <allow pkg="org.junit.jupiter.api" />
Review Comment:
You might want to consider moving it under `<!-- common library dependencies
-->`
##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ShareCoordinatorRecordSerdeTest {
+ private ShareCoordinatorRecordSerde serde;
+
+ @BeforeEach
+ public void setup() {
+ serde = new ShareCoordinatorRecordSerde();
+ }
+
+ @Test
+ public void testSerializeKey() {
+ CoordinatorRecord record = getShareSnapshotRecord("groupId",
Uuid.randomUuid(), 1);
+
+ assertArrayEquals(
+ MessageUtil.toVersionPrefixedBytes(record.key().version(),
record.key().message()),
+ serde.serializeKey(record)
+ );
+ }
+
+ @Test
+ public void testSerializeValue() {
+ CoordinatorRecord record = getShareSnapshotRecord("groupId",
Uuid.randomUuid(), 1);
+
+ assertArrayEquals(
+ MessageUtil.toVersionPrefixedBytes(record.value().version(),
record.value().message()),
+ serde.serializeValue(record)
+ );
+ }
+
+ @Test
+ public void testSerializeNullValue() {
+ CoordinatorRecord record = new CoordinatorRecord(
+ new ApiMessageAndVersion(
+ new ShareSnapshotKey()
+ .setGroupId("group")
+ .setTopicId(Uuid.randomUuid())
+ .setPartition(1),
+ ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION
+ ),
+ null
+ );
+
+ assertNull(serde.serializeValue(record));
+ }
+
+ @Test
+ public void testDeserialize() {
+ CoordinatorRecord record = getShareSnapshotRecord("groupId",
Uuid.randomUuid(), 1);
+ ApiMessageAndVersion key = record.key();
+ ByteBuffer keyBuffer =
MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+ ApiMessageAndVersion value = record.value();
+ ByteBuffer valueBuffer =
MessageUtil.toVersionPrefixedByteBuffer(value.version(), value.message());
+
+ CoordinatorRecord desRecord = serde.deserialize(keyBuffer,
valueBuffer);
+ assertEquals(key, desRecord.key());
+ assertEquals(value, desRecord.value());
+ }
+
+ @Test
+ public void testDeserializeWithTombstoneForValue() {
+ ApiMessageAndVersion key = new ApiMessageAndVersion(
+ new ShareSnapshotKey()
+ .setGroupId("groupId")
+ .setTopicId(Uuid.randomUuid())
+ .setPartition(1),
+ ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION
+ );
+ ByteBuffer keyBuffer =
MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+ CoordinatorRecord record = serde.deserialize(keyBuffer, null);
+ assertEquals(key, record.key());
+ assertNull(record.value());
+ }
+
+ @Test
+ public void testDeserializeWithInvalidRecordType() {
+ ByteBuffer keyBuffer = ByteBuffer.allocate(64);
+ keyBuffer.putShort((short) 255);
+ keyBuffer.rewind();
+
+ ByteBuffer valueBuffer = ByteBuffer.allocate(64);
+
+ CoordinatorLoader.UnknownRecordTypeException ex =
+
assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
+ () -> serde.deserialize(keyBuffer, valueBuffer));
+ assertEquals((short) 255, ex.unknownType());
+ }
+
+ @Test
+ public void testDeserializeWithKeyEmptyBuffer() {
+ ByteBuffer keyBuffer = ByteBuffer.allocate(0);
+ ByteBuffer valueBuffer = ByteBuffer.allocate(64);
+
+ RuntimeException ex =
+ assertThrows(RuntimeException.class,
+ () -> serde.deserialize(keyBuffer, valueBuffer));
+ assertEquals("Could not read version from key's buffer.",
ex.getMessage());
+ }
+
+ @Test
+ public void testDeserializeWithValueEmptyBuffer() {
+ ApiMessageAndVersion key = new ApiMessageAndVersion(
+ new ShareSnapshotKey()
+ .setGroupId("foo")
+ .setTopicId(Uuid.randomUuid())
+ .setPartition(1),
+ ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION
+ );
+ ByteBuffer keyBuffer =
MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+ ByteBuffer valueBuffer = ByteBuffer.allocate(0);
+
+ RuntimeException ex =
+ assertThrows(RuntimeException.class,
+ () -> serde.deserialize(keyBuffer, valueBuffer));
+ assertEquals("Could not read version from value's buffer.",
ex.getMessage());
+ }
+
+ @Test
+ public void testDeserializeWithInvalidKeyBytes() {
+ ByteBuffer keyBuffer = ByteBuffer.allocate(2);
+ keyBuffer.putShort((short) 0);
+ keyBuffer.rewind();
+
+ ByteBuffer valueBuffer = ByteBuffer.allocate(2);
+ valueBuffer.putShort((short) 0);
+ valueBuffer.rewind();
+
+ RuntimeException ex =
+ assertThrows(RuntimeException.class,
+ () -> serde.deserialize(keyBuffer, valueBuffer));
+ assertTrue(ex.getMessage().startsWith("Could not read record with
version 0 from key's buffer due to"),
+ ex.getMessage());
+ }
+
+ @Test
+ public void testDeserializeWithInvalidValueBytes() {
+ ApiMessageAndVersion key = new ApiMessageAndVersion(
+ new ShareSnapshotKey()
+ .setGroupId("foo")
+ .setTopicId(Uuid.randomUuid())
+ .setPartition(1),
+ ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION
+ );
+ ByteBuffer keyBuffer =
MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+ ByteBuffer valueBuffer = ByteBuffer.allocate(2);
+ valueBuffer.putShort((short) 0);
+ valueBuffer.rewind();
+
+ RuntimeException ex =
+ assertThrows(RuntimeException.class,
+ () -> serde.deserialize(keyBuffer, valueBuffer));
+ assertTrue(ex.getMessage().startsWith("Could not read record with
version 0 from value's buffer due to"),
+ ex.getMessage());
+ }
+
+ @Test
+ public void testDeserializeAllRecordTypes() {
+ roundTrip((short) 0, new ShareSnapshotKey(), new ShareSnapshotValue());
Review Comment:
Does this test ensure that value passed is also verified against the key
i.e. if I write mismatched key val `roundTrip((short) 0, new
ShareSnapshotKey(), new ShareUpdateValue());` then will it fail?
##########
checkstyle/import-control-share-coordinator.xml:
##########
@@ -30,11 +30,18 @@
<!-- anyone can use public classes -->
<subpackage name="coordinator">
<subpackage name="share">
+ <allow pkg="org.apache.kafka.common" />
Review Comment:
It might be better to use the way other import controls are structured
generally with
```
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
...
```
```
<!-- protocol, records and request/response utilities -->
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.protocol" />
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java:
##########
@@ -53,90 +49,11 @@
import
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
import
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-
-/**
- * Serializer/Deserializer for {@link CoordinatorRecord}. The format is
defined below:
- * <pre>
- * record_key = [record_type key_message]
- * record_value = [value_version value_message]
- *
- * record_type : The record type is currently define as the version of
the key
- * {@link ApiMessageAndVersion} object.
- * key_message : The serialized message of the key {@link
ApiMessageAndVersion} object.
- * value_version : The value version is currently define as the version
of the value
- * {@link ApiMessageAndVersion} object.
- * value_message : The serialized message of the value {@link
ApiMessageAndVersion} object.
- * </pre>
- */
@SuppressWarnings({ "ClassDataAbstractionCoupling", "CyclomaticComplexity" })
Review Comment:
Do we still need these warnings after refactoring?
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
+import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntSupplier;
+
+public interface ShareCoordinator {
+ short SHARE_SNAPSHOT_RECORD_KEY_VERSION = 0;
+ short SHARE_SNAPSHOT_RECORD_VALUE_VERSION = 0;
+ short SHARE_UPDATE_RECORD_KEY_VERSION = 1;
+ short SHARE_UPDATE_RECORD_VALUE_VERSION = 1;
+
+ /**
+ * Return the partition index for the given key.
+ *
+ * @param key key - groupId:topicId:partitionId.
+ * @return The partition index.
+ */
+ int partitionFor(String key);
+
+ /**
+ * Return the configuration properties of the share-group state topic.
+ *
+ * @return Properties of the share-group state topic.
+ */
+ Properties shareGroupStateTopicConfigs();
+
+ /**
+ * Start the share coordinator
+ *
+ * @param shareGroupTopicPartitionCount - supplier returning the number of
partitions for __share_group_state topic
+ */
+ void startup(IntSupplier shareGroupTopicPartitionCount);
+
+ /**
+ * Stop the share coordinator
+ */
+ void shutdown();
+
+ /**
+ * Handle write share state call
+ * @param context - represents the incoming write request context
+ * @param request - actual RPC request object
+ * @return completable future comprizing of write RPC response data
+ */
+ CompletableFuture<WriteShareGroupStateResponseData>
writeState(RequestContext context, WriteShareGroupStateRequestData request);
+
+
+ /**
+ * Handle read share state call
+ * @param context - represents the incoming read request context
+ * @param request - actual RPC request object
+ * @return completable future comprising of write RPC response data
Review Comment:
```suggestion
* @return completable future comprising of read RPC response data
```
##########
server-common/src/main/java/org/apache/kafka/server/group/share/SharePartitionKey.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.group.share;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Objects;
+
+/**
+ * Common immutable share partition key class. This class is
+ * placed in server-common so that it can be freely used across
+ * various modules.
+ */
+public class SharePartitionKey {
+ public final String groupId;
+ public final Uuid topicId;
+ public final int partition;
Review Comment:
Are they required to be public?
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
+import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntSupplier;
+
+public interface ShareCoordinator {
+ short SHARE_SNAPSHOT_RECORD_KEY_VERSION = 0;
+ short SHARE_SNAPSHOT_RECORD_VALUE_VERSION = 0;
+ short SHARE_UPDATE_RECORD_KEY_VERSION = 1;
+ short SHARE_UPDATE_RECORD_VALUE_VERSION = 1;
+
+ /**
+ * Return the partition index for the given key.
+ *
+ * @param key key - groupId:topicId:partitionId.
+ * @return The partition index.
+ */
+ int partitionFor(String key);
+
+ /**
+ * Return the configuration properties of the share-group state topic.
+ *
+ * @return Properties of the share-group state topic.
+ */
+ Properties shareGroupStateTopicConfigs();
+
+ /**
+ * Start the share coordinator
+ *
+ * @param shareGroupTopicPartitionCount - supplier returning the number of
partitions for __share_group_state topic
+ */
+ void startup(IntSupplier shareGroupTopicPartitionCount);
+
+ /**
+ * Stop the share coordinator
+ */
+ void shutdown();
+
+ /**
+ * Handle write share state call
+ * @param context - represents the incoming write request context
Review Comment:
Do we require `-`, I see it's being used sometimes for params and omitted in
some case. Shall we have consistency?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]