amogh-jahagirdar commented on code in PR #10433:
URL: https://github.com/apache/iceberg/pull/10433#discussion_r1716246358
##########
api/src/main/java/org/apache/iceberg/io/RetryableInputStream.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import dev.failsafe.RetryPolicyBuilder;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.time.Duration;
+import java.util.List;
+import java.util.function.Supplier;
+import javax.net.ssl.SSLException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * RetryableInputStream wraps over an underlying InputStream and retries failures encountered when
+ * reading through the stream. On retries, the underlying streams will be reinitialized.
+ */
+public class RetryableInputStream extends InputStream {
+
+  private final InputStream underlyingStream;
+  private final RetryPolicy<Object> retryPolicy;
+
+  private RetryableInputStream(InputStream underlyingStream, RetryPolicy<Object> retryPolicy) {
+    this.underlyingStream = underlyingStream;
+    this.retryPolicy = retryPolicy;
+  }
+
+  @Override
+  public int read() throws IOException {
+    return Failsafe.with(retryPolicy).get(() -> underlyingStream.read());
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return Failsafe.with(retryPolicy).get(() -> underlyingStream.read(b, off, len));
+  }
+
+  @Override
+  public void close() throws IOException {
+    underlyingStream.close();
+  }
+
+  public static RetryableInputStream.Builder builderFor(Supplier<InputStream> newStreamSupplier) {
+    return new Builder(newStreamSupplier);
+  }
+
+  public static class Builder {
+
+    private InputStream underlyingStream;
+    private final Supplier<InputStream> newStreamProvider;
+    private List<Class<? extends Exception>> retryableExceptions =
+        ImmutableList.of(SSLException.class, SocketTimeoutException.class);

Review Comment:
   Really sorry, I raised this draft and never followed through: @singhpk234 you're correct that Iceberg itself does not have a configuration to override the default SDK retry policy. I'd say that's separate from this PR and wouldn't club the two together, imo. My general thought is that `AwsProperties` has already gotten quite complex with all the knobs it exposes, and in Iceberg we'd want to avoid the complexity that Hadoop s3a gets into. It's the classic "knobs give users control" vs. "keep it simple and reduce the maintenance burden" trade-off, and personally I think Iceberg should aim for the latter. At least for the 503 retry case, users can already provide an S3 client configured with their desired retry policy; see the sketch below.
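   For illustration, a minimal sketch of that approach, assuming AWS SDK v2 and the `S3FileIO(SerializableSupplier<S3Client>)` constructor; the class name and the retry count of 10 are arbitrary examples, not recommendations:

   ```java
   import org.apache.iceberg.aws.s3.S3FileIO;
   import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
   import software.amazon.awssdk.core.retry.RetryPolicy;
   import software.amazon.awssdk.services.s3.S3Client;

   public class CustomRetryExample {

     public static S3FileIO fileIOWithCustomRetries() {
       // Configure the retry behavior on the SDK client itself, not via an Iceberg property.
       S3Client client =
           S3Client.builder()
               .overrideConfiguration(
                   ClientOverrideConfiguration.builder()
                       .retryPolicy(RetryPolicy.builder().numRetries(10).build()) // example value
                       .build())
               .build();

       // Hand the pre-configured client to S3FileIO; no new Iceberg-side knob needed.
       return new S3FileIO(() -> client);
     }
   }
   ```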
   On a retrying output stream, I'd say we can address that separately, when it becomes an issue. All the cases I've seen so far have really just been on the read side.

##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -139,7 +140,11 @@ private InputStream readRange(String range) {
 
     S3RequestUtil.configureEncryption(s3FileIOProperties, requestBuilder);
 
-    return s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+    stream =
+        RetryableInputStream.builderFor(
+            () -> s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream()))

Review Comment:
   I revisited this PR with fresh eyes, and yes, the current logic is definitely incorrect when a *non*-range read is performed (`readFully` and `readTail` perform range reads). For range reads we don't really care about tracking the current position in the retryable input stream, but for normal seek-based reads we definitely do! I think the way to solve this is to pass a supplier of the current position to the retryable input stream. That supplier would hold a reference to `this`, and the stream provider would be a function that accepts a position. Upon retry, the stream provider would open a new connection and a new stream starting at the position returned by the position supplier, which is guaranteed to be the correct position to resume from. Roughly the shape sketched below.
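   A minimal sketch of that shape, assuming Failsafe 3.x. This illustrates the suggestion and is not the PR's actual code; the constructor, field names, and retry count are hypothetical:

   ```java
   import dev.failsafe.Failsafe;
   import dev.failsafe.RetryPolicy;
   import java.io.IOException;
   import java.io.InputStream;
   import java.net.SocketTimeoutException;
   import java.util.function.Function;
   import java.util.function.Supplier;
   import javax.net.ssl.SSLException;

   public class RetryableInputStream extends InputStream {

     private final Function<Long, InputStream> streamProvider; // opens a stream at a given offset
     private final Supplier<Long> positionSupplier; // e.g. () -> S3InputStream.this.pos
     private final RetryPolicy<Object> retryPolicy;
     private InputStream underlyingStream;

     public RetryableInputStream(
         Function<Long, InputStream> streamProvider, Supplier<Long> positionSupplier) {
       this.streamProvider = streamProvider;
       this.positionSupplier = positionSupplier;
       this.retryPolicy =
           RetryPolicy.<Object>builder()
               .handle(SSLException.class, SocketTimeoutException.class)
               .withMaxRetries(3) // example value
               // On each retry, ask the caller for its current position and reopen there.
               .onRetry(
                   event ->
                       this.underlyingStream = streamProvider.apply(positionSupplier.get()))
               .build();
       this.underlyingStream = streamProvider.apply(positionSupplier.get());
     }

     @Override
     public int read() throws IOException {
       return Failsafe.with(retryPolicy).get(() -> underlyingStream.read());
     }

     @Override
     public int read(byte[] b, int off, int len) throws IOException {
       return Failsafe.with(retryPolicy).get(() -> underlyingStream.read(b, off, len));
     }

     @Override
     public void close() throws IOException {
       underlyingStream.close();
     }
   }
   ```

   On the `S3InputStream` side, usage could then look something like this (assuming a `pos` field tracking the current offset and a hypothetical `openRange` helper):

   ```java
   stream =
       new RetryableInputStream(
           startOffset -> openRange("bytes=" + startOffset + "-"), () -> this.pos);
   ```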