Hi Jun, RE JR12: Dynamic quorums can technically not require formatting, but I fear it will cause cluster misconfigurations more easily than the static quorum case. The main difference between the two quorum deployments is that static quorum cannot recover from data loss on a voter, whereas dynamic quorum can. Below is my reasoning for why maintaining this requirement for dynamic clusters is better for Kafka operators given that formatting each node's disk is currently required.
The KRaft voter set in both static and dynamic deployments is a piece of bootstrapping data that requires some orchestration to manage. Currently, the static voter set is managed by the supplier of a `.properties` file containing `controller.quorum.voters`. In the static quorum case, the requirement Kafka places on an orchestration layer to safely manage the value of `controller.quorum.voters` is pretty straightforward: supply the same value on all nodes, all the time, forever. I think it is obvious to operators, without their needing to know too much about how KRaft works, that having different values for `controller.quorum.voters` across nodes is incorrect and unsafe.

The dynamic voter set's contents are initially managed by the caller of `kafka-storage format`, and afterwards by KRaft itself. I will focus on what a standalone dynamic controller deployment would look like without formatting, but bootstrapping a dynamic quorum with multiple controllers is unsafe for the same reasons. To remove the formatting requirement for dynamic clusters, we could imagine something like a `controller.quorum.standalone.enabled` config. When that config is defined during startup, Kafka would write the bootstrapping VotersRecord and KRaftVersion that formatting writes today. However, the requirements Kafka places on an orchestration layer to safely manage this "standalone" config are more complicated than in the static quorum case, and they are not obvious without prior knowledge of how quorums are bootstrapped. This config should only be set on one node while the cluster has not yet been "bootstrapped," and it cannot be present on any node during startup once the cluster has been "bootstrapped." If this config is present after the cluster has a voter set, it can result in multiple KRaft leaders: a node with the standalone config defined could experience data loss and then re-bootstrap itself on restart.
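To make the two orchestration contracts concrete, here is a sketch. `controller.quorum.voters` is the real static config; `controller.quorum.standalone.enabled` is the hypothetical config discussed above, not something Kafka has today, and the host names are made up:

```properties
# Static quorum: the contract is simple -- every node gets the same value,
# on every startup, for the lifetime of the cluster.
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093

# Hypothetical dynamic bootstrap without formatting (NOT an existing config).
# The contract is much trickier: set on exactly one controller, only while the
# cluster has never been bootstrapped, and absent on every node afterwards.
# If it is still set when a node restarts after data loss, that node writes a
# fresh bootstrapping VotersRecord and can produce a second KRaft leader.
controller.quorum.standalone.enabled=true
```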
This "unsafeness" WRT dynamic quorum also applies to `kafka-storage format,` but that CLI does not bootstrap with dynamic quorum by default, as the user has to specify one of `--standalone`, `--initial-controllers`, or `--no-initial-controllers` when `controller.quorum.voters` is not defined. Maybe that is a sufficient argument that it is okay to introduce the `controller.quorum.standalone.enabled` static config as part of this KIP. Interested to know what you think about this. Best, Kevin Wu On Tue, Mar 31, 2026 at 11:34 AM Jun Rao via dev <[email protected]> wrote: > Hi, Kevin, > > Thanks for the reply. > > JR12. https://kafka.apache.org/42/operations/kraft/ specifies two > deployment methods for KRaft: static and dynamic. Are you saying that > dynamic still requires formatting while static doesn't? Could you explain > why there is difference? BTW, which method do we recommend? > > Jun > > On Mon, Mar 30, 2026 at 9:03 AM Kevin Wu <[email protected]> wrote: > > > Hi Jun, > > > > Thanks for the reply. > > > > RE JR10: Yes, I will update the KIP to reflect that. > > > > RE JR11: Yeah, I think it is fine to write V2. > > > > RE JR 12: Is this in reference to KIP-853: Dynamic Quorum > Reconfiguration? > > If so, see: "However, operators still have the option to format nodes to > > set the MV, feature versions, scram credentials, or to properly > provision a > > kraft.version=1 cluster." In order to bootstrap any dynamic quorum (i.e. > > kraft.version=1) with an initial voter set, it is required to format a > > controller(s) with either `--standalone` or `--initial-controllers` so > that > > a KRaft VotersRecord is part of the 0-0.checkpoint. Formatting > controllers > > is still needed if you want to specify a non-default feature level or > > metadata version, and kraft.version=1 would be a "non-default" KRaft > > version (mainly because it is not correct without formatting, described > > below). 
> > > > I'm not sure if removing this formatting requirement for new KIP-853 > > clusters is in-scope for this KIP. The main issue with this is: How does > a > > node know it can safely write a "bootstrapping" 0-0.checkpoint with the > > KRaft VotersRecord on startup of the kafka process without knowing any > > state of the cluster? This can lead to split-brain when a node writes > this > > for a cluster who has already elected a leader. Currently, the caller of > > the kafka-storage format command is responsible for writing this exactly > > once for the lifetime of the cluster. > > > > Operators still have the option of starting kafka without formatting, and > > then upgrading the kraft version to kraft.version=1. This path allows > for a > > dynamic quorum without formatting the cluster. > > > > Thanks, > > Kevin Wu > > > > On Fri, Mar 27, 2026 at 4:20 PM Jun Rao via dev <[email protected]> > > wrote: > > > > > Hi, Kevin, > > > > > > Thanks for the updated KIP. It's better if we can remove the formatting > > > requirements for all nodes. > > > > > > JR10. "The reason for this KIP is to remove the requirement of brokers > > > needing to run kafka-storage format before starting Kafka." > > > Should we change brokers to nodes? > > > > > > JR11. "When --cluster-id is specified, the formatter writes > > > meta.properties V1." > > > It's a bit weird for the new code to write in V1 format. Could it write > > in > > > V2 format? > > > > > > JR12. Without formatting, is it true that one can only bootstrap a > > > standalone controller? In other words, does bootstrapping with multiple > > > controllers still require formatting? > > > > > > Jun > > > > > > On Thu, Mar 19, 2026 at 1:39 AM Kevin Wu <[email protected]> > wrote: > > > > > > > Hi José, > > > > > > > > Thanks for the replies and questions. > > > > > > > > RE JS1: "Can you clarify that this KIP removes the need for all Kafka > > > nodes > > > > to be formatted prior to starting Kafka." 
Hmmm, I guess in the static > > > > cluster case that skips formatting having a newer software version + > > > older > > > > MV is not a possible case, so I will remove that mention from the > KIP. > > We > > > > should default to the latest MV if we skip formatting, which will > > support > > > > writing a ClusterIdRecord. > > > > > > > > Right now, it is not completely clear to me how we can allow > bootstrap > > > > controllers (this applies mainly for kraft.version=0, since > > > kraft.version=1 > > > > cannot elect a leader without proper formatting) to also skip > > formatting. > > > > That is why I said in the proposed changes: "*Remove the requirement > of > > > > brokers and observer controllers to format before starting kafka"*. I > > > agree > > > > that KRaft can still elect a leader without clusterId in this case, > but > > > I'm > > > > not completely sure how a QuorumController with an "empty" clusterId > > > which > > > > needs to be set later, should behave. My working idea is detailed in > RE > > > > JS6. This is required because the active controller needs to > generate a > > > > clusterId and write it back to KRaft upon activation in order for the > > > > committed `ClusterIdRecord` to appear in records passed to > > > > `RaftListener#handleCommit()`, so we cannot block its initialization. > > > > Keeping the assumption that QuorumController.clusterId is final and > > > > non-null would be nice, but that requires all KRaft voters to format > > > with a > > > > cluster.id. Let me know what you think about the best way to remove > > this > > > > requirement. > > > > > > > > RE JS2: My plan was to continue to write meta.properties V1 during > > > > formatting with a `cluster.id` field like today, but also write a > > > > `ClusterIdRecord` to the bootstrap snapshot for redundancy if the MV > > > > supports it (I'm not sure if kafka is expected to handle only partial > > log > > > > directory corruption/destruction). 
If the "bootstrap controller > cluster > > > id > > > > check" from JS4 is correct, then the initial active controller is > > > > guaranteed to have a non-null `cluster.id` in meta.properties. So > long > > > as > > > > the MV supports it, the active controller would then write > > > ClusterIdRecord > > > > as part of the bootstrap records. > > > > > > > > RE JS3: When I said this, I meant that the restriction of waiting for > > the > > > > discovery of cluster.id to persist it to meta.properties during > broker > > > > startup is no more restrictive than what already currently exists, > > which > > > is > > > > being caught up to the HWM in order to register with the active > > > controller. > > > > > > > > RE JS 4: Yeah, I thought about this, specifically around the > > > > kraft.version=1 case since it is less straightforward what a > "bootstrap > > > > controller" is. Under the current design, in kraft.version=0, any > node > > > who > > > > is part of the `controller.quorum.voters` config must have > > > > `meta.properties` with `cluster.id`. In kraft.version=1, any node > who > > > has > > > > a > > > > `0-0.checkpoint` is considered a "bootstrap controller." This is a > > > > heuristic, but I believe it is correct, since in order for the > > > > 0-0.checkpoint to not exist on a node which formatted with > --standalone > > > or > > > > --initial-controllers, there must have either been another checkpoint > > > with > > > > committed records, which imply an elected initial leader, or a disk > > loss. > > > > Whenever a voter with id X and initial directory-id A comes back as > (X, > > > B), > > > > this process incarnation is an observer from the perspective of > KRaft, > > > and > > > > I think we can assume it has neither `meta.properties` or > > > `0-0.checkpoint` > > > > if the operator did not format it (assumption from RE JS2 about the > > kinds > > > > of storage failures we expect to handle are not partial directory > > > > failures). 
In this case, the "bootstrap controller" check does not > > apply > > > to > > > > (X, B), and if auto-join is enabled, it will follow the steps > detailed > > in > > > > RE JS5 to recover and rejoin the voter set. If we remove the > > requirement > > > on > > > > all nodes to format, then we would not need to implement these > checks. > > > > > > > > RE JS5: An observer without clusterId who can auto-join will fetch > > until > > > > its KafkaRaftClient updates the cluster id in-memory (basically, > > > auto-join > > > > is off until it discovers the leader's clusterId). If the observer > has > > > > clusterId, it needs to match the leader's to perform a successful > > fetch, > > > > which is required for successfully adding a voter via auto-join. > > > > > > > > RE JS6: Apologies, I meant to say a MetadataPublisher registered to > the > > > > MetadataLoader. Although, looking at this again, maybe this > discovery + > > > > persistence of clusterId can be handled by a new RaftListener > instead. > > I > > > > don't think we need the overhead of the MetadataImage + MetadataDelta > > for > > > > this feature since a RaftListener's `handleCommit()` and > > > > `handleLoadSnapshot()` contain `ClusterIdRecord`. However, this means > > > > needing a third listener besides the MetadataLoader and > > > QuorumMetaListener, > > > > and therefore an additional call to log#read() when handling KRaft > > > commits > > > > + snapshots. From my reading, it seems like the Kafka log layer does > > not > > > > attempt any caching, and instead we rely on the OS page cache. > Because > > of > > > > this, I think we should be using MetadataPublisher, but let me know > > what > > > > you think. > > > > > > > > I am thinking of using an AtomicReference<String> to represent the > > > > clusterId in-memory. 
This RaftListener/MetadataPublisher will be the > > only > > > > writer to this value if it is not already defined by meta.properties, > > but > > > > there are many readers of this value. The initial value of this > > reference > > > > is null or the cluster.id from meta.properties. Upon reading > > > > `ClusterIdRecord`, the listener will throw an exception if it has a > > > > non-null clusterId and reads a ClusterIdRecord with a different ID. > If > > it > > > > does not have cluster.id set and reads a ClusterIdRecord, it will > > update > > > > the AtomicReference and persist cluster.id to meta.properties. Let > me > > > know > > > > if this approach sounds reasonable to you. > > > > > > > > RE JS7: From what I understand about MetaPropertiesEnsemble and its > > > > verify() method, I think it is reasonable to say our > > > > RaftListener/MetadataPublisher will know how many (if any) > > > > `meta.properties` files it is responsible for persisting cluster.id > to > > > > during the current process incarnation when it starts up. Currently > we > > > only > > > > validate the MetaPropertiesEnsemble in two places: during formatting, > > and > > > > during node startup. From what I understand, scenarios 1 and 2 should > > > only > > > > occur alongside a restart of the kafka process (to generate a new > > > > directory-id and/or update log.dirs), but please correct me if this > > > > assumption is wrong. I'm not sure if scenario 3 is referring to a > > partial > > > > write of a given meta.properties (i.e. it does not contain > cluster.id > > ), > > > or > > > > not writing the discovered cluster.id to all meta.properties files > on > > > the > > > > node before a crash. If a meta.properties does not exist in a > > > log/metadata > > > > log directory during startup, we need to write a V2 one without a > > > > cluster.id, > > > > but we would be aware of this. 
If we succeed writing cluster.id to > at > > > > least > > > > one meta.properties via the ClusterIdRecord, I believe it is safe to > > > write > > > > that same value to the other meta.properties upon restart if they > exist > > > > because cluster.id does not change. > > > > > > > > I may have previously removed this from the KIP, but given this > > > discussion, > > > > I believe it is only safe to update the in-memory cluster.id only > > after > > > > writing this to all meta.properties on a node. > > > > > > > > RE JS8: Okay, maybe I will just rewrite the section. My point was to > > say > > > > something like: a node's discovery of the leader's committed > > cluster.id > > > > relies on the discovery of a HWM and our > RaftListener/MetadataPublisher > > > to > > > > be registered with the raft client, and that we need to wait for > these > > > > things before the startup logic in Controller/BrokerServer executes. > > > > However, if our listener does not see the ClusterIdRecord in > > > `handleCommit` > > > > or `handleLoadSnapshot`, it can't do anything meaningful, so it is > more > > > > accurate to say we need to wait until ClusterIdRecord is committed. > > > > > > > > On Thu, Mar 19, 2026 at 12:57 AM José Armando García Sancio via dev < > > > > [email protected]> wrote: > > > > > > > > > Hi Kevin, Thanks for the KIP and excuse my delay response. > > > > > > > > > > JS1: Can you clarify that this KIP removes the need for all Kafka > > > > > nodes to be formatted pior to starting Kafka. However, this doesn't > > > > > prevent users from formatting their broker with a cluster ID if > they > > > > > prefer. This is especially needed for Kafka nodes formatted for a > > > > > cluster using an MV that doesn't support this feature. > > > > > > > > > > JS2: How are you planning to implement "kafka-storage format > > > > > --clusterid YYY --standalone"? Is that going to behave like it does > > > > > today by writing the cluster id to the meta.properties files? 
Or > are > > > > > you planning to write the cluster id using the ClusterIdRecord to > the > > > > > bootstrap.checkpoint or 0-0.checkpoint (after KIP-1170)? > > > > > > > > > > JS3: In one of your replies you say "Discovering the cluster id > value > > > > > for the first time would only require a single FetchSnapshot or a > > > > > Fetch of the bootstrap metadata records." This is not entirely > > > > > accurate. The best we can say is that brokers need to catch up to > the > > > > > HWM before they can send a registration requests to the active > > > > > controller or it can start a few internal component. However, the > > > > > broker already had this requirement prior to this KIP, so it is not > > > > > new. > > > > > > > > > > JS4: In the KIP you mention "if meta.properties does not exist and > > the > > > > > node is a bootstrap controller, throw a runtime exception." Can you > > > > > explain how you plan to implement this? One important aspect to > > > > > consider is that in KRaft voters (controllers) are identified by > the > > > > > node ID and directory ID. A node can recover from a disk failure by > > > > > coming back with the same node ID but a different directory ID. In > > > > > this case, the controller should auto-recover if the auto-join > > feature > > > > > is enabled. > > > > > > > > > > JS5: In the KIP you mention "One detail here is that observer > > > > > controllers with auto-join must wait until they have a cluster id > > > > > before trying to add or remove themselves." I understand the reason > > > > > for this requirement. If a node auto-joins the controller cluster, > > you > > > > > must guarantee that it knows the cluster id in case it becomes the > > > > > leader and needs to write the ClusterIDRecord. Can you elaborate on > > > > > your implementation plan? 
> > > > > > > > > > JS6: In the KIP you mention "This can be implemented as a > > > > > MetadataPublisher that registers to the raft client alongside the > > > > > MetadataLoader." Metadata publishers don't register with the KRaft > > > > > client. RaftClient.Listener register with the KRaft client. > Metadata > > > > > publisher register with the metadata loader instead. > > > > > > > > > > JS7: One complexity is that there is a meta.properties per log > > > > > directory and metadata log directory. This means that in the stable > > > > > case the cluster ID exists in all the meta.properties files. > > > > > Unfortunately, this may not be the case for several reasons: 1) the > > > > > disk was replaced, 2) a new disk was added, or 3) the write > operation > > > > > was only partially successful. How do you plan to handle this case? > > > > > Consider that the controller and the broker can run on the same JVM > > > > > and use a log directory different from the metadata log directory. > > > > > Controllers only read and write to the metadata log directory. > > > > > > > > > > JS8: In the KIP you mention "Learning of a HWM from the leader, > which > > > > > the leader allows for because it will send valid fetch responses > back > > > > > to nodes that do not have a cluster id." One implementation > > complexity > > > > > is that KRaft can discover the HWM and send a handleCommit event > > > > > without having fetched all data up to the HWM. What KRaft > guarantees > > > > > is that the active leader will not receive a handleLeaderChange > event > > > > > until it has caught up to the leader's epoch. How do you plan to > > > > > implement this? > > > > > > > > > > Thanks, > > > > > -- > > > > > -José > > > > > > > > > > > > > > >
