# IGNITE-386: Moving classes (5).
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4c85f120 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4c85f120 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4c85f120 Branch: refs/heads/ignite-386 Commit: 4c85f1209b49a895d3bdff5817cbad406e9e1739 Parents: 50487c0 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Mar 3 16:37:21 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Mar 3 16:37:21 2015 +0300 ---------------------------------------------------------------------- .../fs/IgniteHadoopSecondaryFileSystem.java | 2 +- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 4 +- .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 4 +- .../ignite/internal/igfs/hadoop/HadoopIgfs.java | 198 ------ .../HadoopIgfsCommunicationException.java | 57 -- .../igfs/hadoop/HadoopIgfsEndpoint.java | 210 ------- .../internal/igfs/hadoop/HadoopIgfsEx.java | 88 --- .../internal/igfs/hadoop/HadoopIgfsFuture.java | 94 --- .../internal/igfs/hadoop/HadoopIgfsInProc.java | 409 ------------ .../igfs/hadoop/HadoopIgfsInputStream.java | 626 ------------------- .../internal/igfs/hadoop/HadoopIgfsIo.java | 76 --- .../internal/igfs/hadoop/HadoopIgfsIpcIo.java | 599 ------------------ .../igfs/hadoop/HadoopIgfsIpcIoListener.java | 36 -- .../igfs/hadoop/HadoopIgfsJclLogger.java | 112 ---- .../internal/igfs/hadoop/HadoopIgfsOutProc.java | 466 -------------- .../igfs/hadoop/HadoopIgfsOutputStream.java | 201 ------ .../igfs/hadoop/HadoopIgfsProperties.java | 88 --- .../igfs/hadoop/HadoopIgfsProxyInputStream.java | 335 ---------- .../hadoop/HadoopIgfsProxyOutputStream.java | 165 ----- .../internal/igfs/hadoop/HadoopIgfsReader.java | 104 --- .../igfs/hadoop/HadoopIgfsStreamDelegate.java | 96 --- .../hadoop/HadoopIgfsStreamEventListener.java | 39 -- .../internal/igfs/hadoop/HadoopIgfsUtils.java | 131 ---- .../internal/igfs/hadoop/HadoopIgfsWrapper.java | 511 --------------- .../processors/hadoop/igfs/HadoopIgfs.java | 198 ++++++ .../igfs/HadoopIgfsCommunicationException.java | 57 ++ .../hadoop/igfs/HadoopIgfsEndpoint.java | 210 +++++++ .../processors/hadoop/igfs/HadoopIgfsEx.java | 88 +++ .../hadoop/igfs/HadoopIgfsFuture.java | 94 +++ .../hadoop/igfs/HadoopIgfsInProc.java | 409 ++++++++++++ .../hadoop/igfs/HadoopIgfsInputStream.java | 626 +++++++++++++++++++ .../processors/hadoop/igfs/HadoopIgfsIo.java | 76 +++ .../processors/hadoop/igfs/HadoopIgfsIpcIo.java | 599 ++++++++++++++++++ .../hadoop/igfs/HadoopIgfsIpcIoListener.java | 36 ++ .../hadoop/igfs/HadoopIgfsJclLogger.java | 112 ++++ .../hadoop/igfs/HadoopIgfsOutProc.java | 466 ++++++++++++++ .../hadoop/igfs/HadoopIgfsOutputStream.java | 201 ++++++ .../hadoop/igfs/HadoopIgfsProperties.java | 88 +++ .../hadoop/igfs/HadoopIgfsProxyInputStream.java | 335 ++++++++++ .../igfs/HadoopIgfsProxyOutputStream.java | 165 +++++ .../hadoop/igfs/HadoopIgfsReader.java | 104 +++ .../hadoop/igfs/HadoopIgfsStreamDelegate.java | 96 +++ .../igfs/HadoopIgfsStreamEventListener.java | 39 ++ .../processors/hadoop/igfs/HadoopIgfsUtils.java | 131 ++++ .../hadoop/igfs/HadoopIgfsWrapper.java | 511 +++++++++++++++ .../planner/HadoopDefaultMapReducePlanner.java | 2 +- .../IgniteHadoopFileSystemAbstractSelfTest.java | 2 +- .../IgniteHadoopFileSystemClientSelfTest.java | 2 +- ...IgniteHadoopFileSystemHandshakeSelfTest.java | 2 +- .../IgniteHadoopFileSystemIpcCacheSelfTest.java | 2 +- 50 files changed, 4651 insertions(+), 4651 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java index 9547e9f..44e2c20 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.permission.*; import org.apache.hadoop.ipc.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.igfs.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.igfs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index f7b5dda..659561d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -28,7 +28,7 @@ import org.apache.ignite.*; import org.apache.ignite.hadoop.fs.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.igfs.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.igfs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -43,7 +43,7 @@ import static org.apache.ignite.IgniteFs.*; import static org.apache.ignite.configuration.IgfsConfiguration.*; import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; -import static org.apache.ignite.internal.igfs.hadoop.HadoopIgfsUtils.*; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*; /** * {@code IGFS} Hadoop 1.x file system driver over file system API. To use http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index 016a068..023faa4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -28,7 +28,7 @@ import org.apache.ignite.*; import org.apache.ignite.hadoop.fs.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.igfs.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.igfs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -43,7 +43,7 @@ import static org.apache.ignite.IgniteFs.*; import static org.apache.ignite.configuration.IgfsConfiguration.*; import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; -import static org.apache.ignite.internal.igfs.hadoop.HadoopIgfsUtils.*; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*; /** * {@code IGFS} Hadoop 2.x file system driver over file system API. To use http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfs.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfs.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfs.java deleted file mode 100644 index 6ee593e..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfs.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Facade for communication with grid. - */ -public interface HadoopIgfs { - /** - * Perform handshake. - * - * @param logDir Log directory. - * @return Future with handshake result. - * @throws IgniteCheckedException If failed. - */ - public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException; - - /** - * Close connection. - * - * @param force Force flag. - */ - public void close(boolean force); - - /** - * Command to retrieve file info for some IGFS path. - * - * @param path Path to get file info for. - * @return Future for info operation. - * @throws IgniteCheckedException If failed. - */ - public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to update file properties. - * - * @param path IGFS path to update properties. - * @param props Properties to update. - * @return Future for update operation. - * @throws IgniteCheckedException If failed. - */ - public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; - - /** - * Sets last access time and last modification time for a file. - * - * @param path Path to update times. - * @param accessTime Last access time to set. - * @param modificationTime Last modification time to set. - * @throws IgniteCheckedException If failed. - */ - public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException, - IOException; - - /** - * Command to rename given path. - * - * @param src Source path. - * @param dest Destination path. - * @return Future for rename operation. - * @throws IgniteCheckedException If failed. - */ - public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException; - - /** - * Command to delete given path. - * - * @param path Path to delete. - * @param recursive {@code True} if deletion is recursive. - * @return Future for delete operation. - * @throws IgniteCheckedException If failed. - */ - public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException; - - /** - * Command to get affinity for given path, offset and length. - * - * @param path Path to get affinity for. - * @param start Start position (offset). - * @param len Data length. - * @return Future for affinity command. - * @throws IgniteCheckedException If failed. - */ - public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException, - IOException; - - /** - * Gets path summary. - * - * @param path Path to get summary for. - * @return Future that will be completed when summary is received. - * @throws IgniteCheckedException If failed. - */ - public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to create directories. - * - * @param path Path to create. - * @return Future for mkdirs operation. - * @throws IgniteCheckedException If failed. - */ - public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; - - /** - * Command to get list of files in directory. - * - * @param path Path to list. - * @return Future for listFiles operation. - * @throws IgniteCheckedException If failed. - */ - public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to get directory listing. - * - * @param path Path to list. - * @return Future for listPaths operation. - * @throws IgniteCheckedException If failed. - */ - public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Performs status request. - * - * @return Status response. - * @throws IgniteCheckedException If failed. - */ - public IgfsStatus fsStatus() throws IgniteCheckedException, IOException; - - /** - * Command to open file for reading. - * - * @param path File path to open. - * @return Future for open operation. - * @throws IgniteCheckedException If failed. - */ - public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to open file for reading. - * - * @param path File path to open. - * @return Future for open operation. - * @throws IgniteCheckedException If failed. - */ - public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException, - IOException; - - /** - * Command to create file and open it for output. - * - * @param path Path to file. - * @param overwrite If {@code true} then old file contents will be lost. - * @param colocate If {@code true} and called on data node, file will be written on that node. - * @param replication Replication factor. - * @param props File properties for creation. - * @return Stream descriptor. - * @throws IgniteCheckedException If failed. - */ - public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; - - /** - * Open file for output appending data to the end of a file. - * - * @param path Path to file. - * @param create If {@code true}, file will be created if does not exist. - * @param props File properties. - * @return Stream descriptor. - * @throws IgniteCheckedException If failed. - */ - public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsCommunicationException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsCommunicationException.java deleted file mode 100644 index ecaa61f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsCommunicationException.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; - -/** - * Communication exception indicating a problem between file system and IGFS instance. - */ -public class HadoopIgfsCommunicationException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Creates new exception with given throwable as a nested cause and - * source of error message. - * - * @param cause Non-null throwable cause. - */ - public HadoopIgfsCommunicationException(Exception cause) { - super(cause); - } - - /** - * Creates a new exception with given error message and optional nested cause exception. - * - * @param msg Error message. - */ - public HadoopIgfsCommunicationException(String msg) { - super(msg); - } - - /** - * Creates a new exception with given error message and optional nested cause exception. - * - * @param msg Error message. - * @param cause Cause. - */ - public HadoopIgfsCommunicationException(String msg, Exception cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEndpoint.java deleted file mode 100644 index dc8fcb8..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEndpoint.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; - -import static org.apache.ignite.configuration.IgfsConfiguration.*; - -/** - * IGFS endpoint abstraction. - */ -public class HadoopIgfsEndpoint { - /** Localhost. */ - public static final String LOCALHOST = "127.0.0.1"; - - /** IGFS name. */ - private final String igfsName; - - /** Grid name. */ - private final String gridName; - - /** Host. */ - private final String host; - - /** Port. */ - private final int port; - - /** - * Normalize IGFS URI. - * - * @param uri URI. - * @return Normalized URI. - * @throws IOException If failed. - */ - public static URI normalize(URI uri) throws IOException { - try { - if (!F.eq(IgniteFs.IGFS_SCHEME, uri.getScheme())) - throw new IOException("Failed to normalize UIR because it has non IGFS scheme: " + uri); - - HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(uri.getAuthority()); - - StringBuilder sb = new StringBuilder(); - - if (endpoint.igfs() != null) - sb.append(endpoint.igfs()); - - if (endpoint.grid() != null) - sb.append(":").append(endpoint.grid()); - - return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(), - uri.getPath(), uri.getQuery(), uri.getFragment()); - } - catch (URISyntaxException | IgniteCheckedException e) { - throw new IOException("Failed to normalize URI: " + uri, e); - } - } - - /** - * Constructor. - * - * @param connStr Connection string. - * @throws IgniteCheckedException If failed to parse connection string. - */ - public HadoopIgfsEndpoint(@Nullable String connStr) throws IgniteCheckedException { - if (connStr == null) - connStr = ""; - - String[] tokens = connStr.split("@", -1); - - IgniteBiTuple<String, Integer> hostPort; - - if (tokens.length == 1) { - igfsName = null; - gridName = null; - - hostPort = hostPort(connStr, connStr); - } - else if (tokens.length == 2) { - String authStr = tokens[0]; - - if (authStr.isEmpty()) { - gridName = null; - igfsName = null; - } - else { - String[] authTokens = authStr.split(":", -1); - - igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0]; - - if (authTokens.length == 1) - gridName = null; - else if (authTokens.length == 2) - gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1]; - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - } - - hostPort = hostPort(connStr, tokens[1]); - } - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - - host = hostPort.get1(); - - assert hostPort.get2() != null; - - port = hostPort.get2(); - } - - /** - * Parse host and port. - * - * @param connStr Full connection string. - * @param hostPortStr Host/port connection string part. - * @return Tuple with host and port. - * @throws IgniteCheckedException If failed to parse connection string. - */ - private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException { - String[] tokens = hostPortStr.split(":", -1); - - String host = tokens[0]; - - if (F.isEmpty(host)) - host = LOCALHOST; - - int port; - - if (tokens.length == 1) - port = DFLT_IPC_PORT; - else if (tokens.length == 2) { - String portStr = tokens[1]; - - try { - port = Integer.valueOf(portStr); - - if (port < 0 || port > 65535) - throw new IgniteCheckedException("Invalid port number: " + connStr); - } - catch (NumberFormatException e) { - throw new IgniteCheckedException("Invalid port number: " + connStr); - } - } - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - - return F.t(host, port); - } - - /** - * @return IGFS name. - */ - @Nullable public String igfs() { - return igfsName; - } - - /** - * @return Grid name. - */ - @Nullable public String grid() { - return gridName; - } - - /** - * @return Host. - */ - public String host() { - return host; - } - - /** - * @return Host. - */ - public boolean isLocal() { - return F.eq(LOCALHOST, host); - } - - /** - * @return Port. - */ - public int port() { - return port; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopIgfsEndpoint.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEx.java deleted file mode 100644 index 5321fa3..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEx.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Extended IGFS server interface. - */ -public interface HadoopIgfsEx extends HadoopIgfs { - /** - * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. - * If connection is closed already, callback will be invoked synchronously inside this method. - * - * @param delegate Stream delegate. - * @param lsnr Event listener. - */ - public void addEventListener(HadoopIgfsStreamDelegate delegate, HadoopIgfsStreamEventListener lsnr); - - /** - * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. - * - * @param delegate Stream delegate. - */ - public void removeEventListener(HadoopIgfsStreamDelegate delegate); - - /** - * Asynchronously reads specified amount of bytes from opened input stream. - * - * @param delegate Stream delegate. - * @param pos Position to read from. - * @param len Data length to read. - * @param outBuf Optional output buffer. If buffer length is less then {@code len}, all remaining - * bytes will be read into new allocated buffer of length {len - outBuf.length} and this buffer will - * be the result of read future. - * @param outOff Output offset. - * @param outLen Output length. - * @return Read data. - */ - public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, - @Nullable final byte[] outBuf, final int outOff, final int outLen); - - /** - * Writes data to the stream with given streamId. This method does not return any future since - * no response to write request is sent. - * - * @param delegate Stream delegate. - * @param data Data to write. - * @param off Offset. - * @param len Length. - * @throws IOException If failed. - */ - public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) throws IOException; - - /** - * Close server stream. - * - * @param delegate Stream delegate. - * @throws IOException If failed. - */ - public void closeStream(HadoopIgfsStreamDelegate delegate) throws IOException; - - /** - * Flush output stream. - * - * @param delegate Stream delegate. - * @throws IOException If failed. - */ - public void flush(HadoopIgfsStreamDelegate delegate) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsFuture.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsFuture.java deleted file mode 100644 index 9ae0161..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsFuture.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -/** - * IGFS client future that holds response parse closure. - */ -public class HadoopIgfsFuture<T> extends GridPlainFutureAdapter<T> { - /** Output buffer. */ - private byte[] outBuf; - - /** Output offset. */ - private int outOff; - - /** Output length. */ - private int outLen; - - /** Read future flag. */ - private boolean read; - - /** - * @return Output buffer. - */ - public byte[] outputBuffer() { - return outBuf; - } - - /** - * @param outBuf Output buffer. - */ - public void outputBuffer(@Nullable byte[] outBuf) { - this.outBuf = outBuf; - } - - /** - * @return Offset in output buffer to write from. - */ - public int outputOffset() { - return outOff; - } - - /** - * @param outOff Offset in output buffer to write from. - */ - public void outputOffset(int outOff) { - this.outOff = outOff; - } - - /** - * @return Length to write to output buffer. - */ - public int outputLength() { - return outLen; - } - - /** - * @param outLen Length to write to output buffer. - */ - public void outputLength(int outLen) { - this.outLen = outLen; - } - - /** - * @param read {@code True} if this is a read future. - */ - public void read(boolean read) { - this.read = read; - } - - /** - * @return {@code True} if this is a read future. - */ - public boolean read() { - return read; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInProc.java deleted file mode 100644 index b0da50d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInProc.java +++ /dev/null @@ -1,409 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Communication with grid in the same process. - */ -public class HadoopIgfsInProc implements HadoopIgfsEx { - /** Target IGFS. */ - private final IgfsEx igfs; - - /** Buffer size. */ - private final int bufSize; - - /** Event listeners. */ - private final Map<HadoopIgfsStreamDelegate, HadoopIgfsStreamEventListener> lsnrs = - new ConcurrentHashMap<>(); - - /** Logger. */ - private final Log log; - - /** - * Constructor. - * - * @param igfs Target IGFS. - * @param log Log. - */ - public HadoopIgfsInProc(IgfsEx igfs, Log log) { - this.igfs = igfs; - this.log = log; - - bufSize = igfs.configuration().getBlockSize() * 2; - } - - /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) { - igfs.clientLogDirectory(logDir); - - return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), - igfs.globalSampling()); - } - - /** {@inheritDoc} */ - @Override public void close(boolean force) { - // Perform cleanup. - for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) { - try { - lsnr.onClose(); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to notify stream event listener", e); - } - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { - try { - return igfs.info(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to get file info because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { - try { - return igfs.update(path, props); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to update file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { - try { - igfs.setTimes(path, accessTime, modificationTime); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to set path times because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { - try { - igfs.rename(src, dest); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to rename path because Grid is stopping: " + src); - } - } - - /** {@inheritDoc} */ - @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { - try { - return igfs.delete(path, recursive); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to delete path because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsStatus fsStatus() throws IgniteCheckedException { - try { - return igfs.globalSpace(); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " + - "stopping."); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { - try { - return igfs.listPaths(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to list paths because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { - try { - return igfs.listFiles(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to list files because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { - try { - igfs.mkdirs(path, props); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to create directory because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { - try { - return igfs.summary(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to get content summary because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) - throws IgniteCheckedException { - try { - return igfs.affinity(path, start, len); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to get affinity because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException { - try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize); - - return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) - throws IgniteCheckedException { - try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); - - return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { - try { - IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, - colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); - - return new HadoopIgfsStreamDelegate(this, stream); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to create file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map<String, String> props) throws IgniteCheckedException { - try { - IgfsOutputStream stream = igfs.append(path, bufSize, create, props); - - return new HadoopIgfsStreamDelegate(this, stream); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to append file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, - @Nullable byte[] outBuf, int outOff, int outLen) { - IgfsInputStreamAdapter stream = delegate.target(); - - try { - byte[] res = null; - - if (outBuf != null) { - int outTailLen = outBuf.length - outOff; - - if (len <= outTailLen) - stream.readFully(pos, outBuf, outOff, len); - else { - stream.readFully(pos, outBuf, outOff, outTailLen); - - int remainderLen = len - outTailLen; - - res = new byte[remainderLen]; - - stream.readFully(pos, res, 0, remainderLen); - } - } else { - res = new byte[len]; - - stream.readFully(pos, res, 0, len); - } - - return new GridPlainFutureAdapter<>(res); - } - catch (IllegalStateException | IOException e) { - HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - return new GridPlainFutureAdapter<>(e); - } - } - - /** {@inheritDoc} */ - @Override public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) - throws IOException { - try { - IgfsOutputStream stream = delegate.target(); - - stream.write(data, off, len); - } - catch (IllegalStateException | IOException e) { - HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - if (e instanceof IllegalStateException) - throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e); - else - throw e; - } - } - - /** {@inheritDoc} */ - @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException { - try { - IgfsOutputStream stream = delegate.target(); - - stream.flush(); - } - catch (IllegalStateException | IOException e) { - HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - if (e instanceof IllegalStateException) - throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e); - else - throw e; - } - } - - /** {@inheritDoc} */ - @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException { - Closeable closeable = desc.target(); - - try { - closeable.close(); - } - catch (IllegalStateException e) { - throw new IOException("Failed to close IGFS stream because Grid is stopping.", e); - } - } - - /** {@inheritDoc} */ - @Override public void addEventListener(HadoopIgfsStreamDelegate delegate, - HadoopIgfsStreamEventListener lsnr) { - HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr); - - assert lsnr0 == null || lsnr0 == lsnr; - - if (log.isDebugEnabled()) - log.debug("Added stream event listener [delegate=" + delegate + ']'); - } - - /** {@inheritDoc} */ - @Override public void removeEventListener(HadoopIgfsStreamDelegate delegate) { - HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(delegate); - - if (lsnr0 != null && log.isDebugEnabled()) - log.debug("Removed stream event listener [delegate=" + delegate + ']'); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInputStream.java deleted file mode 100644 index 6293e2f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInputStream.java +++ /dev/null @@ -1,626 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.hadoop.fs.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * IGFS input stream wrapper for hadoop interfaces. - */ -@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") -public final class HadoopIgfsInputStream extends InputStream implements Seekable, PositionedReadable, - HadoopIgfsStreamEventListener { - /** Minimum buffer size. */ - private static final int MIN_BUF_SIZE = 4 * 1024; - - /** Server stream delegate. */ - private HadoopIgfsStreamDelegate delegate; - - /** Stream ID used by logger. */ - private long logStreamId; - - /** Stream position. */ - private long pos; - - /** Stream read limit. */ - private long limit; - - /** Mark position. */ - private long markPos = -1; - - /** Prefetch buffer. */ - private DoubleFetchBuffer buf = new DoubleFetchBuffer(); - - /** Buffer half size for double-buffering. */ - private int bufHalfSize; - - /** Closed flag. */ - private volatile boolean closed; - - /** Flag set if stream was closed due to connection breakage. */ - private boolean connBroken; - - /** Logger. */ - private Log log; - - /** Client logger. */ - private IgfsLogger clientLog; - - /** Read time. */ - private long readTime; - - /** User time. */ - private long userTime; - - /** Last timestamp. */ - private long lastTs; - - /** Amount of read bytes. */ - private long total; - - /** - * Creates input stream. - * - * @param delegate Server stream delegate. - * @param limit Read limit. - * @param bufSize Buffer size. - * @param log Log. - * @param clientLog Client logger. - */ - public HadoopIgfsInputStream(HadoopIgfsStreamDelegate delegate, long limit, int bufSize, Log log, - IgfsLogger clientLog, long logStreamId) { - assert limit >= 0; - - this.delegate = delegate; - this.limit = limit; - this.log = log; - this.clientLog = clientLog; - this.logStreamId = logStreamId; - - bufHalfSize = Math.max(bufSize, MIN_BUF_SIZE); - - lastTs = System.nanoTime(); - - delegate.hadoop().addEventListener(delegate, this); - } - - /** - * Read start. - */ - private void readStart() { - long now = System.nanoTime(); - - userTime += now - lastTs; - - lastTs = now; - } - - /** - * Read end. - */ - private void readEnd() { - long now = System.nanoTime(); - - readTime += now - lastTs; - - lastTs = now; - } - - /** {@inheritDoc} */ - @Override public synchronized int read() throws IOException { - checkClosed(); - - readStart(); - - try { - if (eof()) - return -1; - - buf.refreshAhead(pos); - - int res = buf.atPosition(pos); - - pos++; - total++; - - buf.refreshAhead(pos); - - return res; - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException { - checkClosed(); - - if (eof()) - return -1; - - readStart(); - - try { - long remaining = limit - pos; - - int read = buf.flatten(b, pos, off, len); - - pos += read; - total += read; - remaining -= read; - - if (remaining > 0 && read != len) { - int readAmt = (int)Math.min(remaining, len - read); - - delegate.hadoop().readData(delegate, pos, readAmt, b, off + read, len - read).get(); - - read += readAmt; - pos += readAmt; - total += readAmt; - } - - buf.refreshAhead(pos); - - return read; - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized long skip(long n) throws IOException { - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logSkip(logStreamId, n); - - long oldPos = pos; - - if (pos + n <= limit) - pos += n; - else - pos = limit; - - buf.refreshAhead(pos); - - return pos - oldPos; - } - - /** {@inheritDoc} */ - @Override public synchronized int available() throws IOException { - checkClosed(); - - int available = buf.available(pos); - - assert available >= 0; - - return available; - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - if (!closed) { - readStart(); - - if (log.isDebugEnabled()) - log.debug("Closing input stream: " + delegate); - - delegate.hadoop().closeStream(delegate); - - readEnd(); - - if (clientLog.isLogEnabled()) - clientLog.logCloseIn(logStreamId, userTime, readTime, total); - - markClosed(false); - - if (log.isDebugEnabled()) - log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime + - ", userTime=" + userTime + ']'); - } - } - - /** {@inheritDoc} */ - @Override public synchronized void mark(int readLimit) { - markPos = pos; - - if (clientLog.isLogEnabled()) - clientLog.logMark(logStreamId, readLimit); - } - - /** {@inheritDoc} */ - @Override public synchronized void reset() throws IOException { - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logReset(logStreamId); - - if (markPos == -1) - throw new IOException("Stream was not marked."); - - pos = markPos; - - buf.refreshAhead(pos); - } - - /** {@inheritDoc} */ - @Override public boolean markSupported() { - return true; - } - - /** {@inheritDoc} */ - @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException { - long remaining = limit - position; - - int read = (int)Math.min(len, remaining); - - // Return -1 at EOF. - if (read == 0) - return -1; - - readFully(position, buf, off, read); - - return read; - } - - /** {@inheritDoc} */ - @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException { - long remaining = limit - position; - - checkClosed(); - - if (len > remaining) - throw new EOFException("End of stream reached before data was fully read."); - - readStart(); - - try { - int read = this.buf.flatten(buf, position, off, len); - - total += read; - - if (read != len) { - int readAmt = len - read; - - delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get(); - - total += readAmt; - } - - if (clientLog.isLogEnabled()) - clientLog.logRandomRead(logStreamId, position, len); - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public void readFully(long position, byte[] buf) throws IOException { - readFully(position, buf, 0, buf.length); - } - - /** {@inheritDoc} */ - @Override public synchronized void seek(long pos) throws IOException { - A.ensure(pos >= 0, "position must be non-negative"); - - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logSeek(logStreamId, pos); - - if (pos > limit) - pos = limit; - - if (log.isDebugEnabled()) - log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']'); - - this.pos = pos; - - buf.refreshAhead(pos); - } - - /** {@inheritDoc} */ - @Override public synchronized long getPos() { - return pos; - } - - /** {@inheritDoc} */ - @Override public synchronized boolean seekToNewSource(long targetPos) { - return false; - } - - /** {@inheritDoc} */ - @Override public void onClose() { - markClosed(true); - } - - /** {@inheritDoc} */ - @Override public void onError(String errMsg) { - // No-op. - } - - /** - * Marks stream as closed. - * - * @param connBroken {@code True} if connection with server was lost. - */ - private void markClosed(boolean connBroken) { - // It is ok to have race here. - if (!closed) { - closed = true; - - this.connBroken = connBroken; - - delegate.hadoop().removeEventListener(delegate); - } - } - - /** - * @throws IOException If check failed. - */ - private void checkClosed() throws IOException { - if (closed) { - if (connBroken) - throw new IOException("Server connection was lost."); - else - throw new IOException("Stream is closed."); - } - } - - /** - * @return {@code True} if end of stream reached. - */ - private boolean eof() { - return limit == pos; - } - - /** - * Asynchronous prefetch buffer. - */ - private static class FetchBufferPart { - /** Read future. */ - private GridPlainFuture<byte[]> readFut; - - /** Position of cached chunk in file. */ - private long pos; - - /** Prefetch length. Need to store as read future result might be not available yet. */ - private int len; - - /** - * Creates fetch buffer part. - * - * @param readFut Read future for this buffer. - * @param pos Read position. - * @param len Chunk length. - */ - private FetchBufferPart(GridPlainFuture<byte[]> readFut, long pos, int len) { - this.readFut = readFut; - this.pos = pos; - this.len = len; - } - - /** - * Copies cached data if specified position matches cached region. - * - * @param dst Destination buffer. - * @param pos Read position in file. - * @param dstOff Offset in destination buffer from which start writing. - * @param len Maximum number of bytes to copy. - * @return Number of bytes copied. - * @throws IgniteCheckedException If read future failed. - */ - public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException { - // If read start position is within cached boundaries. - if (contains(pos)) { - byte[] data = readFut.get(); - - int srcPos = (int)(pos - this.pos); - int cpLen = Math.min(len, data.length - srcPos); - - U.arrayCopy(data, srcPos, dst, dstOff, cpLen); - - return cpLen; - } - - return 0; - } - - /** - * @return {@code True} if data is ready to be read. - */ - public boolean ready() { - return readFut.isDone(); - } - - /** - * Checks if current buffer part contains given position. - * - * @param pos Position to check. - * @return {@code True} if position matches buffer region. - */ - public boolean contains(long pos) { - return this.pos <= pos && this.pos + len > pos; - } - } - - private class DoubleFetchBuffer { - /** */ - private FetchBufferPart first; - - /** */ - private FetchBufferPart second; - - /** - * Copies fetched data from both buffers to destination array if cached region matched read position. - * - * @param dst Destination buffer. - * @param pos Read position in file. - * @param dstOff Destination buffer offset. - * @param len Maximum number of bytes to copy. - * @return Number of bytes copied. - * @throws IgniteCheckedException If any read operation failed. - */ - public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException { - assert dstOff >= 0; - assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff + - ", len=" + len + ']'; - - int bytesCopied = 0; - - if (first != null) { - bytesCopied += first.flatten(dst, pos, dstOff, len); - - if (bytesCopied != len && second != null) { - assert second.pos == first.pos + first.len; - - bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied); - } - } - - return bytesCopied; - } - - /** - * Gets byte at specified position in buffer. - * - * @param pos Stream position. - * @return Read byte. - * @throws IgniteCheckedException If read failed. - */ - public int atPosition(long pos) throws IgniteCheckedException { - // Should not reach here if stream contains no data. - assert first != null; - - if (first.contains(pos)) { - byte[] bytes = first.readFut.get(); - - return bytes[((int)(pos - first.pos))] & 0xFF; - } - else { - assert second != null; - assert second.contains(pos); - - byte[] bytes = second.readFut.get(); - - return bytes[((int)(pos - second.pos))] & 0xFF; - } - } - - /** - * Starts asynchronous buffer refresh if needed, depending on current position. - * - * @param pos Current stream position. - */ - public void refreshAhead(long pos) { - if (fullPrefetch(pos)) { - first = fetch(pos, bufHalfSize); - second = fetch(pos + bufHalfSize, bufHalfSize); - } - else if (needFlip(pos)) { - first = second; - - second = fetch(first.pos + first.len, bufHalfSize); - } - } - - /** - * @param pos Position from which read is expected. - * @return Number of bytes available to be read without blocking. - */ - public int available(long pos) { - int available = 0; - - if (first != null) { - if (first.contains(pos)) { - if (first.ready()) { - available += (pos - first.pos); - - if (second != null && second.ready()) - available += second.len; - } - } - else { - if (second != null && second.contains(pos) && second.ready()) - available += (pos - second.pos); - } - } - - return available; - } - - /** - * Checks if position shifted enough to forget previous buffer. - * - * @param pos Current position. - * @return {@code True} if need flip buffers. - */ - private boolean needFlip(long pos) { - // Return true if we read more then half of second buffer. - return second != null && second.contains(pos); - } - - /** - * Determines if all cached bytes should be discarded and new region should be - * prefetched. - * - * @param curPos Current stream position. - * @return {@code True} if need to refresh both blocks. - */ - private boolean fullPrefetch(long curPos) { - // If no data was prefetched yet, return true. - return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len); - } - - /** - * Starts asynchronous fetch for given region. - * - * @param pos Position to read from. - * @param size Number of bytes to read. - * @return Fetch buffer part. - */ - private FetchBufferPart fetch(long pos, int size) { - long remaining = limit - pos; - - size = (int)Math.min(size, remaining); - - return size <= 0 ? null : - new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIo.java deleted file mode 100644 index 775e7d0..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIo.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -/** - * IO abstraction layer for IGFS client. Two kind of messages are expected to be sent: requests with response - * and request without response. - */ -public interface HadoopIgfsIo { - /** - * Sends given IGFS client message and asynchronously awaits for response. - * - * @param msg Message to send. - * @return Future that will be completed. - * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). - */ - public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException; - - /** - * Sends given IGFS client message and asynchronously awaits for response. When IO detects response - * beginning for given message it stops reading data and passes input stream to closure which can read - * response in a specific way. - * - * @param msg Message to send. - * @param outBuf Output buffer. If {@code null}, the output buffer is not used. - * @param outOff Output buffer offset. - * @param outLen Output buffer length. - * @return Future that will be completed when response is returned from closure. - * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). - */ - public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen) - throws IgniteCheckedException; - - /** - * Sends given message and does not wait for response. - * - * @param msg Message to send. - * @throws IgniteCheckedException If send failed. - */ - public void sendPlain(IgfsMessage msg) throws IgniteCheckedException; - - /** - * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. - * If connection is closed already, callback will be invoked synchronously inside this method. - * - * @param lsnr Event listener. - */ - public void addEventListener(HadoopIgfsIpcIoListener lsnr); - - /** - * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. - * - * @param lsnr Event listener. - */ - public void removeEventListener(HadoopIgfsIpcIoListener lsnr); -}