wuchong commented on code in PR #2781:
URL: https://github.com/apache/fluss/pull/2781#discussion_r3035093684
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java:
##########
@@ -122,7 +130,25 @@ public void isLeader() {
                         e);
             }
             try {
+                // to avoid split-brain
+                Optional<ZkEpoch> optionalEpoch =
+                        zkClient.fenceBecomeCoordinatorLeader(serverId);
+                optionalEpoch.ifPresent(
+                        integer ->
+                                coordinatorContext
+                                        .setCoordinatorEpochAndZkVersion(
+                                                optionalEpoch
+                                                        .get()
+                                                        .getCoordinatorEpoch(),
+                                                optionalEpoch
+                                                        .get()
+                                                        .getCoordinatorEpochZkVersion()));
                 initLeaderServices.run();
+            } catch (CoordinatorEpochFencedException e) {
+                LOG.warn(
+                        "Coordinator server {} has been fenced and not become leader successfully.",
+                        serverId);
+                throw e;
Review Comment:
When a `CoordinatorEpochFencedException` is caught here, the current
coordinator must transition back to the standby state, for example by invoking `notLeader()`.
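
To illustrate the fallback, here is a minimal, self-contained sketch. It is not the Fluss implementation: `FencedFallbackSketch`, `Role`, and the nested `FencedException` are hypothetical stand-ins, and whether the exception should also be rethrown after stepping down is left open.

```java
import java.util.concurrent.atomic.AtomicReference;

public class FencedFallbackSketch {
    public enum Role { STANDBY, LEADER }

    /** Hypothetical stand-in for CoordinatorEpochFencedException. */
    public static class FencedException extends RuntimeException {
        public FencedException(String message) {
            super(message);
        }
    }

    private final AtomicReference<Role> role = new AtomicReference<>(Role.STANDBY);

    /** Invoked when the election reports that this server won leadership. */
    public void isLeader(Runnable fenceAndInitLeaderServices) {
        try {
            fenceAndInitLeaderServices.run();
            role.set(Role.LEADER);
        } catch (FencedException e) {
            // A newer coordinator already owns the epoch: step down rather
            // than stay in a half-initialized leader state.
            notLeader();
        }
    }

    /** Returns this server to standby. */
    public void notLeader() {
        role.set(Role.STANDBY);
    }

    public Role role() {
        return role.get();
    }

    public static void main(String[] args) {
        FencedFallbackSketch coordinator = new FencedFallbackSketch();
        coordinator.isLeader(() -> {
            throw new FencedException("stale coordinator epoch");
        });
        System.out.println(coordinator.role()); // prints STANDBY
    }
}
```

The point of the sketch is only that the fenced path ends in the same state as a normal loss of leadership.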
##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -392,6 +392,12 @@ public class ConfigOptions {
+ " (“50100,50101”), ranges (“50100-50200”) or a combination of both."
+ "This option is deprecated. Please use bind.listeners instead, which provides a more flexible configuration for multiple ports");
+ public static final ConfigOption<Integer> COORDINATOR_ID =
Review Comment:
`COORDINATOR_ID` is unused; please remove it.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
Review Comment:
This partitioned-table reassignment path still updates ZK without a
coordinator epoch / version check, so a fenced coordinator can overwrite
`/partitionIds/<partitionId>` after a leadership change.
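
As a rough illustration of the guard I have in mind, the sketch below uses an in-memory stand-in for ZooKeeper (`FakeStore`, `bumpEpoch`, and `guardedPut` are hypothetical names, not Fluss or ZooKeeper APIs). With the real client, I would expect this to be a `multi` that pairs a version check on the epoch znode with the data write, but treat that mapping as an assumption.

```java
import java.util.HashMap;
import java.util.Map;

public class EpochGuardedWriteSketch {
    /** In-memory stand-in for ZooKeeper; not the real client. */
    public static class FakeStore {
        private int epochVersion = 0;
        private final Map<String, String> data = new HashMap<>();

        /** A newly elected leader bumps the epoch node, advancing its version. */
        public synchronized int bumpEpoch() {
            return ++epochVersion;
        }

        /** Applies the write only if the caller's epoch version is still current. */
        public synchronized boolean guardedPut(String path, String value, int expectedEpochVersion) {
            if (expectedEpochVersion != epochVersion) {
                return false; // caller has been fenced; reject the write
            }
            data.put(path, value);
            return true;
        }

        public synchronized String get(String path) {
            return data.get(path);
        }
    }

    public static void main(String[] args) {
        FakeStore zk = new FakeStore();
        int oldLeaderVersion = zk.bumpEpoch();
        zk.guardedPut("/partitionIds/1", "assignment-A", oldLeaderVersion);

        zk.bumpEpoch(); // a new coordinator takes over
        boolean staleWriteApplied =
                zk.guardedPut("/partitionIds/1", "assignment-B", oldLeaderVersion);

        // prints: false assignment-A
        System.out.println(staleWriteApplied + " " + zk.get("/partitionIds/1"));
    }
}
```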
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java:
##########
@@ -122,7 +130,25 @@ public void isLeader() {
                         e);
             }
             try {
+                // to avoid split-brain
+                Optional<ZkEpoch> optionalEpoch =
+                        zkClient.fenceBecomeCoordinatorLeader(serverId);
+                optionalEpoch.ifPresent(
+                        integer ->
+                                coordinatorContext
+                                        .setCoordinatorEpochAndZkVersion(
+                                                optionalEpoch
+                                                        .get()
+                                                        .getCoordinatorEpoch(),
+                                                optionalEpoch
+                                                        .get()
+                                                        .getCoordinatorEpochZkVersion()));
Review Comment:
The `coordinatorContext` is not thread-safe and must only be read and
updated from within the `CoordinatorEventProcessor`. Because `leaderCallbackExecutor`
runs its callbacks on a separate thread, mutating the context there violates
that constraint.
To resolve this, move the `zkClient.fenceBecomeCoordinatorLeader` call into
`org.apache.fluss.server.coordinator.CoordinatorServer#initCoordinatorLeader`,
placing it before the synchronized lock block. Similarly, move
`registerCoordinatorLeader()` before the synchronized block. Capture the
`ZkEpoch` returned by `fenceBecomeCoordinatorLeader` and pass it to the
`CoordinatorEventProcessor` constructor to represent the current leader epoch.
This allows `CoordinatorEventProcessor.startup()` to safely initialize
`CoordinatorContext` with the `ZkEpoch`, as the operation remains within the
single-threaded execution scope of the processor.
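
A minimal sketch of that handoff, under stated assumptions: `Epoch`, `Context`, and `Processor` are simplified, hypothetical stand-ins for `ZkEpoch`, `CoordinatorContext`, and `CoordinatorEventProcessor`, and the sketch assumes `startup()` applies the epoch on the processor's single event thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EpochHandoffSketch {
    /** Immutable stand-in for ZkEpoch, so it is safe to pass between threads. */
    public static final class Epoch {
        public final int coordinatorEpoch;
        public final int zkVersion;

        public Epoch(int coordinatorEpoch, int zkVersion) {
            this.coordinatorEpoch = coordinatorEpoch;
            this.zkVersion = zkVersion;
        }
    }

    /** Non-thread-safe stand-in for CoordinatorContext. */
    public static final class Context {
        int coordinatorEpoch = -1;
        int zkVersion = -1;
        String writerThread;
    }

    public static final class Processor {
        private final Epoch epoch;
        private final Context context = new Context();
        private final ExecutorService eventThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "coordinator-event-thread"));

        /** The epoch is obtained by the caller (before any lock) and only stored here. */
        public Processor(Epoch epoch) {
            this.epoch = epoch;
        }

        /** The context is mutated on the event thread only. */
        public void startup() {
            this.<Void>onEventThread(() -> {
                context.coordinatorEpoch = epoch.coordinatorEpoch;
                context.zkVersion = epoch.zkVersion;
                context.writerThread = Thread.currentThread().getName();
                return null;
            });
        }

        public int contextEpoch() {
            return this.<Integer>onEventThread(() -> context.coordinatorEpoch);
        }

        public String contextWriterThread() {
            return this.<String>onEventThread(() -> context.writerThread);
        }

        public void shutdown() {
            eventThread.shutdown();
        }

        private <T> T onEventThread(Callable<T> task) {
            try {
                return eventThread.submit(task).get();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        Epoch epoch = new Epoch(5, 12); // would come from fenceBecomeCoordinatorLeader
        Processor processor = new Processor(epoch);
        processor.startup();
        System.out.println(processor.contextEpoch() + " set on " + processor.contextWriterThread());
        processor.shutdown();
    }
}
```

The election callback thread only produces the immutable epoch value; it never touches the context.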
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.