karuppayya commented on code in PR #10659: URL: https://github.com/apache/iceberg/pull/10659#discussion_r1681586861
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java: ########## @@ -90,4 +90,8 @@ private SparkSQLProperties() {} public static final String EXECUTOR_CACHE_LOCALITY_ENABLED = "spark.sql.iceberg.executor-cache.locality.enabled"; public static final boolean EXECUTOR_CACHE_LOCALITY_ENABLED_DEFAULT = false; + + // Controls whether to calculate column statistics and report them to Spark Review Comment: nit: should we say that this flag controls whether stats should be reported to Spark, since the computation is done out of band? ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java: ########## @@ -175,7 +184,37 @@ public Statistics estimateStatistics() { protected Statistics estimateStatistics(Snapshot snapshot) { // its a fresh table, no data if (snapshot == null) { - return new Stats(0L, 0L); + return new Stats(0L, 0L, Collections.emptyMap()); + } + + boolean cboEnabled = + Boolean.parseBoolean(spark.conf().get(SQLConf.CBO_ENABLED().key(), "false")); + Map<NamedReference, ColumnStatistics> colStatsMap = null; + if (readConf.enableColumnStats() && cboEnabled) { + colStatsMap = Maps.newHashMap(); + List<StatisticsFile> files = table.statisticsFiles(); + if (!files.isEmpty()) { + List<BlobMetadata> metadataList = (files.get(0)).blobMetadata(); + + for (BlobMetadata blobMetadata : metadataList) { + int id = blobMetadata.fields().get(0); Review Comment: Should we check if there is more than one field here (for example, ndv stats are collected for, say, field1 and field2) and not propagate the stats if so? 
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java: ########## @@ -175,7 +181,25 @@ public Statistics estimateStatistics() { protected Statistics estimateStatistics(Snapshot snapshot) { // its a fresh table, no data if (snapshot == null) { - return new Stats(0L, 0L); + return new Stats(0L, 0L, Maps.newHashMap()); + } + + Map<NamedReference, ColumnStatistics> map = Maps.newHashMap(); + + if (readConf.enableColumnStats()) { + List<StatisticsFile> files = table.statisticsFiles(); + if (!files.isEmpty()) { + List<BlobMetadata> metadataList = (files.get(0)).blobMetadata(); Review Comment: created https://github.com/apache/iceberg/issues/10693 ########## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java: ########## @@ -734,6 +795,19 @@ private Expression[] expressions(Expression... expressions) { return expressions; } + private void checkStatistics(SparkScan scan, long expectedRowCount, boolean expectedColumnStats) { Review Comment: When `expectedColumnStats` is false, can we assert the absence of stats? ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java: ########## @@ -175,7 +184,37 @@ public Statistics estimateStatistics() { protected Statistics estimateStatistics(Snapshot snapshot) { // its a fresh table, no data if (snapshot == null) { - return new Stats(0L, 0L); + return new Stats(0L, 0L, Collections.emptyMap()); + } + + boolean cboEnabled = + Boolean.parseBoolean(spark.conf().get(SQLConf.CBO_ENABLED().key(), "false")); + Map<NamedReference, ColumnStatistics> colStatsMap = null; + if (readConf.enableColumnStats() && cboEnabled) { + colStatsMap = Maps.newHashMap(); + List<StatisticsFile> files = table.statisticsFiles(); + if (!files.isEmpty()) { + List<BlobMetadata> metadataList = (files.get(0)).blobMetadata(); + + for (BlobMetadata blobMetadata : metadataList) { + int id = blobMetadata.fields().get(0); + String colName = table.schema().findColumnName(id); + 
NamedReference ref = FieldReference.column(colName); + + long ndv = 0; + String ndvStr = blobMetadata.properties().get("ndv"); + if (ndvStr != null && !ndvStr.isEmpty()) { + ndv = Long.parseLong(blobMetadata.properties().get("ndv")); + } else { + LOG.debug("ndv is not set in BlobMetadata for column {}", colName); + } + + // TODO: Fill min, max and null from the manifest file + ColumnStatistics colStats = new SparkColumnStatistics(ndv, null, null, 0L, 0L, 0L, null); Review Comment: When ndv is not set, we are sending in 0. Is that intended? ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java: ########## @@ -175,7 +184,37 @@ public Statistics estimateStatistics() { protected Statistics estimateStatistics(Snapshot snapshot) { // its a fresh table, no data if (snapshot == null) { - return new Stats(0L, 0L); + return new Stats(0L, 0L, Collections.emptyMap()); + } + + boolean cboEnabled = + Boolean.parseBoolean(spark.conf().get(SQLConf.CBO_ENABLED().key(), "false")); + Map<NamedReference, ColumnStatistics> colStatsMap = null; + if (readConf.enableColumnStats() && cboEnabled) { + colStatsMap = Maps.newHashMap(); + List<StatisticsFile> files = table.statisticsFiles(); + if (!files.isEmpty()) { + List<BlobMetadata> metadataList = (files.get(0)).blobMetadata(); + + for (BlobMetadata blobMetadata : metadataList) { + int id = blobMetadata.fields().get(0); + String colName = table.schema().findColumnName(id); + NamedReference ref = FieldReference.column(colName); + + long ndv = 0; + String ndvStr = blobMetadata.properties().get("ndv"); Review Comment: nit: "ndv" could be static final? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org