nastra commented on code in PR #9728:
URL: https://github.com/apache/iceberg/pull/9728#discussion_r1524825126


##########
core/src/main/java/org/apache/iceberg/ScanTaskParser.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.util.JsonUtil;
+
+public class ScanTaskParser {
+  private static final String TASK_TYPE = "task-type";
+
+  private enum TaskType {
+    FILE_SCAN_TASK("file-scan-task"),
+    DATA_TASK("data-task");
+
+    private final String value;
+
+    TaskType(String value) {
+      this.value = value;
+    }
+
+    public static TaskType fromValue(String value) {
+      Preconditions.checkArgument(value != null, "Task type is null");

Review Comment:
   ```suggestion
         Preconditions.checkArgument(value != null, "Invalid task type: null");
   ```



##########
core/src/main/java/org/apache/iceberg/ScanTaskParser.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.util.JsonUtil;
+
+public class ScanTaskParser {
+  private static final String TASK_TYPE = "task-type";
+
+  private enum TaskType {
+    FILE_SCAN_TASK("file-scan-task"),

Review Comment:
   I think this should have an `UNKNOWN` for forward/backward compatibility. 
Imagine a client/server that use different Iceberg versions and new task types 
are being added over time.



##########
core/src/main/java/org/apache/iceberg/DataTaskParser.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class DataTaskParser {
+  private static final String SCHEMA = "schema";
+  private static final String PROJECTED_SCHEMA = "projection";
+  private static final String METADATA_FILE = "metadata-file";
+  private static final String ROWS = "rows";
+
+  private DataTaskParser() {}
+
+  static void toJson(StaticDataTask dataTask, JsonGenerator generator) throws 
IOException {
+    Preconditions.checkArgument(dataTask != null, "Invalid data task: null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: 
null");
+
+    generator.writeFieldName(SCHEMA);
+    SchemaParser.toJson(dataTask.schema(), generator);
+
+    generator.writeFieldName(PROJECTED_SCHEMA);
+    SchemaParser.toJson(dataTask.projectedSchema(), generator);
+
+    generator.writeFieldName(METADATA_FILE);
+    ContentFileParser.toJson(dataTask.metadataFile(), 
PartitionSpec.unpartitioned(), generator);
+
+    generator.writeArrayFieldStart(ROWS);
+    for (StructLike row : dataTask.tableRows()) {
+      SingleValueParser.toJson(dataTask.schema().asStruct(), row, generator);
+    }
+
+    generator.writeEndArray();
+  }
+
+  static StaticDataTask fromJson(JsonNode jsonNode) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for data 
task: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for data task: non-object 
(%s)", jsonNode);
+
+    Schema schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode));
+    Schema projectedSchema = 
SchemaParser.fromJson(JsonUtil.get(PROJECTED_SCHEMA, jsonNode));
+    DataFile metadataFile =
+        (DataFile)
+            ContentFileParser.fromJson(
+                JsonUtil.get(METADATA_FILE, jsonNode), 
PartitionSpec.unpartitioned());
+
+    JsonNode rowsArray = jsonNode.get(ROWS);

Review Comment:
   please use `JsonUtil.get` in such places, as it is a required field and 
`JsonUtil.get` will throw a descriptive error msg if this field is missing



##########
core/src/main/java/org/apache/iceberg/ScanTaskParser.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.util.JsonUtil;
+
+public class ScanTaskParser {
+  private static final String TASK_TYPE = "task-type";
+
+  private enum TaskType {
+    FILE_SCAN_TASK("file-scan-task"),
+    DATA_TASK("data-task");
+
+    private final String value;
+
+    TaskType(String value) {
+      this.value = value;
+    }
+
+    public static TaskType fromValue(String value) {
+      Preconditions.checkArgument(value != null, "Task type is null");
+      if (FILE_SCAN_TASK.value().equalsIgnoreCase(value)) {
+        return FILE_SCAN_TASK;
+      } else if (DATA_TASK.value().equalsIgnoreCase(value)) {
+        return DATA_TASK;
+      } else {
+        throw new IllegalArgumentException("Unknown task type: " + value);

Review Comment:
   this probably shouldn't fail but rather return `UNKNOWN`. See also #7145 
where a similar issue has been addressed and I think we need to do the same 
thing here



##########
core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java:
##########
@@ -27,29 +27,41 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 public class TestFileScanTaskParser {
+
   @Test
   public void testNullArguments() {
-    Assertions.assertThatThrownBy(() -> FileScanTaskParser.toJson(null))
+    Assertions.assertThatThrownBy(() -> ScanTaskParser.toJson(null))

Review Comment:
   I don't think this test class should actually change at all. Since you added 
a new `ScanTaskParser`, it's best to also add a `TestScanTaskParser`. This test 
should stay the same, since someone could still use the Parser and the existing 
behavior of the parser shouldn't change IMO



##########
core/src/main/java/org/apache/iceberg/SnapshotsTable.java:
##########
@@ -27,7 +28,8 @@
  * <p>This does not include snapshots that have been expired using {@link 
ExpireSnapshots}.
  */
 public class SnapshotsTable extends BaseMetadataTable {
-  private static final Schema SNAPSHOT_SCHEMA =
+  @VisibleForTesting
+  static final Schema SNAPSHOT_SCHEMA =

Review Comment:
   could we avoid making this visible?



##########
core/src/main/java/org/apache/iceberg/ScanTaskParser.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.util.JsonUtil;
+
+public class ScanTaskParser {
+  private static final String TASK_TYPE = "task-type";
+
+  private enum TaskType {
+    FILE_SCAN_TASK("file-scan-task"),
+    DATA_TASK("data-task");
+
+    private final String value;
+
+    TaskType(String value) {
+      this.value = value;
+    }
+
+    public static TaskType fromValue(String value) {
+      Preconditions.checkArgument(value != null, "Task type is null");
+      if (FILE_SCAN_TASK.value().equalsIgnoreCase(value)) {
+        return FILE_SCAN_TASK;
+      } else if (DATA_TASK.value().equalsIgnoreCase(value)) {
+        return DATA_TASK;
+      } else {
+        throw new IllegalArgumentException("Unknown task type: " + value);
+      }
+    }
+
+    public String value() {
+      return value;
+    }
+  }
+
+  private ScanTaskParser() {}
+
+  public static String toJson(FileScanTask fileScanTask) {
+    Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: 
null");
+    return JsonUtil.generate(generator -> toJson(fileScanTask, generator), 
false);
+  }
+
+  public static FileScanTask fromJson(String json, boolean caseSensitive) {
+    Preconditions.checkArgument(json != null, "Invalid JSON string for file 
scan task: null");
+    return JsonUtil.parse(json, node -> fromJson(node, caseSensitive));
+  }
+
+  private static void toJson(FileScanTask fileScanTask, JsonGenerator 
generator)
+      throws IOException {
+    generator.writeStartObject();
+
+    if (fileScanTask instanceof StaticDataTask) {
+      generator.writeStringField(TASK_TYPE, TaskType.DATA_TASK.value());
+      DataTaskParser.toJson((StaticDataTask) fileScanTask, generator);
+    } else if (fileScanTask instanceof BaseFileScanTask
+        || fileScanTask instanceof BaseFileScanTask.SplitScanTask) {
+      generator.writeStringField(TASK_TYPE, TaskType.FILE_SCAN_TASK.value());
+      FileScanTaskParser.toJson(fileScanTask, generator);
+    } else {
+      throw new UnsupportedOperationException(

Review Comment:
   I don't think this should fail for the same reason I mentioned further 
above. If you e.g. take a look at the `ReportMetricsRequestParser`, then it 
doesn't fail if if sees an unknown type and we would need to do the same 
handling here



##########
core/src/main/java/org/apache/iceberg/ScanTaskParser.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.util.JsonUtil;
+
+public class ScanTaskParser {
+  private static final String TASK_TYPE = "task-type";
+
+  private enum TaskType {
+    FILE_SCAN_TASK("file-scan-task"),
+    DATA_TASK("data-task");
+
+    private final String value;
+
+    TaskType(String value) {
+      this.value = value;
+    }
+
+    public static TaskType fromValue(String value) {
+      Preconditions.checkArgument(value != null, "Task type is null");
+      if (FILE_SCAN_TASK.value().equalsIgnoreCase(value)) {
+        return FILE_SCAN_TASK;
+      } else if (DATA_TASK.value().equalsIgnoreCase(value)) {
+        return DATA_TASK;
+      } else {
+        throw new IllegalArgumentException("Unknown task type: " + value);
+      }
+    }
+
+    public String value() {

Review Comment:
   nit: maybe `typeName()`?



##########
core/src/test/java/org/apache/iceberg/TestDataTaskParser.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestDataTaskParser {

Review Comment:
   this parser currently only tests the happy path. Please also add tests that 
   * check for null/empty json when calling to/from json methods
   * check for missing fields
   * check for invalid task type(s)
   * check a full roundtrip (which is what you have currently here)
   
   You can take a look at the testing patterns in 
`TestReportMetricsRequestParser` / `TestLoadViewResponseParser`



##########
core/src/main/java/org/apache/iceberg/DataTaskParser.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class DataTaskParser {
+  private static final String SCHEMA = "schema";
+  private static final String PROJECTED_SCHEMA = "projection";
+  private static final String METADATA_FILE = "metadata-file";
+  private static final String ROWS = "rows";
+
+  private DataTaskParser() {}
+
+  static void toJson(StaticDataTask dataTask, JsonGenerator generator) throws 
IOException {
+    Preconditions.checkArgument(dataTask != null, "Invalid data task: null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: 
null");
+
+    generator.writeFieldName(SCHEMA);
+    SchemaParser.toJson(dataTask.schema(), generator);
+
+    generator.writeFieldName(PROJECTED_SCHEMA);
+    SchemaParser.toJson(dataTask.projectedSchema(), generator);
+
+    generator.writeFieldName(METADATA_FILE);
+    ContentFileParser.toJson(dataTask.metadataFile(), 
PartitionSpec.unpartitioned(), generator);
+
+    generator.writeArrayFieldStart(ROWS);
+    for (StructLike row : dataTask.tableRows()) {

Review Comment:
   can `tableRows()` be empty?



##########
core/src/main/java/org/apache/iceberg/FileScanTaskParser.java:
##########
@@ -40,16 +40,32 @@ public class FileScanTaskParser {
 
   private FileScanTaskParser() {}
 
+  /**
+   * Serialize file scan task to JSON string
+   *
+   * @deprecated Use {@link ScanTaskParser#toJson(FileScanTask)} instead; will 
be removed in 1.7.0
+   */
+  @Deprecated
   public static String toJson(FileScanTask fileScanTask) {
-    return JsonUtil.generate(
-        generator -> FileScanTaskParser.toJson(fileScanTask, generator), 
false);
+    Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: 
null");
+    return JsonUtil.generate(generator -> toJson(fileScanTask, generator), 
false);
+  }
+
+  /**
+   * Deserialize file scan task from JSON string
+   *
+   * @deprecated Use {@link ScanTaskParser#fromJson(String, boolean)} instead; 
will be removed in
+   *     1.7.0
+   */
+  @Deprecated
+  public static FileScanTask fromJson(String json, boolean caseSensitive) {
+    Preconditions.checkArgument(json != null, "Invalid JSON string for file 
scan task: null");
+    return JsonUtil.parse(json, node -> fromJson(node, caseSensitive));
   }
 
-  private static void toJson(FileScanTask fileScanTask, JsonGenerator 
generator)
-      throws IOException {
+  static void toJson(FileScanTask fileScanTask, JsonGenerator generator) 
throws IOException {
     Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: 
null");
     Preconditions.checkArgument(generator != null, "Invalid JSON generator: 
null");
-    generator.writeStartObject();

Review Comment:
   isn't this changing behavior when the parser is being used elsewhere?



##########
core/src/main/java/org/apache/iceberg/ScanTaskParser.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.util.JsonUtil;
+
+public class ScanTaskParser {
+  private static final String TASK_TYPE = "task-type";
+
+  private enum TaskType {
+    FILE_SCAN_TASK("file-scan-task"),
+    DATA_TASK("data-task");
+
+    private final String value;
+
+    TaskType(String value) {
+      this.value = value;
+    }
+
+    public static TaskType fromValue(String value) {
+      Preconditions.checkArgument(value != null, "Task type is null");
+      if (FILE_SCAN_TASK.value().equalsIgnoreCase(value)) {
+        return FILE_SCAN_TASK;
+      } else if (DATA_TASK.value().equalsIgnoreCase(value)) {
+        return DATA_TASK;
+      } else {
+        throw new IllegalArgumentException("Unknown task type: " + value);
+      }
+    }
+
+    public String value() {
+      return value;
+    }
+  }
+
+  private ScanTaskParser() {}
+
+  public static String toJson(FileScanTask fileScanTask) {
+    Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: 
null");
+    return JsonUtil.generate(generator -> toJson(fileScanTask, generator), 
false);
+  }
+
+  public static FileScanTask fromJson(String json, boolean caseSensitive) {
+    Preconditions.checkArgument(json != null, "Invalid JSON string for file 
scan task: null");
+    return JsonUtil.parse(json, node -> fromJson(node, caseSensitive));
+  }
+
+  private static void toJson(FileScanTask fileScanTask, JsonGenerator 
generator)
+      throws IOException {
+    generator.writeStartObject();
+
+    if (fileScanTask instanceof StaticDataTask) {
+      generator.writeStringField(TASK_TYPE, TaskType.DATA_TASK.value());
+      DataTaskParser.toJson((StaticDataTask) fileScanTask, generator);
+    } else if (fileScanTask instanceof BaseFileScanTask
+        || fileScanTask instanceof BaseFileScanTask.SplitScanTask) {
+      generator.writeStringField(TASK_TYPE, TaskType.FILE_SCAN_TASK.value());
+      FileScanTaskParser.toJson(fileScanTask, generator);
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported task type: " + 
fileScanTask.getClass().getCanonicalName());
+    }
+
+    generator.writeEndObject();
+  }
+
+  private static FileScanTask fromJson(JsonNode jsonNode, boolean 
caseSensitive) {
+    TaskType taskType = TaskType.FILE_SCAN_TASK;
+    String taskTypeStr = JsonUtil.getStringOrNull(TASK_TYPE, jsonNode);
+    if (!Strings.isNullOrEmpty(taskTypeStr)) {
+      taskType = TaskType.fromValue(taskTypeStr);
+    }
+
+    switch (taskType) {
+      case FILE_SCAN_TASK:
+        return FileScanTaskParser.fromJson(jsonNode, caseSensitive);
+      case DATA_TASK:
+        return DataTaskParser.fromJson(jsonNode);
+      default:
+        throw new UnsupportedOperationException("Unsupported task type: " + 
taskType.value());

Review Comment:
   this shouldn't fail and should be handled similarly to how it's done in 
`ReportMetricsRequestParser`



##########
core/src/main/java/org/apache/iceberg/DataTaskParser.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class DataTaskParser {
+  private static final String SCHEMA = "schema";
+  private static final String PROJECTED_SCHEMA = "projection";
+  private static final String METADATA_FILE = "metadata-file";
+  private static final String ROWS = "rows";
+
+  private DataTaskParser() {}
+
+  static void toJson(StaticDataTask dataTask, JsonGenerator generator) throws 
IOException {
+    Preconditions.checkArgument(dataTask != null, "Invalid data task: null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: 
null");

Review Comment:
   `TestDataTaskParser` needs tests to cover these null checks



##########
core/src/main/java/org/apache/iceberg/ScanTaskParser.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.util.JsonUtil;
+
+public class ScanTaskParser {
+  private static final String TASK_TYPE = "task-type";
+
+  private enum TaskType {
+    FILE_SCAN_TASK("file-scan-task"),
+    DATA_TASK("data-task");
+
+    private final String value;
+
+    TaskType(String value) {
+      this.value = value;
+    }
+
+    public static TaskType fromValue(String value) {
+      Preconditions.checkArgument(value != null, "Task type is null");
+      if (FILE_SCAN_TASK.value().equalsIgnoreCase(value)) {
+        return FILE_SCAN_TASK;
+      } else if (DATA_TASK.value().equalsIgnoreCase(value)) {
+        return DATA_TASK;
+      } else {
+        throw new IllegalArgumentException("Unknown task type: " + value);
+      }
+    }
+
+    public String value() {
+      return value;
+    }
+  }
+
+  private ScanTaskParser() {}
+
+  public static String toJson(FileScanTask fileScanTask) {
+    Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: 
null");
+    return JsonUtil.generate(generator -> toJson(fileScanTask, generator), 
false);
+  }
+
+  public static FileScanTask fromJson(String json, boolean caseSensitive) {
+    Preconditions.checkArgument(json != null, "Invalid JSON string for file 
scan task: null");
+    return JsonUtil.parse(json, node -> fromJson(node, caseSensitive));
+  }
+
+  private static void toJson(FileScanTask fileScanTask, JsonGenerator 
generator)
+      throws IOException {
+    generator.writeStartObject();
+
+    if (fileScanTask instanceof StaticDataTask) {
+      generator.writeStringField(TASK_TYPE, TaskType.DATA_TASK.value());
+      DataTaskParser.toJson((StaticDataTask) fileScanTask, generator);
+    } else if (fileScanTask instanceof BaseFileScanTask
+        || fileScanTask instanceof BaseFileScanTask.SplitScanTask) {
+      generator.writeStringField(TASK_TYPE, TaskType.FILE_SCAN_TASK.value());
+      FileScanTaskParser.toJson(fileScanTask, generator);
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported task type: " + 
fileScanTask.getClass().getCanonicalName());
+    }
+
+    generator.writeEndObject();
+  }
+
+  private static FileScanTask fromJson(JsonNode jsonNode, boolean 
caseSensitive) {
+    TaskType taskType = TaskType.FILE_SCAN_TASK;
+    String taskTypeStr = JsonUtil.getStringOrNull(TASK_TYPE, jsonNode);
+    if (!Strings.isNullOrEmpty(taskTypeStr)) {

Review Comment:
   ```suggestion
       if (null != taskTypeStr) {
   ```
   I don't think this should support having an empty string in the `TASK_TYPE`. 
If the `TASK_TYPE` field is sent, then it must always be a non-empty string



##########
core/src/main/java/org/apache/iceberg/DataTaskParser.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class DataTaskParser {
+  private static final String SCHEMA = "schema";
+  private static final String PROJECTED_SCHEMA = "projection";
+  private static final String METADATA_FILE = "metadata-file";
+  private static final String ROWS = "rows";
+
+  private DataTaskParser() {}
+
+  static void toJson(StaticDataTask dataTask, JsonGenerator generator) throws 
IOException {
+    Preconditions.checkArgument(dataTask != null, "Invalid data task: null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: 
null");
+
+    generator.writeFieldName(SCHEMA);
+    SchemaParser.toJson(dataTask.schema(), generator);
+
+    generator.writeFieldName(PROJECTED_SCHEMA);
+    SchemaParser.toJson(dataTask.projectedSchema(), generator);
+
+    generator.writeFieldName(METADATA_FILE);
+    ContentFileParser.toJson(dataTask.metadataFile(), 
PartitionSpec.unpartitioned(), generator);
+
+    generator.writeArrayFieldStart(ROWS);
+    for (StructLike row : dataTask.tableRows()) {
+      SingleValueParser.toJson(dataTask.schema().asStruct(), row, generator);
+    }
+
+    generator.writeEndArray();
+  }
+
+  static StaticDataTask fromJson(JsonNode jsonNode) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for data 
task: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for data task: non-object 
(%s)", jsonNode);

Review Comment:
   `TestDataTaskParser` needs tests to cover these null/object checks



##########
core/src/test/java/org/apache/iceberg/TestDataTaskParser.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestDataTaskParser {
+  @Test
+  public void testStaticDataTask() {
+    StaticDataTask dataTask = (StaticDataTask) createDataTask();
+    String jsonStr = ScanTaskParser.toJson(dataTask);
+    Assertions.assertThat(jsonStr).isEqualTo(snapshotsDataTaskJson());
+    StaticDataTask deserializedTask = (StaticDataTask) 
ScanTaskParser.fromJson(jsonStr, true);
+    assertDataTaskEquals(dataTask, deserializedTask);
+  }
+
+  private DataTask createDataTask() {
+    Map<String, String> summary1 =
+        ImmutableMap.of(
+            "added-data-files", "1",
+            "added-records", "1",
+            "added-files-size", "10",
+            "changed-partition-count", "1",
+            "total-records", "1",
+            "total-files-size", "10",
+            "total-data-files", "1",
+            "total-delete-files", "0",
+            "total-position-deletes", "0",
+            "total-equality-deletes", "0");
+
+    Map<String, String> summary2 =
+        ImmutableMap.of(
+            "added-data-files", "1",
+            "added-records", "1",
+            "added-files-size", "10",
+            "changed-partition-count", "1",
+            "total-records", "2",
+            "total-files-size", "20",
+            "total-data-files", "2",
+            "total-delete-files", "0",
+            "total-position-deletes", "0",
+            "total-equality-deletes", "0");
+
+    List<Snapshot> snapshots =
+        Arrays.asList(
+            new BaseSnapshot(
+                1L, 1L, null, 1234567890000L, "append", summary1, 1, 
"file:/tmp/manifest1.avro"),
+            new BaseSnapshot(
+                2L, 2L, 1L, 9876543210000L, "append", summary2, 1, 
"file:/tmp/manifest2.avro"));
+
+    return StaticDataTask.of(
+        Files.localInput("file:/tmp/metadata2.json"),
+        SnapshotsTable.SNAPSHOT_SCHEMA,
+        SnapshotsTable.SNAPSHOT_SCHEMA,
+        snapshots,
+        SnapshotsTable::snapshotToRow);
+  }
+
+  private String snapshotsDataTaskJson() {
+    return "{\"task-type\":\"data-task\","
+        + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+        + 
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+        + 
"{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+        + 
"{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+        + 
"{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+        + 
"{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+        + 
"{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+        + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+        + "\"value\":\"string\",\"value-required\":true}}]},"
+        + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+        + 
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+        + 
"{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+        + 
"{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+        + 
"{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+        + 
"{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+        + 
"{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+        + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+        + "\"value\":\"string\",\"value-required\":true}}]},"
+        + "\"metadata-file\":{\"spec-id\":0,\"content\":\"DATA\","
+        + "\"file-path\":\"/tmp/metadata2.json\","
+        + "\"file-format\":\"METADATA\",\"partition\":{},"
+        + "\"file-size-in-bytes\":0,\"record-count\":2,\"sort-order-id\":0},"
+        + 
"\"rows\":[{\"1\":\"2009-02-13T23:31:30+00:00\",\"2\":1,\"4\":\"append\","
+        + "\"5\":\"file:/tmp/manifest1.avro\","
+        + 
"\"6\":{\"keys\":[\"added-data-files\",\"added-records\",\"added-files-size\",\"changed-partition-count\","
+        + 
"\"total-records\",\"total-files-size\",\"total-data-files\",\"total-delete-files\","
+        + "\"total-position-deletes\",\"total-equality-deletes\"],"
+        + 
"\"values\":[\"1\",\"1\",\"10\",\"1\",\"1\",\"10\",\"1\",\"0\",\"0\",\"0\"]}},"
+        + 
"{\"1\":\"2282-12-22T20:13:30+00:00\",\"2\":2,\"3\":1,\"4\":\"append\","
+        + "\"5\":\"file:/tmp/manifest2.avro\","
+        + 
"\"6\":{\"keys\":[\"added-data-files\",\"added-records\",\"added-files-size\",\"changed-partition-count\","
+        + 
"\"total-records\",\"total-files-size\",\"total-data-files\",\"total-delete-files\","
+        + "\"total-position-deletes\",\"total-equality-deletes\"],"
+        + 
"\"values\":[\"1\",\"1\",\"10\",\"1\",\"2\",\"20\",\"2\",\"0\",\"0\",\"0\"]}}]}";
+  }
+
+  private void assertDataTaskEquals(StaticDataTask expected, StaticDataTask 
actual) {
+    Assertions.assertThat(expected.schema().sameSchema(actual.schema()))
+        .as("Schema should match")
+        .isTrue();
+
+    
Assertions.assertThat(expected.projectedSchema().sameSchema(actual.projectedSchema()))

Review Comment:
   same as above



##########
core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java:
##########
@@ -27,29 +27,41 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 public class TestFileScanTaskParser {
+

Review Comment:
   nit: unnecessary change



##########
core/src/test/java/org/apache/iceberg/TestDataTaskParser.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestDataTaskParser {
+  @Test
+  public void testStaticDataTask() {
+    StaticDataTask dataTask = (StaticDataTask) createDataTask();
+    String jsonStr = ScanTaskParser.toJson(dataTask);
+    Assertions.assertThat(jsonStr).isEqualTo(snapshotsDataTaskJson());
+    StaticDataTask deserializedTask = (StaticDataTask) 
ScanTaskParser.fromJson(jsonStr, true);
+    assertDataTaskEquals(dataTask, deserializedTask);
+  }
+
+  private DataTask createDataTask() {
+    Map<String, String> summary1 =
+        ImmutableMap.of(
+            "added-data-files", "1",
+            "added-records", "1",
+            "added-files-size", "10",
+            "changed-partition-count", "1",
+            "total-records", "1",
+            "total-files-size", "10",
+            "total-data-files", "1",
+            "total-delete-files", "0",
+            "total-position-deletes", "0",
+            "total-equality-deletes", "0");
+
+    Map<String, String> summary2 =
+        ImmutableMap.of(
+            "added-data-files", "1",
+            "added-records", "1",
+            "added-files-size", "10",
+            "changed-partition-count", "1",
+            "total-records", "2",
+            "total-files-size", "20",
+            "total-data-files", "2",
+            "total-delete-files", "0",
+            "total-position-deletes", "0",
+            "total-equality-deletes", "0");
+
+    List<Snapshot> snapshots =
+        Arrays.asList(
+            new BaseSnapshot(
+                1L, 1L, null, 1234567890000L, "append", summary1, 1, 
"file:/tmp/manifest1.avro"),
+            new BaseSnapshot(
+                2L, 2L, 1L, 9876543210000L, "append", summary2, 1, 
"file:/tmp/manifest2.avro"));
+
+    return StaticDataTask.of(
+        Files.localInput("file:/tmp/metadata2.json"),
+        SnapshotsTable.SNAPSHOT_SCHEMA,
+        SnapshotsTable.SNAPSHOT_SCHEMA,
+        snapshots,
+        SnapshotsTable::snapshotToRow);
+  }
+
+  private String snapshotsDataTaskJson() {
+    return "{\"task-type\":\"data-task\","
+        + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+        + 
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+        + 
"{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+        + 
"{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+        + 
"{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+        + 
"{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+        + 
"{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+        + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+        + "\"value\":\"string\",\"value-required\":true}}]},"
+        + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+        + 
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+        + 
"{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+        + 
"{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+        + 
"{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+        + 
"{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+        + 
"{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+        + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+        + "\"value\":\"string\",\"value-required\":true}}]},"
+        + "\"metadata-file\":{\"spec-id\":0,\"content\":\"DATA\","
+        + "\"file-path\":\"/tmp/metadata2.json\","
+        + "\"file-format\":\"METADATA\",\"partition\":{},"
+        + "\"file-size-in-bytes\":0,\"record-count\":2,\"sort-order-id\":0},"
+        + 
"\"rows\":[{\"1\":\"2009-02-13T23:31:30+00:00\",\"2\":1,\"4\":\"append\","
+        + "\"5\":\"file:/tmp/manifest1.avro\","
+        + 
"\"6\":{\"keys\":[\"added-data-files\",\"added-records\",\"added-files-size\",\"changed-partition-count\","
+        + 
"\"total-records\",\"total-files-size\",\"total-data-files\",\"total-delete-files\","
+        + "\"total-position-deletes\",\"total-equality-deletes\"],"
+        + 
"\"values\":[\"1\",\"1\",\"10\",\"1\",\"1\",\"10\",\"1\",\"0\",\"0\",\"0\"]}},"
+        + 
"{\"1\":\"2282-12-22T20:13:30+00:00\",\"2\":2,\"3\":1,\"4\":\"append\","
+        + "\"5\":\"file:/tmp/manifest2.avro\","
+        + 
"\"6\":{\"keys\":[\"added-data-files\",\"added-records\",\"added-files-size\",\"changed-partition-count\","
+        + 
"\"total-records\",\"total-files-size\",\"total-data-files\",\"total-delete-files\","
+        + "\"total-position-deletes\",\"total-equality-deletes\"],"
+        + 
"\"values\":[\"1\",\"1\",\"10\",\"1\",\"2\",\"20\",\"2\",\"0\",\"0\",\"0\"]}}]}";
+  }
+
+  private void assertDataTaskEquals(StaticDataTask expected, StaticDataTask 
actual) {
+    Assertions.assertThat(expected.schema().sameSchema(actual.schema()))

Review Comment:
   ```suggestion
       
assertThat(expected.schema().asStruct()).isEqualTo(atual.schema().asStruct()))
   ```
   
   this is slightly better, because it will actually show the difference in the 
schema if the assertion ever fails, whereas `.isTrue()` will only say that the 
schema doesn't match



##########
core/src/test/java/org/apache/iceberg/TestDataTaskParser.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestDataTaskParser {
+  @Test
+  public void testStaticDataTask() {
+    StaticDataTask dataTask = (StaticDataTask) createDataTask();
+    String jsonStr = ScanTaskParser.toJson(dataTask);
+    Assertions.assertThat(jsonStr).isEqualTo(snapshotsDataTaskJson());
+    StaticDataTask deserializedTask = (StaticDataTask) 
ScanTaskParser.fromJson(jsonStr, true);
+    assertDataTaskEquals(dataTask, deserializedTask);
+  }
+
+  private DataTask createDataTask() {
+    Map<String, String> summary1 =
+        ImmutableMap.of(
+            "added-data-files", "1",
+            "added-records", "1",
+            "added-files-size", "10",
+            "changed-partition-count", "1",
+            "total-records", "1",
+            "total-files-size", "10",
+            "total-data-files", "1",
+            "total-delete-files", "0",
+            "total-position-deletes", "0",
+            "total-equality-deletes", "0");
+
+    Map<String, String> summary2 =
+        ImmutableMap.of(
+            "added-data-files", "1",
+            "added-records", "1",
+            "added-files-size", "10",
+            "changed-partition-count", "1",
+            "total-records", "2",
+            "total-files-size", "20",
+            "total-data-files", "2",
+            "total-delete-files", "0",
+            "total-position-deletes", "0",
+            "total-equality-deletes", "0");
+
+    List<Snapshot> snapshots =
+        Arrays.asList(
+            new BaseSnapshot(
+                1L, 1L, null, 1234567890000L, "append", summary1, 1, 
"file:/tmp/manifest1.avro"),
+            new BaseSnapshot(
+                2L, 2L, 1L, 9876543210000L, "append", summary2, 1, 
"file:/tmp/manifest2.avro"));
+
+    return StaticDataTask.of(
+        Files.localInput("file:/tmp/metadata2.json"),
+        SnapshotsTable.SNAPSHOT_SCHEMA,
+        SnapshotsTable.SNAPSHOT_SCHEMA,
+        snapshots,
+        SnapshotsTable::snapshotToRow);
+  }
+
+  private String snapshotsDataTaskJson() {
+    return "{\"task-type\":\"data-task\","

Review Comment:
   it would be good to have this pretty-formatted so that it's easier to read



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to