aokolnychyi commented on code in PR #10288: URL: https://github.com/apache/iceberg/pull/10288#discussion_r1719138348
########## spark/v3.5/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAggregator.scala: ##########
@@ -0,0 +1,96 @@
+  override def createAggregationBuffer(): Sketch = {
+    UpdateSketch.builder.setFamily(Family.ALPHA).build
+  }

Review Comment:
Minor: `build` -> `build()`. It is kind of a grey area, but I feel like the action is substantial enough to include `()`.
########## spark/v3.5/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAggregator.scala: ##########
@@ -0,0 +1,96 @@
+  override def update(buffer: Sketch, input: InternalRow): Sketch = {
+    val value = child.eval(input)
+    if (value != null) {
+      val icebergType = SparkSchemaUtil.convert(child.dataType)

Review Comment:
Let's not do this for every row and define `private lazy val icebergType` as a field of the class.
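For illustration, a minimal sketch of that change (the field placement is a suggestion; the value conversion itself is refined in a later comment):
```scala
private lazy val icebergType = SparkSchemaUtil.convert(child.dataType)

override def update(buffer: Sketch, input: InternalRow): Sketch = {
  val value = child.eval(input)
  if (value != null) {
    // the Spark-to-Iceberg type mapping is now resolved once per aggregator, not per row
    val byteBuffer = Conversions.toByteBuffer(icebergType, SparkValueConverter.convert(icebergType, value))
    buffer.asInstanceOf[UpdateSketch].update(byteBuffer)
  }
  buffer
}
```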
########## spark/v3.5/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAggregator.scala: ##########
@@ -0,0 +1,96 @@
+  override def eval(buffer: Sketch): Any = {
+    buffer.toByteArray
+  }
+
+  override def serialize(buffer: Sketch): Array[Byte] = {
+    buffer.compact().toByteArray
+  }

Review Comment:
Shall we define a helper method that would always compact the sketch before converting to bytes and use it here and during serialization?
```
private def toBytes(sketch: Sketch): Array[Byte] = {
  val compactSketch = sketch.compact()
  compactSketch.toByteArray
}
```
########## spark/v3.5/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAggregator.scala: ##########
@@ -0,0 +1,96 @@
+      val byteBuffer = child.dataType match {
+        case StringType => Conversions.toByteBuffer(icebergType,
+          SparkValueConverter.convert(icebergType, value.toString))
+        case _ => Conversions.toByteBuffer(icebergType, SparkValueConverter.convert(icebergType, value))
+      }

Review Comment:
I don't think this also covers Decimals. Can we add tests for each supported field type? I'd also move the conversion into a separate method to make the code more readable.
```
override def update(buffer: Sketch, input: InternalRow): Sketch = {
  val value = child.eval(input)
  if (value != null) {
    val icebergValue = toIcebergValue(value)
    val byteBuffer = Conversions.toByteBuffer(icebergType, icebergValue)
    buffer.asInstanceOf[UpdateSketch].update(byteBuffer)
  }
  buffer
}

private def toIcebergValue(value: Any): Any = {
  value match {
    case s: UTF8String => s.toString
    case d: Decimal => d.toJavaBigDecimal
    case _ => value
  }
}
```
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java: ##########
@@ -0,0 +1,176 @@
+    List<Blob> blobs = generateNDVBlobs();
+    StatisticsFile statisticsFile;
+    statisticsFile = writeStatsFile(blobs);
+

Review Comment:
Is this empty line intentional?
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java: ##########
@@ -0,0 +1,176 @@
+  private Set<String> columns() {

Review Comment:
I think this poses a problem. Calling this action on a table with nested columns will always fail. Instead, we should create an implementation of `SchemaVisitor` and collect all applicable columns for which we can derive sketches. The logic for deriving columns can live in `NDVSketchUtil`.
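For illustration, a rough sketch of collecting sketch-eligible leaf columns by walking the schema; the names (`NDVSketchUtil`, `supportedColumns`) are hypothetical, and a real version could be built on Iceberg's `TypeUtil.SchemaVisitor` instead of manual recursion:
```java
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;

class NDVSketchUtil {
  static List<String> supportedColumns(Schema schema) {
    List<String> columns = Lists.newArrayList();
    collect(schema.asStruct(), "", columns);
    return columns;
  }

  private static void collect(Types.StructType struct, String prefix, List<String> columns) {
    for (Types.NestedField field : struct.fields()) {
      String name = prefix.isEmpty() ? field.name() : prefix + "." + field.name();
      if (field.type().isPrimitiveType()) {
        // leaf primitive columns are the ones we can build sketches for
        columns.add(name);
      } else if (field.type().isStructType()) {
        // recurse into structs so nested leaf columns are picked up
        collect(field.type().asStructType(), name, columns);
      }
      // list and map types are skipped here
    }
  }
}
```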
########## spark/v3.5/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAggregator.scala: ##########
@@ -0,0 +1,96 @@
+case class ThetaSketchAggregator(

Review Comment:
Can we add some doc to this class, please? If I understand correctly, the spec requires us to use the ALPHA family when iterating through values but then convert the result to a compact sketch for storage. Copying some sentences from the spec would be great to make sure nobody changes it later.

https://iceberg.apache.org/puffin-spec/#apache-datasketches-theta-v1-blob-type
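A possible shape for that doc comment, placed right above the class declaration (a paraphrase rather than a verbatim quote of the spec; check the linked section for the authoritative wording):
```scala
/**
 * Computes an Apache DataSketches Theta sketch over a column, for storage as an
 * `apache-datasketches-theta-v1` Puffin blob.
 *
 * Per the Puffin spec, values are added to an Alpha family sketch (default seed)
 * using Iceberg's single-value serialization, and the result must be converted
 * to a compact sketch before it is stored.
 */
```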
########## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java: ##########
@@ -0,0 +1,300 @@
+public class TestComputeTableStatsAction extends CatalogTestBase {

Review Comment:
Can we add more tests:
- Multiple tasks, so that Spark needs to merge buffers (see the sketch below).
- All supported data types, similar to what we did in `TestStoragePartitionedJoins`?
- Successful computation of stats on a table with nested columns without providing explicit columns.
- Successful computation of stats on a table referencing nested columns explicitly.
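For the first bullet, a rough sketch (hypothetical test name and values; assumes AssertJ's `assertThat`/`withinPercentage` static imports): writing with `repartition(4)` produces several files, so the aggregation runs in multiple tasks and Spark has to merge partial sketches.
```java
@TestTemplate
public void testComputeTableStatsActionWithMultipleTasks()
    throws NoSuchTableException, ParseException {
  sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);

  spark
      .range(0, 1000)
      .selectExpr("CAST(id AS INT) AS id", "CAST(id AS STRING) AS data")
      .repartition(4) // several input files force Spark to merge partial sketches
      .writeTo(tableName)
      .append();

  Table table = Spark3Util.loadIcebergTable(spark, tableName);
  StatisticsFile statisticsFile =
      SparkActions.get().computeTableStats(table).columns("id").execute().statisticsFile();

  BlobMetadata blobMetadata = statisticsFile.blobMetadata().get(0);
  long ndv =
      Long.parseLong(blobMetadata.properties().get(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY));
  // the sketch estimate is approximate, so allow a small relative error
  assertThat(ndv).isCloseTo(1000L, withinPercentage(5));
}
```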
########## spark/v3.5/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAggregator.scala: ##########
@@ -0,0 +1,96 @@
+  def this(child: Expression) = this(child, 0, 0)

Review Comment:
What about accepting `colName: String` here instead to simplify the caller?
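For illustration, a sketch of such a constructor, resolving the expression from the name the same way the caller in `NDVSketchGenerator` currently does:
```scala
import org.apache.spark.sql.Column

// callers pass a plain column name; the expression is resolved here
def this(colName: String) = this(new Column(colName).expr, 0, 0)
```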
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java: ##########
@@ -0,0 +1,109 @@
+  static List<Blob> generateNDVSketchesAndBlobs(
+      SparkSession spark, Table table, Snapshot snapshot, Set<String> columns) {

Review Comment:
What about simply calling this method `generateSketches`? It is already clear from the class name what it does. I'd prefer shorter names so that we won't have to split statements into multiple lines later.
########## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java: ##########
@@ -0,0 +1,300 @@
+    BlobMetadata blobMetadata = statisticsFile.blobMetadata().get(0);
+    Assertions.assertEquals(

Review Comment:
Is there a dedicated assertion for maps?
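AssertJ, for instance, has map-specific assertions; a sketch (the expected value `"4"` is illustrative):
```java
import static org.assertj.core.api.Assertions.assertThat;

assertThat(blobMetadata.properties())
    .containsEntry(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY, "4");
```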
########## spark/v3.5/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAggregator.scala: ##########
@@ -0,0 +1,96 @@
+  override def dataType: DataType = BinaryType

Review Comment:
Let's move `dataType` and `nullable` right below the constructor. It is a common practice in Spark and helps readers get the basic shape of the class before going into the details.
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java: ##########
@@ -0,0 +1,109 @@
+    Dataset<Row> sketches =
+        spark
+            .read()
+            .format("iceberg")
+            .option(SparkReadOptions.SNAPSHOT_ID, snapshot.snapshotId())
+            .load(tableName)
+            .select(aggregateColumns);
+
+    Row rows = sketches.collectAsList().get(0);

Review Comment:
We can use `.first()` and chain it with the rest of the call.
```
Row sketches = spark
  .read()
  .format("iceberg")
  ...
  .first();
```
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java: ##########
@@ -0,0 +1,109 @@
+    for (int i = 0; i < columns.size(); i++) {
+      Types.NestedField field = schema.findField(columns.get(i));
+      sketchMap.put(field.fieldId(), Sketches.wrapSketch(Memory.wrap((byte[]) rows.get(i))));

Review Comment:
Let's use `CompactSketch.wrap()` to ensure it is `CompactSketch`.

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java: ##########
@@ -0,0 +1,109 @@
+public class NDVSketchGenerator {

Review Comment:
What about calling it `NDVSketchUtil` so that it's not limited to only generating sketches?
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java: ##########
@@ -0,0 +1,109 @@
+  private static Map<Integer, Sketch> computeNDVSketches(
+      SparkSession spark, Table table, Snapshot snapshot, Set<String> columnsToBeAnalyzed) {
+    Map<Integer, Sketch> sketchMap = Maps.newHashMap();
+    String tableName = table.name();
+    List<String> columns = ImmutableList.copyOf(columnsToBeAnalyzed);

Review Comment:
I'm not a big fan of using both `Set` and `List`; it gets confusing quickly. Let's accept `List` in this class and update the action to dedup the values.
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java: ##########
@@ -0,0 +1,109 @@
+              return new Blob(
+                  StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
+                  ImmutableList.of(field.fieldId()),
+                  snapshot.snapshotId(),
+                  snapshot.sequenceNumber(),
+                  ByteBuffer.wrap(sketch.toByteArray()),
+                  null,

Review Comment:
Compression? Should we request a codec here instead of passing `null`?
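For context, the `null` is the requested compression codec argument of the `Blob` constructor; one option (a sketch, assuming `org.apache.iceberg.puffin.PuffinCompressionCodec.ZSTD` is available) would be:
```java
return new Blob(
    StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
    ImmutableList.of(field.fieldId()),
    snapshot.snapshotId(),
    snapshot.sequenceNumber(),
    ByteBuffer.wrap(sketch.toByteArray()),
    PuffinCompressionCodec.ZSTD, // request zstd compression for the blob payload
    ImmutableMap.of(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY, String.valueOf(ndv)));
```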
########## spark/v3.5/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAggregator.scala: ##########
@@ -0,0 +1,96 @@
+  override def merge(buffer: Sketch, input: Sketch): Sketch = {
+    new SetOperationBuilder().buildUnion.union(buffer, input)
+  }

Review Comment:
I don't think the current tests invoke this method (we need multiple tasks, I guess). Can we make sure it is invoked and works correctly? I saw that Trino and other examples call `getResult()` on `Union` explicitly. I don't have a preference as long as it works, just checking.
```
val union = Sketches.setOperationBuilder().buildUnion()
union.union(buffer)
union.union(input)
union.getResult()
```
##########
spark/v3.5/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAggregator.scala:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.stats
+
+import org.apache.datasketches.common.Family
+import org.apache.datasketches.memory.Memory
+import org.apache.datasketches.theta.SetOperationBuilder
+import org.apache.datasketches.theta.Sketch
+import org.apache.datasketches.theta.Sketches
+import org.apache.datasketches.theta.UpdateSketch
+import org.apache.iceberg.spark.SparkSchemaUtil
+import org.apache.iceberg.spark.SparkValueConverter
+import org.apache.iceberg.types.Conversions
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
+import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.types.BinaryType
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.StringType
+
+case class ThetaSketchAggregator(
+    child: Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedImperativeAggregate[Sketch] with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, 0, 0)
+
+  override def createAggregationBuffer(): Sketch = {
+    UpdateSketch.builder.setFamily(Family.ALPHA).build
+  }
+
+  override def update(buffer: Sketch, input: InternalRow): Sketch = {
+    val value = child.eval(input)
+    if (value != null) {
+      val icebergType = SparkSchemaUtil.convert(child.dataType)
+      val byteBuffer = child.dataType match {
+        case StringType => Conversions.toByteBuffer(icebergType,
+          SparkValueConverter.convert(icebergType, value.toString))
+        case _ => Conversions.toByteBuffer(icebergType, SparkValueConverter.convert(icebergType, value))
+      }
+      buffer.asInstanceOf[UpdateSketch].update(byteBuffer)
+    }
+    buffer
+  }
+
+  override def dataType: DataType = BinaryType
+
+  override def merge(buffer: Sketch, input: Sketch): Sketch = {
+    new SetOperationBuilder().buildUnion.union(buffer, input)
+  }
+
+  override def eval(buffer: Sketch): Any = {
+    buffer.toByteArray
+  }
+
+  override def serialize(buffer: Sketch): Array[Byte] = {
+    buffer.compact().toByteArray
+  }
+
+  override def deserialize(storageFormat: Array[Byte]): Sketch = {
+    Sketches.wrapSketch(Memory.wrap(storageFormat))

Review Comment:
   What about using `CompactSketch.wrap()` to ensure we are dealing with a `CompactSketch`?
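   A minimal sketch of that alternative, assuming `serialize()` keeps writing a compact image as it does above (again in Java, matching the DataSketches API):

   ```
   import org.apache.datasketches.memory.Memory;
   import org.apache.datasketches.theta.CompactSketch;

   class ThetaDeserializeExample {
     // Wrap the serialized bytes as a CompactSketch explicitly, so a
     // non-compact image fails fast instead of being silently accepted.
     static CompactSketch deserialize(byte[] storageFormat) {
       return CompactSketch.wrap(Memory.wrap(storageFormat));
     }
   }
   ```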
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores them as Puffin files. */
+public class ComputeTableStatsSparkAction extends BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private Set<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats columns(String... newColumns) {
+    Preconditions.checkArgument(
+        newColumns != null && newColumns.length > 0, "Columns cannot be null/empty");
+    this.columns = ImmutableSet.copyOf(newColumns);
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats snapshot(long newSnapshotId) {
+    Snapshot newSnapshot = table.snapshot(newSnapshotId);
+    Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", newSnapshotId);
+    this.snapshot = newSnapshot;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    if (snapshot == null) {
+      return EMPTY_RESULT;
+    }
+    validateColumns();
+    LOG.info(
+        "Computing stats for columns {} in {} (snapshot {})",
+        columns(),
+        table.name(),
+        snapshotId());
+    List<Blob> blobs = generateNDVBlobs();
+    StatisticsFile statisticsFile;

Review Comment:
   Let's not split the var declaration and the call that assigns it.
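   For clarity, the shape being asked for is roughly the following; `StatsWriter` and its `write` method are hypothetical stand-ins for the elided part of this method:

   ```
   import java.util.List;
   import org.apache.iceberg.StatisticsFile;
   import org.apache.iceberg.puffin.Blob;

   class DeclarationExample {
     interface StatsWriter { // hypothetical stand-in for the elided write path
       StatisticsFile write(List<Blob> blobs);
     }

     static StatisticsFile compute(StatsWriter writer, List<Blob> blobs) {
       // declare and initialize in one statement instead of splitting them
       StatisticsFile statisticsFile = writer.write(blobs);
       return statisticsFile;
     }
   }
   ```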