ConeyLiu commented on code in PR #8123: URL: https://github.com/apache/iceberg/pull/8123#discussion_r1271264206
########## core/src/main/java/org/apache/iceberg/DistributedDataBatchScan.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.TableScanUtil;
+
+abstract class DistributedDataBatchScan
+    extends SnapshotScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> implements BatchScan {
+
+  private static final int NUM_LOCAL_CORES = Runtime.getRuntime().availableProcessors();
+  private static final long METADATA_LIMIT_PER_LOCAL_CORE = 128 * 1024 * 1024; // 128 MB
+  private static final int NUM_WORKER_THREADS = 2;
+
+  protected DistributedDataBatchScan(Table table, Schema schema, TableScanContext context) {
+    super(table, schema, context);
+  }
+
+  protected abstract int remoteParallelism();
+
+  protected abstract PlanningMode dataPlanningMode();
+
+  protected abstract List<DataFile> planDataRemotely(List<ManifestFile> manifests);
+
+  protected abstract PlanningMode deletePlanningMode();
+
+  protected abstract List<DeleteFile> planDeletesRemotely(List<ManifestFile> manifests);
+
+  protected abstract void releaseResources();
+
+  @Override
+  protected boolean useSnapshotSchema() {
+    return true;
+  }
+
+  @Override
+  protected CloseableIterable<ScanTask> doPlanFiles() {
+    Snapshot snapshot = snapshot();
+
+    List<ManifestFile> dataManifests = findMatchingDataManifests(snapshot);
+    List<ManifestFile> deleteManifests = findMatchingDeleteManifests(snapshot);
+
+    boolean planDataRemotely = shouldPlanRemotely(dataPlanningMode(), dataManifests);
+    boolean planDeletesRemotely = shouldPlanRemotely(deletePlanningMode(), deleteManifests);
+
+    if (planDataRemotely || planDeletesRemotely) {
+      ExecutorService workerPool = newWorkerPool();
+
+      CompletableFuture<CloseableIterable<DataFile>> dataFilesFuture =
+          newDataFilesFuture(dataManifests, planDataRemotely, workerPool);
+
+      CompletableFuture<DeleteFileIndex> deletesFuture =
+          newDeletesFuture(deleteManifests, planDeletesRemotely, workerPool);
+
+      try {
+        CloseableIterable<DataFile> dataFiles = dataFilesFuture.join();
+        DeleteFileIndex deletes = deletesFuture.join();
+        return createTasks(dataFiles, deletes);
+
+      } catch (CompletionException e) {
+        dataFilesFuture.cancel(true /* may interrupt */);
+        deletesFuture.cancel(true /* may interrupt */);
+        throw new RuntimeException("Failed to plan files", e);
+
+      } finally {
+        workerPool.shutdown();
+        releaseResources();
+      }
+
+    } else {
+      return planFilesLocally(dataManifests, deleteManifests);
+    }
+  }
+
+  private CompletableFuture<CloseableIterable<DataFile>> newDataFilesFuture(
+      List<ManifestFile> manifests, boolean planRemotely, ExecutorService workerPool) {
+
+    return CompletableFuture.supplyAsync(
+        () -> {
+          if (planRemotely) {
+            scanMetrics().scannedDataManifests().increment(manifests.size());
+            List<DataFile> dataFiles = planDataRemotely(manifests);
+            int skippedDataFilesCount = totalFilesCount(manifests) - dataFiles.size();
+            scanMetrics().skippedDataFiles().increment(skippedDataFilesCount);
+            return CloseableIterable.withNoopClose(dataFiles);
+          } else {
+            ManifestGroup manifestGroup = newManifestGroup(manifests, ImmutableList.of());
+            return CloseableIterable.transform(manifestGroup.entries(), ManifestEntry::file);
+          }
+        },
+        workerPool);
+  }
+
+  private CompletableFuture<DeleteFileIndex> newDeletesFuture(
+      List<ManifestFile> manifests, boolean planRemotely, ExecutorService workerPool) {
+
+    return CompletableFuture.supplyAsync(
+        () -> {
+          DeleteFileIndex.Builder builder;
+
+          if (planRemotely) {
+            scanMetrics().scannedDeleteManifests().increment(manifests.size());
+            List<DeleteFile> deleteFiles = planDeletesRemotely(manifests);
+            int skippedDeleteFilesCount = totalFilesCount(manifests) - deleteFiles.size();
+            scanMetrics().skippedDeleteFiles().increment(skippedDeleteFilesCount);
+            builder = DeleteFileIndex.builderForFiles(io(), deleteFiles);
+          } else {
+            builder = DeleteFileIndex.builderFor(io(), manifests);
+          }
+
+          return builder
+              .specsById(table().specs())
+              .filterData(filter())
+              .caseSensitive(isCaseSensitive())
+              .planWith(planExecutor())
+              .scanMetrics(scanMetrics())
+              .build();
+        },
+        workerPool);
+  }
+
+  private CloseableIterable<ScanTask> planFilesLocally(
+      List<ManifestFile> dataManifests, List<ManifestFile> deleteManifests) {
+    ManifestGroup manifestGroup = newManifestGroup(dataManifests, deleteManifests);
+    return asScanTasks(manifestGroup.planFiles());
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private CloseableIterable<ScanTask> createTasks(
+      CloseableIterable<DataFile> dataFiles, DeleteFileIndex deletes) {
+
+    String schemaString = SchemaParser.toJson(tableSchema());
+    LoadingCache<Integer, String> specStringCache = specCache(PartitionSpecParser::toJson);
+    LoadingCache<Integer, ResidualEvaluator> residualCache = specCache(this::newResidualEvaluator);
+
+    return CloseableIterable.transform(
+        dataFiles,
+        dataFile -> {
+          DeleteFile[] deleteFiles = deletes.forDataFile(dataFile);

Review Comment:
> 21 million delete files, 350 TB of deletes to apply

The same delete file can be applied to multiple data files, so the number of delete files actually applied is magnified well beyond the distinct count (see the sketch below).
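A minimal sketch of that fan-out, assuming it runs inside `org.apache.iceberg` (since `DeleteFileIndex` is package-private); the `deleteFanOut` helper and its arguments are hypothetical, and `forDataFile` is used exactly as in the diff above:

```java
// Illustration only: counts (data file, delete file) applications vs. the
// number of distinct delete files. With wide-matching equality deletes, the
// first number can be orders of magnitude larger than the second.
package org.apache.iceberg;

import java.util.HashSet;
import java.util.Set;

class DeleteFanOutSketch {

  static void deleteFanOut(Iterable<DataFile> dataFiles, DeleteFileIndex deletes) {
    long applications = 0; // one per (data file, delete file) pair
    Set<String> distinctDeletes = new HashSet<>();

    for (DataFile dataFile : dataFiles) {
      DeleteFile[] matches = deletes.forDataFile(dataFile); // same lookup as in the diff
      applications += matches.length;
      for (DeleteFile match : matches) {
        distinctDeletes.add(match.path().toString());
      }
    }

    System.out.printf(
        "applications=%d, distinct delete files=%d%n", applications, distinctDeletes.size());
  }
}
```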
> would this use case benefit from a regular job that would convert equality deletes into position deletes?

That would take two jobs: first rewrite the equality deletes into position deletes, then apply the position deletes. The base data files would have to be read twice, so it would not bring much improvement over applying the equality deletes directly.

> Is there a time interval where that would be possible on a regular basis to avoid rewriting the entire set of data files?

Haha, that is a fish that got away.

> Have you profiled it? Is most of the time spent on looking up the index?

This was a past job. From the logs we did not find any serious memory or IO problems. We may rerun the job and do more detailed profiling; the sketch below shows the kind of timing we have in mind.
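A hypothetical timing harness for that profiling, again assuming package-private access to `DeleteFileIndex`; the `DeleteLookupTimer` wrapper is not part of the PR:

```java
// Measures wall time spent in DeleteFileIndex lookups during task creation,
// to check whether index lookup dominates the planning cost.
package org.apache.iceberg;

import java.util.concurrent.atomic.AtomicLong;

class DeleteLookupTimer {
  private final DeleteFileIndex deletes;
  private final AtomicLong lookupNanos = new AtomicLong();

  DeleteLookupTimer(DeleteFileIndex deletes) {
    this.deletes = deletes;
  }

  // Drop-in replacement for deletes.forDataFile(dataFile) in createTasks
  DeleteFile[] timedForDataFile(DataFile dataFile) {
    long start = System.nanoTime();
    try {
      return deletes.forDataFile(dataFile);
    } finally {
      lookupNanos.addAndGet(System.nanoTime() - start);
    }
  }

  long totalLookupMillis() {
    return lookupNanos.get() / 1_000_000;
  }
}
```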
