szehon-ho commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147018350
##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
        partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
  }
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // parallelize the manifest read and
Review Comment:
Comment is incomplete; let's just remove it, as I don't see that much value.
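The loop in `partitions()` above groups each data file under its normalized partition key and updates per-partition counters. The grouping pattern can be sketched generically; `PartitionStats` and the string keys below are stand-ins for illustration, not Iceberg's actual `Partition` class:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in for PartitionsTable.Partition: per-key file and record counts.
class PartitionStats {
    long dataFileCount = 0;
    long dataRecordCount = 0;

    void update(long recordCount) {
        dataFileCount += 1;
        dataRecordCount += recordCount;
    }
}

public class PartitionAggregation {
    // Group record counts (one entry per "data file") by partition key,
    // mirroring partitions.get(normalized).update(dataFile) in the hunk above.
    static Map<String, PartitionStats> aggregate(Map<String, List<Long>> filesByPartition) {
        Map<String, PartitionStats> partitions = new LinkedHashMap<>();
        filesByPartition.forEach((key, recordCounts) -> {
            for (long records : recordCounts) {
                partitions.computeIfAbsent(key, k -> new PartitionStats()).update(records);
            }
        });
        return partitions;
    }
}
```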
##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
        partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
  }
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
Review Comment:
Could we consolidate these two VisibleForTesting methods? Would just exposing dataFiles() work for those tests? I don't really like exposing the Partition inner class.
##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
        partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
  }
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
          normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read list of data and delete manifests from current snapshot obtained via scan
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    // hardcoded to avoid scan stats column on partition table
+                    .select(BaseScan.SCAN_COLUMNS));
+
+    return (scan.planExecutor() != null)
Review Comment:
Nit: do we need extra () here?
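On the nit: `!=` binds tighter than the conditional operator in Java, so the parentheses around `scan.planExecutor() != null` are indeed redundant. A minimal standalone illustration of the precedence point (the method name and string results are hypothetical stand-ins):

```java
import java.util.concurrent.ExecutorService;

public class TernaryPrecedence {
    // Mirrors the `return (scan.planExecutor() != null) ? ... : ...` shape:
    // the null check needs no extra parentheses before `? :`.
    static String planMode(ExecutorService planExecutor) {
        return planExecutor != null ? "parallel" : "sequential";
    }
}
```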
##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
        partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
  }
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
Review Comment:
Nit: can we add a blank line before this?
And also, if we keep the comment, we can just do something shorter like:
```handle partition evolution```
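For context on what the commented block computes: each spec's partition fields are mapped to their positions in the table-wide (unified) partition type, so tuples written under an older spec can be projected into the final schema. A simplified sketch using field names instead of Iceberg's field IDs (this hypothetical helper is not the real `normalizedPositions`):

```java
import java.util.List;

public class SpecNormalization {
    // For each field of one partition spec, find its index in the unified
    // partition type; -1 would mean the field is absent from the unified type.
    static int[] normalizedPositions(List<String> specFields, List<String> unifiedFields) {
        int[] positions = new int[specFields.size()];
        for (int i = 0; i < specFields.size(); i++) {
            positions[i] = unifiedFields.indexOf(specFields.get(i));
        }
        return positions;
    }
}
```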
##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
        partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
  }
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
          normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read list of data and delete manifests from current snapshot obtained via scan
Review Comment:
Comment seems wrong, it's only data files. I would suggest removing it, as it's pretty obvious what it's doing?
##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
        partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
  }
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
          normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read list of data and delete manifests from current snapshot obtained via scan
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    // hardcoded to avoid scan stats column on partition table
Review Comment:
Nit: I'd also vote to remove this comment; not sure this line in particular needs explanation over the other ones.
If needed, maybe we can put it after the select like:
```.select(SCAN_COLUMNS); // don't select stats columns```
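For readers of this hunk: `evalCache` builds one `ManifestEvaluator` per partition spec, keyed by spec ID, so every manifest written under the same spec reuses the same evaluator. The same memoization pattern can be sketched with `Map.computeIfAbsent` in place of Caffeine's `LoadingCache`; the string "evaluator" is a stand-in for the real evaluator object:

```java
import java.util.HashMap;
import java.util.Map;

public class EvaluatorCache {
    private final Map<Integer, String> cache = new HashMap<>();
    int builds = 0;  // counts how many evaluators were actually constructed

    // Build the per-spec "evaluator" lazily, at most once per spec ID,
    // mirroring evalCache.get(manifest.partitionSpecId()) in the hunk above.
    String forSpec(int specId) {
        return cache.computeIfAbsent(specId, id -> {
            builds++;
            return "evaluator-for-spec-" + id;
        });
    }
}
```

Note the difference from a real Caffeine `LoadingCache`: this sketch has no eviction policy and is not thread-safe, which is fine for a single planning pass but not for concurrent use.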
##########
core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java:
##########
@@ -79,24 +80,19 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
     }
   }
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int partValue) {
-    validateIncludesPartitionScan(tasks, 0, partValue);
-  }
-
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+  protected void validatePartition(
+      Iterable<PartitionsTable.Partition> parts, int position, int partitionValue) {
Review Comment:
I'd actually prefer this to be CloseableIterable&lt;DataFile&gt;, as Partition seems a bit of an internal class and shouldn't be used. I think it would be the same, unless I'm mistaken?
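A sketch of the suggested direction: validate directly over the planned files rather than the internal `Partition` class. Iceberg's `DataFile` isn't available in this standalone example, so the partition-value lookup is abstracted behind a function; the helper name and signature are hypothetical:

```java
import java.util.List;
import java.util.function.ToIntFunction;

public class PartitionScanValidation {
    // True if any planned file's partition value (extracted by the caller)
    // matches the expected value, analogous to validateIncludesPartitionScan.
    static <F> boolean includesPartition(
            Iterable<F> files, ToIntFunction<F> partitionValueAt, int expected) {
        for (F file : files) {
            if (partitionValueAt.applyAsInt(file) == expected) {
                return true;
            }
        }
        return false;
    }
}
```

In the real tests the extractor would read `dataFile.partition().get(position, ...)` from the `CloseableIterable<DataFile>` returned by `planDataFiles`.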
##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -238,5 +241,9 @@ void update(DataFile file) {
       this.dataFileCount += 1;
       this.specId = file.specId();
     }
+
+    StructLike key() {
Review Comment:
Can remove, if we decide to make tests use `planDataFiles`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]