jsancio commented on code in PR #16058:
URL: https://github.com/apache/kafka/pull/16058#discussion_r1624911276
##########
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##########
@@ -1320,6 +1320,16 @@ public ListClientMetricsResourcesResult
listClientMetricsResources(ListClientMet
return new ListClientMetricsResourcesResult(future);
}
+ @Override
+ public AddRaftVoterResult addRaftVoter(int voterId, Uuid voterDirectoryId,
Set<RaftVoterEndpoint> endpoints, AddRaftVoterOptions options) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public RemoveRaftVoterResult removeRaftVoter(int voterId, Uuid
voterDirectoryId, RemoveRaftVoterOptions options) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
Review Comment:
I suspect that we will never implement these. Do you agree?
##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -1080,4 +1083,39 @@ class ControllerApis(
requestThrottleMs => new
AssignReplicasToDirsResponse(reply.setThrottleTimeMs(requestThrottleMs)))
}
}
+
+ def handleAddRaftVoter(request: RequestChannel.Request):
CompletableFuture[Unit] = {
+ authHelper.authorizeClusterOperation(request, ALTER)
+ val addRequest = request.body[AddRaftVoterRequest]
+ raftManager.handleAddVoter(addRequest.data())
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new AddRaftVoterResponse(new AddRaftVoterResponseData().
+ setThrottleTimeMs(requestThrottleMs).
+ setErrorCode(0.toShort).
+ setErrorMessage(null)))
+ CompletableFuture.completedFuture[Unit](())
Review Comment:
I think this is going to change in the real implementation. For KRaft
requests we simply forward them to the raft thread/actor using
`handleRaftRequest` and `raftManager.handleRequest`.
How about for now we just send an error response with "unsupported version"
and return a completed future?
This comment applies to all of the new methods added to this type.
##########
core/src/main/scala/kafka/raft/RaftManager.scala:
##########
@@ -318,4 +329,28 @@ class KafkaRaftManager[T](
override def voterNode(id: Int, listener: String): Option[Node] = {
client.voterNode(id, listener).toScala
}
+
+ def verifyCommonFields(
+ providedClusterId: String,
+ ): Unit = {
+ if (!providedClusterId.equals(clusterId)) {
+ throw new InconsistentClusterIdException("Provided cluster ID " +
providedClusterId +
Review Comment:
This applies to all of the `throw new` added.
If we throw here, this will get caught here and print an error message,
right?
https://github.com/apache/kafka/pull/16058/files#diff-91060c918c99d25342f625c146f14425716eda9d8fcfe1126b2c45feff388362R152-R154
How about for now we just complete the requests with the unsupported version
error responses?
Also, I have to double check but I think cluster ids are check by the
`KafkaRaftClient` and not the `KafkaRaftManager`. We have this abstraction so
we can test that behavior in the `KafkaRaftClientTest` suite or if we add a new
test suite for this functionality.
Most our interesting tests are against `KafkaRaftClient`. The tests for
`KafkaRaftManager` are more basic and limited.
##########
clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * An endpoint for a raft quorum voter.
+ */
[email protected]
+public class RaftVoterEndpoint {
+ private final String name;
+ private final String host;
+ private final int port;
+ private final String securityProtocol;
Review Comment:
Did you consider using `o.a.k.c.Endpoints`? This struct has a similar
structure to `Endpoints`.
During the KIP discussion we agreed to not add security protocol to the
`AddRaftVoter` request. Mainly because it is not needed and that there is a
requirement that the listener name, security protocol and the protocol
configuration must match in all of the voters if they exist.
##########
core/src/test/resources/log4j.properties:
##########
@@ -18,8 +18,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
-log4j.logger.kafka=WARN
-log4j.logger.org.apache.kafka=WARN
+log4j.logger.kafka=OFF
+log4j.logger.org.apache.kafka=OFF
Review Comment:
Why this change? Did you mean to change this?
##########
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##########
@@ -306,6 +309,113 @@ class RaftManagerTest {
}
+ class ReconfigurationTestContext(
Review Comment:
If you agree with my comments in `RaftManager`, I am okay removing these
tests. I think when we have the real implementation will want to add a
`KafkaRaftClientReconfigTest`, similar to `KafkaRaftClientTest` and
`KafkaRaftClientSnapshotTest` that uses `RaftClientTestContext` to test
`KafkaRaftClient`.
##########
clients/src/main/resources/common/message/AddRaftVoterRequest.json:
##########
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 80,
+ "type": "request",
+ "listeners": ["controller", "broker"],
+ "name": "AddRaftVoterRequest",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "ClusterId", "type": "string", "versions": "0+" },
+ { "name": "TimeoutMs", "type": "int32", "versions": "0+" },
+ { "name": "VoterId", "type": "int32", "versions": "0+",
+ "about": "The replica id of the voter getting added to the topic
partition" },
+ { "name": "VoterDirectoryId", "type": "uuid", "versions": "0+",
+ "about": "The directory id of the voter getting added to the topic
partition" },
Review Comment:
@cmccabe and I discussed this offline and agreed to remove topic name, topic
id and topic partition from the requests. Kafka only supports one KRaft
partition (__cluster_metadata-0). If we decide to have more than one KRaft
partition we can revisit this decision with another KIP.
I will update KIP-853 to match these changes once this PR is merged.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]