aokolnychyi commented on code in PR #10288:
URL: https://github.com/apache/iceberg/pull/10288#discussion_r1708329708


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin 
files. */
+public class ComputeTableStatsSparkAction extends 
BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = 
ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private Set<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {

Review Comment:
   Question: Can we move the `columns()` and `snapshot()` methods right below 
`self()`? That's the approach we frequently take in other actions, where we put 
simple public configuration methods at the top before going into the actual 
`execute`.



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin 
files. */
+public class ComputeTableStatsSparkAction extends 
BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = 
ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private Set<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Computing stats for %s", table.name());
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    if (snapshot == null) {
+      return EMPTY_RESULT;
+    }
+    LOG.info("Computing stats of {} for snapshot {}", table.name(), 
snapshot.snapshotId());
+    List<Blob> blobs = generateNDVBlobs();
+    StatisticsFile statisticsFile;
+    try {
+      statisticsFile = writeStatsFile(blobs);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+    table.updateStatistics().setStatistics(snapshot.snapshotId(), 
statisticsFile).commit();
+    return 
ImmutableComputeTableStats.Result.builder().statisticsFile(statisticsFile).build();
+  }
+
+  private StatisticsFile writeStatsFile(List<Blob> blobs) throws IOException {
+    LOG.info("Writing stats for table {} for snapshot {}", table.name(), 
snapshot.snapshotId());
+    TableOperations operations = ((HasTableOperations) table).operations();
+    String path = operations.metadataFileLocation(String.format("%s.stats", 
UUID.randomUUID()));
+    OutputFile outputFile = operations.io().newOutputFile(path);
+    try (PuffinWriter writer = 
Puffin.write(outputFile).createdBy(appIdentifier()).build()) {
+      blobs.forEach(writer::add);
+      writer.finish();
+      return new GenericStatisticsFile(
+          snapshot.snapshotId(),
+          path,
+          writer.fileSize(),
+          writer.footerSize(),
+          writer.writtenBlobsMetadata().stream()
+              .map(GenericBlobMetadata::from)
+              .collect(ImmutableList.toImmutableList()));
+    }
+  }
+
+  private List<Blob> generateNDVBlobs() {
+    return NDVSketchGenerator.generateNDVSketchesAndBlobs(spark(), table, 
snapshot, columns());
+  }
+
+  @Override
+  public ComputeTableStats columns(String... newColumns) {
+    Preconditions.checkArgument(
+        newColumns != null && newColumns.length > 0, "Columns cannot be 
null/empty");
+    this.columns = ImmutableSet.copyOf(newColumns);
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats snapshot(long newSnapshotId) {
+    Snapshot newSnapshot = table.snapshot(newSnapshotId);
+    Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", 
newSnapshotId);
+    this.snapshot = newSnapshot;
+    return this;
+  }
+
+  private Set<String> columns() {
+    Schema schema = table.schemas().get(snapshot.schemaId());
+    if (columns == null) {
+      columns = 
schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet());
+    }
+    validateColumns(schema);

Review Comment:
   I am not sure it is a good idea to validate columns every time we call 
`columns()`. It should be a cheap method after the value is initialized. What 
about doing this in `doExecute`?
   
   ```
   if (snapshot == null) {
     return EMPTY_RESULT;
   }
   
   validateColumns();
   
   LOG.info(...)
   ```



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin 
files. */
+public class ComputeTableStatsSparkAction extends 
BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = 
ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private Set<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Computing stats for %s", table.name());
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    if (snapshot == null) {
+      return EMPTY_RESULT;
+    }
+    LOG.info("Computing stats of {} for snapshot {}", table.name(), 
snapshot.snapshotId());

Review Comment:
   Minor: What about this?
   
   ```
   LOG.info(
       "Computing stats for columns {} in {} (snapshot {})",
       columns(),
       table.name(),
       snapshotId());
   ```



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin 
files. */
+public class ComputeTableStatsSparkAction extends 
BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = 
ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private Set<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Computing stats for %s", table.name());
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    if (snapshot == null) {
+      return EMPTY_RESULT;
+    }
+    LOG.info("Computing stats of {} for snapshot {}", table.name(), 
snapshot.snapshotId());
+    List<Blob> blobs = generateNDVBlobs();
+    StatisticsFile statisticsFile;
+    try {
+      statisticsFile = writeStatsFile(blobs);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+    table.updateStatistics().setStatistics(snapshot.snapshotId(), 
statisticsFile).commit();
+    return 
ImmutableComputeTableStats.Result.builder().statisticsFile(statisticsFile).build();
+  }
+
+  private StatisticsFile writeStatsFile(List<Blob> blobs) throws IOException {
+    LOG.info("Writing stats for table {} for snapshot {}", table.name(), 
snapshot.snapshotId());
+    TableOperations operations = ((HasTableOperations) table).operations();
+    String path = operations.metadataFileLocation(String.format("%s.stats", 
UUID.randomUUID()));
+    OutputFile outputFile = operations.io().newOutputFile(path);
+    try (PuffinWriter writer = 
Puffin.write(outputFile).createdBy(appIdentifier()).build()) {
+      blobs.forEach(writer::add);
+      writer.finish();
+      return new GenericStatisticsFile(
+          snapshot.snapshotId(),
+          path,
+          writer.fileSize(),
+          writer.footerSize(),
+          writer.writtenBlobsMetadata().stream()
+              .map(GenericBlobMetadata::from)
+              .collect(ImmutableList.toImmutableList()));
+    }
+  }
+
+  private List<Blob> generateNDVBlobs() {
+    return NDVSketchGenerator.generateNDVSketchesAndBlobs(spark(), table, 
snapshot, columns());
+  }
+
+  @Override
+  public ComputeTableStats columns(String... newColumns) {
+    Preconditions.checkArgument(
+        newColumns != null && newColumns.length > 0, "Columns cannot be 
null/empty");
+    this.columns = ImmutableSet.copyOf(newColumns);
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats snapshot(long newSnapshotId) {
+    Snapshot newSnapshot = table.snapshot(newSnapshotId);
+    Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", 
newSnapshotId);
+    this.snapshot = newSnapshot;
+    return this;
+  }
+
+  private Set<String> columns() {
+    Schema schema = table.schemas().get(snapshot.schemaId());
+    if (columns == null) {
+      columns = 
schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet());
+    }
+    validateColumns(schema);
+    return columns;
+  }
+
+  private void validateColumns(Schema schema) {
+    for (String columnName : columns) {
+      Types.NestedField field = schema.findField(columnName);
+      if (field == null) {

Review Comment:
   What about using `Preconditions` here and adding a bit more context to the 
error messages?
   
   ```
   Preconditions.checkArgument(field != null, "Can't find column %s in %s", 
columnName, schema);
   Preconditions.checkArgument(
       field.type().isPrimitiveType(),
       "Can't compute stats on non-primitive type column: %s (%s)",
       columnName,
       field.type());
   ```



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkValueConverter;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import scala.Tuple2;
+
+public class NDVSketchGenerator {
+
+  private NDVSketchGenerator() {}
+
+  public static final String APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY = "ndv";
+
+  static List<Blob> generateNDVSketchesAndBlobs(
+      SparkSession spark, Table table, Snapshot snapshot, Set<String> 
columnsToBeAnalyzed) {
+    Map<Integer, ThetaSketchJavaSerializable> columnToSketchMap =
+        computeNDVSketches(spark, table, snapshot.snapshotId(), 
columnsToBeAnalyzed);
+    return generateBlobs(table, columnsToBeAnalyzed, columnToSketchMap, 
snapshot);
+  }
+
+  private static List<Blob> generateBlobs(
+      Table table,
+      Set<String> columns,
+      Map<Integer, ThetaSketchJavaSerializable> sketchMap,
+      Snapshot snapshot) {
+    return columns.stream()
+        .map(
+            columnName -> {
+              Types.NestedField field = table.schema().findField(columnName);
+              Sketch sketch = sketchMap.get(field.fieldId()).getSketch();
+              long ndv = (long) sketch.getEstimate();
+              return new Blob(
+                  StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
+                  ImmutableList.of(field.fieldId()),
+                  snapshot.snapshotId(),
+                  snapshot.sequenceNumber(),
+                  ByteBuffer.wrap(sketch.toByteArray()),
+                  null,
+                  ImmutableMap.of(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY, 
String.valueOf(ndv)));
+            })
+        .collect(Collectors.toList());
+  }
+
+  private static Map<Integer, ThetaSketchJavaSerializable> computeNDVSketches(

Review Comment:
   > To use the TypedImperativeAggregate shouldn't be a expression natively 
provided by Spark?
   
   Not necessarily, it is fairly easy to convert `TypedImperativeAggregate` to 
`Column` and use it. 
   
   ```
   val func = new ThetaSketchAggregator(...)
   new Column(func.toAggregateExpression(isDistinct = false))
   ```
   
   > Also can this be taken as improvement in a subsequent PR?
   
   I think the performance difference will be significant and I doubt the 
existing approach will work if Kryo is enabled. The current solution relies on 
writeObject/readObject methods that will be simply ignored by Kryo.
   
   It will be fairly easy to implement `TypedImperativeAggregate` in our case. 
We will have to return a row with NDV and bytes for the blob from the `eval` 
method. We will create a separate aggregate for each column that is being 
analyzed. Happy to talk more about this offline, if needed. 



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin 
files. */
+public class ComputeTableStatsSparkAction extends 
BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = 
ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private Set<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Computing stats for %s", table.name());
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    if (snapshot == null) {
+      return EMPTY_RESULT;
+    }
+    LOG.info("Computing stats of {} for snapshot {}", table.name(), 
snapshot.snapshotId());
+    List<Blob> blobs = generateNDVBlobs();
+    StatisticsFile statisticsFile;
+    try {
+      statisticsFile = writeStatsFile(blobs);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+    table.updateStatistics().setStatistics(snapshot.snapshotId(), 
statisticsFile).commit();
+    return 
ImmutableComputeTableStats.Result.builder().statisticsFile(statisticsFile).build();
+  }
+
+  private StatisticsFile writeStatsFile(List<Blob> blobs) throws IOException {
+    LOG.info("Writing stats for table {} for snapshot {}", table.name(), 
snapshot.snapshotId());
+    TableOperations operations = ((HasTableOperations) table).operations();
+    String path = operations.metadataFileLocation(String.format("%s.stats", 
UUID.randomUUID()));

Review Comment:
   I'd consider adding a helper method to generate the path and include 
the snapshot ID in it.
   
   ```
   private String outputPath() {
     TableOperations operations = ((HasTableOperations) table).operations();
     String fileName = String.format("%s-%s.stats", snapshotId(), 
UUID.randomUUID());
     return operations.metadataFileLocation(fileName);
   }
   ```



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin 
files. */
+public class ComputeTableStatsSparkAction extends 
BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = 
ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private Set<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Computing stats for %s", table.name());
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    if (snapshot == null) {
+      return EMPTY_RESULT;
+    }
+    LOG.info("Computing stats of {} for snapshot {}", table.name(), 
snapshot.snapshotId());
+    List<Blob> blobs = generateNDVBlobs();
+    StatisticsFile statisticsFile;
+    try {
+      statisticsFile = writeStatsFile(blobs);
+    } catch (IOException e) {

Review Comment:
   Let's make `writeStatsFile` not throw any checked exceptions by adding a 
`catch` to the `try` there?



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin 
files. */
+public class ComputeTableStatsSparkAction extends 
BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = 
ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private Set<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Computing stats for %s", table.name());

Review Comment:
   Minor: What about adding more details here with a helper method?
   
   ```
   private String jobDesc() {
     return String.format(
         "Computing table stats for %s (snapshot_id=%s, columns=%s)",
         table.name(), snapshotId(), columns());
   }
   ```
   
   We will need `snapshotId()`:
   
   ```
   private Long snapshotId() {
     return snapshot != null ? snapshot.snapshotId() : null;
   }
   ```



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores it as Puffin 
files. */
+public class ComputeTableStatsSparkAction extends 
BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+
+  private final Table table;
+  private Set<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+    this.columns =
+        
table.schema().columns().stream().map(Types.NestedField::name).collect(Collectors.toSet());
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Computing stats for %s", table.name());
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    if (snapshot == null) {
+      return ImmutableComputeTableStats.Result.builder().build();
+    }
+    LOG.info("Computing stats of {} for snapshot {}", table.name(), 
snapshot.snapshotId());
+    List<Blob> blobs = generateNDVBlobs();
+    StatisticsFile statisticFile;
+    try {
+      statisticFile = writeAndCommitPuffin(blobs);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+    return 
ImmutableComputeTableStats.Result.builder().statisticsFile(statisticFile).build();
+  }
+
+  private StatisticsFile writeAndCommitPuffin(List<Blob> blobs) throws 
IOException {
+    LOG.info(
+        "Writing stats to puffin files for table {} for snapshot{}",
+        table.name(),
+        snapshot.snapshotId());
+    TableOperations operations = ((HasTableOperations) table).operations();
+    String path = operations.metadataFileLocation(String.format("%s.stats", 
UUID.randomUUID()));
+    OutputFile outputFile = operations.io().newOutputFile(path);
+    GenericStatisticsFile statisticsFile = generateStatisticsFile(blobs, 
outputFile, path);
+    table.updateStatistics().setStatistics(snapshot.snapshotId(), 
statisticsFile).commit();
+    return statisticsFile;
+  }
+
+  @NotNull
+  private GenericStatisticsFile generateStatisticsFile(
+      List<Blob> blobs, OutputFile outputFile, String path) throws IOException 
{
+    try (PuffinWriter writer =
+        Puffin.write(outputFile).createdBy("Iceberg ComputeTableStats 
action").build()) {
+      blobs.forEach(writer::add);
+      writer.finish();
+      return new GenericStatisticsFile(
+          snapshot.snapshotId(),
+          path,
+          writer.fileSize(),
+          writer.footerSize(),
+          writer.writtenBlobsMetadata().stream()

Review Comment:
   Nope, I meant adding something like this to `GenericBlobMetadata`. We 
already have a `from` method there that accepts a single object, so let's 
overload it to accept a list/collection of objects.
   
   ```
   public static List<BlobMetadata> from(
       Collection<org.apache.iceberg.puffin.BlobMetadata> puffinMetadataList) {
     return puffinMetadataList.stream()
         .map(GenericBlobMetadata::from)
         .collect(ImmutableList.toImmutableList());
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to