aokolnychyi commented on code in PR #10288:
URL: https://github.com/apache/iceberg/pull/10288#discussion_r1723732386


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin files. */
+public class ComputeTableStatsSparkAction extends BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private List<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats columns(String... newColumns) {
+    Preconditions.checkArgument(
+        newColumns != null && newColumns.length > 0, "Columns cannot be null/empty");
+    this.columns = ImmutableList.copyOf(newColumns);
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats snapshot(long newSnapshotId) {
+    Snapshot newSnapshot = table.snapshot(newSnapshotId);
+    Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", newSnapshotId);
+    this.snapshot = newSnapshot;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    if (snapshot == null) {
+      return EMPTY_RESULT;
+    }
+    validateColumns();
+    LOG.info(
+        "Computing stats for columns {} in {} (snapshot {})",
+        columns(),
+        table.name(),
+        snapshotId());
+    List<Blob> blobs = generateNDVBlobs();
+    StatisticsFile statisticsFile = writeStatsFile(blobs);
+    table.updateStatistics().setStatistics(snapshotId(), statisticsFile).commit();
+    return ImmutableComputeTableStats.Result.builder().statisticsFile(statisticsFile).build();
+  }
+
+  private StatisticsFile writeStatsFile(List<Blob> blobs) {
+    LOG.info("Writing stats for table {} for snapshot {}", table.name(), 
snapshotId());
+    OutputFile outputFile = table.io().newOutputFile(outputPath());
+    try (PuffinWriter writer = Puffin.write(outputFile).createdBy(appIdentifier()).build()) {
+      blobs.forEach(writer::add);
+      writer.finish();
+      return new GenericStatisticsFile(
+          snapshotId(),
+          outputFile.location(),
+          writer.fileSize(),
+          writer.footerSize(),
+          GenericBlobMetadata.from(writer.writtenBlobsMetadata()));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  private List<Blob> generateNDVBlobs() {
+    return NDVSketchUtil.generateSketches(spark(), table, snapshot, columns());
+  }
+
+  private List<String> columns() {
+    if (columns == null) {
+      Schema schema = snapshot == null ? table.schema() : table.schemas().get(snapshot.schemaId());

Review Comment:
   If I understand correctly, we never expect `snapshot` to be null at this point. If it is null, something went wrong, as we should have returned earlier.
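   
   If so, the ternary is dead code and `columns()` can read the snapshot schema directly (a sketch of what I mean, assuming the early return in `doExecute` stays):
   
   ```
   private List<String> columns() {
     if (columns == null) {
       Schema schema = table.schemas().get(snapshot.schemaId());
       this.columns =
           schema.columns().stream()
               .filter(field -> field.type().isPrimitiveType())
               .map(Types.NestedField::name)
               .collect(Collectors.toList());
     }
     return columns;
   }
   ```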
   
   



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin files. */
+public class ComputeTableStatsSparkAction extends BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private List<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats columns(String... newColumns) {
+    Preconditions.checkArgument(
+        newColumns != null && newColumns.length > 0, "Columns cannot be null/empty");
+    this.columns = ImmutableList.copyOf(newColumns);

Review Comment:
   We can still dedup columns here: `ImmutableList.copyOf(ImmutableSet.copyOf(newColumns))`
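   
   That keeps the first occurrence of each name while dropping repeats, e.g. (a sketch; assumes `ImmutableSet` is imported from the same relocated Guava package as `ImmutableList`):
   
   ```
   @Override
   public ComputeTableStats columns(String... newColumns) {
     Preconditions.checkArgument(
         newColumns != null && newColumns.length > 0, "Columns cannot be null/empty");
     // ImmutableSet preserves insertion order, so this dedups without reordering
     this.columns = ImmutableList.copyOf(ImmutableSet.copyOf(newColumns));
     return this;
   }
   ```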



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin files. */
+public class ComputeTableStatsSparkAction extends BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private List<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats columns(String... newColumns) {
+    Preconditions.checkArgument(
+        newColumns != null && newColumns.length > 0, "Columns cannot be null/empty");
+    this.columns = ImmutableList.copyOf(newColumns);
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats snapshot(long newSnapshotId) {
+    Snapshot newSnapshot = table.snapshot(newSnapshotId);
+    Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", newSnapshotId);
+    this.snapshot = newSnapshot;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    if (snapshot == null) {
+      return EMPTY_RESULT;
+    }
+    validateColumns();
+    LOG.info(
+        "Computing stats for columns {} in {} (snapshot {})",
+        columns(),
+        table.name(),
+        snapshotId());
+    List<Blob> blobs = generateNDVBlobs();
+    StatisticsFile statisticsFile = writeStatsFile(blobs);
+    table.updateStatistics().setStatistics(snapshotId(), statisticsFile).commit();
+    return ImmutableComputeTableStats.Result.builder().statisticsFile(statisticsFile).build();
+  }
+
+  private StatisticsFile writeStatsFile(List<Blob> blobs) {
+    LOG.info("Writing stats for table {} for snapshot {}", table.name(), 
snapshotId());
+    OutputFile outputFile = table.io().newOutputFile(outputPath());
+    try (PuffinWriter writer = Puffin.write(outputFile).createdBy(appIdentifier()).build()) {
+      blobs.forEach(writer::add);
+      writer.finish();
+      return new GenericStatisticsFile(
+          snapshotId(),
+          outputFile.location(),
+          writer.fileSize(),
+          writer.footerSize(),
+          GenericBlobMetadata.from(writer.writtenBlobsMetadata()));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  private List<Blob> generateNDVBlobs() {
+    return NDVSketchUtil.generateSketches(spark(), table, snapshot, columns());
+  }
+
+  private List<String> columns() {
+    if (columns == null) {
+      Schema schema = snapshot == null ? table.schema() : table.schemas().get(snapshot.schemaId());
+      columns =

Review Comment:
   Minor: `columns` -> `this.columns` as this initializes an instance field.



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin files. */
+public class ComputeTableStatsSparkAction extends BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private List<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats columns(String... newColumns) {
+    Preconditions.checkArgument(
+        newColumns != null && newColumns.length > 0, "Columns cannot be null/empty");
+    this.columns = ImmutableList.copyOf(newColumns);
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats snapshot(long newSnapshotId) {
+    Snapshot newSnapshot = table.snapshot(newSnapshotId);
+    Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", newSnapshotId);
+    this.snapshot = newSnapshot;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    if (snapshot == null) {
+      return EMPTY_RESULT;
+    }
+    validateColumns();
+    LOG.info(
+        "Computing stats for columns {} in {} (snapshot {})",
+        columns(),
+        table.name(),
+        snapshotId());
+    List<Blob> blobs = generateNDVBlobs();
+    StatisticsFile statisticsFile = writeStatsFile(blobs);
+    table.updateStatistics().setStatistics(snapshotId(), statisticsFile).commit();
+    return ImmutableComputeTableStats.Result.builder().statisticsFile(statisticsFile).build();
+  }
+
+  private StatisticsFile writeStatsFile(List<Blob> blobs) {
+    LOG.info("Writing stats for table {} for snapshot {}", table.name(), 
snapshotId());
+    OutputFile outputFile = table.io().newOutputFile(outputPath());
+    try (PuffinWriter writer = Puffin.write(outputFile).createdBy(appIdentifier()).build()) {
+      blobs.forEach(writer::add);
+      writer.finish();
+      return new GenericStatisticsFile(
+          snapshotId(),
+          outputFile.location(),
+          writer.fileSize(),
+          writer.footerSize(),
+          GenericBlobMetadata.from(writer.writtenBlobsMetadata()));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  private List<Blob> generateNDVBlobs() {
+    return NDVSketchUtil.generateSketches(spark(), table, snapshot, columns());
+  }
+
+  private List<String> columns() {
+    if (columns == null) {
+      Schema schema = snapshot == null ? table.schema() : table.schemas().get(snapshot.schemaId());
+      columns =
+          schema.columns().stream()
+              .filter(nestedField -> nestedField.type().isPrimitiveType())
+              .map(Types.NestedField::name)
+              .collect(Collectors.toList());

Review Comment:
   This will not be enough, as we have to dive into nested fields and collect all leaf columns. That said, I am OK with addressing this in a follow-up PR to limit the scope and get this in. At least, it will no longer fail.
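   
   For the follow-up, something along these lines could walk the schema and collect dot-separated leaf names (a rough sketch, untested; assumes `Lists` from the relocated Guava package, and skips maps/lists since their leaves are not addressable by simple dot paths):
   
   ```
   private static List<String> leafColumnNames(Schema schema) {
     List<String> names = Lists.newArrayList();
     for (Types.NestedField field : schema.columns()) {
       collectLeafColumnNames(field.name(), field, names);
     }
     return names;
   }
   
   private static void collectLeafColumnNames(String path, Types.NestedField field, List<String> names) {
     if (field.type().isPrimitiveType()) {
       names.add(path);
     } else if (field.type().isStructType()) {
       // recurse into structs, building dot-separated paths like a.b.c
       for (Types.NestedField child : field.type().asStructType().fields()) {
         collectLeafColumnNames(path + "." + child.name(), child, names);
       }
     }
   }
   ```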



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.CompactSketch;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.PuffinCompressionCodec;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.stats.ThetaSketchAgg;
+
+public class NDVSketchUtil {
+
+  private NDVSketchUtil() {}
+
+  public static final String APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY = "ndv";
+
+  static List<Blob> generateSketches(
+      SparkSession spark, Table table, Snapshot snapshot, List<String> columns) {
+    Map<Integer, Sketch> columnToSketchMap = computeNDVSketches(spark, table, snapshot, columns);
+    return generateBlobs(table, columns, columnToSketchMap, snapshot);
+  }
+
+  private static List<Blob> generateBlobs(
+      Table table, List<String> columns, Map<Integer, Sketch> sketchMap, Snapshot snapshot) {
+    Schema schema = table.schemas().get(snapshot.schemaId());
+    return columns.stream()
+        .map(
+            columnName -> {
+              Types.NestedField field = schema.findField(columnName);
+              Sketch sketch = sketchMap.get(field.fieldId());
+              long ndv = (long) sketch.getEstimate();
+              return new Blob(
+                  StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
+                  ImmutableList.of(field.fieldId()),
+                  snapshot.snapshotId(),
+                  snapshot.sequenceNumber(),
+                  ByteBuffer.wrap(sketch.toByteArray()),
+                  PuffinCompressionCodec.ZSTD,
+                  ImmutableMap.of(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY, String.valueOf(ndv)));
+            })
+        .collect(Collectors.toList());
+  }
+
+  private static Map<Integer, Sketch> computeNDVSketches(
+      SparkSession spark, Table table, Snapshot snapshot, List<String> columnsToBeAnalyzed) {
+    Map<Integer, Sketch> sketchMap = Maps.newHashMap();
+    String tableName = table.name();
+    List<String> columns = ImmutableList.copyOf(columnsToBeAnalyzed);
+
+    Column[] aggregateColumns =
+        columns.stream()

Review Comment:
   What about splitting this logic into a few helper methods:
   
   ```
   private static Column[] toAggColumns(List<String> colNames) {
     return colNames.stream().map(NDVSketchUtil::toAggColumn).toArray(Column[]::new);
   }
   
   private static Column toAggColumn(String colName) {
     ThetaSketchAgg agg = new ThetaSketchAgg(colName);
     return new Column(agg.toAggregateExpression());
   }
   ```



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin files. */
+public class ComputeTableStatsSparkAction extends BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private List<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats columns(String... newColumns) {
+    Preconditions.checkArgument(
+        newColumns != null && newColumns.length > 0, "Columns cannot be null/empty");
+    this.columns = ImmutableList.copyOf(newColumns);
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats snapshot(long newSnapshotId) {
+    Snapshot newSnapshot = table.snapshot(newSnapshotId);
+    Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", newSnapshotId);
+    this.snapshot = newSnapshot;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    if (snapshot == null) {
+      return EMPTY_RESULT;

Review Comment:
   Let's add a log message. Will help us with debugging later.
   
   ```
   LOG.info("No snapshot to compute stats for table {}", table.name());
   ```



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.spark.actions.NDVSketchUtil.APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.BlobMetadata;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestComputeTableStatsAction extends CatalogTestBase {
+
+  private static final Types.StructType LEAF_STRUCT_TYPE =
+      Types.StructType.of(
+          optional(1, "leafLongCol", Types.LongType.get()),
+          optional(2, "leafDoubleCol", Types.DoubleType.get()));
+
+  private static final Types.StructType NESTED_STRUCT_TYPE =
+      Types.StructType.of(required(3, "leafStructCol", LEAF_STRUCT_TYPE));
+
+  private static final Schema NESTED_SCHEMA =
+      new Schema(required(4, "nestedStructCol", NESTED_STRUCT_TYPE));
+
+  private static final Schema SCHEMA_WITH_NESTED_COLUMN =
+      new Schema(
+          required(4, "nestedStructCol", NESTED_STRUCT_TYPE),
+          required(5, "stringCol", Types.StringType.get()));
+
+  @TestTemplate
+  public void testComputeTableStatsAction() throws NoSuchTableException, ParseException {
+    assumeTrue(catalogName.equals("spark_catalog"));

Review Comment:
   Do these tests work only with the session catalog, or are we just saving test time? If it is about time, we can simply overload `parameters` in this class.



##########
spark/v3.5/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAgg.scala:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.stats
+
+import java.nio.ByteBuffer
+import org.apache.datasketches.common.Family
+import org.apache.datasketches.memory.Memory
+import org.apache.datasketches.theta.CompactSketch
+import org.apache.datasketches.theta.SetOperationBuilder
+import org.apache.datasketches.theta.Sketch
+import org.apache.datasketches.theta.UpdateSketch
+import org.apache.iceberg.spark.SparkSchemaUtil
+import org.apache.iceberg.types.Conversions
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
+import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.BinaryType
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * ThetaSketchAgg generates Alpha family sketch with default seed.
+ * The values fed to the sketch are converted to bytes using Iceberg's single value serialization.
+ * The result returned is an array of bytes of Compact Theta sketch of Datasketches library,
+ * which should be deserialized to Compact sketch before using.
+ *
+ * See [[https://iceberg.apache.org/puffin-spec/]] for more information.
+ *
+ */
+case class ThetaSketchAgg(

Review Comment:
   Looks great to me!



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.CompactSketch;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.PuffinCompressionCodec;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.stats.ThetaSketchAgg;
+
+public class NDVSketchUtil {
+
+  private NDVSketchUtil() {}
+
+  public static final String APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY = "ndv";
+
+  static List<Blob> generateSketches(
+      SparkSession spark, Table table, Snapshot snapshot, List<String> columns) {
+    Map<Integer, Sketch> columnToSketchMap = computeNDVSketches(spark, table, snapshot, columns);
+    return generateBlobs(table, columns, columnToSketchMap, snapshot);
+  }
+
+  private static List<Blob> generateBlobs(
+      Table table, List<String> columns, Map<Integer, Sketch> sketchMap, Snapshot snapshot) {
+    Schema schema = table.schemas().get(snapshot.schemaId());
+    return columns.stream()
+        .map(
+            columnName -> {
+              Types.NestedField field = schema.findField(columnName);
+              Sketch sketch = sketchMap.get(field.fieldId());
+              long ndv = (long) sketch.getEstimate();
+              return new Blob(
+                  StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
+                  ImmutableList.of(field.fieldId()),
+                  snapshot.snapshotId(),
+                  snapshot.sequenceNumber(),
+                  ByteBuffer.wrap(sketch.toByteArray()),
+                  PuffinCompressionCodec.ZSTD,
+                  ImmutableMap.of(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY, String.valueOf(ndv)));
+            })
+        .collect(Collectors.toList());
+  }
+
+  private static Map<Integer, Sketch> computeNDVSketches(
+      SparkSession spark, Table table, Snapshot snapshot, List<String> columnsToBeAnalyzed) {
+    Map<Integer, Sketch> sketchMap = Maps.newHashMap();
+    String tableName = table.name();
+    List<String> columns = ImmutableList.copyOf(columnsToBeAnalyzed);

Review Comment:
   Why extra copy?



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java:
##########
@@ -77,6 +77,7 @@ public static void startMetastoreAndSpark() {
             .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
             .config("spark.hadoop." + METASTOREURIS.varname, 
hiveConf.get(METASTOREURIS.varname))
             
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
+            .config("spark.files.maxPartitionBytes", "100")

Review Comment:
   Is this needed? If it is for splitting tasks, I don't think it will work for Iceberg. We should use the equivalent property in Iceberg.
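   
   For example, if the intent is to force small splits in tests, Iceberg's read option should do it (a sketch; `SparkReadOptions.SPLIT_SIZE` maps to the `split-size` read option):
   
   ```
   Dataset<Row> df =
       spark.read()
           .option(SparkReadOptions.SPLIT_SIZE, "100") // target split size in bytes
           .table(tableName);
   ```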



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin files. */
+public class ComputeTableStatsSparkAction extends BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private List<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats columns(String... newColumns) {
+    Preconditions.checkArgument(
+        newColumns != null && newColumns.length > 0, "Columns cannot be null/empty");
+    this.columns = ImmutableList.copyOf(newColumns);
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats snapshot(long newSnapshotId) {
+    Snapshot newSnapshot = table.snapshot(newSnapshotId);
+    Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", newSnapshotId);
+    this.snapshot = newSnapshot;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    if (snapshot == null) {
+      return EMPTY_RESULT;
+    }
+    validateColumns();
+    LOG.info(
+        "Computing stats for columns {} in {} (snapshot {})",
+        columns(),
+        table.name(),
+        snapshotId());
+    List<Blob> blobs = generateNDVBlobs();
+    StatisticsFile statisticsFile = writeStatsFile(blobs);
+    table.updateStatistics().setStatistics(snapshotId(), statisticsFile).commit();
+    return ImmutableComputeTableStats.Result.builder().statisticsFile(statisticsFile).build();
+  }
+
+  private StatisticsFile writeStatsFile(List<Blob> blobs) {
+    LOG.info("Writing stats for table {} for snapshot {}", table.name(), 
snapshotId());
+    OutputFile outputFile = table.io().newOutputFile(outputPath());
+    try (PuffinWriter writer = Puffin.write(outputFile).createdBy(appIdentifier()).build()) {
+      blobs.forEach(writer::add);
+      writer.finish();
+      return new GenericStatisticsFile(
+          snapshotId(),
+          outputFile.location(),
+          writer.fileSize(),
+          writer.footerSize(),
+          GenericBlobMetadata.from(writer.writtenBlobsMetadata()));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  private List<Blob> generateNDVBlobs() {
+    return NDVSketchUtil.generateSketches(spark(), table, snapshot, columns());
+  }
+
+  private List<String> columns() {
+    if (columns == null) {
+      Schema schema = snapshot == null ? table.schema() : table.schemas().get(snapshot.schemaId());
+      columns =
+          schema.columns().stream()
+              .filter(nestedField -> nestedField.type().isPrimitiveType())
+              .map(Types.NestedField::name)
+              .collect(Collectors.toList());
+    }
+    return columns;
+  }
+
+  private void validateColumns() {
+    Schema schema = table.schemas().get(snapshot.schemaId());
+    Preconditions.checkArgument(!columns().isEmpty(), "No columns found to compute stats");
+    for (String columnName : columns()) {
+      Types.NestedField field = schema.findField(columnName);
+      Preconditions.checkArgument(field != null, "Can't find column %s in %s", columnName, schema);
+      Preconditions.checkArgument(
+          field.type().isPrimitiveType(),
+          "Can't compute stats on non-primitive type column: %s (%s)",
+          columnName,
+          field.type());
+    }
+  }
+
+  private String appIdentifier() {
+    String icebergVersion = IcebergBuild.fullVersion();
+    String sparkVersion = spark().version();
+    return String.format("Iceberg %s Spark %s", icebergVersion, sparkVersion);
+  }
+
+  private Long snapshotId() {

Review Comment:
   Optional: I think this method returns `Long` and not `long` only because `jobDesc()` may be called prior to validation. If you want, you can make this method return `long` and move the validation and early return statements to `execute`. Up to you, though. I am OK either way.
   
   ```
   @Override
   public Result execute() {
     if (snapshot == null) {
       LOG.info("No snapshot to compute stats for table {}", table.name());
       return EMPTY_RESULT;
     }
     validateColumns();
     JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", jobDesc());
     return withJobGroupInfo(info, this::doExecute);
   }
   ```



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.spark.actions.NDVSketchGenerator.APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.BlobMetadata;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestComputeTableStatsAction extends CatalogTestBase {

Review Comment:
   @karuppayya, I mean referencing leaf primitive columns like `top_struct_field.nested_int_col`.
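   
   With the nested schema in this test, that would look something like (a sketch; assumes the action is exposed through `SparkActions` as in this PR):
   
   ```
   SparkActions.get()
       .computeTableStats(table)
       .columns("nestedStructCol.leafStructCol.leafLongCol")
       .execute();
   ```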



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.CompactSketch;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.PuffinCompressionCodec;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.stats.ThetaSketchAgg;
+
+public class NDVSketchUtil {
+
+  private NDVSketchUtil() {}
+
+  public static final String APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY = "ndv";
+
+  static List<Blob> generateSketches(
+      SparkSession spark, Table table, Snapshot snapshot, List<String> columns) {
+    Map<Integer, Sketch> columnToSketchMap = computeNDVSketches(spark, table, snapshot, columns);

Review Comment:
   Optional: I am not sure there is a lot of value in a temp map. I'd consider creating `Blob` objects directly.
   
   ```
   public static List<Blob> generateSketches(
       SparkSession spark, Table table, Snapshot snapshot, List<String> colNames) {
     Row sketches = ...
     Schema schema = table.schemas().get(snapshot.schemaId());
     List<Blob> blobs = Lists.newArrayList();
     for (int i = 0; i < colNames.size(); i++) {
       Types.NestedField field = schema.findField(colNames.get(i));
       Sketch sketch = CompactSketch.wrap(Memory.wrap((byte[]) sketches.get(i)));
       blobs.add(toBlob(field, sketch, snapshot));
     }
     return blobs;
   }
   
   private static Blob toBlob(Types.NestedField field, Sketch sketch, Snapshot snapshot) {
     ...
   }
   ```
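   
   where `toBlob` would mirror the `Blob` construction in the current stream body (a sketch, reusing the exact arguments from above):
   
   ```
   private static Blob toBlob(Types.NestedField field, Sketch sketch, Snapshot snapshot) {
     long ndv = (long) sketch.getEstimate();
     return new Blob(
         StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
         ImmutableList.of(field.fieldId()),
         snapshot.snapshotId(),
         snapshot.sequenceNumber(),
         ByteBuffer.wrap(sketch.toByteArray()),
         PuffinCompressionCodec.ZSTD,
         ImmutableMap.of(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY, String.valueOf(ndv)));
   }
   ```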
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

