[
https://issues.apache.org/jira/browse/HADOOP-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572263#comment-15572263
]
ASF GitHub Bot commented on HADOOP-13560:
-----------------------------------------
Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/hadoop/pull/130#discussion_r83244375
--- Diff:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
---
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressEventType;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.util.Progressable;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+
+/**
+ * Upload files/parts directly via different buffering mechanisms:
+ * including memory and disk.
+ *
+ * If the stream is closed and no upload has started, then the upload
+ * is instead done as a single PUT operation.
+ *
+ * Unstable: statistics and error handling might evolve.
+ */
[email protected]
[email protected]
+class S3ABlockOutputStream extends OutputStream {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3ABlockOutputStream.class);
+
+ /** Owner FileSystem. */
+ private final S3AFileSystem fs;
+
+ /** Object being uploaded. */
+ private final String key;
+
+ /** Size of all blocks. */
+ private final int blockSize;
+
+ /** Callback for progress. */
+ private final ProgressListener progressListener;
+ private final ListeningExecutorService executorService;
+
+ /**
+ * Retry policy for multipart commits; not all AWS SDK versions retry
that.
+ */
+ private final RetryPolicy retryPolicy =
+ RetryPolicies.retryUpToMaximumCountWithProportionalSleep(
+ 5,
+ 2000,
+ TimeUnit.MILLISECONDS);
+ /**
+ * Factory for blocks.
+ */
+ private final S3ADataBlocks.BlockFactory blockFactory;
+
+ /** Preallocated byte buffer for writing single characters. */
+ private final byte[] singleCharWrite = new byte[1];
+
+ /** Multipart upload details; null means none started. */
+ private MultiPartUpload multiPartUpload;
+
+ /** Closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /** Current data block. Null means none currently active */
+ private S3ADataBlocks.DataBlock activeBlock;
+
+ /** Count of blocks uploaded. */
+ private long blockCount = 0;
+
+ /** Statistics to build up. */
+ private final S3AInstrumentation.OutputStreamStatistics statistics;
+
+ /**
+ * Write operation helper; encapsulation of the filesystem operations.
+ */
+ private final S3AFileSystem.WriteOperationHelper writeOperationHelper;
+
+ /**
+ * An S3A output stream which uploads partitions in a separate pool of
+ * threads; different {@link S3ADataBlocks.BlockFactory}
+ * instances can control where data is buffered.
+ *
+ * @param fs S3AFilesystem
+ * @param key S3 object to work on.
+ * @param executorService the executor service to use to schedule work
+ * @param progress report progress in order to prevent timeouts. If
+ * this class implements {@code ProgressListener} then it will be
+ * directly wired up to the AWS client, so receive detailed progress
+ * information.
+ * @param blockSize size of a single block.
+ * @param blockFactory factory for creating stream destinations
+ * @param statistics stats for this stream
+ * @param writeOperationHelper state of the write operation.
+ * @throws IOException on any problem
+ */
+ S3ABlockOutputStream(S3AFileSystem fs,
+ String key,
+ ExecutorService executorService,
+ Progressable progress,
+ long blockSize,
+ S3ADataBlocks.BlockFactory blockFactory,
+ S3AInstrumentation.OutputStreamStatistics statistics,
+ S3AFileSystem.WriteOperationHelper writeOperationHelper)
+ throws IOException {
+ this.fs = fs;
+ this.key = key;
+ this.blockFactory = blockFactory;
+ this.blockSize = (int) blockSize;
+ this.statistics = statistics;
+ this.writeOperationHelper = writeOperationHelper;
+ Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
+ "Block size is too small: %d", blockSize);
+ this.executorService =
MoreExecutors.listeningDecorator(executorService);
+ this.multiPartUpload = null;
+ this.progressListener = (progress instanceof ProgressListener) ?
+ (ProgressListener) progress
+ : new ProgressableListener(progress);
+ // create that first block. This guarantees that an open + close
sequence
+ // writes a 0-byte entry.
+ maybeCreateBlock();
+ LOG.debug("Initialized S3ABlockOutputStream for {}" +
+ " output to {}", writeOperationHelper, activeBlock);
+ }
+
+ /**
+ * Demand create a destination block.
+ * @return the active block; null if there isn't one.
+ * @throws IOException on any failure to create
+ */
+ private synchronized S3ADataBlocks.DataBlock maybeCreateBlock()
--- End diff --
renamed
> S3ABlockOutputStream to support huge (many GB) file writes
> ----------------------------------------------------------
>
> Key: HADOOP-13560
> URL: https://issues.apache.org/jira/browse/HADOOP-13560
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 2.9.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Attachments: HADOOP-13560-branch-2-001.patch,
> HADOOP-13560-branch-2-002.patch, HADOOP-13560-branch-2-003.patch,
> HADOOP-13560-branch-2-004.patch
>
>
> An AWS SDK [issue|https://github.com/aws/aws-sdk-java/issues/367] highlights
> that metadata isn't copied on large copies.
> 1. Add a test to do that large copy/rename and verify that the copy really
> works
> 2. Verify that metadata makes it over.
> Verifying large file rename is important on its own, as it is needed for very
> large commit operations for committers using rename
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]