findepi commented on code in PR #10288:
URL: https://github.com/apache/iceberg/pull/10288#discussion_r1639527100
##########
core/src/main/java/org/apache/iceberg/puffin/StandardBlobTypes.java:
##########
@@ -26,4 +29,8 @@ private StandardBlobTypes() {}
* href="https://datasketches.apache.org/">Apache DataSketches</a> library
*/
public static final String APACHE_DATASKETCHES_THETA_V1 =
"apache-datasketches-theta-v1";
+
+ public static Set<String> blobTypes() {
Review Comment:
Is it supposed to return "all standard blob types"?
Should the name reflect that?
If we did https://github.com/apache/iceberg/pull/8202, would that new blob
type be added to this method?
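For illustration, a hypothetical rename that would make the contract explicit (the name and `ImmutableSet` usage are just a sketch):
```
public static Set<String> allStandardBlobTypes() {
  // every blob type defined by the Puffin spec, not just the ones engines support
  return ImmutableSet.of(APACHE_DATASKETCHES_THETA_V1);
}
```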
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/AnalyzeTableSparkAction.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.AnalyzeTable;
+import org.apache.iceberg.actions.ImmutableAnalyzeTable;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistic of the given columns and stores it as Puffin files. */
+public class AnalyzeTableSparkAction extends BaseSparkAction<AnalyzeTableSparkAction>
+ implements AnalyzeTable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class);
+
+ private final Table table;
+ private Set<String> columns = ImmutableSet.of();
+ private Set<String> types = StandardBlobTypes.blobTypes();
Review Comment:
These are the default blob types to collect, right?
When a new blob type is added, it's unlikely to be initially supported by
this class. For example, `apache-datasketches-theta-v1` is defined, but the current
main branch code doesn't yet know how to compute it.
I think `types` should be initialized explicitly with the list of types
supported for collection.
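Something along these lines (a sketch; the constant name is hypothetical):
```
// only the blob types this action actually knows how to compute
private static final Set<String> SUPPORTED_BLOB_TYPES =
    ImmutableSet.of(StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1);

private Set<String> types = SUPPORTED_BLOB_TYPES;
```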
##########
api/src/main/java/org/apache/iceberg/actions/AnalyzeTable.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import java.util.Set;
+
+/** An action that collects statistics of an Iceberg table and writes to Puffin files. */
+public interface AnalyzeTable extends Action<AnalyzeTable, AnalyzeTable.Result> {
+ /**
+ * The set of columns to be analyzed
+ *
+ * @param columnNames a set of column names to be analyzed
+ * @return this for method chaining
+ */
+ AnalyzeTable columns(String... columnNames);
Review Comment:
If we're going to support multiple different blob types (`types(Set<String>)`),
should the `columns` be specified for each of them separately? For example, I could want
to calculate NDV for columns A and B, and a correlation blob type for {B, C} together.
I think we should replace `types(Set<String>)` and `columns(String...)` with
just one method:
```
type(String type, Set<String> columns)
```
(not sure about the name; it should be cumulative)
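Hypothetical cumulative usage (the correlation blob type name is made up):
```
action
    .type(StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1, ImmutableSet.of("a", "b"))
    .type("some-correlation-v1", ImmutableSet.of("b", "c"))
    .execute();
```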
##########
api/src/main/java/org/apache/iceberg/actions/AnalyzeTable.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import java.util.Set;
+
+/** An action that collects statistics of an Iceberg table and writes to Puffin files. */
+public interface AnalyzeTable extends Action<AnalyzeTable, AnalyzeTable.Result> {
+ /**
+ * The set of columns to be analyzed
+ *
+ * @param columnNames a set of column names to be analyzed
+ * @return this for method chaining
+ */
+ AnalyzeTable columns(String... columnNames);
+
+ /**
+ * A set of statistics to be collected
+ *
+ * @param types set of statistics to be collected
+ * @return this for method chaining
+ */
+ AnalyzeTable types(Set<String> types);
Review Comment:
What are allowed values for the `types` parameter? How can someone
interacting with the javadoc learn this?
is it "stats types", "blob types" or something else?
if "blob types", we could link to
https://iceberg.apache.org/puffin-spec/#blob-types , but i don't think we can
assume that all known blob types will be supported by the code at all times.
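A javadoc sketch of what I mean (wording is only an example):
```
/**
 * Puffin blob types to collect, e.g. {@link StandardBlobTypes#APACHE_DATASKETCHES_THETA_V1}.
 * Known blob types are listed in the
 * <a href="https://iceberg.apache.org/puffin-spec/#blob-types">Puffin spec</a>;
 * only a subset may be supported for collection at any given time.
 *
 * @param types blob types to collect
 * @return this for method chaining
 */
AnalyzeTable types(Set<String> types);
```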
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ThetaSketchJavaSerializable.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.CompactSketch;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.UpdateSketch;
+
+class ThetaSketchJavaSerializable implements Serializable {
+
+ private Sketch sketch;
+
+ ThetaSketchJavaSerializable() {}
+
+ ThetaSketchJavaSerializable(final Sketch sketch) {
+ this.sketch = sketch;
+ }
+
+ Sketch getSketch() {
+ return sketch;
+ }
+
+ CompactSketch getCompactSketch() {
+ if (sketch == null) {
+ return null;
+ }
+ if (sketch instanceof UpdateSketch) {
+ return sketch.compact();
+ }
+ return (CompactSketch) sketch;
+ }
+
+ void update(final String value) {
+ if (sketch == null) {
+ sketch = UpdateSketch.builder().build();
+ }
+ if (sketch instanceof UpdateSketch) {
+ ((UpdateSketch) sketch).update(value);
+ } else {
+ throw new RuntimeException("update() on read-only sketch");
+ }
+ }
+
+ double getEstimate() {
+ if (sketch == null) {
+ return 0.0;
+ }
+ return sketch.getEstimate();
+ }
+
+ private void writeObject(final ObjectOutputStream out) throws IOException {
+ if (sketch == null) {
+ out.writeInt(0);
+ return;
+ }
+ final byte[] serializedSketchBytes = sketch.compact().toByteArray();
+ out.writeInt(serializedSketchBytes.length);
+ out.write(serializedSketchBytes);
+ }
+
+ private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+ final int length = in.readInt();
+ if (length == 0) {
+ return;
+ }
+ final byte[] serializedSketchBytes = new byte[length];
+ in.readFully(serializedSketchBytes);
+ sketch = Sketches.wrapSketch(Memory.wrap(serializedSketchBytes));
Review Comment:
We wrote a compact sketch, so we can use `CompactSketch.wrap` here.
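I.e., a sketch of the suggested change (assuming `CompactSketch.wrap(Memory)` is available in the DataSketches version in use):
```
// the bytes were produced by sketch.compact().toByteArray(), so wrap as compact
sketch = CompactSketch.wrap(Memory.wrap(serializedSketchBytes));
```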
##########
api/src/main/java/org/apache/iceberg/actions/AnalyzeTable.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import java.util.Set;
+
+/** An action that collects statistics of an Iceberg table and writes to Puffin files. */
+public interface AnalyzeTable extends Action<AnalyzeTable, AnalyzeTable.Result> {
+ /**
+ * The set of columns to be analyzed
+ *
+ * @param columns a set of column names to be analyzed
+ * @return this for method chaining
+ */
+ AnalyzeTable columns(Set<String> columns);
+
+ /**
+ * A set of statistics to be collected on the given columns of the given table
+ *
+ * @param statsToBeCollected set of statistics to be collected
+ * @return this for method chaining
+ */
+ AnalyzeTable stats(Set<String> statsToBeCollected);
+
Review Comment:
The `snapshot(String snapshotId)` method has been added.
For branch/tag -- is it an existing pattern to support this first-class in
APIs, or to require the caller to convert the information they have (a branch/tag)
into a snapshot ID?
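For reference, a caller could resolve a branch/tag to a snapshot ID themselves, e.g. (a sketch; the ref name is hypothetical):
```
SnapshotRef ref = table.refs().get("my-branch");
long snapshotId = ref.snapshotId();
```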
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.datasketches.theta.CompactSketch;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+public class NDVSketchGenerator {
+
+ private NDVSketchGenerator() {}
+
+ public static StatisticsFile generateNDV(
+ SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed)
+ throws IOException {
+ Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator =
+ NDVSketchGenerator.computeNDVSketches(spark, table.name(), snapshotId, columnsToBeAnalyzed);
+ Map<String, ThetaSketchJavaSerializable> sketchMap = Maps.newHashMap();
+
+ tuple2Iterator.forEachRemaining(tuple -> sketchMap.put(tuple._1, tuple._2));
+ return writeToPuffin(table, Lists.newArrayList(columnsToBeAnalyzed), sketchMap);
+ }
+
+ private static StatisticsFile writeToPuffin(
+ Table table, List<String> columns, Map<String, ThetaSketchJavaSerializable> sketchMap)
+ throws IOException {
+ int columnSizes = columns.size();
+ TableOperations operations = ((HasTableOperations) table).operations();
+ FileIO fileIO = ((HasTableOperations) table).operations().io();
+ String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID()));
+ OutputFile outputFile = fileIO.newOutputFile(path);
+ try (PuffinWriter writer =
+ Puffin.write(outputFile).createdBy("Spark
DistinctCountProcedure").build()) {
+ for (int i = 0; i < columnSizes; i++) {
+ writer.add(
+ new Blob(
+ StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
+ ImmutableList.of(table.schema().findField(columns.get(i)).fieldId()),
+ table.currentSnapshot().snapshotId(),
+ table.currentSnapshot().sequenceNumber(),
+ ByteBuffer.wrap(sketchMap.get(columns.get(i)).getSketch().toByteArray()),
+ null,
Review Comment:
> since the sketch will be a single long per column

The sketch should be more than that -- a small number of KB, IIRC.
Trino uses ZSTD for the blobs, and no compression for the footer.
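A sketch of enabling that with the Puffin writer API (assuming `compressBlobs` from core is exposed here):
```
// ZSTD-compress blob bodies; the footer stays uncompressed unless compressFooter() is called
PuffinWriter writer =
    Puffin.write(outputFile)
        .createdBy("Iceberg Analyze action")
        .compressBlobs(PuffinCompressionCodec.ZSTD)
        .build();
```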
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.datasketches.theta.CompactSketch;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.Tuple2;
+
+public class NDVSketchGenerator {
+
+ private NDVSketchGenerator() {}
+
+ static StatisticsFile generateNDV(
+ SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed)
+ throws IOException {
+ List<String> columnList;
+ if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.length == 0) {
+ columnList =
+ table.schema().columns().stream()
+ .map(Types.NestedField::name)
+ .collect(Collectors.toList());
+ } else {
+ columnList = Lists.newArrayList(columnsToBeAnalyzed);
+ }
+ Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator =
+ computeNDVSketches(spark, table.name(), snapshotId, columnList);
+ Map<String, ThetaSketchJavaSerializable> sketchMap = Maps.newHashMap();
+
+ tuple2Iterator.forEachRemaining(tuple -> sketchMap.put(tuple._1, tuple._2));
+ return writeToPuffin(table, columnList, sketchMap);
+ }
+
+ private static StatisticsFile writeToPuffin(
+ Table table, List<String> columns, Map<String, ThetaSketchJavaSerializable> sketchMap)
+ throws IOException {
+ TableOperations operations = ((HasTableOperations) table).operations();
+ FileIO fileIO = operations.io();
+ String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID()));
+ OutputFile outputFile = fileIO.newOutputFile(path);
+ try (PuffinWriter writer =
+ Puffin.write(outputFile).createdBy("Iceberg Analyze action").build()) {
+ for (String columnName : columns) {
+ writer.add(
+ new Blob(
+ StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
+ ImmutableList.of(table.schema().findField(columnName).fieldId()),
+ table.currentSnapshot().snapshotId(),
+ table.currentSnapshot().sequenceNumber(),
+ ByteBuffer.wrap(sketchMap.get(columnName).getSketch().toByteArray()),
Review Comment:
I think there should be a `compact()` call (perhaps not here, but inside
`computeNDVSketches`).
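I.e., something like this at write time (a sketch; `getCompactSketch()` already exists on the wrapper):
```
// serialize the compacted form instead of the raw update sketch
ByteBuffer.wrap(sketchMap.get(columnName).getCompactSketch().toByteArray())
```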
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.datasketches.theta.CompactSketch;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.Tuple2;
+
+public class NDVSketchGenerator {
+
+ private NDVSketchGenerator() {}
+
+ static StatisticsFile generateNDV(
+ SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed)
+ throws IOException {
+ List<String> columnList;
+ if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.length == 0) {
Review Comment:
If this is called as varargs, we don't need to support null.
If this is not called as varargs, change `columnsToBeAnalyzed` to be a Set or List.
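E.g., a sketch of the non-varargs variant (the helper name is hypothetical):
```
static List<String> resolveColumns(Table table, Set<String> columnsToBeAnalyzed) {
  // an empty set means "all columns"; null no longer needs handling
  if (columnsToBeAnalyzed.isEmpty()) {
    return table.schema().columns().stream()
        .map(Types.NestedField::name)
        .collect(Collectors.toList());
  }
  return Lists.newArrayList(columnsToBeAnalyzed);
}
```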
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.datasketches.theta.CompactSketch;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.Tuple2;
+
+public class NDVSketchGenerator {
+
+ private NDVSketchGenerator() {}
+
+ static StatisticsFile generateNDV(
+ SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed)
+ throws IOException {
+ List<String> columnList;
+ if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.length == 0) {
+ columnList =
+ table.schema().columns().stream()
+ .map(Types.NestedField::name)
+ .collect(Collectors.toList());
+ } else {
+ columnList = Lists.newArrayList(columnsToBeAnalyzed);
+ }
+ Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator =
+ computeNDVSketches(spark, table.name(), snapshotId, columnList);
+ Map<String, ThetaSketchJavaSerializable> sketchMap = Maps.newHashMap();
+
+ tuple2Iterator.forEachRemaining(tuple -> sketchMap.put(tuple._1, tuple._2));
+ return writeToPuffin(table, columnList, sketchMap);
+ }
+
+ private static StatisticsFile writeToPuffin(
+ Table table, List<String> columns, Map<String, ThetaSketchJavaSerializable> sketchMap)
+ throws IOException {
+ TableOperations operations = ((HasTableOperations) table).operations();
+ FileIO fileIO = operations.io();
+ String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID()));
+ OutputFile outputFile = fileIO.newOutputFile(path);
+ try (PuffinWriter writer =
+ Puffin.write(outputFile).createdBy("Iceberg Analyze action").build()) {
+ for (String columnName : columns) {
+ writer.add(
+ new Blob(
+ StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
+ ImmutableList.of(table.schema().findField(columnName).fieldId()),
+ table.currentSnapshot().snapshotId(),
+ table.currentSnapshot().sequenceNumber(),
+ ByteBuffer.wrap(sketchMap.get(columnName).getSketch().toByteArray()),
Review Comment:
BTW it would be good to have a cross-engine compatibility test to ensure the
value we write here can indeed be used correctly by other engines. For Trino,
you can use https://java.testcontainers.org/modules/databases/trino/.
Trino already has such tests, but those don't cover Iceberg Spark features
that are still **being** implemented.
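A rough sketch of such a test (catalog/table names are hypothetical, and the container would need an Iceberg catalog configured against the same warehouse):
```
try (TrinoContainer trino = new TrinoContainer("trinodb/trino")) {
  trino.start();
  try (Connection connection = trino.createConnection("");
      Statement statement = connection.createStatement();
      // Trino surfaces the theta sketch NDV as distinct_values_count
      ResultSet rs = statement.executeQuery("SHOW STATS FOR iceberg.db.tbl")) {
    while (rs.next()) {
      System.out.printf(
          "%s -> %s%n", rs.getString("column_name"), rs.getString("distinct_values_count"));
    }
  }
}
```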
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/AnalyzeTableSparkAction.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.AnalyzeTable;
+import org.apache.iceberg.actions.ImmutableAnalyzeTable;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistic of the given columns and stores it as Puffin files. */
+public class AnalyzeTableSparkAction extends BaseSparkAction<AnalyzeTableSparkAction>
+ implements AnalyzeTable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class);
+
+ private final Table table;
+ private Set<String> columns = ImmutableSet.of();
+ private Set<String> types = StandardBlobTypes.blobTypes();
+ private Long snapshotId;
+
+ AnalyzeTableSparkAction(SparkSession spark, Table table) {
+ super(spark);
+ this.table = table;
+ }
+
+ @Override
+ protected AnalyzeTableSparkAction self() {
+ return this;
+ }
+
+ @Override
+ public Result execute() {
+ if (snapshotId == null) {
+ snapshotId = table.currentSnapshot().snapshotId();
+ }
+ String desc = String.format("Analyzing table %s for snapshot id %s",
table.name(), snapshotId);
+ JobGroupInfo info = newJobGroupInfo("ANALYZE-TABLE", desc);
+ return withJobGroupInfo(info, this::doExecute);
+ }
+
+ private Result doExecute() {
+ LOG.info("Starting the analysis of {} for snapshot {}", table.name(),
snapshotId);
+ List<AnalysisResult> analysisResults =
+ types.stream()
+ .map(
+ statsName -> {
+ switch (statsName) {
+ case StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1:
+ return generateNDVAndCommit();
Review Comment:
In the future there will be multiple blob/stat types, so the commit should be
separate from blob generation.
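I.e., roughly (a sketch; `generateBlob` is a hypothetical helper):
```
// generate all statistics files first, without committing...
List<StatisticsFile> statsFiles = Lists.newArrayList();
for (String type : types) {
  statsFiles.add(generateBlob(type));
}
// ...then commit once at the end
UpdateStatistics update = table.updateStatistics();
statsFiles.forEach(file -> update.setStatistics(file.snapshotId(), file));
update.commit();
```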
##########
gradle/libs.versions.toml:
##########
@@ -33,7 +33,7 @@ azuresdk-bom = "1.2.23"
awssdk-s3accessgrants = "2.0.0"
caffeine = "2.9.3"
calcite = "1.10.0"
-datasketches = "6.0.0"
+datasketches="6.0.0"
Review Comment:
Please revert.
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/AnalyzeTableSparkAction.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.AnalyzeTable;
+import org.apache.iceberg.actions.ImmutableAnalyzeTable;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistic of the given columns and stores it as Puffin files. */
+public class AnalyzeTableSparkAction extends BaseSparkAction<AnalyzeTableSparkAction>
+ implements AnalyzeTable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class);
+
+ private final Table table;
+ private Set<String> columns = ImmutableSet.of();
Review Comment:
Arrays are mutable, so if we decide to switch to arrays, please make sure to
make a defensive copy whenever passing one to another class.
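E.g. (a sketch):
```
public AnalyzeTableSparkAction columns(String... columnNames) {
  // defensive copy: don't retain the caller's mutable array
  this.columns = ImmutableSet.copyOf(columnNames);
  return this;
}
```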
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.datasketches.theta.CompactSketch;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+public class NDVSketchGenerator {
+
+ private NDVSketchGenerator() {}
+
+ public static StatisticsFile generateNDV(
+ SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed)
+ throws IOException {
+ Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator =
+ NDVSketchGenerator.computeNDVSketches(spark, table.name(), snapshotId, columnsToBeAnalyzed);
+ Map<String, ThetaSketchJavaSerializable> sketchMap = Maps.newHashMap();
+
+ tuple2Iterator.forEachRemaining(tuple -> sketchMap.put(tuple._1, tuple._2));
+ return writeToPuffin(table, Lists.newArrayList(columnsToBeAnalyzed), sketchMap);
+ }
+
+ private static StatisticsFile writeToPuffin(
+ Table table, List<String> columns, Map<String, ThetaSketchJavaSerializable> sketchMap)
+ throws IOException {
+ int columnSizes = columns.size();
+ TableOperations operations = ((HasTableOperations) table).operations();
+ FileIO fileIO = ((HasTableOperations) table).operations().io();
+ String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID()));
+ OutputFile outputFile = fileIO.newOutputFile(path);
+ try (PuffinWriter writer =
+ Puffin.write(outputFile).createdBy("Spark
DistinctCountProcedure").build()) {
+ for (int i = 0; i < columnSizes; i++) {
+ writer.add(
+ new Blob(
+ StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
+ ImmutableList.of(table.schema().findField(columns.get(i)).fieldId()),
+ table.currentSnapshot().snapshotId(),
+ table.currentSnapshot().sequenceNumber(),
+ ByteBuffer.wrap(sketchMap.get(columns.get(i)).getSketch().toByteArray()),
+ null,
+ ImmutableMap.of()));
+ }
+ writer.finish();
Review Comment:
I think you need to call finish() to get the final fileSize(), etc.
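I.e. (a sketch):
```
writer.finish(); // flushes remaining blobs and writes the footer
long fileSize = writer.fileSize();     // only valid after finish()
long footerSize = writer.footerSize(); // likewise
```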
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.datasketches.theta.CompactSketch;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.Tuple2;
+
+public class NDVSketchGenerator {
+
+ private NDVSketchGenerator() {}
+
+ static StatisticsFile generateNDV(
+ SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed)
+ throws IOException {
+ List<String> columnList;
+ if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.length == 0) {
+ columnList =
+ table.schema().columns().stream()
+ .map(Types.NestedField::name)
+ .collect(Collectors.toList());
+ } else {
+ columnList = Lists.newArrayList(columnsToBeAnalyzed);
+ }
+ Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator =
+ computeNDVSketches(spark, table.name(), snapshotId, columnList);
+ Map<String, ThetaSketchJavaSerializable> sketchMap = Maps.newHashMap();
+
+ tuple2Iterator.forEachRemaining(tuple -> sketchMap.put(tuple._1, tuple._2));
+ return writeToPuffin(table, columnList, sketchMap);
+ }
+
+ private static StatisticsFile writeToPuffin(
+ Table table, List<String> columns, Map<String, ThetaSketchJavaSerializable> sketchMap)
+ throws IOException {
+ TableOperations operations = ((HasTableOperations) table).operations();
+ FileIO fileIO = operations.io();
+ String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID()));
+ OutputFile outputFile = fileIO.newOutputFile(path);
+ try (PuffinWriter writer =
+ Puffin.write(outputFile).createdBy("Iceberg Analyze action").build()) {
+ for (String columnName : columns) {
+ writer.add(
+ new Blob(
+ StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
+ ImmutableList.of(table.schema().findField(columnName).fieldId()),
+ table.currentSnapshot().snapshotId(),
+ table.currentSnapshot().sequenceNumber(),
+ ByteBuffer.wrap(sketchMap.get(columnName).getSketch().toByteArray()),
+ null,
+ ImmutableMap.of()));
+ }
+ writer.finish();
+
+ return new GenericStatisticsFile(
+ table.currentSnapshot().snapshotId(),
+ path,
+ writer.fileSize(),
+ writer.footerSize(),
+ writer.writtenBlobsMetadata().stream()
+ .map(GenericBlobMetadata::from)
+ .collect(ImmutableList.toImmutableList()));
+ }
+ }
+
+ static Iterator<Tuple2<String, ThetaSketchJavaSerializable>> computeNDVSketches(
+ SparkSession spark, String tableName, long snapshotId, List<String> columns) {
+ Dataset<Row> data =
+ spark
+ .read()
+ .option("snapshot-id", snapshotId)
+ .table(tableName)
+ .select(columns.stream().map(functions::col).toArray(Column[]::new));
+
+ final JavaPairRDD<String, String> pairs =
+ data.javaRDD()
+ .flatMap(
+ row -> {
+ List<Tuple2<String, String>> columnsList =
+ Lists.newArrayListWithExpectedSize(columns.size());
+ for (int i = 0; i < row.size(); i++) {
+ columnsList.add(new Tuple2<>(columns.get(i), row.get(i).toString()));
Review Comment:
This shouldn't use toString; it should use `Conversions.toByteBuffer`. See
https://iceberg.apache.org/puffin-spec/#apache-datasketches-theta-v1-blob-type
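I.e., something like (a sketch; `ThetaSketchJavaSerializable` would need an `update(byte[])` overload, and `ByteBuffers` is Iceberg's util class):
```
// hash the spec-defined single-value serialization, not toString()
Type type = table.schema().findField(columns.get(i)).type();
ByteBuffer buffer = Conversions.toByteBuffer(type, row.get(i));
sketch.update(ByteBuffers.toByteArray(buffer));
```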
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/AnalyzeTableSparkAction.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.AnalyzeTable;
+import org.apache.iceberg.actions.ImmutableAnalyzeTable;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistic of the given columns and stores it as Puffin files. */
+public class AnalyzeTableSparkAction extends BaseSparkAction<AnalyzeTableSparkAction>
+ implements AnalyzeTable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class);
+
+ private final Table table;
+ private Set<String> columns = ImmutableSet.of();
+ private Set<String> types = StandardBlobTypes.blobTypes();
+ private Long snapshotId;
+
+ AnalyzeTableSparkAction(SparkSession spark, Table table) {
+ super(spark);
+ this.table = table;
+ }
+
+ @Override
+ protected AnalyzeTableSparkAction self() {
+ return this;
+ }
+
+ @Override
+ public Result execute() {
+ if (snapshotId == null) {
+ snapshotId = table.currentSnapshot().snapshotId();
+ }
+ String desc = String.format("Analyzing table %s for snapshot id %s",
table.name(), snapshotId);
+ JobGroupInfo info = newJobGroupInfo("ANALYZE-TABLE", desc);
+ return withJobGroupInfo(info, this::doExecute);
+ }
+
+ private Result doExecute() {
+ LOG.info("Starting the analysis of {} for snapshot {}", table.name(),
snapshotId);
+ List<AnalysisResult> analysisResults =
+ types.stream()
+ .map(
+ statsName -> {
+ switch (statsName) {
+ case StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1:
+ return generateNDVAndCommit();
+ default:
+ return ImmutableAnalyzeTable.AnalysisResult.builder()
+ .type(statsName)
+ .addAllErrors(Lists.newArrayList("Stats type not
supported"))
+ .build();
+ }
+ })
+ .collect(Collectors.toList());
+ return ImmutableAnalyzeTable.Result.builder().analysisResults(analysisResults).build();
+ }
+
+ private boolean analyzableTypes(Set<String> columnNames) {
+ return columnNames.stream()
+ .anyMatch(
+ columnName -> {
+ Types.NestedField field = table.schema().findField(columnName);
+ if (field == null) {
+ throw new ValidationException("No column with %s name in the
table", columnName);
+ }
+ Type.TypeID type = field.type().typeId();
+ return type == Type.TypeID.INTEGER
+ || type == Type.TypeID.LONG
+ || type == Type.TypeID.STRING
+ || type == Type.TypeID.DOUBLE;
Review Comment:
Why only these?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]