jsancio commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2738100509
##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -178,6 +178,20 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
closeSasl()
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testNonDefaultKControllerDynamicConfiguration(groupProtocol: String): Unit = {
+ val props = new Properties
+ props.put(ServerConfigs.NUM_IO_THREADS_CONFIG, "9")
+ reconfigureServers(props, perBrokerConfig = false, (ServerConfigs.NUM_IO_THREADS_CONFIG, "9"))
+
+ val controller = controllerServer
+ TestUtils.retry(60000) {
+ assertNotNull(controller.controllerApisHandlerPool)
+ assertEquals(9, controller.controllerApisHandlerPool.threadPoolSize.get())
+ }
+ }
+
Review Comment:
What is this testing? Why is this part of this PR/change?
##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -19,65 +19,33 @@
import org.apache.kafka.metadata.util.BatchFileReader;
import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
-import org.apache.kafka.metadata.util.BatchFileWriter;
import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
-
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
/**
- * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.checkpoint" is used and the
- * format is the same as a KRaft snapshot.
+ * Abstraction for reading controller bootstrap metadata from disk.
*/
-public class BootstrapDirectory {
- public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";
-
- private final String directoryPath;
+public interface BootstrapDirectory {
/**
- * Create a new BootstrapDirectory object.
+ * Read the bootstrap metadata from the configured location.
*
- * @param directoryPath The path to the directory with the bootstrap file.
+ * @return the loaded {@link BootstrapMetadata}
+ * @throws Exception if the metadata cannot be read
*/
- public BootstrapDirectory(
- String directoryPath
- ) {
- this.directoryPath = Objects.requireNonNull(directoryPath);
- }
-
- public BootstrapMetadata read() throws Exception {
- Path path = Paths.get(directoryPath);
- if (!Files.isDirectory(path)) {
- if (Files.exists(path)) {
- throw new RuntimeException("Path " + directoryPath + " exists, but is not " +
- "a directory.");
- } else {
- throw new RuntimeException("No such directory as " + directoryPath);
- }
- }
- Path binaryBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME);
- if (!Files.exists(binaryBootstrapPath)) {
- return readFromConfiguration();
- } else {
- return readFromBinaryFile(binaryBootstrapPath.toString());
- }
- }
-
- BootstrapMetadata readFromConfiguration() {
- return BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default bootstrap");
- }
+ BootstrapMetadata read() throws Exception;
- BootstrapMetadata readFromBinaryFile(String binaryPath) throws Exception {
+ /**
+ * Read bootstrap metadata from the given binary file path.
+ *
+ * @param binaryPath the path to the binary bootstrap file
+ * @return the loaded {@link BootstrapMetadata}
+ * @throws Exception if the metadata cannot be read
+ */
+ default BootstrapMetadata readFromBinaryFile(String binaryPath) throws Exception {
Review Comment:
Can this be done with a static method in a utility package? Again, it is odd
to have an interface that abstracts the construction of `BootstrapMetadata`
and, at the same time, defines a method that constructs the bootstrap metadata
that all of the implementations use.
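
A minimal sketch of that split, for illustration only. Every name here (`SketchMetadata`, `SketchBootstrapFiles`, `SketchBootstrapDirectory`) is a hypothetical stand-in rather than a class from the PR, and plain text lines stand in for what `BatchFileReader` would decode:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical stand-in for BootstrapMetadata (records plus where they came from).
final class SketchMetadata {
    final List<String> records;
    final String source;

    SketchMetadata(List<String> records, String source) {
        this.records = List.copyOf(records);
        this.source = source;
    }
}

// Hypothetical utility class: file parsing lives here instead of on the interface.
final class SketchBootstrapFiles {
    private SketchBootstrapFiles() { }

    // The real code would go through BatchFileReader; reading text lines keeps
    // this sketch self-contained.
    static SketchMetadata readFromBinaryFile(Path path) throws IOException {
        List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
        return new SketchMetadata(lines, path.toString());
    }
}

// The interface keeps a single responsibility: produce the metadata.
interface SketchBootstrapDirectory {
    SketchMetadata read() throws Exception;
}
```

With this shape, each implementation calls the static helper from its own `read()` and the interface no longer carries construction logic.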
##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/TestBootstrapDirectory.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import java.io.FileNotFoundException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
+
+/**
+ * Test-only implementation that reads bootstrap metadata from the metadata partition snapshot.
+ */
+public class TestBootstrapDirectory implements BootstrapDirectory {
Review Comment:
To me that sounds like a minor distinction that could be supported by
`LegacyBootstrapDirectory`. What do you think, @mannoopj?
##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -1174,35 +1174,6 @@ class KRaftClusterTest {
}
}
- @Test
- def testStartupWithNonDefaultKControllerDynamicConfiguration(): Unit = {
Review Comment:
Was this moved to `DynamicBrokerReconfigurationTest`? Why is that?
`QuorumTestHarness` was developed to migrate existing tests from ZK to KRaft.
New tests should use the `KafkaClusterTestKit` utility.
##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/TestBootstrapDirectory.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import java.io.FileNotFoundException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
+
+/**
+ * Test-only implementation that reads bootstrap metadata from the metadata partition snapshot.
+ */
+public class TestBootstrapDirectory implements BootstrapDirectory {
+ private static final String BINARY_BOOTSTRAP_CHECKPOINT_FILENAME = "00000000000000000000-0000000000.checkpoint";
Review Comment:
I still don't get it. Again, to me it looks like the file name used could be
parametrized.
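
Roughly what parametrizing the file name could look like, as a sketch only. `SketchFileBootstrapDirectory` is a hypothetical name, the two file names are the ones that appear in this PR, and passing a relative path (for example one that includes the metadata partition directory) instead of a bare file name is an assumption that would need checking against the real layout:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;

// Hypothetical single implementation: the checkpoint file name is a constructor
// argument instead of being fixed by a separate class.
final class SketchFileBootstrapDirectory {
    static final String DEFAULT_FILENAME = "bootstrap.checkpoint";
    static final String SNAPSHOT_FILENAME = "00000000000000000000-0000000000.checkpoint";

    private final String directoryPath;
    private final String fileName;

    SketchFileBootstrapDirectory(String directoryPath, String fileName) {
        this.directoryPath = Objects.requireNonNull(directoryPath);
        this.fileName = Objects.requireNonNull(fileName);
    }

    // Resolves the configured file (or relative path) against the directory.
    Path bootstrapPath() {
        return Paths.get(directoryPath, fileName);
    }

    boolean bootstrapFileExists() {
        return Files.exists(bootstrapPath());
    }
}
```

A test would then construct it with `SNAPSHOT_FILENAME`, and production code with `DEFAULT_FILENAME`, without a second class.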
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1436,7 +1446,7 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon
/**
* The bootstrap metadata to use for initialization if needed.
*/
- private final BootstrapMetadata bootstrapMetadata;
+ private volatile BootstrapMetadata bootstrapMetadata;
Review Comment:
Let's document why volatile is needed and why volatile is enough to be
correct.
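
The kind of comment I have in mind, sketched under assumptions that need to be confirmed against the actual threading in `QuorumController`: one thread replaces the reference, other threads only read it, and the referenced object is immutable. `SketchController` and the `String` field type are hypothetical stand-ins:

```java
import java.util.Objects;

// Hypothetical stand-in for the controller; String stands in for BootstrapMetadata.
final class SketchController {
    /**
     * The bootstrap metadata to use for initialization if needed.
     *
     * Assumed access pattern (to be verified): one thread replaces this
     * reference when the bootstrap snapshot is loaded, and other threads only
     * read it. volatile is needed so that readers see the latest reference.
     * It is sufficient because the write is a plain reference assignment (no
     * read-modify-write) and the referenced object is never mutated after it
     * is published.
     */
    private volatile String bootstrapMetadata;

    SketchController(String initial) {
        this.bootstrapMetadata = Objects.requireNonNull(initial);
    }

    // Called by the single writer thread.
    void replaceBootstrapMetadata(String loaded) {
        this.bootstrapMetadata = Objects.requireNonNull(loaded);
    }

    // Safe to call from any thread.
    String bootstrapMetadata() {
        return bootstrapMetadata;
    }
}
```

If any of those assumptions does not hold, for example if two threads can write or the value is updated based on its previous contents, volatile alone would not be enough.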
##########
raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java:
##########
@@ -57,6 +57,12 @@ public interface SnapshotReader<T> extends AutoCloseable, Iterator<Batch<T>> {
*/
long lastContainedLogTimestamp();
+ /**
+ * Returns true if the snapshot has been committed.
+ * Uncommitted bootstrap snapshots return false.
+ */
+ boolean isCommittedSnapshot();
Review Comment:
Thanks. I think that returning uncommitted snapshots through
`RaftListener.handleLoadSnapshot` has a high probability of the
user/application handling them incorrectly. Instead of changing the semantics
of `handleLoadSnapshot` to return uncommitted snapshots, I think that we should
add another callback to `RaftListener`, for example
`handleLoadBootstrap(SnapshotReader<T>)`. This makes the semantic difference
between the two snapshots clearer to the user.
If we don't do this, all KRaft applications need to remember to check whether
the returned snapshot is committed or uncommitted, which is not clear from the
`RaftListener` interface or from its documentation.
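
A rough sketch of the separate callback, with hypothetical names throughout. `SketchListener` stands in for `RaftListener`, `List<T>` stands in for `SnapshotReader<T>`, and making the new method a no-op `default` is only one possible choice, not something decided in this PR:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RaftListener; List<T> stands in for SnapshotReader<T>.
interface SketchListener<T> {
    // Only ever invoked with committed snapshots, as before.
    void handleLoadSnapshot(List<T> committedSnapshot);

    // Assumed new callback for the uncommitted bootstrap snapshot. A default
    // no-op means existing listeners never see uncommitted state by accident.
    default void handleLoadBootstrap(List<T> bootstrapSnapshot) { }
}

// Example listener that keeps the two kinds of state apart.
final class SketchRecordingListener implements SketchListener<String> {
    final List<String> committed = new ArrayList<>();
    final List<String> bootstrap = new ArrayList<>();

    @Override
    public void handleLoadSnapshot(List<String> committedSnapshot) {
        committed.addAll(committedSnapshot);
    }

    @Override
    public void handleLoadBootstrap(List<String> bootstrapSnapshot) {
        bootstrap.addAll(bootstrapSnapshot);
    }
}
```

With two callbacks, the committed/uncommitted distinction is carried by the method that is called, so there is no flag such as `isCommittedSnapshot()` for the application to forget to check.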
##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -123,6 +123,7 @@ object StorageTool extends Logging {
val formatter = new Formatter().
setPrintStream(printStream).
setNodeId(config.nodeId).
+ setWriteBootstrapSnapshot(config.processRoles.contains(ProcessRole.ControllerRole)).
Review Comment:
I think I am fine with this change. We should document it in KIP-1170.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]