Copilot commented on code in PR #2781:
URL: https://github.com/apache/fluss/pull/2781#discussion_r3035045013
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -127,11 +129,19 @@
import static java.util.stream.Collectors.toMap;
import static org.apache.fluss.metadata.ResolvedPartitionSpec.fromPartitionName;
+import static org.apache.fluss.server.zk.ZooKeeperOp.multiRequest;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
/**
* This class includes methods for write/read various metadata (leader address, tablet server
* registration, table assignment, table, schema) in Zookeeper.
+ *
+ * <p>In some method, 'expectedZkVersion' is used to execute an epoch Zookeeper version check. We
+ * have the following principals to judge if it's necessary to execute epoch Zookeeper version
Review Comment:
Spelling: "principals" should be "principles" in the class-level JavaDoc
describing the epoch version check rules.
```suggestion
* have the following principles to judge if it's necessary to execute epoch Zookeeper version
```
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -1781,6 +1878,109 @@ public static <K> Map<K, List<String>> processGetChildrenResponses(
return result;
}
+ /**
+ * create a node (recursively if parent path not exists) with Zk epoch version check.
+ *
+ * @param path the path to create
+ * @param data the data to write
+ * @param throwIfPathExists whether to throw exception if path exist
+ * @throws Exception if any error occurs
+ */
+ public void createRecursiveWithEpochCheck(
+ String path, byte[] data, int expectedZkVersion, boolean throwIfPathExists)
+ throws Exception {
+ CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT);
Review Comment:
`createRecursiveWithEpochCheck` recursively calls itself with `data = null`
for parent paths, but `ZooKeeperOp.createOp(...).forPath(path, data)` typically
expects a non-null byte array. This can cause a `NullPointerException` or
`IllegalArgumentException` during parent creation. Consider using Curator's
create builder that omits data for parent nodes (or passing an explicit empty
byte array), and attach `data` only to the final target node.
```suggestion
byte[] nodeData = data == null ? new byte[0] : data;
CuratorOp createOp = zkOp.createOp(path, nodeData, CreateMode.PERSISTENT);
```
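A minimal sketch of the null-guard idea in isolation, assuming the goal is only that parent nodes get an empty payload instead of `null`. `NodeDataSketch` and `orEmpty` are hypothetical names used for illustration; they are not part of Fluss or Curator.

```java
final class NodeDataSketch {
    private static final byte[] EMPTY = new byte[0];

    /** Returns the given payload, or an empty array when the caller passed null. */
    static byte[] orEmpty(byte[] data) {
        return data == null ? EMPTY : data;
    }

    public static void main(String[] args) {
        // Payload a parent node would receive when the caller passes null.
        System.out.println(orEmpty(null).length);
        // Payload the target node would receive: passed through unchanged.
        System.out.println(orEmpty(new byte[] {1, 2}).length);
    }
}
```

Normalizing at the call site keeps the recursive signature unchanged while guaranteeing the create operation never sees a `null` array.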
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -1781,6 +1878,109 @@ public static <K> Map<K, List<String>> processGetChildrenResponses(
return result;
}
+ /**
+ * create a node (recursively if parent path not exists) with Zk epoch version check.
+ *
+ * @param path the path to create
+ * @param data the data to write
+ * @param throwIfPathExists whether to throw exception if path exist
+ * @throws Exception if any error occurs
+ */
+ public void createRecursiveWithEpochCheck(
+ String path, byte[] data, int expectedZkVersion, boolean throwIfPathExists)
+ throws Exception {
+ CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT);
+ List<CuratorOp> ops = wrapRequestWithEpochCheck(createOp, expectedZkVersion);
+
+ try {
+ // try to directly create
+ zkClient.transaction().forOperations(ops);
+ } catch (KeeperException.NodeExistsException e) {
+ // should not exist
+ if (throwIfPathExists) {
+ throw e;
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // if parent does not exist, create parent first
+ int indexOfLastSlash = path.lastIndexOf("/");
+ if (indexOfLastSlash == -1) {
+ throw new IllegalArgumentException("Invalid path: " + path);
+ } else if (indexOfLastSlash == 0) {
+ // root path can be directly create without fence
+ try {
+ zkClient.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path);
+ } catch (KeeperException.NodeExistsException ignored) {
+ // ignore
+ }
+ } else {
+ // indexOfLastSlash > 0
+ String parentPath = path.substring(0, indexOfLastSlash);
+ createRecursiveWithEpochCheck(
+ parentPath, null, expectedZkVersion, throwIfPathExists);
Review Comment:
`createRecursiveWithEpochCheck` propagates `throwIfPathExists` into the
recursive parent creation call. If `throwIfPathExists` is true for the leaf
node, this will also throw when *parent* paths already exist, which breaks the
usual contract of recursive-create (only the target path should be subject to
the existence check). Consider passing `false` when creating parent paths, and
applying `throwIfPathExists` only to the final `path` creation attempt.
```suggestion
createRecursiveWithEpochCheck(parentPath, null, expectedZkVersion, false);
```
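The contract described above (only the target path is subject to the existence check) can be sketched with an in-memory stand-in for the znode tree. This is an illustration under stated assumptions, not the PR's code: `RecursiveCreateSketch`, its `Set`-backed store, and the `IllegalStateException` used in place of `KeeperException.NodeExistsException` are all hypothetical, and the epoch check is omitted.

```java
import java.util.HashSet;
import java.util.Set;

final class RecursiveCreateSketch {
    // Hypothetical in-memory replacement for ZooKeeper: the set of existing paths.
    private final Set<String> nodes = new HashSet<>();

    /** Creates path and any missing parents; only the target honors throwIfPathExists. */
    void createRecursive(String path, boolean throwIfPathExists) {
        if (nodes.contains(path)) {
            if (throwIfPathExists) {
                throw new IllegalStateException("Node exists: " + path);
            }
            return;
        }
        int indexOfLastSlash = path.lastIndexOf('/');
        if (indexOfLastSlash == -1) {
            throw new IllegalArgumentException("Invalid path: " + path);
        }
        if (indexOfLastSlash > 0) {
            // Parents are created on a best-effort basis: an existing parent is not an error.
            createRecursive(path.substring(0, indexOfLastSlash), false);
        }
        nodes.add(path);
    }

    boolean exists(String path) {
        return nodes.contains(path);
    }
}
```

With this shape, creating `/a/b` and then `/a/c` with `throwIfPathExists = true` both succeed even though `/a` already exists for the second call, while creating `/a/b` a second time still fails.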
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java:
##########
@@ -366,6 +366,24 @@ public static CoordinatorAddress decode(byte[] json) {
}
}
+ /**
+ * The znode for the coordinator epoch. The znode path is:
+ *
+ * <p>/coordinators/epoch
+ */
+ public static final class CoordinatorEpochZNode {
+ public static String path() {
+ return "/coordinators/epoch";
+ }
+
+ public static byte[] encode(int epoch) {
+ return String.valueOf(epoch).getBytes();
+ }
+
+ public static int decode(byte[] bytes) {
+ return Integer.parseInt(new String(bytes));
Review Comment:
`CoordinatorEpochZNode.encode/decode` uses the platform default charset via
`String.getBytes()` / `new String(bytes)`, so the stored bytes depend on the
JVM's default charset and may not round-trip between differently configured
environments. Prefer an explicit charset (e.g., UTF-8) for stable
encoding/decoding of ZK node contents.
```suggestion
return String.valueOf(epoch).getBytes(StandardCharsets.UTF_8);
}
public static int decode(byte[] bytes) {
return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
```
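A small self-contained sketch of why the explicit charset matters. `EpochCodecSketch` is a hypothetical stand-in for `CoordinatorEpochZNode`, and UTF-16 is used only as an example of a charset whose bytes differ from UTF-8 for the same digits.

```java
import java.nio.charset.StandardCharsets;

final class EpochCodecSketch {
    /** Encodes the epoch as decimal digits in UTF-8, independent of the JVM default charset. */
    static byte[] encode(int epoch) {
        return String.valueOf(epoch).getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes bytes written by encode(int) using the same explicit charset. */
    static int decode(byte[] bytes) {
        return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] utf8 = encode(7);
        // The same digit encoded with another charset yields different bytes,
        // which is the mismatch an implicit default charset can introduce.
        byte[] utf16 = "7".getBytes(StandardCharsets.UTF_16);
        System.out.println("UTF-8 length:  " + utf8.length);
        System.out.println("UTF-16 length: " + utf16.length);
        System.out.println("round trip:    " + decode(utf8));
    }
}
```

Pinning the charset on both sides means a value written by one server can always be parsed by another, whatever `file.encoding` each JVM was started with.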