wuchong commented on code in PR #2781:
URL: https://github.com/apache/fluss/pull/2781#discussion_r3035276900


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -337,7 +338,9 @@ public void completeDeleteTable(long tableId) {
         // delete bucket assignments node, which will also delete the bucket state node,
         // so that all the zk nodes related to this table are deleted.
         rethrowIfIsNotNoNodeException(
-                () -> zookeeperClient.deleteTableAssignment(tableId),
+                () ->
+                        zookeeperClient.deleteTableAssignment(
+                                tableId, ZkVersion.MATCH_ANY_VERSION.getVersion()),

Review Comment:
   This async cleanup can run after delete work has already been queued. Passing `MATCH_ANY_VERSION` disables coordinator epoch fencing, so a stale coordinator can still delete the table-assignment znode after leadership changes. We should pass the coordinator zk version for a safe deletion. The coordinator zk version can be obtained from `CoordinatorContext` when calling `MetadataManager`.
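   To illustrate the fencing concern, here is a toy, self-contained sketch (plain Java with hypothetical names, not the fluss or Curator API) of a versioned znode: a conditional delete carrying the coordinator's real zk version is rejected once a new leader bumps the version, while `MATCH_ANY_VERSION` slips through:

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   /** Toy model of a versioned znode; -1 mimics ZooKeeper's "match any version". */
   public class EpochFencedDelete {
       static final int MATCH_ANY_VERSION = -1;

       static final class Znode {
           final AtomicInteger version = new AtomicInteger(0);
           boolean deleted = false;

           /** Conditional delete: succeeds only if expectedVersion matches (or is -1). */
           synchronized boolean delete(int expectedVersion) {
               if (deleted) {
                   return false;
               }
               if (expectedVersion != MATCH_ANY_VERSION && expectedVersion != version.get()) {
                   return false; // fenced: the caller holds a stale version
               }
               deleted = true;
               return true;
           }

           /** A new leader bumps the version, invalidating versions held by the old leader. */
           synchronized void bumpVersion() {
               version.incrementAndGet();
           }
       }

       public static void main(String[] args) {
           Znode assignmentNode = new Znode();
           int staleVersion = assignmentNode.version.get(); // old coordinator reads version 0
           assignmentNode.bumpVersion();                    // new coordinator takes over

           // With the real version, the stale coordinator's delete is rejected.
           System.out.println("fenced delete: " + assignmentNode.delete(staleVersion));
           // With MATCH_ANY_VERSION, the same stale delete would slip through.
           System.out.println("unfenced delete: " + assignmentNode.delete(MATCH_ANY_VERSION));
       }
   }
   ```

   The same reasoning applies to any coordinator write path that must not survive a leadership change.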



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -399,7 +404,8 @@ public long createTable(
                     long tableId = zookeeperClient.getTableIdAndIncrement();
                     if (tableAssignment != null) {
                         // register table assignment
-                        zookeeperClient.registerTableAssignment(tableId, tableAssignment);
+                        zookeeperClient.registerTableAssignment(
+                                tableId, tableAssignment, ZkVersion.MATCH_ANY_VERSION.getVersion());

Review Comment:
   ditto



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -127,11 +129,19 @@
 
 import static java.util.stream.Collectors.toMap;
 import static org.apache.fluss.metadata.ResolvedPartitionSpec.fromPartitionName;
+import static org.apache.fluss.server.zk.ZooKeeperOp.multiRequest;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /**
  * This class includes methods for write/read various metadata (leader address, tablet server
  * registration, table assignment, table, schema) in Zookeeper.
+ *
+ * <p>In some method, 'expectedZkVersion' is used to execute an epoch Zookeeper version check. We
+ * have the following principals to judge if it's necessary to execute epoch Zookeeper version
+ * check. If all condition met, we need to execute epoch Zookeeper version check. 1. The method
+ * create/modify/delete Zk node. 2. It's executed by coordinator server. 3. It is about
+ * metadata(table/partition/leaderAndIsr) rather than server info or ACL info. 4. The Zk node is
+ * persistent rather than ephemeral.

Review Comment:
   The current epoch check is not clear about which types of operations need it. I think not all zk nodes need the epoch check; only the assignment and LeaderAndIsr nodes do. So, could you update the principles as follows?
   
   
   ```
   Conditions requiring epoch checks (all must be met):
   ┌─────────────────────────────────────────────────────────┐
   │ 1. Invoked by the Coordinator (not the TabletServer)    │
   │ 2. Operates on persistent nodes (not ephemeral)         │
   │ 3. Constitutes a "control plane" operation:             │
   │    partition assignment or LeaderAndIsr election        │
   │ 4. Concurrent access to the same path by old and new    │
   │    leaders during leader failover                       │
   │ 5. No other mechanisms (optimistic locking,             │
   │    idempotency, or reloading) provide fallback          │
   └─────────────────────────────────────────────────────────┘
   
   In practice, only two types of operations truly require this:
   - CRUD for Table/Partition Assignment (assignment decisions).
   - CRUD for LeaderAndIsr (leader election results).
   
   These operations are inevitably executed concurrently by the old and new 
coordinators during failover (as the new leader immediately reassigns 
partitions), and overwrites cannot be automatically recovered.
   
   All other operations do not require epoch checks because:
   - DDL operations are protected against concurrency by client reconnection 
mechanisms.
   - TabletServer operations are unaffected by coordinator failovers.
   - ACLs and Configs have their own version control or idempotency guarantees.
   - Ephemeral nodes are managed via session lifecycle.
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -346,7 +349,9 @@ public void completeDeletePartition(long partitionId) {
         // delete partition assignments node, which will also delete the bucket state node,
         // so that all the zk nodes related to this partition are deleted.
         rethrowIfIsNotNoNodeException(
-                () -> zookeeperClient.deletePartitionAssignment(partitionId),
+                () ->
+                        zookeeperClient.deletePartitionAssignment(
+                                partitionId, ZkVersion.MATCH_ANY_VERSION.getVersion()),

Review Comment:
   ditto, we need the true zk version here. 



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -197,6 +209,43 @@ public void registerCoordinatorServer(CoordinatorAddress coordinatorAddress) thr
         LOG.info("Registered Coordinator server {} at path {}.", coordinatorAddress, path);
     }
 
+    /**
+     * Become coordinator leader. This method is a step after electCoordinatorLeader() and before
+     * registerCoordinatorLeader(). This is to ensure the coordinator get and update the coordinator
+     * epoch and coordinator epoch zk version.
+     */
+    public Optional<ZkEpoch> fenceBecomeCoordinatorLeader(String coordinatorId) throws Exception {
+        ensureEpochZnodeExists();
+
+        try {
+            ZkEpoch getEpoch = getCurrentEpoch();
+            int currentCoordinatorEpoch = getEpoch.getCoordinatorEpoch();
+            int currentCoordinatorEpochZkVersion = getEpoch.getCoordinatorEpochZkVersion();
+            int newCoordinatorEpoch = currentCoordinatorEpoch + 1;
+            LOG.info(
+                    "Coordinator leader {} tries to update epoch. Current epoch={}, Zookeeper version={}, new epoch={}",
+                    coordinatorId,
+                    currentCoordinatorEpoch,
+                    currentCoordinatorEpochZkVersion,
+                    newCoordinatorEpoch);
+
+            // atomically update epoch
+            zkClient.setData()
+                    .withVersion(currentCoordinatorEpochZkVersion)
+                    .forPath(
+                            ZkData.CoordinatorEpochZNode.path(),
+                            ZkData.CoordinatorEpochZNode.encode(newCoordinatorEpoch));
+
+            return Optional.of(getEpoch.nextZkEpoch());

Review Comment:
   The `ZkEpoch.nextZkEpoch()` simply increments the ZK version by exactly 1. While this is guaranteed by the ZK spec for a successful conditional `setData`, it would be more defensive to capture the `Stat` returned by `setData()`:

   ```java
   Stat stat =
           zkClient.setData()
                   .withVersion(currentCoordinatorEpochZkVersion)
                   .forPath(
                           ZkData.CoordinatorEpochZNode.path(),
                           ZkData.CoordinatorEpochZNode.encode(newCoordinatorEpoch));

   return Optional.of(new ZkEpoch(newCoordinatorEpoch, stat.getVersion()));
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -346,15 +428,16 @@ public void updatePartitionAssignment(long partitionId, PartitionAssignment part
                 partitionId);
     }
 
-    public void deleteTableAssignment(long tableId) throws Exception {
+    public void deleteTableAssignment(long tableId, int expectedZkVersion) throws Exception {
         String path = TableIdZNode.path(tableId);
-        zkClient.delete().deletingChildrenIfNeeded().forPath(path);
+        deleteRecursiveWithEpochCheck(path, expectedZkVersion, false);
         LOG.info("Deleted table assignment for table id {}.", tableId);
     }
 
-    public void deletePartitionAssignment(long partitionId) throws Exception {
+    public void deletePartitionAssignment(long partitionId, int expectedZkVersion)
+            throws Exception {
         String path = PartitionIdZNode.path(partitionId);
-        zkClient.delete().deletingChildrenIfNeeded().forPath(path);
+        deleteRecursiveWithEpochCheck(path, expectedZkVersion, false);

Review Comment:
   `deleteTableAssignment` and `deletePartitionAssignment` don't need epoch 
check. The only production caller is 
`MetadataManager.completeDeleteTable/completeDeletePartition`, which already 
passes `MATCH_ANY_VERSION` — so the epoch check is effectively a no-op today.
   
   More importantly, these are **idempotent cleanup operations** at the end of 
the DDL delete flow. By the time they're called, all replicas are already in 
`DeletionSuccessful` state. Even if a stale coordinator executes this cleanup, 
it won't corrupt any live metadata — the table/partition is already marked for 
deletion, and the new leader won't recreate the assignment for it.
   
   Since epoch check provides no safety benefit here, I'd suggest removing the 
`expectedZkVersion` parameter from these two methods and reverting to the 
original `zkClient.delete().deletingChildrenIfNeeded().forPath(path)`. This 
avoids the significant performance overhead of `deleteRecursiveWithEpochCheck` 
(which turns 1 ZK call into O(2N) calls for a path with N children) without 
sacrificing correctness.
   
   If we can remove epoch check on these 2 methods, we can also remove the 
`deleteRecursiveWithEpochCheck()` method. 
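   To make the O(2N) point concrete, here is a back-of-envelope model (hypothetical accounting, not the actual fluss implementation) of the client calls each strategy needs:

   ```java
   /** Back-of-envelope model (not the fluss code) of ZK client calls per delete strategy. */
   public class DeleteCallCount {

       /** deletingChildrenIfNeeded(): one client call; the server walks the subtree. */
       static int plainRecursiveDelete(int childCount) {
           return 1;
       }

       /**
        * Epoch-checked recursive delete: list the children first, then issue one
        * transaction per node (N children plus the parent), each pairing an
        * epoch-version check op with a delete op -- roughly O(2N) ops over N + 2 calls.
        */
       static int epochCheckedRecursiveDelete(int childCount) {
           int listChildren = 1;
           int perNodeTransactions = childCount + 1; // each bundles a check op + a delete op
           return listChildren + perNodeTransactions;
       }

       public static void main(String[] args) {
           int n = 100;
           System.out.println("plain recursive delete: " + plainRecursiveDelete(n) + " call(s)");
           System.out.println("epoch-checked delete:   " + epochCheckedRecursiveDelete(n) + " call(s)");
       }
   }
   ```

   Under this model a path with 100 children goes from 1 call to 102, which is why dropping the check for these idempotent cleanups is worthwhile.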
   



##########
fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java:
##########
@@ -200,6 +200,10 @@ public void afterEach(ExtensionContext extensionContext) throws Exception {
             }
         }
         CompletableFuture.allOf(dropFutures.toArray(new CompletableFuture[0])).join();
+
+        for (TabletServer tabletServer : tabletServers.values()) {
+            tabletServer.getReplicaManager().resetCoordinatorEpoch();

Review Comment:
   Why is it necessary to manually reset the coordinator epoch? Would tests 
fail if this step were omitted? Since there are no coordinator restarts before 
or after each test, the coordinator epoch should remain unchanged. Even in the 
event of a coordinator restart, the TabletServer should receive the updated 
coordinator epoch via `UpdateMetadata`, rendering manual epoch resets 
unnecessary.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java:
##########
@@ -709,11 +720,12 @@ private void clearTablesState() {
     public void resetContext() {
         tablesToBeDeleted.clear();
         coordinatorEpoch = 0;

Review Comment:
   nit: use `INITIAL_COORDINATOR_EPOCH` here, as you did for `coordinatorEpochZkVersion`.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java:
##########
@@ -122,7 +130,25 @@ public void isLeader() {
                                                 e);
                                     }
                                     try {
+                                        // to avoid split-brain
+                                        Optional<ZkEpoch> optionalEpoch =
+                                                zkClient.fenceBecomeCoordinatorLeader(serverId);
+                                        optionalEpoch.ifPresent(
+                                                integer ->
+                                                        coordinatorContext
+                                                                .setCoordinatorEpochAndZkVersion(
+                                                                        optionalEpoch
+                                                                                .get()
+                                                                                .getCoordinatorEpoch(),
+                                                                        optionalEpoch
+                                                                                .get()
+                                                                                .getCoordinatorEpochZkVersion()));
                                         initLeaderServices.run();
+                                    } catch (CoordinatorEpochFencedException e) {

Review Comment:
   The catch block below is dead code because `fenceBecomeCoordinatorLeader()` 
never throws `CoordinatorEpochFencedException` -- it returns `Optional.empty()` 
on `BadVersionException` instead.
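   If we want the catch to be meaningful, one option is to have the fence method translate the version conflict into the fenced exception instead of returning empty. A rough self-contained sketch (simplified stand-ins, not the actual fluss classes):

   ```java
   /** Sketch with simplified stand-ins (not the fluss classes) of a throwing fence method. */
   public class FenceThrowSketch {

       static class CoordinatorEpochFencedException extends RuntimeException {
           CoordinatorEpochFencedException(String message) {
               super(message);
           }
       }

       /** Stand-in for the epoch znode's current version in ZooKeeper. */
       static int currentZkVersion = 1;

       /**
        * Conditional update that surfaces a version conflict as an exception, i.e. the
        * equivalent of translating BadVersionException instead of returning Optional.empty().
        */
       static int fenceBecomeCoordinatorLeader(int expectedZkVersion) {
           if (expectedZkVersion != currentZkVersion) {
               throw new CoordinatorEpochFencedException(
                       "expected zk version " + expectedZkVersion + " but was " + currentZkVersion);
           }
           return ++currentZkVersion; // a successful conditional setData bumps the version by 1
       }

       public static void main(String[] args) {
           try {
               fenceBecomeCoordinatorLeader(0); // stale version held by an old leader
           } catch (CoordinatorEpochFencedException e) {
               System.out.println("fenced: " + e.getMessage()); // the catch is now reachable
           }
       }
   }
   ```

   Otherwise the catch block should simply be removed.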



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -471,18 +565,21 @@ public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrL
             CuratorOp updateOp = zkClient.transactionOp().setData().forPath(path, data);
             ops.add(updateOp);
             if (ops.size() == MAX_BATCH_SIZE) {
-                zkClient.transaction().forOperations(ops);
+                List<CuratorOp> wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion);
+                zkClient.transaction().forOperations(wrapOps);
                 ops.clear();
             }
         }
         if (!ops.isEmpty()) {
-            zkClient.transaction().forOperations(ops);
+            List<CuratorOp> wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion);
+            zkClient.transaction().forOperations(wrapOps);
         }
     }
 
-    public void deleteLeaderAndIsr(TableBucket tableBucket) throws Exception {
+    public void deleteLeaderAndIsr(TableBucket tableBucket, int expectedZkVersion)

Review Comment:
   `deleteLeaderAndIsr` is only used in tests, so it doesn't need the epoch check, and we can change its visibility modifier to `protected`.



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########


Review Comment:
   `updatePartitionAssignment` is not epoch-fenced; a stale coordinator can overwrite partition assignments after leadership changes.



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -274,13 +352,13 @@ public int[] getSortedTabletServerList() throws Exception {
     // --------------------------------------------------------------------------------------------
 
     /** Register table assignment to ZK. */
-    public void registerTableAssignment(long tableId, TableAssignment tableAssignment)
-            throws Exception {
+    public void registerTableAssignment(
+            long tableId, TableAssignment tableAssignment, int expectedZkVersion) throws Exception {

Review Comment:
   This is a special case where we don't need the epoch check, because it is only invoked during table creation, and the unique tableId provides a natural concurrency guard (table ID conflicts, node existence checks).



##########
fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java:
##########
@@ -193,23 +197,26 @@ void testLeaderAndIsr() throws Exception {
         assertThat(zookeeperClient.getLeaderAndIsr(tableBucket2)).isEmpty();
 
         // try to register bucket leaderAndIsr
-        LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 100, 1000);
-        LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4, 5, 6), 100, 1000);
+        LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 0, 1000);
+        LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4, 5, 6), 0, 1000);
 
-        zookeeperClient.registerLeaderAndIsr(tableBucket1, leaderAndIsr1);
-        zookeeperClient.registerLeaderAndIsr(tableBucket2, leaderAndIsr2);
+        zookeeperClient.registerLeaderAndIsr(
+                tableBucket1, leaderAndIsr1, ZkVersion.MATCH_ANY_VERSION.getVersion());
+        zookeeperClient.registerLeaderAndIsr(
+                tableBucket2, leaderAndIsr2, ZkVersion.MATCH_ANY_VERSION.getVersion());

Review Comment:
   It appears that all tests involving ZK epoch versions currently use 
`MATCH_ANY_VERSION`, which fails to validate the behavior of the production 
code. I suggest creating the ZK epoch node in `beforeAll()` and using the 
actual ZK epoch version in the test logic to ensure accurate verification.
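   As a toy illustration (a hypothetical stand-in for ZK, not the Curator API) of why passing the real version matters in tests: the real version exercises the BadVersion path, while `MATCH_ANY_VERSION` always succeeds and verifies nothing.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   /** Toy versioned store (a stand-in for ZK, hypothetical API) for comparing versions. */
   public class ToyVersionedStore {
       static final int MATCH_ANY_VERSION = -1;
       private final Map<String, Integer> versions = new HashMap<>();

       /** Creates the node and returns its initial version, like Stat.getVersion() after create. */
       int create(String path) {
           versions.put(path, 0);
           return 0;
       }

       /** Conditional update: fails (BadVersion) unless the expected version matches or is -1. */
       boolean setData(String path, int expectedVersion) {
           int current = versions.get(path);
           if (expectedVersion != MATCH_ANY_VERSION && expectedVersion != current) {
               return false; // BadVersion: the caller's view is stale
           }
           versions.put(path, current + 1);
           return true;
       }

       public static void main(String[] args) {
           ToyVersionedStore store = new ToyVersionedStore();
           int realVersion = store.create("/coordinator-epoch");

           // Using the real version exercises the fencing path ...
           System.out.println(store.setData("/coordinator-epoch", realVersion)); // true, now version 1
           System.out.println(store.setData("/coordinator-epoch", realVersion)); // false, stale version

           // ... while MATCH_ANY_VERSION always succeeds and verifies nothing.
           System.out.println(store.setData("/coordinator-epoch", MATCH_ANY_VERSION)); // true
       }
   }
   ```

   A test that only ever passes `MATCH_ANY_VERSION` would never hit the `false` branch above.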



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
