Copilot commented on code in PR #2781:
URL: https://github.com/apache/fluss/pull/2781#discussion_r3035045013


##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -127,11 +129,19 @@
 
 import static java.util.stream.Collectors.toMap;
 import static org.apache.fluss.metadata.ResolvedPartitionSpec.fromPartitionName;
+import static org.apache.fluss.server.zk.ZooKeeperOp.multiRequest;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /**
  * This class includes methods for write/read various metadata (leader address, tablet server
  * registration, table assignment, table, schema) in Zookeeper.
+ *
+ * <p>In some method, 'expectedZkVersion' is used to execute an epoch Zookeeper version check. We
+ * have the following principals to judge if it's necessary to execute epoch Zookeeper version

Review Comment:
   Spelling: "principals" should be "principles" in the class-level JavaDoc 
describing the epoch version check rules.
   ```suggestion
    * have the following principles to judge if it's necessary to execute epoch Zookeeper version
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -1781,6 +1878,109 @@ public static <K> Map<K, List<String>> processGetChildrenResponses(
         return result;
     }
 
+    /**
+     * create a node (recursively if parent path not exists) with Zk epoch version check.
+     *
+     * @param path the path to create
+     * @param data the data to write
+     * @param throwIfPathExists whether to throw exception if path exist
+     * @throws Exception if any error occurs
+     */
+    public void createRecursiveWithEpochCheck(
+            String path, byte[] data, int expectedZkVersion, boolean throwIfPathExists)
+            throws Exception {
+        CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT);

Review Comment:
   `createRecursiveWithEpochCheck` calls itself recursively with `data = null` 
for parent paths, but `ZooKeeperOp.createOp(...).forPath(path, data)` typically 
expects a non-null byte array. Parent creation can therefore fail with a 
`NullPointerException` or `IllegalArgumentException`. Consider using Curator's 
create builder that omits data for parent nodes (or passing an explicit empty 
byte array), and attach `data` only to the final target node.
   ```suggestion
           byte[] nodeData = data == null ? new byte[0] : data;
           CuratorOp createOp = zkOp.createOp(path, nodeData, CreateMode.PERSISTENT);
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -1781,6 +1878,109 @@ public static <K> Map<K, List<String>> processGetChildrenResponses(
         return result;
     }
 
+    /**
+     * create a node (recursively if parent path not exists) with Zk epoch version check.
+     *
+     * @param path the path to create
+     * @param data the data to write
+     * @param throwIfPathExists whether to throw exception if path exist
+     * @throws Exception if any error occurs
+     */
+    public void createRecursiveWithEpochCheck(
+            String path, byte[] data, int expectedZkVersion, boolean throwIfPathExists)
+            throws Exception {
+        CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT);
+        List<CuratorOp> ops = wrapRequestWithEpochCheck(createOp, expectedZkVersion);
+
+        try {
+            // try to directly create
+            zkClient.transaction().forOperations(ops);
+        } catch (KeeperException.NodeExistsException e) {
+            // should not exist
+            if (throwIfPathExists) {
+                throw e;
+            }
+        } catch (KeeperException.NoNodeException e) {
+            // if parent does not exist, create parent first
+            int indexOfLastSlash = path.lastIndexOf("/");
+            if (indexOfLastSlash == -1) {
+                throw new IllegalArgumentException("Invalid path: " + path);
+            } else if (indexOfLastSlash == 0) {
+                // root path can be directly create without fence
+                try {
+                    zkClient.create()
+                            .creatingParentsIfNeeded()
+                            .withMode(CreateMode.PERSISTENT)
+                            .forPath(path);
+                } catch (KeeperException.NodeExistsException ignored) {
+                    // ignore
+                }
+            } else {
+                // indexOfLastSlash > 0
+                String parentPath = path.substring(0, indexOfLastSlash);
+                createRecursiveWithEpochCheck(
+                        parentPath, null, expectedZkVersion, throwIfPathExists);

Review Comment:
   `createRecursiveWithEpochCheck` propagates `throwIfPathExists` into the 
recursive parent creation call. If `throwIfPathExists` is true for the leaf 
node, the call can also throw when a *parent* path already exists, which breaks 
the usual contract of a recursive create (only the target path should be subject 
to the existence check). Consider passing `false` when creating parent paths and 
applying `throwIfPathExists` only to the final `path` creation attempt.
   ```suggestion
                   createRecursiveWithEpochCheck(parentPath, null, expectedZkVersion, false);
   ```
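
For illustration only, the contract described in this comment can be sketched in memory, without Curator, transactions, or the epoch check. All names here (`RecursiveCreateSketch`, `createRecursive`, `failIfExists`, `exists`) are hypothetical and are not part of the PR; a `HashSet` stands in for ZooKeeper, and this is not how `ZooKeeperClient` is implemented.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory stand-in for a ZooKeeper tree; not the PR's code.
final class RecursiveCreateSketch {
    private final Set<String> nodes = new HashSet<>();

    /**
     * Creates {@code path}, creating any missing parents first.
     * Only the target path is subject to {@code failIfExists};
     * parents are always created with failIfExists = false.
     */
    void createRecursive(String path, boolean failIfExists) {
        if (nodes.contains(path)) {
            if (failIfExists) {
                throw new IllegalStateException("Node exists: " + path);
            }
            return; // already present and the caller does not mind
        }
        int indexOfLastSlash = path.lastIndexOf('/');
        if (indexOfLastSlash == -1) {
            throw new IllegalArgumentException("Invalid path: " + path);
        }
        if (indexOfLastSlash > 0) {
            // An existing parent must never fail the call, so pass false here.
            createRecursive(path.substring(0, indexOfLastSlash), false);
        }
        nodes.add(path);
    }

    boolean exists(String path) {
        return nodes.contains(path);
    }
}
```

With this shape, `createRecursive("/a/b", true)` followed by `createRecursive("/a/c", true)` succeeds even though `/a` already exists, while a second `createRecursive("/a/b", true)` throws because the target itself is present.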



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java:
##########
@@ -366,6 +366,24 @@ public static CoordinatorAddress decode(byte[] json) {
         }
     }
 
+    /**
+     * The znode for the coordinator epoch. The znode path is:
+     *
+     * <p>/coordinators/epoch
+     */
+    public static final class CoordinatorEpochZNode {
+        public static String path() {
+            return "/coordinators/epoch";
+        }
+
+        public static byte[] encode(int epoch) {
+            return String.valueOf(epoch).getBytes();
+        }
+
+        public static int decode(byte[] bytes) {
+            return Integer.parseInt(new String(bytes));

Review Comment:
   `CoordinatorEpochZNode.encode/decode` uses the platform default charset via 
`String.getBytes()` / `new String(bytes)`, so the bytes written and read can 
vary between environments. Prefer an explicit charset (e.g., UTF-8) for stable 
encoding/decoding of ZK node contents. The suggestion assumes 
`java.nio.charset.StandardCharsets` is imported.
   ```suggestion
               return String.valueOf(epoch).getBytes(StandardCharsets.UTF_8);
           }
   
           public static int decode(byte[] bytes) {
               return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
   ```
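
As a standalone illustration of the explicit-charset approach, here is a minimal round-trip sketch. The class name `EpochCodecSketch` is hypothetical and chosen only for this example; the method bodies mirror the suggestion above, including the import it needs.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical standalone codec mirroring the suggested encode/decode bodies.
final class EpochCodecSketch {
    private EpochCodecSketch() {}

    /** Encodes the epoch as its decimal string in UTF-8, independent of the JVM default charset. */
    static byte[] encode(int epoch) {
        return String.valueOf(epoch).getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a UTF-8 decimal string back into the epoch value. */
    static int decode(byte[] bytes) {
        return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

A round trip such as `EpochCodecSketch.decode(EpochCodecSketch.encode(42))` returns the original value regardless of the `file.encoding` setting of the JVM that wrote or read the node.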



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
