Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-26 c1ac5a5e6 -> 367ab94bb
# ignite-26


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/367ab94b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/367ab94b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/367ab94b

Branch: refs/heads/ignite-26
Commit: 367ab94bbc34255c81de43b622b86c14de4b9ea9
Parents: c1ac5a5
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Feb 2 13:43:02 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Feb 2 13:43:02 2015 +0300

----------------------------------------------------------------------
 .../fs/mapreduce/IgniteFsRecordResolver.java     |   4 +-
 .../ignite/fs/mapreduce/IgniteFsTask.java        |   8 +-
 .../IgniteFsByteDelimiterRecordResolver.java     |   2 +-
 .../IgniteFsFixedLengthRecordResolver.java       |   2 +-
 .../apache/ignite/internal/IgniteKernal.java     |   8 +-
 .../internal/processors/fs/GridGgfsImpl.java     |   3 +-
 .../processors/fs/GridGgfsIpcHandler.java        | 171 ++++++++++---------
 .../internal/processors/fs/GridGgfsJobImpl.java  |   8 +-
 .../processors/fs/GridGgfsMetaManager.java       | 112 ++++++------
 .../processors/fs/IgniteFsProcessor.java         |   4 +-
 .../internal/visor/util/VisorTaskUtils.java      |   2 +-
 .../fs/hadoop/GridGgfsHadoopInProc.java          |  43 +++++
 .../fs/hadoop/GridGgfsHadoopWrapper.java         |  10 +-
 .../GridHadoopDefaultMapReducePlanner.java       |  10 +-
 14 files changed, 230 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java
index 8d2c07e..c34c304 100644
--- a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java
@@ -44,13 +44,13 @@ public interface IgniteFsRecordResolver extends Serializable {
     /**
      * Adjusts record start offset and length.
      *
-     * @param ggfs GGFS instance to use.
+     * @param fs IgniteFs instance to use.
      * @param stream Input stream for split file.
      * @param suggestedRecord Suggested file system record.
      * @return New adjusted record. If this method returns {@code null}, original record is ignored.
      * @throws IgniteException If resolve failed.
      * @throws IOException If resolve failed.
     */
-    @Nullable public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
+    @Nullable public IgniteFsFileRange resolveRecords(IgniteFs fs, IgniteFsInputStream stream,
         IgniteFsFileRange suggestedRecord) throws IgniteException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java
index fe57135..1005194 100644
--- a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java
@@ -88,7 +88,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTask
         assert ignite != null;
         assert args != null;

-        IgniteFs ggfs = ignite.fileSystem(args.ggfsName());
+        IgniteFs fs = ignite.fileSystem(args.ggfsName());

         IgniteFsProcessorAdapter ggfsProc = ((IgniteKernal) ignite).context().ggfs();

         Map<ComputeJob, ClusterNode> splitMap = new HashMap<>();
@@ -96,7 +96,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTask
         Map<UUID, ClusterNode> nodes = mapSubgrid(subgrid);

         for (IgniteFsPath path : args.paths()) {
-            IgniteFsFile file = ggfs.info(path);
+            IgniteFsFile file = fs.info(path);

             if (file == null) {
                 if (args.skipNonExistentFiles())
@@ -105,7 +105,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTask
                     throw new IgniteException("Failed to process IgniteFs file because it doesn't exist: " + path);
             }

-            Collection<IgniteFsBlockLocation> aff = ggfs.affinity(path, 0, file.length(), args.maxRangeLength());
+            Collection<IgniteFsBlockLocation> aff = fs.affinity(path, 0, file.length(), args.maxRangeLength());

             long totalLen = 0;
@@ -126,7 +126,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTask
                 IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args);

                 if (job != null) {
-                    ComputeJob jobImpl = ggfsProc.createJob(job, ggfs.name(), file.path(), loc.start(),
+                    ComputeJob jobImpl = ggfsProc.createJob(job, fs.name(), file.path(), loc.start(),
                         loc.length(), args.recordResolver());

                     splitMap.put(jobImpl, node);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
index f959438..75228fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
@@ -77,7 +77,7 @@ public class IgniteFsByteDelimiterRecordResolver implements IgniteFsRecordResolv
     }

     /** {@inheritDoc} */
-    @Override public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
+    @Override public IgniteFsFileRange resolveRecords(IgniteFs fs, IgniteFsInputStream stream,
         IgniteFsFileRange suggestedRecord) throws IgniteException, IOException {
        long suggestedStart = suggestedRecord.start();
        long suggestedEnd = suggestedStart + suggestedRecord.length();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
index 76d9e84..e3c64d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
@@ -52,7 +52,7 @@ public class IgniteFsFixedLengthRecordResolver implements IgniteFsRecordResolver
     }

     /** {@inheritDoc} */
-    @Override public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
+    @Override public IgniteFsFileRange resolveRecords(IgniteFs fs, IgniteFsInputStream stream,
         IgniteFsFileRange suggestedRecord) throws IgniteException, IOException {
         long suggestedEnd = suggestedRecord.start() + suggestedRecord.length();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index ab5f843..c0577b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3074,12 +3074,12 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit
         guard();

         try{
-            IgniteFs ggfs = ctx.ggfs().ggfs(name);
+            IgniteFs fs = ctx.ggfs().ggfs(name);

-            if (ggfs == null)
-                throw new IllegalArgumentException("GGFS is not configured: " + name);
+            if (fs == null)
+                throw new IllegalArgumentException("IgniteFs is not configured: " + name);

-            return ggfs;
+            return fs;
         }
         finally {
             unguard();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsImpl.java
index 2dc2980..63f3e86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsImpl.java
@@ -1171,7 +1171,6 @@ public final class GridGgfsImpl implements GridGgfsEx {
      * @param props Properties.
      * @param simpleCreate Whether new file should be created in secondary FS using create(Path, boolean) method.
      * @return Output stream.
-     * @throws IgniteCheckedException If file creation failed.
*/ private IgniteFsOutputStream create0( final IgniteFsPath path, @@ -1196,7 +1195,7 @@ public final class GridGgfsImpl implements GridGgfsEx { GridGgfsFileWorkerBatch batch = null; if (mode == PROXY) - throw new IgniteCheckedException("PROXY mode cannot be used in GGFS directly: " + path); + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path); else if (mode != PRIMARY) { assert mode == DUAL_SYNC || mode == DUAL_ASYNC; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java index dd9a13c..a4218a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java @@ -67,6 +67,9 @@ class GridGgfsIpcHandler implements GridGgfsServerHandler { /** * Constructs GGFS IPC handler. + * + * @param ggfsCtx Context. + * @param mgmt Management connection flag. */ GridGgfsIpcHandler(GridGgfsContext ggfsCtx, boolean mgmt) { assert ggfsCtx != null; @@ -228,6 +231,7 @@ class GridGgfsIpcHandler implements GridGgfsServerHandler { * Processes status request. * * @return Status response. + * @throws IgniteCheckedException If failed. */ private GridGgfsMessage processStatusRequest() throws IgniteCheckedException { GridGgfsStatus status = ggfs.globalSpace(); @@ -257,127 +261,132 @@ class GridGgfsIpcHandler implements GridGgfsServerHandler { GridGgfsControlResponse res = new GridGgfsControlResponse(); - switch (cmd) { - case EXISTS: - res.response(ggfs.exists(req.path())); + try { + switch (cmd) { + case EXISTS: + res.response(ggfs.exists(req.path())); - break; + break; - case INFO: - res.response(ggfs.info(req.path())); + case INFO: + res.response(ggfs.info(req.path())); - break; + break; - case PATH_SUMMARY: - res.response(ggfs.summary(req.path())); + case PATH_SUMMARY: + res.response(ggfs.summary(req.path())); - break; + break; - case UPDATE: - res.response(ggfs.update(req.path(), req.properties())); + case UPDATE: + res.response(ggfs.update(req.path(), req.properties())); - break; + break; - case RENAME: - ggfs.rename(req.path(), req.destinationPath()); + case RENAME: + ggfs.rename(req.path(), req.destinationPath()); - res.response(true); + res.response(true); - break; + break; - case DELETE: - res.response(ggfs.delete(req.path(), req.flag())); + case DELETE: + res.response(ggfs.delete(req.path(), req.flag())); - break; + break; - case MAKE_DIRECTORIES: - ggfs.mkdirs(req.path(), req.properties()); + case MAKE_DIRECTORIES: + ggfs.mkdirs(req.path(), req.properties()); - res.response(true); + res.response(true); - break; + break; - case LIST_PATHS: - res.paths(ggfs.listPaths(req.path())); + case LIST_PATHS: + res.paths(ggfs.listPaths(req.path())); - break; + break; - case LIST_FILES: - res.files(ggfs.listFiles(req.path())); + case LIST_FILES: + res.files(ggfs.listFiles(req.path())); - break; + break; - case SET_TIMES: - ggfs.setTimes(req.path(), req.accessTime(), req.modificationTime()); + case SET_TIMES: + ggfs.setTimes(req.path(), req.accessTime(), req.modificationTime()); - res.response(true); + res.response(true); - break; + break; - case AFFINITY: - 
res.locations(ggfs.affinity(req.path(), req.start(), req.length())); + case AFFINITY: + res.locations(ggfs.affinity(req.path(), req.start(), req.length())); - break; + break; - case OPEN_READ: { - GridGgfsInputStreamAdapter ggfsIn = !req.flag() ? ggfs.open(req.path(), bufSize) : - ggfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); + case OPEN_READ: { + GridGgfsInputStreamAdapter ggfsIn = !req.flag() ? ggfs.open(req.path(), bufSize) : + ggfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); - long streamId = registerResource(ses, ggfsIn); + long streamId = registerResource(ses, ggfsIn); - if (log.isDebugEnabled()) - log.debug("Opened GGFS input stream for file read [ggfsName=" + ggfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + if (log.isDebugEnabled()) + log.debug("Opened GGFS input stream for file read [ggfsName=" + ggfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - GridGgfsFileInfo info = new GridGgfsFileInfo(ggfsIn.fileInfo(), null, - ggfsIn.fileInfo().modificationTime()); + GridGgfsFileInfo info = new GridGgfsFileInfo(ggfsIn.fileInfo(), null, + ggfsIn.fileInfo().modificationTime()); - res.response(new GridGgfsInputStreamDescriptor(streamId, info.length())); + res.response(new GridGgfsInputStreamDescriptor(streamId, info.length())); - break; - } + break; + } - case OPEN_CREATE: { - long streamId = registerResource(ses, ggfs.create( - req.path(), // Path. - bufSize, // Buffer size. - req.flag(), // Overwrite if exists. - affinityKey(req), // Affinity key based on replication factor. - req.replication(),// Replication factor. - req.blockSize(), // Block size. - req.properties() // File properties. - )); + case OPEN_CREATE: { + long streamId = registerResource(ses, ggfs.create( + req.path(), // Path. + bufSize, // Buffer size. + req.flag(), // Overwrite if exists. + affinityKey(req), // Affinity key based on replication factor. + req.replication(),// Replication factor. + req.blockSize(), // Block size. + req.properties() // File properties. + )); - if (log.isDebugEnabled()) - log.debug("Opened GGFS output stream for file create [ggfsName=" + ggfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + if (log.isDebugEnabled()) + log.debug("Opened GGFS output stream for file create [ggfsName=" + ggfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - res.response(streamId); + res.response(streamId); - break; - } + break; + } - case OPEN_APPEND: { - long streamId = registerResource(ses, ggfs.append( - req.path(), // Path. - bufSize, // Buffer size. - req.flag(), // Create if absent. - req.properties() // File properties. - )); + case OPEN_APPEND: { + long streamId = registerResource(ses, ggfs.append( + req.path(), // Path. + bufSize, // Buffer size. + req.flag(), // Create if absent. + req.properties() // File properties. 
+ )); - if (log.isDebugEnabled()) - log.debug("Opened GGFS output stream for file append [ggfsName=" + ggfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + if (log.isDebugEnabled()) + log.debug("Opened GGFS output stream for file append [ggfsName=" + ggfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - res.response(streamId); + res.response(streamId); - break; - } + break; + } - default: - assert false : "Unhandled path control request command: " + cmd; + default: + assert false : "Unhandled path control request command: " + cmd; - break; + break; + } + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); } if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java index d1ff698..70ca713 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java @@ -79,13 +79,13 @@ public class GridGgfsJobImpl implements ComputeJob, GridInternalWrapper<IgniteFs /** {@inheritDoc} */ @Override public Object execute() { - IgniteFs ggfs = ignite.fileSystem(ggfsName); + IgniteFs fs = ignite.fileSystem(ggfsName); - try (IgniteFsInputStream in = ggfs.open(path)) { + try (IgniteFsInputStream in = fs.open(path)) { IgniteFsFileRange split = new IgniteFsFileRange(path, start, len); if (rslvr != null) { - split = rslvr.resolveRecords(ggfs, in, split); + split = rslvr.resolveRecords(fs, in, split); if (split == null) { log.warning("No data found for split on local node after resolver is applied " + @@ -97,7 +97,7 @@ public class GridGgfsJobImpl implements ComputeJob, GridInternalWrapper<IgniteFs in.seek(split.start()); - return job.execute(ggfs, new IgniteFsFileRange(path, split.start(), split.length()), in); + return job.execute(fs, new IgniteFsFileRange(path, split.start(), split.length()), in); } catch (IOException e) { throw new IgniteException("Failed to execute GGFS job for file split [ggfsName=" + ggfsName + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java index b640703..dca0327 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java @@ -757,7 +757,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { IgniteUuid fileId = newFileInfo.id(); if (!id2InfoPrj.putxIfAbsent(fileId, newFileInfo)) - throw new IgniteFsException("Failed to add file details into cache: " + newFileInfo); + throw new IgniteCheckedException("Failed to add file details into cache: " + newFileInfo); assert metaCache.get(parentId) != null; @@ -974,7 +974,7 @@ public class GridGgfsMetaManager 
extends GridGgfsManager { assert parentInfo.isDirectory(); if (!rmvLocked && fileInfo.lockId() != null) - throw new IgniteFsException("Failed to remove file (file is opened for writing) [fileName=" + + throw new IgniteCheckedException("Failed to remove file (file is opened for writing) [fileName=" + fileName + ", fileId=" + fileId + ", lockId=" + fileInfo.lockId() + ']'); // Validate own directory listing. @@ -1478,15 +1478,15 @@ public class GridGgfsMetaManager extends GridGgfsManager { GridGgfsFileInfo newInfo = c.apply(oldInfo); if (newInfo == null) - throw new IgniteFsException("Failed to update file info with null value" + + throw new IgniteCheckedException("Failed to update file info with null value" + " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']'); if (!oldInfo.id().equals(newInfo.id())) - throw new IgniteFsException("Failed to update file info (file IDs differ)" + + throw new IgniteCheckedException("Failed to update file info (file IDs differ)" + " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']'); if (oldInfo.isDirectory() != newInfo.isDirectory()) - throw new IgniteFsException("Failed to update file info (file types differ)" + + throw new IgniteCheckedException("Failed to update file info (file types differ)" + " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']'); boolean b = metaCache.replace(fileId, oldInfo, newInfo); @@ -1588,10 +1588,17 @@ public class GridGgfsMetaManager extends GridGgfsManager { * @return Output stream descriptor. * @throws IgniteCheckedException If file creation failed. */ - public GridGgfsSecondaryOutputStreamDescriptor createDual(final IgniteFsFileSystem fs, final IgniteFsPath path, - final boolean simpleCreate, @Nullable final Map<String, String> props, final boolean overwrite, final int bufSize, - final short replication, final long blockSize, final IgniteUuid affKey) - throws IgniteCheckedException { + public GridGgfsSecondaryOutputStreamDescriptor createDual(final IgniteFsFileSystem fs, + final IgniteFsPath path, + final boolean simpleCreate, + @Nullable final Map<String, String> props, + final boolean overwrite, + final int bufSize, + final short replication, + final long blockSize, + final IgniteUuid affKey) + throws IgniteCheckedException + { if (busyLock.enterBusy()) { try { assert fs != null; @@ -1651,10 +1658,10 @@ public class GridGgfsMetaManager extends GridGgfsManager { IgniteFsFile status = fs.info(path); if (status == null) - throw new IgniteFsException("Failed to open output stream to the file created in " + + throw new IgniteCheckedException("Failed to open output stream to the file created in " + "the secondary file system because it no longer exists: " + path); else if (status.isDirectory()) - throw new IgniteFsException("Failed to open output stream to the file created in " + + throw new IgniteCheckedException("Failed to open output stream to the file created in " + "the secondary file system because the path points to a directory: " + path); GridGgfsFileInfo newInfo = new GridGgfsFileInfo(status.blockSize(), status.length(), affKey, @@ -1715,11 +1722,8 @@ public class GridGgfsMetaManager extends GridGgfsManager { simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" + bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err); - if (err instanceof IgniteFsException) - throw (IgniteFsException)err; - else - throw new IgniteFsException("Failed to create the file due to secondary file system " + - "exception: " + path, err); + throw new 
IgniteCheckedException("Failed to create the file due to secondary file system " + + "exception: " + path, err); } }; @@ -1765,7 +1769,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { GridGgfsFileInfo info = infos.get(path); if (info.isDirectory()) - throw new IgniteFsException("Failed to open output stream to the file in the " + + throw new IgniteCheckedException("Failed to open output stream to the file in the " + "secondary file system because the path points to a directory: " + path); out = fs.append(path, bufSize, false, null); @@ -1804,11 +1808,8 @@ public class GridGgfsMetaManager extends GridGgfsManager { U.error(log, "File append in DUAL mode failed [path=" + path + ", bufferSize=" + bufSize + ']', err); - if (err instanceof IgniteFsException) - throw (IgniteFsException)err; - else - throw new IgniteCheckedException("Failed to append to the file due to secondary file system " + - "exception: " + path, err); + throw new IgniteCheckedException("Failed to append to the file due to secondary file system " + + "exception: " + path, err); } }; @@ -1869,11 +1870,8 @@ public class GridGgfsMetaManager extends GridGgfsManager { U.error(log, "File open in DUAL mode failed [path=" + path + ", bufferSize=" + bufSize + ']', err); - if (err instanceof IgniteFsException) - throw (IgniteCheckedException)err; - else - throw new IgniteCheckedException("Failed to open the path due to secondary file system " + - "exception: " + path, err); + throw new IgniteCheckedException("Failed to open the path due to secondary file system " + + "exception: " + path, err); } }; @@ -1917,11 +1915,8 @@ public class GridGgfsMetaManager extends GridGgfsManager { } @Override public GridGgfsFileInfo onFailure(@Nullable Exception err) throws IgniteCheckedException { - if (err instanceof IgniteFsException) - throw (IgniteCheckedException)err; - else - throw new IgniteCheckedException("Failed to synchronize path due to secondary file system " + - "exception: " + path, err); + throw new IgniteCheckedException("Failed to synchronize path due to secondary file system " + + "exception: " + path, err); } }; @@ -2072,7 +2067,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { else { // Move. if (destInfo.isFile()) - throw new IgniteFsException("Failed to rename the path in the local file system " + + throw new IgniteCheckedException("Failed to rename the path in the local file system " + "because destination path already exists and it is a file: " + dest); else moveNonTx(srcInfo.id(), src.name(), srcParentInfo.id(), src.name(), destInfo.id()); @@ -2097,11 +2092,8 @@ public class GridGgfsMetaManager extends GridGgfsManager { U.error(log, "Path rename in DUAL mode failed [source=" + src + ", destination=" + dest + ']', err); - if (err instanceof IgniteFsException) - throw (IgniteCheckedException)err; - else - throw new IgniteCheckedException("Failed to rename the path due to secondary file system " + - "exception: " + src, err); + throw new IgniteCheckedException("Failed to rename the path due to secondary file system " + + "exception: " + src, err); } }; @@ -2250,9 +2242,14 @@ public class GridGgfsMetaManager extends GridGgfsManager { * @return File info of the end path. * @throws IgniteCheckedException If failed. 
*/ - private GridGgfsFileInfo synchronize(IgniteFsFileSystem fs, IgniteFsPath startPath, GridGgfsFileInfo startPathInfo, - IgniteFsPath endPath, boolean strict, @Nullable Map<IgniteFsPath, GridGgfsFileInfo> created) - throws IgniteCheckedException { + private GridGgfsFileInfo synchronize(IgniteFsFileSystem fs, + IgniteFsPath startPath, + GridGgfsFileInfo startPathInfo, + IgniteFsPath endPath, + boolean strict, + @Nullable Map<IgniteFsPath, GridGgfsFileInfo> created) + throws IgniteCheckedException + { assert fs != null; assert startPath != null && startPathInfo != null && endPath != null; @@ -2272,7 +2269,14 @@ public class GridGgfsMetaManager extends GridGgfsManager { parentInfo = created.get(curPath); else { // Get file status from the secondary file system. - IgniteFsFile status = fs.info(curPath); + IgniteFsFile status; + + try { + status = fs.info(curPath); + } + catch (IgniteException e) { + throw new IgniteCheckedException("Failed to get path information: " + e, e); + } if (status != null) { if (!status.isDirectory() && !curPath.equals(endPath)) @@ -2322,9 +2326,12 @@ public class GridGgfsMetaManager extends GridGgfsManager { * @return Result of task execution. * @throws IgniteCheckedException If failed. */ - private <T> T synchronizeAndExecute(SynchronizationTask<T> task, IgniteFsFileSystem fs, boolean strict, + private <T> T synchronizeAndExecute(SynchronizationTask<T> task, + IgniteFsFileSystem fs, + boolean strict, IgniteFsPath... paths) - throws IgniteCheckedException { + throws IgniteCheckedException + { return synchronizeAndExecute(task, fs, strict, null, paths); } @@ -2340,8 +2347,13 @@ public class GridGgfsMetaManager extends GridGgfsManager { * @return Result of task execution. * @throws IgniteCheckedException If failed. */ - private <T> T synchronizeAndExecute(SynchronizationTask<T> task, IgniteFsFileSystem fs, boolean strict, - @Nullable Collection<IgniteUuid> extraLockIds, IgniteFsPath... paths) throws IgniteCheckedException { + private <T> T synchronizeAndExecute(SynchronizationTask<T> task, + IgniteFsFileSystem fs, + boolean strict, + @Nullable Collection<IgniteUuid> extraLockIds, + IgniteFsPath... 
paths) + throws IgniteCheckedException + { assert task != null; assert fs != null; assert paths != null && paths.length > 0; @@ -2477,8 +2489,12 @@ public class GridGgfsMetaManager extends GridGgfsManager { assert firstParentPath != null; assert pathToId.get(firstParentPath) != null; - GridGgfsFileInfo info = synchronize(fs, firstParentPath, - idToInfo.get(pathToId.get(firstParentPath)), path, strict, created); + GridGgfsFileInfo info = synchronize(fs, + firstParentPath, + idToInfo.get(pathToId.get(firstParentPath)), + path, + strict, + created); assert strict && info != null || !strict; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java index 8087d11..707dc61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java @@ -230,8 +230,8 @@ public class IgniteFsProcessor extends IgniteFsProcessorAdapter { /** {@inheritDoc} */ @Nullable @Override public ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path, - long start, long length, IgniteFsRecordResolver recRslv) { - return new GridGgfsJobImpl(job, ggfsName, path, start, length, recRslv); + long start, long len, IgniteFsRecordResolver recRslv) { + return new GridGgfsJobImpl(job, ggfsName, path, start, len, recRslv); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java index 0b6b479..f43bea8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java @@ -616,7 +616,7 @@ public class VisorTaskUtils { String logsDir; if (ggfs instanceof GridGgfsEx) - logsDir = ((GridGgfsEx) ggfs).clientLogDirectory(); + logsDir = ((GridGgfsEx)ggfs).clientLogDirectory(); else if (ggfs == null) throw new IgniteCheckedException("Failed to get profiler log folder (GGFS instance not found)"); else http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopInProc.java index 19ec955..b229b09 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopInProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopInProc.java @@ -49,6 +49,7 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { * Constructor. * * @param ggfs Target GGFS. + * @param log Log. 
*/ public GridGgfsHadoopInProc(GridGgfsEx ggfs, Log log) { this.ggfs = ggfs; @@ -84,6 +85,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { try { return ggfs.info(path); } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to get file info because Grid is stopping: " + path); } @@ -94,6 +98,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { try { return ggfs.update(path, props); } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to update file because Grid is stopping: " + path); } @@ -106,6 +113,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { return true; } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to set path times because Grid is stopping: " + path); @@ -119,6 +129,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { return true; } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to rename path because Grid is stopping: " + src); } @@ -129,6 +142,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { try { return ggfs.delete(path, recursive); } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to delete path because Grid is stopping: " + path); } @@ -150,6 +166,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { try { return ggfs.listPaths(path); } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to list paths because Grid is stopping: " + path); } @@ -160,6 +179,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { try { return ggfs.listFiles(path); } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to list files because Grid is stopping: " + path); } @@ -172,6 +194,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { return true; } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to create directory because Grid is stopping: " + path); @@ -183,6 +208,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { try { return ggfs.summary(path); } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to get content summary because Grid is stopping: " + path); @@ -195,6 +223,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { try { return ggfs.affinity(path, start, len); } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to get affinity because Grid is stopping: " + path); } @@ -207,6 +238,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { return new GridGgfsHadoopStreamDelegate(this, stream, 
stream.fileInfo().length()); } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path); } @@ -220,6 +254,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { return new GridGgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length()); } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path); } @@ -234,6 +271,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { return new GridGgfsHadoopStreamDelegate(this, stream); } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to create file because Grid is stopping: " + path); } @@ -247,6 +287,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx { return new GridGgfsHadoopStreamDelegate(this, stream); } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } catch (IllegalStateException e) { throw new GridGgfsHadoopCommunicationException("Failed to append file because Grid is stopping: " + path); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopWrapper.java index 71a3d81..c597899 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopWrapper.java @@ -339,7 +339,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { try { Ignite ignite = G.ignite(); - ggfs = (GridGgfsEx) ignite.fileSystem(endpoint.ggfs()); + ggfs = (GridGgfsEx)ignite.fileSystem(endpoint.ggfs()); } catch (Exception e) { err = e; @@ -348,7 +348,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { else { for (Ignite ignite : G.allGrids()) { try { - ggfs = (GridGgfsEx) ignite.fileSystem(endpoint.ggfs()); + ggfs = (GridGgfsEx)ignite.fileSystem(endpoint.ggfs()); break; } @@ -401,9 +401,9 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { } // 4. Try local TCP connection. - boolean skipLocalTcp = parameter(conf, PARAM_GGFS_ENDPOINT_NO_LOCAL_TCP, authority, false); + boolean skipLocTcp = parameter(conf, PARAM_GGFS_ENDPOINT_NO_LOCAL_TCP, authority, false); - if (!skipLocalTcp) { + if (!skipLocTcp) { if (curDelegate == null) { GridGgfsHadoopEx hadoop = null; @@ -426,7 +426,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop { } // 5. Try remote TCP connection. 
- if (curDelegate == null && (skipLocalTcp || !F.eq(LOCALHOST, endpoint.host()))) { + if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) { GridGgfsHadoopEx hadoop = null; try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java index d98d8f4..fcb4bd6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java @@ -161,8 +161,14 @@ public class GridHadoopDefaultMapReducePlanner implements GridHadoopMapReducePla ggfs = (GridGgfsEx)((IgniteEx)ignite).ggfsx(endpoint.ggfs()); if (ggfs != null && !ggfs.isProxy(split0.file())) { - Collection<IgniteFsBlockLocation> blocks = ggfs.affinity(new IgniteFsPath(split0.file()), - split0.start(), split0.length()); + Collection<IgniteFsBlockLocation> blocks; + + try { + blocks = ggfs.affinity(new IgniteFsPath(split0.file()), split0.start(), split0.length()); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } assert blocks != null;
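----------------------------------------------------------------------

Aside from the ggfs -> fs renames, the recurring change in this commit is translating the unchecked IgniteException thrown by the public IgniteFs API into IgniteCheckedException wherever the calling code has a checked-exception contract (see GridGgfsIpcHandler, GridGgfsHadoopInProc and GridHadoopDefaultMapReducePlanner above). Below is a minimal, self-contained sketch of that boundary pattern; the class and method names are illustrative placeholders, not code from this commit.

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;

public class GgfsExceptionBoundarySketch {
    /** Stand-in for a public IgniteFs call, which throws the unchecked IgniteException. */
    private static String fsOperation() {
        throw new IgniteException("Simulated file system failure.");
    }

    /** Stand-in for an internal handler whose contract is the checked IgniteCheckedException. */
    public static String handleRequest() throws IgniteCheckedException {
        try {
            return fsOperation();
        }
        catch (IgniteException e) {
            // Translate at the boundary, preserving the original exception as the cause.
            throw new IgniteCheckedException(e);
        }
    }

    public static void main(String[] args) {
        try {
            handleRequest();
        }
        catch (IgniteCheckedException e) {
            System.out.println("Wrapped: " + e.getCause().getMessage());
        }
    }
}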