steveloughran commented on code in PR #4294:
URL: https://github.com/apache/hadoop/pull/4294#discussion_r875834883
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +223,40 @@ void close(InputStream inputStream) {
this.s3Objects.remove(inputStream);
}
- Io.closeIgnoringIoException(inputStream);
- Io.closeIgnoringIoException(obj);
+ this.futurePool.executeRunnable(new DrainTask(inputStream, obj));
}
+
+
+ /**
+ * Drain task that is submitted to the future pool.
+ */
+ private static class DrainTask implements Runnable {
+
+ private final InputStream inputStream;
+ private final S3Object obj;
+ private long drained;
+
+ DrainTask(InputStream inputStream, S3Object obj) {
+ this.inputStream = inputStream;
+ this.obj = obj;
+ }
+
+ @Override
+ public void run() {
+ try {
+
+ while(this.inputStream.read() >= 0) {
+ drained++;
+ }
+
+ LOG.debug("Drained stream of {} bytes", drained);
+
+ Io.closeIgnoringIoException(this.inputStream);
+ Io.closeIgnoringIoException(this.obj);
+ } catch (Exception e) {
Review Comment:
If this happens then the read() raised an exception. The stream MUST be
aborted to stop it being returned to the HTTP connection pool, as its
connection is probably broken.
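
A minimal sketch of the drain-or-abort pattern described above, not the actual S3File code. `AbortableStream` is a hypothetical stand-in for whatever exposes an abort operation (in the AWS SDK v1 that would be `S3ObjectInputStream.abort()`); the class and method names are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.InputStream;

public class DrainOrAbortSketch {

  /** Hypothetical stand-in for a stream whose HTTP connection can be dropped. */
  public interface AbortableStream {
    void abort();
  }

  /**
   * Read the stream to the end and close it. If the read or close fails,
   * abort instead so the (probably broken) connection is not reused.
   *
   * @return true if the stream was drained and closed, false if it was aborted.
   */
  public static boolean drainOrAbort(InputStream in, AbortableStream abortable) {
    try {
      while (in.read() >= 0) {
        // discard the remaining bytes
      }
      in.close();
      return true;
    } catch (IOException | RuntimeException e) {
      // Assumption: abort() discards the connection rather than pooling it.
      abortable.abort();
      return false;
    }
  }
}
```

The point of the sketch is only the control flow: a clean drain ends in close(), any failure ends in abort().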
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +223,40 @@ void close(InputStream inputStream) {
this.s3Objects.remove(inputStream);
}
- Io.closeIgnoringIoException(inputStream);
- Io.closeIgnoringIoException(obj);
+ this.futurePool.executeRunnable(new DrainTask(inputStream, obj));
}
+
+
+ /**
+ * Drain task that is submitted to the future pool.
+ */
+ private static class DrainTask implements Runnable {
Review Comment:
1. Declare the class final to keep the style checker happy.
2. I'd prefer not to use Runnable and to use completable futures instead.
Look at drainOrAbortHttpStream() and its use; at that point you can just
pass in a function.
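
A rough sketch of what point 2 could look like, assuming a plain `java.util.concurrent.Executor` in place of the PR's future pool. It does not reproduce drainOrAbortHttpStream(); `drain` and `drainAsync` are hypothetical names used only here.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class AsyncDrainSketch {

  /** Read to the end of the stream, close it, and return the bytes drained. */
  public static long drain(InputStream in) {
    long drained = 0;
    try (InputStream stream = in) {
      while (stream.read() >= 0) {
        drained++;
      }
    } catch (IOException e) {
      // Surface the failure through the future instead of swallowing it.
      throw new UncheckedIOException(e);
    }
    return drained;
  }

  /** Submit the drain as a function; no dedicated Runnable class is needed. */
  public static CompletableFuture<Long> drainAsync(InputStream in, Executor pool) {
    return CompletableFuture.supplyAsync(() -> drain(in), pool);
  }
}
```

Returning a future also lets the caller observe the drained byte count or a failure, which a fire-and-forget Runnable hides.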
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -98,6 +106,7 @@ public S3File(
this.streamStatistics = streamStatistics;
this.changeTracker = changeTracker;
this.s3Objects = new IdentityHashMap<InputStream, S3Object>();
+ this.futurePool = context.getFuturePool();
Review Comment:
If this is only for the drain, then given the context is already stored, you
can just get the pool when needed.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +223,40 @@ void close(InputStream inputStream) {
this.s3Objects.remove(inputStream);
}
- Io.closeIgnoringIoException(inputStream);
- Io.closeIgnoringIoException(obj);
+ this.futurePool.executeRunnable(new DrainTask(inputStream, obj));
}
+
+
+ /**
+ * Drain task that is submitted to the future pool.
+ */
+ private static class DrainTask implements Runnable {
+
+ private final InputStream inputStream;
+ private final S3Object obj;
+ private long drained;
+
+ DrainTask(InputStream inputStream, S3Object obj) {
+ this.inputStream = inputStream;
+ this.obj = obj;
+ }
+
+ @Override
+ public void run() {
+ try {
+
+ while(this.inputStream.read() >= 0) {
Review Comment:
Look at the changes to the S3A input stream in Hadoop trunk: it reads into a
buffer for draining, and is marginally faster.
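
For illustration, a buffered drain might look like the following. This is a sketch, not the trunk implementation; the buffer size and the names are assumptions.

```java
import java.io.IOException;
import java.io.InputStream;

public class BufferedDrainSketch {

  /** Assumed buffer size; the value used in Hadoop trunk may differ. */
  private static final int DRAIN_BUFFER_SIZE = 16384;

  /** Drain the stream in chunks rather than one byte per read() call. */
  public static long drain(InputStream in) throws IOException {
    byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
    long drained = 0;
    int bytesRead;
    // read(byte[]) returns -1 at end of stream.
    while ((bytesRead = in.read(buffer)) >= 0) {
      drained += bytesRead;
    }
    return drained;
  }
}
```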
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]