soarez commented on code in PR #15810:
URL: https://github.com/apache/kafka/pull/15810#discussion_r1581762636
##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -365,45 +369,62 @@ private void tryElection(PartitionChangeRecord record) {
}
/**
- * Trigger a leader epoch bump if one is needed.
- *
- * We need to bump the leader epoch if:
- * 1. The leader changed, or
- * 2. The new replica list does not contain all the nodes that the old replica list did.
- *
- * Changes that do NOT fall in any of these categories will increase the partition epoch, but
- * not the leader epoch. Note that if the leader epoch increases, the partition epoch will
- * always increase as well; there is no case where the partition epoch increases more slowly
- * than the leader epoch.
+ * Trigger a leader epoch bump if one is needed because of replica reassignment.
*
- * If the PartitionChangeRecord sets the leader field to something other than
- * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of
- * case 1. In this function, we check for cases 2 and 3, and handle them by manually
- * setting record.leader to the current leader.
- *
- * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager
- * that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader
- * bump is not required when the ISR shrinks. Note, that the leader epoch is never increased if
- * the ISR expanded.
+ * Note that if the leader epoch increases, the partition epoch will always increase as well; there is no
+ * case where the partition epoch increases more slowly than the leader epoch.
+ */
+ void triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(PartitionChangeRecord record) {
Review Comment:
This issue predates the PR, but I want to say it anyway: it's not great that
both of these methods are named and documented in a way that conveys a leader
epoch bump, when all they actually do is set the leader value in the change
record. One needs to know about `PartitionRegistration#merge(PartitionChangeRecord)`
to make the link between setting the leader on the PartitionChangeRecord and
triggering a leader epoch bump.
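A minimal sketch of that link, using hypothetical stand-in classes (not the real `PartitionChangeRecord`/`PartitionRegistration` API) and assuming `NO_LEADER_CHANGE` is a sentinel value of -2: merging any change record bumps the partition epoch, but only a record whose leader field is set, even to the current leader, also bumps the leader epoch.

```java
// Simplified, hypothetical model of merging a partition change record into a
// partition registration. Names are illustrative only; this is NOT the real
// Kafka PartitionChangeRecord/PartitionRegistration code.
public class LeaderEpochMergeSketch {
    // Assumed sentinel meaning "this record does not touch the leader field".
    static final int NO_LEADER_CHANGE = -2;

    // Minimal stand-in for PartitionChangeRecord: only the leader field matters here.
    static final class ChangeRecord {
        int leader = NO_LEADER_CHANGE;

        ChangeRecord setLeader(int leader) {
            this.leader = leader;
            return this;
        }
    }

    // Minimal, immutable stand-in for PartitionRegistration.
    static final class Registration {
        final int leader;
        final int leaderEpoch;
        final int partitionEpoch;

        Registration(int leader, int leaderEpoch, int partitionEpoch) {
            this.leader = leader;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }

        // Every merge bumps the partition epoch; only a record whose leader field
        // is set (even to the current leader) also bumps the leader epoch.
        Registration merge(ChangeRecord record) {
            if (record.leader == NO_LEADER_CHANGE) {
                return new Registration(leader, leaderEpoch, partitionEpoch + 1);
            }
            return new Registration(record.leader, leaderEpoch + 1, partitionEpoch + 1);
        }
    }

    public static void main(String[] args) {
        Registration current = new Registration(1, 5, 10);

        // Leader field untouched: partition epoch 10 -> 11, leader epoch stays 5.
        Registration noBump = current.merge(new ChangeRecord());
        System.out.println(noBump.leaderEpoch + " " + noBump.partitionEpoch); // 5 11

        // Leader field set to the *current* leader: leader epoch 5 -> 6 as well.
        Registration bump = current.merge(new ChangeRecord().setLeader(current.leader));
        System.out.println(bump.leader + " " + bump.leaderEpoch + " " + bump.partitionEpoch); // 1 6 11
    }
}
```

This also shows why the partition epoch can never increase more slowly than the leader epoch in this model: every path through `merge` increments the partition epoch.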
##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -365,45 +369,62 @@ private void tryElection(PartitionChangeRecord record) {
}
/**
- * Trigger a leader epoch bump if one is needed.
- *
- * We need to bump the leader epoch if:
- * 1. The leader changed, or
- * 2. The new replica list does not contain all the nodes that the old replica list did.
- *
- * Changes that do NOT fall in any of these categories will increase the partition epoch, but
- * not the leader epoch. Note that if the leader epoch increases, the partition epoch will
- * always increase as well; there is no case where the partition epoch increases more slowly
- * than the leader epoch.
+ * Trigger a leader epoch bump if one is needed because of replica reassignment.
*
- * If the PartitionChangeRecord sets the leader field to something other than
- * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of
- * case 1. In this function, we check for cases 2 and 3, and handle them by manually
- * setting record.leader to the current leader.
- *
- * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager
- * that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader
- * bump is not required when the ISR shrinks. Note, that the leader epoch is never increased if
- * the ISR expanded.
+ * Note that if the leader epoch increases, the partition epoch will always increase as well; there is no
+ * case where the partition epoch increases more slowly than the leader epoch.
+ */
+ void triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(PartitionChangeRecord record) {
+ if (record.leader() != NO_LEADER_CHANGE) {
+ // The leader is already changing, so there will already be a leader epoch bump.
+ return;
+ }
+ if (!Replicas.contains(targetReplicas, partition.replicas)) {
+ // If the new replica list does not contain all the brokers that the old one did,
+ // ensure that there will be a leader epoch bump by setting the leader field.
+ record.setLeader(partition.leader);
+ }
+ }
+
+ /**
+ * Trigger a leader epoch bump if one is needed because of an ISR shrink.
*
- * In MV 3.6 and beyond, if the controller is in ZK migration mode, the leader epoch must
- * be bumped during ISR shrink for compatability with ZK brokers.
+ * Note that it's important to call this function only after we have set the ISR field in
+ * the PartitionChangeRecord.
*/
- void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
- if (record.leader() == NO_LEADER_CHANGE) {
- boolean bumpLeaderEpochOnIsrShrink = metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled;
-
- if (!Replicas.contains(targetReplicas, partition.replicas)) {
- // Reassignment
- record.setLeader(partition.leader);
- } else if (bumpLeaderEpochOnIsrShrink && !Replicas.contains(targetIsr, partition.isr)) {
- // ISR shrink
- record.setLeader(partition.leader);
- }
+ void triggerLeaderEpochBumpForIsrShrinkIfNeeded(PartitionChangeRecord record) {
+ if (!(metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled)) {
+ // We only need to bump the leader epoch on an ISR shrink in two cases:
+ //
+ // 1. In older metadata versions before 3.6, there was a bug (KAFKA-15021) in the
+ // broker replica manager that required that the leader epoch be bumped whenever
+ // the ISR shrank. (This was never necessary for EXPANSIONS, only SHRINKS.)
+ //
+ // 2. During ZK migration, we bump the leader epoch during all ISR shrinks, in order
+ // to maintain compatibility with migrating brokers that are still in ZK mode.
+ //
+ // If we're not in either case, we can exit here.
+ return;
+ }
+ if (record.leader() != NO_LEADER_CHANGE) {
+ // The leader is already changing, so there will already be a leader epoch bump.
+ return;
+ }
+ if (record.isr() == null) {
+ // The ISR is not changing.
+ return;
+ }
+ if (!Replicas.contains(record.isr(), partition.isr)) {
+ // If the new ISR list does not contain all the brokers that the old one did,
+ // ensure that there will be a leader epoch bump by setting the leader field.
+ record.setLeader(partition.leader);
}
}
- private void completeReassignmentIfNeeded() {
+ /**
+ * @return true if the reassignment was completed; false otherwise.
+ */
+ private boolean completeReassignmentIfNeeded() {
Review Comment:
This change doesn't seem to be necessary.
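For reference, the decision logic of the two new helpers in the hunk above can be modeled in isolation. This is a hypothetical sketch: `containsAll` is assumed to mirror `Replicas.contains(a, b)` (true if every broker in `b` is also in `a`), the `bumpOnIsrShrink` flag stands in for `metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled`, `NO_LEADER_CHANGE` is assumed to be -2, and none of the class or method names below exist in Kafka.

```java
// Hypothetical, self-contained model of the decision logic in
// triggerLeaderEpochBumpForReplicaReassignmentIfNeeded and
// triggerLeaderEpochBumpForIsrShrinkIfNeeded. All names here are illustrative.
public class EpochBumpDecisionSketch {
    // Assumed sentinel meaning "this record does not touch the leader field".
    static final int NO_LEADER_CHANGE = -2;

    // Assumed to mirror Replicas.contains(a, b): true if every broker in b is also in a.
    static boolean containsAll(int[] a, int[] b) {
        for (int broker : b) {
            boolean found = false;
            for (int candidate : a) {
                if (candidate == broker) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                return false;
            }
        }
        return true;
    }

    // Minimal stand-in for PartitionChangeRecord.
    static final class ChangeRecord {
        int leader = NO_LEADER_CHANGE;
        int[] isr = null; // null means the ISR is not changing
    }

    final int currentLeader;
    final int[] currentReplicas;
    final int[] currentIsr;
    // Stands in for the metadata-version check OR'ed with zkMigrationEnabled.
    final boolean bumpOnIsrShrink;

    EpochBumpDecisionSketch(int currentLeader, int[] currentReplicas, int[] currentIsr,
                            boolean bumpOnIsrShrink) {
        this.currentLeader = currentLeader;
        this.currentReplicas = currentReplicas;
        this.currentIsr = currentIsr;
        this.bumpOnIsrShrink = bumpOnIsrShrink;
    }

    void forReplicaReassignment(ChangeRecord record, int[] targetReplicas) {
        if (record.leader != NO_LEADER_CHANGE) {
            return; // leader already changing, so the leader epoch is bumped anyway
        }
        if (!containsAll(targetReplicas, currentReplicas)) {
            record.leader = currentLeader; // a replica was removed: force a bump
        }
    }

    void forIsrShrink(ChangeRecord record) {
        if (!bumpOnIsrShrink || record.leader != NO_LEADER_CHANGE || record.isr == null) {
            return; // bump not required, leader already changing, or ISR unchanged
        }
        if (!containsAll(record.isr, currentIsr)) {
            record.leader = currentLeader; // ISR shrank: force a bump
        }
    }

    public static void main(String[] args) {
        EpochBumpDecisionSketch p = new EpochBumpDecisionSketch(
            1, new int[] {1, 2, 3}, new int[] {1, 2, 3}, true);

        ChangeRecord reassign = new ChangeRecord();
        p.forReplicaReassignment(reassign, new int[] {1, 2, 4}); // broker 3 dropped
        System.out.println(reassign.leader); // 1

        ChangeRecord shrink = new ChangeRecord();
        shrink.isr = new int[] {1, 2};
        p.forIsrShrink(shrink);
        System.out.println(shrink.leader); // 1

        ChangeRecord expand = new ChangeRecord();
        expand.isr = new int[] {1, 2, 3, 4};
        p.forIsrShrink(expand);
        System.out.println(expand.leader); // -2: an expansion never forces a bump
    }
}
```

Note how, in this model, both helpers only ever write the leader field; the actual epoch increment happens later, at merge time, which is exactly the indirection the first comment points out.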
--
This is an automated message from the Apache Git Service.