http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopOutputIgfsStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopOutputIgfsStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopOutputIgfsStream.java new file mode 100644 index 0000000..ab5fa68 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopOutputIgfsStream.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.igfs.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * IGFS Hadoop output stream implementation. + */ +public class HadoopOutputIgfsStream extends OutputStream implements HadoopIgfsStreamEventListener { + /** Log instance. */ + private Log log; + + /** Client logger. */ + private IgfsLogger clientLog; + + /** Log stream ID. */ + private long logStreamId; + + /** Server stream delegate. */ + private HadoopIgfsStreamDelegate delegate; + + /** Closed flag. */ + private volatile boolean closed; + + /** Flag set if stream was closed due to connection breakage. */ + private boolean connBroken; + + /** Error message. */ + private volatile String errMsg; + + /** Read time. */ + private long writeTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of written bytes. */ + private long total; + + /** + * Creates light output stream. + * + * @param delegate Server stream delegate. + * @param log Logger to use. + * @param clientLog Client logger. + */ + public HadoopOutputIgfsStream(HadoopIgfsStreamDelegate delegate, Log log, + IgfsLogger clientLog, long logStreamId) { + this.delegate = delegate; + this.log = log; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + + delegate.hadoop().addEventListener(delegate, this); + } + + /** + * Read start. + */ + private void writeStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. + */ + private void writeEnd() { + long now = System.nanoTime(); + + writeTime += now - lastTs; + + lastTs = now; + } + + /** {@inheritDoc} */ + @Override public void write(@NotNull byte[] b, int off, int len) throws IOException { + check(); + + writeStart(); + + try { + delegate.hadoop().writeData(delegate, b, off, len); + + total += len; + } + finally { + writeEnd(); + } + } + + /** {@inheritDoc} */ + @Override public void write(int b) throws IOException { + write(new byte[] {(byte)b}); + + total++; + } + + /** {@inheritDoc} */ + @Override public void flush() throws IOException { + delegate.hadoop().flush(delegate); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + if (!closed) { + if (log.isDebugEnabled()) + log.debug("Closing output stream: " + delegate); + + writeStart(); + + delegate.hadoop().closeStream(delegate); + + markClosed(false); + + writeEnd(); + + if (clientLog.isLogEnabled()) + clientLog.logCloseOut(logStreamId, userTime, writeTime, total); + + if (log.isDebugEnabled()) + log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 + + ", userTime=" + userTime / 1000 + ']'); + } + else if(connBroken) + throw new IOException( + "Failed to close stream, because connection was broken (data could have been lost)."); + } + + /** + * Marks stream as closed. + * + * @param connBroken {@code True} if connection with server was lost. + */ + private void markClosed(boolean connBroken) { + // It is ok to have race here. + if (!closed) { + closed = true; + + delegate.hadoop().removeEventListener(delegate); + + this.connBroken = connBroken; + } + } + + /** + * @throws IOException If check failed. + */ + private void check() throws IOException { + String errMsg0 = errMsg; + + if (errMsg0 != null) + throw new IOException(errMsg0); + + if (closed) { + if (connBroken) + throw new IOException("Server connection was lost."); + else + throw new IOException("Stream is closed."); + } + } + + /** {@inheritDoc} */ + @Override public void onClose() throws IgniteCheckedException { + markClosed(true); + } + + /** {@inheritDoc} */ + @Override public void onError(String errMsg) { + this.errMsg = errMsg; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java deleted file mode 100644 index 27d6e33..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Facade for communication with grid. - */ -public interface IgfsHadoop { - /** - * Perform handshake. - * - * @param logDir Log directory. - * @return Future with handshake result. - * @throws IgniteCheckedException If failed. - */ - public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException; - - /** - * Close connection. - * - * @param force Force flag. - */ - public void close(boolean force); - - /** - * Command to retrieve file info for some IGFS path. - * - * @param path Path to get file info for. - * @return Future for info operation. - * @throws IgniteCheckedException If failed. - */ - public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to update file properties. - * - * @param path IGFS path to update properties. - * @param props Properties to update. - * @return Future for update operation. - * @throws IgniteCheckedException If failed. - */ - public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; - - /** - * Sets last access time and last modification time for a file. - * - * @param path Path to update times. - * @param accessTime Last access time to set. - * @param modificationTime Last modification time to set. - * @throws IgniteCheckedException If failed. - */ - public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException, - IOException; - - /** - * Command to rename given path. - * - * @param src Source path. - * @param dest Destination path. - * @return Future for rename operation. - * @throws IgniteCheckedException If failed. - */ - public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException; - - /** - * Command to delete given path. - * - * @param path Path to delete. - * @param recursive {@code True} if deletion is recursive. - * @return Future for delete operation. - * @throws IgniteCheckedException If failed. - */ - public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException; - - /** - * Command to get affinity for given path, offset and length. - * - * @param path Path to get affinity for. - * @param start Start position (offset). - * @param len Data length. - * @return Future for affinity command. - * @throws IgniteCheckedException If failed. - */ - public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException, - IOException; - - /** - * Gets path summary. - * - * @param path Path to get summary for. - * @return Future that will be completed when summary is received. - * @throws IgniteCheckedException If failed. - */ - public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to create directories. - * - * @param path Path to create. - * @return Future for mkdirs operation. - * @throws IgniteCheckedException If failed. - */ - public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; - - /** - * Command to get list of files in directory. - * - * @param path Path to list. - * @return Future for listFiles operation. - * @throws IgniteCheckedException If failed. - */ - public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to get directory listing. - * - * @param path Path to list. - * @return Future for listPaths operation. - * @throws IgniteCheckedException If failed. - */ - public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Performs status request. - * - * @return Status response. - * @throws IgniteCheckedException If failed. - */ - public IgfsStatus fsStatus() throws IgniteCheckedException, IOException; - - /** - * Command to open file for reading. - * - * @param path File path to open. - * @return Future for open operation. - * @throws IgniteCheckedException If failed. - */ - public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to open file for reading. - * - * @param path File path to open. - * @return Future for open operation. - * @throws IgniteCheckedException If failed. - */ - public IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException, - IOException; - - /** - * Command to create file and open it for output. - * - * @param path Path to file. - * @param overwrite If {@code true} then old file contents will be lost. - * @param colocate If {@code true} and called on data node, file will be written on that node. - * @param replication Replication factor. - * @param props File properties for creation. - * @return Stream descriptor. - * @throws IgniteCheckedException If failed. - */ - public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; - - /** - * Open file for output appending data to the end of a file. - * - * @param path Path to file. - * @param create If {@code true}, file will be created if does not exist. - * @param props File properties. - * @return Stream descriptor. - * @throws IgniteCheckedException If failed. - */ - public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java deleted file mode 100644 index 03bf733..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; - -/** - * Communication exception indicating a problem between file system and IGFS instance. - */ -public class IgfsHadoopCommunicationException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Creates new exception with given throwable as a nested cause and - * source of error message. - * - * @param cause Non-null throwable cause. - */ - public IgfsHadoopCommunicationException(Exception cause) { - super(cause); - } - - /** - * Creates a new exception with given error message and optional nested cause exception. - * - * @param msg Error message. - */ - public IgfsHadoopCommunicationException(String msg) { - super(msg); - } - - /** - * Creates a new exception with given error message and optional nested cause exception. - * - * @param msg Error message. - * @param cause Cause. - */ - public IgfsHadoopCommunicationException(String msg, Exception cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java deleted file mode 100644 index 35638ea..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; - -import static org.apache.ignite.configuration.IgfsConfiguration.*; - -/** - * IGFS endpoint abstraction. - */ -public class IgfsHadoopEndpoint { - /** Localhost. */ - public static final String LOCALHOST = "127.0.0.1"; - - /** IGFS name. */ - private final String igfsName; - - /** Grid name. */ - private final String gridName; - - /** Host. */ - private final String host; - - /** Port. */ - private final int port; - - /** - * Normalize IGFS URI. - * - * @param uri URI. - * @return Normalized URI. - * @throws IOException If failed. - */ - public static URI normalize(URI uri) throws IOException { - try { - if (!F.eq(IgniteFs.IGFS_SCHEME, uri.getScheme())) - throw new IOException("Failed to normalize UIR because it has non IGFS scheme: " + uri); - - IgfsHadoopEndpoint endpoint = new IgfsHadoopEndpoint(uri.getAuthority()); - - StringBuilder sb = new StringBuilder(); - - if (endpoint.igfs() != null) - sb.append(endpoint.igfs()); - - if (endpoint.grid() != null) - sb.append(":").append(endpoint.grid()); - - return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(), - uri.getPath(), uri.getQuery(), uri.getFragment()); - } - catch (URISyntaxException | IgniteCheckedException e) { - throw new IOException("Failed to normalize URI: " + uri, e); - } - } - - /** - * Constructor. - * - * @param connStr Connection string. - * @throws IgniteCheckedException If failed to parse connection string. - */ - public IgfsHadoopEndpoint(@Nullable String connStr) throws IgniteCheckedException { - if (connStr == null) - connStr = ""; - - String[] tokens = connStr.split("@", -1); - - IgniteBiTuple<String, Integer> hostPort; - - if (tokens.length == 1) { - igfsName = null; - gridName = null; - - hostPort = hostPort(connStr, connStr); - } - else if (tokens.length == 2) { - String authStr = tokens[0]; - - if (authStr.isEmpty()) { - gridName = null; - igfsName = null; - } - else { - String[] authTokens = authStr.split(":", -1); - - igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0]; - - if (authTokens.length == 1) - gridName = null; - else if (authTokens.length == 2) - gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1]; - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - } - - hostPort = hostPort(connStr, tokens[1]); - } - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - - host = hostPort.get1(); - - assert hostPort.get2() != null; - - port = hostPort.get2(); - } - - /** - * Parse host and port. - * - * @param connStr Full connection string. - * @param hostPortStr Host/port connection string part. - * @return Tuple with host and port. - * @throws IgniteCheckedException If failed to parse connection string. - */ - private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException { - String[] tokens = hostPortStr.split(":", -1); - - String host = tokens[0]; - - if (F.isEmpty(host)) - host = LOCALHOST; - - int port; - - if (tokens.length == 1) - port = DFLT_IPC_PORT; - else if (tokens.length == 2) { - String portStr = tokens[1]; - - try { - port = Integer.valueOf(portStr); - - if (port < 0 || port > 65535) - throw new IgniteCheckedException("Invalid port number: " + connStr); - } - catch (NumberFormatException e) { - throw new IgniteCheckedException("Invalid port number: " + connStr); - } - } - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - - return F.t(host, port); - } - - /** - * @return IGFS name. - */ - @Nullable public String igfs() { - return igfsName; - } - - /** - * @return Grid name. - */ - @Nullable public String grid() { - return gridName; - } - - /** - * @return Host. - */ - public String host() { - return host; - } - - /** - * @return Host. - */ - public boolean isLocal() { - return F.eq(LOCALHOST, host); - } - - /** - * @return Port. - */ - public int port() { - return port; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsHadoopEndpoint.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java deleted file mode 100644 index da86e37..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Extended IGFS server interface. - */ -public interface IgfsHadoopEx extends IgfsHadoop { - /** - * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. - * If connection is closed already, callback will be invoked synchronously inside this method. - * - * @param delegate Stream delegate. - * @param lsnr Event listener. - */ - public void addEventListener(IgfsHadoopStreamDelegate delegate, IgfsHadoopStreamEventListener lsnr); - - /** - * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. - * - * @param delegate Stream delegate. - */ - public void removeEventListener(IgfsHadoopStreamDelegate delegate); - - /** - * Asynchronously reads specified amount of bytes from opened input stream. - * - * @param delegate Stream delegate. - * @param pos Position to read from. - * @param len Data length to read. - * @param outBuf Optional output buffer. If buffer length is less then {@code len}, all remaining - * bytes will be read into new allocated buffer of length {len - outBuf.length} and this buffer will - * be the result of read future. - * @param outOff Output offset. - * @param outLen Output length. - * @return Read data. - */ - public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate delegate, long pos, int len, - @Nullable final byte[] outBuf, final int outOff, final int outLen); - - /** - * Writes data to the stream with given streamId. This method does not return any future since - * no response to write request is sent. - * - * @param delegate Stream delegate. - * @param data Data to write. - * @param off Offset. - * @param len Length. - * @throws IOException If failed. - */ - public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len) throws IOException; - - /** - * Close server stream. - * - * @param delegate Stream delegate. - * @throws IOException If failed. - */ - public void closeStream(IgfsHadoopStreamDelegate delegate) throws IOException; - - /** - * Flush output stream. - * - * @param delegate Stream delegate. - * @throws IOException If failed. - */ - public void flush(IgfsHadoopStreamDelegate delegate) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java deleted file mode 100644 index c9d1322..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.hadoop.fs.permission.*; -import org.apache.ignite.*; - -import java.util.*; - -import static org.apache.ignite.IgniteFs.*; - -/** - * Hadoop file system properties. - */ -public class IgfsHadoopFSProperties { - /** Username. */ - private String usrName; - - /** Group name. */ - private String grpName; - - /** Permissions. */ - private FsPermission perm; - - /** - * Constructor. - * - * @param props Properties. - * @throws IgniteException In case of error. - */ - public IgfsHadoopFSProperties(Map<String, String> props) throws IgniteException { - usrName = props.get(PROP_USER_NAME); - grpName = props.get(PROP_GROUP_NAME); - - String permStr = props.get(PROP_PERMISSION); - - if (permStr != null) { - try { - perm = new FsPermission((short)Integer.parseInt(permStr, 8)); - } - catch (NumberFormatException ignore) { - throw new IgniteException("Permissions cannot be parsed: " + permStr); - } - } - } - - /** - * Get user name. - * - * @return User name. - */ - public String userName() { - return usrName; - } - - /** - * Get group name. - * - * @return Group name. - */ - public String groupName() { - return grpName; - } - - /** - * Get permission. - * - * @return Permission. - */ - public FsPermission permission() { - return perm; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java deleted file mode 100644 index 476641c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -/** - * IGFS client future that holds response parse closure. - */ -public class IgfsHadoopFuture<T> extends GridPlainFutureAdapter<T> { - /** Output buffer. */ - private byte[] outBuf; - - /** Output offset. */ - private int outOff; - - /** Output length. */ - private int outLen; - - /** Read future flag. */ - private boolean read; - - /** - * @return Output buffer. - */ - public byte[] outputBuffer() { - return outBuf; - } - - /** - * @param outBuf Output buffer. - */ - public void outputBuffer(@Nullable byte[] outBuf) { - this.outBuf = outBuf; - } - - /** - * @return Offset in output buffer to write from. - */ - public int outputOffset() { - return outOff; - } - - /** - * @param outOff Offset in output buffer to write from. - */ - public void outputOffset(int outOff) { - this.outOff = outOff; - } - - /** - * @return Length to write to output buffer. - */ - public int outputLength() { - return outLen; - } - - /** - * @param outLen Length to write to output buffer. - */ - public void outputLength(int outLen) { - this.outLen = outLen; - } - - /** - * @param read {@code True} if this is a read future. - */ - public void read(boolean read) { - this.read = read; - } - - /** - * @return {@code True} if this is a read future. - */ - public boolean read() { - return read; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java deleted file mode 100644 index 8245125..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java +++ /dev/null @@ -1,409 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Communication with grid in the same process. - */ -public class IgfsHadoopInProc implements IgfsHadoopEx { - /** Target IGFS. */ - private final IgfsEx igfs; - - /** Buffer size. */ - private final int bufSize; - - /** Event listeners. */ - private final Map<IgfsHadoopStreamDelegate, IgfsHadoopStreamEventListener> lsnrs = - new ConcurrentHashMap<>(); - - /** Logger. */ - private final Log log; - - /** - * Constructor. - * - * @param igfs Target IGFS. - * @param log Log. - */ - public IgfsHadoopInProc(IgfsEx igfs, Log log) { - this.igfs = igfs; - this.log = log; - - bufSize = igfs.configuration().getBlockSize() * 2; - } - - /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) { - igfs.clientLogDirectory(logDir); - - return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), - igfs.globalSampling()); - } - - /** {@inheritDoc} */ - @Override public void close(boolean force) { - // Perform cleanup. - for (IgfsHadoopStreamEventListener lsnr : lsnrs.values()) { - try { - lsnr.onClose(); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to notify stream event listener", e); - } - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { - try { - return igfs.info(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to get file info because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { - try { - return igfs.update(path, props); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to update file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { - try { - igfs.setTimes(path, accessTime, modificationTime); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to set path times because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { - try { - igfs.rename(src, dest); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to rename path because Grid is stopping: " + src); - } - } - - /** {@inheritDoc} */ - @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { - try { - return igfs.delete(path, recursive); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to delete path because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsStatus fsStatus() throws IgniteCheckedException { - try { - return igfs.globalSpace(); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to get file system status because Grid is " + - "stopping."); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { - try { - return igfs.listPaths(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to list paths because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { - try { - return igfs.listFiles(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to list files because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { - try { - igfs.mkdirs(path, props); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to create directory because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { - try { - return igfs.summary(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to get content summary because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) - throws IgniteCheckedException { - try { - return igfs.affinity(path, start, len); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to get affinity because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException { - try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize); - - return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) - throws IgniteCheckedException { - try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); - - return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { - try { - IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, - colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); - - return new IgfsHadoopStreamDelegate(this, stream); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to create file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map<String, String> props) throws IgniteCheckedException { - try { - IgfsOutputStream stream = igfs.append(path, bufSize, create, props); - - return new IgfsHadoopStreamDelegate(this, stream); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to append file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate delegate, long pos, int len, - @Nullable byte[] outBuf, int outOff, int outLen) { - IgfsInputStreamAdapter stream = delegate.target(); - - try { - byte[] res = null; - - if (outBuf != null) { - int outTailLen = outBuf.length - outOff; - - if (len <= outTailLen) - stream.readFully(pos, outBuf, outOff, len); - else { - stream.readFully(pos, outBuf, outOff, outTailLen); - - int remainderLen = len - outTailLen; - - res = new byte[remainderLen]; - - stream.readFully(pos, res, 0, remainderLen); - } - } else { - res = new byte[len]; - - stream.readFully(pos, res, 0, len); - } - - return new GridPlainFutureAdapter<>(res); - } - catch (IllegalStateException | IOException e) { - IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - return new GridPlainFutureAdapter<>(e); - } - } - - /** {@inheritDoc} */ - @Override public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len) - throws IOException { - try { - IgfsOutputStream stream = delegate.target(); - - stream.write(data, off, len); - } - catch (IllegalStateException | IOException e) { - IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - if (e instanceof IllegalStateException) - throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e); - else - throw e; - } - } - - /** {@inheritDoc} */ - @Override public void flush(IgfsHadoopStreamDelegate delegate) throws IOException { - try { - IgfsOutputStream stream = delegate.target(); - - stream.flush(); - } - catch (IllegalStateException | IOException e) { - IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - if (e instanceof IllegalStateException) - throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e); - else - throw e; - } - } - - /** {@inheritDoc} */ - @Override public void closeStream(IgfsHadoopStreamDelegate desc) throws IOException { - Closeable closeable = desc.target(); - - try { - closeable.close(); - } - catch (IllegalStateException e) { - throw new IOException("Failed to close IGFS stream because Grid is stopping.", e); - } - } - - /** {@inheritDoc} */ - @Override public void addEventListener(IgfsHadoopStreamDelegate delegate, - IgfsHadoopStreamEventListener lsnr) { - IgfsHadoopStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr); - - assert lsnr0 == null || lsnr0 == lsnr; - - if (log.isDebugEnabled()) - log.debug("Added stream event listener [delegate=" + delegate + ']'); - } - - /** {@inheritDoc} */ - @Override public void removeEventListener(IgfsHadoopStreamDelegate delegate) { - IgfsHadoopStreamEventListener lsnr0 = lsnrs.remove(delegate); - - if (lsnr0 != null && log.isDebugEnabled()) - log.debug("Removed stream event listener [delegate=" + delegate + ']'); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInputStream.java deleted file mode 100644 index efc5264..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInputStream.java +++ /dev/null @@ -1,626 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.hadoop.fs.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * IGFS input stream wrapper for hadoop interfaces. - */ -@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") -public final class IgfsHadoopInputStream extends InputStream implements Seekable, PositionedReadable, - IgfsHadoopStreamEventListener { - /** Minimum buffer size. */ - private static final int MIN_BUF_SIZE = 4 * 1024; - - /** Server stream delegate. */ - private IgfsHadoopStreamDelegate delegate; - - /** Stream ID used by logger. */ - private long logStreamId; - - /** Stream position. */ - private long pos; - - /** Stream read limit. */ - private long limit; - - /** Mark position. */ - private long markPos = -1; - - /** Prefetch buffer. */ - private DoubleFetchBuffer buf = new DoubleFetchBuffer(); - - /** Buffer half size for double-buffering. */ - private int bufHalfSize; - - /** Closed flag. */ - private volatile boolean closed; - - /** Flag set if stream was closed due to connection breakage. */ - private boolean connBroken; - - /** Logger. */ - private Log log; - - /** Client logger. */ - private IgfsLogger clientLog; - - /** Read time. */ - private long readTime; - - /** User time. */ - private long userTime; - - /** Last timestamp. */ - private long lastTs; - - /** Amount of read bytes. */ - private long total; - - /** - * Creates input stream. - * - * @param delegate Server stream delegate. - * @param limit Read limit. - * @param bufSize Buffer size. - * @param log Log. - * @param clientLog Client logger. - */ - public IgfsHadoopInputStream(IgfsHadoopStreamDelegate delegate, long limit, int bufSize, Log log, - IgfsLogger clientLog, long logStreamId) { - assert limit >= 0; - - this.delegate = delegate; - this.limit = limit; - this.log = log; - this.clientLog = clientLog; - this.logStreamId = logStreamId; - - bufHalfSize = Math.max(bufSize, MIN_BUF_SIZE); - - lastTs = System.nanoTime(); - - delegate.hadoop().addEventListener(delegate, this); - } - - /** - * Read start. - */ - private void readStart() { - long now = System.nanoTime(); - - userTime += now - lastTs; - - lastTs = now; - } - - /** - * Read end. - */ - private void readEnd() { - long now = System.nanoTime(); - - readTime += now - lastTs; - - lastTs = now; - } - - /** {@inheritDoc} */ - @Override public synchronized int read() throws IOException { - checkClosed(); - - readStart(); - - try { - if (eof()) - return -1; - - buf.refreshAhead(pos); - - int res = buf.atPosition(pos); - - pos++; - total++; - - buf.refreshAhead(pos); - - return res; - } - catch (IgniteCheckedException e) { - throw IgfsHadoopUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException { - checkClosed(); - - if (eof()) - return -1; - - readStart(); - - try { - long remaining = limit - pos; - - int read = buf.flatten(b, pos, off, len); - - pos += read; - total += read; - remaining -= read; - - if (remaining > 0 && read != len) { - int readAmt = (int)Math.min(remaining, len - read); - - delegate.hadoop().readData(delegate, pos, readAmt, b, off + read, len - read).get(); - - read += readAmt; - pos += readAmt; - total += readAmt; - } - - buf.refreshAhead(pos); - - return read; - } - catch (IgniteCheckedException e) { - throw IgfsHadoopUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized long skip(long n) throws IOException { - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logSkip(logStreamId, n); - - long oldPos = pos; - - if (pos + n <= limit) - pos += n; - else - pos = limit; - - buf.refreshAhead(pos); - - return pos - oldPos; - } - - /** {@inheritDoc} */ - @Override public synchronized int available() throws IOException { - checkClosed(); - - int available = buf.available(pos); - - assert available >= 0; - - return available; - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - if (!closed) { - readStart(); - - if (log.isDebugEnabled()) - log.debug("Closing input stream: " + delegate); - - delegate.hadoop().closeStream(delegate); - - readEnd(); - - if (clientLog.isLogEnabled()) - clientLog.logCloseIn(logStreamId, userTime, readTime, total); - - markClosed(false); - - if (log.isDebugEnabled()) - log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime + - ", userTime=" + userTime + ']'); - } - } - - /** {@inheritDoc} */ - @Override public synchronized void mark(int readLimit) { - markPos = pos; - - if (clientLog.isLogEnabled()) - clientLog.logMark(logStreamId, readLimit); - } - - /** {@inheritDoc} */ - @Override public synchronized void reset() throws IOException { - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logReset(logStreamId); - - if (markPos == -1) - throw new IOException("Stream was not marked."); - - pos = markPos; - - buf.refreshAhead(pos); - } - - /** {@inheritDoc} */ - @Override public boolean markSupported() { - return true; - } - - /** {@inheritDoc} */ - @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException { - long remaining = limit - position; - - int read = (int)Math.min(len, remaining); - - // Return -1 at EOF. - if (read == 0) - return -1; - - readFully(position, buf, off, read); - - return read; - } - - /** {@inheritDoc} */ - @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException { - long remaining = limit - position; - - checkClosed(); - - if (len > remaining) - throw new EOFException("End of stream reached before data was fully read."); - - readStart(); - - try { - int read = this.buf.flatten(buf, position, off, len); - - total += read; - - if (read != len) { - int readAmt = len - read; - - delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get(); - - total += readAmt; - } - - if (clientLog.isLogEnabled()) - clientLog.logRandomRead(logStreamId, position, len); - } - catch (IgniteCheckedException e) { - throw IgfsHadoopUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public void readFully(long position, byte[] buf) throws IOException { - readFully(position, buf, 0, buf.length); - } - - /** {@inheritDoc} */ - @Override public synchronized void seek(long pos) throws IOException { - A.ensure(pos >= 0, "position must be non-negative"); - - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logSeek(logStreamId, pos); - - if (pos > limit) - pos = limit; - - if (log.isDebugEnabled()) - log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']'); - - this.pos = pos; - - buf.refreshAhead(pos); - } - - /** {@inheritDoc} */ - @Override public synchronized long getPos() { - return pos; - } - - /** {@inheritDoc} */ - @Override public synchronized boolean seekToNewSource(long targetPos) { - return false; - } - - /** {@inheritDoc} */ - @Override public void onClose() { - markClosed(true); - } - - /** {@inheritDoc} */ - @Override public void onError(String errMsg) { - // No-op. - } - - /** - * Marks stream as closed. - * - * @param connBroken {@code True} if connection with server was lost. - */ - private void markClosed(boolean connBroken) { - // It is ok to have race here. - if (!closed) { - closed = true; - - this.connBroken = connBroken; - - delegate.hadoop().removeEventListener(delegate); - } - } - - /** - * @throws IOException If check failed. - */ - private void checkClosed() throws IOException { - if (closed) { - if (connBroken) - throw new IOException("Server connection was lost."); - else - throw new IOException("Stream is closed."); - } - } - - /** - * @return {@code True} if end of stream reached. - */ - private boolean eof() { - return limit == pos; - } - - /** - * Asynchronous prefetch buffer. - */ - private static class FetchBufferPart { - /** Read future. */ - private GridPlainFuture<byte[]> readFut; - - /** Position of cached chunk in file. */ - private long pos; - - /** Prefetch length. Need to store as read future result might be not available yet. */ - private int len; - - /** - * Creates fetch buffer part. - * - * @param readFut Read future for this buffer. - * @param pos Read position. - * @param len Chunk length. - */ - private FetchBufferPart(GridPlainFuture<byte[]> readFut, long pos, int len) { - this.readFut = readFut; - this.pos = pos; - this.len = len; - } - - /** - * Copies cached data if specified position matches cached region. - * - * @param dst Destination buffer. - * @param pos Read position in file. - * @param dstOff Offset in destination buffer from which start writing. - * @param len Maximum number of bytes to copy. - * @return Number of bytes copied. - * @throws IgniteCheckedException If read future failed. - */ - public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException { - // If read start position is within cached boundaries. - if (contains(pos)) { - byte[] data = readFut.get(); - - int srcPos = (int)(pos - this.pos); - int cpLen = Math.min(len, data.length - srcPos); - - U.arrayCopy(data, srcPos, dst, dstOff, cpLen); - - return cpLen; - } - - return 0; - } - - /** - * @return {@code True} if data is ready to be read. - */ - public boolean ready() { - return readFut.isDone(); - } - - /** - * Checks if current buffer part contains given position. - * - * @param pos Position to check. - * @return {@code True} if position matches buffer region. - */ - public boolean contains(long pos) { - return this.pos <= pos && this.pos + len > pos; - } - } - - private class DoubleFetchBuffer { - /** */ - private FetchBufferPart first; - - /** */ - private FetchBufferPart second; - - /** - * Copies fetched data from both buffers to destination array if cached region matched read position. - * - * @param dst Destination buffer. - * @param pos Read position in file. - * @param dstOff Destination buffer offset. - * @param len Maximum number of bytes to copy. - * @return Number of bytes copied. - * @throws IgniteCheckedException If any read operation failed. - */ - public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException { - assert dstOff >= 0; - assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff + - ", len=" + len + ']'; - - int bytesCopied = 0; - - if (first != null) { - bytesCopied += first.flatten(dst, pos, dstOff, len); - - if (bytesCopied != len && second != null) { - assert second.pos == first.pos + first.len; - - bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied); - } - } - - return bytesCopied; - } - - /** - * Gets byte at specified position in buffer. - * - * @param pos Stream position. - * @return Read byte. - * @throws IgniteCheckedException If read failed. - */ - public int atPosition(long pos) throws IgniteCheckedException { - // Should not reach here if stream contains no data. - assert first != null; - - if (first.contains(pos)) { - byte[] bytes = first.readFut.get(); - - return bytes[((int)(pos - first.pos))] & 0xFF; - } - else { - assert second != null; - assert second.contains(pos); - - byte[] bytes = second.readFut.get(); - - return bytes[((int)(pos - second.pos))] & 0xFF; - } - } - - /** - * Starts asynchronous buffer refresh if needed, depending on current position. - * - * @param pos Current stream position. - */ - public void refreshAhead(long pos) { - if (fullPrefetch(pos)) { - first = fetch(pos, bufHalfSize); - second = fetch(pos + bufHalfSize, bufHalfSize); - } - else if (needFlip(pos)) { - first = second; - - second = fetch(first.pos + first.len, bufHalfSize); - } - } - - /** - * @param pos Position from which read is expected. - * @return Number of bytes available to be read without blocking. - */ - public int available(long pos) { - int available = 0; - - if (first != null) { - if (first.contains(pos)) { - if (first.ready()) { - available += (pos - first.pos); - - if (second != null && second.ready()) - available += second.len; - } - } - else { - if (second != null && second.contains(pos) && second.ready()) - available += (pos - second.pos); - } - } - - return available; - } - - /** - * Checks if position shifted enough to forget previous buffer. - * - * @param pos Current position. - * @return {@code True} if need flip buffers. - */ - private boolean needFlip(long pos) { - // Return true if we read more then half of second buffer. - return second != null && second.contains(pos); - } - - /** - * Determines if all cached bytes should be discarded and new region should be - * prefetched. - * - * @param curPos Current stream position. - * @return {@code True} if need to refresh both blocks. - */ - private boolean fullPrefetch(long curPos) { - // If no data was prefetched yet, return true. - return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len); - } - - /** - * Starts asynchronous fetch for given region. - * - * @param pos Position to read from. - * @param size Number of bytes to read. - * @return Fetch buffer part. - */ - private FetchBufferPart fetch(long pos, int size) { - long remaining = limit - pos; - - size = (int)Math.min(size, remaining); - - return size <= 0 ? null : - new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIo.java deleted file mode 100644 index 46f5a6c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIo.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -/** - * IO abstraction layer for IGFS client. Two kind of messages are expected to be sent: requests with response - * and request without response. - */ -public interface IgfsHadoopIo { - /** - * Sends given IGFS client message and asynchronously awaits for response. - * - * @param msg Message to send. - * @return Future that will be completed. - * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). - */ - public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException; - - /** - * Sends given IGFS client message and asynchronously awaits for response. When IO detects response - * beginning for given message it stops reading data and passes input stream to closure which can read - * response in a specific way. - * - * @param msg Message to send. - * @param outBuf Output buffer. If {@code null}, the output buffer is not used. - * @param outOff Output buffer offset. - * @param outLen Output buffer length. - * @return Future that will be completed when response is returned from closure. - * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). - */ - public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen) - throws IgniteCheckedException; - - /** - * Sends given message and does not wait for response. - * - * @param msg Message to send. - * @throws IgniteCheckedException If send failed. - */ - public void sendPlain(IgfsMessage msg) throws IgniteCheckedException; - - /** - * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. - * If connection is closed already, callback will be invoked synchronously inside this method. - * - * @param lsnr Event listener. - */ - public void addEventListener(IgfsHadoopIpcIoListener lsnr); - - /** - * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. - * - * @param lsnr Event listener. - */ - public void removeEventListener(IgfsHadoopIpcIoListener lsnr); -}
