# IGNITE-386: WIP on internal namings (4).
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/17c8d0d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/17c8d0d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/17c8d0d9 Branch: refs/heads/ignite-386 Commit: 17c8d0d90fe092a7454084126b7054f9078e8933 Parents: 1c4b00d Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Mar 3 16:05:27 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Mar 3 16:05:27 2015 +0300 ---------------------------------------------------------------------- .../fs/IgniteHadoopSecondaryFileSystem.java | 10 +- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 24 +- .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 20 +- .../ignite/internal/igfs/hadoop/HadoopIgfs.java | 198 ++ .../HadoopIgfsCommunicationException.java | 57 + .../igfs/hadoop/HadoopIgfsEndpoint.java | 210 ++ .../internal/igfs/hadoop/HadoopIgfsEx.java | 88 + .../internal/igfs/hadoop/HadoopIgfsFuture.java | 94 + .../internal/igfs/hadoop/HadoopIgfsInProc.java | 409 ++++ .../internal/igfs/hadoop/HadoopIgfsIo.java | 76 + .../igfs/hadoop/HadoopIgfsIpcIoListener.java | 36 + .../igfs/hadoop/HadoopIgfsJclLogger.java | 112 + .../igfs/hadoop/HadoopIgfsProperties.java | 88 + .../igfs/hadoop/HadoopIgfsProxyInputStream.java | 335 +++ .../hadoop/HadoopIgfsProxyOutputStream.java | 165 ++ .../internal/igfs/hadoop/HadoopIgfsReader.java | 104 + .../igfs/hadoop/HadoopIgfsStreamDelegate.java | 96 + .../hadoop/HadoopIgfsStreamEventListener.java | 39 + .../internal/igfs/hadoop/HadoopIgfsUtils.java | 131 ++ .../internal/igfs/hadoop/HadoopIgfsWrapper.java | 511 +++++ .../igfs/hadoop/HadoopInputIgfsStream.java | 626 ++++++ .../internal/igfs/hadoop/HadoopIpcIgfsIo.java | 599 ++++++ .../internal/igfs/hadoop/HadoopOutProcIgfs.java | 466 +++++ .../igfs/hadoop/HadoopOutputIgfsStream.java | 201 ++ .../ignite/internal/igfs/hadoop/IgfsHadoop.java | 198 -- .../IgfsHadoopCommunicationException.java | 57 - .../igfs/hadoop/IgfsHadoopEndpoint.java | 210 -- .../internal/igfs/hadoop/IgfsHadoopEx.java | 88 - .../igfs/hadoop/IgfsHadoopFSProperties.java | 88 - .../internal/igfs/hadoop/IgfsHadoopFuture.java | 94 - .../internal/igfs/hadoop/IgfsHadoopInProc.java | 409 ---- .../igfs/hadoop/IgfsHadoopInputStream.java | 626 ------ .../internal/igfs/hadoop/IgfsHadoopIo.java | 76 - .../internal/igfs/hadoop/IgfsHadoopIpcIo.java | 599 ------ .../igfs/hadoop/IgfsHadoopIpcIoListener.java | 36 - .../igfs/hadoop/IgfsHadoopJclLogger.java | 112 - .../internal/igfs/hadoop/IgfsHadoopOutProc.java | 466 ----- .../igfs/hadoop/IgfsHadoopOutputStream.java | 201 -- .../igfs/hadoop/IgfsHadoopProxyInputStream.java | 335 --- .../hadoop/IgfsHadoopProxyOutputStream.java | 165 -- .../internal/igfs/hadoop/IgfsHadoopReader.java | 104 - .../igfs/hadoop/IgfsHadoopStreamDelegate.java | 96 - .../hadoop/IgfsHadoopStreamEventListener.java | 39 - .../internal/igfs/hadoop/IgfsHadoopUtils.java | 131 -- .../internal/igfs/hadoop/IgfsHadoopWrapper.java | 511 ----- .../planner/HadoopDefaultMapReducePlanner.java | 2 +- .../HadoopIgfs20FileSystemAbstractSelfTest.java | 1967 ++++++++++++++++++ ...Igfs20FileSystemLoopbackPrimarySelfTest.java | 74 + ...oopIgfs20FileSystemShmemPrimarySelfTest.java | 74 + .../igfs/HadoopIgfsDualAbstractSelfTest.java | 304 +++ .../igfs/HadoopIgfsDualAsyncSelfTest.java | 32 + .../ignite/igfs/HadoopIgfsDualSyncSelfTest.java | 32 + .../IgfsHadoop20FileSystemAbstractSelfTest.java | 1967 ------------------ ...doop20FileSystemLoopbackPrimarySelfTest.java | 74 - ...sHadoop20FileSystemShmemPrimarySelfTest.java | 74 - .../igfs/IgfsHadoopDualAbstractSelfTest.java | 304 --- .../igfs/IgfsHadoopDualAsyncSelfTest.java | 32 - .../ignite/igfs/IgfsHadoopDualSyncSelfTest.java | 32 - .../IgniteHadoopFileSystemAbstractSelfTest.java | 18 +- .../IgniteHadoopFileSystemClientSelfTest.java | 6 +- ...IgniteHadoopFileSystemHandshakeSelfTest.java | 2 +- .../IgniteHadoopFileSystemIpcCacheSelfTest.java | 12 +- .../testsuites/IgniteHadoopTestSuite.java | 6 +- .../IgniteIgfsLinuxAndMacOSTestSuite.java | 2 +- 64 files changed, 7175 insertions(+), 7175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java index 5f06a65..9547e9f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java @@ -157,7 +157,7 @@ public class IgniteHadoopSecondaryFileSystem implements Igfs, AutoCloseable { /** {@inheritDoc} */ @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { - IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props); + HadoopIgfsProperties props0 = new HadoopIgfsProperties(props); try { if (props0.userName() != null || props0.groupName() != null) @@ -211,7 +211,7 @@ public class IgniteHadoopSecondaryFileSystem implements Igfs, AutoCloseable { /** {@inheritDoc} */ @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { try { - if (!fileSys.mkdirs(convert(path), new IgfsHadoopFSProperties(props).permission())) + if (!fileSys.mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); } catch (IOException e) { @@ -272,7 +272,7 @@ public class IgniteHadoopSecondaryFileSystem implements Igfs, AutoCloseable { /** {@inheritDoc} */ @Override public IgfsReader open(IgfsPath path, int bufSize) { - return new IgfsHadoopReader(fileSys, convert(path), bufSize); + return new HadoopIgfsReader(fileSys, convert(path), bufSize); } /** {@inheritDoc} */ @@ -288,8 +288,8 @@ public class IgniteHadoopSecondaryFileSystem implements Igfs, AutoCloseable { /** {@inheritDoc} */ @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, long blockSize, @Nullable Map<String, String> props) { - IgfsHadoopFSProperties props0 = - new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap()); + HadoopIgfsProperties props0 = + new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap()); try { return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index f25b29f..9c95437 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -43,7 +43,7 @@ import static org.apache.ignite.IgniteFs.*; import static org.apache.ignite.configuration.IgfsConfiguration.*; import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; -import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*; +import static org.apache.ignite.internal.igfs.hadoop.HadoopIgfsUtils.*; /** * {@code IGFS} Hadoop 1.x file system driver over file system API. To use @@ -95,7 +95,7 @@ public class IgniteHadoopFileSystem extends FileSystem { private final AtomicBoolean closeGuard = new AtomicBoolean(); /** Grid remote client. */ - private IgfsHadoopWrapper rmtClient; + private HadoopIgfsWrapper rmtClient; /** User name for each thread. */ private final ThreadLocal<String> userName = new ThreadLocal<String>(){ @@ -243,7 +243,7 @@ public class IgniteHadoopFileSystem extends FileSystem { String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - rmtClient = new IgfsHadoopWrapper(uriAuthority, logDir, cfg, LOG); + rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG); // Handshake. IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); @@ -507,13 +507,13 @@ public class IgniteHadoopFileSystem extends FileSystem { clientLog.logOpen(logId, path, PROXY, bufSize, size); - return new FSDataInputStream(new IgfsHadoopProxyInputStream(is, clientLog, logId)); + return new FSDataInputStream(new HadoopIgfsProxyInputStream(is, clientLog, logId)); } else return is; } else { - IgfsHadoopStreamDelegate stream = seqReadsBeforePrefetchOverride ? + HadoopIgfsStreamDelegate stream = seqReadsBeforePrefetchOverride ? rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path); long logId = -1; @@ -528,7 +528,7 @@ public class IgniteHadoopFileSystem extends FileSystem { LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + ", bufSize=" + bufSize + ']'); - IgfsHadoopInputStream igfsIn = new IgfsHadoopInputStream(stream, stream.length(), + HadoopInputIgfsStream igfsIn = new HadoopInputIgfsStream(stream, stream.length(), bufSize, LOG, clientLog, logId); if (LOG.isDebugEnabled()) @@ -575,14 +575,14 @@ public class IgniteHadoopFileSystem extends FileSystem { clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize); - return new FSDataOutputStream(new IgfsHadoopProxyOutputStream(os, clientLog, logId)); + return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId)); } else return os; } else { // Create stream and close it in the 'finally' section if any sequential operation failed. - IgfsHadoopStreamDelegate stream = rmtClient.create(path, overwrite, colocateFileWrites, + HadoopIgfsStreamDelegate stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize, F.asMap(PROP_PERMISSION, toString(perm), PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites))); @@ -599,7 +599,7 @@ public class IgniteHadoopFileSystem extends FileSystem { if (LOG.isDebugEnabled()) LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']'); - IgfsHadoopOutputStream igfsOut = new IgfsHadoopOutputStream(stream, LOG, clientLog, + HadoopOutputIgfsStream igfsOut = new HadoopOutputIgfsStream(stream, LOG, clientLog, logId); bufSize = Math.max(64 * 1024, bufSize); @@ -652,13 +652,13 @@ public class IgniteHadoopFileSystem extends FileSystem { clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID. - return new FSDataOutputStream(new IgfsHadoopProxyOutputStream(os, clientLog, logId)); + return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId)); } else return os; } else { - IgfsHadoopStreamDelegate stream = rmtClient.append(path, false, null); + HadoopIgfsStreamDelegate stream = rmtClient.append(path, false, null); assert stream != null; @@ -673,7 +673,7 @@ public class IgniteHadoopFileSystem extends FileSystem { if (LOG.isDebugEnabled()) LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']'); - IgfsHadoopOutputStream igfsOut = new IgfsHadoopOutputStream(stream, LOG, clientLog, + HadoopOutputIgfsStream igfsOut = new HadoopOutputIgfsStream(stream, LOG, clientLog, logId); bufSize = Math.max(64 * 1024, bufSize); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index 280af82..1c9165c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -43,7 +43,7 @@ import static org.apache.ignite.IgniteFs.*; import static org.apache.ignite.configuration.IgfsConfiguration.*; import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; -import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*; +import static org.apache.ignite.internal.igfs.hadoop.HadoopIgfsUtils.*; /** * {@code IGFS} Hadoop 2.x file system driver over file system API. To use @@ -89,7 +89,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea private final AtomicBoolean closeGuard = new AtomicBoolean(); /** Grid remote client. */ - private IgfsHadoopWrapper rmtClient; + private HadoopIgfsWrapper rmtClient; /** Working directory. */ private IgfsPath workingDir; @@ -137,7 +137,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea * @throws IOException If initialization failed. */ public IgniteHadoopFileSystem(URI name, Configuration cfg) throws URISyntaxException, IOException { - super(IgfsHadoopEndpoint.normalize(name), IGFS_SCHEME, false, -1); + super(HadoopIgfsEndpoint.normalize(name), IGFS_SCHEME, false, -1); uri = name; @@ -239,7 +239,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - rmtClient = new IgfsHadoopWrapper(uriAuthority, logDir, cfg, LOG); + rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG); // Handshake. IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); @@ -446,13 +446,13 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea clientLog.logOpen(logId, path, PROXY, bufSize, size); - return new FSDataInputStream(new IgfsHadoopProxyInputStream(is, clientLog, logId)); + return new FSDataInputStream(new HadoopIgfsProxyInputStream(is, clientLog, logId)); } else return is; } else { - IgfsHadoopStreamDelegate stream = seqReadsBeforePrefetchOverride ? + HadoopIgfsStreamDelegate stream = seqReadsBeforePrefetchOverride ? rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path); long logId = -1; @@ -467,7 +467,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + ", bufSize=" + bufSize + ']'); - IgfsHadoopInputStream igfsIn = new IgfsHadoopInputStream(stream, stream.length(), + HadoopInputIgfsStream igfsIn = new HadoopInputIgfsStream(stream, stream.length(), bufSize, LOG, clientLog, logId); if (LOG.isDebugEnabled()) @@ -524,7 +524,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea else clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize); - return new FSDataOutputStream(new IgfsHadoopProxyOutputStream(os, clientLog, logId)); + return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId)); } else return os; @@ -534,7 +534,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites)); // Create stream and close it in the 'finally' section if any sequential operation failed. - IgfsHadoopStreamDelegate stream; + HadoopIgfsStreamDelegate stream; long logId = -1; @@ -566,7 +566,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea assert stream != null; - IgfsHadoopOutputStream igfsOut = new IgfsHadoopOutputStream(stream, LOG, + HadoopOutputIgfsStream igfsOut = new HadoopOutputIgfsStream(stream, LOG, clientLog, logId); bufSize = Math.max(64 * 1024, bufSize); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfs.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfs.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfs.java new file mode 100644 index 0000000..6ee593e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfs.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Facade for communication with grid. + */ +public interface HadoopIgfs { + /** + * Perform handshake. + * + * @param logDir Log directory. + * @return Future with handshake result. + * @throws IgniteCheckedException If failed. + */ + public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException; + + /** + * Close connection. + * + * @param force Force flag. + */ + public void close(boolean force); + + /** + * Command to retrieve file info for some IGFS path. + * + * @param path Path to get file info for. + * @return Future for info operation. + * @throws IgniteCheckedException If failed. + */ + public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to update file properties. + * + * @param path IGFS path to update properties. + * @param props Properties to update. + * @return Future for update operation. + * @throws IgniteCheckedException If failed. + */ + public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; + + /** + * Sets last access time and last modification time for a file. + * + * @param path Path to update times. + * @param accessTime Last access time to set. + * @param modificationTime Last modification time to set. + * @throws IgniteCheckedException If failed. + */ + public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException, + IOException; + + /** + * Command to rename given path. + * + * @param src Source path. + * @param dest Destination path. + * @return Future for rename operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException; + + /** + * Command to delete given path. + * + * @param path Path to delete. + * @param recursive {@code True} if deletion is recursive. + * @return Future for delete operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException; + + /** + * Command to get affinity for given path, offset and length. + * + * @param path Path to get affinity for. + * @param start Start position (offset). + * @param len Data length. + * @return Future for affinity command. + * @throws IgniteCheckedException If failed. + */ + public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException, + IOException; + + /** + * Gets path summary. + * + * @param path Path to get summary for. + * @return Future that will be completed when summary is received. + * @throws IgniteCheckedException If failed. + */ + public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to create directories. + * + * @param path Path to create. + * @return Future for mkdirs operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; + + /** + * Command to get list of files in directory. + * + * @param path Path to list. + * @return Future for listFiles operation. + * @throws IgniteCheckedException If failed. + */ + public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to get directory listing. + * + * @param path Path to list. + * @return Future for listPaths operation. + * @throws IgniteCheckedException If failed. + */ + public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Performs status request. + * + * @return Status response. + * @throws IgniteCheckedException If failed. + */ + public IgfsStatus fsStatus() throws IgniteCheckedException, IOException; + + /** + * Command to open file for reading. + * + * @param path File path to open. + * @return Future for open operation. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to open file for reading. + * + * @param path File path to open. + * @return Future for open operation. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException, + IOException; + + /** + * Command to create file and open it for output. + * + * @param path Path to file. + * @param overwrite If {@code true} then old file contents will be lost. + * @param colocate If {@code true} and called on data node, file will be written on that node. + * @param replication Replication factor. + * @param props File properties for creation. + * @return Stream descriptor. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, + int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; + + /** + * Open file for output appending data to the end of a file. + * + * @param path Path to file. + * @param create If {@code true}, file will be created if does not exist. + * @param props File properties. + * @return Stream descriptor. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, + @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsCommunicationException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsCommunicationException.java new file mode 100644 index 0000000..ecaa61f --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsCommunicationException.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.*; + +/** + * Communication exception indicating a problem between file system and IGFS instance. + */ +public class HadoopIgfsCommunicationException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates new exception with given throwable as a nested cause and + * source of error message. + * + * @param cause Non-null throwable cause. + */ + public HadoopIgfsCommunicationException(Exception cause) { + super(cause); + } + + /** + * Creates a new exception with given error message and optional nested cause exception. + * + * @param msg Error message. + */ + public HadoopIgfsCommunicationException(String msg) { + super(msg); + } + + /** + * Creates a new exception with given error message and optional nested cause exception. + * + * @param msg Error message. + * @param cause Cause. + */ + public HadoopIgfsCommunicationException(String msg, Exception cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEndpoint.java new file mode 100644 index 0000000..dc8fcb8 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEndpoint.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; + +import static org.apache.ignite.configuration.IgfsConfiguration.*; + +/** + * IGFS endpoint abstraction. + */ +public class HadoopIgfsEndpoint { + /** Localhost. */ + public static final String LOCALHOST = "127.0.0.1"; + + /** IGFS name. */ + private final String igfsName; + + /** Grid name. */ + private final String gridName; + + /** Host. */ + private final String host; + + /** Port. */ + private final int port; + + /** + * Normalize IGFS URI. + * + * @param uri URI. + * @return Normalized URI. + * @throws IOException If failed. + */ + public static URI normalize(URI uri) throws IOException { + try { + if (!F.eq(IgniteFs.IGFS_SCHEME, uri.getScheme())) + throw new IOException("Failed to normalize UIR because it has non IGFS scheme: " + uri); + + HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(uri.getAuthority()); + + StringBuilder sb = new StringBuilder(); + + if (endpoint.igfs() != null) + sb.append(endpoint.igfs()); + + if (endpoint.grid() != null) + sb.append(":").append(endpoint.grid()); + + return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(), + uri.getPath(), uri.getQuery(), uri.getFragment()); + } + catch (URISyntaxException | IgniteCheckedException e) { + throw new IOException("Failed to normalize URI: " + uri, e); + } + } + + /** + * Constructor. + * + * @param connStr Connection string. + * @throws IgniteCheckedException If failed to parse connection string. + */ + public HadoopIgfsEndpoint(@Nullable String connStr) throws IgniteCheckedException { + if (connStr == null) + connStr = ""; + + String[] tokens = connStr.split("@", -1); + + IgniteBiTuple<String, Integer> hostPort; + + if (tokens.length == 1) { + igfsName = null; + gridName = null; + + hostPort = hostPort(connStr, connStr); + } + else if (tokens.length == 2) { + String authStr = tokens[0]; + + if (authStr.isEmpty()) { + gridName = null; + igfsName = null; + } + else { + String[] authTokens = authStr.split(":", -1); + + igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0]; + + if (authTokens.length == 1) + gridName = null; + else if (authTokens.length == 2) + gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1]; + else + throw new IgniteCheckedException("Invalid connection string format: " + connStr); + } + + hostPort = hostPort(connStr, tokens[1]); + } + else + throw new IgniteCheckedException("Invalid connection string format: " + connStr); + + host = hostPort.get1(); + + assert hostPort.get2() != null; + + port = hostPort.get2(); + } + + /** + * Parse host and port. + * + * @param connStr Full connection string. + * @param hostPortStr Host/port connection string part. + * @return Tuple with host and port. + * @throws IgniteCheckedException If failed to parse connection string. + */ + private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException { + String[] tokens = hostPortStr.split(":", -1); + + String host = tokens[0]; + + if (F.isEmpty(host)) + host = LOCALHOST; + + int port; + + if (tokens.length == 1) + port = DFLT_IPC_PORT; + else if (tokens.length == 2) { + String portStr = tokens[1]; + + try { + port = Integer.valueOf(portStr); + + if (port < 0 || port > 65535) + throw new IgniteCheckedException("Invalid port number: " + connStr); + } + catch (NumberFormatException e) { + throw new IgniteCheckedException("Invalid port number: " + connStr); + } + } + else + throw new IgniteCheckedException("Invalid connection string format: " + connStr); + + return F.t(host, port); + } + + /** + * @return IGFS name. + */ + @Nullable public String igfs() { + return igfsName; + } + + /** + * @return Grid name. + */ + @Nullable public String grid() { + return gridName; + } + + /** + * @return Host. + */ + public String host() { + return host; + } + + /** + * @return Host. + */ + public boolean isLocal() { + return F.eq(LOCALHOST, host); + } + + /** + * @return Port. + */ + public int port() { + return port; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopIgfsEndpoint.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEx.java new file mode 100644 index 0000000..5321fa3 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEx.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Extended IGFS server interface. + */ +public interface HadoopIgfsEx extends HadoopIgfs { + /** + * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. + * If connection is closed already, callback will be invoked synchronously inside this method. + * + * @param delegate Stream delegate. + * @param lsnr Event listener. + */ + public void addEventListener(HadoopIgfsStreamDelegate delegate, HadoopIgfsStreamEventListener lsnr); + + /** + * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. + * + * @param delegate Stream delegate. + */ + public void removeEventListener(HadoopIgfsStreamDelegate delegate); + + /** + * Asynchronously reads specified amount of bytes from opened input stream. + * + * @param delegate Stream delegate. + * @param pos Position to read from. + * @param len Data length to read. + * @param outBuf Optional output buffer. If buffer length is less then {@code len}, all remaining + * bytes will be read into new allocated buffer of length {len - outBuf.length} and this buffer will + * be the result of read future. + * @param outOff Output offset. + * @param outLen Output length. + * @return Read data. + */ + public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, + @Nullable final byte[] outBuf, final int outOff, final int outLen); + + /** + * Writes data to the stream with given streamId. This method does not return any future since + * no response to write request is sent. + * + * @param delegate Stream delegate. + * @param data Data to write. + * @param off Offset. + * @param len Length. + * @throws IOException If failed. + */ + public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) throws IOException; + + /** + * Close server stream. + * + * @param delegate Stream delegate. + * @throws IOException If failed. + */ + public void closeStream(HadoopIgfsStreamDelegate delegate) throws IOException; + + /** + * Flush output stream. + * + * @param delegate Stream delegate. + * @throws IOException If failed. + */ + public void flush(HadoopIgfsStreamDelegate delegate) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsFuture.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsFuture.java new file mode 100644 index 0000000..9ae0161 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsFuture.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +/** + * IGFS client future that holds response parse closure. + */ +public class HadoopIgfsFuture<T> extends GridPlainFutureAdapter<T> { + /** Output buffer. */ + private byte[] outBuf; + + /** Output offset. */ + private int outOff; + + /** Output length. */ + private int outLen; + + /** Read future flag. */ + private boolean read; + + /** + * @return Output buffer. + */ + public byte[] outputBuffer() { + return outBuf; + } + + /** + * @param outBuf Output buffer. + */ + public void outputBuffer(@Nullable byte[] outBuf) { + this.outBuf = outBuf; + } + + /** + * @return Offset in output buffer to write from. + */ + public int outputOffset() { + return outOff; + } + + /** + * @param outOff Offset in output buffer to write from. + */ + public void outputOffset(int outOff) { + this.outOff = outOff; + } + + /** + * @return Length to write to output buffer. + */ + public int outputLength() { + return outLen; + } + + /** + * @param outLen Length to write to output buffer. + */ + public void outputLength(int outLen) { + this.outLen = outLen; + } + + /** + * @param read {@code True} if this is a read future. + */ + public void read(boolean read) { + this.read = read; + } + + /** + * @return {@code True} if this is a read future. + */ + public boolean read() { + return read; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInProc.java new file mode 100644 index 0000000..b0da50d --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInProc.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Communication with grid in the same process. + */ +public class HadoopIgfsInProc implements HadoopIgfsEx { + /** Target IGFS. */ + private final IgfsEx igfs; + + /** Buffer size. */ + private final int bufSize; + + /** Event listeners. */ + private final Map<HadoopIgfsStreamDelegate, HadoopIgfsStreamEventListener> lsnrs = + new ConcurrentHashMap<>(); + + /** Logger. */ + private final Log log; + + /** + * Constructor. + * + * @param igfs Target IGFS. + * @param log Log. + */ + public HadoopIgfsInProc(IgfsEx igfs, Log log) { + this.igfs = igfs; + this.log = log; + + bufSize = igfs.configuration().getBlockSize() * 2; + } + + /** {@inheritDoc} */ + @Override public IgfsHandshakeResponse handshake(String logDir) { + igfs.clientLogDirectory(logDir); + + return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), + igfs.globalSampling()); + } + + /** {@inheritDoc} */ + @Override public void close(boolean force) { + // Perform cleanup. + for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) { + try { + lsnr.onClose(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to notify stream event listener", e); + } + } + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { + try { + return igfs.info(path); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to get file info because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + try { + return igfs.update(path, props); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to update file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { + try { + igfs.setTimes(path, accessTime, modificationTime); + + return true; + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to set path times because Grid is stopping: " + + path); + } + } + + /** {@inheritDoc} */ + @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { + try { + igfs.rename(src, dest); + + return true; + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to rename path because Grid is stopping: " + src); + } + } + + /** {@inheritDoc} */ + @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { + try { + return igfs.delete(path, recursive); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to delete path because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsStatus fsStatus() throws IgniteCheckedException { + try { + return igfs.globalSpace(); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " + + "stopping."); + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { + try { + return igfs.listPaths(path); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to list paths because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { + try { + return igfs.listFiles(path); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to list files because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + try { + igfs.mkdirs(path, props); + + return true; + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to create directory because Grid is stopping: " + + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { + try { + return igfs.summary(path); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to get content summary because Grid is stopping: " + + path); + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) + throws IgniteCheckedException { + try { + return igfs.affinity(path, start, len); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to get affinity because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException { + try { + IgfsInputStreamAdapter stream = igfs.open(path, bufSize); + + return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length()); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) + throws IgniteCheckedException { + try { + IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); + + return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length()); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, + int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { + try { + IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, + colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); + + return new HadoopIgfsStreamDelegate(this, stream); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to create file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, + @Nullable Map<String, String> props) throws IgniteCheckedException { + try { + IgfsOutputStream stream = igfs.append(path, bufSize, create, props); + + return new HadoopIgfsStreamDelegate(this, stream); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to append file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, + @Nullable byte[] outBuf, int outOff, int outLen) { + IgfsInputStreamAdapter stream = delegate.target(); + + try { + byte[] res = null; + + if (outBuf != null) { + int outTailLen = outBuf.length - outOff; + + if (len <= outTailLen) + stream.readFully(pos, outBuf, outOff, len); + else { + stream.readFully(pos, outBuf, outOff, outTailLen); + + int remainderLen = len - outTailLen; + + res = new byte[remainderLen]; + + stream.readFully(pos, res, 0, remainderLen); + } + } else { + res = new byte[len]; + + stream.readFully(pos, res, 0, len); + } + + return new GridPlainFutureAdapter<>(res); + } + catch (IllegalStateException | IOException e) { + HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); + + if (lsnr != null) + lsnr.onError(e.getMessage()); + + return new GridPlainFutureAdapter<>(e); + } + } + + /** {@inheritDoc} */ + @Override public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) + throws IOException { + try { + IgfsOutputStream stream = delegate.target(); + + stream.write(data, off, len); + } + catch (IllegalStateException | IOException e) { + HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); + + if (lsnr != null) + lsnr.onError(e.getMessage()); + + if (e instanceof IllegalStateException) + throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e); + else + throw e; + } + } + + /** {@inheritDoc} */ + @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException { + try { + IgfsOutputStream stream = delegate.target(); + + stream.flush(); + } + catch (IllegalStateException | IOException e) { + HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); + + if (lsnr != null) + lsnr.onError(e.getMessage()); + + if (e instanceof IllegalStateException) + throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e); + else + throw e; + } + } + + /** {@inheritDoc} */ + @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException { + Closeable closeable = desc.target(); + + try { + closeable.close(); + } + catch (IllegalStateException e) { + throw new IOException("Failed to close IGFS stream because Grid is stopping.", e); + } + } + + /** {@inheritDoc} */ + @Override public void addEventListener(HadoopIgfsStreamDelegate delegate, + HadoopIgfsStreamEventListener lsnr) { + HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr); + + assert lsnr0 == null || lsnr0 == lsnr; + + if (log.isDebugEnabled()) + log.debug("Added stream event listener [delegate=" + delegate + ']'); + } + + /** {@inheritDoc} */ + @Override public void removeEventListener(HadoopIgfsStreamDelegate delegate) { + HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(delegate); + + if (lsnr0 != null && log.isDebugEnabled()) + log.debug("Removed stream event listener [delegate=" + delegate + ']'); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIo.java new file mode 100644 index 0000000..775e7d0 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIo.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +/** + * IO abstraction layer for IGFS client. Two kind of messages are expected to be sent: requests with response + * and request without response. + */ +public interface HadoopIgfsIo { + /** + * Sends given IGFS client message and asynchronously awaits for response. + * + * @param msg Message to send. + * @return Future that will be completed. + * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). + */ + public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException; + + /** + * Sends given IGFS client message and asynchronously awaits for response. When IO detects response + * beginning for given message it stops reading data and passes input stream to closure which can read + * response in a specific way. + * + * @param msg Message to send. + * @param outBuf Output buffer. If {@code null}, the output buffer is not used. + * @param outOff Output buffer offset. + * @param outLen Output buffer length. + * @return Future that will be completed when response is returned from closure. + * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). + */ + public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen) + throws IgniteCheckedException; + + /** + * Sends given message and does not wait for response. + * + * @param msg Message to send. + * @throws IgniteCheckedException If send failed. + */ + public void sendPlain(IgfsMessage msg) throws IgniteCheckedException; + + /** + * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. + * If connection is closed already, callback will be invoked synchronously inside this method. + * + * @param lsnr Event listener. + */ + public void addEventListener(HadoopIgfsIpcIoListener lsnr); + + /** + * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. + * + * @param lsnr Event listener. + */ + public void removeEventListener(HadoopIgfsIpcIoListener lsnr); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java new file mode 100644 index 0000000..10d764e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +/** + * Listens to the events of {@link HadoopIpcIgfsIo}. + */ +public interface HadoopIgfsIpcIoListener { + /** + * Callback invoked when the IO is being closed. + */ + public void onClose(); + + /** + * Callback invoked when remote error occurs. + * + * @param streamId Stream ID. + * @param errMsg Error message. + */ + public void onError(long streamId, String errMsg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsJclLogger.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsJclLogger.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsJclLogger.java new file mode 100644 index 0000000..1ba6e64 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsJclLogger.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +/** + * JCL logger wrapper for Hadoop. + */ +public class HadoopIgfsJclLogger implements IgniteLogger { + /** JCL implementation proxy. */ + private Log impl; + + /** + * Constructor. + * + * @param impl JCL implementation to use. + */ + HadoopIgfsJclLogger(Log impl) { + assert impl != null; + + this.impl = impl; + } + + /** {@inheritDoc} */ + @Override public IgniteLogger getLogger(Object ctgr) { + return new HadoopIgfsJclLogger(LogFactory.getLog( + ctgr instanceof Class ? ((Class)ctgr).getName() : String.valueOf(ctgr))); + } + + /** {@inheritDoc} */ + @Override public void trace(String msg) { + impl.trace(msg); + } + + /** {@inheritDoc} */ + @Override public void debug(String msg) { + impl.debug(msg); + } + + /** {@inheritDoc} */ + @Override public void info(String msg) { + impl.info(msg); + } + + /** {@inheritDoc} */ + @Override public void warning(String msg) { + impl.warn(msg); + } + + /** {@inheritDoc} */ + @Override public void warning(String msg, @Nullable Throwable e) { + impl.warn(msg, e); + } + + /** {@inheritDoc} */ + @Override public void error(String msg) { + impl.error(msg); + } + + /** {@inheritDoc} */ + @Override public boolean isQuiet() { + return !isInfoEnabled() && !isDebugEnabled(); + } + + /** {@inheritDoc} */ + @Override public void error(String msg, @Nullable Throwable e) { + impl.error(msg, e); + } + + /** {@inheritDoc} */ + @Override public boolean isTraceEnabled() { + return impl.isTraceEnabled(); + } + + /** {@inheritDoc} */ + @Override public boolean isDebugEnabled() { + return impl.isDebugEnabled(); + } + + /** {@inheritDoc} */ + @Override public boolean isInfoEnabled() { + return impl.isInfoEnabled(); + } + + /** {@inheritDoc} */ + @Nullable @Override public String fileName() { + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "IgfsHadoopJclLogger [impl=" + impl + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProperties.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProperties.java new file mode 100644 index 0000000..20c1d5d --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProperties.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.hadoop.fs.permission.*; +import org.apache.ignite.*; + +import java.util.*; + +import static org.apache.ignite.IgniteFs.*; + +/** + * Hadoop file system properties. + */ +public class HadoopIgfsProperties { + /** Username. */ + private String usrName; + + /** Group name. */ + private String grpName; + + /** Permissions. */ + private FsPermission perm; + + /** + * Constructor. + * + * @param props Properties. + * @throws IgniteException In case of error. + */ + public HadoopIgfsProperties(Map<String, String> props) throws IgniteException { + usrName = props.get(PROP_USER_NAME); + grpName = props.get(PROP_GROUP_NAME); + + String permStr = props.get(PROP_PERMISSION); + + if (permStr != null) { + try { + perm = new FsPermission((short)Integer.parseInt(permStr, 8)); + } + catch (NumberFormatException ignore) { + throw new IgniteException("Permissions cannot be parsed: " + permStr); + } + } + } + + /** + * Get user name. + * + * @return User name. + */ + public String userName() { + return usrName; + } + + /** + * Get group name. + * + * @return Group name. + */ + public String groupName() { + return grpName; + } + + /** + * Get permission. + * + * @return Permission. + */ + public FsPermission permission() { + return perm; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyInputStream.java new file mode 100644 index 0000000..cfc6949 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyInputStream.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.ignite.internal.igfs.common.*; + +import java.io.*; + +/** + * Secondary Hadoop file system input stream wrapper. + */ +public class HadoopIgfsProxyInputStream extends InputStream implements Seekable, PositionedReadable { + /** Actual input stream to the secondary file system. */ + private final FSDataInputStream is; + + /** Client logger. */ + private final IgfsLogger clientLog; + + /** Log stream ID. */ + private final long logStreamId; + + /** Read time. */ + private long readTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of read bytes. */ + private long total; + + /** Closed flag. */ + private boolean closed; + + /** + * Constructor. + * + * @param is Actual input stream to the secondary file system. + * @param clientLog Client log. + */ + public HadoopIgfsProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) { + assert is != null; + assert clientLog != null; + + this.is = is; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(byte[] b) throws IOException { + readStart(); + + int res; + + try { + res = is.read(b); + } + finally { + readEnd(); + } + + if (res != -1) + total += res; + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized int read(byte[] b, int off, int len) throws IOException { + readStart(); + + int res; + + try { + res = super.read(b, off, len); + } + finally { + readEnd(); + } + + if (res != -1) + total += res; + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized long skip(long n) throws IOException { + readStart(); + + long res; + + try { + res = is.skip(n); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logSkip(logStreamId, res); + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized int available() throws IOException { + readStart(); + + try { + return is.available(); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + if (!closed) { + closed = true; + + readStart(); + + try { + is.close(); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logCloseIn(logStreamId, userTime, readTime, total); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void mark(int readLimit) { + readStart(); + + try { + is.mark(readLimit); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logMark(logStreamId, readLimit); + } + + /** {@inheritDoc} */ + @Override public synchronized void reset() throws IOException { + readStart(); + + try { + is.reset(); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logReset(logStreamId); + } + + /** {@inheritDoc} */ + @Override public synchronized boolean markSupported() { + readStart(); + + try { + return is.markSupported(); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + readStart(); + + int res; + + try { + res = is.read(); + } + finally { + readEnd(); + } + + if (res != -1) + total++; + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { + readStart(); + + int res; + + try { + res = is.read(pos, buf, off, len); + } + finally { + readEnd(); + } + + if (res != -1) + total += res; + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, pos, res); + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException { + readStart(); + + try { + is.readFully(pos, buf, off, len); + } + finally { + readEnd(); + } + + total += len; + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, pos, len); + } + + /** {@inheritDoc} */ + @Override public synchronized void readFully(long pos, byte[] buf) throws IOException { + readStart(); + + try { + is.readFully(pos, buf); + } + finally { + readEnd(); + } + + total += buf.length; + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, pos, buf.length); + } + + /** {@inheritDoc} */ + @Override public synchronized void seek(long pos) throws IOException { + readStart(); + + try { + is.seek(pos); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logSeek(logStreamId, pos); + } + + /** {@inheritDoc} */ + @Override public synchronized long getPos() throws IOException { + readStart(); + + try { + return is.getPos(); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException { + readStart(); + + try { + return is.seekToNewSource(targetPos); + } + finally { + readEnd(); + } + } + + /** + * Read start. + */ + private void readStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. + */ + private void readEnd() { + long now = System.nanoTime(); + + readTime += now - lastTs; + + lastTs = now; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyOutputStream.java new file mode 100644 index 0000000..a7266c4 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyOutputStream.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.ignite.internal.igfs.common.*; + +import java.io.*; + +/** + * Secondary Hadoop file system output stream wrapper. + */ +public class HadoopIgfsProxyOutputStream extends OutputStream { + /** Actual output stream. */ + private FSDataOutputStream os; + + /** Client logger. */ + private final IgfsLogger clientLog; + + /** Log stream ID. */ + private final long logStreamId; + + /** Read time. */ + private long writeTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of written bytes. */ + private long total; + + /** Closed flag. */ + private boolean closed; + + /** + * Constructor. + * + * @param os Actual output stream. + * @param clientLog Client logger. + * @param logStreamId Log stream ID. + */ + public HadoopIgfsProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) { + assert os != null; + assert clientLog != null; + + this.os = os; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + } + + /** {@inheritDoc} */ + @Override public synchronized void write(int b) throws IOException { + writeStart(); + + try { + os.write(b); + } + finally { + writeEnd(); + } + + total++; + } + + /** {@inheritDoc} */ + @Override public synchronized void write(byte[] b) throws IOException { + writeStart(); + + try { + os.write(b); + } + finally { + writeEnd(); + } + + total += b.length; + } + + /** {@inheritDoc} */ + @Override public synchronized void write(byte[] b, int off, int len) throws IOException { + writeStart(); + + try { + os.write(b, off, len); + } + finally { + writeEnd(); + } + + total += len; + } + + /** {@inheritDoc} */ + @Override public synchronized void flush() throws IOException { + writeStart(); + + try { + os.flush(); + } + finally { + writeEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + if (!closed) { + closed = true; + + writeStart(); + + try { + os.close(); + } + finally { + writeEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logCloseOut(logStreamId, userTime, writeTime, total); + } + } + + /** + * Read start. + */ + private void writeStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. + */ + private void writeEnd() { + long now = System.nanoTime(); + + writeTime += now - lastTs; + + lastTs = now; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsReader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsReader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsReader.java new file mode 100644 index 0000000..17755dc --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsReader.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Secondary file system input stream wrapper which actually opens input stream only in case it is explicitly + * requested. + * <p> + * The class is expected to be used only from synchronized context and therefore is not tread-safe. + */ +public class HadoopIgfsReader implements IgfsReader { + /** Secondary file system. */ + private final FileSystem fs; + + /** Path to the file to open. */ + private final Path path; + + /** Buffer size. */ + private final int bufSize; + + /** Actual input stream. */ + private FSDataInputStream in; + + /** Cached error occurred during output stream open. */ + private IOException err; + + /** Flag indicating that the stream was already opened. */ + private boolean opened; + + /** + * Constructor. + * + * @param fs Secondary file system. + * @param path Path to the file to open. + * @param bufSize Buffer size. + */ + public HadoopIgfsReader(FileSystem fs, Path path, int bufSize) { + assert fs != null; + assert path != null; + + this.fs = fs; + this.path = path; + this.bufSize = bufSize; + } + + /** Get input stream. */ + private PositionedReadable in() throws IOException { + if (opened) { + if (err != null) + throw err; + } + else { + opened = true; + + try { + in = fs.open(path, bufSize); + + if (in == null) + throw new IOException("Failed to open input stream (file system returned null): " + path); + } + catch (IOException e) { + err = e; + + throw err; + } + } + + return in; + } + + /** + * Close wrapped input stream in case it was previously opened. + */ + @Override public void close() { + U.closeQuiet(in); + } + + /** {@inheritDoc} */ + @Override public int read(long pos, byte[] buf, int off, int len) throws IOException { + return in().read(pos, buf, off, len); + } +}