findepi commented on code in PR #10288: URL: https://github.com/apache/iceberg/pull/10288#discussion_r1639527100
########## core/src/main/java/org/apache/iceberg/puffin/StandardBlobTypes.java: ########## @@ -26,4 +29,8 @@ private StandardBlobTypes() {} * href="https://datasketches.apache.org/">Apache DataSketches</a> library */ public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1"; + + public static Set<String> blobTypes() { Review Comment: Is it supposed to return "all standard blob types"? Should the name reflect that? If we did https://github.com/apache/iceberg/pull/8202, would this new blob type be added to this method? ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/AnalyzeTableSparkAction.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.AnalyzeTable; +import org.apache.iceberg.actions.ImmutableAnalyzeTable; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.JobGroupInfo; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Computes the statistic of the given columns and stores it as Puffin files. */ +public class AnalyzeTableSparkAction extends BaseSparkAction<AnalyzeTableSparkAction> + implements AnalyzeTable { + + private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class); + + private final Table table; + private Set<String> columns = ImmutableSet.of(); + private Set<String> types = StandardBlobTypes.blobTypes(); Review Comment: These are the default blob types to collect, right? When a new blob type is added, it's unlikely to be initially supported by this class. For example, `apache-datasketches-theta-v1` is defined, but the current main branch code doesn't yet know how to compute it. I think the `types` should be initialized explicitly with the list of types supported for collection. ########## api/src/main/java/org/apache/iceberg/actions/AnalyzeTable.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.actions; + +import java.util.List; +import java.util.Set; + +/** An action that collects statistics of an Iceberg table and writes to Puffin files. */ +public interface AnalyzeTable extends Action<AnalyzeTable, AnalyzeTable.Result> { + /** + * The set of columns to be analyzed + * + * @param columnNames a set of column names to be analyzed + * @return this for method chaining + */ + AnalyzeTable columns(String... columnNames); Review Comment: If we're going to support multiple different blob types (`types(Set<String>)`), should the `columns` be specified for them separately? For example, I could want to calculate NDV for columns A and B, and a correlation blob type for {B, C} together. I think we should replace `types(Set<String>)` and `columns(String...)` with just one method: ``` type(String type, Set<String> columns) ``` (not sure about the name; it should be cumulative) ########## api/src/main/java/org/apache/iceberg/actions/AnalyzeTable.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.actions; + +import java.util.List; +import java.util.Set; + +/** An action that collects statistics of an Iceberg table and writes to Puffin files. */ +public interface AnalyzeTable extends Action<AnalyzeTable, AnalyzeTable.Result> { + /** + * The set of columns to be analyzed + * + * @param columnNames a set of column names to be analyzed + * @return this for method chaining + */ + AnalyzeTable columns(String... columnNames); + + /** + * A set of statistics to be collected + * + * @param types set of statistics to be collected + * @return this for method chaining + */ + AnalyzeTable types(Set<String> types); Review Comment: What are allowed values for the `types` parameter? How can someone reading the javadoc learn this? Is it "stats types", "blob types" or something else? If "blob types", we could link to https://iceberg.apache.org/puffin-spec/#blob-types , but I don't think we can assume that all known blob types will be supported by the code at all times.
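To make the cumulative API floated above concrete, here is a rough sketch of the shape it could take (the method name, javadoc wording, and the correlation blob type mentioned below are illustrative assumptions, not code from this PR):

```
// Illustrative only: one cumulative method replacing types(Set<String>)
// and columns(String...), so each blob type gets its own column set.
import java.util.Set;

public interface AnalyzeTable extends Action<AnalyzeTable, AnalyzeTable.Result> {

  /**
   * Requests collection of the given blob type over the given columns. Calls are cumulative,
   * so different blob types can target different (possibly overlapping) column sets.
   *
   * @param blobType a blob type this action knows how to compute, e.g.
   *     StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1
   * @param columns the columns the blobs should be computed for
   * @return this for method chaining
   */
  AnalyzeTable type(String blobType, Set<String> columns);
}
```

A caller could then chain two calls in a single action, e.g. `type(APACHE_DATASKETCHES_THETA_V1, ImmutableSet.of("a", "b"))` followed by `type("corr-v1", ImmutableSet.of("b", "c"))` for a hypothetical correlation blob.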
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ThetaSketchJavaSerializable.java: ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.theta.CompactSketch; +import org.apache.datasketches.theta.Sketch; +import org.apache.datasketches.theta.Sketches; +import org.apache.datasketches.theta.UpdateSketch; + +class ThetaSketchJavaSerializable implements Serializable { + + private Sketch sketch; + + ThetaSketchJavaSerializable() {} + + ThetaSketchJavaSerializable(final Sketch sketch) { + this.sketch = sketch; + } + + Sketch getSketch() { + return sketch; + } + + CompactSketch getCompactSketch() { + if (sketch == null) { + return null; + } + if (sketch instanceof UpdateSketch) { + return sketch.compact(); + } + return (CompactSketch) sketch; + } + + void update(final String value) { + if (sketch == null) { + sketch = UpdateSketch.builder().build(); + } + if (sketch instanceof UpdateSketch) { + ((UpdateSketch) sketch).update(value); + } else { + throw new RuntimeException("update() on read-only sketch"); + } + } + + double getEstimate() { + if (sketch == null) { + return 0.0; + } + return sketch.getEstimate(); + } + + private void writeObject(final ObjectOutputStream out) throws IOException { + if (sketch == null) { + out.writeInt(0); + return; + } + final byte[] serializedSketchBytes = sketch.compact().toByteArray(); + out.writeInt(serializedSketchBytes.length); + out.write(serializedSketchBytes); + } + + private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { + final int length = in.readInt(); + if (length == 0) { + return; + } + final byte[] serializedSketchBytes = new byte[length]; + in.readFully(serializedSketchBytes); + sketch = Sketches.wrapSketch(Memory.wrap(serializedSketchBytes)); Review Comment: we wrote a compact sketch, so we can use `CompactSketch.wrap` here ########## api/src/main/java/org/apache/iceberg/actions/AnalyzeTable.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.actions; + +import java.util.List; +import java.util.Set; + +/** An action that collects statistics of an Iceberg table and writes to Puffin files. */ +public interface AnalyzeTable extends Action<AnalyzeTable, AnalyzeTable.Result> { + /** + * The set of columns to be analyzed + * + * @param columns a set of column names to be analyzed + * @return this for method chaining + */ + AnalyzeTable columns(Set<String> columns); + + /** + * A set of statistics to be collected on the given columns of the given table + * + * @param statsToBeCollected set of statistics to be collected + * @return this for method chaining + */ + AnalyzeTable stats(Set<String> statsToBeCollected); + Review Comment: The `snapshot(String snapshotId)` has been added for branch/tag -- is it an existing pattern to support this first-class in APIs, or to require the caller to convert the information they have (branch/tag) into a snapshot ID? ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.datasketches.theta.CompactSketch; +import org.apache.datasketches.theta.SetOperationBuilder; +import org.apache.datasketches.theta.UpdateSketch; +import org.apache.iceberg.GenericBlobMetadata; +import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinWriter; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +public class NDVSketchGenerator { + + private NDVSketchGenerator() {} + + public static StatisticsFile generateNDV( + SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed) + throws IOException { + Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator = + NDVSketchGenerator.computeNDVSketches(spark, table.name(), snapshotId, columnsToBeAnalyzed); + Map<String, ThetaSketchJavaSerializable> sketchMap = Maps.newHashMap(); + + tuple2Iterator.forEachRemaining(tuple -> sketchMap.put(tuple._1, tuple._2)); + return writeToPuffin(table, Lists.newArrayList(columnsToBeAnalyzed), sketchMap); + } + + private static StatisticsFile writeToPuffin( + Table table, List<String> columns, Map<String, ThetaSketchJavaSerializable> sketchMap) + throws IOException { + int columnSizes = columns.size(); + TableOperations operations = ((HasTableOperations) table).operations(); + FileIO fileIO = ((HasTableOperations) table).operations().io(); + String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID())); + OutputFile outputFile = fileIO.newOutputFile(path); + try (PuffinWriter writer = + Puffin.write(outputFile).createdBy("Spark DistinctCountProcedure").build()) { + for (int i = 0; i < columnSizes; i++) { + writer.add( + new Blob( + StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1, + ImmutableList.of(table.schema().findField(columns.get(i)).fieldId()), + table.currentSnapshot().snapshotId(), + table.currentSnapshot().sequenceNumber(), + ByteBuffer.wrap(sketchMap.get(columns.get(i)).getSketch().toByteArray()), + null, Review Comment: > since the sketch will be a single long per column

The sketch should be more than that, a small number of KB IIRC. Trino uses ZSTD for the blobs, and no compression for the footer. ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java: ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.datasketches.theta.CompactSketch; +import org.apache.datasketches.theta.SetOperationBuilder; +import org.apache.datasketches.theta.UpdateSketch; +import org.apache.iceberg.GenericBlobMetadata; +import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinWriter; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.internal.SQLConf; +import scala.Tuple2; + +public class NDVSketchGenerator { + + private NDVSketchGenerator() {} + + static StatisticsFile generateNDV( + SparkSession spark, Table table, long snapshotId, String... 
columnsToBeAnalyzed) + throws IOException { + List<String> columnList; + if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.length == 0) { + columnList = + table.schema().columns().stream() + .map(Types.NestedField::name) + .collect(Collectors.toList()); + } else { + columnList = Lists.newArrayList(columnsToBeAnalyzed); + } + Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator = + computeNDVSketches(spark, table.name(), snapshotId, columnList); + Map<String, ThetaSketchJavaSerializable> sketchMap = Maps.newHashMap(); + + tuple2Iterator.forEachRemaining(tuple -> sketchMap.put(tuple._1, tuple._2)); + return writeToPuffin(table, columnList, sketchMap); + } + + private static StatisticsFile writeToPuffin( + Table table, List<String> columns, Map<String, ThetaSketchJavaSerializable> sketchMap) + throws IOException { + TableOperations operations = ((HasTableOperations) table).operations(); + FileIO fileIO = operations.io(); + String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID())); + OutputFile outputFile = fileIO.newOutputFile(path); + try (PuffinWriter writer = + Puffin.write(outputFile).createdBy("Iceberg Analyze action").build()) { + for (String columnName : columns) { + writer.add( + new Blob( + StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1, + ImmutableList.of(table.schema().findField(columnName).fieldId()), + table.currentSnapshot().snapshotId(), + table.currentSnapshot().sequenceNumber(), + ByteBuffer.wrap(sketchMap.get(columnName).getSketch().toByteArray()), Review Comment: I think there should be a `compact()` call (perhaps not here, but inside `computeNDVSketches`). ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java: ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.datasketches.theta.CompactSketch; +import org.apache.datasketches.theta.SetOperationBuilder; +import org.apache.datasketches.theta.UpdateSketch; +import org.apache.iceberg.GenericBlobMetadata; +import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinWriter; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.internal.SQLConf; +import scala.Tuple2; + +public class NDVSketchGenerator { + + private NDVSketchGenerator() {} + + static StatisticsFile generateNDV( + SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed) + throws IOException { + List<String> columnList; + if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.length == 0) { Review Comment: If this is called as varargs, we don't need to support null. If this is not called as varargs, change `columnsToBeAnalyzed` to be a Set or a List. ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java: ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.datasketches.theta.CompactSketch; +import org.apache.datasketches.theta.SetOperationBuilder; +import org.apache.datasketches.theta.UpdateSketch; +import org.apache.iceberg.GenericBlobMetadata; +import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinWriter; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.internal.SQLConf; +import scala.Tuple2; + +public class NDVSketchGenerator { + + private NDVSketchGenerator() {} + + static StatisticsFile generateNDV( + SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed) + throws IOException { + List<String> columnList; + if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.length == 0) { + columnList = + table.schema().columns().stream() + .map(Types.NestedField::name) + .collect(Collectors.toList()); + } else { + columnList = Lists.newArrayList(columnsToBeAnalyzed); + } + Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator = + computeNDVSketches(spark, table.name(), snapshotId, columnList); + Map<String, ThetaSketchJavaSerializable> sketchMap = Maps.newHashMap(); + + tuple2Iterator.forEachRemaining(tuple -> sketchMap.put(tuple._1, tuple._2)); + return writeToPuffin(table, columnList, sketchMap); + } + + private static StatisticsFile writeToPuffin( + Table table, List<String> columns, Map<String, ThetaSketchJavaSerializable> sketchMap) + throws IOException { + TableOperations operations = ((HasTableOperations) table).operations(); + FileIO fileIO = operations.io(); + String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID())); + OutputFile outputFile = fileIO.newOutputFile(path); + try (PuffinWriter writer = + Puffin.write(outputFile).createdBy("Iceberg Analyze action").build()) { + for (String columnName : columns) { + writer.add( + new Blob( + StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1, + ImmutableList.of(table.schema().findField(columnName).fieldId()), + table.currentSnapshot().snapshotId(), + table.currentSnapshot().sequenceNumber(), + ByteBuffer.wrap(sketchMap.get(columnName).getSketch().toByteArray()), Review Comment: BTW it would be good to have a cross-engine compatibility test to ensure the value we write here can indeed be used correctly by other engines. 
For Trino, you can use https://java.testcontainers.org/modules/databases/trino/ Trino already has such tests, but that doesn't cover Iceberg Spark features that are **being** implemented. ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/AnalyzeTableSparkAction.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.AnalyzeTable; +import org.apache.iceberg.actions.ImmutableAnalyzeTable; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.JobGroupInfo; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Computes the statistic of the given columns and stores it as Puffin files. */ +public class AnalyzeTableSparkAction extends BaseSparkAction<AnalyzeTableSparkAction> + implements AnalyzeTable { + + private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class); + + private final Table table; + private Set<String> columns = ImmutableSet.of(); + private Set<String> types = StandardBlobTypes.blobTypes(); + private Long snapshotId; + + AnalyzeTableSparkAction(SparkSession spark, Table table) { + super(spark); + this.table = table; + } + + @Override + protected AnalyzeTableSparkAction self() { + return this; + } + + @Override + public Result execute() { + if (snapshotId == null) { + snapshotId = table.currentSnapshot().snapshotId(); + } + String desc = String.format("Analyzing table %s for snapshot id %s", table.name(), snapshotId); + JobGroupInfo info = newJobGroupInfo("ANALYZE-TABLE", desc); + return withJobGroupInfo(info, this::doExecute); + } + + private Result doExecute() { + LOG.info("Starting the analysis of {} for snapshot {}", table.name(), snapshotId); + List<AnalysisResult> analysisResults = + types.stream() + .map( + statsName -> { + switch (statsName) { + case StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1: + return generateNDVAndCommit(); Review Comment: In the future there will be multiple blob/stat types.
The commit should be separate from blob generation. ########## gradle/libs.versions.toml: ########## @@ -33,7 +33,7 @@ azuresdk-bom = "1.2.23" awssdk-s3accessgrants = "2.0.0" caffeine = "2.9.3" calcite = "1.10.0" -datasketches = "6.0.0" +datasketches="6.0.0" Review Comment: Please revert. ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/AnalyzeTableSparkAction.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.AnalyzeTable; +import org.apache.iceberg.actions.ImmutableAnalyzeTable; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.JobGroupInfo; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Computes the statistic of the given columns and stores it as Puffin files. */ +public class AnalyzeTableSparkAction extends BaseSparkAction<AnalyzeTableSparkAction> + implements AnalyzeTable { + + private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class); + + private final Table table; + private Set<String> columns = ImmutableSet.of(); Review Comment: Arrays are mutable, so if we decide to switch to arrays, please make sure to defensive-copy whenever passing one to another class. ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.datasketches.theta.CompactSketch; +import org.apache.datasketches.theta.SetOperationBuilder; +import org.apache.datasketches.theta.UpdateSketch; +import org.apache.iceberg.GenericBlobMetadata; +import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinWriter; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +public class NDVSketchGenerator { + + private NDVSketchGenerator() {} + + public static StatisticsFile generateNDV( + SparkSession spark, Table table, long snapshotId, String... 
columnsToBeAnalyzed) + throws IOException { + Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator = + NDVSketchGenerator.computeNDVSketches(spark, table.name(), snapshotId, columnsToBeAnalyzed); + Map<String, ThetaSketchJavaSerializable> sketchMap = Maps.newHashMap(); + + tuple2Iterator.forEachRemaining(tuple -> sketchMap.put(tuple._1, tuple._2)); + return writeToPuffin(table, Lists.newArrayList(columnsToBeAnalyzed), sketchMap); + } + + private static StatisticsFile writeToPuffin( + Table table, List<String> columns, Map<String, ThetaSketchJavaSerializable> sketchMap) + throws IOException { + int columnSizes = columns.size(); + TableOperations operations = ((HasTableOperations) table).operations(); + FileIO fileIO = ((HasTableOperations) table).operations().io(); + String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID())); + OutputFile outputFile = fileIO.newOutputFile(path); + try (PuffinWriter writer = + Puffin.write(outputFile).createdBy("Spark DistinctCountProcedure").build()) { + for (int i = 0; i < columnSizes; i++) { + writer.add( + new Blob( + StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1, + ImmutableList.of(table.schema().findField(columns.get(i)).fieldId()), + table.currentSnapshot().snapshotId(), + table.currentSnapshot().sequenceNumber(), + ByteBuffer.wrap(sketchMap.get(columns.get(i)).getSketch().toByteArray()), + null, + ImmutableMap.of())); + } + writer.finish(); Review Comment: I think you need to call finish() to get the final fileSize(), etc. ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java: ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.datasketches.theta.CompactSketch; +import org.apache.datasketches.theta.SetOperationBuilder; +import org.apache.datasketches.theta.UpdateSketch; +import org.apache.iceberg.GenericBlobMetadata; +import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinWriter; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.internal.SQLConf; +import scala.Tuple2; + +public class NDVSketchGenerator { + + private NDVSketchGenerator() {} + + static StatisticsFile generateNDV( + SparkSession spark, Table table, long snapshotId, String... 
columnsToBeAnalyzed) + throws IOException { + List<String> columnList; + if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.length == 0) { + columnList = + table.schema().columns().stream() + .map(Types.NestedField::name) + .collect(Collectors.toList()); + } else { + columnList = Lists.newArrayList(columnsToBeAnalyzed); + } + Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator = + computeNDVSketches(spark, table.name(), snapshotId, columnList); + Map<String, ThetaSketchJavaSerializable> sketchMap = Maps.newHashMap(); + + tuple2Iterator.forEachRemaining(tuple -> sketchMap.put(tuple._1, tuple._2)); + return writeToPuffin(table, columnList, sketchMap); + } + + private static StatisticsFile writeToPuffin( + Table table, List<String> columns, Map<String, ThetaSketchJavaSerializable> sketchMap) + throws IOException { + TableOperations operations = ((HasTableOperations) table).operations(); + FileIO fileIO = operations.io(); + String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID())); + OutputFile outputFile = fileIO.newOutputFile(path); + try (PuffinWriter writer = + Puffin.write(outputFile).createdBy("Iceberg Analyze action").build()) { + for (String columnName : columns) { + writer.add( + new Blob( + StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1, + ImmutableList.of(table.schema().findField(columnName).fieldId()), + table.currentSnapshot().snapshotId(), + table.currentSnapshot().sequenceNumber(), + ByteBuffer.wrap(sketchMap.get(columnName).getSketch().toByteArray()), + null, + ImmutableMap.of())); + } + writer.finish(); + + return new GenericStatisticsFile( + table.currentSnapshot().snapshotId(), + path, + writer.fileSize(), + writer.footerSize(), + writer.writtenBlobsMetadata().stream() + .map(GenericBlobMetadata::from) + .collect(ImmutableList.toImmutableList())); + } + } + + static Iterator<Tuple2<String, ThetaSketchJavaSerializable>> computeNDVSketches( + SparkSession spark, String tableName, long snapshotId, List<String> columns) { + Dataset<Row> data = + spark + .read() + .option("snapshot-id", snapshotId) + .table(tableName) + .select(columns.stream().map(functions::col).toArray(Column[]::new)); + + final JavaPairRDD<String, String> pairs = + data.javaRDD() + .flatMap( + row -> { + List<Tuple2<String, String>> columnsList = + Lists.newArrayListWithExpectedSize(columns.size()); + for (int i = 0; i < row.size(); i++) { + columnsList.add(new Tuple2<>(columns.get(i), row.get(i).toString())); Review Comment: This shouldn't use toString; it should use `Conversions.toByteBuffer`. See https://iceberg.apache.org/puffin-spec/#apache-datasketches-theta-v1-blob-type ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/AnalyzeTableSparkAction.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.AnalyzeTable; +import org.apache.iceberg.actions.ImmutableAnalyzeTable; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.JobGroupInfo; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Computes the statistic of the given columns and stores it as Puffin files. */ +public class AnalyzeTableSparkAction extends BaseSparkAction<AnalyzeTableSparkAction> + implements AnalyzeTable { + + private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class); + + private final Table table; + private Set<String> columns = ImmutableSet.of(); + private Set<String> types = StandardBlobTypes.blobTypes(); + private Long snapshotId; + + AnalyzeTableSparkAction(SparkSession spark, Table table) { + super(spark); + this.table = table; + } + + @Override + protected AnalyzeTableSparkAction self() { + return this; + } + + @Override + public Result execute() { + if (snapshotId == null) { + snapshotId = table.currentSnapshot().snapshotId(); + } + String desc = String.format("Analyzing table %s for snapshot id %s", table.name(), snapshotId); + JobGroupInfo info = newJobGroupInfo("ANALYZE-TABLE", desc); + return withJobGroupInfo(info, this::doExecute); + } + + private Result doExecute() { + LOG.info("Starting the analysis of {} for snapshot {}", table.name(), snapshotId); + List<AnalysisResult> analysisResults = + types.stream() + .map( + statsName -> { + switch (statsName) { + case StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1: + return generateNDVAndCommit(); + default: + return ImmutableAnalyzeTable.AnalysisResult.builder() + .type(statsName) + .addAllErrors(Lists.newArrayList("Stats type not supported")) + .build(); + } + }) + .collect(Collectors.toList()); + return ImmutableAnalyzeTable.Result.builder().analysisResults(analysisResults).build(); + } + + private boolean analyzableTypes(Set<String> columnNames) { + return columnNames.stream() + .anyMatch( + columnName -> { + Types.NestedField field = table.schema().findField(columnName); + if (field == null) { + throw new ValidationException("No column with %s name in the table", columnName); + } + Type.TypeID type = field.type().typeId(); + return type == Type.TypeID.INTEGER + || type == Type.TypeID.LONG + || type == Type.TypeID.STRING + || type == Type.TypeID.DOUBLE; Review Comment: Why only these?
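To make the earlier `Conversions.toByteBuffer` comment concrete, here is a minimal sketch of the per-value update the Puffin spec calls for, assuming a hypothetical helper class and that nulls are simply skipped (neither detail is from the PR):

```
import java.nio.ByteBuffer;
import org.apache.datasketches.theta.UpdateSketch;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;

class SketchUpdate {
  // Feeds one column value into the sketch using Iceberg's single-value
  // serialization (Conversions.toByteBuffer) rather than row.get(i).toString().
  static void update(UpdateSketch sketch, Type type, Object value) {
    if (value == null) {
      return; // nulls contribute no distinct value
    }
    ByteBuffer buffer = Conversions.toByteBuffer(type, value);
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes); // copy out without moving the buffer's position
    sketch.update(bytes); // theta sketches hash raw bytes directly
  }
}
```

Hashing the serialized bytes also keeps the sketch identical across engines, which is what the cross-engine compatibility test suggested above would verify.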