Hi Jun, Sounds good. I will make an explicit section to document when formatting is still required. Thanks again for the feedback and questions.
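For reference, the remaining formatting cases discussed in this thread could be summarized with invocations roughly like the following. This is a sketch only: the cluster ID, controller address, and directory ID values are illustrative placeholders, and exact flag spellings should be checked against `kafka-storage format --help`.

```shell
# Sketch: cases from this thread where `kafka-storage format` is still required.
# All IDs, hostnames, and paths below are illustrative placeholders.

# 1) Bootstrapping a dynamic quorum (kraft.version=1) as a standalone
#    controller, so a VotersRecord lands in the 0-0.checkpoint:
bin/kafka-storage.sh format --cluster-id "$CLUSTER_ID" \
  --standalone --config config/controller.properties

# 2) Bootstrapping with a known initial voter set (id@host:port:directory-id):
bin/kafka-storage.sh format --cluster-id "$CLUSTER_ID" \
  --initial-controllers "0@controller-0:9093:JEXY6aqzQY-32P5TStzaFg" \
  --config config/controller.properties

# 3) Pinning a non-default metadata version or feature level at format time:
bin/kafka-storage.sh format --cluster-id "$CLUSTER_ID" \
  --no-initial-controllers --release-version 4.0 \
  --config config/controller.properties

# 4) Seeding SCRAM credentials before the first start:
bin/kafka-storage.sh format --cluster-id "$CLUSTER_ID" --standalone \
  --add-scram 'SCRAM-SHA-512=[name=admin,password=changeme]' \
  --config config/controller.properties
```

Cases 1 and 2 are the ones the `controller.quorum.standalone.enabled` idea discussed below would aim to eliminate.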
Best,
Kevin Wu

On Wed, Apr 1, 2026 at 12:02 PM Jun Rao via dev <[email protected]> wrote:

> Hi, Kevin,
>
> Thanks for the explanation. I guess the special thing with dynamic deployment is the need to write the VotersRecord. We can keep the design in the KIP. Could you document in the KIP when formatting is still required?
>
> Jun
>
> On Tue, Mar 31, 2026 at 2:49 PM Kevin Wu <[email protected]> wrote:
>
> > Hi Jun,
> >
> > RE JR12:
> > Dynamic quorums could technically skip formatting, but I fear that would cause cluster misconfigurations more easily than in the static quorum case. The main difference between the two quorum deployments is that a static quorum cannot recover from data loss on a voter, whereas a dynamic quorum can. Below is my reasoning for why keeping this requirement for dynamic clusters is better for Kafka operators, given that formatting each node's disk is currently required.
> >
> > The KRaft voter set in both static and dynamic deployments is a piece of bootstrapping data that requires some orchestration to manage. Currently, the static voter set is managed by the supplier of a `.properties` file which contains `controller.quorum.voters`. In the static quorum case, the requirement Kafka places on an orchestration layer to safely manage the value of the `controller.quorum.voters` config is straightforward: supply the same value on all nodes, all the time, forever. I think it is more obvious to operators, without needing to know too much about how KRaft works, that having different values for `controller.quorum.voters` is incorrect and unsafe.
> >
> > The dynamic voter set's contents are initially managed by the caller of `kafka-storage format`, and afterwards managed by KRaft itself.
> > I will focus on what a standalone dynamic controller deployment would look like without formatting, but bootstrapping a dynamic quorum with multiple controllers is unsafe for the same reasons. To remove the formatting requirement for dynamic clusters, we could imagine having something like `controller.quorum.standalone.enabled`. When that config is defined during startup, Kafka writes the bootstrapping VotersRecord and KRaftVersion that formatting would otherwise write. However, the requirements Kafka places on an orchestration layer to safely manage this "standalone" config are more complicated than in the static quorum case. They are also not obvious without prior knowledge of bootstrapping quorums. This config should be set on exactly one node while the cluster has not yet been "bootstrapped," and it cannot be present on any node during startup once the cluster has been "bootstrapped." If this config is present after the cluster has a voter set, it can result in multiple KRaft leaders if a node with the standalone config defined experiences data loss and tries to restart.
> >
> > This unsafeness with respect to dynamic quorums also applies to `kafka-storage format`, but that CLI does not bootstrap a dynamic quorum by default: the user has to specify one of `--standalone`, `--initial-controllers`, or `--no-initial-controllers` when `controller.quorum.voters` is not defined. Maybe that is a sufficient argument that it is okay to introduce the `controller.quorum.standalone.enabled` static config as part of this KIP. Interested to know what you think about this.
> >
> > Best,
> > Kevin Wu
> >
> > On Tue, Mar 31, 2026 at 11:34 AM Jun Rao via dev <[email protected]> wrote:
> >
> > > Hi, Kevin,
> > >
> > > Thanks for the reply.
> > >
> > > JR12.
> > > https://kafka.apache.org/42/operations/kraft/ specifies two deployment methods for KRaft: static and dynamic. Are you saying that dynamic still requires formatting while static doesn't? Could you explain why there is a difference? BTW, which method do we recommend?
> > >
> > > Jun
> > >
> > > On Mon, Mar 30, 2026 at 9:03 AM Kevin Wu <[email protected]> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > RE JR10: Yes, I will update the KIP to reflect that.
> > > >
> > > > RE JR11: Yeah, I think it is fine to write V2.
> > > >
> > > > RE JR12: Is this in reference to KIP-853: Dynamic Quorum Reconfiguration? If so, see: "However, operators still have the option to format nodes to set the MV, feature versions, scram credentials, or to properly provision a kraft.version=1 cluster." In order to bootstrap any dynamic quorum (i.e. kraft.version=1) with an initial voter set, it is required to format one or more controllers with either `--standalone` or `--initial-controllers` so that a KRaft VotersRecord is part of the 0-0.checkpoint. Formatting controllers is still needed if you want to specify a non-default feature level or metadata version, and kraft.version=1 would be a "non-default" KRaft version (mainly because it is not correct without formatting, as described below).
> > > >
> > > > I'm not sure if removing this formatting requirement for new KIP-853 clusters is in scope for this KIP. The main issue with this is: how does a node know it can safely write a "bootstrapping" 0-0.checkpoint with the KRaft VotersRecord on startup of the Kafka process without knowing any state of the cluster? This can lead to split-brain when a node writes this for a cluster that has already elected a leader.
> > > > Currently, the caller of the kafka-storage format command is responsible for writing this exactly once for the lifetime of the cluster.
> > > >
> > > > Operators still have the option of starting Kafka without formatting and then upgrading the KRaft version to kraft.version=1. This path allows for a dynamic quorum without formatting the cluster.
> > > >
> > > > Thanks,
> > > > Kevin Wu
> > > >
> > > > On Fri, Mar 27, 2026 at 4:20 PM Jun Rao via dev <[email protected]> wrote:
> > > >
> > > > > Hi, Kevin,
> > > > >
> > > > > Thanks for the updated KIP. It's better if we can remove the formatting requirements for all nodes.
> > > > >
> > > > > JR10. "The reason for this KIP is to remove the requirement of brokers needing to run kafka-storage format before starting Kafka." Should we change brokers to nodes?
> > > > >
> > > > > JR11. "When --cluster-id is specified, the formatter writes meta.properties V1." It's a bit weird for the new code to write in V1 format. Could it write in V2 format?
> > > > >
> > > > > JR12. Without formatting, is it true that one can only bootstrap a standalone controller? In other words, does bootstrapping with multiple controllers still require formatting?
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, Mar 19, 2026 at 1:39 AM Kevin Wu <[email protected]> wrote:
> > > > >
> > > > > > Hi José,
> > > > > >
> > > > > > Thanks for the replies and questions.
> > > > > >
> > > > > > RE JS1: "Can you clarify that this KIP removes the need for all Kafka nodes to be formatted prior to starting Kafka." Hmmm, I guess in the static cluster case that skips formatting, having a newer software version with an older MV is not possible, so I will remove that mention from the KIP.
> > > > > > We should default to the latest MV if we skip formatting, which will support writing a ClusterIdRecord.
> > > > > >
> > > > > > Right now, it is not completely clear to me how we can allow bootstrap controllers (this applies mainly for kraft.version=0, since kraft.version=1 cannot elect a leader without proper formatting) to also skip formatting. That is why I said in the proposed changes: "*Remove the requirement of brokers and observer controllers to format before starting kafka*". I agree that KRaft can still elect a leader without a clusterId in this case, but I'm not completely sure how a QuorumController with an "empty" clusterId, which needs to be set later, should behave. My working idea is detailed in RE JS6. This is required because the active controller needs to generate a clusterId and write it back to KRaft upon activation in order for the committed `ClusterIdRecord` to appear in records passed to `RaftListener#handleCommit()`, so we cannot block its initialization. Keeping the assumption that QuorumController.clusterId is final and non-null would be nice, but that requires all KRaft voters to format with a cluster.id. Let me know what you think about the best way to remove this requirement.
> > > > > >
> > > > > > RE JS2: My plan was to continue to write meta.properties V1 during formatting with a `cluster.id` field like today, but also write a `ClusterIdRecord` to the bootstrap snapshot for redundancy if the MV supports it (I'm not sure if Kafka is expected to handle only partial log directory corruption/destruction).
> > > > > > If the "bootstrap controller cluster id check" from JS4 is correct, then the initial active controller is guaranteed to have a non-null `cluster.id` in meta.properties. So long as the MV supports it, the active controller would then write ClusterIdRecord as part of the bootstrap records.
> > > > > >
> > > > > > RE JS3: When I said this, I meant that the restriction of waiting for the discovery of cluster.id to persist it to meta.properties during broker startup is no more restrictive than what already exists today, which is being caught up to the HWM in order to register with the active controller.
> > > > > >
> > > > > > RE JS4: Yeah, I thought about this, specifically around the kraft.version=1 case, since it is less straightforward what a "bootstrap controller" is. Under the current design, in kraft.version=0, any node that is part of the `controller.quorum.voters` config must have `meta.properties` with `cluster.id`. In kraft.version=1, any node that has a `0-0.checkpoint` is considered a "bootstrap controller." This is a heuristic, but I believe it is correct: for the 0-0.checkpoint to not exist on a node which formatted with --standalone or --initial-controllers, there must have either been another checkpoint with committed records, which implies an elected initial leader, or a disk loss.
> > > > > > Whenever a voter with id X and initial directory-id A comes back as (X, B), this process incarnation is an observer from the perspective of KRaft, and I think we can assume it has neither `meta.properties` nor `0-0.checkpoint` if the operator did not format it (per the assumption from RE JS2 that the kinds of storage failures we expect to handle are not partial directory failures). In this case, the "bootstrap controller" check does not apply to (X, B), and if auto-join is enabled, it will follow the steps detailed in RE JS5 to recover and rejoin the voter set. If we remove the requirement on all nodes to format, then we would not need to implement these checks.
> > > > > >
> > > > > > RE JS5: An observer without a clusterId that can auto-join will fetch until its KafkaRaftClient updates the cluster id in memory (basically, auto-join is off until it discovers the leader's clusterId). If the observer has a clusterId, it needs to match the leader's to perform a successful fetch, which is required for successfully adding a voter via auto-join.
> > > > > >
> > > > > > RE JS6: Apologies, I meant to say a MetadataPublisher registered to the MetadataLoader. Although, looking at this again, maybe this discovery + persistence of clusterId can be handled by a new RaftListener instead. I don't think we need the overhead of the MetadataImage + MetadataDelta for this feature, since a RaftListener's `handleCommit()` and `handleLoadSnapshot()` contain `ClusterIdRecord`.
> > > > > > However, this means needing a third listener besides the MetadataLoader and QuorumMetaListener, and therefore an additional call to log#read() when handling KRaft commits + snapshots. From my reading, it seems like the Kafka log layer does not attempt any caching, and instead we rely on the OS page cache. Because of this, I think we should be using MetadataPublisher, but let me know what you think.
> > > > > >
> > > > > > I am thinking of using an AtomicReference<String> to represent the clusterId in memory. This RaftListener/MetadataPublisher will be the only writer to this value if it is not already defined by meta.properties, but there are many readers of this value. The initial value of this reference is null or the cluster.id from meta.properties. Upon reading a `ClusterIdRecord`, the listener will throw an exception if it has a non-null clusterId and reads a ClusterIdRecord with a different ID. If it does not have cluster.id set and reads a ClusterIdRecord, it will update the AtomicReference and persist cluster.id to meta.properties. Let me know if this approach sounds reasonable to you.
> > > > > >
> > > > > > RE JS7: From what I understand about MetaPropertiesEnsemble and its verify() method, I think it is reasonable to say our RaftListener/MetadataPublisher will know how many (if any) `meta.properties` files it is responsible for persisting cluster.id to during the current process incarnation when it starts up. Currently we only validate the MetaPropertiesEnsemble in two places: during formatting, and during node startup.
> > > > > > From what I understand, scenarios 1 and 2 should only occur alongside a restart of the Kafka process (to generate a new directory-id and/or update log.dirs), but please correct me if this assumption is wrong. I'm not sure if scenario 3 is referring to a partial write of a given meta.properties (i.e. it does not contain cluster.id), or to not writing the discovered cluster.id to all meta.properties files on the node before a crash. If a meta.properties does not exist in a log/metadata log directory during startup, we need to write a V2 one without a cluster.id, but we would be aware of this. If we succeed in writing cluster.id to at least one meta.properties via the ClusterIdRecord, I believe it is safe to write that same value to the other meta.properties upon restart if they exist, because cluster.id does not change.
> > > > > >
> > > > > > I may have previously removed this from the KIP, but given this discussion, I believe it is only safe to update the in-memory cluster.id after writing it to all meta.properties on a node.
> > > > > >
> > > > > > RE JS8: Okay, maybe I will just rewrite the section. My point was to say something like: a node's discovery of the leader's committed cluster.id relies on the discovery of a HWM and on our RaftListener/MetadataPublisher being registered with the raft client, and we need to wait for these things before the startup logic in Controller/BrokerServer executes.
> > > > > > However, if our listener does not see the ClusterIdRecord in `handleCommit` or `handleLoadSnapshot`, it can't do anything meaningful, so it is more accurate to say we need to wait until ClusterIdRecord is committed.
> > > > > >
> > > > > > On Thu, Mar 19, 2026 at 12:57 AM José Armando García Sancio via dev <[email protected]> wrote:
> > > > > >
> > > > > > > Hi Kevin,
> > > > > > >
> > > > > > > Thanks for the KIP and excuse my delayed response.
> > > > > > >
> > > > > > > JS1: Can you clarify that this KIP removes the need for all Kafka nodes to be formatted prior to starting Kafka. However, this doesn't prevent users from formatting their broker with a cluster ID if they prefer. This is especially needed for Kafka nodes formatted for a cluster using an MV that doesn't support this feature.
> > > > > > >
> > > > > > > JS2: How are you planning to implement "kafka-storage format --cluster-id YYY --standalone"? Is that going to behave like it does today by writing the cluster id to the meta.properties files? Or are you planning to write the cluster id using the ClusterIdRecord to the bootstrap.checkpoint or 0-0.checkpoint (after KIP-1170)?
> > > > > > >
> > > > > > > JS3: In one of your replies you say "Discovering the cluster id value for the first time would only require a single FetchSnapshot or a Fetch of the bootstrap metadata records." This is not entirely accurate. The best we can say is that brokers need to catch up to the HWM before they can send a registration request to the active controller or start a few internal components. However, the broker already had this requirement prior to this KIP, so it is not new.
> > > > > > > JS4: In the KIP you mention "if meta.properties does not exist and the node is a bootstrap controller, throw a runtime exception." Can you explain how you plan to implement this? One important aspect to consider is that in KRaft, voters (controllers) are identified by the node ID and directory ID. A node can recover from a disk failure by coming back with the same node ID but a different directory ID. In this case, the controller should auto-recover if the auto-join feature is enabled.
> > > > > > >
> > > > > > > JS5: In the KIP you mention "One detail here is that observer controllers with auto-join must wait until they have a cluster id before trying to add or remove themselves." I understand the reason for this requirement. If a node auto-joins the controller cluster, you must guarantee that it knows the cluster id in case it becomes the leader and needs to write the ClusterIdRecord. Can you elaborate on your implementation plan?
> > > > > > >
> > > > > > > JS6: In the KIP you mention "This can be implemented as a MetadataPublisher that registers to the raft client alongside the MetadataLoader." Metadata publishers don't register with the KRaft client. RaftClient.Listeners register with the KRaft client. Metadata publishers register with the metadata loader instead.
> > > > > > >
> > > > > > > JS7: One complexity is that there is a meta.properties per log directory and metadata log directory. This means that in the stable case the cluster ID exists in all the meta.properties files.
> > > > > > > Unfortunately, this may not be the case for several reasons: 1) the disk was replaced, 2) a new disk was added, or 3) the write operation was only partially successful. How do you plan to handle this case? Consider that the controller and the broker can run on the same JVM and use a log directory different from the metadata log directory. Controllers only read and write to the metadata log directory.
> > > > > > >
> > > > > > > JS8: In the KIP you mention "Learning of a HWM from the leader, which the leader allows for because it will send valid fetch responses back to nodes that do not have a cluster id." One implementation complexity is that KRaft can discover the HWM and send a handleCommit event without having fetched all data up to the HWM. What KRaft guarantees is that the active leader will not receive a handleLeaderChange event until it has caught up to the leader's epoch. How do you plan to implement this?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > --
> > > > > > > -José
