karuppayya commented on code in PR #10659:
URL: https://github.com/apache/iceberg/pull/10659#discussion_r1681586861
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java:
##########
@@ -90,4 +90,8 @@ private SparkSQLProperties() {}
public static final String EXECUTOR_CACHE_LOCALITY_ENABLED =
"spark.sql.iceberg.executor-cache.locality.enabled";
public static final boolean EXECUTOR_CACHE_LOCALITY_ENABLED_DEFAULT = false;
+
+ // Controls whether to calculate column statistics and report them to Spark
Review Comment:
nit: should we say that this flag controls whether stats should be reported
to Spark, since the computation is done out of band?
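
A sketch of the wording the reviewer suggests; the property key and default shown here are illustrative assumptions, not the PR's final names:

```java
// Sketch only: key name and default are hypothetical.
// The comment says "report" rather than "calculate", since the stats are
// computed out of band and this flag only controls whether they reach Spark.

// Controls whether to report locally available column statistics to Spark
public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats";
public static final boolean REPORT_COLUMN_STATS_DEFAULT = true;
```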
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java:
##########
@@ -175,7 +184,37 @@ public Statistics estimateStatistics() {
protected Statistics estimateStatistics(Snapshot snapshot) {
// its a fresh table, no data
if (snapshot == null) {
- return new Stats(0L, 0L);
+ return new Stats(0L, 0L, Collections.emptyMap());
+ }
+
+ boolean cboEnabled =
+ Boolean.parseBoolean(spark.conf().get(SQLConf.CBO_ENABLED().key(), "false"));
+ Map<NamedReference, ColumnStatistics> colStatsMap = null;
+ if (readConf.enableColumnStats() && cboEnabled) {
+ colStatsMap = Maps.newHashMap();
+ List<StatisticsFile> files = table.statisticsFiles();
+ if (!files.isEmpty()) {
+ List<BlobMetadata> metadataList = (files.get(0)).blobMetadata();
+
+ for (BlobMetadata blobMetadata : metadataList) {
+ int id = blobMetadata.fields().get(0);
Review Comment:
Should we check whether there is more than one field here (for example, an ndv
stat collected jointly over field1 and field2) and not propagate the stats if so?
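
The suggested guard, sketched as a self-contained helper (the method name is hypothetical): only propagate a blob's ndv when it describes exactly one field, since a sketch computed jointly over field1 and field2 says nothing about either column alone.

```java
import java.util.List;

public class NdvBlobGuard {
  // Hypothetical helper: a blob whose `fields` list has more than one entry
  // (e.g. an ndv sketch over field1 + field2) is a multi-column statistic,
  // so its ndv must not be reported as any single column's distinct count.
  static boolean isSingleColumnBlob(List<Integer> blobFields) {
    return blobFields.size() == 1;
  }

  public static void main(String[] args) {
    System.out.println(isSingleColumnBlob(List.of(1)));    // true
    System.out.println(isSingleColumnBlob(List.of(1, 2))); // false
  }
}
```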
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java:
##########
@@ -175,7 +181,25 @@ public Statistics estimateStatistics() {
protected Statistics estimateStatistics(Snapshot snapshot) {
// its a fresh table, no data
if (snapshot == null) {
- return new Stats(0L, 0L);
+ return new Stats(0L, 0L, Maps.newHashMap());
+ }
+
+ Map<NamedReference, ColumnStatistics> map = Maps.newHashMap();
+
+ if (readConf.enableColumnStats()) {
+ List<StatisticsFile> files = table.statisticsFiles();
+ if (!files.isEmpty()) {
+ List<BlobMetadata> metadataList = (files.get(0)).blobMetadata();
Review Comment:
created https://github.com/apache/iceberg/issues/10693
##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java:
##########
@@ -734,6 +795,19 @@ private Expression[] expressions(Expression... expressions) {
return expressions;
}
+ private void checkStatistics(SparkScan scan, long expectedRowCount, boolean expectedColumnStats) {
Review Comment:
when `expectedColumnStats` is false, can we assert the absence of stats?
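
The suggestion can be sketched with a self-contained stand-in (the map type here plays the role of Spark's `Map<NamedReference, ColumnStatistics>` from `estimateStatistics().columnStats()`): when column stats are not expected, assert the map is empty rather than skipping the check.

```java
import java.util.Map;

public class CheckStatisticsSketch {
  // Stand-in for the test helper: `colStats` plays the role of
  // scan.estimateStatistics().columnStats().
  static void checkColumnStats(Map<String, Long> colStats, boolean expectedColumnStats) {
    if (expectedColumnStats) {
      if (colStats.isEmpty()) {
        throw new AssertionError("expected column stats to be reported");
      }
    } else if (!colStats.isEmpty()) {
      // The reviewer's point: assert absence explicitly, so a regression
      // that starts leaking stats is caught.
      throw new AssertionError("expected no column stats, found: " + colStats);
    }
  }

  public static void main(String[] args) {
    checkColumnStats(Map.of(), false);        // passes: absence asserted
    checkColumnStats(Map.of("id", 4L), true); // passes: presence asserted
    System.out.println("ok");
  }
}
```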
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java:
##########
@@ -175,7 +184,37 @@ public Statistics estimateStatistics() {
protected Statistics estimateStatistics(Snapshot snapshot) {
// its a fresh table, no data
if (snapshot == null) {
- return new Stats(0L, 0L);
+ return new Stats(0L, 0L, Collections.emptyMap());
+ }
+
+ boolean cboEnabled =
+ Boolean.parseBoolean(spark.conf().get(SQLConf.CBO_ENABLED().key(), "false"));
+ Map<NamedReference, ColumnStatistics> colStatsMap = null;
+ if (readConf.enableColumnStats() && cboEnabled) {
+ colStatsMap = Maps.newHashMap();
+ List<StatisticsFile> files = table.statisticsFiles();
+ if (!files.isEmpty()) {
+ List<BlobMetadata> metadataList = (files.get(0)).blobMetadata();
+
+ for (BlobMetadata blobMetadata : metadataList) {
+ int id = blobMetadata.fields().get(0);
+ String colName = table.schema().findColumnName(id);
+ NamedReference ref = FieldReference.column(colName);
+
+ long ndv = 0;
+ String ndvStr = blobMetadata.properties().get("ndv");
+ if (ndvStr != null && !ndvStr.isEmpty()) {
+ ndv = Long.parseLong(blobMetadata.properties().get("ndv"));
+ } else {
+ LOG.debug("ndv is not set in BlobMetadata for column {}", colName);
+ }
+
+ // TODO: Fill min, max and null from the manifest file
+ ColumnStatistics colStats = new SparkColumnStatistics(ndv, null, null, 0L, 0L, 0L, null);
Review Comment:
When ndv is not set, we are passing in 0. Is that intended?
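
If defaulting to 0 is not intended, one alternative can be sketched as follows (names are hypothetical): return null when ndv is absent so the caller can omit the column entirely, letting Spark's CBO see "unknown" rather than a hard estimate of zero distinct values.

```java
import java.util.HashMap;
import java.util.Map;

public class NdvParsing {
  // Hypothetical helper: returns null when ndv is absent, so the caller can
  // skip the column rather than report a misleading distinct count of 0.
  static Long parseNdv(Map<String, String> blobProperties) {
    String ndvStr = blobProperties.get("ndv");
    if (ndvStr == null || ndvStr.isEmpty()) {
      return null;
    }
    return Long.parseLong(ndvStr);
  }

  public static void main(String[] args) {
    Map<String, Long> colStats = new HashMap<>();
    Long ndv = parseNdv(Map.of("ndv", "42"));
    if (ndv != null) {
      colStats.put("col1", ndv); // only propagate when ndv is actually set
    }
    System.out.println(colStats);           // {col1=42}
    System.out.println(parseNdv(Map.of())); // null
  }
}
```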
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java:
##########
@@ -175,7 +184,37 @@ public Statistics estimateStatistics() {
protected Statistics estimateStatistics(Snapshot snapshot) {
// its a fresh table, no data
if (snapshot == null) {
- return new Stats(0L, 0L);
+ return new Stats(0L, 0L, Collections.emptyMap());
+ }
+
+ boolean cboEnabled =
+ Boolean.parseBoolean(spark.conf().get(SQLConf.CBO_ENABLED().key(), "false"));
+ Map<NamedReference, ColumnStatistics> colStatsMap = null;
+ if (readConf.enableColumnStats() && cboEnabled) {
+ colStatsMap = Maps.newHashMap();
+ List<StatisticsFile> files = table.statisticsFiles();
+ if (!files.isEmpty()) {
+ List<BlobMetadata> metadataList = (files.get(0)).blobMetadata();
+
+ for (BlobMetadata blobMetadata : metadataList) {
+ int id = blobMetadata.fields().get(0);
+ String colName = table.schema().findColumnName(id);
+ NamedReference ref = FieldReference.column(colName);
+
+ long ndv = 0;
+ String ndvStr = blobMetadata.properties().get("ndv");
Review Comment:
nit: "ndv" could be a static final constant?
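
The nit, sketched (the constant name is an assumption):

```java
public class BlobKeys {
  // Hoisting the property key avoids repeating the string literal at both
  // the lookup site and the parse site.
  static final String NDV_KEY = "ndv";

  public static void main(String[] args) {
    System.out.println(NDV_KEY); // ndv
  }
}
```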
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]