[
https://issues.apache.org/jira/browse/HADOOP-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15574765#comment-15574765
]
ASF GitHub Bot commented on HADOOP-13560:
-----------------------------------------
Github user thodemoor commented on a diff in the pull request:
https://github.com/apache/hadoop/pull/130#discussion_r83381587
--- Diff:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
---
@@ -0,0 +1,819 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
+
+/**
+ * Set of classes to support output streaming into blocks which are then
+ * uploaded as partitions.
+ */
+final class S3ADataBlocks {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3ADataBlocks.class);
+
+ private S3ADataBlocks() {
+ }
+
+ /**
+ * Validate args to a write command. These are the same validation checks
+ * expected for any implementation of {@code OutputStream.write()}.
+ * @param b byte array containing data
+ * @param off offset in array where to start
+ * @param len number of bytes to be written
+ * @throws NullPointerException for a null buffer
+ * @throws IndexOutOfBoundsException if indices are out of range
+ */
+ static void validateWriteArgs(byte[] b, int off, int len)
+ throws IOException {
+ Preconditions.checkNotNull(b);
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException(
+ "write (b[" + b.length + "], " + off + ", " + len + ')');
+ }
+ }
+
+ /**
+ * Create a factory.
+ * @param owner factory owner
+ * @param name factory name -the option from {@link Constants}.
+ * @return the factory, ready to be initialized.
+ * @throws IllegalArgumentException if the name is unknown.
+ */
+ static BlockFactory createFactory(S3AFileSystem owner,
+ String name) {
+ switch (name) {
+ case Constants.FAST_UPLOAD_BUFFER_ARRAY:
+ return new ArrayBlockFactory(owner);
+ case Constants.FAST_UPLOAD_BUFFER_DISK:
+ return new DiskBlockFactory(owner);
+ case Constants.FAST_UPLOAD_BYTEBUFFER:
+ return new ByteBufferBlockFactory(owner);
+ default:
+ throw new IllegalArgumentException("Unsupported block buffer" +
+ " \"" + name + '"');
+ }
+ }
+
+ /**
+ * Base class for block factories.
+ */
+ static abstract class BlockFactory implements Closeable {
+
+ private final S3AFileSystem owner;
+
+ protected BlockFactory(S3AFileSystem owner) {
+ this.owner = owner;
+ }
+
+
+ /**
+ * Create a block.
+ * @param limit limit of the block.
+ * @return a new block.
+ */
+ abstract DataBlock create(int limit) throws IOException;
+
+ /**
+ * Implement any close/cleanup operation.
+ * Base class is a no-op
+ * @throws IOException -ideally, it shouldn't.
+ */
+ @Override
+ public void close() throws IOException {
+ }
+
+ /**
+ * Owner.
+ */
+ protected S3AFileSystem getOwner() {
+ return owner;
+ }
+ }
+
+ /**
+ * This represents a block being uploaded.
+ */
+ static abstract class DataBlock implements Closeable {
+
+ enum DestState {Writing, Upload, Closed}
+
+ private volatile DestState state = Writing;
+
+ /**
+ * Atomically enter a state, verifying current state.
+ * @param current current state. null means "no check"
+ * @param next next state
+ * @throws IllegalStateException if the current state is not as
expected
+ */
+ protected synchronized final void enterState(DestState current,
+ DestState next)
+ throws IllegalStateException {
+ verifyState(current);
+ LOG.debug("{}: entering state {}", this, next);
+ state = next;
+ }
+
+ /**
+ * Verify that the block is in the declared state.
+ * @param expected expected state.
+ * @throws IllegalStateException if the DataBlock is in the wrong state
+ */
+ protected final void verifyState(DestState expected)
+ throws IllegalStateException {
+ if (expected != null && state != expected) {
+ throw new IllegalStateException("Expected stream state " + expected
+ + " -but actual state is " + state + " in " + this);
+ }
+ }
+
+ /**
+ * Current state.
+ * @return the current state.
+ */
+ final DestState getState() {
+ return state;
+ }
+
+ /**
+ * Return the current data size.
+ * @return the size of the data
+ */
+ abstract int dataSize();
+
+ /**
+ * Predicate to verify that the block has the capacity to write
+ * the given set of bytes.
+ * @param bytes number of bytes desired to be written.
+ * @return true if there is enough space.
+ */
+ abstract boolean hasCapacity(long bytes);
+
+ /**
+ * Predicate to check if there is data in the block.
+ * @return true if there is
+ */
+ boolean hasData() {
+ return dataSize() > 0;
+ }
+
+ /**
+ * The remaining capacity in the block before it is full.
+ * @return the number of bytes remaining.
+ */
+ abstract int remainingCapacity();
+
+ /**
+ * Write a series of bytes from the buffer, from the offset.
+ * Returns the number of bytes written.
+ * Only valid in the state {@code Writing}.
+ * Base class verifies the state but does no writing.
+ * @param buffer buffer
+ * @param offset offset
+ * @param length length of write
+ * @return number of bytes written
+ * @throws IOException trouble
+ */
+ int write(byte[] buffer, int offset, int length) throws IOException {
+ verifyState(Writing);
+ Preconditions.checkArgument(buffer != null, "Null buffer");
+ Preconditions.checkArgument(length >= 0, "length is negative");
+ Preconditions.checkArgument(offset >= 0, "offset is negative");
+ Preconditions.checkArgument(
+ !(buffer.length - offset < length),
+ "buffer shorter than amount of data to write");
+ return 0;
+ }
+
+ /**
+ * Flush the output.
+ * Only valid in the state {@code Writing}.
+ * In the base class, this is a no-op
+ * @throws IOException any IO problem.
+ */
+ void flush() throws IOException {
+ verifyState(Writing);
+ }
+
+ /**
+ * Switch to the upload state and return a stream for uploading.
+ * Base class calls {@link #enterState(DestState, DestState)} to
+ * manage the state machine.
+ * @return the stream
+ * @throws IOException trouble
+ */
+ InputStream startUpload() throws IOException {
+ LOG.debug("Start datablock upload");
+ enterState(Writing, Upload);
+ return null;
+ }
+
+ /**
+ * Enter the closed state.
+ * @return true if the class was in any other state, implying that
+ * the subclass should do its close operations
+ */
+ protected synchronized boolean enterClosedState() {
+ if (!state.equals(Closed)) {
+ enterState(null, Closed);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (enterClosedState()) {
+ LOG.debug("Closed {}", this);
+ innerClose();
+ }
+ }
+
+ /**
+ * Inner close logic for subclasses to implement.
+ */
+ protected void innerClose() throws IOException {
+
+ }
+
+ }
+
+ // ====================================================================
+
+ /**
+ * Use byte arrays on the heap for storage.
+ */
+ static class ArrayBlockFactory extends BlockFactory {
+
+ ArrayBlockFactory(S3AFileSystem owner) {
+ super(owner);
+ }
+
+ @Override
+ DataBlock create(int limit) throws IOException {
+ return new ByteArrayBlock(limit);
+ }
+
+ }
+
+ /**
+ * Stream to memory via a {@code ByteArrayOutputStream}.
+ *
+ * This was taken from {@code S3AFastOutputStream} and has the
+ * same problem which surfaced there: it consumes heap space
+ * proportional to the mismatch between writes to the stream and
+ * the JVM-wide upload bandwidth to the S3 endpoint.
--- End diff --
but bounded by ...
> S3ABlockOutputStream to support huge (many GB) file writes
> ----------------------------------------------------------
>
> Key: HADOOP-13560
> URL: https://issues.apache.org/jira/browse/HADOOP-13560
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 2.9.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Attachments: HADOOP-13560-branch-2-001.patch,
> HADOOP-13560-branch-2-002.patch, HADOOP-13560-branch-2-003.patch,
> HADOOP-13560-branch-2-004.patch
>
>
> An AWS SDK [issue|https://github.com/aws/aws-sdk-java/issues/367] highlights
> that metadata isn't copied on large copies.
> 1. Add a test to do that large copy/rname and verify that the copy really
> works
> 2. Verify that metadata makes it over.
> Verifying large file rename is important on its own, as it is needed for very
> large commit operations for committers using rename
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]