rahil-c opened a new pull request, #7914:
URL: https://github.com/apache/iceberg/pull/7914
## Issue Summary
* While testing the Iceberg LakeFormation integration, I
encountered the following failure when running the `remove_orphan_files` procedure:
```
java.io.UncheckedIOException: java.io.IOException:
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied;
	at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:367) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
	at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listedFileDS(DeleteOrphanFilesSparkAction.java:292) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
	at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.actualFileIdentDS(DeleteOrphanFilesSparkAction.java:277) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
	at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.doExecute(DeleteOrphanFilesSparkAction.java:243) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
	at org.apache.iceberg.spark.actions.BaseSparkAction.withJobGroupInfo(BaseSparkAction.java:127) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
	at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.execute(DeleteOrphanFilesSparkAction.java:229) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
	at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.lambda$call$3(RemoveOrphanFilesProcedure.java:171) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
```
* Currently, `DeleteOrphanFilesSparkAction#listDirRecursively` does not
use the table's `FileIO`; instead it creates a Hadoop `FileSystem` directly and
makes `listStatus` calls:
```
FileSystem fs = path.getFileSystem(conf);
List<String> subDirs = Lists.newArrayList();
for (FileStatus file : fs.listStatus(path, pathFilter)) {
```
The problem with this approach is that when LakeFormation is in use, we need
to go through `S3FileIO`; otherwise we hit a 403 S3 Access Denied error, because
the Hadoop file system does not have the correct credentials to access the data.
`S3FileIO`, however, does have the correct credentials, since its S3 client is
built with a `LakeFormationCredentialsProvider`:
```
@Override
public S3Client s3() {
  S3ClientBuilder s3ClientBuilder = S3Client.builder();
  if (isTableRegisteredWithLakeFormation()) {
    s3ClientBuilder.credentialsProvider(
        new LakeFormationCredentialsProvider(lakeFormation(), buildTableArn()));
  }
```
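One way the listing can go through `FileIO` is via Iceberg's `SupportsPrefixOperations` interface, which `S3FileIO` implements. Below is a minimal, hypothetical sketch (not the exact PR diff) of listing a location through the table's `FileIO` so that the correctly-credentialed S3 client is used; the helper name `listWithFileIO` is illustrative only:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;

class FileIOListingSketch {
  // Hypothetical helper: list all file locations under a prefix using the
  // table's FileIO. When the FileIO is S3FileIO, listPrefix issues requests
  // through the S3 client built by the LakeFormation-aware client factory,
  // so the listing carries the correct credentials, unlike a Hadoop
  // FileSystem created from the Spark Hadoop configuration.
  static List<String> listWithFileIO(FileIO io, String location) {
    List<String> files = new ArrayList<>();
    if (io instanceof SupportsPrefixOperations) {
      for (FileInfo info : ((SupportsPrefixOperations) io).listPrefix(location)) {
        files.add(info.location());
      }
    }
    return files;
  }
}
```

A real implementation would still need a fallback to the existing `FileSystem`-based recursive listing for `FileIO` implementations that do not support prefix operations.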
## Aim of PR
* Use `S3FileIO` within the `remove_orphan_files` procedure code path.
## Testing
* With this change, I no longer hit the 403 error and can run the procedure
successfully:
```
spark-sql> CALL my_catalog.system.remove_orphan_files(table =>
'iceberg_db.iceberg_table8', dry_run => true);
23/06/21 22:28:56 INFO BaseMetastoreTableOperations: Refreshing table
metadata from new version:
s3://rchertar-dev/iceberg-nrt/iceberg_table8/metadata/00003-4a0f642b-88a9-41e1-b1ab-5c0bada18b12.metadata.json
23/06/21 22:28:56 INFO CatalogUtil: Loading custom FileIO implementation:
org.apache.iceberg.aws.s3.S3FileIO
23/06/21 22:28:57 INFO BaseMetastoreCatalog: Table loaded by catalog:
my_catalog.iceberg_db.iceberg_table8
23/06/21 22:28:57 INFO BaseAllMetadataTableScan: Scanning metadata table
my_catalog.iceberg_db.iceberg_table8 with filter true.
23/06/21 22:28:58 INFO BaseAllMetadataTableScan: Scanning metadata table
my_catalog.iceberg_db.iceberg_table8 with filter true.
Time taken: 21.439 seconds
spark-sql> select * from my_catalog.iceberg_db.iceberg_table8;
23/06/21 22:29:24 INFO BaseTableScan: Scanning table
my_catalog.iceberg_db.iceberg_table8 snapshot 7557581385081927782 created at
2023-06-21T22:26:54.853+00:00 with filter true
23/06/21 22:29:24 INFO LoggingMetricsReporter: Received metrics report:
ScanReport{tableName=my_catalog.iceberg_db.iceberg_table8,
snapshotId=7557581385081927782, filter=true, schemaId=0, projectedFieldIds=[1,
2, 3], projectedFieldNames=[id, creation_date, last_update_time],
scanMetrics=ScanMetricsResult{totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS,
totalDuration=PT0.13239524S, count=1},
resultDataFiles=CounterResult{unit=COUNT, value=3},
resultDeleteFiles=CounterResult{unit=COUNT, value=0},
totalDataManifests=CounterResult{unit=COUNT, value=3},
totalDeleteManifests=CounterResult{unit=COUNT, value=0},
scannedDataManifests=CounterResult{unit=COUNT, value=3},
skippedDataManifests=CounterResult{unit=COUNT, value=0},
totalFileSizeInBytes=CounterResult{unit=BYTES, value=2763},
totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0},
skippedDataFiles=CounterResult{unit=COUNT, value=0},
skippedDeleteFiles=CounterResult{unit=COUNT, value=0},
scannedDeleteManifests=CounterResult{unit=COUNT, value=0},
skippedDeleteManifests=CounterResult{unit=COUNT, value=0},
value=0}, indexedDeleteFiles=CounterResult{unit=COUNT, value=0},
equalityDeleteFiles=CounterResult{unit=COUNT, value=0},
positionalDeleteFiles=CounterResult{unit=COUNT, value=0}},
metadata={engine-version=3.3.1-amzn-0, app-id=application_1682982958961_0098,
engine-name=spark}}
8 jan8 8
9 jan9 9
10 jan10 10
Time taken: 4.427 seconds, Fetched 3 row(s)
spark-sql>
```