wuchong commented on code in PR #2781:
URL: https://github.com/apache/fluss/pull/2781#discussion_r3035276900
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -337,7 +338,9 @@ public void completeDeleteTable(long tableId) {
        // delete bucket assignments node, which will also delete the bucket state node,
        // so that all the zk nodes related to this table are deleted.
        rethrowIfIsNotNoNodeException(
-                () -> zookeeperClient.deleteTableAssignment(tableId),
+                () ->
+                        zookeeperClient.deleteTableAssignment(
+                                tableId, ZkVersion.MATCH_ANY_VERSION.getVersion()),
Review Comment:
This async cleanup can run after delete work has already been queued. Passing
MATCH_ANY_VERSION disables coordinator epoch fencing, so a stale coordinator can
still delete the table-assignment znode after leadership changes. We should
pass the coordinator zk version for a safe deletion. The coordinator zk version
can be obtained from CoordinatorContext when calling MetadataManager.
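To make the fencing argument concrete: ZooKeeper treats an expected version of -1 (what `MATCH_ANY_VERSION` maps to) as "skip the check", so a conditional operation only fences when a real version is supplied. A minimal in-memory sketch of that semantics (hypothetical `VersionedStore`, not Fluss or Curator code):

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of ZooKeeper's versioned-delete semantics (illustrative only). */
class VersionedStore {
    static final int MATCH_ANY_VERSION = -1; // same sentinel value ZooKeeper uses

    private final Map<String, Integer> versions = new HashMap<>(); // path -> data version

    void create(String path) {
        versions.put(path, 0);
    }

    void setData(String path) {
        versions.computeIfPresent(path, (p, v) -> v + 1); // each write bumps the version
    }

    /** Mirrors delete(path, version): -1 skips the check, otherwise versions must match. */
    boolean delete(String path, int expectedVersion) {
        Integer current = versions.get(path);
        if (current == null) {
            return false; // NoNode
        }
        if (expectedVersion != MATCH_ANY_VERSION && expectedVersion != current) {
            return false; // BadVersion -> the stale caller is fenced
        }
        versions.remove(path);
        return true;
    }
}
```

A stale coordinator still holding the old version is rejected once the new leader has written the node, while `MATCH_ANY_VERSION` always goes through; that is exactly why passing -1 here leaves the fencing disabled.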
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -399,7 +404,8 @@ public long createTable(
long tableId = zookeeperClient.getTableIdAndIncrement();
if (tableAssignment != null) {
// register table assignment
-            zookeeperClient.registerTableAssignment(tableId, tableAssignment);
+            zookeeperClient.registerTableAssignment(
+                    tableId, tableAssignment, ZkVersion.MATCH_ANY_VERSION.getVersion());
Review Comment:
ditto
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -127,11 +129,19 @@
import static java.util.stream.Collectors.toMap;
import static org.apache.fluss.metadata.ResolvedPartitionSpec.fromPartitionName;
+import static org.apache.fluss.server.zk.ZooKeeperOp.multiRequest;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
/**
 * This class includes methods for write/read various metadata (leader address, tablet server
 * registration, table assignment, table, schema) in Zookeeper.
+ *
+ * <p>In some methods, 'expectedZkVersion' is used to execute an epoch Zookeeper version check.
+ * We use the following principles to judge whether the epoch Zookeeper version check is
+ * necessary; it is required only if all conditions are met: 1. The method creates/modifies/deletes
+ * a Zk node. 2. It is executed by the coordinator server. 3. It is about metadata
+ * (table/partition/leaderAndIsr) rather than server info or ACL info. 4. The Zk node is
+ * persistent rather than ephemeral.
Review Comment:
I think the current description is not clear about which types of operations
need the epoch check. Not all zk nodes need it; only the assignment and
LeaderAndIsr nodes do. So, could you update the principles as follows?
```
Conditions requiring epoch checks (all must be met):
┌─────────────────────────────────────────────────────────┐
│ 1. Invoked by the Coordinator (not the TabletServer) │
│ 2. Operates on persistent nodes (not ephemeral) │
│ 3. Constitutes a "control plane" operation: │
│ partition assignment or LeaderAndIsr election │
│ 4. Concurrent access to the same path by old and new │
│ leaders during leader failover │
│ 5. No other mechanisms (optimistic locking, │
│ idempotency, or reloading) provide fallback │
└─────────────────────────────────────────────────────────┘
In practice, only two types of operations truly require this:
- CRUD for Table/Partition Assignment (assignment decisions).
- CRUD for LeaderAndIsr (leader election results).
These operations are inevitably executed concurrently by the old and new
coordinators during failover (as the new leader immediately reassigns
partitions), and overwrites cannot be automatically recovered.
All other operations do not require epoch checks because:
- DDL operations are protected against concurrency by client reconnection
mechanisms.
- TabletServer operations are unaffected by coordinator failovers.
- ACLs and Configs have their own version control or idempotency guarantees.
- Ephemeral nodes are managed via session lifecycle.
```
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -346,7 +349,9 @@ public void completeDeletePartition(long partitionId) {
        // delete partition assignments node, which will also delete the bucket state node,
        // so that all the zk nodes related to this partition are deleted.
        rethrowIfIsNotNoNodeException(
-                () -> zookeeperClient.deletePartitionAssignment(partitionId),
+                () ->
+                        zookeeperClient.deletePartitionAssignment(
+                                partitionId, ZkVersion.MATCH_ANY_VERSION.getVersion()),
Review Comment:
ditto, we need the true zk version here.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -197,6 +209,43 @@ public void registerCoordinatorServer(CoordinatorAddress coordinatorAddress) thr
        LOG.info("Registered Coordinator server {} at path {}.", coordinatorAddress, path);
    }
+    /**
+     * Become coordinator leader. This method is a step after electCoordinatorLeader() and before
+     * registerCoordinatorLeader(). This is to ensure the coordinator gets and updates the
+     * coordinator epoch and the coordinator epoch zk version.
+     */
+    public Optional<ZkEpoch> fenceBecomeCoordinatorLeader(String coordinatorId) throws Exception {
+        ensureEpochZnodeExists();
+
+        try {
+            ZkEpoch getEpoch = getCurrentEpoch();
+            int currentCoordinatorEpoch = getEpoch.getCoordinatorEpoch();
+            int currentCoordinatorEpochZkVersion = getEpoch.getCoordinatorEpochZkVersion();
+            int newCoordinatorEpoch = currentCoordinatorEpoch + 1;
+            LOG.info(
+                    "Coordinator leader {} tries to update epoch. Current epoch={}, Zookeeper version={}, new epoch={}",
+                    coordinatorId,
+                    currentCoordinatorEpoch,
+                    currentCoordinatorEpochZkVersion,
+                    newCoordinatorEpoch);
+
+            // atomically update epoch
+            zkClient.setData()
+                    .withVersion(currentCoordinatorEpochZkVersion)
+                    .forPath(
+                            ZkData.CoordinatorEpochZNode.path(),
+                            ZkData.CoordinatorEpochZNode.encode(newCoordinatorEpoch));
+
+            return Optional.of(getEpoch.nextZkEpoch());
Review Comment:
The `ZkEpoch.nextZkEpoch()` simply increments ZK version by exactly 1. While
this is guaranteed by ZK spec for a successful conditional `setData`, it would
be more defensive to capture
the `Stat` returned by `setData()`.
```java
Stat stat =
        zkClient.setData()
                .withVersion(currentCoordinatorEpochZkVersion)
                .forPath(
                        ZkData.CoordinatorEpochZNode.path(),
                        ZkData.CoordinatorEpochZNode.encode(newCoordinatorEpoch));
return Optional.of(new ZkEpoch(newCoordinatorEpoch, stat.getVersion()));
```
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -346,15 +428,16 @@ public void updatePartitionAssignment(long partitionId, PartitionAssignment part
                partitionId);
    }
-    public void deleteTableAssignment(long tableId) throws Exception {
+    public void deleteTableAssignment(long tableId, int expectedZkVersion) throws Exception {
        String path = TableIdZNode.path(tableId);
-        zkClient.delete().deletingChildrenIfNeeded().forPath(path);
+        deleteRecursiveWithEpochCheck(path, expectedZkVersion, false);
        LOG.info("Deleted table assignment for table id {}.", tableId);
    }
-    public void deletePartitionAssignment(long partitionId) throws Exception {
+    public void deletePartitionAssignment(long partitionId, int expectedZkVersion)
+            throws Exception {
        String path = PartitionIdZNode.path(partitionId);
-        zkClient.delete().deletingChildrenIfNeeded().forPath(path);
+        deleteRecursiveWithEpochCheck(path, expectedZkVersion, false);
Review Comment:
`deleteTableAssignment` and `deletePartitionAssignment` don't need epoch
check. The only production caller is
`MetadataManager.completeDeleteTable/completeDeletePartition`, which already
passes `MATCH_ANY_VERSION` — so the epoch check is effectively a no-op today.
More importantly, these are **idempotent cleanup operations** at the end of
the DDL delete flow. By the time they're called, all replicas are already in
`DeletionSuccessful` state. Even if a stale coordinator executes this cleanup,
it won't corrupt any live metadata — the table/partition is already marked for
deletion, and the new leader won't recreate the assignment for it.
Since epoch check provides no safety benefit here, I'd suggest removing the
`expectedZkVersion` parameter from these two methods and reverting to the
original `zkClient.delete().deletingChildrenIfNeeded().forPath(path)`. This
avoids the significant performance overhead of `deleteRecursiveWithEpochCheck`
(which turns 1 ZK call into O(2N) calls for a path with N children) without
sacrificing correctness.
If we can remove epoch check on these 2 methods, we can also remove the
`deleteRecursiveWithEpochCheck()` method.
##########
fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java:
##########
@@ -200,6 +200,10 @@ public void afterEach(ExtensionContext extensionContext) throws Exception {
            }
        }
        CompletableFuture.allOf(dropFutures.toArray(new CompletableFuture[0])).join();
+
+ for (TabletServer tabletServer : tabletServers.values()) {
+ tabletServer.getReplicaManager().resetCoordinatorEpoch();
Review Comment:
Why is it necessary to manually reset the coordinator epoch? Would tests
fail if this step were omitted? Since there are no coordinator restarts before
or after each test, the coordinator epoch should remain unchanged. Even in the
event of a coordinator restart, the TabletServer should receive the updated
coordinator epoch via `UpdateMetadata`, rendering manual epoch resets
unnecessary.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java:
##########
@@ -709,11 +720,12 @@ private void clearTablesState() {
public void resetContext() {
tablesToBeDeleted.clear();
coordinatorEpoch = 0;
Review Comment:
nit: use `INITIAL_COORDINATOR_EPOCH` like how you updated
`coordinatorEpochZkVersion`.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java:
##########
@@ -122,7 +130,25 @@ public void isLeader() {
e);
}
try {
+                                // to avoid split-brain
+                                Optional<ZkEpoch> optionalEpoch =
+                                        zkClient.fenceBecomeCoordinatorLeader(serverId);
+                                optionalEpoch.ifPresent(
+                                        integer ->
+                                                coordinatorContext.setCoordinatorEpochAndZkVersion(
+                                                        optionalEpoch.get().getCoordinatorEpoch(),
+                                                        optionalEpoch
+                                                                .get()
+                                                                .getCoordinatorEpochZkVersion()));
                                initLeaderServices.run();
+                            } catch (CoordinatorEpochFencedException e) {
Review Comment:
The catch block below is dead code because `fenceBecomeCoordinatorLeader()`
never throws `CoordinatorEpochFencedException` -- it returns `Optional.empty()`
on `BadVersionException` instead.
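The dead-code claim is easy to see in isolation: when the fence step maps `BadVersionException` to `Optional.empty()`, the caller observes fencing only as an absent value, never as a thrown exception. A self-contained sketch (hypothetical names mirroring the PR, not the real Fluss classes):

```java
import java.util.Optional;

/** Sketch of a fence step that signals leadership loss via Optional rather than an exception. */
class FenceSketch {
    static class CoordinatorEpochFencedException extends RuntimeException {}

    /** Models fenceBecomeCoordinatorLeader(): a version conflict becomes empty, never a throw. */
    static Optional<Integer> fence(boolean versionConflict) {
        if (versionConflict) {
            return Optional.empty(); // BadVersion swallowed here
        }
        return Optional.of(1); // new epoch
    }

    /** Returns true iff the catch block ran. */
    static boolean catchRan(boolean versionConflict) {
        try {
            fence(versionConflict).ifPresent(epoch -> { /* setCoordinatorEpochAndZkVersion */ });
            return false;
        } catch (CoordinatorEpochFencedException e) {
            return true; // unreachable: fence() never throws this type
        }
    }
}
```

If the catch is meant to be meaningful, the fence method should rethrow `BadVersionException` as the fenced exception instead of returning empty; otherwise the catch block can simply be removed.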
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -471,18 +565,21 @@ public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrL
            CuratorOp updateOp = zkClient.transactionOp().setData().forPath(path, data);
            ops.add(updateOp);
            if (ops.size() == MAX_BATCH_SIZE) {
-                zkClient.transaction().forOperations(ops);
+                List<CuratorOp> wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion);
+                zkClient.transaction().forOperations(wrapOps);
                ops.clear();
            }
        }
        if (!ops.isEmpty()) {
-            zkClient.transaction().forOperations(ops);
+            List<CuratorOp> wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion);
+            zkClient.transaction().forOperations(wrapOps);
        }
    }
-    public void deleteLeaderAndIsr(TableBucket tableBucket) throws Exception {
+    public void deleteLeaderAndIsr(TableBucket tableBucket, int expectedZkVersion)
Review Comment:
`deleteLeaderAndIsr` is only used in tests, so we don't need the epoch check,
and we can change the visibility modifier to `protected`.
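For reference, the safety of wrapping the batch with an epoch check rests on ZooKeeper's `multi()` being all-or-nothing: a single check op on the epoch znode guards every op in the batch. A toy in-memory model of that semantics (hypothetical `MiniMulti`, not Curator code):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of ZooKeeper multi(): every op is validated first; then all apply, or none do. */
class MiniMulti {
    final Map<String, Integer> versions = new HashMap<>(); // path -> data version

    interface Op {
        boolean check(Map<String, Integer> store);
        void apply(Map<String, Integer> store);
    }

    /** Analogue of a transaction check op: passes only if the znode has the expected version. */
    static Op check(String path, int expectedVersion) {
        return new Op() {
            public boolean check(Map<String, Integer> store) {
                return Integer.valueOf(expectedVersion).equals(store.get(path));
            }
            public void apply(Map<String, Integer> store) {} // check ops mutate nothing
        };
    }

    /** Analogue of an unconditional setData op: bumps the znode's version. */
    static Op setData(String path) {
        return new Op() {
            public boolean check(Map<String, Integer> store) {
                return store.containsKey(path);
            }
            public void apply(Map<String, Integer> store) {
                store.merge(path, 1, Integer::sum);
            }
        };
    }

    /** All-or-nothing batch, like zkClient.transaction().forOperations(ops). */
    boolean multi(List<Op> ops) {
        for (Op op : ops) {
            if (!op.check(versions)) {
                return false; // one failing check rejects the whole batch
            }
        }
        for (Op op : ops) {
            op.apply(versions);
        }
        return true;
    }
}
```

Prepending a check on the epoch znode thus makes every batched LeaderAndIsr update fail atomically once a new coordinator has bumped the epoch.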
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
Review Comment:
`updatePartitionAssignment` is not epoch-fenced: a stale coordinator can
overwrite partition assignments after leadership changes.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -274,13 +352,13 @@ public int[] getSortedTabletServerList() throws Exception {
    // --------------------------------------------------------------------------------------------
    /** Register table assignment to ZK. */
-    public void registerTableAssignment(long tableId, TableAssignment tableAssignment)
-            throws Exception {
+    public void registerTableAssignment(
+            long tableId, TableAssignment tableAssignment, int expectedZkVersion) throws Exception {
Review Comment:
This is a special case where we don't need the epoch check, because it is only
invoked during table creation, and the unique tableId provides a natural
concurrency guard (table ID conflicts, node existence).
##########
fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java:
##########
@@ -193,23 +197,26 @@ void testLeaderAndIsr() throws Exception {
        assertThat(zookeeperClient.getLeaderAndIsr(tableBucket2)).isEmpty();
        // try to register bucket leaderAndIsr
-        LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 100, 1000);
-        LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4, 5, 6), 100, 1000);
+        LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 0, 1000);
+        LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4, 5, 6), 0, 1000);
-        zookeeperClient.registerLeaderAndIsr(tableBucket1, leaderAndIsr1);
-        zookeeperClient.registerLeaderAndIsr(tableBucket2, leaderAndIsr2);
+        zookeeperClient.registerLeaderAndIsr(
+                tableBucket1, leaderAndIsr1, ZkVersion.MATCH_ANY_VERSION.getVersion());
+        zookeeperClient.registerLeaderAndIsr(
+                tableBucket2, leaderAndIsr2, ZkVersion.MATCH_ANY_VERSION.getVersion());
Review Comment:
It appears that all tests involving ZK epoch versions currently use
`MATCH_ANY_VERSION`, which fails to validate the behavior of the production
code. I suggest creating the ZK epoch node in `beforeAll()` and using the
actual ZK epoch version in the test logic to ensure accurate verification.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]