stevenzwu commented on code in PR #8803: URL: https://github.com/apache/iceberg/pull/8803#discussion_r1365732924
########## api/src/main/java/org/apache/iceberg/util/ContentFileUtil.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import java.util.Set; +import org.apache.iceberg.ContentFile; + +public class ContentFileUtil { Review Comment: this class should be in the core module (not api) ########## core/src/main/java/org/apache/iceberg/BaseFile.java: ########## @@ -504,6 +508,27 @@ private static Map<Integer, ByteBuffer> toReadableByteBufferMap(Map<Integer, Byt } } + private static <TypeT> Map<Integer, TypeT> filterColumnsStats( Review Comment: return type can be `SerializableMap` ########## core/src/main/java/org/apache/iceberg/GenericDeleteFile.java: ########## @@ -67,23 +69,30 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile { * Copy constructor. * * @param toCopy a generic data file to copy. - * @param fullCopy whether to copy all fields or to drop column-level stats + * @param fullCopy whether to copy all fields or to drop column-level stats. + * @param statsToKeep a set of column ids to keep stats. If empty or <code>null</code> then every + * column stat is kept. 
*/ - private GenericDeleteFile(GenericDeleteFile toCopy, boolean fullCopy) { - super(toCopy, fullCopy); + private GenericDeleteFile(GenericDeleteFile toCopy, boolean fullCopy, Set<Integer> statsToKeep) { Review Comment: nit: statsNeeded -> columnsToKeepStats for consistency ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java: ########## @@ -96,6 +96,9 @@ private FlinkReadOptions() {} public static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS_OPTION = ConfigOptions.key(PREFIX + INCLUDE_COLUMN_STATS).booleanType().defaultValue(false); + public static final String COLUMN_STATS_TO_KEEP = "column-stats-to-keep"; Review Comment: columns-to-keep-stats ########## core/src/main/java/org/apache/iceberg/BaseFile.java: ########## @@ -504,6 +508,27 @@ private static Map<Integer, ByteBuffer> toReadableByteBufferMap(Map<Integer, Byt } } + private static <TypeT> Map<Integer, TypeT> filterColumnsStats( + Map<Integer, TypeT> map, Set<Integer> columnIds) { + if (columnIds == null || columnIds.isEmpty()) { + return SerializableMap.copyOf(map); + } + + if (map == null) { Review Comment: nit: I know `SerializableMap` handles null properly. it doesn't really matter in practice. but I still find it more intuitive to check the map null first. ########## core/src/main/java/org/apache/iceberg/BaseFile.java: ########## @@ -504,6 +508,27 @@ private static Map<Integer, ByteBuffer> toReadableByteBufferMap(Map<Integer, Byt } } + private static <TypeT> Map<Integer, TypeT> filterColumnsStats( + Map<Integer, TypeT> map, Set<Integer> columnIds) { + if (columnIds == null || columnIds.isEmpty()) { + return SerializableMap.copyOf(map); + } + + if (map == null) { + return null; + } + + Map<Integer, TypeT> filtered = Maps.newHashMapWithExpectedSize(columnIds.size()); + for (Integer columnId : columnIds) { + TypeT value = map.get(columnId); + if (value != null) { Review Comment: is it necessary to check null value? 
########## core/src/main/java/org/apache/iceberg/BaseFile.java: ########## @@ -174,8 +176,10 @@ public PartitionData copy() { * * @param toCopy a generic data file to copy. * @param fullCopy whether to copy all fields or to drop column-level stats + * @param statsToKeep a set of column ids to keep stats. If empty or <code>null</code> then every + * column stat is kept. */ - BaseFile(BaseFile<F> toCopy, boolean fullCopy) { + BaseFile(BaseFile<F> toCopy, boolean fullCopy, Set<Integer> statsToKeep) { Review Comment: nit: statsToKeep -> columnsToKeepStats for consistency ########## api/src/main/java/org/apache/iceberg/util/ContentFileUtil.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import java.util.Set; +import org.apache.iceberg.ContentFile; + +public class ContentFileUtil { + private ContentFileUtil() {} + + /** + * Copies the {@link ContentFile} with the specific stat settings. + * + * @param file a generic data file to copy. + * @param withStats whether to keep any stats + * @param statsToKeep a set of column ids to keep stats. If empty or <code>null</code> then every + * column stat is kept. 
+ * @return The copied file + */ + public static <F extends ContentFile<K>, K> K copy( + F file, boolean withStats, Set<Integer> statsToKeep) { Review Comment: nit: statsToKeep -> columnsToKeepStats for consistency ########## api/src/test/java/org/apache/iceberg/TestHelpers.java: ########## @@ -662,6 +663,11 @@ public DataFile copyWithoutStats() { return this; } + @Override + public DataFile copyWithStats(Set<Integer> statsToKeep) { Review Comment: nit: statsToKeep -> columnsToKeepStats for consistency ########## core/src/main/java/org/apache/iceberg/GenericDataFile.java: ########## @@ -66,23 +68,30 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile { * Copy constructor. * * @param toCopy a generic data file to copy. - * @param fullCopy whether to copy all fields or to drop column-level stats + * @param fullCopy whether to copy all fields or to drop column-level stats. + * @param statsToKeep a set of column ids to keep stats. If empty or <code>null</code> then every + * column stat is kept. */ - private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) { - super(toCopy, fullCopy); + private GenericDataFile(GenericDataFile toCopy, boolean fullCopy, Set<Integer> statsToKeep) { Review Comment: nit: statsNeeded -> columnsToKeepStats for consistency ########## core/src/main/java/org/apache/iceberg/ManifestGroup.java: ########## @@ -154,6 +156,11 @@ ManifestGroup caseSensitive(boolean newCaseSensitive) { return this; } + ManifestGroup columnStatsToKeep(Set<Integer> newColumnStatsToKeep) { + this.columnStatsToKeep = newColumnStatsToKeep; Review Comment: the convention is to do a defensive copy here. 
see the `select` method above from Line 148 ########## core/src/main/java/org/apache/iceberg/BaseFile.java: ########## @@ -504,6 +508,27 @@ private static Map<Integer, ByteBuffer> toReadableByteBufferMap(Map<Integer, Byt } } + private static <TypeT> Map<Integer, TypeT> filterColumnsStats( + Map<Integer, TypeT> map, Set<Integer> columnIds) { + if (columnIds == null || columnIds.isEmpty()) { + return SerializableMap.copyOf(map); + } + + if (map == null) { + return null; + } + + Map<Integer, TypeT> filtered = Maps.newHashMapWithExpectedSize(columnIds.size()); Review Comment: nit: would it be a tiny bit simpler to use the Java stream API for filtering? ########## core/src/main/java/org/apache/iceberg/BaseScan.java: ########## @@ -165,6 +169,12 @@ public ThisT includeColumnStats() { return newRefinedScan(table, schema, context.shouldReturnColumnStats(true)); } + @Override + public ThisT includeColumnStats(Set<Integer> statsNeeded) { Review Comment: nit: statsNeeded -> columnsToIncludeStats for consistency ########## core/src/main/java/org/apache/iceberg/V2Metadata.java: ########## @@ -560,6 +561,11 @@ public F copy() { throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper"); } + @Override + public F copyWithStats(Set<Integer> statsToKeep) { Review Comment: nit: statsToKeep -> columnsToKeepStats for consistency ########## core/src/main/java/org/apache/iceberg/TableScanContext.java: ########## @@ -125,6 +132,16 @@ TableScanContext shouldReturnColumnStats(boolean returnColumnStats) { .build(); } + TableScanContext columnsToIncludeStats(Set<Integer> columnStatsToInclude) { + Preconditions.checkState( + returnColumnStats(), + "Cannot select column stats to include when column stats are not returned"); Review Comment: nit: wondering if this small rewording can be a little clearer ``` Cannot select columns to keep stats when column stats are not returned ``` ########## core/src/main/java/org/apache/iceberg/ManifestGroup.java: ########## @@ -381,19 +390,22 @@ static
class TaskContext { private final DeleteFileIndex deletes; private final ResidualEvaluator residuals; private final boolean dropStats; + private final Set<Integer> statsToKeep; private final ScanMetrics scanMetrics; TaskContext( PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats, + Set<Integer> statsToKeep, Review Comment: nit: statsNeeded -> columnsToKeepStats for consistency ########## docs/flink-configuration.md: ########## @@ -130,6 +130,7 @@ env.getConfig() | streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. | | monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. | | include-column-stats | connector.iceberg.include-column-stats | N/A | false | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. | +| column-stats-to-keep | connector.iceberg.column-stats-to-keep | N/A | empty | Create a new scan from this that loads the column stats with each data file for the specified column ids. Column stats include: value count, null value count, lower bounds, and upper bounds. 
| Review Comment: columns-to-keep-stats ########## core/src/main/java/org/apache/iceberg/V1Metadata.java: ########## @@ -485,6 +486,11 @@ public DataFile copy() { return wrapped.copy(); } + @Override + public DataFile copyWithStats(Set<Integer> statsToKeep) { Review Comment: nit: statsToKeep -> columnsToKeepStats for consistency ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java: ########## @@ -190,4 +203,11 @@ public int maxAllowedPlanningFailures() { .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue()) .parse(); } + + public static Set<Integer> split(String text) { Review Comment: this can be private ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java: ########## @@ -152,6 +155,16 @@ public boolean includeColumnStats() { .parse(); } + public Set<Integer> columnStatsToKeep() { Review Comment: columnsToKeepStats ########## core/src/main/java/org/apache/iceberg/ManifestGroup.java: ########## @@ -417,6 +429,10 @@ boolean shouldKeepStats() { return !dropStats; } + Set<Integer> statsToKeep() { Review Comment: nit: statsNeeded -> columnsToKeepStats for consistency ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java: ########## @@ -464,6 +476,11 @@ public Builder includeColumnStats(boolean newIncludeColumnStats) { return this; } + public Builder columnStatsToKeep(Set<Integer> newColumnStatsToKeep) { Review Comment: nit: columnsToKeepStats we also need to expose this method in `IcebergSource` ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java: ########## @@ -152,6 +155,16 @@ public boolean includeColumnStats() { .parse(); } + public Set<Integer> columnStatsToKeep() { + return split( Review Comment: I am also wondering if it is intuitive to ask users to configure this via field ids. 
Users tend to interact with tables via field names (not ids) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org