steveloughran commented on code in PR #4912: URL: https://github.com/apache/iceberg/pull/4912#discussion_r1717062832
########## aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java: ########## @@ -172,31 +274,62 @@ private void positionStream() throws IOException { } // close the stream and open at desired position - LOG.debug("Seek with new stream for {} to offset {}", location, next); + LOG.warn("Seek with new stream for {} to offset {}", location, next); pos = next; openStream(); } - private void openStream() throws IOException { - GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder() - .bucket(location.bucket()) - .key(location.key()) - .range(String.format("bytes=%s-", pos)); - - S3RequestUtil.configureEncryption(awsProperties, requestBuilder); - + private void openStream() { closeStream(); - stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream()); + stream = readRange(String.format("bytes=%s-", pos)); + } + + private void closeStream() { + closeServerSideStream(stream); + stream = null; } - private void closeStream() throws IOException { - if (stream != null) { - stream.close(); + private static void closeServerSideStream(InputStream streamToClose) { + if (streamToClose != null) { + try { + if (streamToClose instanceof Abortable) { + // Stated in the ResponseInputStream javadoc: + // If it is not desired to read remaining data from the stream, + // you can explicitly abort the connection via abort(). 
+ ((Abortable) streamToClose).abort(); } else { + streamToClose.close(); + } + } catch (IOException | AbortedException e) { + // ignore failure to abort or close stream + } } } - public void setSkipSize(int skipSize) { - this.skipSize = skipSize; + private static boolean shouldRetry(Exception exception) { Review Comment: reasonably confident it's a lot more complicated than this, especially as SDK-level failures often create deep chains with the underlying cause at the bottom ########## aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java: ########## @@ -178,12 +192,16 @@ private byte[] randomData(int size) { } private void writeS3Data(S3URI uri, byte[] data) throws IOException { - s3.putObject( + s3Client.putObject( PutObjectRequest.builder() .bucket(uri.bucket()) .key(uri.key()) .contentLength((long) data.length) .build(), RequestBody.fromBytes(data)); Review Comment: FWIW, this triggers a full array copy of the data. ########## aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java: ########## @@ -172,31 +274,62 @@ private void positionStream() throws IOException { } // close the stream and open at desired position - LOG.debug("Seek with new stream for {} to offset {}", location, next); + LOG.warn("Seek with new stream for {} to offset {}", location, next); pos = next; openStream(); } - private void openStream() throws IOException { - GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder() - .bucket(location.bucket()) - .key(location.key()) - .range(String.format("bytes=%s-", pos)); - - S3RequestUtil.configureEncryption(awsProperties, requestBuilder); - + private void openStream() { closeStream(); - stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream()); + stream = readRange(String.format("bytes=%s-", pos)); + } + + private void closeStream() { + closeServerSideStream(stream); + stream = null; } - private void closeStream() throws IOException { - if (stream != null) { - stream.close(); + private 
static void closeServerSideStream(InputStream streamToClose) { + if (streamToClose != null) { + try { + if (streamToClose instanceof Abortable) { + // Stated in the ResponseInputStream javadoc: + // If it is not desired to read remaining data from the stream, + // you can explicitly abort the connection via abort(). + ((Abortable) streamToClose).abort(); Review Comment: abort() removes it from the HTTP pool. * this is good if you are closing with an unrecoverable network error * this is absolutely what you do not want to do on a normal read. ########## aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java: ########## @@ -88,23 +96,69 @@ public void seek(long newPos) { @Override public int read() throws IOException { - Preconditions.checkState(!closed, "Cannot read: already closed"); - positionStream(); + int[] byteRef = new int[1]; + try { + Tasks.foreach(0) + .retry(awsProperties.s3ReadRetryNumRetries()) + .exponentialBackoff( + awsProperties.s3ReadRetryMinWaitMs(), + awsProperties.s3ReadRetryMaxWaitMs(), + awsProperties.s3ReadRetryTotalTimeoutMs(), + 2.0 /* exponential */) + .shouldRetryTest(S3InputStream::shouldRetry) + .throwFailureWhenFinished() + .run(ignored -> { + try { + Preconditions.checkState(!closed, "Cannot read: already closed"); + positionStream(); + + byteRef[0] = stream.read(); + } catch (IOException e) { + closeStream(); + throw new UncheckedIOException(e); Review Comment: on an unrecoverable network error, `abortStream()` is required to avoid the failed TLS connection being recycled. This adds a new challenge: identifying an unrecoverable network error from the deeply nested stacks which can come from the SDK, and distinguishing it from other failures -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org