huaxingao commented on code in PR #10659:
URL: https://github.com/apache/iceberg/pull/10659#discussion_r1693697463


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java:
##########
@@ -175,7 +185,46 @@ public Statistics estimateStatistics() {
   protected Statistics estimateStatistics(Snapshot snapshot) {
     // its a fresh table, no data
     if (snapshot == null) {
-      return new Stats(0L, 0L);
+      return new Stats(0L, 0L, Collections.emptyMap());
+    }
+
+    boolean cboEnabled =
+        Boolean.parseBoolean(spark.conf().get(SQLConf.CBO_ENABLED().key(), 
"false"));
+    Map<NamedReference, ColumnStatistics> colStatsMap = null;
+    if (readConf.reportColumnStats() && cboEnabled) {
+      colStatsMap = Maps.newHashMap();
+      List<StatisticsFile> files = table.statisticsFiles();
+      if (!files.isEmpty()) {
+        List<BlobMetadata> metadataList = (files.get(0)).blobMetadata();
+
+        for (BlobMetadata blobMetadata : metadataList) {
+          int id = blobMetadata.fields().get(0);
+          String colName = table.schema().findColumnName(id);
+          NamedReference ref = FieldReference.column(colName);
+
+          Long ndv = null;
+          if (blobMetadata
+              .type()
+              
.equals(org.apache.iceberg.puffin.StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1))
 {
+            String ndvStr = blobMetadata.properties().get(NDV_KEY);
+            if (ndvStr != null && !ndvStr.isEmpty()) {
+              ndv = Long.parseLong(ndvStr);
+            } else {
+              LOG.debug("ndv is not set in BlobMetadata for column {}", 
colName);
+            }
+          } else {
+            LOG.debug("DataSketch blob is not available for column {}", 
colName);
+          }
+
+          // TODO: Fill min, max and null from the manifest file

Review Comment:
   Created [issue](https://github.com/apache/iceberg/issues/10791)



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java:
##########
@@ -130,6 +142,98 @@ public void testEstimatedRowCount() throws 
NoSuchTableException {
     assertThat(stats.numRows().getAsLong()).isEqualTo(10000L);
   }
 
+  @TestTemplate
+  public void testColStats() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "a"),
+            new SimpleRecord(4, "b"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    SparkScanBuilder scanBuilder =
+        new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+    SparkScan scan = (SparkScan) scanBuilder.build();
+
+    Map<String, String> reportColStatsDisabled =
+        ImmutableMap.of(
+            SQLConf.CBO_ENABLED().key(), "true", 
SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+    Map<String, String> reportColStatsEnabled =
+        ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+    // Test table does not have col stats
+    checkColStatisticsNotReported(scan, 4L);
+    withSQLConf(reportColStatsDisabled, () -> 
checkColStatisticsNotReported(scan, 4L));
+    // The expected col NDVs are nulls
+    withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 
4L, new HashMap<>()));
+
+    GenericStatisticsFile statisticsFile =
+        new GenericStatisticsFile(
+            snapshotId,
+            "/test/statistics/file.puffin",
+            100,
+            42,
+            ImmutableList.of(
+                new GenericBlobMetadata(
+                    APACHE_DATASKETCHES_THETA_V1,
+                    snapshotId,
+                    1,
+                    ImmutableList.of(1),
+                    ImmutableMap.of("ndv", "4"))));
+
+    table.updateStatistics().setStatistics(snapshotId, 
statisticsFile).commit();
+
+    // Test where 1 column has NDV and the other does not have NDV
+    checkColStatisticsNotReported(scan, 4L);
+    withSQLConf(reportColStatsDisabled, () -> 
checkColStatisticsNotReported(scan, 4L));
+
+    HashMap<String, Long> expectedOneNDV = new HashMap<>();
+    expectedOneNDV.put("id", 4L);
+    withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 
4L, expectedOneNDV));
+
+    statisticsFile =

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to