[
https://issues.apache.org/jira/browse/HADOOP-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15550301#comment-15550301
]
ASF GitHub Bot commented on HADOOP-13560:
-----------------------------------------
Github user cnauroth commented on a diff in the pull request:
https://github.com/apache/hadoop/pull/130#discussion_r82086784
--- Diff:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
---
@@ -0,0 +1,816 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
+
+/**
+ * Set of classes to support output streaming into blocks which are then
+ * uploaded as partitions.
+ */
+final class S3ADataBlocks {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(S3ADataBlocks.class);
+
+ private S3ADataBlocks() {
+ }
+
+ /**
+ * Validate args to a write command. These are the same validation checks
+ * expected for any implementation of {@code OutputStream.write()}.
+ * @param b byte array containing data
+ * @param off offset in array where to start
+ * @param len number of bytes to be written
+ * @throws NullPointerException for a null buffer
+ * @throws IndexOutOfBoundsException if indices are out of range
+ */
+ static void validateWriteArgs(byte[] b, int off, int len)
+ throws IOException {
+ Preconditions.checkNotNull(b);
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException(
+ "write (b[" + b.length + "], " + off + ", " + len + ')');
+ }
+ }
+
+ /**
+ * Create a factory.
+ * @param owner factory owner
+ * @param name factory name -the option from {@link Constants}.
+ * @return the factory, ready to be initialized.
+ * @throws IllegalArgumentException if the name is unknown.
+ */
+ static BlockFactory createFactory(S3AFileSystem owner,
+ String name) {
+ switch (name) {
+ case Constants.BLOCK_OUTPUT_BUFFER_ARRAY:
+ return new ArrayBlockFactory(owner);
+ case Constants.BLOCK_OUTPUT_BUFFER_DISK:
+ return new DiskBlockFactory(owner);
+ case Constants.BLOCK_OUTPUT_BYTEBUFFER:
+ return new ByteBufferBlockFactory(owner);
+ default:
+ throw new IllegalArgumentException("Unsupported block buffer" +
+ " \"" + name + '"');
+ }
+ }
+
+ /**
+ * Base class for block factories.
+ */
+ static abstract class BlockFactory implements Closeable {
+
+ /**
+ * Owner.
+ */
+ protected final S3AFileSystem owner;
+
+ protected BlockFactory(S3AFileSystem owner) {
+ this.owner = owner;
+ }
+
+ /**
+ * Create a block.
+ * @param limit limit of the block.
+ * @return a new block.
+ */
+ abstract DataBlock create(int limit) throws IOException;
+
+ /**
+ * Implement any close/cleanup operation.
+ * Base class is a no-op
+ * @throws IOException -ideally, it shouldn't.
+ */
+ @Override
+ public void close() throws IOException {
+ }
+ }
+
+ /**
+ * This represents a block being uploaded.
+ */
+ static abstract class DataBlock implements Closeable {
+
+ private volatile DestState state = Writing;
+
+ /**
+ * Enter
+ * @param current
+ * @param next
+ * @throws IllegalStateException
+ */
+ protected synchronized final void enterState(DestState current,
+ DestState next)
+ throws IllegalStateException {
+ verifyState(current);
+ LOG.debug("{}: entering state {}", this, next);
+ state = next;
+ }
+
+ /**
+ * Verify that the block is in the declared state.
+ * @param expected expected state.
+ * @throws IllegalStateException if the DataBlock is in the wrong state
+ */
+ protected final void verifyState(DestState expected) throws
IllegalStateException {
+ if (expected != null && state != expected) {
+ throw new IllegalStateException("Expected stream state " + expected
+ + " -but actual state is " + state + " in " + this);
+ }
+ }
+
+ /**
+ * Current state.
+ * @return the current state.
+ */
+ final DestState getState() {
+ return state;
+ }
+
+ /**
+ * Return the current data size.
+ * @return the size of the data
+ */
+ abstract int dataSize();
+
+ /**
+ * Predicate to verify that the block has the capacity to write
+ * the given set of bytes.
+ * @param bytes number of bytes desired to be written.
+ * @return true if there is enough space.
+ */
+ abstract boolean hasCapacity(long bytes);
+
+ /**
+ * Predicate to check if there is data in the block.
+ * @return true if there is
+ */
+ boolean hasData() {
+ return dataSize() > 0;
+ }
+
+ /**
+ * The remaining capacity in the block before it is full.
+ * @return the number of bytes remaining.
+ */
+ abstract int remainingCapacity();
+
+ /**
+ * Write a series of bytes from the buffer, from the offset.
+ * Returns the number of bytes written.
+ * Only valid in the state {@code Writing}.
+ * Base class verifies the state but does no writing.
+ * @param buffer buffer
+ * @param offset offset
+ * @param length length of write
+ * @return number of bytes written
+ * @throws IOException trouble
+ */
+ int write(byte[] buffer, int offset, int length) throws IOException {
+ verifyState(Writing);
+ Preconditions.checkArgument(buffer != null, "Null buffer");
+ Preconditions.checkArgument(length >= 0, "length is negative");
+ Preconditions.checkArgument(offset >= 0, "offset is negative");
+ Preconditions.checkArgument(
+ !(buffer.length - offset < length),
+ "buffer shorter than amount of data to write");
+ return 0;
+ }
+
+ /**
+ * Flush the output.
+ * Only valid in the state {@code Writing}.
+ * In the base class, this is a no-op
+ * @throws IOException any IO problem.
+ */
+ void flush() throws IOException {
+ verifyState(Writing);
+ }
+
+ /**
+ * Switch to the upload state and return a stream for uploading.
+ * Base class calls {@link #enterState(DestState, DestState)} to
+ * manage the state machine.
+ * @return the stream
+ * @throws IOException trouble
+ */
+ InputStream startUpload() throws IOException {
+ LOG.debug("Start datablock upload");
+ enterState(Writing, Upload);
+ return null;
+ }
+
+ /**
+ * Enter the closed state.
+ * @return true if the class was in any other state, implying that
+ * the subclass should do its close operations
+ */
+ protected synchronized boolean enterClosedState() {
+ if (!state.equals(Closed)) {
+ try {
+ enterState(null, Closed);
+ } catch (IllegalStateException ignored) {
+
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (enterClosedState()) {
+ LOG.debug("Closed {}", this);
+ innerClose();
+ }
+ }
+
+ /**
+ * Inner close logic for subclasses to implement.
+ */
+ protected void innerClose() throws IOException {
+
+ }
+
+ enum DestState {Writing, Upload, Closed}
+ }
+
+ // ====================================================================
+
+ /**
+ * Use byte arrays on the heap for storage.
+ */
+ static class ArrayBlockFactory extends BlockFactory {
+
+ ArrayBlockFactory(S3AFileSystem owner) {
+ super(owner);
+ }
+
+ @Override
+ DataBlock create(int limit) throws IOException {
+ return new ByteArrayBlock(limit);
+ }
+
+ }
+
+ /**
+ * Stream to memory via a {@code ByteArrayOutputStream}.
+ *
+ * This was taken from {@code S3AFastOutputStream} and has the
+ * same problem which surfaced there: it consumes heap space
+ * proportional to the mismatch between writes to the stream and
+ * the JVM-wide upload bandwidth to the S3 endpoint.
+ */
+
+ static class ByteArrayBlock extends DataBlock {
+ private ByteArrayOutputStream buffer;
+ private final int limit;
+ // cache data size so that it is consistent after the buffer is reset.
+ private Integer dataSize;
+
+ ByteArrayBlock(int limit) {
+ this.limit = limit;
+ buffer = new ByteArrayOutputStream();
+ }
+
+ /**
+ * Get the amount of data; if there is no buffer then the size is 0.
+ * @return the amount of data available to upload.
+ */
+ @Override
+ int dataSize() {
+ return dataSize != null ? dataSize : buffer.size();
+ }
+
+ @Override
+ InputStream startUpload() throws IOException {
+ super.startUpload();
+ dataSize = buffer.size();
+ ByteArrayInputStream bufferData = new ByteArrayInputStream(
+ buffer.toByteArray());
+ buffer.reset();
+ buffer = null;
+ return bufferData;
+ }
+
+ @Override
+ boolean hasCapacity(long bytes) {
+ return dataSize() + bytes <= limit;
+ }
+
+ @Override
+ int remainingCapacity() {
+ return limit - dataSize();
+ }
+
+ @Override
+ int write(byte[] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+ int written = Math.min(remainingCapacity(), len);
+ buffer.write(b, offset, written);
+ return written;
+ }
+
+ @Override
+ protected void innerClose() {
+ buffer = null;
+ }
+
+ @Override
+ public String toString() {
+ return "ByteArrayBlock{" +
+ "state=" + getState() +
+ ", limit=" + limit +
+ ", dataSize=" + dataSize +
+ '}';
+ }
+ }
+
+ // ====================================================================
+
+ /**
+ * Stream via Direct ByteBuffers; these are allocated off heap
+ * via {@link DirectBufferPool}.
+ * This is actually the most complex of all the block factories,
+ * due to the need to explicitly recycle buffers; in comparison, the
+ * {@link DiskBlock} buffer delegates the work of deleting files to
+ * the {@link DiskBlock.FileDeletingInputStream}. Here the
+ * input stream {@link ByteBufferInputStream} has a similar task, along
+ * with the foundational work of streaming data from a byte array.
+ */
+
+ static class ByteBufferBlockFactory extends BlockFactory {
+
+ private final DirectBufferPool bufferPool = new DirectBufferPool();
+ private final AtomicInteger buffersOutstanding = new AtomicInteger(0);
+
+ ByteBufferBlockFactory(S3AFileSystem owner) {
+ super(owner);
+ }
+
+ @Override
+ ByteBufferBlock create(int limit) throws IOException {
+ return new ByteBufferBlock(limit);
+ }
+
+ private ByteBuffer requestBuffer(int limit) {
+ LOG.debug("Requesting buffer of size {}", limit);
+ buffersOutstanding.incrementAndGet();
+ return bufferPool.getBuffer(limit);
+ }
+
+ private void releaseBuffer(ByteBuffer buffer) {
+ LOG.debug("Releasing buffer");
+ bufferPool.returnBuffer(buffer);
+ buffersOutstanding.decrementAndGet();
+ }
+
+ /**
+ * Get count of outstanding buffers.
+ * @return the current buffer count
+ */
+ public int getOutstandingBufferCount() {
+ return buffersOutstanding.get();
+ }
+
+ @Override
+ public String toString() {
+ return "ByteBufferBlockFactory{"
+ + "buffersOutstanding=" + buffersOutstanding +
+ '}';
+ }
+
+ /**
+ * A DataBlock which requests a buffer from pool on creation; returns
+ * it when the output stream is closed.
+ */
+ class ByteBufferBlock extends DataBlock {
+ private ByteBuffer buffer;
+ private final int bufferSize;
+ // cache data size so that it is consistent after the buffer is
reset.
+ private Integer dataSize;
+
+ /**
+ * Instantiate. This will request a ByteBuffer of the desired size.
+ * @param bufferSize buffer size
+ */
+ ByteBufferBlock(int bufferSize) {
+ this.bufferSize = bufferSize;
+ buffer = requestBuffer(bufferSize);
+ }
+
+ /**
+ * Get the amount of data; if there is no buffer then the size is 0.
+ * @return the amount of data available to upload.
+ */
+ @Override
+ int dataSize() {
+ return dataSize != null ? dataSize : bufferCapacityUsed();
+ }
+
+ @Override
+ ByteBufferInputStream startUpload() throws IOException {
+ super.startUpload();
+ dataSize = bufferCapacityUsed();
+ // set the buffer up from reading from the beginning
+ buffer.limit(buffer.position());
+ buffer.position(0);
+ return new ByteBufferInputStream(dataSize, buffer);
+ }
+
+ @Override
+ public boolean hasCapacity(long bytes) {
+ return bytes <= remainingCapacity();
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return buffer != null ? buffer.remaining() : 0;
+ }
+
+ private int bufferCapacityUsed() {
+ return buffer.capacity() - buffer.remaining();
+ }
+
+ @Override
+ int write(byte[] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+ int written = Math.min(remainingCapacity(), len);
+ buffer.put(b, offset, written);
+ return written;
+ }
+
+ @Override
+ protected void innerClose() {
+ buffer = null;
+ }
+
+ @Override
+ public String toString() {
+ return "ByteBufferBlock{"
+ + "state=" + getState() +
+ ", dataSize=" + dataSize() +
+ ", limit=" + bufferSize +
+ ", remainingCapacity=" + remainingCapacity() +
+ '}';
+ }
+
+ }
+
+ /**
+ * Provide an input stream from a byte buffer; supporting
+ * {@link #mark(int)}, which is required to enable replay of failed
+ * PUT attempts.
+ * This input stream returns the buffer to the pool afterwards.
+ */
+ class ByteBufferInputStream extends InputStream {
+
+ private final int size;
+ private ByteBuffer byteBuffer;
+
+ ByteBufferInputStream(int size, ByteBuffer byteBuffer) {
+ LOG.debug("Creating ByteBufferInputStream of size {}", size);
+ this.size = size;
+ this.byteBuffer = byteBuffer;
+ }
+
+ /**
+ * Return the buffer to the pool after the stream is closed.
+ */
+ @Override
+ public synchronized void close() {
+ if (byteBuffer != null) {
+ LOG.debug("releasing buffer");
+ releaseBuffer(byteBuffer);
+ byteBuffer = null;
+ }
+ }
+
+ /**
+ * Verify that the stream is open.
+ * @throws IOException if the stream is closed
+ */
+ private void verifyOpen() throws IOException {
+ if (byteBuffer == null) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ }
+
+ public synchronized int read() throws IOException {
+ if (available() > 0) {
+ return byteBuffer.get() & 0xFF;
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public synchronized long skip(long offset) throws IOException {
+ verifyOpen();
+ long newPos = position() + offset;
+ if (newPos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+ }
+ if (newPos > size) {
+ throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ }
+ byteBuffer.position((int) newPos);
+ return newPos;
+ }
+
+ @Override
+ public synchronized int available() {
+ Preconditions.checkState(byteBuffer != null,
+ FSExceptionMessages.STREAM_IS_CLOSED);
+ return byteBuffer.remaining();
+ }
+
+ /**
+ * Get the current buffer position.
+ * @return the buffer position
+ */
+ public synchronized int position() {
+ return byteBuffer.position();
+ }
+
+ /**
+ * Check if there is data left.
+ * @return true if there is data remaining in the buffer.
+ */
+ public synchronized boolean hasRemaining() {
+ return byteBuffer.hasRemaining();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ LOG.debug("mark at {}", position());
+ byteBuffer.mark();
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ LOG.debug("reset");
+ byteBuffer.reset();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ /**
+ * Read in data.
+ * @param buffer destination buffer
+ * @param offset offset within the buffer
+ * @param length length of bytes to read
+ * @throws EOFException if the position is negative
+ * @throws IndexOutOfBoundsException if there isn't space for the
+ * amount of data requested.
+ * @throws IllegalArgumentException other arguments are invalid.
+ */
+ @SuppressWarnings("NullableProblems")
+ public synchronized int read(byte[] buffer, int offset, int length)
+ throws IOException {
+ Preconditions.checkArgument(length >= 0, "length is negative");
+ Preconditions.checkArgument(buffer != null, "Null buffer");
+ if (buffer.length - offset < length) {
+ throw new IndexOutOfBoundsException(
+ FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+ + ": request length =" + length
+ + ", with offset =" + offset
+ + "; buffer capacity =" + (buffer.length - offset));
+ }
+ verifyOpen();
+ if (!hasRemaining()) {
+ return -1;
+ }
+
+ int toRead = Math.min(length, available());
+ byteBuffer.get(buffer, offset, toRead);
+ return toRead;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "ByteBufferInputStream{");
+ sb.append("size=").append(size);
+ ByteBuffer buffer = this.byteBuffer;
+ if (buffer != null) {
+ sb.append(", available=").append(buffer.remaining());
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+ }
+
+ // ====================================================================
+
+ /**
+ * Buffer blocks to disk.
+ */
+ static class DiskBlockFactory extends BlockFactory {
+
+ DiskBlockFactory(S3AFileSystem owner) {
+ super(owner);
+ }
+
+ /**
+ * Create a temp file and a block which writes to it.
+ * @param limit limit of the block.
+ * @return the new block
+ * @throws IOException IO problems
+ */
+ @Override
+ DataBlock create(int limit) throws IOException {
+ File destFile = owner
+ .createTmpFileForWrite("s3ablock", limit, owner.getConf());
+ return new DiskBlock(destFile, limit);
+ }
+ }
+
+ /**
+ * Stream to a file.
+ * This will stop at the limit; the caller is expected to create a new
block
+ */
+ static class DiskBlock extends DataBlock {
+
+ protected int bytesWritten;
+ private final File bufferFile;
+ private final int limit;
+ private BufferedOutputStream out;
+ private InputStream uploadStream;
+
+ DiskBlock(File bufferFile, int limit)
+ throws FileNotFoundException {
+ this.limit = limit;
+ this.bufferFile = bufferFile;
+ out = new BufferedOutputStream(new FileOutputStream(bufferFile));
+ }
+
+ @Override
+ int dataSize() {
+ return bytesWritten;
+ }
+
+ @Override
+ boolean hasCapacity(long bytes) {
+ return dataSize() + bytes <= limit;
+ }
+
+ @Override
+ int remainingCapacity() {
+ return limit - bytesWritten;
+ }
+
+ @Override
+ int write(byte[] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+ int written = Math.min(remainingCapacity(), len);
+ out.write(b, offset, written);
+ bytesWritten += written;
+ return written;
+ }
+
+ @Override
+ InputStream startUpload() throws IOException {
+ super.startUpload();
+ try {
+ out.flush();
+ } finally {
+ out.close();
+ out = null;
+ }
+ uploadStream = new FileInputStream(bufferFile);
+ return new FileDeletingInputStream(uploadStream);
+ }
+
+ /**
+ * The close operation will delete the destination file if it still
+ * exists.
+ * @throws IOException IO problems
+ */
+ @Override
+ protected void innerClose() throws IOException {
+ final DestState state = getState();
+ LOG.debug("Closing {}", this);
+ switch (state) {
+ case Writing:
+ if (bufferFile.exists()) {
+ // file was not uploaded
+ LOG.debug("Deleting buffer file as upload did not start");
+ boolean deleted = bufferFile.delete();
+ if (!deleted && bufferFile.exists()) {
+ LOG.warn("Failed to delete buffer file {}", bufferFile);
+ }
+ }
+ break;
+
+ case Upload:
+ LOG.debug("Buffer file {} exists —close upload stream",
bufferFile);
+ break;
+
+ case Closed:
+ // no-op
+ break;
+
+ default:
+ // this state can never be reached, but checkstyle complains, so
+ // it is here.
+ }
+ }
+
+ /**
+ * Flush operation will flush to disk.
+ * @throws IOException IOE raised on FileOutputStream
+ */
+ @Override
+ void flush() throws IOException {
--- End diff --
Call `super.flush()` to trigger the validation check for `Writing` state.
> S3ABlockOutputStream to support huge (many GB) file writes
> ----------------------------------------------------------
>
> Key: HADOOP-13560
> URL: https://issues.apache.org/jira/browse/HADOOP-13560
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 2.9.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Minor
> Attachments: HADOOP-13560-branch-2-001.patch,
> HADOOP-13560-branch-2-002.patch, HADOOP-13560-branch-2-003.patch,
> HADOOP-13560-branch-2-004.patch
>
>
> An AWS SDK [issue|https://github.com/aws/aws-sdk-java/issues/367] highlights
> that metadata isn't copied on large copies.
> 1. Add a test to do that large copy/rname and verify that the copy really
> works
> 2. Verify that metadata makes it over.
> Verifying large file rename is important on its own, as it is needed for very
> large commit operations for committers using rename
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]